Obsolete
/
libipc-old
Archived
3
0
Fork 0

Echoing stuff.

master
Philippe Pittoli 2022-12-25 21:45:51 +01:00
parent d52fbdf61d
commit 727de2988f
3 changed files with 81 additions and 23 deletions

View File

@ -83,6 +83,10 @@ pub const Context = struct {
self.allocator.free(self.rundir); self.allocator.free(self.rundir);
self.connections.deinit(); self.connections.deinit();
self.pollfd.deinit(); self.pollfd.deinit();
for (self.tx.items) |m| {
print("context deinit: removing message {}\n", .{m});
m.deinit();
}
self.tx.deinit(); self.tx.deinit();
if (self.switchdb) |sdb| { sdb.deinit(); } if (self.switchdb) |sdb| { sdb.deinit(); }
} }
@ -155,20 +159,39 @@ pub const Context = struct {
return server; return server;
} }
pub fn write (self: *Self, m: Message) !void { pub fn write (_: *Self, m: Message) !void {
print("write fd {}\n", .{m.fd}); print("write msg on fd {}\n", .{m.fd});
self.tx.append(m);
// TODO
// Message contains the fd, no need to search for
// the right structure to copy, let's just recreate
// a Stream from the fd.
var stream = net.Stream { .handle = m.fd };
var buffer: [1000]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer);
var writer = fbs.writer();
_ = try m.write(writer); // returns paylen
_ = try stream.write (fbs.getWritten());
} }
pub fn read (self: *Self, index: usize) !Message { pub fn schedule (self: *Self, m: Message) !void {
print("schedule msg for fd {}\n", .{m.fd});
try self.tx.append(m);
}
pub fn read (self: *Self, index: usize) !?Message {
if (index >= self.pollfd.items.len) { if (index >= self.pollfd.items.len) {
return error.IndexOutOfBounds; return error.IndexOutOfBounds;
} }
print("read index {}\n", .{index}); print("read index {}\n", .{index});
var buffer: [2000000]u8 = undefined; // TODO: FIXME?? var buffer: [2000000]u8 = undefined; // TODO: FIXME??
var origin: i32 = undefined; var origin: i32 = undefined;
var packet_size: usize = undefined;
// TODO: this is a problem from the network API in Zig, // TODO: this is a problem from the network API in Zig,
// servers and clients are different, they aren't just fds. // servers and clients are different, they aren't just fds.
// Maybe there is something to change in the API. // Maybe there is something to change in the API.
@ -177,17 +200,18 @@ pub const Context = struct {
orelse return error.NoClientHere; orelse return error.NoClientHere;
var stream: net.Stream = client.stream; var stream: net.Stream = client.stream;
origin = stream.handle; origin = stream.handle;
_ = try stream.read(buffer[0..]); packet_size = try stream.read(buffer[0..]);
} }
else if (self.connections.items[index].t == .SERVER) { else if (self.connections.items[index].t == .SERVER) {
return error.messageOnServer; return error.messageOnServer;
} }
// var m = try self.allocator.create(Message); // Let's handle this as a disconnection.
// m.* = try Message.read(buffer[0..], self.allocator); if (packet_size <= 4) {
var m = try Message.read(buffer[0..], self.allocator); return null;
m.fd = origin; }
return m;
return try Message.read(origin, buffer[0..], self.allocator);
} }
// Wait an event. // Wait an event.
@ -216,7 +240,7 @@ pub const Context = struct {
} }
} }
// TODO: before initiate a timer // before initiate a timer
var timer = try Timer.start(); var timer = try Timer.start();
// Polling. // Polling.
@ -270,8 +294,13 @@ pub const Context = struct {
else { else {
// TODO: handle incoming message // TODO: handle incoming message
// TODO: handle_new_message // TODO: handle_new_message
var m = try self.read(i); var maybe_message = try self.read(i);
return Event.init(Event.Type.MESSAGE, i, fd.fd, m); if (maybe_message) |m| {
return Event.init(Event.Type.MESSAGE, i, fd.fd, m);
}
try self.close(i);
return Event.init(Event.Type.DISCONNECTION, i, fd.fd, null);
} }
} }
@ -287,6 +316,18 @@ pub const Context = struct {
else { else {
// otherwise = write message for the msg.fd // otherwise = write message for the msg.fd
// TODO: handle_writing_message // TODO: handle_writing_message
var index: usize = undefined;
for (self.tx.items) |m, index_| {
if (m.fd == self.pollfd.items[i].fd) {
index = index_;
break;
}
}
var m = self.tx.swapRemove(index);
try self.write (m);
m.deinit();
return Event.init(Event.Type.TX, i, fd.fd, null); return Event.init(Event.Type.TX, i, fd.fd, null);
} }
} }
@ -328,7 +369,24 @@ pub const Context = struct {
// Close the client's socket. // Close the client's socket.
c.stream.close(); c.stream.close();
} }
_ = self.pollfd.swapRemove(index); var pollfd = self.pollfd.swapRemove(index);
print("client at index {} removed\n", .{index});
// Remove all its non-sent messages.
var i: usize = 0;
while (true) {
if (i >= self.tx.items.len)
break;
if (self.tx.items[i].fd == pollfd.fd) {
var m = self.tx.swapRemove(i);
print("Removing message targeted to client {}: {}\n", .{index, m});
m.deinit();
continue;
}
i += 1;
}
} }
pub fn close_all(self: *Self) !void { pub fn close_all(self: *Self) !void {

View File

@ -79,15 +79,13 @@ fn create_service() !void {
}, },
.DISCONNECTION => { .DISCONNECTION => {
print("User disconnected.\n", .{}); print("User disconnected.\n", .{});
break;
}, },
.MESSAGE => { .MESSAGE => {
print("New message. {}\n", .{some_event}); print("New message. {}\n", .{some_event});
print("Let's echo, once\n", .{});
if (some_event.m) |m| { if (some_event.m) |m| {
print("message: {}\n", .{m}); try ctx.schedule(m);
m.deinit();
} }
break;
}, },
.LOOKUP => { .LOOKUP => {
print("Client asking for a service through ipcd.\n", .{}); print("Client asking for a service through ipcd.\n", .{});

View File

@ -4,6 +4,7 @@ const testing = std.testing;
const net = std.net; const net = std.net;
const fmt = std.fmt; const fmt = std.fmt;
const print = std.debug.print;
const print_eq = @import("./util.zig").print_eq; const print_eq = @import("./util.zig").print_eq;
pub const Messages = std.ArrayList(Message); pub const Messages = std.ArrayList(Message);
@ -46,7 +47,7 @@ pub const Message = struct {
self.allocator.free(self.payload); self.allocator.free(self.payload);
} }
pub fn read(buffer: []const u8, allocator: std.mem.Allocator) !Self { pub fn read(fd: i32, buffer: []const u8, allocator: std.mem.Allocator) !Self {
// var hexbuf: [4000]u8 = undefined; // var hexbuf: [4000]u8 = undefined;
// var hexfbs = std.io.fixedBufferStream(&hexbuf); // var hexfbs = std.io.fixedBufferStream(&hexbuf);
@ -54,16 +55,17 @@ pub const Message = struct {
// try hexdump.hexdump(hexwriter, "Message.read input buffer", buffer); // try hexdump.hexdump(hexwriter, "Message.read input buffer", buffer);
// print("{s}\n", .{hexfbs.getWritten()}); // print("{s}\n", .{hexfbs.getWritten()});
// var payload = allocator.
// defer allocator.free(payload);
var fbs = std.io.fixedBufferStream(buffer); var fbs = std.io.fixedBufferStream(buffer);
var reader = fbs.reader(); var reader = fbs.reader();
const msg_type = @intToEnum(Message.Type, try reader.readByte()); const msg_type = @intToEnum(Message.Type, try reader.readByte());
const msg_len = try reader.readIntBig(u32); const msg_len = try reader.readIntBig(u32);
if (msg_len >= buffer.len) {
return error.wrongMessageLength;
}
const msg_payload = buffer[5..5+msg_len]; const msg_payload = buffer[5..5+msg_len];
return try Message.init(0, msg_type, allocator, msg_payload); return try Message.init(fd, msg_type, allocator, msg_payload);
} }
pub fn write(self: Self, writer: anytype) !usize { pub fn write(self: Self, writer: anytype) !usize {