From 6819de1da5928dab3755b4ab9222ed47e517b1c8 Mon Sep 17 00:00:00 2001 From: Philippe Pittoli Date: Sun, 25 Dec 2022 06:26:38 +0100 Subject: [PATCH] Messages can be received. --- zig-impl/src/context.zig | 45 ++++++++++++++++++++++++++-------------- zig-impl/src/event.zig | 6 +++--- zig-impl/src/main.zig | 19 ++++++++++------- zig-impl/src/message.zig | 8 +++---- 4 files changed, 48 insertions(+), 30 deletions(-) diff --git a/zig-impl/src/context.zig b/zig-impl/src/context.zig index 33173ba..6e9f242 100644 --- a/zig-impl/src/context.zig +++ b/zig-impl/src/context.zig @@ -165,16 +165,28 @@ pub const Context = struct { return error.IndexOutOfBounds; } print("read index {}\n", .{index}); - return self.read_fd(self.pollfd.items[index].fd); - } - pub fn read_fd (self: *Self, fd: i32) !Message { - print("read fd {}\n", .{fd}); + var buffer: [2000000]u8 = undefined; // TODO: FIXME?? - // TODO: read the actual content. - var payload = "hello!!"; + var origin: i32 = undefined; + // TODO: this is a problem from the network API in Zig, + // servers and clients are different, they aren't just fds. + // Maybe there is something to change in the API. + if (self.connections.items[index].t == .IPC) { + var client = self.connections.items[index].client + orelse return error.NoClientHere; + var stream: net.Stream = client.stream; + origin = stream.handle; + _ = try stream.read(buffer[0..]); + } + else if (self.connections.items[index].t == .SERVER) { + return error.messageOnServer; + } - var m = Message.init(fd, Message.Type.DATA, self.allocator, payload); + // var m = try self.allocator.create(Message); + // m.* = try Message.read(buffer[0..], self.allocator); + var m = try Message.read(buffer[0..], self.allocator); + m.fd = origin; return m; } @@ -242,22 +254,24 @@ pub const Context = struct { // SERVER = new connection if (self.connections.items[i].t == .SERVER) { try self.accept_new_client(¤t_event, i); + return current_event; } // SWITCHED = send message to the right dest (or drop the switch) else if (self.connections.items[i].t == .SWITCHED) { // TODO: send message to SWITCH dest // TODO: handle_switched_message - current_event = Event.init(Event.Type.SWITCH, i, fd.fd, null); + return Event.init(Event.Type.SWITCH, i, fd.fd, null); } // EXTERNAL = user handles IO else if (self.connections.items[i].t == .EXTERNAL) { - current_event = Event.init(Event.Type.EXTERNAL, i, fd.fd, null); + return Event.init(Event.Type.EXTERNAL, i, fd.fd, null); } // otherwise = new message or disconnection else { // TODO: handle incoming message // TODO: handle_new_message - current_event = Event.init(Event.Type.MESSAGE, i, fd.fd, null); + var m = try self.read(i); + return Event.init(Event.Type.MESSAGE, i, fd.fd, m); } } @@ -268,12 +282,12 @@ pub const Context = struct { // SWITCHED = write message for its switch buddy (callbacks) if (self.connections.items[i].t == .SWITCHED) { // TODO: handle_writing_switched_message - current_event = Event.init(Event.Type.SWITCH, i, fd.fd, null); + return Event.init(Event.Type.SWITCH, i, fd.fd, null); } else { // otherwise = write message for the msg.fd // TODO: handle_writing_message - current_event = Event.init(Event.Type.TX, i, fd.fd, null); + return Event.init(Event.Type.TX, i, fd.fd, null); } } // .revent is POLLHUP @@ -281,11 +295,12 @@ pub const Context = struct { // handle disconnection current_event = Event.init(Event.Type.DISCONNECTION, i, fd.fd, null); try self.close(i); + return current_event; } // if fd revent is POLLERR or POLLNVAL - if ((fd.revents & std.os.linux.POLL.HUP > 0) or - (fd.revents & std.os.linux.POLL.HUP > 0)) { - current_event = Event.init(Event.Type.ERROR, i, fd.fd, null); + if ((fd.revents & std.os.linux.POLL.HUP > 0) or + (fd.revents & std.os.linux.POLL.NVAL > 0)) { + return Event.init(Event.Type.ERROR, i, fd.fd, null); } } diff --git a/zig-impl/src/event.zig b/zig-impl/src/event.zig index ac1d89c..ba190c3 100644 --- a/zig-impl/src/event.zig +++ b/zig-impl/src/event.zig @@ -49,15 +49,15 @@ pub const Event = struct { t: Event.Type, index: usize, origin: i32, // socket fd - m: ?*Message, // message pointer + m: ?Message, // message const Self = @This(); - pub fn init(t: Event.Type, index: usize, origin: i32, m: ?*Message) Self { + pub fn init(t: Event.Type, index: usize, origin: i32, m: ?Message) Self { return Self { .t = t, .index = index, .origin = origin, .m = m, }; } - pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, m: ?*Message) void { + pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, m: ?Message) void { self.t = t; self.index = index; self.origin = origin; diff --git a/zig-impl/src/main.zig b/zig-impl/src/main.zig index b8a6592..c2fb6b7 100644 --- a/zig-impl/src/main.zig +++ b/zig-impl/src/main.zig @@ -60,7 +60,6 @@ fn create_service() !void { switch (some_event.t) { .CONNECTION => { print("New connection: {}!\n", .{some_event}); - break; }, .TIMER => { print("Timer! ({})\n", .{count_down}); @@ -71,27 +70,31 @@ fn create_service() !void { } }, .EXTERNAL => { - print("Message received from a non IPC socket.", .{}); + print("Message received from a non IPC socket.\n", .{}); break; }, .SWITCH => { - print("Message to send to a corresponding fd.", .{}); + print("Message to send to a corresponding fd.\n", .{}); break; }, .DISCONNECTION => { - print("User disconnected.", .{}); + print("User disconnected.\n", .{}); break; }, .MESSAGE => { - print("New message.", .{}); + print("New message. {}\n", .{some_event}); + if (some_event.m) |m| { + print("message: {}\n", .{m}); + m.deinit(); + } break; }, .LOOKUP => { - print("Client asking for a service through ipcd.", .{}); + print("Client asking for a service through ipcd.\n", .{}); break; }, .TX => { - print("Message sent.", .{}); + print("Message sent.\n", .{}); break; }, .NOT_SET => { @@ -99,7 +102,7 @@ fn create_service() !void { break; }, .ERROR => { - print("A problem occured.\n", .{}); + print("A problem occured, event: {}\n", .{some_event}); break; }, } diff --git a/zig-impl/src/message.zig b/zig-impl/src/message.zig index 3bf8c0f..3d3cefb 100644 --- a/zig-impl/src/message.zig +++ b/zig-impl/src/message.zig @@ -18,7 +18,7 @@ pub const Message = struct { }; t: Message.Type, // Internal message type. - fd: usize, // File descriptor concerned about this message. + fd: i32, // File descriptor concerned about this message. payload: []const u8, allocator: std.mem.Allocator, // Memory allocator. @@ -26,7 +26,7 @@ pub const Message = struct { const Self = @This(); // TODO - //pub fn initFromConnection(fd: usize) Self { + //pub fn initFromConnection(fd: i32) Self { // return Self{ // .t = Message.Type.ERROR, // .fd = fd, @@ -34,7 +34,7 @@ pub const Message = struct { // }; //} - pub fn init(fd: usize, t: Message.Type + pub fn init(fd: i32, t: Message.Type , allocator: std.mem.Allocator , payload: []const u8) !Self { return Message { .fd = fd, .t = t @@ -42,7 +42,7 @@ pub const Message = struct { , .payload = try allocator.dupe(u8, payload) }; } - pub fn deinit(self: *Self) void { + pub fn deinit(self: Self) void { self.allocator.free(self.payload); }