master
Philippe Pittoli 2023-02-06 10:44:51 +01:00
commit 721d8842c5
14 changed files with 2019 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
zig-cache
zig-out
docs
*.swp

70
build.zig Normal file
View File

@ -0,0 +1,70 @@
const std = @import("std");
const VERSION = "0.1.0";
// Although this function looks imperative, note that its job is to
// declaratively construct a build graph that will be executed by an external
// runner.
pub fn build(b: *std.Build) void {
// Standard target options allows the person running `zig build` to choose
// what target to build for. Here we do not override the defaults, which
// means any target is allowed, and the default is native. Other options
// for restricting supported target set are available.
const target = b.standardTargetOptions(.{});
// Standard optimization options allow the person running `zig build` to select
// between Debug, ReleaseSafe, ReleaseFast, and ReleaseSmall. Here we do not
// set a preferred release mode, allowing the user to decide how to optimize.
const optimize = b.standardOptimizeOption(.{});
const static_lib = b.addStaticLibrary(.{
.name = "ipc",
// In this case the main source file is merely a path, however, in more
// complicated build scripts, this could be a generated file.
.root_source_file = .{ .path = "src/bindings.zig" },
.target = target,
.optimize = optimize,
});
// Link with the libc of the target system since the C allocator
// is required in the bindings.
static_lib.linkLibC();
// This declares intent for the library to be installed into the standard
// location when the user invokes the "install" step (the default step when
// running `zig build`).
static_lib.install();
const shared_lib = b.addSharedLibrary(.{
.name = "ipc",
.root_source_file = .{ .path = "src/bindings.zig" },
.version = comptime (try std.builtin.Version.parse(VERSION)),
.target = target,
.optimize = optimize,
});
shared_lib.linkLibC();
shared_lib.install();
// Creates a step for unit testing.
const main_tests = b.addTest(.{
.root_source_file = .{ .path = "src/main.zig" },
.target = target,
.optimize = optimize,
});
main_tests.linkLibC();
// This creates a build step. It will be visible in the `zig build --help` menu,
// and can be selected like this: `zig build test`
// This will evaluate the `test` step rather than the default, which is "install".
const test_step = b.step("test", "Run library tests");
test_step.dependOn(&main_tests.step);
const install_static_lib = b.addInstallArtifact(static_lib);
const static_lib_step = b.step("static", "Compile LibIPC as a static library.");
static_lib_step.dependOn(&install_static_lib.step);
const install_shared_lib = b.addInstallArtifact(shared_lib);
// b.getInstallStep().dependOn(&install_shared_lib.step);
const shared_lib_step = b.step("shared", "Compile LibIPC as a shared library.");
shared_lib_step.dependOn(&install_shared_lib.step);
}

48
libipc.h Normal file
View File

@ -0,0 +1,48 @@
#ifndef LIBIPC
#define LIBIPC
#include <stdint.h>
enum event_types {
ERROR = 0 // A problem occured.
, CONNECTION = 1 // New user.
, DISCONNECTION = 2 // User disconnected.
, MESSAGE_RX = 3 // New message.
, MESSAGE_TX = 4 // Message sent.
, TIMER = 5 // Timeout in the poll(2) function.
, EXTERNAL = 6 // Message received from a non IPC socket.
, SWITCH_RX = 7 // Message received from a switched FD.
, SWITCH_TX = 8 // Message sent to a switched fd.
};
// Return type of callback functions when switching.
enum cb_event_types {
CB_NO_ERROR = 0 // No error. A message was generated.
, CB_ERROR = 1 // Generic error.
, CB_FD_CLOSING = 2 // The fd is closing.
, CB_IGNORE = 3 // The message should be ignored (protocol specific).
};
int ipc_context_init (void** ptr);
int ipc_service_init (void* ctx, int* servicefd, const char* service_name, uint16_t service_name_len);
int ipc_connect_service (void* ctx, int* servicefd, const char* service_name, uint16_t service_name_len);
void ipc_context_deinit (void** ctx);
int ipc_write (void* ctx, int servicefd, char* mcontent, uint32_t mlen);
int ipc_schedule (void* ctx, int servicefd, const char* mcontent, uint32_t mlen);
int ipc_read_fd (void* ctx, int fd, char* buffer, size_t* buflen);
int ipc_read (void* ctx, size_t index, char* buffer, size_t* buflen);
int ipc_wait_event(void* ctx, char* t, size_t* index, int* originfd, char* buffer, size_t* buflen);
void ipc_context_timer (void* ctx, int timer);
int ipc_close_fd (void* ctx, int fd);
int ipc_close (void* ctx, size_t index);
int ipc_close_all (void* ctx);
// Switch functions (for "protocol" services, such as TCPd).
int ipc_add_external (void* ctx, int newfd);
int ipc_add_switch (void* ctx, int fd1, int fd2);
int ipc_set_switch_callbacks (void* ctx, int fd
, enum cb_event_types (*in (int orig, const char *payload, uint32_t *mlen))
, enum cb_event_types (*out(int dest, char *payload, uint32_t mlen)));
#endif

