diff --git a/zig-impl/src/context.zig b/zig-impl/src/context.zig index 1e24b42..f208976 100644 --- a/zig-impl/src/context.zig +++ b/zig-impl/src/context.zig @@ -62,7 +62,7 @@ pub const Context = struct { , .connections = Connections.init(allocator) , .pollfd = PollFD.init(allocator) , .tx = Messages.init(allocator) - , .switchdb = try Switches.init(allocator) + , .switchdb = Switches.init(allocator) , .allocator = allocator }; } @@ -370,7 +370,10 @@ pub const Context = struct { } // SWITCHED = send message to the right dest (or drop the switch) else if (self.connections.items[i].t == .SWITCHED) { - // TODO: read message from fd.fd + schedule read message + self.swichdb.handle_event_read (¤t_event, i, fd.fd); + if (current_event.t == .SWITCH_RX) { + self.schedule(current_event.message.?); + } return Event.init(Event.Type.SWITCH, i, fd.fd, null); } // EXTERNAL = user handles IO @@ -403,8 +406,7 @@ pub const Context = struct { // SWITCHED = write message for its switch buddy (callbacks) if (self.connections.items[i].t == .SWITCHED) { - // TODO: write message - return Event.init(Event.Type.SWITCH, i, fd.fd, null); + return self.swichdb.handle_event_write (¤t_event, i, fd.fd); } else { // otherwise = write message for the msg.fd diff --git a/zig-impl/src/event.zig b/zig-impl/src/event.zig index b244b45..001b71e 100644 --- a/zig-impl/src/event.zig +++ b/zig-impl/src/event.zig @@ -36,7 +36,8 @@ pub const Event = struct { pub const Type = enum { ERROR, // A problem occured. EXTERNAL, // Message received from a non IPC socket. - SWITCH, // Message to send to a corresponding fd. + SWITCH_RX, // Message received from a switched FD. + SWITCH_TX, // Message sent to a switched fd. CONNECTION, // New user. DISCONNECTION, // User disconnected. MESSAGE, // New message. diff --git a/zig-impl/src/switch.zig b/zig-impl/src/switch.zig index 59dd0bf..067b6c8 100644 --- a/zig-impl/src/switch.zig +++ b/zig-impl/src/switch.zig @@ -27,15 +27,17 @@ pub const Switches = struct { } pub fn deinit (self: *Self) void { - self.deinit(); + self.db.deinit(); } - // read message from switched fd - pub fn read (self: *Self, fd: i32) !?Message - { - // If the socket is associated to another one for ipcd: - // read and write automatically and provide a new IPC_EVENT_TYPE indicating the switch. + pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void { + for(self.db.keys()) |k,i| { + try fmt.format(out_stream, "({},{})", .{k, self.db.values()[i].dest}); + } + } + // Read message from a switched fd. + pub fn read (self: *Self, fd: i32) !?Message { var managedconnection = self.db.get(fd); var message: Message = undefined; @@ -55,11 +57,13 @@ pub const Switches = struct { .PARSING_ERROR => { return error.generic; }, - }; + } + + unreachable; } - // fd_switching_write enables to write a message to a switched fd. - pub fn write (message: Message) !void { + // Write a message to a switched fd. + pub fn write (self: *Self, message: Message) !void { var managedconnection = self.db.get(message.fd); var r = managedconnection.out(managedconnection.dest, message); @@ -77,27 +81,49 @@ pub const Switches = struct { .PARSING_ERROR => { return error.generic; }, - }; + } unreachable; } + + pub fn handle_event_read (self: *Self, event: *Event, index: usize, fd: i32) void { + var message: ?Message = null; + message = self.read (fd) catch |err| switch(err) { + error.closeFD => { + event.* = Event.init(Event.Type.DISCONNECTION, index, fd, null); + return; + }, + error.generic => { + event.* = Event.init(Event.Type.ERROR, index, fd, null); + return; + }, + }; + event.* = Event.init(Event.Type.SWITCH_RX, index, fd, message); + return; + } + + pub fn handle_event_write (self: *Self, event: *Event, index: usize, message: Message) void { + defer message.deinit(); + var fd = message.fd; + self.write(message) catch |err| switch(err) { + error.closeFD => { + event.* = Event.init(Event.Type.DISCONNECTION, index, fd, null); + return; + }, + error.generic => { + event.* = Event.init(Event.Type.ERROR, index, fd, null); + return; + }, + }; + event.* = Event.init(Event.Type.SWITCH_TX, index, fd, null); + return; + } }; pub const ManagedConnection = struct { dest : i32, in : *const fn (origin: i32, m: *Message) CBEventType = default_in, out : *const fn (origin: i32, m: *Message) CBEventType = default_out, - - // pub fn read - // pub fn write - -// pub fn set_callbacks(self: *Self, -// in : *const fn (origin: i32, m: *Message) CBEventType, -// out : *const fn (origin: i32, m: *Message) CBEventType) void { -// -// self.in = in; -// self.out = out; -// } }; test "creation and display" { @@ -109,10 +135,23 @@ test "creation and display" { var switchdb = Switches.init(allocator); defer switchdb.deinit(); - switchdb.db.put(5, ManagedConnection {.dest = 6}); - switchdb.db.put(6, ManagedConnection {.dest = 5}); + try switchdb.db.put(5, ManagedConnection {.dest = 6}); + try switchdb.db.put(6, ManagedConnection {.dest = 5}); + + try print_eq("{ (5,6)(6,5) }", .{switchdb}); } +// test "read" { +// const config = .{.safety = true}; +// var gpa = std.heap.GeneralPurposeAllocator(config){}; +// defer _ = gpa.deinit(); +// const allocator = gpa.allocator(); +// +// var switchdb = Switches.init(allocator); +// defer switchdb.deinit(); +// +// } + // For IO callbacks (switching). // pub const Type = enum { // NO_ERROR, // No error. A message was generated. @@ -129,7 +168,7 @@ fn default_in (origin: i32, m: *Message) CBEventType { // This may be kinda hacky, idk. var stream: net.Stream = .{ .handle = origin }; - packet_size = try stream.read(buffer[0..]); + packet_size = stream.read(buffer[0..]) catch return CBEventType.FD_ERROR; // Let's handle this as a disconnection. if (packet_size <= 4) {