2023-02-06 10:44:51 +01:00
|
|
|
const std = @import("std");
|
|
|
|
const testing = std.testing;
|
|
|
|
const net = std.net;
|
|
|
|
const os = std.os;
|
|
|
|
const fmt = std.fmt;
|
2023-02-13 22:09:57 +01:00
|
|
|
const c = std.c;
|
|
|
|
|
|
|
|
// TODO: to remove once PR https://github.com/ziglang/zig/pull/14639 is accepted.
pub extern "c" fn umask(mode: c.mode_t) c.mode_t;

// Scoped logger: every message from this file is tagged `.libipc_context`.
const log = std.log.scoped(.libipc_context);

// Receive a file descriptor over a UNIX socket (used to get a socket from IPCd).
const receive_fd = @import("./exchange-fd.zig").receive_fd;

const Timer = std.time.Timer;

const CBEvent = @import("./callback.zig").CBEvent;
const Connection = @import("./connection.zig").Connection;
const Message = @import("./message.zig").Message;
const Event = @import("./event.zig").Event;
const Switch = @import("./switch.zig").Switch;
const print_eq = @import("./util.zig").print_eq;

const Messages = @import("./message.zig").Messages;
const SwitchDB = @import("./switch.zig").SwitchDB;
const Connections = @import("./connection.zig").Connections;
const CBEventType = @import("./main.zig").CBEvent.Type;

// List of pollfd structures handed to poll(2); kept index-aligned with Context.connections.
pub const PollFD = std.ArrayList(std.os.pollfd);

