From 727de2988f6f2456998e49c4e0a7421d6fb2c51e Mon Sep 17 00:00:00 2001 From: Philippe Pittoli Date: Sun, 25 Dec 2022 21:45:51 +0100 Subject: [PATCH] Echoing stuff. --- zig-impl/src/context.zig | 88 +++++++++++++++++++++++++++++++++------- zig-impl/src/main.zig | 6 +-- zig-impl/src/message.zig | 10 +++-- 3 files changed, 81 insertions(+), 23 deletions(-) diff --git a/zig-impl/src/context.zig b/zig-impl/src/context.zig index 6e9f242..4b0fb83 100644 --- a/zig-impl/src/context.zig +++ b/zig-impl/src/context.zig @@ -83,6 +83,10 @@ pub const Context = struct { self.allocator.free(self.rundir); self.connections.deinit(); self.pollfd.deinit(); + for (self.tx.items) |m| { + print("context deinit: removing message {}\n", .{m}); + m.deinit(); + } self.tx.deinit(); if (self.switchdb) |sdb| { sdb.deinit(); } } @@ -155,20 +159,39 @@ pub const Context = struct { return server; } - pub fn write (self: *Self, m: Message) !void { - print("write fd {}\n", .{m.fd}); - self.tx.append(m); + pub fn write (_: *Self, m: Message) !void { + print("write msg on fd {}\n", .{m.fd}); + + // TODO + // Message contains the fd, no need to search for + // the right structure to copy, let's just recreate + // a Stream from the fd. + var stream = net.Stream { .handle = m.fd }; + + var buffer: [1000]u8 = undefined; + var fbs = std.io.fixedBufferStream(&buffer); + var writer = fbs.writer(); + + _ = try m.write(writer); // returns paylen + + _ = try stream.write (fbs.getWritten()); } - pub fn read (self: *Self, index: usize) !Message { + pub fn schedule (self: *Self, m: Message) !void { + print("schedule msg for fd {}\n", .{m.fd}); + try self.tx.append(m); + } + + pub fn read (self: *Self, index: usize) !?Message { if (index >= self.pollfd.items.len) { return error.IndexOutOfBounds; } print("read index {}\n", .{index}); var buffer: [2000000]u8 = undefined; // TODO: FIXME?? - var origin: i32 = undefined; + var packet_size: usize = undefined; + // TODO: this is a problem from the network API in Zig, // servers and clients are different, they aren't just fds. // Maybe there is something to change in the API. @@ -177,17 +200,18 @@ pub const Context = struct { orelse return error.NoClientHere; var stream: net.Stream = client.stream; origin = stream.handle; - _ = try stream.read(buffer[0..]); + packet_size = try stream.read(buffer[0..]); } else if (self.connections.items[index].t == .SERVER) { return error.messageOnServer; } - // var m = try self.allocator.create(Message); - // m.* = try Message.read(buffer[0..], self.allocator); - var m = try Message.read(buffer[0..], self.allocator); - m.fd = origin; - return m; + // Let's handle this as a disconnection. + if (packet_size <= 4) { + return null; + } + + return try Message.read(origin, buffer[0..], self.allocator); } // Wait an event. @@ -216,7 +240,7 @@ pub const Context = struct { } } - // TODO: before initiate a timer + // before initiate a timer var timer = try Timer.start(); // Polling. @@ -270,8 +294,13 @@ pub const Context = struct { else { // TODO: handle incoming message // TODO: handle_new_message - var m = try self.read(i); - return Event.init(Event.Type.MESSAGE, i, fd.fd, m); + var maybe_message = try self.read(i); + if (maybe_message) |m| { + return Event.init(Event.Type.MESSAGE, i, fd.fd, m); + } + + try self.close(i); + return Event.init(Event.Type.DISCONNECTION, i, fd.fd, null); } } @@ -287,6 +316,18 @@ pub const Context = struct { else { // otherwise = write message for the msg.fd // TODO: handle_writing_message + + var index: usize = undefined; + for (self.tx.items) |m, index_| { + if (m.fd == self.pollfd.items[i].fd) { + index = index_; + break; + } + } + + var m = self.tx.swapRemove(index); + try self.write (m); + m.deinit(); return Event.init(Event.Type.TX, i, fd.fd, null); } } @@ -328,7 +369,24 @@ pub const Context = struct { // Close the client's socket. c.stream.close(); } - _ = self.pollfd.swapRemove(index); + var pollfd = self.pollfd.swapRemove(index); + print("client at index {} removed\n", .{index}); + + // Remove all its non-sent messages. + var i: usize = 0; + while (true) { + if (i >= self.tx.items.len) + break; + + if (self.tx.items[i].fd == pollfd.fd) { + var m = self.tx.swapRemove(i); + print("Removing message targeted to client {}: {}\n", .{index, m}); + m.deinit(); + continue; + } + + i += 1; + } } pub fn close_all(self: *Self) !void { diff --git a/zig-impl/src/main.zig b/zig-impl/src/main.zig index c2fb6b7..439990e 100644 --- a/zig-impl/src/main.zig +++ b/zig-impl/src/main.zig @@ -79,15 +79,13 @@ fn create_service() !void { }, .DISCONNECTION => { print("User disconnected.\n", .{}); - break; }, .MESSAGE => { print("New message. {}\n", .{some_event}); + print("Let's echo, once\n", .{}); if (some_event.m) |m| { - print("message: {}\n", .{m}); - m.deinit(); + try ctx.schedule(m); } - break; }, .LOOKUP => { print("Client asking for a service through ipcd.\n", .{}); diff --git a/zig-impl/src/message.zig b/zig-impl/src/message.zig index 3d3cefb..ff31bf0 100644 --- a/zig-impl/src/message.zig +++ b/zig-impl/src/message.zig @@ -4,6 +4,7 @@ const testing = std.testing; const net = std.net; const fmt = std.fmt; +const print = std.debug.print; const print_eq = @import("./util.zig").print_eq; pub const Messages = std.ArrayList(Message); @@ -46,7 +47,7 @@ pub const Message = struct { self.allocator.free(self.payload); } - pub fn read(buffer: []const u8, allocator: std.mem.Allocator) !Self { + pub fn read(fd: i32, buffer: []const u8, allocator: std.mem.Allocator) !Self { // var hexbuf: [4000]u8 = undefined; // var hexfbs = std.io.fixedBufferStream(&hexbuf); @@ -54,16 +55,17 @@ pub const Message = struct { // try hexdump.hexdump(hexwriter, "Message.read input buffer", buffer); // print("{s}\n", .{hexfbs.getWritten()}); - // var payload = allocator. - // defer allocator.free(payload); var fbs = std.io.fixedBufferStream(buffer); var reader = fbs.reader(); const msg_type = @intToEnum(Message.Type, try reader.readByte()); const msg_len = try reader.readIntBig(u32); + if (msg_len >= buffer.len) { + return error.wrongMessageLength; + } const msg_payload = buffer[5..5+msg_len]; - return try Message.init(0, msg_type, allocator, msg_payload); + return try Message.init(fd, msg_type, allocator, msg_payload); } pub fn write(self: Self, writer: anytype) !usize {