wait_event function: first draft okay

master
Philippe Pittoli 2022-12-24 23:09:25 +01:00
parent bc0fe07990
commit 1f5ac951cb
4 changed files with 107 additions and 32 deletions

View File

@ -5,6 +5,8 @@ const net = std.net;
const os = std.os;
const fmt = std.fmt;
const Timer = std.time.Timer;
const print = std.debug.print;
const CBEvent = @import("./callback.zig").CBEvent;
@ -138,7 +140,7 @@ pub const Context = struct {
self.tx.append(m);
}
pub fn read (self: *Self, index: u32) !Message {
pub fn read (self: *Self, index: usize) !Message {
if (index >= self.pollfd.items.len) {
return error.IndexOutOfBounds;
}
@ -158,7 +160,7 @@ pub const Context = struct {
// Wait an event.
pub fn wait_event(self: *Self) !Event {
var current_event: Event = undefined;
var current_event: Event = Event.init(Event.Type.NOT_SET, 0, 0, null);
var wait_duration: i32 = -1; // -1 == unlimited
if (self.timer) |t| { wait_duration = t; }
@ -170,50 +172,106 @@ pub const Context = struct {
// setting POLLIN & POLLOUT flags.
for (self.pollfd.items) |*fd| {
// print("listening to fd {}\n", .{fd.fd});
fd.events = std.os.linux.POLL.IN; // just to make sure
fd.events |= std.os.linux.POLL.IN; // just to make sure
}
for (self.tx.items) |m| {
print("wait for writing a message to fd {}\n", .{m.fd});
for (self.pollfd.items) |*fd| {
if (fd.fd == m.fd) {
fd.events = std.os.linux.POLL.OUT; // just to make sure
fd.events |= std.os.linux.POLL.OUT; // just to make sure
}
}
}
// TODO: before initiate a timer
var timer = try Timer.start();
// Polling.
var count: usize = undefined;
// print("Let's wait for an event (either stdin or unix socket)\n", .{});
print("fds: {any}\n", .{self.pollfd.items});
print("fds: {any}\n", .{self.pollfd.items});
count = try os.poll(self.pollfd.items, wait_duration);
print("fds NOW: {any}\n", .{self.pollfd.items});
// TODO: timer = end - start; if 0 => return timer event
if (count < 0) {
print("there is a problem: poll < 0\n", .{});
current_event = Event.init(Event.Type.ERROR, 0, 0, null);
return current_event;
}
var duration = timer.read() / 1000000; // ns -> ms
if (count == 0) {
current_event = Event.init(Event.Type.TIMER, 0, 0, null);
print("wait: configured {} measured {}\n", .{wait_duration, duration});
if (duration >= wait_duration) {
current_event = Event.init(Event.Type.TIMER, 0, 0, null);
}
else {
// In case nothing happened, and poll wasn't triggered by time out.
current_event = Event.init(Event.Type.ERROR, 0, 0, null);
}
return current_event;
}
// TODO: handle messages
// => loop over ctx.size
// => if pollfd[i].revents is set to POLLIN
// => if fd is SERVER => new connection
// => if fd is SWITCHED => msg to exchange (or drop the switch)
// => if fd is EXTERNAL => let user handle IO operations
// => otherwise => new message or disconnection
// => if fd revent is POLLOUT
// => if SWITCHED => write message for its switch buddy
// => otherwise => write message for the msg.fd
// if fd revent is POLLHUP
// => handle disconnection:
// close + remove fd from pollfd + return event
// if fd revent is POLLERR or POLLNVAL
// => return error event
// => loop over self.pollfd.items
for (self.pollfd.items) |*fd, i| {
// .revents is POLLIN
if(fd.revents & std.os.linux.POLL.IN > 0) {
// SERVER = new connection
if (self.connections.items[i].t == .SERVER) {
// TODO: ipc_accept_add
current_event = Event.init(Event.Type.CONNECTION, i, fd.fd, null);
}
// SWITCHED = send message to the right dest (or drop the switch)
else if (self.connections.items[i].t == .SWITCHED) {
// TODO: send message to SWITCH dest
// TODO: handle_switched_message
current_event = Event.init(Event.Type.SWITCH, i, fd.fd, null);
}
// EXTERNAL = user handles IO
else if (self.connections.items[i].t == .EXTERNAL) {
current_event = Event.init(Event.Type.EXTERNAL, i, fd.fd, null);
}
// otherwise = new message or disconnection
else {
// TODO: handle incoming message
// TODO: handle_new_message
current_event = Event.init(Event.Type.MESSAGE, i, fd.fd, null);
}
}
// .revent is POLLOUT
if(fd.revents & std.os.linux.POLL.OUT > 0) {
fd.events &= ~ @as(i16, std.os.linux.POLL.OUT);
// SWITCHED = write message for its switch buddy (callbacks)
if (self.connections.items[i].t == .SWITCHED) {
// TODO: handle_writing_switched_message
current_event = Event.init(Event.Type.SWITCH, i, fd.fd, null);
}
else {
// otherwise = write message for the msg.fd
// TODO: handle_writing_message
current_event = Event.init(Event.Type.TX, i, fd.fd, null);
}
}
// .revent is POLLHUP
if(fd.revents & std.os.linux.POLL.HUP > 0) {
// handle disconnection
current_event = Event.init(Event.Type.DISCONNECTION, i, fd.fd, null);
try self.close(i);
}
// if fd revent is POLLERR or POLLNVAL
if ((fd.revents & std.os.linux.POLL.HUP > 0) or
(fd.revents & std.os.linux.POLL.HUP > 0)) {
current_event = Event.init(Event.Type.ERROR, i, fd.fd, null);
}
}
// TODO: check for LOOKUP events.
// LOOKUP = Client asking for a service through ipcd.
return current_event;
}

View File

@ -36,7 +36,7 @@ pub const Event = struct {
pub const Type = enum {
NOT_SET, // Default. TODO: should we keep this?
ERROR, // A problem occured.
EXTRA_SOCKET, // Message received from a non IPC socket.
EXTERNAL, // Message received from a non IPC socket.
SWITCH, // Message to send to a corresponding fd.
CONNECTION, // New user.
DISCONNECTION, // User disconnected.
@ -47,17 +47,17 @@ pub const Event = struct {
};
t: Event.Type,
index: u32,
origin: usize, // socket fd
index: usize,
origin: i32, // socket fd
m: ?*Message, // message pointer
const Self = @This();
pub fn init(t: Event.Type, index: u32, origin: usize, m: ?*Message) Self {
pub fn init(t: Event.Type, index: usize, origin: i32, m: ?*Message) Self {
return Self { .t = t, .index = index, .origin = origin, .m = m, };
}
pub fn set(self: *Self, t: Event.Type, index: u32, origin: usize, m: ?*Message) void {
pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, m: ?*Message) void {
self.t = t;
self.index = index;
self.origin = origin;
@ -66,8 +66,8 @@ pub const Event = struct {
pub fn clean(self: *Self) void {
self.t = Event.Type.NOT_SET;
self.index = @as(u8,0);
self.origin = @as(usize,0);
self.index = @as(usize,0);
self.origin = @as(i32,0);
if (self.m) |message| {
message.deinit();
}

View File

@ -54,7 +54,7 @@ fn create_service() !void {
var count_down: i16 = 5;
var some_event: Event = undefined;
ctx.timer = 1000; // 1 second
ctx.timer = 2000; // 1 second
while(true) {
some_event = try ctx.wait_event();
switch (some_event.t) {
@ -70,7 +70,7 @@ fn create_service() !void {
break;
}
},
.EXTRA_SOCKET => {
.EXTERNAL => {
print("Message received from a non IPC socket.", .{});
break;
},

View File

@ -4,10 +4,12 @@ const testing = std.testing;
const net = std.net;
const fmt = std.fmt;
const Timer = std.time.Timer;
const print = std.debug.print;
const P = std.ArrayList(std.os.pollfd);
fn create_service() !void {
fn arraylist_test() !void {
const config = .{.safety = true};
var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit();
@ -21,7 +23,22 @@ fn create_service() !void {
for(p.items) |i| { print("fd: {}\n", .{i.fd}); }
}
fn timer_test() !void {
var timer = try Timer.start();
var count: u64 = 0;
while (count < 100000) {
count += 1;
print("\rcount = {}", .{count});
}
print("\n", .{});
var duration = timer.read();
print("took {} us\n", .{duration / 1000});
}
pub fn main() !u8 {
try create_service();
// try arraylist_test();
try timer_test();
return 0;
}