From f07b9151240b15eed2cae13094bc888935e84dbc Mon Sep 17 00:00:00 2001 From: Philippe Pittoli Date: Wed, 18 Jan 2023 17:22:56 +0100 Subject: [PATCH] API change introduced bugs in switch: fixed. --- zig-impl/libipc.h | 14 +++++++++++ zig-impl/src/bindings.zig | 1 - zig-impl/src/context.zig | 3 ++- zig-impl/src/switch.zig | 46 +++++++++++++++++++++++++--------- zig-impl/src/tcpd.zig | 16 ++++++++++++ zig-impl/test-bindings/pong.c | 12 +++++---- zig-impl/test-bindings/pongd.c | 7 +++--- 7 files changed, 77 insertions(+), 22 deletions(-) diff --git a/zig-impl/libipc.h b/zig-impl/libipc.h index a68fe4c..c425685 100644 --- a/zig-impl/libipc.h +++ b/zig-impl/libipc.h @@ -18,6 +18,14 @@ enum event_types { , TX = 8 // Message sent. }; +// Return type of callback functions when switching. +enum cb_event_types { + NO_ERROR = 0, // No error. A message was generated. + , ERROR = 1, // Generic error. + , FD_CLOSING = 2, // The fd is closing. + , IGNORE = 3, // The message should be ignored (protocol specific). +}; + int ipc_context_init (void** ptr); int ipc_service_init (void* ctx, int* servicefd, const char* service_name, unsigned short service_name_len); int ipc_connect_service (void* ctx, int* servicefd, const char* service_name, unsigned short service_name_len); @@ -30,7 +38,13 @@ int ipc_wait_event(void* ctx, char* t, size_t* index, int* originfd, char* buffe void ipc_context_timer (void* ctx, int timer); int ipc_close_fd (void* ctx, int fd); int ipc_close (void* ctx, size_t index); + +// Switch functions (for "protocol" services, such as TCPd). int ipc_add_external (void* ctx, int newfd); int ipc_add_switch (void* ctx, int fd1, int fd2); +int ipc_set_switch_callbacks (void* ctx, int fd + , enum cb_event_types (*in (int orig, const char *payload, unsigned int *mlen)) + , enum cb_event_types (*out(int dest, char *payload, unsigned int mlen))); + #endif diff --git a/zig-impl/src/bindings.zig b/zig-impl/src/bindings.zig index f1d073a..b8df095 100644 --- a/zig-impl/src/bindings.zig +++ b/zig-impl/src/bindings.zig @@ -132,7 +132,6 @@ export fn ipc_add_switch (ctx: *Context, fd1: i32, fd2: i32) callconv(.C) i32 { return 0; } -/// TODO: change the functions in the switch code, not to take a Message as a parameter. export fn ipc_set_switch_callbacks(ctx: *Context, fd: i32 , in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType , out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) callconv(.C) i32 { diff --git a/zig-impl/src/context.zig b/zig-impl/src/context.zig index c0ac264..37db580 100644 --- a/zig-impl/src/context.zig +++ b/zig-impl/src/context.zig @@ -275,6 +275,7 @@ pub const Context = struct { } pub fn schedule (self: *Self, m: Message) !void { + print ("scheduling new message {}\n", .{m}); try self.tx.append(m); } @@ -426,7 +427,7 @@ pub const Context = struct { return error.incoherentSwitchError; }, } - return Event.init(Event.Type.SWITCH_RX, i, fd.fd, null); + return current_event; } // EXTERNAL = user handles IO else if (self.connections.items[i].t == .EXTERNAL) { diff --git a/zig-impl/src/switch.zig b/zig-impl/src/switch.zig index 71b2447..9117dd7 100644 --- a/zig-impl/src/switch.zig +++ b/zig-impl/src/switch.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const hexdump = @import("./hexdump.zig"); const testing = std.testing; const fmt = std.fmt; @@ -79,19 +80,22 @@ pub const SwitchDB = struct { var message_size: u32 = @truncate(u32, buffer.len); var r: CBEventType = managedconnection.in(fd, &buffer, &message_size); - // TODO: read message - // TODO: better allocator? - // TODO: better errors? - var message: Message = Message.init(managedconnection.dest - , std.heap.c_allocator - , buffer[0..message_size]) catch { - return error.generic; - }; - + print ("MESSAGE READ: {} (from {} to {})\n", .{r, fd, managedconnection.dest}); switch (r) { // The message should be ignored (protocol specific). CBEventType.IGNORE => { return null; }, - CBEventType.NO_ERROR => { return message; }, + CBEventType.NO_ERROR => { + // TODO: read message + // TODO: better allocator? + // TODO: better errors? + var message: Message + = Message.init(managedconnection.dest + , std.heap.c_allocator + , buffer[0..message_size]) catch { + return error.generic; + }; + return message; + }, CBEventType.FD_CLOSING => { return error.closeFD; }, // Generic error, or the message was read but with errors. CBEventType.ERROR => { return error.generic; }, @@ -113,7 +117,8 @@ pub const SwitchDB = struct { // returning basic errors, no details. _ = message.write(writer) catch return error.generic; var written = fbs.getWritten(); - var r = managedconnection.out(managedconnection.dest, written.ptr, @truncate(u32, written.len)); + + var r = managedconnection.out(message.fd, written.ptr, @truncate(u32, written.len)); switch (r) { // The message should be ignored (protocol specific). @@ -298,11 +303,19 @@ fn default_in (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType { // Let's handle this as a disconnection. if (packet_size < 4) { + print("message is less than 4 bytes ({} bytes)\n", .{packet_size}); return CBEventType.FD_CLOSING; } mlen.* = @truncate(u32, packet_size); + var hexbuf: [4000]u8 = undefined; + var hexfbs = std.io.fixedBufferStream(&hexbuf); + var hexwriter = hexfbs.writer(); + hexdump.hexdump(hexwriter, "DEFAULT IN: MESSAGE RECEIVED", mcontent[0..packet_size]) catch unreachable; + print("{s}\n", .{hexfbs.getWritten()}); + print("packet_size {}, mlen.* {}\n", .{packet_size, mlen.*}); + return CBEventType.NO_ERROR; } @@ -310,7 +323,16 @@ fn default_out (fd: i32, mcontent: [*]const u8, mlen: u32) CBEventType { // print ("sending a message originated from {}\n", .{origin}); // Message contains the fd, no need to search for the right structure to copy, // let's just recreate a Stream from the fd. + + var to_send = mcontent[0..mlen]; + var hexbuf: [4000]u8 = undefined; + var hexfbs = std.io.fixedBufferStream(&hexbuf); + var hexwriter = hexfbs.writer(); + hexdump.hexdump(hexwriter, "DEFAULT OUT: MESSAGE TO SEND", to_send) catch unreachable; + print("{s}\n", .{hexfbs.getWritten()}); + var stream = net.Stream { .handle = fd }; - _ = stream.write (mcontent[0..mlen]) catch return CBEventType.ERROR; + var bytes_sent = stream.write (to_send) catch return CBEventType.ERROR; + print("sent {} to {}\n", .{bytes_sent, fd}); return CBEventType.NO_ERROR; } diff --git a/zig-impl/src/tcpd.zig b/zig-impl/src/tcpd.zig index 70f61b0..31e4a5e 100644 --- a/zig-impl/src/tcpd.zig +++ b/zig-impl/src/tcpd.zig @@ -98,10 +98,16 @@ fn create_service() !void { try ctx.add_external (serverfd); var some_event: ipc.Event = undefined; + var previous_event: ipc.Event.Type = ipc.Event.Type.ERROR; ctx.timer = 1000; // 1 second var count: u32 = 0; while(! S.should_quit) { some_event = try ctx.wait_event(); + + // For clarity in the output. + if (some_event.t != .TIMER and previous_event == .TIMER ) { print("\n", .{}); } + previous_event = some_event.t; + switch (some_event.t) { .TIMER => { print ("\rTimer! ({})", .{count}); @@ -151,6 +157,16 @@ fn create_service() !void { .SWITCH_RX => { print ("Message has been received (SWITCH fd {}).\n", .{some_event.origin}); + // if (some_event.m) |m| { + // var hexbuf: [4000]u8 = undefined; + // var hexfbs = std.io.fixedBufferStream(&hexbuf); + // var hexwriter = hexfbs.writer(); + // try hexdump.hexdump(hexwriter, "Received", m.payload); + // print("{s}\n", .{hexfbs.getWritten()}); + // } + // else { + // print ("Message received without actually a message?! {}", .{some_event}); + // } }, .SWITCH_TX => { diff --git a/zig-impl/test-bindings/pong.c b/zig-impl/test-bindings/pong.c index 242e894..c78acc7 100644 --- a/zig-impl/test-bindings/pong.c +++ b/zig-impl/test-bindings/pong.c @@ -7,6 +7,8 @@ #define SERVICE "pong" #define SERVICE_LEN 4 +#define MAX_MSG_SIZE 10000 + int direct_write_then_read(void); int wait_event(void); @@ -50,8 +52,8 @@ int direct_write_then_read(void) { return 1; } - char message[10000]; - size_t size = 10000; + char message[MAX_MSG_SIZE]; + size_t size = MAX_MSG_SIZE; ret = ipc_read_fd (ctx, servicefd, message, &size); @@ -86,9 +88,9 @@ int direct_write_then_read(void) { int wait_event(void) { int ret = 0; int servicefd = 0; - char message[10000]; + char message[MAX_MSG_SIZE]; memset (message, 0, 1000); - size_t size = 10000; + size_t size = MAX_MSG_SIZE; char event_type; size_t index = 0; int originfd = 0; @@ -134,7 +136,7 @@ int wait_event(void) { char should_continue = 1; size_t count = 0; while(should_continue) { - size = 10000; + size = MAX_MSG_SIZE; ret = ipc_wait_event (ctx, &event_type, &index, &originfd, message, &size); if (ret != 0) { printf ("Error while waiting for an event.\n"); diff --git a/zig-impl/test-bindings/pongd.c b/zig-impl/test-bindings/pongd.c index 3f65003..e3b5bee 100644 --- a/zig-impl/test-bindings/pongd.c +++ b/zig-impl/test-bindings/pongd.c @@ -6,12 +6,13 @@ #define SERVICE "pong" #define SERVICE_LEN 4 +#define MAX_MSG_SIZE 10000 int main(int argc, char**argv) { int ret = 0; int servicefd = 0; - char message[10000]; - size_t size = 10000; + char message[MAX_MSG_SIZE]; + size_t size = MAX_MSG_SIZE; char event_type; size_t index = 0; int originfd = 0; @@ -48,7 +49,7 @@ int main(int argc, char**argv) { size_t count = 0; size_t count_timer = 0; while(should_continue) { - size = 10000; + size = MAX_MSG_SIZE; ret = ipc_wait_event (ctx, &event_type, &index, &originfd, message, &size); if (ret != 0) { printf ("Error while waiting for an event.\n");