From 506bd21d57c881329130132250ae89c372a130d5 Mon Sep 17 00:00:00 2001 From: Philippe Pittoli Date: Sun, 8 Jan 2023 20:06:22 +0100 Subject: [PATCH] SwitchDB: in/out should be fine. --- zig-impl/src/callback.zig | 5 +- zig-impl/src/context.zig | 42 ++++---- zig-impl/src/switch.zig | 221 +++++++++++++++++--------------------- 3 files changed, 120 insertions(+), 148 deletions(-) diff --git a/zig-impl/src/callback.zig b/zig-impl/src/callback.zig index 6671e71..16731aa 100644 --- a/zig-impl/src/callback.zig +++ b/zig-impl/src/callback.zig @@ -28,11 +28,8 @@ pub const CBEvent = struct { // For IO callbacks (switching). pub const Type = enum { NO_ERROR, // No error. A message was generated. + ERROR, // Generic error. FD_CLOSING, // The fd is closing. - FD_ERROR, // Generic error. - PARSING_ERROR, // The message was read but with errors. IGNORE, // The message should be ignored (protocol specific). }; - - t: CBEvent.Type, }; diff --git a/zig-impl/src/context.zig b/zig-impl/src/context.zig index f208976..a30737f 100644 --- a/zig-impl/src/context.zig +++ b/zig-impl/src/context.zig @@ -19,7 +19,7 @@ const Switch = @import("./switch.zig").Switch; const print_eq = @import("./util.zig").print_eq; const Messages = @import("./message.zig").Messages; -const Switches = @import("./switch.zig").Switches; +const SwitchDB = @import("./switch.zig").SwitchDB; const Connections = @import("./connection.zig").Connections; pub const PollFD = std.ArrayList(std.os.pollfd); @@ -39,7 +39,7 @@ pub const Context = struct { pollfd: PollFD, // .fd (fd_t) + .events (i16) + .revents (i16) tx: Messages, // Messages to send, once their fd is available. - switchdb: Switches, // Relations between fd. + switchdb: SwitchDB, // Relations between fd. timer: ?i32 = null, // No timer by default (no TIMER event). @@ -62,7 +62,7 @@ pub const Context = struct { , .connections = Connections.init(allocator) , .pollfd = PollFD.init(allocator) , .tx = Messages.init(allocator) - , .switchdb = Switches.init(allocator) + , .switchdb = SwitchDB.init(allocator) , .allocator = allocator }; } @@ -208,16 +208,6 @@ pub const Context = struct { return try self.connect_service (service_name); } - // Connection to a service, but with switched with the client fd. -// pub fn connection_switched(self: *Self -// , path: [] const u8 -// , clientfd: i32) !i32 { -// // print("connection switched from {} to path {s}\n", .{clientfd, path}); -// var newfd = try self.connect_ (Connection.Type.SWITCHED, path); -// // TODO: record switch. -// return newfd; -// } - pub fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void { // net.StreamServer var serverfd = self.pollfd.items[server_index].fd; @@ -370,7 +360,7 @@ pub const Context = struct { } // SWITCHED = send message to the right dest (or drop the switch) else if (self.connections.items[i].t == .SWITCHED) { - self.swichdb.handle_event_read (¤t_event, i, fd.fd); + current_event = self.swichdb.handle_event_read (i, fd.fd); if (current_event.t == .SWITCH_RX) { self.schedule(current_event.message.?); } @@ -404,21 +394,25 @@ pub const Context = struct { if(fd.revents & std.os.linux.POLL.OUT > 0) { fd.events &= ~ @as(i16, std.os.linux.POLL.OUT); + var index: usize = undefined; + for (self.tx.items) |m, index_| { + if (m.fd == self.pollfd.items[i].fd) { + index = index_; + break; + } + } + + var m = self.tx.swapRemove(index); + // SWITCHED = write message for its switch buddy (callbacks) if (self.connections.items[i].t == .SWITCHED) { - return self.swichdb.handle_event_write (¤t_event, i, fd.fd); + current_event = self.swichdb.handle_event_write (i, fd.fd, m); + // TODO: remove the message from the tx array. + // Message inner memory is already freed. + return current_event; } else { // otherwise = write message for the msg.fd - var index: usize = undefined; - for (self.tx.items) |m, index_| { - if (m.fd == self.pollfd.items[i].fd) { - index = index_; - break; - } - } - - var m = self.tx.swapRemove(index); try self.write (m); m.deinit(); return Event.init(Event.Type.TX, i, fd.fd, null); diff --git a/zig-impl/src/switch.zig b/zig-impl/src/switch.zig index 067b6c8..66a6ea8 100644 --- a/zig-impl/src/switch.zig +++ b/zig-impl/src/switch.zig @@ -15,7 +15,9 @@ const print = std.debug.print; const Event = ipc.Event; -pub const Switches = struct { +/// SwitchDB + +pub const SwitchDB = struct { const Self = @This(); db: std.AutoArrayHashMap(i32, ManagedConnection), @@ -38,23 +40,23 @@ pub const Switches = struct { // Read message from a switched fd. pub fn read (self: *Self, fd: i32) !?Message { + // TODO: assert there is an entry with this fd as a key. var managedconnection = self.db.get(fd); var message: Message = undefined; - var r: CBEventType = managedconnection.in(fd, &message); + var r: CBEventType = managedconnection.?.in(fd, &message); switch (r) { // The message should be ignored (protocol specific). - .IGNORE => { return null; }, + CBEventType.IGNORE => { return null; }, // No error. A message was generated. - .NO_ERROR => { - message.fd = managedconnection.dest; + CBEventType.NO_ERROR => { + message.fd = managedconnection.?.dest; return message; }, - .FD_CLOSING => { return error.closeFD; }, + CBEventType.FD_CLOSING => { return error.closeFD; }, // Generic error, or the message was read but with errors. - .FD_ERROR - .PARSING_ERROR => { + CBEventType.ERROR => { return error.generic; }, } @@ -64,21 +66,20 @@ pub const Switches = struct { // Write a message to a switched fd. pub fn write (self: *Self, message: Message) !void { - + // TODO: assert there is an entry with this fd as a key. var managedconnection = self.db.get(message.fd); - var r = managedconnection.out(managedconnection.dest, message); + var r = managedconnection.?.out(managedconnection.?.dest, &message); switch (r) { // The message should be ignored (protocol specific). // No error. A message was generated. - .NO_ERROR => { + CBEventType.NO_ERROR => { return; }, - .FD_CLOSING => { return error.closeFD; }, + CBEventType.FD_CLOSING => { return error.closeFD; }, // Generic error, or the message was read but with errors. - .IGNORE - .FD_ERROR - .PARSING_ERROR => { + CBEventType.IGNORE, + CBEventType.ERROR => { return error.generic; }, } @@ -86,44 +87,38 @@ pub const Switches = struct { unreachable; } - pub fn handle_event_read (self: *Self, event: *Event, index: usize, fd: i32) void { + pub fn handle_event_read (self: *Self, index: usize, fd: i32) Event { var message: ?Message = null; message = self.read (fd) catch |err| switch(err) { error.closeFD => { - event.* = Event.init(Event.Type.DISCONNECTION, index, fd, null); - return; + return Event.init(Event.Type.DISCONNECTION, index, fd, null); }, error.generic => { - event.* = Event.init(Event.Type.ERROR, index, fd, null); - return; + return Event.init(Event.Type.ERROR, index, fd, null); }, }; - event.* = Event.init(Event.Type.SWITCH_RX, index, fd, message); - return; + return Event.init(Event.Type.SWITCH_RX, index, fd, message); } - pub fn handle_event_write (self: *Self, event: *Event, index: usize, message: Message) void { + pub fn handle_event_write (self: *Self, index: usize, message: Message) Event { defer message.deinit(); var fd = message.fd; self.write(message) catch |err| switch(err) { error.closeFD => { - event.* = Event.init(Event.Type.DISCONNECTION, index, fd, null); - return; + return Event.init(Event.Type.DISCONNECTION, index, fd, null); }, error.generic => { - event.* = Event.init(Event.Type.ERROR, index, fd, null); - return; + return Event.init(Event.Type.ERROR, index, fd, null); }, }; - event.* = Event.init(Event.Type.SWITCH_TX, index, fd, null); - return; + return Event.init(Event.Type.SWITCH_TX, index, fd, null); } }; -pub const ManagedConnection = struct { +const ManagedConnection = struct { dest : i32, in : *const fn (origin: i32, m: *Message) CBEventType = default_in, - out : *const fn (origin: i32, m: *Message) CBEventType = default_out, + out : *const fn (origin: i32, m: *const Message) CBEventType = default_out, }; test "creation and display" { @@ -132,7 +127,7 @@ test "creation and display" { defer _ = gpa.deinit(); const allocator = gpa.allocator(); - var switchdb = Switches.init(allocator); + var switchdb = SwitchDB.init(allocator); defer switchdb.deinit(); try switchdb.db.put(5, ManagedConnection {.dest = 6}); @@ -141,25 +136,76 @@ test "creation and display" { try print_eq("{ (5,6)(6,5) }", .{switchdb}); } -// test "read" { -// const config = .{.safety = true}; -// var gpa = std.heap.GeneralPurposeAllocator(config){}; -// defer _ = gpa.deinit(); -// const allocator = gpa.allocator(); -// -// var switchdb = Switches.init(allocator); -// defer switchdb.deinit(); -// -// } +fn successful_in (_: i32, m: *Message) CBEventType { + m.* = Message.init(8, std.heap.c_allocator, "coucou") catch { + return CBEventType.ERROR; + }; + return CBEventType.NO_ERROR; +} -// For IO callbacks (switching). -// pub const Type = enum { -// NO_ERROR, // No error. A message was generated. -// FD_CLOSING, // The fd is closing. -// FD_ERROR, // Generic error. -// PARSING_ERROR, // The message was read but with errors. -// IGNORE, // The message should be ignored (protocol specific). -// }; +fn successful_out (_: i32, _: *const Message) CBEventType { + return CBEventType.NO_ERROR; +} + +test "successful exchanges" { + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + var switchdb = SwitchDB.init(allocator); + defer switchdb.deinit(); + + try switchdb.db.put(5, ManagedConnection {.dest = 6, .in = successful_in, .out = successful_out}); + try switchdb.db.put(6, ManagedConnection {.dest = 5, .in = successful_in, .out = successful_out}); + + // should return a new message (hardcoded: fd 8, payload "coucou") + var event_1: Event = switchdb.handle_event_read (1, 5); + if (event_1.m) |m| { m.deinit(); } + else { return error.NoMessage; } + + // should return a new message (hardcoded: fd 8, payload "coucou") + var event_2: Event = switchdb.handle_event_read (1, 6); + if (event_2.m) |m| { m.deinit(); } + else { return error.NoMessage; } + + var message = try Message.init(6, allocator, "coucou"); + var event_3 = switchdb.handle_event_write (5, message); + if (event_3.m) |_| { return error.ShouldNotCarryMessage; } +} + +fn unsuccessful_in (_: i32, _: *Message) CBEventType { + return CBEventType.ERROR; +} + +fn unsuccessful_out (_: i32, _: *const Message) CBEventType { + return CBEventType.ERROR; +} + +test "unsuccessful exchanges" { + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + var switchdb = SwitchDB.init(allocator); + defer switchdb.deinit(); + + try switchdb.db.put(5, ManagedConnection {.dest = 6, .in = unsuccessful_in, .out = unsuccessful_out}); + try switchdb.db.put(6, ManagedConnection {.dest = 5, .in = unsuccessful_in, .out = unsuccessful_out}); + + // should return a new message (hardcoded: fd 8, payload "coucou") + var event_1: Event = switchdb.handle_event_read (1, 5); + if (event_1.m) |_| { return error.ShouldNotCarryMessage; } + + // should return a new message (hardcoded: fd 8, payload "coucou") + var event_2: Event = switchdb.handle_event_read (1, 6); + if (event_2.m) |_| { return error.ShouldNotCarryMessage; } + + var message = try Message.init(6, allocator, "coucou"); + var event_3 = switchdb.handle_event_write (5, message); + if (event_3.m) |_| { return error.ShouldNotCarryMessage; } +} fn default_in (origin: i32, m: *Message) CBEventType { print ("receiving a message originated from {}\n", .{origin}); @@ -168,21 +214,20 @@ fn default_in (origin: i32, m: *Message) CBEventType { // This may be kinda hacky, idk. var stream: net.Stream = .{ .handle = origin }; - packet_size = stream.read(buffer[0..]) catch return CBEventType.FD_ERROR; + packet_size = stream.read(buffer[0..]) catch return CBEventType.ERROR; // Let's handle this as a disconnection. if (packet_size <= 4) { return CBEventType.FD_CLOSING; } - // TODO: handle memory errors. m.* = Message.read(origin, buffer[0..], std.heap.c_allocator) - catch return CBEventType.FD_ERROR; + catch return CBEventType.ERROR; return CBEventType.NO_ERROR; } -fn default_out (origin: i32, m: *Message) CBEventType { +fn default_out (origin: i32, m: *const Message) CBEventType { print ("sending a message originated from {}\n", .{origin}); // Message contains the fd, no need to search for // the right structure to copy, let's just recreate @@ -194,72 +239,8 @@ fn default_out (origin: i32, m: *Message) CBEventType { var writer = fbs.writer(); // returning basic errors, no details. - _ = m.write(writer) catch return CBEventType.FD_ERROR; - _ = stream.write (fbs.getWritten()) catch return CBEventType.FD_ERROR; + _ = m.write(writer) catch return CBEventType.ERROR; + _ = stream.write (fbs.getWritten()) catch return CBEventType.ERROR; return CBEventType.NO_ERROR; } - -// // TODO: actual switching. -// pub const Switch = struct { -// origin : i32, -// destination : i32, -// -// orig_in : *const fn (origin: i32, m: *Message) CBEventType, -// orig_out : *const fn (origin: i32, m: *Message) CBEventType, -// dest_in : *const fn (origin: i32, m: *Message) CBEventType, -// dest_out : *const fn (origin: i32, m: *Message) CBEventType, -// -// const Self = @This(); -// -// pub fn init(origin: i32, destination: i32) Self { -// return Self { -// .origin = origin -// , .destination = destination -// , .orig_in = default_in -// , .orig_out = default_out -// , .dest_in = default_in -// , .dest_out = default_out -// }; -// } -// -// pub fn set_callbacks(self: *Self, fd: i32, -// in : *const fn (origin: i32, m: *Message) CBEventType, -// out : *const fn (origin: i32, m: *Message) CBEventType) void { -// -// if (fd == self.origin) { -// self.orig_in = in; -// self.orig_out = out; -// } -// else { -// self.dest_in = in; -// self.dest_out = out; -// } -// } -// -// pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void { -// try fmt.format(out_stream -// , "switch {} <-> {}" -// , .{ self.origin, self.destination} ); -// } -// }; - -// test "Switch - creation and display" { -// const config = .{.safety = true}; -// var gpa = std.heap.GeneralPurposeAllocator(config){}; -// defer _ = gpa.deinit(); -// const allocator = gpa.allocator(); -// -// var switchdb = Switches.init(allocator); -// defer switchdb.deinit(); -// -// var first = Switch.init(3,8); // origin destination -// var second = Switch.init(2,4); // origin destination -// try switchdb.append(first); -// try switchdb.append(second); -// -// try print_eq("switch 3 <-> 8", first); -// try print_eq("switch 2 <-> 4", second); -// -// try testing.expect(2 == switchdb.items.len); -// }