Switch code mostly done, needs testing.

master
Philippe Pittoli 2023-01-10 17:09:34 +01:00
parent 5ad00d0675
commit a0e9515600
3 changed files with 79 additions and 28 deletions

View File

@ -298,6 +298,18 @@ pub const Context = struct {
return try Message.read(fd, buffer[0..], self.allocator);
}
/// Before closing the fd, test it via the 'fcntl' syscall.
/// This is useful for switched connections: FDs could be closed without libipc being informed.
fn safe_close_fd (self: *Self, fd: i32) void {
var should_close = true;
_ = std.os.fcntl(fd, std.os.F.GETFD, 0) catch {
should_close = false;
};
if (should_close) {
self.close_fd(fd) catch {};
}
}
// Wait for an event.
pub fn wait_event(self: *Self) !Event {
var current_event: Event = Event.init(Event.Type.ERROR, 0, 0, null);
@ -360,11 +372,32 @@ pub const Context = struct {
}
// SWITCHED = send message to the right dest (or drop the switch)
else if (self.connections.items[i].t == .SWITCHED) {
current_event = self.swichdb.handle_event_read (i, fd.fd);
if (current_event.t == .SWITCH_RX) {
self.schedule(current_event.message.?);
current_event = self.switchdb.handle_event_read (i, fd.fd);
switch (current_event.t) {
.SWITCH_RX => {
try self.schedule(current_event.m.?);
},
// TODO: DISCONNECTION and ERROR do not handle errors.
.DISCONNECTION => {
var dest = try self.switchdb.getDest(fd.fd);
print("disconnection from {} -> removing {}, too\n", .{fd.fd, dest});
self.switchdb.nuke(fd.fd);
self.safe_close_fd(fd.fd);
self.safe_close_fd(dest);
},
.ERROR => {
var dest = try self.switchdb.getDest(fd.fd);
print("error from {} -> removing {}, too\n", .{fd.fd, dest});
self.switchdb.nuke(fd.fd);
self.safe_close_fd(fd.fd);
self.safe_close_fd(dest);
},
else => {
print("switch rx incoherent error: {}\n", .{current_event.t});
return error.incoherentSwitchError;
},
}
return Event.init(Event.Type.SWITCH, i, fd.fd, null);
return Event.init(Event.Type.SWITCH_RX, i, fd.fd, null);
}
// EXTERNAL = user handles IO
else if (self.connections.items[i].t == .EXTERNAL) {
@ -406,9 +439,25 @@ pub const Context = struct {
// SWITCHED = write message for its switch buddy (callbacks)
if (self.connections.items[i].t == .SWITCHED) {
current_event = self.swichdb.handle_event_write (i, fd.fd, m);
current_event = self.switchdb.handle_event_write (i, m);
// TODO: remove the message from the tx array.
// Message inner memory is already freed.
switch (current_event.t) {
.SWITCH_TX => {
try self.schedule(current_event.m.?);
},
.ERROR => {
var dest = try self.switchdb.getDest(fd.fd);
print("error from {} -> removing {}, too\n", .{fd.fd, dest});
self.switchdb.nuke(fd.fd);
self.safe_close_fd(fd.fd);
self.safe_close_fd(dest);
},
else => {
print("switch tx incoherent error: {}\n", .{current_event.t});
return error.incoherentSwitchError;
},
}
return current_event;
}
else {
@ -493,23 +542,6 @@ pub const Context = struct {
}
};
//test "Simple structures - init, display and memory check" {
// // origin destination
// var s = Switch.init(3,8);
// var payload = "hello!!";
// // fd type payload
// var m = Message.init(0, allocator, payload);
//
// // type index origin message
// var e = Event.init(Event.Type.CONNECTION, 5, 8, &m);
// // CLIENT SIDE: connection to a service.
// _ = try c.connect(path);
// // TODO: connection to a server, but switched with clientfd "3".
// _ = try c.connection_switched(path, 3);
//}
// Creating a new thread: testing UNIX communication.
// This is a client sending a raw "Hello world!" bytestring,
// not an instance of Message.

View File

@ -94,8 +94,14 @@ fn create_service() !void {
break;
},
.SWITCH => {
print("Message to send to a corresponding fd.\n", .{});
.SWITCH_RX => {
print("Message has been received (SWITCH).\n", .{});
print("NOT IMPLEMENTED, YET. It's a suicide, then.\n", .{});
break;
},
.SWITCH_TX => {
print("Message has been sent (SWITCH).\n", .{});
print("NOT IMPLEMENTED, YET. It's a suicide, then.\n", .{});
break;
},

View File

@ -39,6 +39,15 @@ pub const SwitchDB = struct {
}
}
pub fn set_callbacks(self: *Self, fd: i32
, in : *const fn (origin: i32, m: *Message) CBEventType
, out : *const fn (origin: i32, m: *const Message) CBEventType) !void {
var managedconnection = self.db.get(fd) orelse return error.unregisteredFD;
managedconnection.in = in;
managedconnection.out = out;
}
/// Dig the "db" hashmap, perform "in" fn, may provide a message.
/// Errors from the "in" fn are reported as Zig errors.
pub fn read (self: *Self, fd: i32) !?Message {
@ -90,7 +99,6 @@ pub const SwitchDB = struct {
unreachable;
}
/// TODO: remove relevant info in the db when there is an error.
pub fn handle_event_read (self: *Self, index: usize, fd: i32) Event {
var message: ?Message = null;
message = self.read (fd) catch |err| switch(err) {
@ -105,7 +113,7 @@ pub const SwitchDB = struct {
return Event.init(Event.Type.SWITCH_RX, index, fd, message);
}
/// TODO: remove relevant info in the db when there is an error.
/// Message is free'd in any case.
pub fn handle_event_write (self: *Self, index: usize, message: Message) Event {
defer message.deinit();
var fd = message.fd;
@ -121,7 +129,12 @@ pub const SwitchDB = struct {
return Event.init(Event.Type.SWITCH_TX, index, fd, null);
}
fn nuke (self: *Self, fd: i32) void {
/// Simple wrapper around self.db.get.
pub fn getDest (self: *Self, fd: i32) !i32 {
return self.db.get(fd).?.dest;
}
pub fn nuke (self: *Self, fd: i32) void {
if (self.db.fetchSwapRemove(fd)) |kv| {
_ = self.db.swapRemove(kv.value.dest);
}
@ -232,8 +245,8 @@ test "nuke 'em" {
try switchdb.db.put(5, ManagedConnection {.dest = 6, .in = unsuccessful_in, .out = unsuccessful_out});
try switchdb.db.put(6, ManagedConnection {.dest = 5, .in = unsuccessful_in, .out = unsuccessful_out});
try testing.expect(switchdb.db.count() == 2);
switchdb.nuke(5);
try testing.expect(switchdb.db.count() == 0);
}