Switch: first solid draft, lacks some tests.

master
Philippe Pittoli 2023-01-08 12:46:21 +01:00
parent ec787d7496
commit 978676051a
3 changed files with 71 additions and 29 deletions

View File

@ -62,7 +62,7 @@ pub const Context = struct {
, .connections = Connections.init(allocator)
, .pollfd = PollFD.init(allocator)
, .tx = Messages.init(allocator)
, .switchdb = try Switches.init(allocator)
, .switchdb = Switches.init(allocator)
, .allocator = allocator
};
}
@ -370,7 +370,10 @@ pub const Context = struct {
}
// SWITCHED = send message to the right dest (or drop the switch)
else if (self.connections.items[i].t == .SWITCHED) {
// TODO: read message from fd.fd + schedule read message
self.swichdb.handle_event_read (&current_event, i, fd.fd);
if (current_event.t == .SWITCH_RX) {
self.schedule(current_event.message.?);
}
return Event.init(Event.Type.SWITCH, i, fd.fd, null);
}
// EXTERNAL = user handles IO
@ -403,8 +406,7 @@ pub const Context = struct {
// SWITCHED = write message for its switch buddy (callbacks)
if (self.connections.items[i].t == .SWITCHED) {
// TODO: write message
return Event.init(Event.Type.SWITCH, i, fd.fd, null);
return self.swichdb.handle_event_write (&current_event, i, fd.fd);
}
else {
// otherwise = write message for the msg.fd

View File

@ -36,7 +36,8 @@ pub const Event = struct {
pub const Type = enum {
ERROR, // A problem occured.
EXTERNAL, // Message received from a non IPC socket.
SWITCH, // Message to send to a corresponding fd.
SWITCH_RX, // Message received from a switched FD.
SWITCH_TX, // Message sent to a switched fd.
CONNECTION, // New user.
DISCONNECTION, // User disconnected.
MESSAGE, // New message.

View File

@ -27,15 +27,17 @@ pub const Switches = struct {
}
pub fn deinit (self: *Self) void {
self.deinit();
self.db.deinit();
}
// read message from switched fd
pub fn read (self: *Self, fd: i32) !?Message
{
// If the socket is associated to another one for ipcd:
// read and write automatically and provide a new IPC_EVENT_TYPE indicating the switch.
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
for(self.db.keys()) |k,i| {
try fmt.format(out_stream, "({},{})", .{k, self.db.values()[i].dest});
}
}
// Read message from a switched fd.
pub fn read (self: *Self, fd: i32) !?Message {
var managedconnection = self.db.get(fd);
var message: Message = undefined;
@ -55,11 +57,13 @@ pub const Switches = struct {
.PARSING_ERROR => {
return error.generic;
},
};
}
unreachable;
}
// fd_switching_write enables to write a message to a switched fd.
pub fn write (message: Message) !void {
// Write a message to a switched fd.
pub fn write (self: *Self, message: Message) !void {
var managedconnection = self.db.get(message.fd);
var r = managedconnection.out(managedconnection.dest, message);
@ -77,27 +81,49 @@ pub const Switches = struct {
.PARSING_ERROR => {
return error.generic;
},
};
}
unreachable;
}
pub fn handle_event_read (self: *Self, event: *Event, index: usize, fd: i32) void {
var message: ?Message = null;
message = self.read (fd) catch |err| switch(err) {
error.closeFD => {
event.* = Event.init(Event.Type.DISCONNECTION, index, fd, null);
return;
},
error.generic => {
event.* = Event.init(Event.Type.ERROR, index, fd, null);
return;
},
};
event.* = Event.init(Event.Type.SWITCH_RX, index, fd, message);
return;
}
pub fn handle_event_write (self: *Self, event: *Event, index: usize, message: Message) void {
defer message.deinit();
var fd = message.fd;
self.write(message) catch |err| switch(err) {
error.closeFD => {
event.* = Event.init(Event.Type.DISCONNECTION, index, fd, null);
return;
},
error.generic => {
event.* = Event.init(Event.Type.ERROR, index, fd, null);
return;
},
};
event.* = Event.init(Event.Type.SWITCH_TX, index, fd, null);
return;
}
};
pub const ManagedConnection = struct {
dest : i32,
in : *const fn (origin: i32, m: *Message) CBEventType = default_in,
out : *const fn (origin: i32, m: *Message) CBEventType = default_out,
// pub fn read
// pub fn write
// pub fn set_callbacks(self: *Self,
// in : *const fn (origin: i32, m: *Message) CBEventType,
// out : *const fn (origin: i32, m: *Message) CBEventType) void {
//
// self.in = in;
// self.out = out;
// }
};
test "creation and display" {
@ -109,10 +135,23 @@ test "creation and display" {
var switchdb = Switches.init(allocator);
defer switchdb.deinit();
switchdb.db.put(5, ManagedConnection {.dest = 6});
switchdb.db.put(6, ManagedConnection {.dest = 5});
try switchdb.db.put(5, ManagedConnection {.dest = 6});
try switchdb.db.put(6, ManagedConnection {.dest = 5});
try print_eq("{ (5,6)(6,5) }", .{switchdb});
}
// test "read" {
// const config = .{.safety = true};
// var gpa = std.heap.GeneralPurposeAllocator(config){};
// defer _ = gpa.deinit();
// const allocator = gpa.allocator();
//
// var switchdb = Switches.init(allocator);
// defer switchdb.deinit();
//
// }
// For IO callbacks (switching).
// pub const Type = enum {
// NO_ERROR, // No error. A message was generated.
@ -129,7 +168,7 @@ fn default_in (origin: i32, m: *Message) CBEventType {
// This may be kinda hacky, idk.
var stream: net.Stream = .{ .handle = origin };
packet_size = try stream.read(buffer[0..]);
packet_size = stream.read(buffer[0..]) catch return CBEventType.FD_ERROR;
// Let's handle this as a disconnection.
if (packet_size <= 4) {