Bindings: simpler in and out fn for switchs, leading to simpler bindings.
This commit is contained in:
parent
e80e99d47f
commit
13a60d0158
@ -1,6 +1,11 @@
|
||||
#ifndef LIBIPC
|
||||
#define LIBIPC
|
||||
|
||||
struct message {
|
||||
uint32_t size;
|
||||
char* payload;
|
||||
};
|
||||
|
||||
enum event_types {
|
||||
ERROR = 0 // A problem occured.
|
||||
, EXTERNAL = 1 // Message received from a non IPC socket.
|
||||
|
@ -3,6 +3,7 @@ const print = std.debug.print;
|
||||
const ipc = @import("./main.zig");
|
||||
const Context = ipc.Context;
|
||||
const Message = ipc.Message;
|
||||
const CBEventType = ipc.CBEvent.Type;
|
||||
|
||||
export fn ipc_context_init (ptr: **Context) callconv(.C) i32 {
|
||||
ptr.* = std.heap.c_allocator.create(Context) catch return -1;
|
||||
@ -131,5 +132,10 @@ export fn ipc_add_switch (ctx: *Context, fd1: i32, fd2: i32) callconv(.C) i32 {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Later.
|
||||
// pub fn set_switch_callbacks
|
||||
/// TODO: change the functions in the switch code, not to take a Message as a parameter.
|
||||
export fn ipc_set_switch_callbacks(ctx: *Context, fd: i32
|
||||
, in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType
|
||||
, out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) callconv(.C) i32 {
|
||||
ctx.set_switch_callbacks (fd, in, out) catch return -1;
|
||||
return 0;
|
||||
}
|
||||
|
@ -294,8 +294,8 @@ pub const Context = struct {
|
||||
}
|
||||
|
||||
pub fn set_switch_callbacks(self: *Self, fd: i32
|
||||
, in : *const fn (origin: i32, m: *Message) CBEventType
|
||||
, out : *const fn (origin: i32, m: *const Message) CBEventType) !void {
|
||||
, in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType
|
||||
, out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) !void {
|
||||
try self.switchdb.set_callbacks(fd,in, out);
|
||||
}
|
||||
|
||||
|
@ -15,9 +15,26 @@ const print = std.debug.print;
|
||||
|
||||
const Event = ipc.Event;
|
||||
|
||||
/// SwitchDB
|
||||
/// Functions read and write: handle
|
||||
|
||||
/// SwitchDB: store relations between clients and services.
|
||||
///
|
||||
/// A protocol service, such as TPCd can handle "external" communications (TCP in this case)
|
||||
/// meaning that a client can connect to this service through a canal that isn't a simple
|
||||
/// libipc UNIX socket, and this client is then connected to a local service.
|
||||
/// OTOH, a local client can ask TCPd to establish a connection to a remote service.
|
||||
/// In both cases, at least one of the connection isn't libipc-based and should be
|
||||
/// handled in a specific way that only TPCd (or another protocol service) can.
|
||||
///
|
||||
/// TCPd marks both file descriptors as "related" (add_switch) so libipc can automatically
|
||||
/// handle messages between the client and the service. Any input from one end will be sent
|
||||
/// to the other.
|
||||
/// TCPd registers functions to handle specific input and output operations from and to the
|
||||
/// remote connection (set_callbacks).
|
||||
///
|
||||
/// At any point, TCPd can safely close a connection and remote it from the SwitchDB (nuke),
|
||||
/// resulting in the removal of both the connection's FD and its related FD (both the client
|
||||
/// and the service connections are removed).
|
||||
///
|
||||
/// Currently, libipc automatically closes both the client and its service when an error occurs.
|
||||
pub const SwitchDB = struct {
|
||||
const Self = @This();
|
||||
|
||||
@ -45,9 +62,8 @@ pub const SwitchDB = struct {
|
||||
}
|
||||
|
||||
pub fn set_callbacks(self: *Self, fd: i32
|
||||
, in : *const fn (origin: i32, m: *Message) CBEventType
|
||||
, out : *const fn (origin: i32, m: *const Message) CBEventType) !void {
|
||||
|
||||
, in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType
|
||||
, out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) !void {
|
||||
var managedconnection = self.db.get(fd) orelse return error.unregisteredFD;
|
||||
managedconnection.in = in;
|
||||
managedconnection.out = out;
|
||||
@ -58,23 +74,27 @@ pub const SwitchDB = struct {
|
||||
pub fn read (self: *Self, fd: i32) !?Message {
|
||||
// assert there is an entry with this fd as a key.
|
||||
var managedconnection = self.db.get(fd) orelse return error.unregisteredFD;
|
||||
var message: Message = undefined;
|
||||
|
||||
var r: CBEventType = managedconnection.in(fd, &message);
|
||||
var buffer = [_]u8{0} ** 100000; // TODO: buffer size
|
||||
var message_size: u32 = @truncate(u32, buffer.len);
|
||||
var r: CBEventType = managedconnection.in(fd, &buffer, &message_size);
|
||||
|
||||
// TODO: read message
|
||||
// TODO: better allocator?
|
||||
// TODO: better errors?
|
||||
var message: Message = Message.init(managedconnection.dest
|
||||
, std.heap.c_allocator
|
||||
, buffer[0..message_size]) catch {
|
||||
return error.generic;
|
||||
};
|
||||
|
||||
switch (r) {
|
||||
// The message should be ignored (protocol specific).
|
||||
CBEventType.IGNORE => { return null; },
|
||||
// No error. A message was generated.
|
||||
CBEventType.NO_ERROR => {
|
||||
message.fd = managedconnection.dest;
|
||||
return message;
|
||||
},
|
||||
CBEventType.IGNORE => { return null; },
|
||||
CBEventType.NO_ERROR => { return message; },
|
||||
CBEventType.FD_CLOSING => { return error.closeFD; },
|
||||
// Generic error, or the message was read but with errors.
|
||||
CBEventType.ERROR => {
|
||||
return error.generic;
|
||||
},
|
||||
CBEventType.ERROR => { return error.generic; },
|
||||
}
|
||||
|
||||
unreachable;
|
||||
@ -85,7 +105,15 @@ pub const SwitchDB = struct {
|
||||
pub fn write (self: *Self, message: Message) !void {
|
||||
// assert there is an entry with this fd as a key.
|
||||
var managedconnection = self.db.get(message.fd) orelse return error.unregisteredFD;
|
||||
var r = managedconnection.out(managedconnection.dest, &message);
|
||||
|
||||
var buffer = [_]u8{0} ** 100000; // TODO: buffer size
|
||||
var fbs = std.io.fixedBufferStream(&buffer);
|
||||
var writer = fbs.writer();
|
||||
|
||||
// returning basic errors, no details.
|
||||
_ = message.write(writer) catch return error.generic;
|
||||
var written = fbs.getWritten();
|
||||
var r = managedconnection.out(managedconnection.dest, written.ptr, @truncate(u32, written.len));
|
||||
|
||||
switch (r) {
|
||||
// The message should be ignored (protocol specific).
|
||||
@ -104,6 +132,7 @@ pub const SwitchDB = struct {
|
||||
unreachable;
|
||||
}
|
||||
|
||||
/// From a message to read on a socket to an Event.
|
||||
pub fn handle_event_read (self: *Self, index: usize, fd: i32) Event {
|
||||
var message: ?Message = null;
|
||||
message = self.read (fd) catch |err| switch(err) {
|
||||
@ -139,6 +168,7 @@ pub const SwitchDB = struct {
|
||||
return self.db.get(fd).?.dest;
|
||||
}
|
||||
|
||||
/// Remove both entries (client and service) from the DB.
|
||||
pub fn nuke (self: *Self, fd: i32) void {
|
||||
if (self.db.fetchSwapRemove(fd)) |kv| {
|
||||
_ = self.db.swapRemove(kv.value.dest);
|
||||
@ -148,8 +178,8 @@ pub const SwitchDB = struct {
|
||||
|
||||
const ManagedConnection = struct {
|
||||
dest : i32,
|
||||
in : *const fn (origin: i32, m: *Message) CBEventType = default_in,
|
||||
out : *const fn (origin: i32, m: *const Message) CBEventType = default_out,
|
||||
in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType = default_in,
|
||||
out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType = default_out,
|
||||
};
|
||||
|
||||
test "creation and display" {
|
||||
@ -167,14 +197,18 @@ test "creation and display" {
|
||||
try print_eq("{ (5,6)(6,5) }", .{switchdb});
|
||||
}
|
||||
|
||||
fn successful_in (_: i32, m: *Message) CBEventType {
|
||||
m.* = Message.init(8, std.heap.c_allocator, "coucou") catch {
|
||||
return CBEventType.ERROR;
|
||||
};
|
||||
fn successful_in (_: i32, mcontent: [*]const u8, mlen: *u32) CBEventType {
|
||||
var m = Message.init(8, std.heap.c_allocator, "coucou") catch unreachable;
|
||||
defer m.deinit();
|
||||
|
||||
var fbs = std.io.fixedBufferStream(mcontent);
|
||||
var writer = fbs.writer();
|
||||
_ = m.write (writer) catch unreachable;
|
||||
mlen.* = @truncate(u32, m.payload.len);
|
||||
return CBEventType.NO_ERROR;
|
||||
}
|
||||
|
||||
fn successful_out (_: i32, _: *const Message) CBEventType {
|
||||
fn successful_out (_: i32, _: [*]const u8, _: u32) CBEventType {
|
||||
return CBEventType.NO_ERROR;
|
||||
}
|
||||
|
||||
@ -205,11 +239,11 @@ test "successful exchanges" {
|
||||
if (event_3.m) |_| { return error.ShouldNotCarryMessage; }
|
||||
}
|
||||
|
||||
fn unsuccessful_in (_: i32, _: *Message) CBEventType {
|
||||
fn unsuccessful_in (_: i32, _: [*]const u8, _: *u32) CBEventType {
|
||||
return CBEventType.ERROR;
|
||||
}
|
||||
|
||||
fn unsuccessful_out (_: i32, _: *const Message) CBEventType {
|
||||
fn unsuccessful_out (_: i32, _: [*]const u8, _: u32) CBEventType {
|
||||
return CBEventType.ERROR;
|
||||
}
|
||||
|
||||
@ -255,39 +289,28 @@ test "nuke 'em" {
|
||||
try testing.expect(switchdb.db.count() == 0);
|
||||
}
|
||||
|
||||
fn default_in (origin: i32, m: *Message) CBEventType {
|
||||
fn default_in (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType {
|
||||
// print ("receiving a message originated from {}\n", .{origin});
|
||||
var buffer: [2000000]u8 = undefined; // TODO: FIXME??
|
||||
var packet_size: usize = undefined;
|
||||
|
||||
// This may be kinda hacky, idk.
|
||||
var stream: net.Stream = .{ .handle = origin };
|
||||
packet_size = stream.read(buffer[0..]) catch return CBEventType.ERROR;
|
||||
var packet_size: usize = stream.read(mcontent[0..mlen.*]) catch return CBEventType.ERROR;
|
||||
|
||||
// Let's handle this as a disconnection.
|
||||
if (packet_size <= 4) {
|
||||
if (packet_size < 4) {
|
||||
return CBEventType.FD_CLOSING;
|
||||
}
|
||||
|
||||
m.* = Message.read(origin, buffer[0..], std.heap.c_allocator)
|
||||
catch return CBEventType.ERROR;
|
||||
mlen.* = @truncate(u32, packet_size);
|
||||
|
||||
return CBEventType.NO_ERROR;
|
||||
}
|
||||
|
||||
fn default_out (_: i32, m: *const Message) CBEventType {
|
||||
fn default_out (fd: i32, mcontent: [*]const u8, mlen: u32) CBEventType {
|
||||
// print ("sending a message originated from {}\n", .{origin});
|
||||
// Message contains the fd, no need to search for
|
||||
// the right structure to copy, let's just recreate
|
||||
// a Stream from the fd.
|
||||
var stream = net.Stream { .handle = m.fd };
|
||||
|
||||
var buffer: [200000]u8 = undefined; // TODO: buffer size
|
||||
var fbs = std.io.fixedBufferStream(&buffer);
|
||||
var writer = fbs.writer();
|
||||
|
||||
// returning basic errors, no details.
|
||||
_ = m.write(writer) catch return CBEventType.ERROR;
|
||||
_ = stream.write (fbs.getWritten()) catch return CBEventType.ERROR;
|
||||
// Message contains the fd, no need to search for the right structure to copy,
|
||||
// let's just recreate a Stream from the fd.
|
||||
var stream = net.Stream { .handle = fd };
|
||||
_ = stream.write (mcontent[0..mlen]) catch return CBEventType.ERROR;
|
||||
return CBEventType.NO_ERROR;
|
||||
}
|
||||
|
@ -40,8 +40,8 @@ int main(int argc, char**argv) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
printf ("Set the timer to ten seconds.\n");
|
||||
ipc_context_timer (ctx, 10000);
|
||||
printf ("Set the timer to two seconds.\n");
|
||||
ipc_context_timer (ctx, 2000);
|
||||
|
||||
printf ("Loop over events.\n");
|
||||
char should_continue = 1;
|
||||
|
Reference in New Issue
Block a user