147
src/bindings.zig Normal file
View File

@ -0,0 +1,147 @@
const std = @import("std");
const log = std.log.scoped(.libipc_bindings);
const ipc = @import("./main.zig");
const Context = ipc.Context;
const Message = ipc.Message;
const CBEventType = ipc.CBEvent.Type;
export fn ipc_context_init (ptr: **Context) callconv(.C) i32 {
ptr.* = std.heap.c_allocator.create(Context) catch return -1;
ptr.*.* = Context.init(std.heap.c_allocator) catch |err| {
log.warn("error while init context: {}\n", .{err});
return -1;
};
return 0;
}
/// Start a libipc service.
export fn ipc_service_init(ctx: *Context, servicefd: *i32, service_name: [*]const u8, service_name_len: u16) callconv(.C) i32 {
var streamserver = ctx.server_init (service_name[0..service_name_len]) catch return -1;
servicefd.* = streamserver.sockfd.?;
return 0;
}
/// Connect to a libipc service, possibly through IPCd.
export fn ipc_connect_service (ctx: *Context, servicefd: *i32, service_name: [*]const u8, service_name_len: u16) callconv(.C) i32 {
var fd = ctx.connect_ipc (service_name[0..service_name_len]) catch return -1;
servicefd.* = fd;
return 0;
}
export fn ipc_context_deinit (ctx: **Context) callconv(.C) void {
var ptr: *Context = ctx.*;
ptr.deinit();
std.heap.c_allocator.destroy(ptr);
}
/// Write a message (no waiting).
export fn ipc_write (ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) i32 {
// TODO: better default length.
var buffer = [_]u8{0} ** 100000;
var fba = std.heap.FixedBufferAllocator.init(&buffer);
var message = Message.init(servicefd, fba.allocator(), mcontent[0..mlen]) catch return -1;
ctx.write(message) catch return -1;
return 0;
}
/// Schedule a message.
/// Use the same allocator as the context.
export fn ipc_schedule (ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) i32 {
var message = Message.init(servicefd, ctx.allocator, mcontent[0..mlen]) catch return -1;
ctx.schedule(message) catch return -2;
return 0;
}
/// Read a message from a file descriptor.
/// Buffer length will be changed to the size of the received message.
export fn ipc_read_fd (ctx: *Context, fd: i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 {
var m = ctx.read_fd (fd) catch {return -1;} orelse return -2;
if (m.payload.len > buflen.*) return -3;
buflen.* = m.payload.len;
var fbs = std.io.fixedBufferStream(buffer[0..buflen.*]);
var writer = fbs.writer();
_ = writer.write(m.payload) catch return -4;
m.deinit();
return 0;
}
/// Read a message.
/// Buffer length will be changed to the size of the received message.
export fn ipc_read (ctx: *Context, index: usize, buffer: [*]u8, buflen: *usize) callconv(.C) i32 {
var m = ctx.read (index) catch {return -1;} orelse return -2;
if (m.payload.len > buflen.*) return -3;
buflen.* = m.payload.len;
var fbs = std.io.fixedBufferStream(buffer[0..buflen.*]);
var writer = fbs.writer();
_ = writer.write(m.payload) catch return -4;
m.deinit();
return 0;
}
/// Wait for an event.
/// Buffer length will be changed to the size of the received message.
export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 {
var event = ctx.wait_event() catch return -1;
t.* = @enumToInt(event.t);
index.* = event.index;
originfd.* = event.origin;
if (event.m) |m| {
var fbs = std.io.fixedBufferStream(buffer[0..buflen.*]);
var writer = fbs.writer();
_ = writer.write(m.payload) catch return -4;
buflen.* = m.payload.len;
m.deinit();
}
else {
buflen.* = 0;
}
return 0;
}
/// Change the timer (ms).
export fn ipc_context_timer (ctx: *Context, timer: i32) callconv(.C) void {
ctx.timer = timer;
}
export fn ipc_close_fd (ctx: *Context, fd: i32) callconv(.C) i32 {
ctx.close_fd (fd) catch return -1;
return 0;
}
export fn ipc_close (ctx: *Context, index: usize) callconv(.C) i32 {
ctx.close (index) catch return -1;
return 0;
}
export fn ipc_close_all (ctx: *Context) callconv(.C) i32 {
ctx.close_all () catch return -1;
return 0;
}
/// Add a new file descriptor to listen to.
/// The FD is marked as "external"; it isn't a simple libipc connection.
/// You may want to handle any operation on it by yourself.
export fn ipc_add_external (ctx: *Context, newfd: i32) callconv(.C) i32 {
ctx.add_external (newfd) catch return -1;
return 0;
}
export fn ipc_add_switch (ctx: *Context, fd1: i32, fd2: i32) callconv(.C) i32 {
ctx.add_switch (fd1, fd2) catch return -1;
return 0;
}
export fn ipc_set_switch_callbacks(ctx: *Context, fd: i32
, in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType
, out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) callconv(.C) i32 {
ctx.set_switch_callbacks (fd, in, out) catch return -1;
return 0;
}

35
src/callback.zig Normal file
View File

@ -0,0 +1,35 @@
pub const CBEvent = struct {
// CallBack Event types.
// In the main event loop, servers and clients can receive connections,
// disconnections, errors or messages from their pairs. They also can
// set a timer so the loop will allow a periodic routine (sending ping
// messages for websockets, for instance).
//
// A few other events can occur.
//
// Extra socket
// The main loop waiting for an event can be used as an unique entry
// point for socket management. libipc users can register sockets via
// ipc_add_fd allowing them to trigger an event, so events unrelated
// to libipc are managed the same way.
// Switch
// libipc can be used to create protocol-related programs, such as a
// websocket proxy allowing libipc services to be accessible online.
// To help those programs (with TCP-complient sockets), two sockets
// can be bound together, each message coming from one end will be
// automatically transfered to the other socket and a Switch event
// will be triggered.
// Look Up
// When a client establishes a connection to a service, it asks the
// ipc daemon (ipcd) to locate the service and establish a connection
// to it. This is a lookup.
// For IO callbacks (switching).
pub const Type = enum {
NO_ERROR, // No error. A message was generated.
ERROR, // Generic error.
FD_CLOSING, // The fd is closing.
IGNORE, // The message should be ignored (protocol specific).
};
};

42
src/connection.zig Normal file
View File

@ -0,0 +1,42 @@
const std = @import("std");
const net = std.net;
const fmt = std.fmt;
const print_eq = @import("./util.zig").print_eq;
pub const Connections = std.ArrayList(Connection);
pub const Connection = struct {
pub const Type = enum {
IPC, // Standard connection.
EXTERNAL, // Non IPC connection (TCP, UDP, etc.).
SERVER, // Messages received = new connections.
SWITCHED, // IO operations should go through registered callbacks.
};
t: Connection.Type,
path: ?[] const u8, // Not always needed.
const Self = @This();
pub fn init(t: Connection.Type, path: ?[] const u8) Self {
return Self {
.t = t,
.path = path,
};
}
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
try fmt.format(out_stream, "{}, path {?s}", .{ self.t, self.path});
}
};
test "Connection - creation and display" {
// origin destination
var path = "/some/path";
var c1 = Connection.init(Connection.Type.EXTERNAL, path);
var c2 = Connection.init(Connection.Type.IPC , null);
try print_eq("connection.Connection.Type.EXTERNAL, path /some/path", c1);
try print_eq("connection.Connection.Type.IPC, path null", c2);
}

