Event now includes a "newfd" field for new client's fd.

This commit is contained in:
Philippe PITTOLI 2024-06-15 22:19:31 +02:00
parent 31e05ef1c2
commit 8e889ef242
6 changed files with 43 additions and 31 deletions

View File

@ -31,7 +31,7 @@ int ipc_write (void* ctx, int servicefd, char* mcontent, uint32_t mlen);
int ipc_schedule (void* ctx, int servicefd, const char* mcontent, uint32_t mlen); int ipc_schedule (void* ctx, int servicefd, const char* mcontent, uint32_t mlen);
int ipc_read_fd (void* ctx, int fd, char* buffer, size_t* buflen); int ipc_read_fd (void* ctx, int fd, char* buffer, size_t* buflen);
int ipc_read (void* ctx, size_t index, char* buffer, size_t* buflen); int ipc_read (void* ctx, size_t index, char* buffer, size_t* buflen);
int ipc_wait_event(void* ctx, char* t, size_t* index, int* originfd, char* buffer, size_t* buflen); int ipc_wait_event(void* ctx, char* t, size_t* index, int* originfd, int* newfd, char* buffer, size_t* buflen);
void ipc_context_timer (void* ctx, int timer); void ipc_context_timer (void* ctx, int timer);
int ipc_close_fd (void* ctx, int fd); int ipc_close_fd (void* ctx, int fd);
int ipc_close (void* ctx, size_t index); int ipc_close (void* ctx, size_t index);

View File

