From 721d8842c5a7b6b26a3f9d8a2dbb31b780527a94 Mon Sep 17 00:00:00 2001 From: Philippe Pittoli Date: Mon, 6 Feb 2023 10:44:51 +0100 Subject: [PATCH] Initial commit. Old commit history: https://git.baguette.netlib.re/Baguette/libipc-old --- .gitignore | 4 + build.zig | 70 +++++ libipc.h | 48 +++ src/bindings.zig | 147 +++++++++ src/callback.zig | 35 +++ src/connection.zig | 42 +++ src/context.zig | 706 ++++++++++++++++++++++++++++++++++++++++++++ src/event.zig | 96 ++++++ src/exchange-fd.zig | 226 ++++++++++++++ src/hexdump.zig | 128 ++++++++ src/main.zig | 24 ++ src/message.zig | 106 +++++++ src/switch.zig | 320 ++++++++++++++++++++ src/util.zig | 67 +++++ 14 files changed, 2019 insertions(+) create mode 100644 .gitignore create mode 100644 build.zig create mode 100644 libipc.h create mode 100644 src/bindings.zig create mode 100644 src/callback.zig create mode 100644 src/connection.zig create mode 100644 src/context.zig create mode 100644 src/event.zig create mode 100644 src/exchange-fd.zig create mode 100644 src/hexdump.zig create mode 100644 src/main.zig create mode 100644 src/message.zig create mode 100644 src/switch.zig create mode 100644 src/util.zig diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..86d888e --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +zig-cache +zig-out +docs +*.swp diff --git a/build.zig b/build.zig new file mode 100644 index 0000000..ae859ba --- /dev/null +++ b/build.zig @@ -0,0 +1,70 @@ +const std = @import("std"); + +const VERSION = "0.1.0"; + +// Although this function looks imperative, note that its job is to +// declaratively construct a build graph that will be executed by an external +// runner. +pub fn build(b: *std.Build) void { + // Standard target options allows the person running `zig build` to choose + // what target to build for. Here we do not override the defaults, which + // means any target is allowed, and the default is native. Other options + // for restricting supported target set are available. + const target = b.standardTargetOptions(.{}); + + // Standard optimization options allow the person running `zig build` to select + // between Debug, ReleaseSafe, ReleaseFast, and ReleaseSmall. Here we do not + // set a preferred release mode, allowing the user to decide how to optimize. + const optimize = b.standardOptimizeOption(.{}); + + const static_lib = b.addStaticLibrary(.{ + .name = "ipc", + // In this case the main source file is merely a path, however, in more + // complicated build scripts, this could be a generated file. + .root_source_file = .{ .path = "src/bindings.zig" }, + .target = target, + .optimize = optimize, + }); + + // Link with the libc of the target system since the C allocator + // is required in the bindings. + static_lib.linkLibC(); + + // This declares intent for the library to be installed into the standard + // location when the user invokes the "install" step (the default step when + // running `zig build`). + static_lib.install(); + + const shared_lib = b.addSharedLibrary(.{ + .name = "ipc", + .root_source_file = .{ .path = "src/bindings.zig" }, + .version = comptime (try std.builtin.Version.parse(VERSION)), + .target = target, + .optimize = optimize, + }); + shared_lib.linkLibC(); + shared_lib.install(); + + // Creates a step for unit testing. + const main_tests = b.addTest(.{ + .root_source_file = .{ .path = "src/main.zig" }, + .target = target, + .optimize = optimize, + }); + main_tests.linkLibC(); + + // This creates a build step. It will be visible in the `zig build --help` menu, + // and can be selected like this: `zig build test` + // This will evaluate the `test` step rather than the default, which is "install". + const test_step = b.step("test", "Run library tests"); + test_step.dependOn(&main_tests.step); + + const install_static_lib = b.addInstallArtifact(static_lib); + const static_lib_step = b.step("static", "Compile LibIPC as a static library."); + static_lib_step.dependOn(&install_static_lib.step); + + const install_shared_lib = b.addInstallArtifact(shared_lib); + // b.getInstallStep().dependOn(&install_shared_lib.step); + const shared_lib_step = b.step("shared", "Compile LibIPC as a shared library."); + shared_lib_step.dependOn(&install_shared_lib.step); +} diff --git a/libipc.h b/libipc.h new file mode 100644 index 0000000..bca8c14 --- /dev/null +++ b/libipc.h @@ -0,0 +1,48 @@ +#ifndef LIBIPC +#define LIBIPC + +#include + +enum event_types { + ERROR = 0 // A problem occured. + , CONNECTION = 1 // New user. + , DISCONNECTION = 2 // User disconnected. + , MESSAGE_RX = 3 // New message. + , MESSAGE_TX = 4 // Message sent. + , TIMER = 5 // Timeout in the poll(2) function. + , EXTERNAL = 6 // Message received from a non IPC socket. + , SWITCH_RX = 7 // Message received from a switched FD. + , SWITCH_TX = 8 // Message sent to a switched fd. +}; + +// Return type of callback functions when switching. +enum cb_event_types { + CB_NO_ERROR = 0 // No error. A message was generated. + , CB_ERROR = 1 // Generic error. + , CB_FD_CLOSING = 2 // The fd is closing. + , CB_IGNORE = 3 // The message should be ignored (protocol specific). +}; + +int ipc_context_init (void** ptr); +int ipc_service_init (void* ctx, int* servicefd, const char* service_name, uint16_t service_name_len); +int ipc_connect_service (void* ctx, int* servicefd, const char* service_name, uint16_t service_name_len); +void ipc_context_deinit (void** ctx); +int ipc_write (void* ctx, int servicefd, char* mcontent, uint32_t mlen); +int ipc_schedule (void* ctx, int servicefd, const char* mcontent, uint32_t mlen); +int ipc_read_fd (void* ctx, int fd, char* buffer, size_t* buflen); +int ipc_read (void* ctx, size_t index, char* buffer, size_t* buflen); +int ipc_wait_event(void* ctx, char* t, size_t* index, int* originfd, char* buffer, size_t* buflen); +void ipc_context_timer (void* ctx, int timer); +int ipc_close_fd (void* ctx, int fd); +int ipc_close (void* ctx, size_t index); +int ipc_close_all (void* ctx); + +// Switch functions (for "protocol" services, such as TCPd). +int ipc_add_external (void* ctx, int newfd); +int ipc_add_switch (void* ctx, int fd1, int fd2); + +int ipc_set_switch_callbacks (void* ctx, int fd + , enum cb_event_types (*in (int orig, const char *payload, uint32_t *mlen)) + , enum cb_event_types (*out(int dest, char *payload, uint32_t mlen))); + +#endif diff --git a/src/bindings.zig b/src/bindings.zig new file mode 100644 index 0000000..75610dd --- /dev/null +++ b/src/bindings.zig @@ -0,0 +1,147 @@ +const std = @import("std"); +const log = std.log.scoped(.libipc_bindings); +const ipc = @import("./main.zig"); +const Context = ipc.Context; +const Message = ipc.Message; +const CBEventType = ipc.CBEvent.Type; + +export fn ipc_context_init (ptr: **Context) callconv(.C) i32 { + ptr.* = std.heap.c_allocator.create(Context) catch return -1; + + ptr.*.* = Context.init(std.heap.c_allocator) catch |err| { + log.warn("error while init context: {}\n", .{err}); + return -1; + }; + return 0; +} + +/// Start a libipc service. +export fn ipc_service_init(ctx: *Context, servicefd: *i32, service_name: [*]const u8, service_name_len: u16) callconv(.C) i32 { + var streamserver = ctx.server_init (service_name[0..service_name_len]) catch return -1; + servicefd.* = streamserver.sockfd.?; + return 0; +} + +/// Connect to a libipc service, possibly through IPCd. +export fn ipc_connect_service (ctx: *Context, servicefd: *i32, service_name: [*]const u8, service_name_len: u16) callconv(.C) i32 { + var fd = ctx.connect_ipc (service_name[0..service_name_len]) catch return -1; + servicefd.* = fd; + return 0; +} + +export fn ipc_context_deinit (ctx: **Context) callconv(.C) void { + var ptr: *Context = ctx.*; + ptr.deinit(); + std.heap.c_allocator.destroy(ptr); +} + +/// Write a message (no waiting). +export fn ipc_write (ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) i32 { + // TODO: better default length. + var buffer = [_]u8{0} ** 100000; + var fba = std.heap.FixedBufferAllocator.init(&buffer); + + var message = Message.init(servicefd, fba.allocator(), mcontent[0..mlen]) catch return -1; + ctx.write(message) catch return -1; + return 0; +} + +/// Schedule a message. +/// Use the same allocator as the context. +export fn ipc_schedule (ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) i32 { + var message = Message.init(servicefd, ctx.allocator, mcontent[0..mlen]) catch return -1; + ctx.schedule(message) catch return -2; + return 0; +} + +/// Read a message from a file descriptor. +/// Buffer length will be changed to the size of the received message. +export fn ipc_read_fd (ctx: *Context, fd: i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 { + var m = ctx.read_fd (fd) catch {return -1;} orelse return -2; + if (m.payload.len > buflen.*) return -3; + buflen.* = m.payload.len; + + var fbs = std.io.fixedBufferStream(buffer[0..buflen.*]); + var writer = fbs.writer(); + _ = writer.write(m.payload) catch return -4; + m.deinit(); + + return 0; +} + +/// Read a message. +/// Buffer length will be changed to the size of the received message. +export fn ipc_read (ctx: *Context, index: usize, buffer: [*]u8, buflen: *usize) callconv(.C) i32 { + var m = ctx.read (index) catch {return -1;} orelse return -2; + if (m.payload.len > buflen.*) return -3; + buflen.* = m.payload.len; + + var fbs = std.io.fixedBufferStream(buffer[0..buflen.*]); + var writer = fbs.writer(); + _ = writer.write(m.payload) catch return -4; + m.deinit(); + + return 0; +} + +/// Wait for an event. +/// Buffer length will be changed to the size of the received message. +export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 { + var event = ctx.wait_event() catch return -1; + t.* = @enumToInt(event.t); + index.* = event.index; + originfd.* = event.origin; + + if (event.m) |m| { + var fbs = std.io.fixedBufferStream(buffer[0..buflen.*]); + var writer = fbs.writer(); + _ = writer.write(m.payload) catch return -4; + buflen.* = m.payload.len; + m.deinit(); + } + else { + buflen.* = 0; + } + + return 0; +} + +/// Change the timer (ms). +export fn ipc_context_timer (ctx: *Context, timer: i32) callconv(.C) void { + ctx.timer = timer; +} + +export fn ipc_close_fd (ctx: *Context, fd: i32) callconv(.C) i32 { + ctx.close_fd (fd) catch return -1; + return 0; +} + +export fn ipc_close (ctx: *Context, index: usize) callconv(.C) i32 { + ctx.close (index) catch return -1; + return 0; +} + +export fn ipc_close_all (ctx: *Context) callconv(.C) i32 { + ctx.close_all () catch return -1; + return 0; +} + +/// Add a new file descriptor to listen to. +/// The FD is marked as "external"; it isn't a simple libipc connection. +/// You may want to handle any operation on it by yourself. +export fn ipc_add_external (ctx: *Context, newfd: i32) callconv(.C) i32 { + ctx.add_external (newfd) catch return -1; + return 0; +} + +export fn ipc_add_switch (ctx: *Context, fd1: i32, fd2: i32) callconv(.C) i32 { + ctx.add_switch (fd1, fd2) catch return -1; + return 0; +} + +export fn ipc_set_switch_callbacks(ctx: *Context, fd: i32 + , in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType + , out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) callconv(.C) i32 { + ctx.set_switch_callbacks (fd, in, out) catch return -1; + return 0; +} diff --git a/src/callback.zig b/src/callback.zig new file mode 100644 index 0000000..16731aa --- /dev/null +++ b/src/callback.zig @@ -0,0 +1,35 @@ +pub const CBEvent = struct { + + // CallBack Event types. + // In the main event loop, servers and clients can receive connections, + // disconnections, errors or messages from their pairs. They also can + // set a timer so the loop will allow a periodic routine (sending ping + // messages for websockets, for instance). + // + // A few other events can occur. + // + // Extra socket + // The main loop waiting for an event can be used as an unique entry + // point for socket management. libipc users can register sockets via + // ipc_add_fd allowing them to trigger an event, so events unrelated + // to libipc are managed the same way. + // Switch + // libipc can be used to create protocol-related programs, such as a + // websocket proxy allowing libipc services to be accessible online. + // To help those programs (with TCP-complient sockets), two sockets + // can be bound together, each message coming from one end will be + // automatically transfered to the other socket and a Switch event + // will be triggered. + // Look Up + // When a client establishes a connection to a service, it asks the + // ipc daemon (ipcd) to locate the service and establish a connection + // to it. This is a lookup. + + // For IO callbacks (switching). + pub const Type = enum { + NO_ERROR, // No error. A message was generated. + ERROR, // Generic error. + FD_CLOSING, // The fd is closing. + IGNORE, // The message should be ignored (protocol specific). + }; +}; diff --git a/src/connection.zig b/src/connection.zig new file mode 100644 index 0000000..05ecd37 --- /dev/null +++ b/src/connection.zig @@ -0,0 +1,42 @@ +const std = @import("std"); +const net = std.net; +const fmt = std.fmt; + +const print_eq = @import("./util.zig").print_eq; + +pub const Connections = std.ArrayList(Connection); + +pub const Connection = struct { + + pub const Type = enum { + IPC, // Standard connection. + EXTERNAL, // Non IPC connection (TCP, UDP, etc.). + SERVER, // Messages received = new connections. + SWITCHED, // IO operations should go through registered callbacks. + }; + + t: Connection.Type, + path: ?[] const u8, // Not always needed. + + const Self = @This(); + + pub fn init(t: Connection.Type, path: ?[] const u8) Self { + return Self { + .t = t, + .path = path, + }; + } + + pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void { + try fmt.format(out_stream, "{}, path {?s}", .{ self.t, self.path}); + } +}; + +test "Connection - creation and display" { + // origin destination + var path = "/some/path"; + var c1 = Connection.init(Connection.Type.EXTERNAL, path); + var c2 = Connection.init(Connection.Type.IPC , null); + try print_eq("connection.Connection.Type.EXTERNAL, path /some/path", c1); + try print_eq("connection.Connection.Type.IPC, path null", c2); +} diff --git a/src/context.zig b/src/context.zig new file mode 100644 index 0000000..fe4afba --- /dev/null +++ b/src/context.zig @@ -0,0 +1,706 @@ +const std = @import("std"); +const testing = std.testing; +const net = std.net; +const os = std.os; +const fmt = std.fmt; + +const log = std.log.scoped(.libipc_context); + +const receive_fd = @import("./exchange-fd.zig").receive_fd; + +const Timer = std.time.Timer; + +const CBEvent = @import("./callback.zig").CBEvent; +const Connection = @import("./connection.zig").Connection; +const Message = @import("./message.zig").Message; +const Event = @import("./event.zig").Event; +const Switch = @import("./switch.zig").Switch; +const print_eq = @import("./util.zig").print_eq; + +const Messages = @import("./message.zig").Messages; +const SwitchDB = @import("./switch.zig").SwitchDB; +const Connections = @import("./connection.zig").Connections; +const CBEventType = @import("./main.zig").CBEvent.Type; + +pub const PollFD = std.ArrayList(std.os.pollfd); + +pub const IPC_HEADER_SIZE = 4; // Size (4 bytes) then content. +pub const IPC_BASE_SIZE = 100000; // 100 KB, plenty enough space for messages +pub const IPC_MAX_MESSAGE_SIZE = IPC_BASE_SIZE-IPC_HEADER_SIZE; +pub const IPC_VERSION = 1; + +// Context of the whole networking state. +pub const Context = struct { + rundir: [] u8, + allocator: std.mem.Allocator, // Memory allocator. + connections: Connections, // Keep track of connections. + + // "pollfd" structures passed to poll(2). Same indexes as "connections". + pollfd: PollFD, // .fd (fd_t) + .events (i16) + .revents (i16) + + tx: Messages, // Messages to send, once their fd is available. + switchdb: SwitchDB, // Relations between fd. + + timer: ?i32 = null, // No timer by default (no TIMER event). + + const Self = @This(); + + // Context initialization: + // - init structures (provide the allocator) + pub fn init(allocator: std.mem.Allocator) !Self { + var rundir = std.process.getEnvVarOwned(allocator, "RUNDIR") catch |err| switch(err) { + error.EnvironmentVariableNotFound => blk: { + break :blk try allocator.dupeZ(u8, "/tmp/libipc-run/"); + }, + else => { + return err; + }, + }; + + return Self { + .rundir = rundir + , .connections = Connections.init(allocator) + , .pollfd = PollFD.init(allocator) + , .tx = Messages.init(allocator) + , .switchdb = SwitchDB.init(allocator) + , .allocator = allocator + }; + } + + // create a server path for the UNIX socket based on the service name + pub fn server_path(self: *Self, service_name: []const u8, writer: anytype) !void { + try writer.print("{s}/{s}", .{self.rundir, service_name}); + } + + pub fn deinit(self: *Self) void { + self.close_all() catch |err| switch(err){ + error.IndexOutOfBounds => { + log.err("context.deinit(): IndexOutOfBounds", .{}); + }, + }; + self.allocator.free(self.rundir); + self.connections.deinit(); + self.pollfd.deinit(); + for (self.tx.items) |m| { + m.deinit(); + } + self.tx.deinit(); + self.switchdb.deinit(); + } + + // Both simple connection and the switched one share this code. + fn connect_ (self: *Self, ctype: Connection.Type, path: []const u8) !i32 { + var stream = try net.connectUnixSocket(path); + const newfd = stream.handle; + errdefer std.os.closeSocket(newfd); + var newcon = Connection.init(ctype, null); + try self.add_ (newcon, newfd); + return newfd; + } + + fn connect_ipcd (self: *Self, service_name: []const u8 + , connection_type: Connection.Type) !?i32 { + + const buffer_size = 10000; + var buffer: [buffer_size]u8 = undefined; + var fba = std.heap.FixedBufferAllocator.init(&buffer); + var allocator = fba.allocator(); + + // Get IPC_NETWORK environment variable + // IPC_NETWORK is shared with the network service to choose the protocol stack, + // according to the target service. + // + // Example, connecting to 'audio' service through tor service: + // IPC_NETWORK="audio tor://some.example.com/audio" + // + // Routing directives can be chained using " ;" separator: + // IPC_NETWORK="audio https://example.com/audio ;pong tls://pong.example.com/pong" + var network_envvar = std.process.getEnvVarOwned(allocator, "IPC_NETWORK") catch |err| switch(err) { + // error{ OutOfMemory, EnvironmentVariableNotFound, InvalidUtf8 } (ErrorSet) + error.EnvironmentVariableNotFound => { + log.debug("no IPC_NETWORK envvar: IPCd won't be contacted", .{}); + return null; + }, // no need to contact IPCd + else => { return err; }, + }; + + var lookupbuffer: [buffer_size]u8 = undefined; + var lookupfbs = std.io.fixedBufferStream(&lookupbuffer); + var lookupwriter = lookupfbs.writer(); + try lookupwriter.print("{s};{s}", .{service_name, network_envvar}); + + // Try to connect to the IPCd service + var ipcdfd = try self.connect_service("ipc"); + defer self.close_fd (ipcdfd) catch {}; // in any case, connection should be closed + + // Send LOOKUP message + // content: target service name;${IPC_NETWORK} + // example: pong;pong tls://example.com:8998/pong + + var m = try Message.init (ipcdfd, allocator, lookupfbs.getWritten()); + try self.write (m); + + // Read LOOKUP response + // case error: ignore and move on (TODO) + // else: get fd sent by IPCd then close IPCd fd + var reception_buffer: [2000]u8 = undefined; + var reception_size: usize = 0; + var newfd = try receive_fd (ipcdfd, &reception_buffer, &reception_size); + if (reception_size == 0) { + return error.IPCdFailedNoMessage; + } + + var response: []u8 = reception_buffer[0..reception_size]; + + if (! std.mem.eql(u8, response, "ok")) { + return error.IPCdFailedNotOk; + } + var newcon = Connection.init(connection_type, null); + try self.add_ (newcon, newfd); + return newfd; + } + + /// TODO: Add a new connection, but takes care of memory problems: + /// in case one of the arrays cannot sustain another entry, the other + /// won't be added. + fn add_ (self: *Self, new_connection: Connection, fd: os.socket_t) !void { + try self.connections.append(new_connection); + try self.pollfd.append(.{ .fd = fd + , .events = std.os.linux.POLL.IN + , .revents = 0 }); + } + + fn fd_to_index (self: Self, fd: i32) !usize { + var i: usize = 0; + while(i < self.pollfd.items.len) { + if (self.pollfd.items[i].fd == fd) { + return i; + } + i += 1; + } + return error.IndexNotFound; + } + + /// Connect to the service directly, without reaching IPCd first. + /// Return the connection FD. + pub fn connect_service (self: *Self, service_name: []const u8) !i32 { + var buffer: [1000]u8 = undefined; + var fbs = std.io.fixedBufferStream(&buffer); + var writer = fbs.writer(); + + try self.server_path(service_name, writer); + var path = fbs.getWritten(); + + return self.connect_ (Connection.Type.IPC, path); + } + + /// Tries to connect to IPCd first, then the service (if needed). + /// Return the connection FD. + pub fn connect_ipc (self: *Self, service_name: []const u8) !i32 { + // First, try ipcd. + if (try self.connect_ipcd (service_name, Connection.Type.IPC)) |fd| { + log.debug("Connected via IPCd, fd is {}", .{fd}); + return fd; + } + // In case this doesn't work, connect directly. + return try self.connect_service (service_name); + } + + /// Add a new file descriptor to follow, labeled as EXTERNAL. + /// Useful for protocol daemons (ex: TCPd) listening to a socket for external connections, + /// clients trying to reach a libipc service. + pub fn add_external (self: *Self, newfd: i32) !void { + var newcon = Connection.init(Connection.Type.EXTERNAL, null); + try self.add_ (newcon, newfd); + } + + fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void { + // net.StreamServer + var serverfd = self.pollfd.items[server_index].fd; + var path = self.connections.items[server_index].path orelse return error.ServerWithNoPath; + var server = net.StreamServer { + .sockfd = serverfd + , .kernel_backlog = 100 + , .reuse_address = false + , .listen_address = try net.Address.initUnix(path) + }; + var client = try server.accept(); // net.StreamServer.Connection + + const newfd = client.stream.handle; + var newcon = Connection.init(Connection.Type.IPC, null); + try self.add_ (newcon, newfd); + + const sfd = server.sockfd orelse return error.SocketLOL; // TODO + // WARNING: imply every new item is last + event.set(Event.Type.CONNECTION, self.pollfd.items.len - 1, sfd, null); + } + + // Create a unix socket. + // Store std lib structures in the context. + pub fn server_init(self: *Self, service_name: [] const u8) !net.StreamServer { + var buffer: [1000]u8 = undefined; + var fbs = std.io.fixedBufferStream(&buffer); + var writer = fbs.writer(); + + try self.server_path(service_name, writer); + var path = fbs.getWritten(); + + var server = net.StreamServer.init(.{}); + var socket_addr = try net.Address.initUnix(path); + try server.listen(socket_addr); + + const newfd = server.sockfd orelse return error.SocketLOL; // TODO + // Store the path in the Connection structure, so the UNIX socket file can be removed later. + var newcon = Connection.init(Connection.Type.SERVER, try self.allocator.dupeZ(u8, path)); + try self.add_ (newcon, newfd); + return server; + } + + pub fn write (_: *Self, m: Message) !void { + // Message contains the fd, no need to search for + // the right structure to copy, let's just recreate + // a Stream from the fd. + var stream = net.Stream { .handle = m.fd }; + + var buffer = [_]u8{0} ** IPC_MAX_MESSAGE_SIZE; + var fbs = std.io.fixedBufferStream(&buffer); + var writer = fbs.writer(); + + _ = try m.write(writer); // returns paylen + + _ = try stream.write (fbs.getWritten()); + } + + pub fn schedule (self: *Self, m: Message) !void { + try self.tx.append(m); + } + + /// Read from a client (indexed by a FD). + pub fn read_fd (self: *Self, fd: i32) !?Message { + return try self.read(try self.fd_to_index (fd)); + } + + pub fn add_switch(self: *Self, fd1: i32, fd2: i32) !void { + var index_origin = try self.fd_to_index(fd1); + var index_destinataire = try self.fd_to_index(fd2); + + self.connections.items[index_origin].t = Connection.Type.SWITCHED; + self.connections.items[index_destinataire].t = Connection.Type.SWITCHED; + + try self.switchdb.add_switch(fd1,fd2); + } + + pub fn set_switch_callbacks(self: *Self, fd: i32 + , in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType + , out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) !void { + try self.switchdb.set_callbacks(fd,in, out); + } + + pub fn read (self: *Self, index: usize) !?Message { + if (index >= self.pollfd.items.len) { + return error.IndexOutOfBounds; + } + + var buffer = [_]u8{0} ** IPC_MAX_MESSAGE_SIZE; + var packet_size: usize = undefined; + + // TODO: this is a problem from the network API in Zig, + // servers and clients are different, they aren't just fds. + // Maybe there is something to change in the API. + if (self.connections.items[index].t == .SERVER) { + return error.messageOnServer; + } + + // This may be kinda hacky, idk. + var fd = self.pollfd.items[index].fd; + var stream: net.Stream = .{ .handle = fd }; + packet_size = try stream.read(buffer[0..]); + + // Let's handle this as a disconnection. + if (packet_size <= 4) { + return null; + } + + return try Message.read(fd, buffer[0..], self.allocator); + } + + /// Before closing the fd, test it via the 'fcntl' syscall. + /// This is useful for switched connections: FDs could be closed without libipc being informed. + fn safe_close_fd (self: *Self, fd: i32) void { + var should_close = true; + _ = std.os.fcntl(fd, std.os.F.GETFD, 0) catch { + should_close = false; + }; + if (should_close) { + self.close_fd(fd) catch {}; + } + } + + // Wait for an event. + pub fn wait_event(self: *Self) !Event { + var current_event: Event = Event.init(Event.Type.ERROR, 0, 0, null); + var wait_duration: i32 = -1; // -1 == unlimited + + if (self.timer) |t| { log.debug("listening (timer: {} ms)", .{t}); wait_duration = t; } + else { log.debug("listening (no timer)", .{}); } + + // Make sure we listen to the right file descriptors, + // setting POLLIN & POLLOUT flags. + for (self.pollfd.items) |*fd| { + fd.events |= std.os.linux.POLL.IN; // just to make sure + } + + for (self.tx.items) |m| { + for (self.pollfd.items) |*fd| { + if (fd.fd == m.fd) { + fd.events |= std.os.linux.POLL.OUT; // just to make sure + } + } + } + + // before initiate a timer + var timer = try Timer.start(); + + // Polling. + var count: usize = undefined; + + count = try os.poll(self.pollfd.items, wait_duration); + + if (count < 0) { + log.err("there is a problem: poll < 0", .{}); + current_event = Event.init(Event.Type.ERROR, 0, 0, null); + return current_event; + } + + var duration = timer.read() / 1000000; // ns -> ms + if (count == 0) { + if (duration >= wait_duration) { + current_event = Event.init(Event.Type.TIMER, 0, 0, null); + } + else { + // In case nothing happened, and poll wasn't triggered by time out. + current_event = Event.init(Event.Type.ERROR, 0, 0, null); + } + return current_event; + } + + // handle messages + // => loop over self.pollfd.items + for (self.pollfd.items) |*fd, i| { + // .revents is POLLIN + if(fd.revents & std.os.linux.POLL.IN > 0) { + // SERVER = new connection + if (self.connections.items[i].t == .SERVER) { + try self.accept_new_client(¤t_event, i); + return current_event; + } + // SWITCHED = send message to the right dest (or drop the switch) + else if (self.connections.items[i].t == .SWITCHED) { + current_event = self.switchdb.handle_event_read (i, fd.fd); + switch (current_event.t) { + .SWITCH_RX => { + try self.schedule(current_event.m.?); + }, + .DISCONNECTION => { + var dest = try self.switchdb.getDest(fd.fd); + log.debug("disconnection from {} -> removing {}, too", .{fd.fd, dest}); + self.switchdb.nuke(fd.fd); + self.safe_close_fd(fd.fd); + self.safe_close_fd(dest); + }, + .ERROR => { + var dest = try self.switchdb.getDest(fd.fd); + log.warn("error from {} -> removing {}, too", .{fd.fd, dest}); + self.switchdb.nuke(fd.fd); + self.safe_close_fd(fd.fd); + self.safe_close_fd(dest); + }, + else => { + log.warn("switch rx incoherent error: {}", .{current_event.t}); + return error.incoherentSwitchError; + }, + } + return current_event; + } + // EXTERNAL = user handles IO + else if (self.connections.items[i].t == .EXTERNAL) { + return Event.init(Event.Type.EXTERNAL, i, fd.fd, null); + } + // otherwise = new message or disconnection + else { + var maybe_message = self.read(i) catch |err| switch(err) { + error.ConnectionResetByPeer => { + log.warn("connection reset by peer", .{}); + try self.close(i); + return Event.init(Event.Type.DISCONNECTION, i, fd.fd, null); + }, + else => { return err; }, + }; + + if (maybe_message) |m| { + return Event.init(Event.Type.MESSAGE_RX, i, fd.fd, m); + } + + try self.close(i); + return Event.init(Event.Type.DISCONNECTION, i, fd.fd, null); + } + } + + // .revent is POLLOUT + if(fd.revents & std.os.linux.POLL.OUT > 0) { + fd.events &= ~ @as(i16, std.os.linux.POLL.OUT); + + var index: usize = undefined; + for (self.tx.items) |m, index_| { + if (m.fd == self.pollfd.items[i].fd) { + index = index_; + break; + } + } + + var m = self.tx.swapRemove(index); + + // SWITCHED = write message for its switch buddy (callbacks) + if (self.connections.items[i].t == .SWITCHED) { + current_event = self.switchdb.handle_event_write (i, m); + // Message inner memory is already freed. + switch (current_event.t) { + .SWITCH_TX => { + }, + .ERROR => { + var dest = try self.switchdb.getDest(fd.fd); + log.warn("error from {} -> removing {}, too", .{fd.fd, dest}); + self.switchdb.nuke(fd.fd); + self.safe_close_fd(fd.fd); + self.safe_close_fd(dest); + }, + else => { + log.warn("switch tx incoherent error: {}", .{current_event.t}); + return error.incoherentSwitchError; + }, + } + return current_event; + } + else { + // otherwise = write message for the msg.fd + try self.write (m); + m.deinit(); + return Event.init(Event.Type.MESSAGE_TX, i, fd.fd, null); + } + } + // .revent is POLLHUP + if(fd.revents & std.os.linux.POLL.HUP > 0) { + // handle disconnection + current_event = Event.init(Event.Type.DISCONNECTION, i, fd.fd, null); + try self.close(i); + return current_event; + } + // if fd revent is POLLERR or POLLNVAL + if ((fd.revents & std.os.linux.POLL.HUP > 0) or + (fd.revents & std.os.linux.POLL.NVAL > 0)) { + return Event.init(Event.Type.ERROR, i, fd.fd, null); + } + } + + return current_event; + } + + /// Remove a connection based on its file descriptor. + pub fn close_fd(self: *Self, fd: i32) !void { + try self.close(try self.fd_to_index (fd)); + } + + pub fn close(self: *Self, index: usize) !void { + // REMINDER: connections and pollfd have the same length + if (index >= self.pollfd.items.len) { + return error.IndexOutOfBounds; + } + + // close the connection and remove it from the two structures + var con = self.connections.swapRemove(index); + // Remove service's UNIX socket file. + if (con.path) |path| { + std.fs.cwd().deleteFile(path) catch {}; + self.allocator.free(path); + } + var pollfd = self.pollfd.swapRemove(index); + std.os.close(pollfd.fd); + + // Remove all its non-sent messages. + var i: usize = 0; + while (true) { + if (i >= self.tx.items.len) + break; + + if (self.tx.items[i].fd == pollfd.fd) { + var m = self.tx.swapRemove(i); + m.deinit(); + continue; + } + + i += 1; + } + } + + pub fn close_all(self: *Self) !void { + while(self.connections.items.len > 0) { try self.close(0); } + } + + pub fn format(self: Self, comptime form: []const u8, options: fmt.FormatOptions, out_stream: anytype) !void { + try fmt.format(out_stream + , "context ({} connections and {} messages):" + , .{self.connections.items.len, self.tx.items.len}); + + for (self.connections.items) |con| { + try fmt.format(out_stream, "\n- ", .{}); + try con.format(form, options, out_stream); + } + + for (self.tx.items) |tx| { + try fmt.format(out_stream, "\n- ", .{}); + try tx.format(form, options, out_stream); + } + } +}; + +// Creating a new thread: testing UNIX communication. +// This is a client sending a raw "Hello world!" bytestring, +// not an instance of Message. +const CommunicationTestThread = struct { + fn clientFn() !void { + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + var c = try Context.init(allocator); + defer c.deinit(); // There. Can't leak. Isn't Zig wonderful? + + var buffer: [1000]u8 = undefined; + var fbs = std.io.fixedBufferStream(&buffer); + var writer = fbs.writer(); + + try c.server_path("simple-context-test", writer); + var path = fbs.getWritten(); + const socket = try net.connectUnixSocket(path); + defer socket.close(); + _ = try socket.writer().writeAll("Hello world!"); + } +}; + +test "Context - creation, display and memory check" { + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + + const allocator = gpa.allocator(); + + var c = try Context.init(allocator); + defer c.deinit(); // There. Can't leak. Isn't Zig wonderful? + + var buffer: [1000]u8 = undefined; + var fbs = std.io.fixedBufferStream(&buffer); + var writer = fbs.writer(); + try c.server_path("simple-context-test", writer); + var path = fbs.getWritten(); + + // SERVER SIDE: creating a service. + var server = c.server_init("simple-context-test") catch |err| switch(err) { + error.FileNotFound => { + log.err("cannot init server at {s}", .{path}); + return err; + }, + else => return err, + }; + defer std.fs.cwd().deleteFile(path) catch {}; // Once done, remove file. + + const t = try std.Thread.spawn(.{}, CommunicationTestThread.clientFn, .{}); + defer t.join(); + + // Server.accept returns a net.StreamServer.Connection. + var client = try server.accept(); + defer client.stream.close(); + var buf: [16]u8 = undefined; + const n = try client.stream.reader().read(&buf); + + try testing.expectEqual(@as(usize, 12), n); + try testing.expectEqualSlices(u8, "Hello world!", buf[0..n]); +} + +// Creating a new thread: testing UNIX communication. +// This is a client sending a an instance of Message. +const ConnectThenSendMessageThread = struct { + fn clientFn() !void { + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + var c = try Context.init(allocator); + defer c.deinit(); // There. Can't leak. Isn't Zig wonderful? + + var path_buffer: [1000]u8 = undefined; + var path_fbs = std.io.fixedBufferStream(&path_buffer); + var path_writer = path_fbs.writer(); + try c.server_path("simple-context-test", path_writer); + var path = path_fbs.getWritten(); + + // Actual UNIX socket connection. + const socket = try net.connectUnixSocket(path); + defer socket.close(); + + // Writing message into a buffer. + var message_buffer: [1000]u8 = undefined; + var message_fbs = std.io.fixedBufferStream(&message_buffer); + var message_writer = message_fbs.writer(); + // 'fd' parameter is not taken into account here (no loop) + + var m = try Message.init(0, allocator, "Hello world!"); + defer m.deinit(); + _ = try m.write(message_writer); + + _ = try socket.writer().writeAll(message_fbs.getWritten()); + } +}; + + +test "Context - creation, echo once" { + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + + const allocator = gpa.allocator(); + + var c = try Context.init(allocator); + defer c.deinit(); // There. Can't leak. Isn't Zig wonderful? + + var buffer: [1000]u8 = undefined; + var fbs = std.io.fixedBufferStream(&buffer); + var writer = fbs.writer(); + try c.server_path("simple-context-test", writer); + var path = fbs.getWritten(); + + // SERVER SIDE: creating a service. + var server = c.server_init("simple-context-test") catch |err| switch(err) { + error.FileNotFound => { + log.err("cannot init server at {s}", .{path}); + return err; + }, + else => return err, + }; + defer std.fs.cwd().deleteFile(path) catch {}; // Once done, remove file. + + const t = try std.Thread.spawn(.{}, ConnectThenSendMessageThread.clientFn, .{}); + defer t.join(); + + // Server.accept returns a net.StreamServer.Connection. + var client = try server.accept(); + defer client.stream.close(); + var buf: [1000]u8 = undefined; + const n = try client.stream.reader().read(&buf); + var m = try Message.read(8, buf[0..n], allocator); // 8 == random client's fd number + defer m.deinit(); + + try testing.expectEqual(@as(usize, 12), m.payload.len); + try testing.expectEqualSlices(u8, m.payload, "Hello world!"); +} diff --git a/src/event.zig b/src/event.zig new file mode 100644 index 0000000..2b82ac2 --- /dev/null +++ b/src/event.zig @@ -0,0 +1,96 @@ +const std = @import("std"); +const testing = std.testing; +const fmt = std.fmt; + +const Message = @import("./message.zig").Message; + +const print_eq = @import("./util.zig").print_eq; + +pub const Event = struct { + + // Event types. + // In the main event loop, servers and clients can receive connections, + // disconnections, errors or messages from their pairs. They also can + // set a timer so the loop will allow a periodic routine (sending ping + // messages for websockets, for instance). + // + // A few other events can occur. + // + // Extra socket + // The main loop waiting for an event can be used as an unique entry + // point for socket management. libipc users can register sockets via + // ipc_add_fd allowing them to trigger an event, so events unrelated + // to libipc are managed the same way. + // Switch + // libipc can be used to create protocol-related programs, such as a + // websocket proxy allowing libipc services to be accessible online. + // To help those programs (with TCP-complient sockets), two sockets + // can be bound together, each message coming from one end will be + // automatically transfered to the other socket and a Switch event + // will be triggered. + // Look Up + // When a client establishes a connection to a service, it asks the + // ipc daemon (ipcd) to locate the service and establish a connection + // to it. This is a lookup. + + pub const Type = enum { + ERROR, // A problem occured. + CONNECTION, // New user. + DISCONNECTION, // User disconnected. + MESSAGE_RX, // New message. + MESSAGE_TX, // Message sent. + TIMER, // Timeout in the poll(2) function. + EXTERNAL, // Message received from a non IPC socket. + SWITCH_RX, // Message received from a switched FD. + SWITCH_TX, // Message sent to a switched fd. + }; + + t: Event.Type, + index: usize, + origin: i32, // socket fd + m: ?Message, // message + + const Self = @This(); + + pub fn init(t: Event.Type, index: usize, origin: i32, m: ?Message) Self { + return Self { .t = t, .index = index, .origin = origin, .m = m, }; + } + + pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, m: ?Message) void { + self.t = t; + self.index = index; + self.origin = origin; + self.m = m; + } + + pub fn clean(self: *Self) void { + self.t = Event.Type.ERROR; + self.index = @as(usize,0); + self.origin = @as(i32,0); + if (self.m) |message| { + message.deinit(); + } + self.m = null; + } + + pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void { + try fmt.format(out_stream + , "{}, origin: {}, index {}, message: [{?}]" + , .{ self.t, self.origin, self.index, self.m} ); + } + +}; + +test "Event - creation and display" { + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + var s = "hello!!"; + var m = try Message.init(1, allocator, s); // fd type payload + defer m.deinit(); + var e = Event.init(Event.Type.CONNECTION, 5, 8, m); // type index origin message + + try print_eq("event.Event.Type.CONNECTION, origin: 8, index 5, message: [fd: 1, payload: [hello!!]]", e); +} diff --git a/src/exchange-fd.zig b/src/exchange-fd.zig new file mode 100644 index 0000000..df7a8a6 --- /dev/null +++ b/src/exchange-fd.zig @@ -0,0 +1,226 @@ +const std = @import("std"); +const testing = std.testing; +const os = std.os; +const log = std.log.scoped(.libipc_exchangefd); + +const builtin = @import("builtin"); +const windows = std.os.windows; +const errno = std.os.errno; +const system = std.os.system; +const unexpectedErrno = std.os.unexpectedErrno; +const SendMsgError = std.os.SendMsgError; + +const SCM_RIGHTS: c_int = 1; + +/// This definition enables the use of Zig types with a cmsghdr structure. +/// The oddity of this layout is that the data must be aligned to @sizeOf(usize) +/// rather than its natural alignment. +pub fn Cmsghdr(comptime T: type) type { + const Header = extern struct { + len: usize, + level: c_int, + @"type": c_int, + }; + + const data_align = @sizeOf(usize); + const data_offset = std.mem.alignForward(@sizeOf(Header), data_align); + + return extern struct { + const Self = @This(); + + bytes: [data_offset + @sizeOf(T)]u8 align(@alignOf(Header)), + + pub fn init(args: struct { + level: c_int, + @"type": c_int, + data: T, + }) Self { + var self: Self = undefined; + self.headerPtr().* = .{ + .len = data_offset + @sizeOf(T), + .level = args.level, + .@"type" = args.@"type", + }; + self.dataPtr().* = args.data; + return self; + } + + // TODO: include this version if we submit a PR to add this to std + pub fn initNoData(args: struct { + level: c_int, + @"type": c_int, + }) Self { + var self: Self = undefined; + self.headerPtr().* = .{ + .len = data_offset + @sizeOf(T), + .level = args.level, + .@"type" = args.@"type", + }; + return self; + } + + pub fn headerPtr(self: *Self) *Header { + return @ptrCast(*Header, self); + } + pub fn dataPtr(self: *Self) *align(data_align) T { + return @ptrCast(*T, self.bytes[data_offset..]); + } + }; +} + +test { + std.testing.refAllDecls(Cmsghdr([3]std.os.fd_t)); +} + +/// Send a file descriptor and a message through a UNIX socket. +/// TODO: currently voluntarily crashes if data isn't sent properly, should return an error instead. +pub fn send_fd(sockfd: os.socket_t, msg: []const u8, fd: os.fd_t) void { + var iov = [_]os.iovec_const{ + .{ + .iov_base = msg.ptr, + .iov_len = msg.len, + }, + }; + + var cmsg = Cmsghdr(os.fd_t).init(.{ + .level = os.SOL.SOCKET, + .@"type" = SCM_RIGHTS, + .data = fd, + }); + + const len = os.sendmsg(sockfd, .{ + .name = null, + .namelen = 0, + .iov = &iov, + .iovlen = iov.len, + .control = &cmsg, + .controllen = @sizeOf(@TypeOf(cmsg)), + .flags = 0, + }, 0) catch |err| { + log.err("error sendmsg failed with {s}", .{@errorName(err)}); + return; + }; + + if (len != msg.len) { + // We don't have much choice but to exit here. + log.err("expected sendmsg to return {} but got {}", .{msg.len, len}); + os.exit(0xff); + } +} + +/// WARNING: recvmsg is a WIP. +/// WARNING: errors aren't RECEPTION errors. +/// WARNING: can only work on linux for now (recvmsg is lacking on other systems). +pub fn recvmsg( + /// The file descriptor of the sending socket. + sockfd: os.socket_t, + /// Message header and iovecs + msg: std.os.msghdr, + flags: u32, +) SendMsgError!usize { + while (true) { + var m = msg; + const rc = system.recvmsg(sockfd, @ptrCast(*std.os.msghdr, &m), @intCast(c_uint, flags)); + if (builtin.os.tag == .windows) { + if (rc == windows.ws2_32.SOCKET_ERROR) { + switch (windows.ws2_32.WSAGetLastError()) { + .WSAEACCES => return error.AccessDenied, + .WSAEADDRNOTAVAIL => return error.AddressNotAvailable, + .WSAECONNRESET => return error.ConnectionResetByPeer, + .WSAEMSGSIZE => return error.MessageTooBig, + .WSAENOBUFS => return error.SystemResources, + .WSAENOTSOCK => return error.FileDescriptorNotASocket, + .WSAEAFNOSUPPORT => return error.AddressFamilyNotSupported, + .WSAEDESTADDRREQ => unreachable, // A destination address is required. + .WSAEFAULT => unreachable, // The lpBuffers, lpTo, lpOverlapped, lpNumberOfBytesSent, or lpCompletionRoutine parameters are not part of the user address space, or the lpTo parameter is too small. + .WSAEHOSTUNREACH => return error.NetworkUnreachable, + // TODO: WSAEINPROGRESS, WSAEINTR + .WSAEINVAL => unreachable, + .WSAENETDOWN => return error.NetworkSubsystemFailed, + .WSAENETRESET => return error.ConnectionResetByPeer, + .WSAENETUNREACH => return error.NetworkUnreachable, + .WSAENOTCONN => return error.SocketNotConnected, + .WSAESHUTDOWN => unreachable, // The socket has been shut down; it is not possible to WSASendTo on a socket after shutdown has been invoked with how set to SD_SEND or SD_BOTH. + .WSAEWOULDBLOCK => return error.WouldBlock, + .WSANOTINITIALISED => unreachable, // A successful WSAStartup call must occur before using this function. + else => |err| return windows.unexpectedWSAError(err), + } + } else { + return @intCast(usize, rc); + } + } else { + switch (errno(rc)) { + .SUCCESS => return @intCast(usize, rc), + + .ACCES => return error.AccessDenied, + .AGAIN => return error.WouldBlock, + .ALREADY => return error.FastOpenAlreadyInProgress, + .BADF => unreachable, // always a race condition + .CONNRESET => return error.ConnectionResetByPeer, + .DESTADDRREQ => unreachable, // The socket is not connection-mode, and no peer address is set. + .FAULT => unreachable, // An invalid user space address was specified for an argument. + .INTR => continue, + .INVAL => unreachable, // Invalid argument passed. + .ISCONN => unreachable, // connection-mode socket was connected already but a recipient was specified + .MSGSIZE => return error.MessageTooBig, + .NOBUFS => return error.SystemResources, + .NOMEM => return error.SystemResources, + .NOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket. + .OPNOTSUPP => unreachable, // Some bit in the flags argument is inappropriate for the socket type. + .PIPE => return error.BrokenPipe, + .AFNOSUPPORT => return error.AddressFamilyNotSupported, + .LOOP => return error.SymLinkLoop, + .NAMETOOLONG => return error.NameTooLong, + .NOENT => return error.FileNotFound, + .NOTDIR => return error.NotDir, + .HOSTUNREACH => return error.NetworkUnreachable, + .NETUNREACH => return error.NetworkUnreachable, + .NOTCONN => return error.SocketNotConnected, + .NETDOWN => return error.NetworkSubsystemFailed, + else => |err| return unexpectedErrno(err), + } + } + } +} + +/// Receive a file descriptor through a UNIX socket. +/// A message can be carried with it, copied into 'buffer'. +/// WARNING: buffer must be at least 1500 bytes. +pub fn receive_fd(sockfd: os.socket_t, buffer: []u8, msg_size: *usize) !os.fd_t { + + var msg_buffer = [_]u8{0} ** 1500; + + var iov = [_]os.iovec{ + .{ + .iov_base = msg_buffer[0..] + , .iov_len = msg_buffer.len + }, + }; + + var cmsg = Cmsghdr(os.fd_t).init(.{ + .level = os.SOL.SOCKET, + .@"type" = SCM_RIGHTS, + .data = 0, + }); + + var msg: std.os.msghdr = .{ + .name = null + , .namelen = 0 + , .iov = &iov + , .iovlen = 1 + , .control = &cmsg + , .controllen = @sizeOf(@TypeOf(cmsg)) + , .flags = 0 + }; + + var msglen = recvmsg(sockfd, msg, 0) catch |err| { + log.err("error recvmsg failed with {s}", .{@errorName(err)}); + return 0; + }; + + var received_fd = @as(i32, cmsg.dataPtr().*); + std.mem.copy(u8, buffer, &msg_buffer); + msg_size.* = msglen; + + return received_fd; +} diff --git a/src/hexdump.zig b/src/hexdump.zig new file mode 100644 index 0000000..cd329bf --- /dev/null +++ b/src/hexdump.zig @@ -0,0 +1,128 @@ +const std = @import("std"); + +pub fn hexdump(stream: anytype, header: [] const u8, buffer: [] const u8) std.os.WriteError!void { + // Print a header. + if (header.len > 0) { + var hdr: [64] u8 = undefined; + var offset: usize = (hdr.len / 2) - ((header.len / 2) - 1); + + std.mem.set(u8, hdr[0..hdr.len], ' '); + std.mem.copy(u8, hdr[offset..hdr.len], header); + + try stream.writeAll(hdr[0..hdr.len]); + try stream.writeAll("\n"); + } + + var hexb: u32 = 0; + var ascii: [16] u8 = undefined; + // First line, first left side (simple number). + try stream.print("\n {d:0>4}: ", .{ hexb }); + + // Loop on all values in the buffer (i from 0 to buffer.len). + var i: u32 = 0; + while (i < buffer.len) : (i += 1) { + // Print actual hexadecimal value. + try stream.print("{X:0>2} ", .{ buffer[i] }); + + // What to print (simple ascii text, right side). + if (buffer[i] >= ' ' and buffer[i] <= '~') { + ascii[(i % 16)] = buffer[i]; + } else { + ascii[(i % 16)] = '.'; + } + + // Next input is a multiple of 8 = extra space. + if ((i + 1) % 8 == 0) { + try stream.writeAll(" "); + } + + // No next input: print the right amount of spaces. + if ((i + 1) == buffer.len) { + // Each line is 16 bytes to print, each byte takes 3 characters. + var missing_spaces = 3 * (15 - (i%16)); + // Missing an extra space if the current index % 16 is less than 7. + if ((i%16) < 7) { missing_spaces += 1; } + while (missing_spaces > 0) : (missing_spaces -= 1) { + try stream.writeAll(" "); + } + } + + // Every 16 bytes: print ascii text and line return. + + // Case 1: it's been 16 bytes AND it's the last byte to print. + if ((i + 1) % 16 == 0 and (i + 1) == buffer.len) { + try stream.print("{s}\n", .{ ascii[0..ascii.len] }); + } + // Case 2: it's been 16 bytes but it's not the end of the buffer. + else if ((i + 1) % 16 == 0 and (i + 1) != buffer.len) { + try stream.print("{s}\n", .{ ascii[0..ascii.len] }); + hexb += 16; + try stream.print(" {d:0>4}: ", .{ hexb }); + } + // Case 3: not the end of the 16 bytes row but it's the end of the buffer. + else if ((i + 1) % 16 != 0 and (i + 1) == buffer.len) { + try stream.print(" {s}\n", .{ ascii[0..((i+1) % 16)] }); + } + // Case 4: not the end of the 16 bytes row and not the end of the buffer. + // Do nothing. + } + + try stream.writeAll("\n"); +} + +const print = std.debug.print; + +test "36-byte hexdump test" { + print("\nPrint hexdump, NO AUTOMATIC VERIFICATION, READ SOURCE CODE\n", .{}); + + var buffer = "hello this is a simple text to print"; + var hexbuf: [2000]u8 = undefined; + var hexfbs = std.io.fixedBufferStream(&hexbuf); + var hexwriter = hexfbs.writer(); + try hexdump(hexwriter, "Hello World", buffer); + print("{s}\n", .{hexfbs.getWritten()}); +} + +test "32-byte hexdump test" { + print("\nPrint hexdump, NO AUTOMATIC VERIFICATION, READ SOURCE CODE\n", .{}); + + var buffer = "THIS IS THE END, MY ONLY... END"; + var hexbuf: [2000]u8 = undefined; + var hexfbs = std.io.fixedBufferStream(&hexbuf); + var hexwriter = hexfbs.writer(); + try hexdump(hexwriter, "Hello World", buffer); + print("{s}\n", .{hexfbs.getWritten()}); +} + +test "26-byte hexdump test" { + print("\nPrint hexdump, NO AUTOMATIC VERIFICATION, READ SOURCE CODE\n", .{}); + + var buffer = "hello this is another text"; + var hexbuf: [2000]u8 = undefined; + var hexfbs = std.io.fixedBufferStream(&hexbuf); + var hexwriter = hexfbs.writer(); + try hexdump(hexwriter, "Hello World", buffer); + print("{s}\n", .{hexfbs.getWritten()}); +} + +test "1-byte hexdump test" { + print("\nPrint hexdump, NO AUTOMATIC VERIFICATION, READ SOURCE CODE\n", .{}); + + var buffer = "h"; + var hexbuf: [2000]u8 = undefined; + var hexfbs = std.io.fixedBufferStream(&hexbuf); + var hexwriter = hexfbs.writer(); + try hexdump(hexwriter, "Hello World", buffer); + print("{s}\n", .{hexfbs.getWritten()}); +} + +test "0-byte hexdump test" { + print("\nPrint hexdump, NO AUTOMATIC VERIFICATION, READ SOURCE CODE\n", .{}); + + var buffer = ""; + var hexbuf: [2000]u8 = undefined; + var hexfbs = std.io.fixedBufferStream(&hexbuf); + var hexwriter = hexfbs.writer(); + try hexdump(hexwriter, "Hello World", buffer); + print("{s}\n", .{hexfbs.getWritten()}); +} diff --git a/src/main.zig b/src/main.zig new file mode 100644 index 0000000..c517a93 --- /dev/null +++ b/src/main.zig @@ -0,0 +1,24 @@ +pub const CBEvent = @import("./callback.zig").CBEvent; +pub const Connection = @import("./connection.zig").Connection; +pub const Message = @import("./message.zig").Message; +pub const Event = @import("./event.zig").Event; +pub const Switch = @import("./switch.zig").Switch; + +pub const Messages = @import("./message.zig").Messages; +pub const Switches = @import("./switch.zig").Switches; +pub const Connections = @import("./connection.zig").Connections; +pub const Context = @import("./context.zig").Context; + +pub const util = @import("./util.zig"); +pub const hexdump = @import("./hexdump.zig"); +pub const exchangefd = @import("./exchange-fd.zig"); + +test { + _ = @import("./callback.zig"); + _ = @import("./connection.zig"); + _ = @import("./context.zig"); + _ = @import("./event.zig"); + _ = @import("./message.zig"); + _ = @import("./switch.zig"); + _ = @import("./util.zig"); +} diff --git a/src/message.zig b/src/message.zig new file mode 100644 index 0000000..de2045c --- /dev/null +++ b/src/message.zig @@ -0,0 +1,106 @@ +const std = @import("std"); +const testing = std.testing; +const net = std.net; +const fmt = std.fmt; + +const print_eq = @import("./util.zig").print_eq; + +pub const Messages = std.ArrayList(Message); + +pub const Message = struct { + + fd: i32, // File descriptor concerned about this message. + payload: []const u8, + + allocator: std.mem.Allocator, // Memory allocator. + + const Self = @This(); + + pub fn init(fd: i32 + , allocator: std.mem.Allocator + , payload: []const u8) !Self { + return Message { .fd = fd + , .allocator = allocator + , .payload = try allocator.dupe(u8, payload) }; + } + + pub fn deinit(self: Self) void { + self.allocator.free(self.payload); + } + + pub fn read(fd: i32, buffer: []const u8, allocator: std.mem.Allocator) !Self { + + var fbs = std.io.fixedBufferStream(buffer); + var reader = fbs.reader(); + + const msg_len = try reader.readIntBig(u32); + if (msg_len > buffer.len - 4) { + return error.wrongMessageLength; + } + const msg_payload = buffer[4..4+msg_len]; + + return try Message.init(fd, allocator, msg_payload); + } + + pub fn write(self: Self, writer: anytype) !usize { + try writer.writeIntBig(u32, @truncate(u32, self.payload.len)); + return 4 + try writer.write(self.payload); + } + + pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void { + try fmt.format(out_stream, "fd: {}, payload: [{s}]", + .{self.fd, self.payload} ); + } +}; + +test "Message - creation and display" { + // fd payload + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + var m = try Message.init(1, allocator, "hello!!"); + defer m.deinit(); + + try print_eq("fd: 1, payload: [hello!!]", m); +} + +test "Message - read and write" { + // fd payload + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + // First, create a message. + var s = "hello!!"; + var first_message = try Message.init(1, allocator, s); + defer first_message.deinit(); + + // Test its content. + try testing.expect(first_message.fd == 1); + try testing.expect(first_message.payload.len == 7); + try testing.expectEqualSlices(u8, first_message.payload, "hello!!"); + + // Write it in a buffer, similar to sending it on the network. + var buffer: [1000]u8 = undefined; + var fbs = std.io.fixedBufferStream(&buffer); + var writer = fbs.writer(); + + var count = try first_message.write(writer); + + var second_buffer: [2000]u8 = undefined; + var fba = std.heap.FixedBufferAllocator.init(&second_buffer); + var second_allocator = fba.allocator(); + + // Read the buffer, similar to receiving a message on the network. + // (8 == random client's fd number) + var second_message = try Message.read(8, buffer[0..count], second_allocator); + // var second_message = try Message.read(fbs.getWritten(), second_allocator); + defer second_message.deinit(); + + // Test its content, should be equal to the first. + try testing.expect(second_message.payload.len == first_message.payload.len); + try testing.expectEqualSlices(u8, second_message.payload, first_message.payload); +} diff --git a/src/switch.zig b/src/switch.zig new file mode 100644 index 0000000..15d9589 --- /dev/null +++ b/src/switch.zig @@ -0,0 +1,320 @@ +const std = @import("std"); +const testing = std.testing; +const fmt = std.fmt; + +const net = std.net; + +const ipc = @import("./main.zig"); +const Message = ipc.Message; +const CBEventType = ipc.CBEvent.Type; + +const Allocator = std.mem.Allocator; + +const util = @import("./util.zig"); +const print_eq = util.print_eq; +const log = std.log.scoped(.libipc_switch); + +const Event = ipc.Event; + +/// SwitchDB: store relations between clients and services. +/// +/// A protocol service, such as TPCd can handle "external" communications (TCP in this case) +/// meaning that a client can connect to this service through a canal that isn't a simple +/// libipc UNIX socket, and this client is then connected to a local service. +/// OTOH, a local client can ask TCPd to establish a connection to a remote service. +/// In both cases, at least one of the connection isn't libipc-based and should be +/// handled in a specific way that only TPCd (or another protocol service) can. +/// +/// TCPd marks both file descriptors as "related" (add_switch) so libipc can automatically +/// handle messages between the client and the service. Any input from one end will be sent +/// to the other. +/// TCPd registers functions to handle specific input and output operations from and to the +/// remote connection (set_callbacks). +/// +/// At any point, TCPd can safely close a connection and remote it from the SwitchDB (nuke), +/// resulting in the removal of both the connection's FD and its related FD (both the client +/// and the service connections are removed). +/// +/// Currently, libipc automatically closes both the client and its service when an error occurs. +pub const SwitchDB = struct { + const Self = @This(); + + db: std.AutoArrayHashMap(i32, ManagedConnection), + + pub fn init (allocator: Allocator) Self { + return Self { + .db = std.AutoArrayHashMap(i32, ManagedConnection).init(allocator), + }; + } + + pub fn deinit (self: *Self) void { + self.db.deinit(); + } + + pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void { + for(self.db.keys()) |k,i| { + try fmt.format(out_stream, "({},{})", .{k, self.db.values()[i].dest}); + } + } + + pub fn add_switch(self: *Self, fd1: i32, fd2: i32) !void { + try self.db.put(fd1, ManagedConnection {.dest = fd2}); + try self.db.put(fd2, ManagedConnection {.dest = fd1}); + } + + pub fn set_callbacks(self: *Self, fd: i32 + , in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType + , out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) !void { + var managedconnection = self.db.get(fd) orelse return error.unregisteredFD; + managedconnection.in = in; + managedconnection.out = out; + } + + /// Dig the "db" hashmap, perform "in" fn, may provide a message. + /// Errors from the "in" fn are reported as Zig errors. + pub fn read (self: *Self, fd: i32) !?Message { + // assert there is an entry with this fd as a key. + var managedconnection = self.db.get(fd) orelse return error.unregisteredFD; + + var buffer = [_]u8{0} ** 100000; // TODO: buffer size + var message_size: u32 = @truncate(u32, buffer.len); + var r: CBEventType = managedconnection.in(fd, &buffer, &message_size); + + switch (r) { + // The message should be ignored (protocol specific). + CBEventType.IGNORE => { return null; }, + CBEventType.NO_ERROR => { + // TODO: read message + // TODO: better allocator? + // TODO: better errors? + var message: Message + = Message.read(managedconnection.dest + , buffer[0..message_size] + , std.heap.c_allocator) catch { + return error.generic; + }; + return message; + }, + CBEventType.FD_CLOSING => { return error.closeFD; }, + // Generic error, or the message was read but with errors. + CBEventType.ERROR => { return error.generic; }, + } + + unreachable; + } + + /// Dig the "db" hashmap and perform "out" fn. + /// Errors from the "out" fn are reported as Zig errors. + pub fn write (self: *Self, message: Message) !void { + // assert there is an entry with this fd as a key. + var managedconnection = self.db.get(message.fd) orelse return error.unregisteredFD; + + var buffer = [_]u8{0} ** 100000; // TODO: buffer size + var fbs = std.io.fixedBufferStream(&buffer); + var writer = fbs.writer(); + + // returning basic errors, no details. + _ = message.write(writer) catch return error.generic; + var written = fbs.getWritten(); + + var r = managedconnection.out(message.fd, written.ptr, @truncate(u32, written.len)); + + switch (r) { + // The message should be ignored (protocol specific). + // No error. A message was generated. + CBEventType.NO_ERROR => { + return; + }, + CBEventType.FD_CLOSING => { return error.closeFD; }, + // Generic error, or the message was read but with errors. + CBEventType.IGNORE, + CBEventType.ERROR => { + return error.generic; + }, + } + + unreachable; + } + + /// From a message to read on a socket to an Event. + pub fn handle_event_read (self: *Self, index: usize, fd: i32) Event { + var message: ?Message = null; + message = self.read (fd) catch |err| switch(err) { + error.closeFD => { + return Event.init(Event.Type.DISCONNECTION, index, fd, null); + }, + error.unregisteredFD, + error.generic => { + return Event.init(Event.Type.ERROR, index, fd, null); + }, + }; + return Event.init(Event.Type.SWITCH_RX, index, fd, message); + } + + /// Message is free'd in any case. + pub fn handle_event_write (self: *Self, index: usize, message: Message) Event { + defer message.deinit(); + var fd = message.fd; + self.write(message) catch |err| switch(err) { + error.closeFD => { + return Event.init(Event.Type.DISCONNECTION, index, fd, null); + }, + error.unregisteredFD, + error.generic => { + return Event.init(Event.Type.ERROR, index, fd, null); + }, + }; + return Event.init(Event.Type.SWITCH_TX, index, fd, null); + } + + /// Simple wrapper around self.db.get. + pub fn getDest (self: *Self, fd: i32) !i32 { + return self.db.get(fd).?.dest; + } + + /// Remove both entries (client and service) from the DB. + pub fn nuke (self: *Self, fd: i32) void { + if (self.db.fetchSwapRemove(fd)) |kv| { + _ = self.db.swapRemove(kv.value.dest); + } + } +}; + +const ManagedConnection = struct { + dest : i32, + in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType = default_in, + out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType = default_out, +}; + +test "creation and display" { + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + var switchdb = SwitchDB.init(allocator); + defer switchdb.deinit(); + + try switchdb.db.put(5, ManagedConnection {.dest = 6}); + try switchdb.db.put(6, ManagedConnection {.dest = 5}); + + try print_eq("{ (5,6)(6,5) }", .{switchdb}); +} + +fn successful_in (_: i32, mcontent: [*]u8, mlen: *u32) CBEventType { + var m = Message.init(8, std.heap.c_allocator, "coucou") catch unreachable; + defer m.deinit(); + + var fbs = std.io.fixedBufferStream(mcontent[0..mlen.*]); + var writer = fbs.writer(); + const bytes_written = m.write (writer) catch unreachable; + mlen.* = @truncate(u32, bytes_written); + return CBEventType.NO_ERROR; +} + +fn successful_out (_: i32, _: [*]const u8, _: u32) CBEventType { + return CBEventType.NO_ERROR; +} + +test "successful exchanges" { + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + var switchdb = SwitchDB.init(allocator); + defer switchdb.deinit(); + + try switchdb.db.put(5, ManagedConnection {.dest = 6, .in = successful_in, .out = successful_out}); + try switchdb.db.put(6, ManagedConnection {.dest = 5, .in = successful_in, .out = successful_out}); + + // should return a new message (hardcoded: fd 8, payload "coucou") + var event_1: Event = switchdb.handle_event_read (1, 5); + if (event_1.m) |m| { m.deinit(); } + else { return error.NoMessage; } + + // should return a new message (hardcoded: fd 8, payload "coucou") + var event_2: Event = switchdb.handle_event_read (1, 6); + if (event_2.m) |m| { m.deinit(); } + else { return error.NoMessage; } + + var message = try Message.init(6, allocator, "coucou"); + var event_3 = switchdb.handle_event_write (5, message); + if (event_3.m) |_| { return error.ShouldNotCarryMessage; } +} + +fn unsuccessful_in (_: i32, _: [*]const u8, _: *u32) CBEventType { + return CBEventType.ERROR; +} + +fn unsuccessful_out (_: i32, _: [*]const u8, _: u32) CBEventType { + return CBEventType.ERROR; +} + +test "unsuccessful exchanges" { + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + var switchdb = SwitchDB.init(allocator); + defer switchdb.deinit(); + + try switchdb.db.put(5, ManagedConnection {.dest = 6, .in = unsuccessful_in, .out = unsuccessful_out}); + try switchdb.db.put(6, ManagedConnection {.dest = 5, .in = unsuccessful_in, .out = unsuccessful_out}); + + // should return a new message (hardcoded: fd 8, payload "coucou") + var event_1: Event = switchdb.handle_event_read (1, 5); + if (event_1.m) |_| { return error.ShouldNotCarryMessage; } + + // should return a new message (hardcoded: fd 8, payload "coucou") + var event_2: Event = switchdb.handle_event_read (1, 6); + if (event_2.m) |_| { return error.ShouldNotCarryMessage; } + + var message = try Message.init(6, allocator, "coucou"); + var event_3 = switchdb.handle_event_write (5, message); + if (event_3.m) |_| { return error.ShouldNotCarryMessage; } +} + +test "nuke 'em" { + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + var switchdb = SwitchDB.init(allocator); + defer switchdb.deinit(); + + try switchdb.db.put(5, ManagedConnection {.dest = 6, .in = unsuccessful_in, .out = unsuccessful_out}); + try switchdb.db.put(6, ManagedConnection {.dest = 5, .in = unsuccessful_in, .out = unsuccessful_out}); + + try testing.expect(switchdb.db.count() == 2); + switchdb.nuke(5); + try testing.expect(switchdb.db.count() == 0); +} + +fn default_in (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType { + // This may be kinda hacky, idk. + var stream: net.Stream = .{ .handle = origin }; + var packet_size: usize = stream.read(mcontent[0..mlen.*]) catch return CBEventType.ERROR; + + // Let's handle this as a disconnection. + if (packet_size < 4) { + log.debug("message is less than 4 bytes ({} bytes)", .{packet_size}); + return CBEventType.FD_CLOSING; + } + + mlen.* = @truncate(u32, packet_size); + + return CBEventType.NO_ERROR; +} + +fn default_out (fd: i32, mcontent: [*]const u8, mlen: u32) CBEventType { + // Message contains the fd, no need to search for the right structure to copy, + // let's just recreate a Stream from the fd. + + var to_send = mcontent[0..mlen]; + var stream = net.Stream { .handle = fd }; + _ = stream.write (to_send) catch return CBEventType.ERROR; + return CBEventType.NO_ERROR; +} diff --git a/src/util.zig b/src/util.zig new file mode 100644 index 0000000..bc3d5bd --- /dev/null +++ b/src/util.zig @@ -0,0 +1,67 @@ +const std = @import("std"); +const hexdump = @import("./hexdump.zig"); +const testing = std.testing; + +const log = std.log.scoped(.libipc_util); +const Message = @import("./message.zig").Message; + +/// A VERY LIGHT and INCOMPLETE way of decoding URI. +/// DO NOT USE IT UNLESS YOU KNOW WHAT TO EXPECT. +pub const URI = struct { + protocol: []const u8, + address: []const u8, + path: []const u8, + + const Self = @This(); + + pub fn read(uri_to_decode: []const u8) Self { + + var protocolit = std.mem.split(u8, uri_to_decode, "://"); + var protocol = protocolit.first(); + + var addressit = std.mem.split(u8, protocolit.next().?, "/"); + var address = addressit.first(); + + var path = addressit.rest(); + + return Self { .protocol = protocol + , .address = address + , .path = path }; + } +}; + +test "URI simple decoding" { + var uri = URI.read("tcp://127.0.0.1:8000/some-path"); + try testing.expectEqualSlices(u8, uri.protocol, "tcp"); + try testing.expectEqualSlices(u8, uri.address, "127.0.0.1:8000"); + try testing.expectEqualSlices(u8, uri.path, "some-path"); +} + +pub fn print_buffer (header: []const u8, buffer: []const u8) void { + var hexbuf: [4000]u8 = undefined; + var hexfbs = std.io.fixedBufferStream(&hexbuf); + var hexwriter = hexfbs.writer(); + hexdump.hexdump(hexwriter, header, buffer) catch unreachable; + log.debug("{s}", .{hexfbs.getWritten()}); +} + +pub fn print_message (header: []const u8, m: Message) void { + print_buffer (header, m.payload); +} + +pub fn print_eq(expected: anytype, obj: anytype) !void { + var buffer: [4096]u8 = undefined; + var fbs = std.io.fixedBufferStream(&buffer); + var writer = fbs.writer(); + + try writer.print("{}", .{obj}); + + // typing workaround + var secbuffer: [4096]u8 = undefined; + var secfbs = std.io.fixedBufferStream(&secbuffer); + var secwriter = secfbs.writer(); + + try secwriter.print("{s}", .{expected}); + + try testing.expectEqualSlices(u8, secfbs.getWritten(), fbs.getWritten()); +}