706
src/context.zig Normal file
View File

@ -0,0 +1,706 @@
const std = @import("std");
const testing = std.testing;
const net = std.net;
const os = std.os;
const fmt = std.fmt;
const log = std.log.scoped(.libipc_context);
const receive_fd = @import("./exchange-fd.zig").receive_fd;
const Timer = std.time.Timer;
const CBEvent = @import("./callback.zig").CBEvent;
const Connection = @import("./connection.zig").Connection;
const Message = @import("./message.zig").Message;
const Event = @import("./event.zig").Event;
const Switch = @import("./switch.zig").Switch;
const print_eq = @import("./util.zig").print_eq;
const Messages = @import("./message.zig").Messages;
const SwitchDB = @import("./switch.zig").SwitchDB;
const Connections = @import("./connection.zig").Connections;
const CBEventType = @import("./main.zig").CBEvent.Type;
pub const PollFD = std.ArrayList(std.os.pollfd);
pub const IPC_HEADER_SIZE = 4; // Size (4 bytes) then content.
pub const IPC_BASE_SIZE = 100000; // 100 KB, plenty enough space for messages
pub const IPC_MAX_MESSAGE_SIZE = IPC_BASE_SIZE-IPC_HEADER_SIZE;
pub const IPC_VERSION = 1;
// Context of the whole networking state.
pub const Context = struct {
rundir: [] u8,
allocator: std.mem.Allocator, // Memory allocator.
connections: Connections, // Keep track of connections.
// "pollfd" structures passed to poll(2). Same indexes as "connections".
pollfd: PollFD, // .fd (fd_t) + .events (i16) + .revents (i16)
tx: Messages, // Messages to send, once their fd is available.
switchdb: SwitchDB, // Relations between fd.
timer: ?i32 = null, // No timer by default (no TIMER event).
const Self = @This();
// Context initialization:
// - init structures (provide the allocator)
pub fn init(allocator: std.mem.Allocator) !Self {
var rundir = std.process.getEnvVarOwned(allocator, "RUNDIR") catch |err| switch(err) {
error.EnvironmentVariableNotFound => blk: {
break :blk try allocator.dupeZ(u8, "/tmp/libipc-run/");
},
else => {
return err;
},
};
return Self {
.rundir = rundir
, .connections = Connections.init(allocator)
, .pollfd = PollFD.init(allocator)
, .tx = Messages.init(allocator)
, .switchdb = SwitchDB.init(allocator)
, .allocator = allocator
};
}
// create a server path for the UNIX socket based on the service name
pub fn server_path(self: *Self, service_name: []const u8, writer: anytype) !void {
try writer.print("{s}/{s}", .{self.rundir, service_name});
}
pub fn deinit(self: *Self) void {
self.close_all() catch |err| switch(err){
error.IndexOutOfBounds => {
log.err("context.deinit(): IndexOutOfBounds", .{});
},
};
self.allocator.free(self.rundir);
self.connections.deinit();
self.pollfd.deinit();
for (self.tx.items) |m| {
m.deinit();
}
self.tx.deinit();
self.switchdb.deinit();
}
// Both simple connection and the switched one share this code.
fn connect_ (self: *Self, ctype: Connection.Type, path: []const u8) !i32 {
var stream = try net.connectUnixSocket(path);
const newfd = stream.handle;
errdefer std.os.closeSocket(newfd);
var newcon = Connection.init(ctype, null);
try self.add_ (newcon, newfd);
return newfd;
}
fn connect_ipcd (self: *Self, service_name: []const u8
, connection_type: Connection.Type) !?i32 {
const buffer_size = 10000;
var buffer: [buffer_size]u8 = undefined;
var fba = std.heap.FixedBufferAllocator.init(&buffer);
var allocator = fba.allocator();
// Get IPC_NETWORK environment variable
// IPC_NETWORK is shared with the network service to choose the protocol stack,
// according to the target service.
//
// Example, connecting to 'audio' service through tor service:
// IPC_NETWORK="audio tor://some.example.com/audio"
//
// Routing directives can be chained using " ;" separator:
// IPC_NETWORK="audio https://example.com/audio ;pong tls://pong.example.com/pong"
var network_envvar = std.process.getEnvVarOwned(allocator, "IPC_NETWORK") catch |err| switch(err) {
// error{ OutOfMemory, EnvironmentVariableNotFound, InvalidUtf8 } (ErrorSet)
error.EnvironmentVariableNotFound => {
log.debug("no IPC_NETWORK envvar: IPCd won't be contacted", .{});
return null;
}, // no need to contact IPCd
else => { return err; },
};
var lookupbuffer: [buffer_size]u8 = undefined;
var lookupfbs = std.io.fixedBufferStream(&lookupbuffer);
var lookupwriter = lookupfbs.writer();
try lookupwriter.print("{s};{s}", .{service_name, network_envvar});
// Try to connect to the IPCd service
var ipcdfd = try self.connect_service("ipc");
defer self.close_fd (ipcdfd) catch {}; // in any case, connection should be closed
// Send LOOKUP message
// content: target service name;${IPC_NETWORK}
// example: pong;pong tls://example.com:8998/pong
var m = try Message.init (ipcdfd, allocator, lookupfbs.getWritten());
try self.write (m);
// Read LOOKUP response
// case error: ignore and move on (TODO)
// else: get fd sent by IPCd then close IPCd fd
var reception_buffer: [2000]u8 = undefined;
var reception_size: usize = 0;
var newfd = try receive_fd (ipcdfd, &reception_buffer, &reception_size);
if (reception_size == 0) {
return error.IPCdFailedNoMessage;
}
var response: []u8 = reception_buffer[0..reception_size];
if (! std.mem.eql(u8, response, "ok")) {
return error.IPCdFailedNotOk;
}
var newcon = Connection.init(connection_type, null);
try self.add_ (newcon, newfd);
return newfd;
}
/// TODO: Add a new connection, but takes care of memory problems:
/// in case one of the arrays cannot sustain another entry, the other
/// won't be added.
fn add_ (self: *Self, new_connection: Connection, fd: os.socket_t) !void {
try self.connections.append(new_connection);
try self.pollfd.append(.{ .fd = fd
, .events = std.os.linux.POLL.IN
, .revents = 0 });
}
fn fd_to_index (self: Self, fd: i32) !usize {
var i: usize = 0;
while(i < self.pollfd.items.len) {
if (self.pollfd.items[i].fd == fd) {
return i;
}
i += 1;
}
return error.IndexNotFound;
}
/// Connect to the service directly, without reaching IPCd first.
/// Return the connection FD.
pub fn connect_service (self: *Self, service_name: []const u8) !i32 {
var buffer: [1000]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer);
var writer = fbs.writer();
try self.server_path(service_name, writer);
var path = fbs.getWritten();
return self.connect_ (Connection.Type.IPC, path);
}
/// Tries to connect to IPCd first, then the service (if needed).
/// Return the connection FD.
pub fn connect_ipc (self: *Self, service_name: []const u8) !i32 {
// First, try ipcd.
if (try self.connect_ipcd (service_name, Connection.Type.IPC)) |fd| {
log.debug("Connected via IPCd, fd is {}", .{fd});
return fd;
}
// In case this doesn't work, connect directly.
return try self.connect_service (service_name);
}
/// Add a new file descriptor to follow, labeled as EXTERNAL.
/// Useful for protocol daemons (ex: TCPd) listening to a socket for external connections,
/// clients trying to reach a libipc service.
pub fn add_external (self: *Self, newfd: i32) !void {
var newcon = Connection.init(Connection.Type.EXTERNAL, null);
try self.add_ (newcon, newfd);
}
fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void {
// net.StreamServer
var serverfd = self.pollfd.items[server_index].fd;
var path = self.connections.items[server_index].path orelse return error.ServerWithNoPath;
var server = net.StreamServer {
.sockfd = serverfd
, .kernel_backlog = 100
, .reuse_address = false
, .listen_address = try net.Address.initUnix(path)
};
var client = try server.accept(); // net.StreamServer.Connection
const newfd = client.stream.handle;
var newcon = Connection.init(Connection.Type.IPC, null);
try self.add_ (newcon, newfd);
const sfd = server.sockfd orelse return error.SocketLOL; // TODO
// WARNING: imply every new item is last
event.set(Event.Type.CONNECTION, self.pollfd.items.len - 1, sfd, null);
}
// Create a unix socket.
// Store std lib structures in the context.
pub fn server_init(self: *Self, service_name: [] const u8) !net.StreamServer {
var buffer: [1000]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer);
var writer = fbs.writer();
try self.server_path(service_name, writer);
var path = fbs.getWritten();
var server = net.StreamServer.init(.{});
var socket_addr = try net.Address.initUnix(path);
try server.listen(socket_addr);
const newfd = server.sockfd orelse return error.SocketLOL; // TODO
// Store the path in the Connection structure, so the UNIX socket file can be removed later.
var newcon = Connection.init(Connection.Type.SERVER, try self.allocator.dupeZ(u8, path));
try self.add_ (newcon, newfd);
return server;
}
pub fn write (_: *Self, m: Message) !void {
// Message contains the fd, no need to search for
// the right structure to copy, let's just recreate
// a Stream from the fd.
var stream = net.Stream { .handle = m.fd };
var buffer = [_]u8{0} ** IPC_MAX_MESSAGE_SIZE;
var fbs = std.io.fixedBufferStream(&buffer);
var writer = fbs.writer();
_ = try m.write(writer); // returns paylen
_ = try stream.write (fbs.getWritten());
}
pub fn schedule (self: *Self, m: Message) !void {
try self.tx.append(m);
}
/// Read from a client (indexed by a FD).
pub fn read_fd (self: *Self, fd: i32) !?Message {
return try self.read(try self.fd_to_index (fd));
}
pub fn add_switch(self: *Self, fd1: i32, fd2: i32) !void {
var index_origin = try self.fd_to_index(fd1);
var index_destinataire = try self.fd_to_index(fd2);
self.connections.items[index_origin].t = Connection.Type.SWITCHED;
self.connections.items[index_destinataire].t = Connection.Type.SWITCHED;
try self.switchdb.add_switch(fd1,fd2);
}
pub fn set_switch_callbacks(self: *Self, fd: i32
, in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType
, out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) !void {
try self.switchdb.set_callbacks(fd,in, out);
}
pub fn read (self: *Self, index: usize) !?Message {
if (index >= self.pollfd.items.len) {
return error.IndexOutOfBounds;
}
var buffer = [_]u8{0} ** IPC_MAX_MESSAGE_SIZE;
var packet_size: usize = undefined;
// TODO: this is a problem from the network API in Zig,
// servers and clients are different, they aren't just fds.
// Maybe there is something to change in the API.
if (self.connections.items[index].t == .SERVER) {
return error.messageOnServer;
}
// This may be kinda hacky, idk.
var fd = self.pollfd.items[index].fd;
var stream: net.Stream = .{ .handle = fd };
packet_size = try stream.read(buffer[0..]);
// Let's handle this as a disconnection.
if (packet_size <= 4) {
return null;
}
return try Message.read(fd, buffer[0..], self.allocator);
}
/// Before closing the fd, test it via the 'fcntl' syscall.
/// This is useful for switched connections: FDs could be closed without libipc being informed.
fn safe_close_fd (self: *Self, fd: i32) void {
var should_close = true;
_ = std.os.fcntl(fd, std.os.F.GETFD, 0) catch {
should_close = false;
};
if (should_close) {
self.close_fd(fd) catch {};
}
}
// Wait for an event.
pub fn wait_event(self: *Self) !Event {
var current_event: Event = Event.init(Event.Type.ERROR, 0, 0, null);
var wait_duration: i32 = -1; // -1 == unlimited
if (self.timer) |t| { log.debug("listening (timer: {} ms)", .{t}); wait_duration = t; }
else { log.debug("listening (no timer)", .{}); }
// Make sure we listen to the right file descriptors,
// setting POLLIN & POLLOUT flags.
for (self.pollfd.items) |*fd| {
fd.events |= std.os.linux.POLL.IN; // just to make sure
}
for (self.tx.items) |m| {
for (self.pollfd.items) |*fd| {
if (fd.fd == m.fd) {
fd.events |= std.os.linux.POLL.OUT; // just to make sure
}
}
}
// before initiate a timer
var timer = try Timer.start();
// Polling.
var count: usize = undefined;
count = try os.poll(self.pollfd.items, wait_duration);
if (count < 0) {
log.err("there is a problem: poll < 0", .{});
current_event = Event.init(Event.Type.ERROR, 0, 0, null);
return current_event;
}
var duration = timer.read() / 1000000; // ns -> ms
if (count == 0) {
if (duration >= wait_duration) {
current_event = Event.init(Event.Type.TIMER, 0, 0, null);
}
else {
// In case nothing happened, and poll wasn't triggered by time out.
current_event = Event.init(Event.Type.ERROR, 0, 0, null);
}
return current_event;
}
// handle messages
// => loop over self.pollfd.items
for (self.pollfd.items) |*fd, i| {
// .revents is POLLIN
if(fd.revents & std.os.linux.POLL.IN > 0) {
// SERVER = new connection
if (self.connections.items[i].t == .SERVER) {
try self.accept_new_client(&current_event, i);
return current_event;
}
// SWITCHED = send message to the right dest (or drop the switch)
else if (self.connections.items[i].t == .SWITCHED) {
current_event = self.switchdb.handle_event_read (i, fd.fd);
switch (current_event.t) {
.SWITCH_RX => {
try self.schedule(current_event.m.?);
},
.DISCONNECTION => {
var dest = try self.switchdb.getDest(fd.fd);
log.debug("disconnection from {} -> removing {}, too", .{fd.fd, dest});
self.switchdb.nuke(fd.fd);
self.safe_close_fd(fd.fd);
self.safe_close_fd(dest);
},
.ERROR => {
var dest = try self.switchdb.getDest(fd.fd);
log.warn("error from {} -> removing {}, too", .{fd.fd, dest});
self.switchdb.nuke(fd.fd);
self.safe_close_fd(fd.fd);
self.safe_close_fd(dest);
},
else => {
log.warn("switch rx incoherent error: {}", .{current_event.t});
return error.incoherentSwitchError;
},
}
return current_event;
}
// EXTERNAL = user handles IO
else if (self.connections.items[i].t == .EXTERNAL) {
return Event.init(Event.Type.EXTERNAL, i, fd.fd, null);
}
// otherwise = new message or disconnection
else {
var maybe_message = self.read(i) catch |err| switch(err) {
error.ConnectionResetByPeer => {
log.warn("connection reset by peer", .{});
try self.close(i);
return Event.init(Event.Type.DISCONNECTION, i, fd.fd, null);
},
else => { return err; },
};
if (maybe_message) |m| {
return Event.init(Event.Type.MESSAGE_RX, i, fd.fd, m);
}
try self.close(i);
return Event.init(Event.Type.DISCONNECTION, i, fd.fd, null);
}
}
// .revent is POLLOUT
if(fd.revents & std.os.linux.POLL.OUT > 0) {
fd.events &= ~ @as(i16, std.os.linux.POLL.OUT);
var index: usize = undefined;
for (self.tx.items) |m, index_| {
if (m.fd == self.pollfd.items[i].fd) {
index = index_;
break;
}
}
var m = self.tx.swapRemove(index);
// SWITCHED = write message for its switch buddy (callbacks)
if (self.connections.items[i].t == .SWITCHED) {
current_event = self.switchdb.handle_event_write (i, m);
// Message inner memory is already freed.
switch (current_event.t) {
.SWITCH_TX => {
},
.ERROR => {
var dest = try self.switchdb.getDest(fd.fd);
log.warn("error from {} -> removing {}, too", .{fd.fd, dest});
self.switchdb.nuke(fd.fd);
self.safe_close_fd(fd.fd);
self.safe_close_fd(dest);
},
else => {
log.warn("switch tx incoherent error: {}", .{current_event.t});
return error.incoherentSwitchError;
},
}
return current_event;
}
else {
// otherwise = write message for the msg.fd
try self.write (m);
m.deinit();
return Event.init(Event.Type.MESSAGE_TX, i, fd.fd, null);
}
}
// .revent is POLLHUP
if(fd.revents & std.os.linux.POLL.HUP > 0) {
// handle disconnection
current_event = Event.init(Event.Type.DISCONNECTION, i, fd.fd, null);
try self.close(i);
return current_event;
}
// if fd revent is POLLERR or POLLNVAL
if ((fd.revents & std.os.linux.POLL.HUP > 0) or
(fd.revents & std.os.linux.POLL.NVAL > 0)) {
return Event.init(Event.Type.ERROR, i, fd.fd, null);
}
}
return current_event;
}
/// Remove a connection based on its file descriptor.
pub fn close_fd(self: *Self, fd: i32) !void {
try self.close(try self.fd_to_index (fd));
}
pub fn close(self: *Self, index: usize) !void {
// REMINDER: connections and pollfd have the same length
if (index >= self.pollfd.items.len) {
return error.IndexOutOfBounds;
}
// close the connection and remove it from the two structures
var con = self.connections.swapRemove(index);
// Remove service's UNIX socket file.
if (con.path) |path| {
std.fs.cwd().deleteFile(path) catch {};
self.allocator.free(path);
}
var pollfd = self.pollfd.swapRemove(index);
std.os.close(pollfd.fd);
// Remove all its non-sent messages.
var i: usize = 0;
while (true) {
if (i >= self.tx.items.len)
break;
if (self.tx.items[i].fd == pollfd.fd) {
var m = self.tx.swapRemove(i);
m.deinit();
continue;
}
i += 1;
}
}
pub fn close_all(self: *Self) !void {
while(self.connections.items.len > 0) { try self.close(0); }
}
pub fn format(self: Self, comptime form: []const u8, options: fmt.FormatOptions, out_stream: anytype) !void {
try fmt.format(out_stream
, "context ({} connections and {} messages):"
, .{self.connections.items.len, self.tx.items.len});
for (self.connections.items) |con| {
try fmt.format(out_stream, "\n- ", .{});
try con.format(form, options, out_stream);
}
for (self.tx.items) |tx| {
try fmt.format(out_stream, "\n- ", .{});
try tx.format(form, options, out_stream);
}
}
};
// Creating a new thread: testing UNIX communication.
// This is a client sending a raw "Hello world!" bytestring,
// not an instance of Message.
const CommunicationTestThread = struct {
fn clientFn() !void {
const config = .{.safety = true};
var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
var c = try Context.init(allocator);
defer c.deinit(); // There. Can't leak. Isn't Zig wonderful?
var buffer: [1000]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer);
var writer = fbs.writer();
try c.server_path("simple-context-test", writer);
var path = fbs.getWritten();
const socket = try net.connectUnixSocket(path);
defer socket.close();
_ = try socket.writer().writeAll("Hello world!");
}
};
test "Context - creation, display and memory check" {
const config = .{.safety = true};
var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
var c = try Context.init(allocator);
defer c.deinit(); // There. Can't leak. Isn't Zig wonderful?
var buffer: [1000]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer);
var writer = fbs.writer();
try c.server_path("simple-context-test", writer);
var path = fbs.getWritten();
// SERVER SIDE: creating a service.
var server = c.server_init("simple-context-test") catch |err| switch(err) {
error.FileNotFound => {
log.err("cannot init server at {s}", .{path});
return err;
},
else => return err,
};
defer std.fs.cwd().deleteFile(path) catch {}; // Once done, remove file.
const t = try std.Thread.spawn(.{}, CommunicationTestThread.clientFn, .{});
defer t.join();
// Server.accept returns a net.StreamServer.Connection.
var client = try server.accept();
defer client.stream.close();
var buf: [16]u8 = undefined;
const n = try client.stream.reader().read(&buf);
try testing.expectEqual(@as(usize, 12), n);
try testing.expectEqualSlices(u8, "Hello world!", buf[0..n]);
}
// Creating a new thread: testing UNIX communication.
// This is a client sending a an instance of Message.
const ConnectThenSendMessageThread = struct {
fn clientFn() !void {
const config = .{.safety = true};
var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
var c = try Context.init(allocator);
defer c.deinit(); // There. Can't leak. Isn't Zig wonderful?
var path_buffer: [1000]u8 = undefined;
var path_fbs = std.io.fixedBufferStream(&path_buffer);
var path_writer = path_fbs.writer();
try c.server_path("simple-context-test", path_writer);
var path = path_fbs.getWritten();
// Actual UNIX socket connection.
const socket = try net.connectUnixSocket(path);
defer socket.close();
// Writing message into a buffer.
var message_buffer: [1000]u8 = undefined;
var message_fbs = std.io.fixedBufferStream(&message_buffer);
var message_writer = message_fbs.writer();
// 'fd' parameter is not taken into account here (no loop)
var m = try Message.init(0, allocator, "Hello world!");
defer m.deinit();
_ = try m.write(message_writer);
_ = try socket.writer().writeAll(message_fbs.getWritten());
}
};
test "Context - creation, echo once" {
const config = .{.safety = true};
var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
var c = try Context.init(allocator);
defer c.deinit(); // There. Can't leak. Isn't Zig wonderful?
var buffer: [1000]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer);
var writer = fbs.writer();
try c.server_path("simple-context-test", writer);
var path = fbs.getWritten();
// SERVER SIDE: creating a service.
var server = c.server_init("simple-context-test") catch |err| switch(err) {
error.FileNotFound => {
log.err("cannot init server at {s}", .{path});
return err;
},
else => return err,
};
defer std.fs.cwd().deleteFile(path) catch {}; // Once done, remove file.
const t = try std.Thread.spawn(.{}, ConnectThenSendMessageThread.clientFn, .{});
defer t.join();
// Server.accept returns a net.StreamServer.Connection.
var client = try server.accept();
defer client.stream.close();
var buf: [1000]u8 = undefined;
const n = try client.stream.reader().read(&buf);
var m = try Message.read(8, buf[0..n], allocator); // 8 == random client's fd number
defer m.deinit();
try testing.expectEqual(@as(usize, 12), m.payload.len);
try testing.expectEqualSlices(u8, m.payload, "Hello world!");
}