pub const IPC_HEADER_SIZE = 4; // Size (4 bytes) then content.
pub const IPC_BASE_SIZE = 100000; // 100 KB, plenty enough space for messages
pub const IPC_MAX_MESSAGE_SIZE = IPC_BASE_SIZE - IPC_HEADER_SIZE;
pub const IPC_VERSION = 1;
|
|
|
|
|
|
|
|
// Context of the whole networking state.
//
// Owns every connection (SERVER, IPC, SWITCHED or EXTERNAL), the pollfd
// entries handed to poll(2) (index-aligned with `connections`), the queued
// outgoing messages (`tx`) and the switching database. All memory comes
// from the allocator given to `init`.
pub const Context = struct {
    rundir: []u8, // Runtime directory where all UNIX sockets are; owned, freed in deinit().
    allocator: std.mem.Allocator, // Memory allocator.
    connections: Connections, // Keep track of connections.

    // "pollfd" structures passed to poll(2). Same indexes as "connections".
    pollfd: PollFD, // .fd (fd_t) + .events (i16) + .revents (i16)

    tx: Messages, // Messages to send, once their fd is available.
    switchdb: SwitchDB, // Relations between fd.

    timer: ?i32 = null, // No timer by default (no TIMER event).

    const Self = @This();

    // Context initialization:
    // - read $RUNDIR (defaults to /tmp/.libipc-run/)
    // - create the runtime directory, where all UNIX sockets will be
    // - init structures (provide the allocator)
    pub fn init(allocator: std.mem.Allocator) !Self {
        var rundir = std.process.getEnvVarOwned(allocator, "RUNDIR") catch |err| switch (err) {
            error.EnvironmentVariableNotFound => blk: {
                break :blk try allocator.dupeZ(u8, "/tmp/.libipc-run/");
            },
            else => {
                return err;
            },
        };
        // Fix: `rundir` used to leak when mkdir failed below.
        errdefer allocator.free(rundir);

        // Allow mkdir to create a directory with 0o770 permissions.
        var previous_mask = umask(0o007);
        defer _ = umask(previous_mask);

        // Create the run directory, where all UNIX sockets will be.
        std.os.mkdir(rundir, 0o0770) catch |err| switch (err) {
            error.PathAlreadyExists => {
                log.warn("runtime directory ({s}) already exists, (everything is fine, ignoring)", .{rundir});
            },
            else => {
                log.warn("runtime directory ({s}): {any}", .{ rundir, err });
                return err;
            },
        };

        return Self{
            .rundir = rundir,
            .connections = Connections.init(allocator),
            .pollfd = PollFD.init(allocator),
            .tx = Messages.init(allocator),
            .switchdb = SwitchDB.init(allocator),
            .allocator = allocator,
        };
    }

    // Close all connections, then free every owned resource.
    // Deallocation cannot fail: errors from close_all() are only logged.
    pub fn deinit(self: *Self) void {
        self.close_all() catch |err| switch (err) {
            error.IndexOutOfBounds => {
                log.err("context.deinit(): IndexOutOfBounds", .{});
            },
        };
        self.allocator.free(self.rundir);
        self.connections.deinit();
        self.pollfd.deinit();
        for (self.tx.items) |m| {
            m.deinit();
        }
        self.tx.deinit();
        self.switchdb.deinit();
    }

    // Both simple connection and the switched one share this code.
    // Connect to the UNIX socket at `path`, register the connection,
    // and return its fd.
    fn connect_(self: *Self, ctype: Connection.Type, path: []const u8) !i32 {
        var stream = try net.connectUnixSocket(path);
        const newfd = stream.handle;
        errdefer std.os.closeSocket(newfd);
        var newcon = Connection.init(ctype, null);
        try self.add_(newcon, newfd);
        return newfd;
    }

    // Ask IPCd to establish the connection to the target service.
    // Returns null when $IPC_NETWORK is not set (IPCd is not involved),
    // otherwise the fd received from IPCd.
    fn connect_ipcd(self: *Self, service_name: []const u8, connection_type: Connection.Type) !?i32 {
        const buffer_size = 10000;
        var buffer: [buffer_size]u8 = undefined;
        var fba = std.heap.FixedBufferAllocator.init(&buffer);
        var allocator = fba.allocator();

        // Get IPC_NETWORK environment variable.
        // IPC_NETWORK is shared with the network service to choose the protocol stack,
        // according to the target service.
        //
        // Example, connecting to 'audio' service through tor service:
        //   IPC_NETWORK="audio tor://some.example.com/audio"
        //
        // Routing directives can be chained using " ;" separator:
        //   IPC_NETWORK="audio https://example.com/audio ;pong tls://pong.example.com/pong"
        var network_envvar = std.process.getEnvVarOwned(allocator, "IPC_NETWORK") catch |err| switch (err) {
            // error{ OutOfMemory, EnvironmentVariableNotFound, InvalidUtf8 } (ErrorSet)
            error.EnvironmentVariableNotFound => {
                log.debug("no IPC_NETWORK envvar: IPCd won't be contacted", .{});
                return null;
            }, // no need to contact IPCd
            else => {
                return err;
            },
        };

        var lookupbuffer: [buffer_size]u8 = undefined;
        var lookupfbs = std.io.fixedBufferStream(&lookupbuffer);
        var lookupwriter = lookupfbs.writer();
        try lookupwriter.print("{s};{s}", .{ service_name, network_envvar });

        // Try to connect to the IPCd service.
        var ipcdfd = try self.connect_service("ipc");
        defer self.close_fd(ipcdfd) catch {}; // in any case, connection should be closed

        // Send LOOKUP message
        // content: target service name;${IPC_NETWORK}
        // example: pong;pong tls://example.com:8998/pong
        var m = try Message.init(ipcdfd, allocator, lookupfbs.getWritten());
        try self.write(m);

        // Read LOOKUP response
        // case error: ignore and move on (TODO)
        // else: get fd sent by IPCd then close IPCd fd
        var reception_buffer: [2000]u8 = undefined;
        var reception_size: usize = 0;
        var newfd = try receive_fd(ipcdfd, &reception_buffer, &reception_size);
        // Fix: the received fd used to leak on the error paths below.
        errdefer std.os.closeSocket(newfd);

        if (reception_size == 0) {
            return error.IPCdFailedNoMessage;
        }

        var response: []u8 = reception_buffer[0..reception_size];

        if (!std.mem.eql(u8, response, "ok")) {
            return error.IPCdFailedNotOk;
        }

        var newcon = Connection.init(connection_type, null);
        try self.add_(newcon, newfd);
        return newfd;
    }

    /// Add a new connection and its pollfd entry, keeping both arrays
    /// index-aligned: if the second append fails, the first one is rolled
    /// back (this closes the old "memory problems" TODO).
    fn add_(self: *Self, new_connection: Connection, fd: os.socket_t) !void {
        try self.connections.append(new_connection);
        errdefer _ = self.connections.pop();
        try self.pollfd.append(.{ .fd = fd, .events = std.os.linux.POLL.IN, .revents = 0 });
    }

    /// Find the index of a registered connection from its fd.
    fn fd_to_index(self: Self, fd: i32) !usize {
        for (self.pollfd.items, 0..) |pfd, i| {
            if (pfd.fd == fd) {
                return i;
            }
        }
        return error.IndexNotFound;
    }

    /// Connect to the service directly, without reaching IPCd first.
    /// Return the connection FD.
    pub fn connect_service(self: *Self, service_name: []const u8) !i32 {
        var buffer: [1000]u8 = undefined;
        var path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ self.rundir, service_name });
        return self.connect_(Connection.Type.IPC, path);
    }

    /// Tries to connect to IPCd first, then the service (if needed).
    /// Return the connection FD.
    pub fn connect_ipc(self: *Self, service_name: []const u8) !i32 {
        // First, try ipcd.
        if (try self.connect_ipcd(service_name, Connection.Type.IPC)) |fd| {
            log.debug("Connected via IPCd, fd is {}", .{fd});
            return fd;
        }
        // In case this doesn't work, connect directly.
        return try self.connect_service(service_name);
    }

    /// Add a new file descriptor to follow, labeled as EXTERNAL.
    /// Useful for protocol daemons (ex: TCPd) listening to a socket for external connections,
    /// clients trying to reach a libipc service.
    pub fn add_external(self: *Self, newfd: i32) !void {
        var newcon = Connection.init(Connection.Type.EXTERNAL, null);
        try self.add_(newcon, newfd);
    }

    // Accept a client on the SERVER connection at `server_index`,
    // register it, then fill `event` with a CONNECTION event.
    fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void {
        // Rebuild a net.StreamServer from the stored fd and socket path.
        var serverfd = self.pollfd.items[server_index].fd;
        var path = self.connections.items[server_index].path orelse return error.ServerWithNoPath;
        var server = net.StreamServer{
            .sockfd = serverfd,
            .kernel_backlog = 100,
            .reuse_address = false,
            .reuse_port = false,
            .listen_address = try net.Address.initUnix(path),
        };
        var client = try server.accept(); // net.StreamServer.Connection

        const newfd = client.stream.handle;
        var newcon = Connection.init(Connection.Type.IPC, null);
        try self.add_(newcon, newfd);

        const sfd = server.sockfd orelse return error.SocketLOL; // TODO
        // WARNING: imply every new item is last
        event.set(Event.Type.CONNECTION, self.pollfd.items.len - 1, sfd, null);
    }

    // Create a unix socket.
    // Store std lib structures in the context.
    pub fn server_init(self: *Self, service_name: []const u8) !net.StreamServer {
        var buffer: [1000]u8 = undefined;
        var buffer_lock: [1000]u8 = undefined;
        var path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ self.rundir, service_name });
        var lock = try std.fmt.bufPrint(&buffer_lock, "{s}.lock", .{path});

        // Create a lock file (and lock it) in order to prevent a race condition.
        // While the program is running, the lock is enabled.
        // Once the program stops (even if it crashes), the lock is then disabled.
        // Quit if the lock is still active.
        // NOTE: the returned handle is deliberately discarded and kept open,
        // so the lock lives until the process exits.
        const lock_opts = .{ .lock = .Exclusive, .lock_nonblocking = true };
        _ = std.fs.createFileAbsolute(lock, lock_opts) catch |err| {
            log.err("cannot init server at {s}, lock {s} is causing a problem: {any}", .{ path, lock, err });
            log.err("you may have launched the service twice.", .{}); // typo "lauched" fixed
            return err;
        };

        // Allow to create a unix socket with the right permissions.
        // Group should include write permissions.
        var previous_mask = umask(0o117);
        defer _ = umask(previous_mask);

        // Remove the old UNIX socket.
        std.os.unlink(path) catch |err| switch (err) {
            error.FileNotFound => log.debug("no unlink necessary for {s}", .{path}),
            else => return err,
        };

        var server = net.StreamServer.init(.{});
        var socket_addr = try net.Address.initUnix(path);
        try server.listen(socket_addr);

        const newfd = server.sockfd orelse return error.SocketLOL; // TODO
        // Store the path in the Connection structure, so the UNIX socket file can be removed later.
        var newcon = Connection.init(Connection.Type.SERVER, try self.allocator.dupeZ(u8, path));
        try self.add_(newcon, newfd);

        return server;
    }

    // Send a message NOW (blocking write on the message's fd).
    pub fn write(_: *Self, m: Message) !void {
        // Message contains the fd, no need to search for
        // the right structure to copy, let's just recreate
        // a Stream from the fd.
        var stream = net.Stream{ .handle = m.fd };

        // Serialize the message into a stack buffer, then write it in one go.
        var buffer = [_]u8{0} ** IPC_MAX_MESSAGE_SIZE;
        var fbs = std.io.fixedBufferStream(&buffer);
        var writer = fbs.writer();

        _ = try m.write(writer); // returns paylen

        _ = try stream.write(fbs.getWritten());
    }

    // Queue a message; wait_event() writes it once its fd is ready (POLLOUT).
    pub fn schedule(self: *Self, m: Message) !void {
        try self.tx.append(m);
    }

    /// Read from a client (indexed by a FD).
    pub fn read_fd(self: *Self, fd: i32) !?Message {
        return try self.read(try self.fd_to_index(fd));
    }

    // Mark both fds SWITCHED and record their relation in switchdb.
    pub fn add_switch(self: *Self, fd1: i32, fd2: i32) !void {
        var index_origin = try self.fd_to_index(fd1);
        var index_destination = try self.fd_to_index(fd2);

        self.connections.items[index_origin].t = Connection.Type.SWITCHED;
        self.connections.items[index_destination].t = Connection.Type.SWITCHED;

        try self.switchdb.add_switch(fd1, fd2);
    }

    // Register user-provided IO callbacks for a switched fd.
    pub fn set_switch_callbacks(self: *Self, fd: i32, in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType, out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) !void {
        try self.switchdb.set_callbacks(fd, in, out);
    }

    // Read a Message from the connection at `index`.
    // Returns null on disconnection (packets of 4 bytes or fewer,
    // i.e. nothing beyond the IPC_HEADER_SIZE header).
    pub fn read(self: *Self, index: usize) !?Message {
        if (index >= self.pollfd.items.len) {
            return error.IndexOutOfBounds;
        }

        var buffer = [_]u8{0} ** IPC_MAX_MESSAGE_SIZE;
        var packet_size: usize = undefined;

        // TODO: this is a problem from the network API in Zig,
        // servers and clients are different, they aren't just fds.
        // Maybe there is something to change in the API.
        if (self.connections.items[index].t == .SERVER) {
            return error.messageOnServer;
        }

        // This may be kinda hacky, idk.
        var fd = self.pollfd.items[index].fd;
        var stream: net.Stream = .{ .handle = fd };
        packet_size = try stream.read(buffer[0..]);

        // Let's handle this as a disconnection.
        if (packet_size <= 4) {
            return null;
        }

        return try Message.read(fd, buffer[0..], self.allocator);
    }

    /// Before closing the fd, test it via the 'fcntl' syscall.
    /// This is useful for switched connections: FDs could be closed without libipc being informed.
    fn safe_close_fd(self: *Self, fd: i32) void {
        var should_close = true;
        _ = std.os.fcntl(fd, std.os.F.GETFD, 0) catch {
            should_close = false;
        };
        if (should_close) {
            self.close_fd(fd) catch {};
        }
    }

    // Wait for an event. Returns exactly one Event per call:
    // - CONNECTION: a server accepted a new client
    // - MESSAGE_RX / MESSAGE_TX: a message was read / a queued message was sent
    // - SWITCH_RX / SWITCH_TX: traffic handled on a switched fd
    // - DISCONNECTION: a peer hung up (the connection is closed and removed)
    // - EXTERNAL: activity on an fd the user manages themselves
    // - TIMER: self.timer expired with no other activity
    // - ERROR: polling problem or write failure
    pub fn wait_event(self: *Self) !Event {
        var current_event: Event = Event.init(Event.Type.ERROR, 0, 0, null);
        var wait_duration: i32 = -1; // -1 == unlimited

        if (self.timer) |t| {
            log.debug("listening (timer: {} ms)", .{t});
            wait_duration = t;
        } else {
            log.debug("listening (no timer)", .{});
        }

        // Make sure we listen to the right file descriptors,
        // setting POLLIN & POLLOUT flags.
        for (self.pollfd.items) |*fd| {
            fd.events |= std.os.linux.POLL.IN; // just to make sure
        }

        // Request POLLOUT only for fds with a pending message.
        for (self.tx.items) |m| {
            for (self.pollfd.items) |*fd| {
                if (fd.fd == m.fd) {
                    fd.events |= std.os.linux.POLL.OUT; // just to make sure
                }
            }
        }

        // Start a timer before polling: the elapsed time tells a real
        // TIMER event apart from a spurious zero-fd wake-up.
        var timer = try Timer.start();

        // Polling. Errors surface through the error union; the old
        // `if (count < 0)` test was dead code (count is a usize).
        const count = try os.poll(self.pollfd.items, wait_duration);

        var duration = timer.read() / 1000000; // ns -> ms
        if (count == 0) {
            if (duration >= wait_duration) {
                current_event = Event.init(Event.Type.TIMER, 0, 0, null);
            } else {
                // In case nothing happened, and poll wasn't triggered by time out.
                current_event = Event.init(Event.Type.ERROR, 0, 0, null);
            }
            return current_event;
        }

        // handle messages
        // => loop over self.pollfd.items
        for (self.pollfd.items, 0..) |*fd, i| {
            // .revents is POLLIN
            if (fd.revents & std.os.linux.POLL.IN > 0) {
                // SERVER = new connection
                if (self.connections.items[i].t == .SERVER) {
                    try self.accept_new_client(&current_event, i);
                    return current_event;
                }
                // SWITCHED = send message to the right dest (or drop the switch)
                else if (self.connections.items[i].t == .SWITCHED) {
                    current_event = self.switchdb.handle_event_read(i, fd.fd);
                    switch (current_event.t) {
                        .SWITCH_RX => {
                            try self.schedule(current_event.m.?);
                        },
                        .DISCONNECTION => {
                            var dest = try self.switchdb.getDest(fd.fd);
                            log.debug("disconnection from {} -> removing {}, too", .{ fd.fd, dest });
                            self.switchdb.nuke(fd.fd);
                            self.safe_close_fd(fd.fd);
                            self.safe_close_fd(dest);
                        },
                        .ERROR => {
                            var dest = try self.switchdb.getDest(fd.fd);
                            log.warn("error from {} -> removing {}, too", .{ fd.fd, dest });
                            self.switchdb.nuke(fd.fd);
                            self.safe_close_fd(fd.fd);
                            self.safe_close_fd(dest);
                        },
                        else => {
                            log.warn("switch rx incoherent error: {}", .{current_event.t});
                            return error.incoherentSwitchError;
                        },
                    }
                    return current_event;
                }
                // EXTERNAL = user handles IO
                else if (self.connections.items[i].t == .EXTERNAL) {
                    return Event.init(Event.Type.EXTERNAL, i, fd.fd, null);
                }
                // otherwise = new message or disconnection
                else {
                    var maybe_message = self.read(i) catch |err| switch (err) {
                        error.ConnectionResetByPeer => {
                            log.warn("connection reset by peer", .{});
                            try self.close(i);
                            return Event.init(Event.Type.DISCONNECTION, i, fd.fd, null);
                        },
                        else => {
                            return err;
                        },
                    };

                    if (maybe_message) |m| {
                        return Event.init(Event.Type.MESSAGE_RX, i, fd.fd, m);
                    }

                    try self.close(i);
                    return Event.init(Event.Type.DISCONNECTION, i, fd.fd, null);
                }
            }

            // .revent is POLLOUT
            if (fd.revents & std.os.linux.POLL.OUT > 0) {
                fd.events &= ~@as(i16, std.os.linux.POLL.OUT);

                // Find the first queued message for this fd.
                // Fix: `index` used to stay `undefined` when no message
                // matched, making the swapRemove below undefined behavior.
                var index: ?usize = null;
                for (self.tx.items, 0..) |m, index_| {
                    if (m.fd == self.pollfd.items[i].fd) {
                        index = index_;
                        break;
                    }
                }
                const tx_index = index orelse continue; // nothing queued for this fd

                var m = self.tx.swapRemove(tx_index);
                defer m.deinit();

                // SWITCHED = write message for its switch buddy (callbacks)
                if (self.connections.items[i].t == .SWITCHED) {
                    current_event = self.switchdb.handle_event_write(i, m);
                    // Message inner memory is already freed.
                    switch (current_event.t) {
                        .SWITCH_TX => {},
                        .ERROR => {
                            var dest = try self.switchdb.getDest(fd.fd);
                            log.warn("error from {} -> removing {}, too", .{ fd.fd, dest });
                            self.switchdb.nuke(fd.fd);
                            self.safe_close_fd(fd.fd);
                            self.safe_close_fd(dest);
                        },
                        else => {
                            log.warn("switch tx incoherent error: {}", .{current_event.t});
                            return error.incoherentSwitchError;
                        },
                    }
                    return current_event;
                } else {
                    // otherwise = write message for the msg.fd
                    self.write(m) catch |err| switch (err) {
                        error.BrokenPipe => {
                            log.warn("cannot send message, dest probably closed the connection ({})", .{err});
                            try self.close(i);
                            return Event.init(Event.Type.ERROR, i, fd.fd, null);
                        },
                        else => {
                            log.warn("unmanaged error while sending a message ({})", .{err});
                            try self.close(i);
                            return Event.init(Event.Type.ERROR, i, fd.fd, null);
                        },
                    };
                    return Event.init(Event.Type.MESSAGE_TX, i, fd.fd, null);
                }
            }

            // .revent is POLLHUP
            if (fd.revents & std.os.linux.POLL.HUP > 0) {
                // handle disconnection
                current_event = Event.init(Event.Type.DISCONNECTION, i, fd.fd, null);
                try self.close(i);
                return current_event;
            }

            // .revent is POLLERR or POLLNVAL
            // Fix: this used to re-test POLL.HUP (already handled above and
            // followed by a return), so POLLERR was never actually detected.
            if ((fd.revents & std.os.linux.POLL.ERR > 0) or
                (fd.revents & std.os.linux.POLL.NVAL > 0))
            {
                return Event.init(Event.Type.ERROR, i, fd.fd, null);
            }
        }

        return current_event;
    }

    /// Remove a connection based on its file descriptor.
    pub fn close_fd(self: *Self, fd: i32) !void {
        try self.close(try self.fd_to_index(fd));
    }

    // Close the connection at `index`: close its socket, remove the
    // connection and pollfd entries (swapRemove: the last item takes its
    // place), delete a server's UNIX socket file, and drop every queued
    // message bound to that fd.
    pub fn close(self: *Self, index: usize) !void {
        // REMINDER: connections and pollfd have the same length
        if (index >= self.pollfd.items.len) {
            return error.IndexOutOfBounds;
        }

        // close the connection and remove it from the two structures
        var con = self.connections.swapRemove(index);
        // Remove service's UNIX socket file.
        if (con.path) |path| {
            std.fs.cwd().deleteFile(path) catch {};
            self.allocator.free(path);
        }
        var pollfd = self.pollfd.swapRemove(index);
        std.os.close(pollfd.fd);

        // Remove all its non-sent messages.
        var i: usize = 0;
        while (i < self.tx.items.len) {
            if (self.tx.items[i].fd == pollfd.fd) {
                var m = self.tx.swapRemove(i);
                m.deinit();
                continue; // swapRemove moved another item into slot `i`: re-check it.
            }
            i += 1;
        }
    }

    // Close every connection (index 0 each time, since close() shrinks the list).
    pub fn close_all(self: *Self) !void {
        while (self.connections.items.len > 0) {
            try self.close(0);
        }
    }

    // std.fmt interface: human-readable dump of connections and queued messages.
    pub fn format(self: Self, comptime form: []const u8, options: fmt.FormatOptions, out_stream: anytype) !void {
        try fmt.format(out_stream, "context ({} connections and {} messages):", .{ self.connections.items.len, self.tx.items.len });

        for (self.connections.items) |con| {
            try fmt.format(out_stream, "\n- ", .{});
            try con.format(form, options, out_stream);
        }

        for (self.tx.items) |tx| {
            try fmt.format(out_stream, "\n- ", .{});
            try tx.format(form, options, out_stream);
        }
    }
};
|
|
|
|
|
|
|
|
// Client-side thread for the UNIX-communication test below.
// It writes the raw bytes "Hello world!" directly on the socket,
// without wrapping them in a Message.
const CommunicationTestThread = struct {
    fn clientFn() !void {
        var gpa = std.heap.GeneralPurposeAllocator(.{ .safety = true }){};
        defer _ = gpa.deinit();
        const allocator = gpa.allocator();

        // The context is only needed to compute the socket path (rundir).
        var ctx = try Context.init(allocator);
        defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?

        var path_buffer: [1000]u8 = undefined;
        const service_path = try std.fmt.bufPrint(&path_buffer, "{s}/{s}", .{ ctx.rundir, "simple-context-test" });

        // Connect, send the raw payload, then close on scope exit.
        const stream = try net.connectUnixSocket(service_path);
        defer stream.close();
        try stream.writer().writeAll("Hello world!");
    }
};
|
|
|
|
|
|
|
|
// Round-trip over a real UNIX socket: server in this test, raw-byte
// client in a spawned thread (CommunicationTestThread).
test "Context - creation, display and memory check" {
    // Safety-checked allocator: the test fails on leaks or use-after-free.
    const config = .{ .safety = true };
    var gpa = std.heap.GeneralPurposeAllocator(config){};
    defer _ = gpa.deinit();

    const allocator = gpa.allocator();

    var ctx = try Context.init(allocator);
    defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?

    // Path of the UNIX socket the server will listen on ($RUNDIR based).
    var buffer: [1000]u8 = undefined;
    var path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ ctx.rundir, "simple-context-test" });

    // SERVER SIDE: creating a service.
    var server = ctx.server_init("simple-context-test") catch |err| switch (err) {
        error.FileNotFound => {
            log.err("cannot init server at {s}", .{path});
            return err;
        },
        else => return err,
    };
    defer std.fs.cwd().deleteFile(path) catch {}; // Once done, remove file.

    // CLIENT SIDE: spawned thread connects and writes "Hello world!".
    const t = try std.Thread.spawn(.{}, CommunicationTestThread.clientFn, .{});
    defer t.join();

    // Server.accept returns a net.StreamServer.Connection.
    var client = try server.accept();
    defer client.stream.close();
    var buf: [16]u8 = undefined;
    const n = try client.stream.reader().read(&buf);

    // "Hello world!" is 12 bytes long.
    try testing.expectEqual(@as(usize, 12), n);
    try testing.expectEqualSlices(u8, "Hello world!", buf[0..n]);
}
|
|
|
|
|
|
|
|
// Client-side thread for the "echo once" test below.
// Unlike CommunicationTestThread, it serializes a proper Message
// instance before writing it on the socket.
const ConnectThenSendMessageThread = struct {
    fn clientFn() !void {
        var gpa = std.heap.GeneralPurposeAllocator(.{ .safety = true }){};
        defer _ = gpa.deinit();
        const allocator = gpa.allocator();

        // The context is only needed to compute the socket path (rundir).
        var ctx = try Context.init(allocator);
        defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?

        var path_buffer: [1000]u8 = undefined;
        const service_path = try std.fmt.bufPrint(&path_buffer, "{s}/{s}", .{ ctx.rundir, "simple-context-test" });

        // Actual UNIX socket connection.
        const stream = try net.connectUnixSocket(service_path);
        defer stream.close();

        // Serialize the message into a stack buffer.
        var serialization_buffer: [1000]u8 = undefined;
        var serialization_fbs = std.io.fixedBufferStream(&serialization_buffer);

        // The 'fd' parameter (0) is not taken into account here (no loop).
        var message = try Message.init(0, allocator, "Hello world!");
        defer message.deinit();
        _ = try message.write(serialization_fbs.writer());

        // Ship the serialized bytes.
        try stream.writer().writeAll(serialization_fbs.getWritten());
    }
};
|
|
|
|
|
|
|
|
// Round-trip over a real UNIX socket: the client thread sends a
// serialized Message, the server reads it back with Message.read.
test "Context - creation, echo once" {
    // Safety-checked allocator: the test fails on leaks or use-after-free.
    const config = .{ .safety = true };
    var gpa = std.heap.GeneralPurposeAllocator(config){};
    defer _ = gpa.deinit();

    const allocator = gpa.allocator();

    var ctx = try Context.init(allocator);
    defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?

    // Path of the UNIX socket the server will listen on ($RUNDIR based).
    var buffer: [1000]u8 = undefined;
    var path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ ctx.rundir, "simple-context-test" });

    // SERVER SIDE: creating a service.
    var server = ctx.server_init("simple-context-test") catch |err| switch (err) {
        error.FileNotFound => {
            log.err("cannot init server at {s}", .{path});
            return err;
        },
        else => return err,
    };
    defer std.fs.cwd().deleteFile(path) catch {}; // Once done, remove file.

    // CLIENT SIDE: spawned thread connects and sends a serialized Message.
    const t = try std.Thread.spawn(.{}, ConnectThenSendMessageThread.clientFn, .{});
    defer t.join();

    // Server.accept returns a net.StreamServer.Connection.
    var client = try server.accept();
    defer client.stream.close();
    var buf: [1000]u8 = undefined;
    const n = try client.stream.reader().read(&buf);
    var m = try Message.read(8, buf[0..n], allocator); // 8 == random client's fd number
    defer m.deinit();

    // Payload should be exactly the 12-byte "Hello world!" string.
    try testing.expectEqual(@as(usize, 12), m.payload.len);
    try testing.expectEqualSlices(u8, m.payload, "Hello world!");
}
|