From 2ad505b3052209f06770b1bc69389e84e8015c6b Mon Sep 17 00:00:00 2001 From: Philippe Pittoli Date: Sat, 31 Dec 2022 04:58:45 +0100 Subject: [PATCH] Clients are working. --- zig-impl/src/context.zig | 83 +++++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/zig-impl/src/context.zig b/zig-impl/src/context.zig index 099ccf2..7345d5f 100644 --- a/zig-impl/src/context.zig +++ b/zig-impl/src/context.zig @@ -97,7 +97,6 @@ pub const Context = struct { const newfd = stream.handle; errdefer std.os.closeSocket(newfd); var newcon = Connection.init(ctype, path); - newcon.client = stream; try self.connections.append(newcon); try self.pollfd.append(.{ .fd = newfd , .events = std.os.linux.POLL.IN @@ -254,29 +253,26 @@ pub const Context = struct { print("read index {}\n", .{index}); var buffer: [2000000]u8 = undefined; // TODO: FIXME?? - var origin: i32 = undefined; var packet_size: usize = undefined; // TODO: this is a problem from the network API in Zig, // servers and clients are different, they aren't just fds. // Maybe there is something to change in the API. - if (self.connections.items[index].t == .IPC) { - var client = self.connections.items[index].client - orelse return error.NoClientHere; - var stream: net.Stream = client.stream; - origin = stream.handle; - packet_size = try stream.read(buffer[0..]); - } - else if (self.connections.items[index].t == .SERVER) { + if (self.connections.items[index].t == .SERVER) { return error.messageOnServer; } + // This may be kinda hacky, idk. + var fd = self.pollfd.items[index].fd; + var stream: net.Stream = .{ .handle = fd }; + packet_size = try stream.read(buffer[0..]); + // Let's handle this as a disconnection. if (packet_size <= 4) { return null; } - return try Message.read(origin, buffer[0..], self.allocator); + return try Message.read(fd, buffer[0..], self.allocator); } // Wait an event. @@ -340,74 +336,81 @@ pub const Context = struct { for (self.pollfd.items) |*fd, i| { // .revents is POLLIN if(fd.revents & std.os.linux.POLL.IN > 0) { - // SERVER = new connection + // SERVER = new connection if (self.connections.items[i].t == .SERVER) { try self.accept_new_client(¤t_event, i); return current_event; } - // SWITCHED = send message to the right dest (or drop the switch) + // SWITCHED = send message to the right dest (or drop the switch) else if (self.connections.items[i].t == .SWITCHED) { // TODO: send message to SWITCH dest // TODO: handle_switched_message return Event.init(Event.Type.SWITCH, i, fd.fd, null); } - // EXTERNAL = user handles IO + // EXTERNAL = user handles IO else if (self.connections.items[i].t == .EXTERNAL) { return Event.init(Event.Type.EXTERNAL, i, fd.fd, null); } - // otherwise = new message or disconnection - else { - // TODO: handle incoming message - // TODO: handle_new_message - var maybe_message = try self.read(i); - if (maybe_message) |m| { + // otherwise = new message or disconnection + else { + // TODO: handle incoming message + // TODO: handle_new_message + var maybe_message = self.read(i) catch |err| switch(err) { + error.ConnectionResetByPeer => { + print("connection reset by peer\n", .{}); + try self.close(i); + return Event.init(Event.Type.DISCONNECTION, i, fd.fd, null); + }, + else => { return err; }, + }; + if (maybe_message) |m| { return Event.init(Event.Type.MESSAGE, i, fd.fd, m); } try self.close(i); return Event.init(Event.Type.DISCONNECTION, i, fd.fd, null); - } + } } // .revent is POLLOUT if(fd.revents & std.os.linux.POLL.OUT > 0) { - fd.events &= ~ @as(i16, std.os.linux.POLL.OUT); + fd.events &= ~ @as(i16, std.os.linux.POLL.OUT); - // SWITCHED = write message for its switch buddy (callbacks) + // SWITCHED = write message for its switch buddy (callbacks) if (self.connections.items[i].t == .SWITCHED) { - // TODO: handle_writing_switched_message + // TODO: handle_writing_switched_message return Event.init(Event.Type.SWITCH, i, fd.fd, null); - } + } else { - // otherwise = write message for the msg.fd - // TODO: handle_writing_message + // otherwise = write message for the msg.fd + // TODO: handle_writing_message - var index: usize = undefined; - for (self.tx.items) |m, index_| { - if (m.fd == self.pollfd.items[i].fd) { - index = index_; - break; - } - } + var index: usize = undefined; + for (self.tx.items) |m, index_| { + if (m.fd == self.pollfd.items[i].fd) { + index = index_; + break; + } + } var m = self.tx.swapRemove(index); - try self.write (m); - m.deinit(); + try self.write (m); + m.deinit(); return Event.init(Event.Type.TX, i, fd.fd, null); } } - // .revent is POLLHUP + // .revent is POLLHUP if(fd.revents & std.os.linux.POLL.HUP > 0) { - // handle disconnection + // handle disconnection current_event = Event.init(Event.Type.DISCONNECTION, i, fd.fd, null); try self.close(i); return current_event; } - // if fd revent is POLLERR or POLLNVAL + // if fd revent is POLLERR or POLLNVAL if ((fd.revents & std.os.linux.POLL.HUP > 0) or (fd.revents & std.os.linux.POLL.NVAL > 0)) { return Event.init(Event.Type.ERROR, i, fd.fd, null); - } + } } // TODO: check for LOOKUP events.