96
src/event.zig Normal file
View File

@ -0,0 +1,96 @@
const std = @import("std");
const testing = std.testing;
const fmt = std.fmt;
const Message = @import("./message.zig").Message;
const print_eq = @import("./util.zig").print_eq;
pub const Event = struct {
// Event types.
// In the main event loop, servers and clients can receive connections,
// disconnections, errors or messages from their pairs. They also can
// set a timer so the loop will allow a periodic routine (sending ping
// messages for websockets, for instance).
//
// A few other events can occur.
//
// Extra socket
// The main loop waiting for an event can be used as an unique entry
// point for socket management. libipc users can register sockets via
// ipc_add_fd allowing them to trigger an event, so events unrelated
// to libipc are managed the same way.
// Switch
// libipc can be used to create protocol-related programs, such as a
// websocket proxy allowing libipc services to be accessible online.
// To help those programs (with TCP-complient sockets), two sockets
// can be bound together, each message coming from one end will be
// automatically transfered to the other socket and a Switch event
// will be triggered.
// Look Up
// When a client establishes a connection to a service, it asks the
// ipc daemon (ipcd) to locate the service and establish a connection
// to it. This is a lookup.
pub const Type = enum {
ERROR, // A problem occured.
CONNECTION, // New user.
DISCONNECTION, // User disconnected.
MESSAGE_RX, // New message.
MESSAGE_TX, // Message sent.
TIMER, // Timeout in the poll(2) function.
EXTERNAL, // Message received from a non IPC socket.
SWITCH_RX, // Message received from a switched FD.
SWITCH_TX, // Message sent to a switched fd.
};
t: Event.Type,
index: usize,
origin: i32, // socket fd
m: ?Message, // message
const Self = @This();
pub fn init(t: Event.Type, index: usize, origin: i32, m: ?Message) Self {
return Self { .t = t, .index = index, .origin = origin, .m = m, };
}
pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, m: ?Message) void {
self.t = t;
self.index = index;
self.origin = origin;
self.m = m;
}
pub fn clean(self: *Self) void {
self.t = Event.Type.ERROR;
self.index = @as(usize,0);
self.origin = @as(i32,0);
if (self.m) |message| {
message.deinit();
}
self.m = null;
}
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
try fmt.format(out_stream
, "{}, origin: {}, index {}, message: [{?}]"
, .{ self.t, self.origin, self.index, self.m} );
}
};
test "Event - creation and display" {
const config = .{.safety = true};
var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
var s = "hello!!";
var m = try Message.init(1, allocator, s); // fd type payload
defer m.deinit();
var e = Event.init(Event.Type.CONNECTION, 5, 8, m); // type index origin message
try print_eq("event.Event.Type.CONNECTION, origin: 8, index 5, message: [fd: 1, payload: [hello!!]]", e);
}