@ -90,7 +90,7 @@ export fn ipc_read(ctx: *Context, index: usize, buffer: [*]u8, buflen: *usize) c
/// Wait for an event. /// Wait for an event.
/// Buffer length will be changed to the size of the received message. /// Buffer length will be changed to the size of the received message.
export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 { export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, newfd: *i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 {
const event = ctx.wait_event() catch |err| switch (err) { const event = ctx.wait_event() catch |err| switch (err) {
else => { else => {
log.warn("error while waiting for an event: {}\n", .{err}); log.warn("error while waiting for an event: {}\n", .{err});
@ -100,6 +100,7 @@ export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, b
t.* = @intFromEnum(event.t); t.* = @intFromEnum(event.t);
index.* = event.index; index.* = event.index;
originfd.* = event.origin; originfd.* = event.origin;
newfd.* = event.newfd;
if (event.m) |m| { if (event.m) |m| {
var fbs = std.io.fixedBufferStream(buffer[0..buflen.*]); var fbs = std.io.fixedBufferStream(buffer[0..buflen.*]);

View File

@ -218,6 +218,13 @@ pub const Context = struct {
try self.add_(newcon, newfd); try self.add_(newcon, newfd);
} }
// /// Change the type of a file descriptor.
// /// Useful for protocol daemons (ex: TCPd) or proxies listening to local IPC connections for new clients.
// /// In this case, the client can be "switched" but needs to be marked as such.
// /// TODO
// pub fn change_fd_type (self: *Self, fd: i32, new_type: Connection.Type) !void {
// }
fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void { fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void {
// net.Server // net.Server
const serverfd = self.pollfd.items[server_index].fd; const serverfd = self.pollfd.items[server_index].fd;
@ -233,7 +240,7 @@ pub const Context = struct {
const sfd = server.stream.handle; const sfd = server.stream.handle;
// WARNING: imply every new item is last // WARNING: imply every new item is last
event.set(Event.Type.CONNECTION, self.pollfd.items.len - 1, sfd, null); event.set(Event.Type.CONNECTION, self.pollfd.items.len - 1, sfd, newfd, null);
} }
// Create a unix socket. // Create a unix socket.
@ -365,7 +372,7 @@ pub const Context = struct {
// Wait for an event. // Wait for an event.
pub fn wait_event(self: *Self) !Event { pub fn wait_event(self: *Self) !Event {
var current_event: Event = Event.init(Event.Type.ERROR, 0, 0, null); var current_event: Event = Event.init(Event.Type.ERROR, 0, 0, 0, null);
var wait_duration: i32 = -1; // -1 == unlimited var wait_duration: i32 = -1; // -1 == unlimited
if (self.timer) |t| { if (self.timer) |t| {
@ -399,17 +406,17 @@ pub const Context = struct {
if (count < 0) { if (count < 0) {
log.err("there is a problem: poll < 0", .{}); log.err("there is a problem: poll < 0", .{});
current_event = Event.init(Event.Type.ERROR, 0, 0, null); current_event = Event.init(Event.Type.ERROR, 0, 0, 0, null);
return current_event; return current_event;
} }
const duration = timer.read() / 1000000; // ns -> ms const duration = timer.read() / 1000000; // ns -> ms
if (count == 0) { if (count == 0) {
if (duration >= wait_duration) { if (duration >= wait_duration) {
current_event = Event.init(Event.Type.TIMER, 0, 0, null); current_event = Event.init(Event.Type.TIMER, 0, 0, 0, null);
} else { } else {
// In case nothing happened, and poll wasn't triggered by time out. // In case nothing happened, and poll wasn't triggered by time out.
current_event = Event.init(Event.Type.ERROR, 0, 0, null); current_event = Event.init(Event.Type.ERROR, 0, 0, 0, null);
} }
return current_event; return current_event;
} }
@ -455,7 +462,7 @@ pub const Context = struct {
} }
// EXTERNAL = user handles IO // EXTERNAL = user handles IO
else if (self.connections.items[i].t == .EXTERNAL) { else if (self.connections.items[i].t == .EXTERNAL) {
return Event.init(Event.Type.EXTERNAL, i, current_fd, null); return Event.init(Event.Type.EXTERNAL, i, current_fd, 0, null);
} }
// otherwise = new message or disconnection // otherwise = new message or disconnection
else { else {
@ -463,26 +470,26 @@ pub const Context = struct {
error.ConnectionResetByPeer => { error.ConnectionResetByPeer => {
log.warn("connection reset by peer", .{}); log.warn("connection reset by peer", .{});
try self.close(i); try self.close(i);
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null); return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
}, },
error.wrongMessageLength => { error.wrongMessageLength => {
log.warn("wrong message length, terminating the connection", .{}); log.warn("wrong message length, terminating the connection", .{});
try self.close(i); try self.close(i);
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null); return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
}, },
else => { else => {
log.warn("unmanaged error while reading a message ({})", .{err}); log.warn("unmanaged error while reading a message ({})", .{err});
try self.close(i); try self.close(i);
return Event.init(Event.Type.ERROR, i, current_fd, null); return Event.init(Event.Type.ERROR, i, current_fd, 0, null);
}, },
}; };
if (maybe_message) |m| { if (maybe_message) |m| {
return Event.init(Event.Type.MESSAGE_RX, i, current_fd, m); return Event.init(Event.Type.MESSAGE_RX, i, current_fd, 0, m);
} }
try self.close(i); try self.close(i);
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null); return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
} }
} }
@ -526,21 +533,21 @@ pub const Context = struct {
error.BrokenPipe => { error.BrokenPipe => {
log.warn("cannot send message, dest probably closed the connection ({})", .{err}); log.warn("cannot send message, dest probably closed the connection ({})", .{err});
try self.close(i); try self.close(i);
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null); return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
}, },
else => { else => {
log.warn("unmanaged error while sending a message ({})", .{err}); log.warn("unmanaged error while sending a message ({})", .{err});
try self.close(i); try self.close(i);
return Event.init(Event.Type.ERROR, i, current_fd, null); return Event.init(Event.Type.ERROR, i, current_fd, 0, null);
}, },
}; };
return Event.init(Event.Type.MESSAGE_TX, i, current_fd, null); return Event.init(Event.Type.MESSAGE_TX, i, current_fd, 0, null);
} }
} }
// .revent is POLLHUP // .revent is POLLHUP
if (fd.revents & std.os.linux.POLL.HUP > 0) { if (fd.revents & std.os.linux.POLL.HUP > 0) {
// handle disconnection // handle disconnection
current_event = Event.init(Event.Type.DISCONNECTION, i, current_fd, null); current_event = Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
try self.close(i); try self.close(i);
return current_event; return current_event;
} }
@ -548,7 +555,7 @@ pub const Context = struct {
if ((fd.revents & std.os.linux.POLL.HUP > 0) or if ((fd.revents & std.os.linux.POLL.HUP > 0) or
(fd.revents & std.os.linux.POLL.NVAL > 0)) (fd.revents & std.os.linux.POLL.NVAL > 0))
{ {
return Event.init(Event.Type.ERROR, i, current_fd, null); return Event.init(Event.Type.ERROR, i, current_fd, 0, null);
} }
} }

View File

