diff --git a/zig-impl/libipc.h b/zig-impl/libipc.h index f3dc9a1..a68fe4c 100644 --- a/zig-impl/libipc.h +++ b/zig-impl/libipc.h @@ -1,6 +1,11 @@ #ifndef LIBIPC #define LIBIPC +struct message { + uint32_t size; + char* payload; +}; + enum event_types { ERROR = 0 // A problem occured. , EXTERNAL = 1 // Message received from a non IPC socket. diff --git a/zig-impl/src/bindings.zig b/zig-impl/src/bindings.zig index a7ca08c..f1d073a 100644 --- a/zig-impl/src/bindings.zig +++ b/zig-impl/src/bindings.zig @@ -3,6 +3,7 @@ const print = std.debug.print; const ipc = @import("./main.zig"); const Context = ipc.Context; const Message = ipc.Message; +const CBEventType = ipc.CBEvent.Type; export fn ipc_context_init (ptr: **Context) callconv(.C) i32 { ptr.* = std.heap.c_allocator.create(Context) catch return -1; @@ -131,5 +132,10 @@ export fn ipc_add_switch (ctx: *Context, fd1: i32, fd2: i32) callconv(.C) i32 { return 0; } -// Later. -// pub fn set_switch_callbacks +/// TODO: change the functions in the switch code, not to take a Message as a parameter. +export fn ipc_set_switch_callbacks(ctx: *Context, fd: i32 + , in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType + , out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) callconv(.C) i32 { + ctx.set_switch_callbacks (fd, in, out) catch return -1; + return 0; +} diff --git a/zig-impl/src/context.zig b/zig-impl/src/context.zig index bfd304b..c0ac264 100644 --- a/zig-impl/src/context.zig +++ b/zig-impl/src/context.zig @@ -294,8 +294,8 @@ pub const Context = struct { } pub fn set_switch_callbacks(self: *Self, fd: i32 - , in : *const fn (origin: i32, m: *Message) CBEventType - , out : *const fn (origin: i32, m: *const Message) CBEventType) !void { + , in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType + , out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) !void { try self.switchdb.set_callbacks(fd,in, out); } diff --git a/zig-impl/src/switch.zig b/zig-impl/src/switch.zig index 095d158..6233622 100644 --- a/zig-impl/src/switch.zig +++ b/zig-impl/src/switch.zig @@ -15,9 +15,26 @@ const print = std.debug.print; const Event = ipc.Event; -/// SwitchDB -/// Functions read and write: handle - +/// SwitchDB: store relations between clients and services. +/// +/// A protocol service, such as TPCd can handle "external" communications (TCP in this case) +/// meaning that a client can connect to this service through a canal that isn't a simple +/// libipc UNIX socket, and this client is then connected to a local service. +/// OTOH, a local client can ask TCPd to establish a connection to a remote service. +/// In both cases, at least one of the connection isn't libipc-based and should be +/// handled in a specific way that only TPCd (or another protocol service) can. +/// +/// TCPd marks both file descriptors as "related" (add_switch) so libipc can automatically +/// handle messages between the client and the service. Any input from one end will be sent +/// to the other. +/// TCPd registers functions to handle specific input and output operations from and to the +/// remote connection (set_callbacks). +/// +/// At any point, TCPd can safely close a connection and remote it from the SwitchDB (nuke), +/// resulting in the removal of both the connection's FD and its related FD (both the client +/// and the service connections are removed). +/// +/// Currently, libipc automatically closes both the client and its service when an error occurs. pub const SwitchDB = struct { const Self = @This(); @@ -45,9 +62,8 @@ pub const SwitchDB = struct { } pub fn set_callbacks(self: *Self, fd: i32 - , in : *const fn (origin: i32, m: *Message) CBEventType - , out : *const fn (origin: i32, m: *const Message) CBEventType) !void { - + , in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType + , out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) !void { var managedconnection = self.db.get(fd) orelse return error.unregisteredFD; managedconnection.in = in; managedconnection.out = out; @@ -58,23 +74,27 @@ pub const SwitchDB = struct { pub fn read (self: *Self, fd: i32) !?Message { // assert there is an entry with this fd as a key. var managedconnection = self.db.get(fd) orelse return error.unregisteredFD; - var message: Message = undefined; - var r: CBEventType = managedconnection.in(fd, &message); + var buffer = [_]u8{0} ** 100000; // TODO: buffer size + var message_size: u32 = @truncate(u32, buffer.len); + var r: CBEventType = managedconnection.in(fd, &buffer, &message_size); + + // TODO: read message + // TODO: better allocator? + // TODO: better errors? + var message: Message = Message.init(managedconnection.dest + , std.heap.c_allocator + , buffer[0..message_size]) catch { + return error.generic; + }; switch (r) { // The message should be ignored (protocol specific). - CBEventType.IGNORE => { return null; }, - // No error. A message was generated. - CBEventType.NO_ERROR => { - message.fd = managedconnection.dest; - return message; - }, + CBEventType.IGNORE => { return null; }, + CBEventType.NO_ERROR => { return message; }, CBEventType.FD_CLOSING => { return error.closeFD; }, // Generic error, or the message was read but with errors. - CBEventType.ERROR => { - return error.generic; - }, + CBEventType.ERROR => { return error.generic; }, } unreachable; @@ -85,7 +105,15 @@ pub const SwitchDB = struct { pub fn write (self: *Self, message: Message) !void { // assert there is an entry with this fd as a key. var managedconnection = self.db.get(message.fd) orelse return error.unregisteredFD; - var r = managedconnection.out(managedconnection.dest, &message); + + var buffer = [_]u8{0} ** 100000; // TODO: buffer size + var fbs = std.io.fixedBufferStream(&buffer); + var writer = fbs.writer(); + + // returning basic errors, no details. + _ = message.write(writer) catch return error.generic; + var written = fbs.getWritten(); + var r = managedconnection.out(managedconnection.dest, written.ptr, @truncate(u32, written.len)); switch (r) { // The message should be ignored (protocol specific). @@ -104,6 +132,7 @@ pub const SwitchDB = struct { unreachable; } + /// From a message to read on a socket to an Event. pub fn handle_event_read (self: *Self, index: usize, fd: i32) Event { var message: ?Message = null; message = self.read (fd) catch |err| switch(err) { @@ -139,6 +168,7 @@ pub const SwitchDB = struct { return self.db.get(fd).?.dest; } + /// Remove both entries (client and service) from the DB. pub fn nuke (self: *Self, fd: i32) void { if (self.db.fetchSwapRemove(fd)) |kv| { _ = self.db.swapRemove(kv.value.dest); @@ -148,8 +178,8 @@ pub const SwitchDB = struct { const ManagedConnection = struct { dest : i32, - in : *const fn (origin: i32, m: *Message) CBEventType = default_in, - out : *const fn (origin: i32, m: *const Message) CBEventType = default_out, + in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType = default_in, + out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType = default_out, }; test "creation and display" { @@ -167,14 +197,18 @@ test "creation and display" { try print_eq("{ (5,6)(6,5) }", .{switchdb}); } -fn successful_in (_: i32, m: *Message) CBEventType { - m.* = Message.init(8, std.heap.c_allocator, "coucou") catch { - return CBEventType.ERROR; - }; +fn successful_in (_: i32, mcontent: [*]const u8, mlen: *u32) CBEventType { + var m = Message.init(8, std.heap.c_allocator, "coucou") catch unreachable; + defer m.deinit(); + + var fbs = std.io.fixedBufferStream(mcontent); + var writer = fbs.writer(); + _ = m.write (writer) catch unreachable; + mlen.* = @truncate(u32, m.payload.len); return CBEventType.NO_ERROR; } -fn successful_out (_: i32, _: *const Message) CBEventType { +fn successful_out (_: i32, _: [*]const u8, _: u32) CBEventType { return CBEventType.NO_ERROR; } @@ -205,11 +239,11 @@ test "successful exchanges" { if (event_3.m) |_| { return error.ShouldNotCarryMessage; } } -fn unsuccessful_in (_: i32, _: *Message) CBEventType { +fn unsuccessful_in (_: i32, _: [*]const u8, _: *u32) CBEventType { return CBEventType.ERROR; } -fn unsuccessful_out (_: i32, _: *const Message) CBEventType { +fn unsuccessful_out (_: i32, _: [*]const u8, _: u32) CBEventType { return CBEventType.ERROR; } @@ -255,39 +289,28 @@ test "nuke 'em" { try testing.expect(switchdb.db.count() == 0); } -fn default_in (origin: i32, m: *Message) CBEventType { +fn default_in (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType { // print ("receiving a message originated from {}\n", .{origin}); - var buffer: [2000000]u8 = undefined; // TODO: FIXME?? - var packet_size: usize = undefined; // This may be kinda hacky, idk. var stream: net.Stream = .{ .handle = origin }; - packet_size = stream.read(buffer[0..]) catch return CBEventType.ERROR; + var packet_size: usize = stream.read(mcontent[0..mlen.*]) catch return CBEventType.ERROR; // Let's handle this as a disconnection. - if (packet_size <= 4) { + if (packet_size < 4) { return CBEventType.FD_CLOSING; } - m.* = Message.read(origin, buffer[0..], std.heap.c_allocator) - catch return CBEventType.ERROR; + mlen.* = @truncate(u32, packet_size); return CBEventType.NO_ERROR; } -fn default_out (_: i32, m: *const Message) CBEventType { +fn default_out (fd: i32, mcontent: [*]const u8, mlen: u32) CBEventType { // print ("sending a message originated from {}\n", .{origin}); - // Message contains the fd, no need to search for - // the right structure to copy, let's just recreate - // a Stream from the fd. - var stream = net.Stream { .handle = m.fd }; - - var buffer: [200000]u8 = undefined; // TODO: buffer size - var fbs = std.io.fixedBufferStream(&buffer); - var writer = fbs.writer(); - - // returning basic errors, no details. - _ = m.write(writer) catch return CBEventType.ERROR; - _ = stream.write (fbs.getWritten()) catch return CBEventType.ERROR; + // Message contains the fd, no need to search for the right structure to copy, + // let's just recreate a Stream from the fd. + var stream = net.Stream { .handle = fd }; + _ = stream.write (mcontent[0..mlen]) catch return CBEventType.ERROR; return CBEventType.NO_ERROR; } diff --git a/zig-impl/test-bindings/pongd.c b/zig-impl/test-bindings/pongd.c index f899db0..3f65003 100644 --- a/zig-impl/test-bindings/pongd.c +++ b/zig-impl/test-bindings/pongd.c @@ -40,8 +40,8 @@ int main(int argc, char**argv) { return 1; } - printf ("Set the timer to ten seconds.\n"); - ipc_context_timer (ctx, 10000); + printf ("Set the timer to two seconds.\n"); + ipc_context_timer (ctx, 2000); printf ("Loop over events.\n"); char should_continue = 1;