226
src/exchange-fd.zig Normal file
View File

@ -0,0 +1,226 @@
const std = @import("std");
const testing = std.testing;
const os = std.os;
const log = std.log.scoped(.libipc_exchangefd);
const builtin = @import("builtin");
const windows = std.os.windows;
const errno = std.os.errno;
const system = std.os.system;
const unexpectedErrno = std.os.unexpectedErrno;
const SendMsgError = std.os.SendMsgError;
const SCM_RIGHTS: c_int = 1;
/// This definition enables the use of Zig types with a cmsghdr structure.
/// The oddity of this layout is that the data must be aligned to @sizeOf(usize)
/// rather than its natural alignment.
pub fn Cmsghdr(comptime T: type) type {
const Header = extern struct {
len: usize,
level: c_int,
@"type": c_int,
};
const data_align = @sizeOf(usize);
const data_offset = std.mem.alignForward(@sizeOf(Header), data_align);
return extern struct {
const Self = @This();
bytes: [data_offset + @sizeOf(T)]u8 align(@alignOf(Header)),
pub fn init(args: struct {
level: c_int,
@"type": c_int,
data: T,
}) Self {
var self: Self = undefined;
self.headerPtr().* = .{
.len = data_offset + @sizeOf(T),
.level = args.level,
.@"type" = args.@"type",
};
self.dataPtr().* = args.data;
return self;
}
// TODO: include this version if we submit a PR to add this to std
pub fn initNoData(args: struct {
level: c_int,
@"type": c_int,
}) Self {
var self: Self = undefined;
self.headerPtr().* = .{
.len = data_offset + @sizeOf(T),
.level = args.level,