@ -48,23 +48,26 @@ pub const Event = struct {
t: Event.Type, t: Event.Type,
index: usize, index: usize,
origin: i32, // socket fd origin: i32, // socket fd
newfd: i32, // on new connection, tell the new client's socket fd
m: ?Message, // message m: ?Message, // message
const Self = @This(); const Self = @This();
pub fn init(t: Event.Type, index: usize, origin: i32, m: ?Message) Self { pub fn init(t: Event.Type, index: usize, origin: i32, newfd: i32, m: ?Message) Self {
return Self{ return Self{
.t = t, .t = t,
.index = index, .index = index,
.origin = origin, .origin = origin,
.newfd = newfd,
.m = m, .m = m,
}; };
} }
pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, m: ?Message) void { pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, newfd: i32, m: ?Message) void {
self.t = t; self.t = t;
self.index = index; self.index = index;
self.origin = origin; self.origin = origin;
self.newfd = newfd;
self.m = m; self.m = m;
} }
@ -72,6 +75,7 @@ pub const Event = struct {
self.t = Event.Type.ERROR; self.t = Event.Type.ERROR;
self.index = @as(usize, 0); self.index = @as(usize, 0);
self.origin = @as(i32, 0); self.origin = @as(i32, 0);
self.newfd = @as(i32, 0);
if (self.m) |message| { if (self.m) |message| {
message.deinit(); message.deinit();
} }
@ -79,7 +83,7 @@ pub const Event = struct {
} }
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void { pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
try fmt.format(out_stream, "{}, origin: {}, index {}, message: [{?}]", .{ self.t, self.origin, self.index, self.m }); try fmt.format(out_stream, "{}, origin: {}, newfd: {}, index {}, message: [{?}]", .{ self.t, self.origin, self.newfd, self.index, self.m });
} }
}; };
@ -92,7 +96,7 @@ test "Event - creation and display" {
const s = "hello!!"; const s = "hello!!";
var m = try Message.init(1, allocator, s); // fd type payload var m = try Message.init(1, allocator, s); // fd type payload
defer m.deinit(); defer m.deinit();
const e = Event.init(Event.Type.CONNECTION, 5, 8, m); // type index origin message const e = Event.init(Event.Type.CONNECTION, 5, 8, 4, m); // type index origin message
try print_eq("event.Event.Type.CONNECTION, origin: 8, index 5, message: [fd: 1, payload: [hello!!]]", e); try print_eq("event.Event.Type.CONNECTION, origin: 8, newfd: 4, index 5, message: [fd: 1, payload: [hello!!]]", e);
} }

View File

@ -1,13 +1,13 @@
const std = @import("std"); const std = @import("std");
pub fn hexdump(stream: anytype, header: []const u8, buffer: []const u8) std.os.WriteError!void { pub fn hexdump(stream: anytype, header: []const u8, buffer: []const u8) std.posix.WriteError!void {
// Print a header. // Print a header.
if (header.len > 0) { if (header.len > 0) {
var hdr: [64]u8 = undefined; var hdr: [64]u8 = undefined;
const offset: usize = (hdr.len / 2) - ((header.len / 2) - 1); const offset: usize = (hdr.len / 2) - ((header.len / 2) - 1);
@memset(hdr[0..hdr.len], ' '); @memset(hdr[0..hdr.len], ' ');
std.mem.copy(u8, hdr[offset..hdr.len], header); std.mem.copyForwards(u8, hdr[offset..hdr.len], header);
try stream.writeAll(hdr[0..hdr.len]); try stream.writeAll(hdr[0..hdr.len]);
try stream.writeAll("\n"); try stream.writeAll("\n");

View File

@ -145,26 +145,26 @@ pub const SwitchDB = struct {
var message: ?Message = null; var message: ?Message = null;
message = self.read(fd) catch |err| switch (err) { message = self.read(fd) catch |err| switch (err) {
error.closeFD => { error.closeFD => {
return Event.init(Event.Type.DISCONNECTION, index, fd, null); return Event.init(Event.Type.DISCONNECTION, index, fd, 0, null);
}, },
error.unregisteredFD, error.generic => { error.unregisteredFD, error.generic => {
return Event.init(Event.Type.ERROR, index, fd, null); return Event.init(Event.Type.ERROR, index, fd, 0, null);
}, },
}; };
return Event.init(Event.Type.SWITCH_RX, index, fd, message); return Event.init(Event.Type.SWITCH_RX, index, fd, 0, message);
} }
pub fn handle_event_write(self: *Self, index: usize, message: Message) Event { pub fn handle_event_write(self: *Self, index: usize, message: Message) Event {
const fd = message.fd; const fd = message.fd;
self.write(message) catch |err| switch (err) { self.write(message) catch |err| switch (err) {
error.closeFD => { error.closeFD => {
return Event.init(Event.Type.DISCONNECTION, index, fd, null); return Event.init(Event.Type.DISCONNECTION, index, fd, 0, null);
}, },
error.unregisteredFD, error.generic => { error.unregisteredFD, error.generic => {
return Event.init(Event.Type.ERROR, index, fd, null); return Event.init(Event.Type.ERROR, index, fd, 0, null);
}, },
}; };
return Event.init(Event.Type.SWITCH_TX, index, fd, null); return Event.init(Event.Type.SWITCH_TX, index, fd, 0, null);
} }
/// Simple wrapper around self.db.get. /// Simple wrapper around self.db.get.