From 8e889ef242d6cee038048e5395ba1aea63beb205 Mon Sep 17 00:00:00 2001 From: Philippe PITTOLI Date: Sat, 15 Jun 2024 22:19:31 +0200 Subject: [PATCH] Event now includes a "newfd" field for new client's fd. --- libipc.h | 2 +- src/bindings.zig | 3 ++- src/context.zig | 39 +++++++++++++++++++++++---------------- src/event.zig | 14 +++++++++----- src/hexdump.zig | 4 ++-- src/switch.zig | 12 ++++++------ 6 files changed, 43 insertions(+), 31 deletions(-) diff --git a/libipc.h b/libipc.h index 2bddef9..0385ee2 100644 --- a/libipc.h +++ b/libipc.h @@ -31,7 +31,7 @@ int ipc_write (void* ctx, int servicefd, char* mcontent, uint32_t mlen); int ipc_schedule (void* ctx, int servicefd, const char* mcontent, uint32_t mlen); int ipc_read_fd (void* ctx, int fd, char* buffer, size_t* buflen); int ipc_read (void* ctx, size_t index, char* buffer, size_t* buflen); -int ipc_wait_event(void* ctx, char* t, size_t* index, int* originfd, char* buffer, size_t* buflen); +int ipc_wait_event(void* ctx, char* t, size_t* index, int* originfd, int* newfd, char* buffer, size_t* buflen); void ipc_context_timer (void* ctx, int timer); int ipc_close_fd (void* ctx, int fd); int ipc_close (void* ctx, size_t index); diff --git a/src/bindings.zig b/src/bindings.zig index 545e343..6e7f924 100644 --- a/src/bindings.zig +++ b/src/bindings.zig @@ -90,7 +90,7 @@ export fn ipc_read(ctx: *Context, index: usize, buffer: [*]u8, buflen: *usize) c /// Wait for an event. /// Buffer length will be changed to the size of the received message. -export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 { +export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, newfd: *i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 { const event = ctx.wait_event() catch |err| switch (err) { else => { log.warn("error while waiting for an event: {}\n", .{err}); @@ -100,6 +100,7 @@ export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, b t.* = @intFromEnum(event.t); index.* = event.index; originfd.* = event.origin; + newfd.* = event.newfd; if (event.m) |m| { var fbs = std.io.fixedBufferStream(buffer[0..buflen.*]); diff --git a/src/context.zig b/src/context.zig index 194fc35..2210801 100644 --- a/src/context.zig +++ b/src/context.zig @@ -218,6 +218,13 @@ pub const Context = struct { try self.add_(newcon, newfd); } + // /// Change the type of a file descriptor. + // /// Useful for protocol daemons (ex: TCPd) or proxies listening to local IPC connections for new clients. + // /// In this case, the client can be "switched" but needs to be marked as such. + // /// TODO + // pub fn change_fd_type (self: *Self, fd: i32, new_type: Connection.Type) !void { + // } + fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void { // net.Server const serverfd = self.pollfd.items[server_index].fd; @@ -233,7 +240,7 @@ pub const Context = struct { const sfd = server.stream.handle; // WARNING: imply every new item is last - event.set(Event.Type.CONNECTION, self.pollfd.items.len - 1, sfd, null); + event.set(Event.Type.CONNECTION, self.pollfd.items.len - 1, sfd, newfd, null); } // Create a unix socket. @@ -365,7 +372,7 @@ pub const Context = struct { // Wait for an event. pub fn wait_event(self: *Self) !Event { - var current_event: Event = Event.init(Event.Type.ERROR, 0, 0, null); + var current_event: Event = Event.init(Event.Type.ERROR, 0, 0, 0, null); var wait_duration: i32 = -1; // -1 == unlimited if (self.timer) |t| { @@ -399,17 +406,17 @@ pub const Context = struct { if (count < 0) { log.err("there is a problem: poll < 0", .{}); - current_event = Event.init(Event.Type.ERROR, 0, 0, null); + current_event = Event.init(Event.Type.ERROR, 0, 0, 0, null); return current_event; } const duration = timer.read() / 1000000; // ns -> ms if (count == 0) { if (duration >= wait_duration) { - current_event = Event.init(Event.Type.TIMER, 0, 0, null); + current_event = Event.init(Event.Type.TIMER, 0, 0, 0, null); } else { // In case nothing happened, and poll wasn't triggered by time out. - current_event = Event.init(Event.Type.ERROR, 0, 0, null); + current_event = Event.init(Event.Type.ERROR, 0, 0, 0, null); } return current_event; } @@ -455,7 +462,7 @@ pub const Context = struct { } // EXTERNAL = user handles IO else if (self.connections.items[i].t == .EXTERNAL) { - return Event.init(Event.Type.EXTERNAL, i, current_fd, null); + return Event.init(Event.Type.EXTERNAL, i, current_fd, 0, null); } // otherwise = new message or disconnection else { @@ -463,26 +470,26 @@ pub const Context = struct { error.ConnectionResetByPeer => { log.warn("connection reset by peer", .{}); try self.close(i); - return Event.init(Event.Type.DISCONNECTION, i, current_fd, null); + return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null); }, error.wrongMessageLength => { log.warn("wrong message length, terminating the connection", .{}); try self.close(i); - return Event.init(Event.Type.DISCONNECTION, i, current_fd, null); + return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null); }, else => { log.warn("unmanaged error while reading a message ({})", .{err}); try self.close(i); - return Event.init(Event.Type.ERROR, i, current_fd, null); + return Event.init(Event.Type.ERROR, i, current_fd, 0, null); }, }; if (maybe_message) |m| { - return Event.init(Event.Type.MESSAGE_RX, i, current_fd, m); + return Event.init(Event.Type.MESSAGE_RX, i, current_fd, 0, m); } try self.close(i); - return Event.init(Event.Type.DISCONNECTION, i, current_fd, null); + return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null); } } @@ -526,21 +533,21 @@ pub const Context = struct { error.BrokenPipe => { log.warn("cannot send message, dest probably closed the connection ({})", .{err}); try self.close(i); - return Event.init(Event.Type.DISCONNECTION, i, current_fd, null); + return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null); }, else => { log.warn("unmanaged error while sending a message ({})", .{err}); try self.close(i); - return Event.init(Event.Type.ERROR, i, current_fd, null); + return Event.init(Event.Type.ERROR, i, current_fd, 0, null); }, }; - return Event.init(Event.Type.MESSAGE_TX, i, current_fd, null); + return Event.init(Event.Type.MESSAGE_TX, i, current_fd, 0, null); } } // .revent is POLLHUP if (fd.revents & std.os.linux.POLL.HUP > 0) { // handle disconnection - current_event = Event.init(Event.Type.DISCONNECTION, i, current_fd, null); + current_event = Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null); try self.close(i); return current_event; } @@ -548,7 +555,7 @@ pub const Context = struct { if ((fd.revents & std.os.linux.POLL.HUP > 0) or (fd.revents & std.os.linux.POLL.NVAL > 0)) { - return Event.init(Event.Type.ERROR, i, current_fd, null); + return Event.init(Event.Type.ERROR, i, current_fd, 0, null); } } diff --git a/src/event.zig b/src/event.zig index 2f64be2..b559ee6 100644 --- a/src/event.zig +++ b/src/event.zig @@ -48,23 +48,26 @@ pub const Event = struct { t: Event.Type, index: usize, origin: i32, // socket fd + newfd: i32, // on new connection, tell the new client's socket fd m: ?Message, // message const Self = @This(); - pub fn init(t: Event.Type, index: usize, origin: i32, m: ?Message) Self { + pub fn init(t: Event.Type, index: usize, origin: i32, newfd: i32, m: ?Message) Self { return Self{ .t = t, .index = index, .origin = origin, + .newfd = newfd, .m = m, }; } - pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, m: ?Message) void { + pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, newfd: i32, m: ?Message) void { self.t = t; self.index = index; self.origin = origin; + self.newfd = newfd; self.m = m; } @@ -72,6 +75,7 @@ pub const Event = struct { self.t = Event.Type.ERROR; self.index = @as(usize, 0); self.origin = @as(i32, 0); + self.newfd = @as(i32, 0); if (self.m) |message| { message.deinit(); } @@ -79,7 +83,7 @@ pub const Event = struct { } pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void { - try fmt.format(out_stream, "{}, origin: {}, index {}, message: [{?}]", .{ self.t, self.origin, self.index, self.m }); + try fmt.format(out_stream, "{}, origin: {}, newfd: {}, index {}, message: [{?}]", .{ self.t, self.origin, self.newfd, self.index, self.m }); } }; @@ -92,7 +96,7 @@ test "Event - creation and display" { const s = "hello!!"; var m = try Message.init(1, allocator, s); // fd type payload defer m.deinit(); - const e = Event.init(Event.Type.CONNECTION, 5, 8, m); // type index origin message + const e = Event.init(Event.Type.CONNECTION, 5, 8, 4, m); // type index origin message - try print_eq("event.Event.Type.CONNECTION, origin: 8, index 5, message: [fd: 1, payload: [hello!!]]", e); + try print_eq("event.Event.Type.CONNECTION, origin: 8, newfd: 4, index 5, message: [fd: 1, payload: [hello!!]]", e); } diff --git a/src/hexdump.zig b/src/hexdump.zig index f600fa4..8f21436 100644 --- a/src/hexdump.zig +++ b/src/hexdump.zig @@ -1,13 +1,13 @@ const std = @import("std"); -pub fn hexdump(stream: anytype, header: []const u8, buffer: []const u8) std.os.WriteError!void { +pub fn hexdump(stream: anytype, header: []const u8, buffer: []const u8) std.posix.WriteError!void { // Print a header. if (header.len > 0) { var hdr: [64]u8 = undefined; const offset: usize = (hdr.len / 2) - ((header.len / 2) - 1); @memset(hdr[0..hdr.len], ' '); - std.mem.copy(u8, hdr[offset..hdr.len], header); + std.mem.copyForwards(u8, hdr[offset..hdr.len], header); try stream.writeAll(hdr[0..hdr.len]); try stream.writeAll("\n"); diff --git a/src/switch.zig b/src/switch.zig index 10d1308..9d13594 100644 --- a/src/switch.zig +++ b/src/switch.zig @@ -145,26 +145,26 @@ pub const SwitchDB = struct { var message: ?Message = null; message = self.read(fd) catch |err| switch (err) { error.closeFD => { - return Event.init(Event.Type.DISCONNECTION, index, fd, null); + return Event.init(Event.Type.DISCONNECTION, index, fd, 0, null); }, error.unregisteredFD, error.generic => { - return Event.init(Event.Type.ERROR, index, fd, null); + return Event.init(Event.Type.ERROR, index, fd, 0, null); }, }; - return Event.init(Event.Type.SWITCH_RX, index, fd, message); + return Event.init(Event.Type.SWITCH_RX, index, fd, 0, message); } pub fn handle_event_write(self: *Self, index: usize, message: Message) Event { const fd = message.fd; self.write(message) catch |err| switch (err) { error.closeFD => { - return Event.init(Event.Type.DISCONNECTION, index, fd, null); + return Event.init(Event.Type.DISCONNECTION, index, fd, 0, null); }, error.unregisteredFD, error.generic => { - return Event.init(Event.Type.ERROR, index, fd, null); + return Event.init(Event.Type.ERROR, index, fd, 0, null); }, }; - return Event.init(Event.Type.SWITCH_TX, index, fd, null); + return Event.init(Event.Type.SWITCH_TX, index, fd, 0, null); } /// Simple wrapper around self.db.get.