From 1f5ac951cb789b4dc707f7381952d4d6031b0411 Mon Sep 17 00:00:00 2001 From: Philippe Pittoli Date: Sat, 24 Dec 2022 23:09:25 +0100 Subject: [PATCH] wait_event function: first draft okay --- zig-impl/src/context.zig | 100 +++++++++++++++++++++++++++++-------- zig-impl/src/event.zig | 14 +++--- zig-impl/src/main.zig | 4 +- zig-impl/src/misc-test.zig | 21 +++++++- 4 files changed, 107 insertions(+), 32 deletions(-) diff --git a/zig-impl/src/context.zig b/zig-impl/src/context.zig index 936ae03..7aae866 100644 --- a/zig-impl/src/context.zig +++ b/zig-impl/src/context.zig @@ -5,6 +5,8 @@ const net = std.net; const os = std.os; const fmt = std.fmt; +const Timer = std.time.Timer; + const print = std.debug.print; const CBEvent = @import("./callback.zig").CBEvent; @@ -138,7 +140,7 @@ pub const Context = struct { self.tx.append(m); } - pub fn read (self: *Self, index: u32) !Message { + pub fn read (self: *Self, index: usize) !Message { if (index >= self.pollfd.items.len) { return error.IndexOutOfBounds; } @@ -158,7 +160,7 @@ pub const Context = struct { // Wait an event. pub fn wait_event(self: *Self) !Event { - var current_event: Event = undefined; + var current_event: Event = Event.init(Event.Type.NOT_SET, 0, 0, null); var wait_duration: i32 = -1; // -1 == unlimited if (self.timer) |t| { wait_duration = t; } @@ -170,50 +172,106 @@ pub const Context = struct { // setting POLLIN & POLLOUT flags. for (self.pollfd.items) |*fd| { // print("listening to fd {}\n", .{fd.fd}); - fd.events = std.os.linux.POLL.IN; // just to make sure + fd.events |= std.os.linux.POLL.IN; // just to make sure } for (self.tx.items) |m| { print("wait for writing a message to fd {}\n", .{m.fd}); for (self.pollfd.items) |*fd| { if (fd.fd == m.fd) { - fd.events = std.os.linux.POLL.OUT; // just to make sure + fd.events |= std.os.linux.POLL.OUT; // just to make sure } } } // TODO: before initiate a timer + var timer = try Timer.start(); // Polling. var count: usize = undefined; // print("Let's wait for an event (either stdin or unix socket)\n", .{}); - print("fds: {any}\n", .{self.pollfd.items}); + print("fds: {any}\n", .{self.pollfd.items}); count = try os.poll(self.pollfd.items, wait_duration); print("fds NOW: {any}\n", .{self.pollfd.items}); - // TODO: timer = end - start; if 0 => return timer event + if (count < 0) { + print("there is a problem: poll < 0\n", .{}); + current_event = Event.init(Event.Type.ERROR, 0, 0, null); + return current_event; + } + var duration = timer.read() / 1000000; // ns -> ms if (count == 0) { - current_event = Event.init(Event.Type.TIMER, 0, 0, null); + print("wait: configured {} measured {}\n", .{wait_duration, duration}); + if (duration >= wait_duration) { + current_event = Event.init(Event.Type.TIMER, 0, 0, null); + } + else { + // In case nothing happened, and poll wasn't triggered by time out. + current_event = Event.init(Event.Type.ERROR, 0, 0, null); + } return current_event; } // TODO: handle messages - // => loop over ctx.size - // => if pollfd[i].revents is set to POLLIN - // => if fd is SERVER => new connection - // => if fd is SWITCHED => msg to exchange (or drop the switch) - // => if fd is EXTERNAL => let user handle IO operations - // => otherwise => new message or disconnection - // => if fd revent is POLLOUT - // => if SWITCHED => write message for its switch buddy - // => otherwise => write message for the msg.fd - // if fd revent is POLLHUP - // => handle disconnection: - // close + remove fd from pollfd + return event - // if fd revent is POLLERR or POLLNVAL - // => return error event + // => loop over self.pollfd.items + for (self.pollfd.items) |*fd, i| { + // .revents is POLLIN + if(fd.revents & std.os.linux.POLL.IN > 0) { + // SERVER = new connection + if (self.connections.items[i].t == .SERVER) { + // TODO: ipc_accept_add + current_event = Event.init(Event.Type.CONNECTION, i, fd.fd, null); + } + // SWITCHED = send message to the right dest (or drop the switch) + else if (self.connections.items[i].t == .SWITCHED) { + // TODO: send message to SWITCH dest + // TODO: handle_switched_message + current_event = Event.init(Event.Type.SWITCH, i, fd.fd, null); + } + // EXTERNAL = user handles IO + else if (self.connections.items[i].t == .EXTERNAL) { + current_event = Event.init(Event.Type.EXTERNAL, i, fd.fd, null); + } + // otherwise = new message or disconnection + else { + // TODO: handle incoming message + // TODO: handle_new_message + current_event = Event.init(Event.Type.MESSAGE, i, fd.fd, null); + } + } + + // .revent is POLLOUT + if(fd.revents & std.os.linux.POLL.OUT > 0) { + fd.events &= ~ @as(i16, std.os.linux.POLL.OUT); + + // SWITCHED = write message for its switch buddy (callbacks) + if (self.connections.items[i].t == .SWITCHED) { + // TODO: handle_writing_switched_message + current_event = Event.init(Event.Type.SWITCH, i, fd.fd, null); + } + else { + // otherwise = write message for the msg.fd + // TODO: handle_writing_message + current_event = Event.init(Event.Type.TX, i, fd.fd, null); + } + } + // .revent is POLLHUP + if(fd.revents & std.os.linux.POLL.HUP > 0) { + // handle disconnection + current_event = Event.init(Event.Type.DISCONNECTION, i, fd.fd, null); + try self.close(i); + } + // if fd revent is POLLERR or POLLNVAL + if ((fd.revents & std.os.linux.POLL.HUP > 0) or + (fd.revents & std.os.linux.POLL.HUP > 0)) { + current_event = Event.init(Event.Type.ERROR, i, fd.fd, null); + } + } + + // TODO: check for LOOKUP events. + // LOOKUP = Client asking for a service through ipcd. return current_event; } diff --git a/zig-impl/src/event.zig b/zig-impl/src/event.zig index a418e60..ac1d89c 100644 --- a/zig-impl/src/event.zig +++ b/zig-impl/src/event.zig @@ -36,7 +36,7 @@ pub const Event = struct { pub const Type = enum { NOT_SET, // Default. TODO: should we keep this? ERROR, // A problem occured. - EXTRA_SOCKET, // Message received from a non IPC socket. + EXTERNAL, // Message received from a non IPC socket. SWITCH, // Message to send to a corresponding fd. CONNECTION, // New user. DISCONNECTION, // User disconnected. @@ -47,17 +47,17 @@ pub const Event = struct { }; t: Event.Type, - index: u32, - origin: usize, // socket fd + index: usize, + origin: i32, // socket fd m: ?*Message, // message pointer const Self = @This(); - pub fn init(t: Event.Type, index: u32, origin: usize, m: ?*Message) Self { + pub fn init(t: Event.Type, index: usize, origin: i32, m: ?*Message) Self { return Self { .t = t, .index = index, .origin = origin, .m = m, }; } - pub fn set(self: *Self, t: Event.Type, index: u32, origin: usize, m: ?*Message) void { + pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, m: ?*Message) void { self.t = t; self.index = index; self.origin = origin; @@ -66,8 +66,8 @@ pub const Event = struct { pub fn clean(self: *Self) void { self.t = Event.Type.NOT_SET; - self.index = @as(u8,0); - self.origin = @as(usize,0); + self.index = @as(usize,0); + self.origin = @as(i32,0); if (self.m) |message| { message.deinit(); } diff --git a/zig-impl/src/main.zig b/zig-impl/src/main.zig index 546429d..9035592 100644 --- a/zig-impl/src/main.zig +++ b/zig-impl/src/main.zig @@ -54,7 +54,7 @@ fn create_service() !void { var count_down: i16 = 5; var some_event: Event = undefined; - ctx.timer = 1000; // 1 second + ctx.timer = 2000; // 1 second while(true) { some_event = try ctx.wait_event(); switch (some_event.t) { @@ -70,7 +70,7 @@ fn create_service() !void { break; } }, - .EXTRA_SOCKET => { + .EXTERNAL => { print("Message received from a non IPC socket.", .{}); break; }, diff --git a/zig-impl/src/misc-test.zig b/zig-impl/src/misc-test.zig index 69229d2..05c7426 100644 --- a/zig-impl/src/misc-test.zig +++ b/zig-impl/src/misc-test.zig @@ -4,10 +4,12 @@ const testing = std.testing; const net = std.net; const fmt = std.fmt; +const Timer = std.time.Timer; + const print = std.debug.print; const P = std.ArrayList(std.os.pollfd); -fn create_service() !void { +fn arraylist_test() !void { const config = .{.safety = true}; var gpa = std.heap.GeneralPurposeAllocator(config){}; defer _ = gpa.deinit(); @@ -21,7 +23,22 @@ fn create_service() !void { for(p.items) |i| { print("fd: {}\n", .{i.fd}); } } +fn timer_test() !void { + var timer = try Timer.start(); + + var count: u64 = 0; + while (count < 100000) { + count += 1; + print("\rcount = {}", .{count}); + } + print("\n", .{}); + + var duration = timer.read(); + print("took {} us\n", .{duration / 1000}); +} + pub fn main() !u8 { - try create_service(); + // try arraylist_test(); + try timer_test(); return 0; }