Code style using 'zig fmt'.

master
Philippe Pittoli 2023-02-07 07:47:00 +01:00
parent 5ca55f0525
commit 70c062a598
11 changed files with 294 additions and 311 deletions

View File

@ -5,7 +5,7 @@ const Context = ipc.Context;
const Message = ipc.Message; const Message = ipc.Message;
const CBEventType = ipc.CBEvent.Type; const CBEventType = ipc.CBEvent.Type;
export fn ipc_context_init (ptr: **Context) callconv(.C) i32 { export fn ipc_context_init(ptr: **Context) callconv(.C) i32 {
ptr.* = std.heap.c_allocator.create(Context) catch return -1; ptr.* = std.heap.c_allocator.create(Context) catch return -1;
ptr.*.* = Context.init(std.heap.c_allocator) catch |err| { ptr.*.* = Context.init(std.heap.c_allocator) catch |err| {
@ -17,26 +17,26 @@ export fn ipc_context_init (ptr: **Context) callconv(.C) i32 {
/// Start a libipc service. /// Start a libipc service.
export fn ipc_service_init(ctx: *Context, servicefd: *i32, service_name: [*]const u8, service_name_len: u16) callconv(.C) i32 { export fn ipc_service_init(ctx: *Context, servicefd: *i32, service_name: [*]const u8, service_name_len: u16) callconv(.C) i32 {
var streamserver = ctx.server_init (service_name[0..service_name_len]) catch return -1; var streamserver = ctx.server_init(service_name[0..service_name_len]) catch return -1;
servicefd.* = streamserver.sockfd.?; servicefd.* = streamserver.sockfd.?;
return 0; return 0;
} }
/// Connect to a libipc service, possibly through IPCd. /// Connect to a libipc service, possibly through IPCd.
export fn ipc_connect_service (ctx: *Context, servicefd: *i32, service_name: [*]const u8, service_name_len: u16) callconv(.C) i32 { export fn ipc_connect_service(ctx: *Context, servicefd: *i32, service_name: [*]const u8, service_name_len: u16) callconv(.C) i32 {
var fd = ctx.connect_ipc (service_name[0..service_name_len]) catch return -1; var fd = ctx.connect_ipc(service_name[0..service_name_len]) catch return -1;
servicefd.* = fd; servicefd.* = fd;
return 0; return 0;
} }
export fn ipc_context_deinit (ctx: **Context) callconv(.C) void { export fn ipc_context_deinit(ctx: **Context) callconv(.C) void {
var ptr: *Context = ctx.*; var ptr: *Context = ctx.*;
ptr.deinit(); ptr.deinit();
std.heap.c_allocator.destroy(ptr); std.heap.c_allocator.destroy(ptr);
} }
/// Write a message (no waiting). /// Write a message (no waiting).
export fn ipc_write (ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) i32 { export fn ipc_write(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) i32 {
// TODO: better default length. // TODO: better default length.
var buffer = [_]u8{0} ** 100000; var buffer = [_]u8{0} ** 100000;
var fba = std.heap.FixedBufferAllocator.init(&buffer); var fba = std.heap.FixedBufferAllocator.init(&buffer);
@ -48,7 +48,7 @@ export fn ipc_write (ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen:
/// Schedule a message. /// Schedule a message.
/// Use the same allocator as the context. /// Use the same allocator as the context.
export fn ipc_schedule (ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) i32 { export fn ipc_schedule(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) i32 {
var message = Message.init(servicefd, ctx.allocator, mcontent[0..mlen]) catch return -1; var message = Message.init(servicefd, ctx.allocator, mcontent[0..mlen]) catch return -1;
ctx.schedule(message) catch return -2; ctx.schedule(message) catch return -2;
return 0; return 0;
@ -56,8 +56,10 @@ export fn ipc_schedule (ctx: *Context, servicefd: i32, mcontent: [*]const u8, ml
/// Read a message from a file descriptor. /// Read a message from a file descriptor.
/// Buffer length will be changed to the size of the received message. /// Buffer length will be changed to the size of the received message.
export fn ipc_read_fd (ctx: *Context, fd: i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 { export fn ipc_read_fd(ctx: *Context, fd: i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 {
var m = ctx.read_fd (fd) catch {return -1;} orelse return -2; var m = ctx.read_fd(fd) catch {
return -1;
} orelse return -2;
if (m.payload.len > buflen.*) return -3; if (m.payload.len > buflen.*) return -3;
buflen.* = m.payload.len; buflen.* = m.payload.len;
@ -71,8 +73,10 @@ export fn ipc_read_fd (ctx: *Context, fd: i32, buffer: [*]u8, buflen: *usize) ca
/// Read a message. /// Read a message.
/// Buffer length will be changed to the size of the received message. /// Buffer length will be changed to the size of the received message.
export fn ipc_read (ctx: *Context, index: usize, buffer: [*]u8, buflen: *usize) callconv(.C) i32 { export fn ipc_read(ctx: *Context, index: usize, buffer: [*]u8, buflen: *usize) callconv(.C) i32 {
var m = ctx.read (index) catch {return -1;} orelse return -2; var m = ctx.read(index) catch {
return -1;
} orelse return -2;
if (m.payload.len > buflen.*) return -3; if (m.payload.len > buflen.*) return -3;
buflen.* = m.payload.len; buflen.* = m.payload.len;
@ -98,8 +102,7 @@ export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, b
_ = writer.write(m.payload) catch return -4; _ = writer.write(m.payload) catch return -4;
buflen.* = m.payload.len; buflen.* = m.payload.len;
m.deinit(); m.deinit();
} } else {
else {
buflen.* = 0; buflen.* = 0;
} }
@ -107,41 +110,39 @@ export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, b
} }
/// Change the timer (ms). /// Change the timer (ms).
export fn ipc_context_timer (ctx: *Context, timer: i32) callconv(.C) void { export fn ipc_context_timer(ctx: *Context, timer: i32) callconv(.C) void {
ctx.timer = timer; ctx.timer = timer;
} }
export fn ipc_close_fd (ctx: *Context, fd: i32) callconv(.C) i32 { export fn ipc_close_fd(ctx: *Context, fd: i32) callconv(.C) i32 {
ctx.close_fd (fd) catch return -1; ctx.close_fd(fd) catch return -1;
return 0; return 0;
} }
export fn ipc_close (ctx: *Context, index: usize) callconv(.C) i32 { export fn ipc_close(ctx: *Context, index: usize) callconv(.C) i32 {
ctx.close (index) catch return -1; ctx.close(index) catch return -1;
return 0; return 0;
} }
export fn ipc_close_all (ctx: *Context) callconv(.C) i32 { export fn ipc_close_all(ctx: *Context) callconv(.C) i32 {
ctx.close_all () catch return -1; ctx.close_all() catch return -1;
return 0; return 0;
} }
/// Add a new file descriptor to listen to. /// Add a new file descriptor to listen to.
/// The FD is marked as "external"; it isn't a simple libipc connection. /// The FD is marked as "external"; it isn't a simple libipc connection.
/// You may want to handle any operation on it by yourself. /// You may want to handle any operation on it by yourself.
export fn ipc_add_external (ctx: *Context, newfd: i32) callconv(.C) i32 { export fn ipc_add_external(ctx: *Context, newfd: i32) callconv(.C) i32 {
ctx.add_external (newfd) catch return -1; ctx.add_external(newfd) catch return -1;
return 0; return 0;
} }
export fn ipc_add_switch (ctx: *Context, fd1: i32, fd2: i32) callconv(.C) i32 { export fn ipc_add_switch(ctx: *Context, fd1: i32, fd2: i32) callconv(.C) i32 {
ctx.add_switch (fd1, fd2) catch return -1; ctx.add_switch(fd1, fd2) catch return -1;
return 0; return 0;
} }
export fn ipc_set_switch_callbacks(ctx: *Context, fd: i32 export fn ipc_set_switch_callbacks(ctx: *Context, fd: i32, in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType, out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) callconv(.C) i32 {
, in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType ctx.set_switch_callbacks(fd, in, out) catch return -1;
, out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) callconv(.C) i32 {
ctx.set_switch_callbacks (fd, in, out) catch return -1;
return 0; return 0;
} }

View File

@ -27,9 +27,9 @@ pub const CBEvent = struct {
// For IO callbacks (switching). // For IO callbacks (switching).
pub const Type = enum { pub const Type = enum {
NO_ERROR, // No error. A message was generated. NO_ERROR, // No error. A message was generated.
ERROR, // Generic error. ERROR, // Generic error.
FD_CLOSING, // The fd is closing. FD_CLOSING, // The fd is closing.
IGNORE, // The message should be ignored (protocol specific). IGNORE, // The message should be ignored (protocol specific).
}; };
}; };

View File

@ -7,28 +7,27 @@ const print_eq = @import("./util.zig").print_eq;
pub const Connections = std.ArrayList(Connection); pub const Connections = std.ArrayList(Connection);
pub const Connection = struct { pub const Connection = struct {
pub const Type = enum { pub const Type = enum {
IPC, // Standard connection. IPC, // Standard connection.
EXTERNAL, // Non IPC connection (TCP, UDP, etc.). EXTERNAL, // Non IPC connection (TCP, UDP, etc.).
SERVER, // Messages received = new connections. SERVER, // Messages received = new connections.
SWITCHED, // IO operations should go through registered callbacks. SWITCHED, // IO operations should go through registered callbacks.
}; };
t: Connection.Type, t: Connection.Type,
path: ?[] const u8, // Not always needed. path: ?[]const u8, // Not always needed.
const Self = @This(); const Self = @This();
pub fn init(t: Connection.Type, path: ?[] const u8) Self { pub fn init(t: Connection.Type, path: ?[]const u8) Self {
return Self { return Self{
.t = t, .t = t,
.path = path, .path = path,
}; };
} }
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void { pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
try fmt.format(out_stream, "{}, path {?s}", .{ self.t, self.path}); try fmt.format(out_stream, "{}, path {?s}", .{ self.t, self.path });
} }
}; };
@ -36,7 +35,7 @@ test "Connection - creation and display" {
// origin destination // origin destination
var path = "/some/path"; var path = "/some/path";
var c1 = Connection.init(Connection.Type.EXTERNAL, path); var c1 = Connection.init(Connection.Type.EXTERNAL, path);
var c2 = Connection.init(Connection.Type.IPC , null); var c2 = Connection.init(Connection.Type.IPC, null);
try print_eq("connection.Connection.Type.EXTERNAL, path /some/path", c1); try print_eq("connection.Connection.Type.EXTERNAL, path /some/path", c1);
try print_eq("connection.Connection.Type.IPC, path null", c2); try print_eq("connection.Connection.Type.IPC, path null", c2);
} }

View File

@ -26,29 +26,29 @@ pub const PollFD = std.ArrayList(std.os.pollfd);
pub const IPC_HEADER_SIZE = 4; // Size (4 bytes) then content. pub const IPC_HEADER_SIZE = 4; // Size (4 bytes) then content.
pub const IPC_BASE_SIZE = 100000; // 100 KB, plenty enough space for messages pub const IPC_BASE_SIZE = 100000; // 100 KB, plenty enough space for messages
pub const IPC_MAX_MESSAGE_SIZE = IPC_BASE_SIZE-IPC_HEADER_SIZE; pub const IPC_MAX_MESSAGE_SIZE = IPC_BASE_SIZE - IPC_HEADER_SIZE;
pub const IPC_VERSION = 1; pub const IPC_VERSION = 1;
// Context of the whole networking state. // Context of the whole networking state.
pub const Context = struct { pub const Context = struct {
rundir: [] u8, rundir: []u8,
allocator: std.mem.Allocator, // Memory allocator. allocator: std.mem.Allocator, // Memory allocator.
connections: Connections, // Keep track of connections. connections: Connections, // Keep track of connections.
// "pollfd" structures passed to poll(2). Same indexes as "connections". // "pollfd" structures passed to poll(2). Same indexes as "connections".
pollfd: PollFD, // .fd (fd_t) + .events (i16) + .revents (i16) pollfd: PollFD, // .fd (fd_t) + .events (i16) + .revents (i16)
tx: Messages, // Messages to send, once their fd is available. tx: Messages, // Messages to send, once their fd is available.
switchdb: SwitchDB, // Relations between fd. switchdb: SwitchDB, // Relations between fd.
timer: ?i32 = null, // No timer by default (no TIMER event). timer: ?i32 = null, // No timer by default (no TIMER event).
const Self = @This(); const Self = @This();
// Context initialization: // Context initialization:
// - init structures (provide the allocator) // - init structures (provide the allocator)
pub fn init(allocator: std.mem.Allocator) !Self { pub fn init(allocator: std.mem.Allocator) !Self {
var rundir = std.process.getEnvVarOwned(allocator, "RUNDIR") catch |err| switch(err) { var rundir = std.process.getEnvVarOwned(allocator, "RUNDIR") catch |err| switch (err) {
error.EnvironmentVariableNotFound => blk: { error.EnvironmentVariableNotFound => blk: {
break :blk try allocator.dupeZ(u8, "/tmp/libipc-run/"); break :blk try allocator.dupeZ(u8, "/tmp/libipc-run/");
}, },
@ -57,23 +57,16 @@ pub const Context = struct {
}, },
}; };
return Self { return Self{ .rundir = rundir, .connections = Connections.init(allocator), .pollfd = PollFD.init(allocator), .tx = Messages.init(allocator), .switchdb = SwitchDB.init(allocator), .allocator = allocator };
.rundir = rundir
, .connections = Connections.init(allocator)
, .pollfd = PollFD.init(allocator)
, .tx = Messages.init(allocator)
, .switchdb = SwitchDB.init(allocator)
, .allocator = allocator
};
} }
// create a server path for the UNIX socket based on the service name // create a server path for the UNIX socket based on the service name
pub fn server_path(self: *Self, service_name: []const u8, writer: anytype) !void { pub fn server_path(self: *Self, service_name: []const u8, writer: anytype) !void {
try writer.print("{s}/{s}", .{self.rundir, service_name}); try writer.print("{s}/{s}", .{ self.rundir, service_name });
} }
pub fn deinit(self: *Self) void { pub fn deinit(self: *Self) void {
self.close_all() catch |err| switch(err){ self.close_all() catch |err| switch (err) {
error.IndexOutOfBounds => { error.IndexOutOfBounds => {
log.err("context.deinit(): IndexOutOfBounds", .{}); log.err("context.deinit(): IndexOutOfBounds", .{});
}, },
@ -89,18 +82,16 @@ pub const Context = struct {
} }
// Both simple connection and the switched one share this code. // Both simple connection and the switched one share this code.
fn connect_ (self: *Self, ctype: Connection.Type, path: []const u8) !i32 { fn connect_(self: *Self, ctype: Connection.Type, path: []const u8) !i32 {
var stream = try net.connectUnixSocket(path); var stream = try net.connectUnixSocket(path);
const newfd = stream.handle; const newfd = stream.handle;
errdefer std.os.closeSocket(newfd); errdefer std.os.closeSocket(newfd);
var newcon = Connection.init(ctype, null); var newcon = Connection.init(ctype, null);
try self.add_ (newcon, newfd); try self.add_(newcon, newfd);
return newfd; return newfd;
} }
fn connect_ipcd (self: *Self, service_name: []const u8 fn connect_ipcd(self: *Self, service_name: []const u8, connection_type: Connection.Type) !?i32 {
, connection_type: Connection.Type) !?i32 {
const buffer_size = 10000; const buffer_size = 10000;
var buffer: [buffer_size]u8 = undefined; var buffer: [buffer_size]u8 = undefined;
var fba = std.heap.FixedBufferAllocator.init(&buffer); var fba = std.heap.FixedBufferAllocator.init(&buffer);
@ -115,75 +106,75 @@ pub const Context = struct {
// //
// Routing directives can be chained using " ;" separator: // Routing directives can be chained using " ;" separator:
// IPC_NETWORK="audio https://example.com/audio ;pong tls://pong.example.com/pong" // IPC_NETWORK="audio https://example.com/audio ;pong tls://pong.example.com/pong"
var network_envvar = std.process.getEnvVarOwned(allocator, "IPC_NETWORK") catch |err| switch(err) { var network_envvar = std.process.getEnvVarOwned(allocator, "IPC_NETWORK") catch |err| switch (err) {
// error{ OutOfMemory, EnvironmentVariableNotFound, InvalidUtf8 } (ErrorSet) // error{ OutOfMemory, EnvironmentVariableNotFound, InvalidUtf8 } (ErrorSet)
error.EnvironmentVariableNotFound => { error.EnvironmentVariableNotFound => {
log.debug("no IPC_NETWORK envvar: IPCd won't be contacted", .{}); log.debug("no IPC_NETWORK envvar: IPCd won't be contacted", .{});
return null; return null;
}, // no need to contact IPCd }, // no need to contact IPCd
else => { return err; }, else => {
return err;
},
}; };
var lookupbuffer: [buffer_size]u8 = undefined; var lookupbuffer: [buffer_size]u8 = undefined;
var lookupfbs = std.io.fixedBufferStream(&lookupbuffer); var lookupfbs = std.io.fixedBufferStream(&lookupbuffer);
var lookupwriter = lookupfbs.writer(); var lookupwriter = lookupfbs.writer();
try lookupwriter.print("{s};{s}", .{service_name, network_envvar}); try lookupwriter.print("{s};{s}", .{ service_name, network_envvar });
// Try to connect to the IPCd service // Try to connect to the IPCd service
var ipcdfd = try self.connect_service("ipc"); var ipcdfd = try self.connect_service("ipc");
defer self.close_fd (ipcdfd) catch {}; // in any case, connection should be closed defer self.close_fd(ipcdfd) catch {}; // in any case, connection should be closed
// Send LOOKUP message // Send LOOKUP message
// content: target service name;${IPC_NETWORK} // content: target service name;${IPC_NETWORK}
// example: pong;pong tls://example.com:8998/pong // example: pong;pong tls://example.com:8998/pong
var m = try Message.init (ipcdfd, allocator, lookupfbs.getWritten()); var m = try Message.init(ipcdfd, allocator, lookupfbs.getWritten());
try self.write (m); try self.write(m);
// Read LOOKUP response // Read LOOKUP response
// case error: ignore and move on (TODO) // case error: ignore and move on (TODO)
// else: get fd sent by IPCd then close IPCd fd // else: get fd sent by IPCd then close IPCd fd
var reception_buffer: [2000]u8 = undefined; var reception_buffer: [2000]u8 = undefined;
var reception_size: usize = 0; var reception_size: usize = 0;
var newfd = try receive_fd (ipcdfd, &reception_buffer, &reception_size); var newfd = try receive_fd(ipcdfd, &reception_buffer, &reception_size);
if (reception_size == 0) { if (reception_size == 0) {
return error.IPCdFailedNoMessage; return error.IPCdFailedNoMessage;
} }
var response: []u8 = reception_buffer[0..reception_size]; var response: []u8 = reception_buffer[0..reception_size];
if (! std.mem.eql(u8, response, "ok")) { if (!std.mem.eql(u8, response, "ok")) {
return error.IPCdFailedNotOk; return error.IPCdFailedNotOk;
} }
var newcon = Connection.init(connection_type, null); var newcon = Connection.init(connection_type, null);
try self.add_ (newcon, newfd); try self.add_(newcon, newfd);
return newfd; return newfd;
} }
/// TODO: Add a new connection, but takes care of memory problems: /// TODO: Add a new connection, but takes care of memory problems:
/// in case one of the arrays cannot sustain another entry, the other /// in case one of the arrays cannot sustain another entry, the other
/// won't be added. /// won't be added.
fn add_ (self: *Self, new_connection: Connection, fd: os.socket_t) !void { fn add_(self: *Self, new_connection: Connection, fd: os.socket_t) !void {
try self.connections.append(new_connection); try self.connections.append(new_connection);
try self.pollfd.append(.{ .fd = fd try self.pollfd.append(.{ .fd = fd, .events = std.os.linux.POLL.IN, .revents = 0 });
, .events = std.os.linux.POLL.IN
, .revents = 0 });
} }
fn fd_to_index (self: Self, fd: i32) !usize { fn fd_to_index(self: Self, fd: i32) !usize {
var i: usize = 0; var i: usize = 0;
while(i < self.pollfd.items.len) { while (i < self.pollfd.items.len) {
if (self.pollfd.items[i].fd == fd) { if (self.pollfd.items[i].fd == fd) {
return i; return i;
} }
i += 1; i += 1;
} }
return error.IndexNotFound; return error.IndexNotFound;
} }
/// Connect to the service directly, without reaching IPCd first. /// Connect to the service directly, without reaching IPCd first.
/// Return the connection FD. /// Return the connection FD.
pub fn connect_service (self: *Self, service_name: []const u8) !i32 { pub fn connect_service(self: *Self, service_name: []const u8) !i32 {
var buffer: [1000]u8 = undefined; var buffer: [1000]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer); var fbs = std.io.fixedBufferStream(&buffer);
var writer = fbs.writer(); var writer = fbs.writer();
@ -191,44 +182,39 @@ pub const Context = struct {
try self.server_path(service_name, writer); try self.server_path(service_name, writer);
var path = fbs.getWritten(); var path = fbs.getWritten();
return self.connect_ (Connection.Type.IPC, path); return self.connect_(Connection.Type.IPC, path);
} }
/// Tries to connect to IPCd first, then the service (if needed). /// Tries to connect to IPCd first, then the service (if needed).
/// Return the connection FD. /// Return the connection FD.
pub fn connect_ipc (self: *Self, service_name: []const u8) !i32 { pub fn connect_ipc(self: *Self, service_name: []const u8) !i32 {
// First, try ipcd. // First, try ipcd.
if (try self.connect_ipcd (service_name, Connection.Type.IPC)) |fd| { if (try self.connect_ipcd(service_name, Connection.Type.IPC)) |fd| {
log.debug("Connected via IPCd, fd is {}", .{fd}); log.debug("Connected via IPCd, fd is {}", .{fd});
return fd; return fd;
} }
// In case this doesn't work, connect directly. // In case this doesn't work, connect directly.
return try self.connect_service (service_name); return try self.connect_service(service_name);
} }
/// Add a new file descriptor to follow, labeled as EXTERNAL. /// Add a new file descriptor to follow, labeled as EXTERNAL.
/// Useful for protocol daemons (ex: TCPd) listening to a socket for external connections, /// Useful for protocol daemons (ex: TCPd) listening to a socket for external connections,
/// clients trying to reach a libipc service. /// clients trying to reach a libipc service.
pub fn add_external (self: *Self, newfd: i32) !void { pub fn add_external(self: *Self, newfd: i32) !void {
var newcon = Connection.init(Connection.Type.EXTERNAL, null); var newcon = Connection.init(Connection.Type.EXTERNAL, null);
try self.add_ (newcon, newfd); try self.add_(newcon, newfd);
} }
fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void { fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void {
// net.StreamServer // net.StreamServer
var serverfd = self.pollfd.items[server_index].fd; var serverfd = self.pollfd.items[server_index].fd;
var path = self.connections.items[server_index].path orelse return error.ServerWithNoPath; var path = self.connections.items[server_index].path orelse return error.ServerWithNoPath;
var server = net.StreamServer { var server = net.StreamServer{ .sockfd = serverfd, .kernel_backlog = 100, .reuse_address = false, .listen_address = try net.Address.initUnix(path) };
.sockfd = serverfd
, .kernel_backlog = 100
, .reuse_address = false
, .listen_address = try net.Address.initUnix(path)
};
var client = try server.accept(); // net.StreamServer.Connection var client = try server.accept(); // net.StreamServer.Connection
const newfd = client.stream.handle; const newfd = client.stream.handle;
var newcon = Connection.init(Connection.Type.IPC, null); var newcon = Connection.init(Connection.Type.IPC, null);
try self.add_ (newcon, newfd); try self.add_(newcon, newfd);
const sfd = server.sockfd orelse return error.SocketLOL; // TODO const sfd = server.sockfd orelse return error.SocketLOL; // TODO
// WARNING: imply every new item is last // WARNING: imply every new item is last
@ -237,7 +223,7 @@ pub const Context = struct {
// Create a unix socket. // Create a unix socket.
// Store std lib structures in the context. // Store std lib structures in the context.
pub fn server_init(self: *Self, service_name: [] const u8) !net.StreamServer { pub fn server_init(self: *Self, service_name: []const u8) !net.StreamServer {
var buffer: [1000]u8 = undefined; var buffer: [1000]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer); var fbs = std.io.fixedBufferStream(&buffer);
var writer = fbs.writer(); var writer = fbs.writer();
@ -252,15 +238,15 @@ pub const Context = struct {
const newfd = server.sockfd orelse return error.SocketLOL; // TODO const newfd = server.sockfd orelse return error.SocketLOL; // TODO
// Store the path in the Connection structure, so the UNIX socket file can be removed later. // Store the path in the Connection structure, so the UNIX socket file can be removed later.
var newcon = Connection.init(Connection.Type.SERVER, try self.allocator.dupeZ(u8, path)); var newcon = Connection.init(Connection.Type.SERVER, try self.allocator.dupeZ(u8, path));
try self.add_ (newcon, newfd); try self.add_(newcon, newfd);
return server; return server;
} }
pub fn write (_: *Self, m: Message) !void { pub fn write(_: *Self, m: Message) !void {
// Message contains the fd, no need to search for // Message contains the fd, no need to search for
// the right structure to copy, let's just recreate // the right structure to copy, let's just recreate
// a Stream from the fd. // a Stream from the fd.
var stream = net.Stream { .handle = m.fd }; var stream = net.Stream{ .handle = m.fd };
var buffer = [_]u8{0} ** IPC_MAX_MESSAGE_SIZE; var buffer = [_]u8{0} ** IPC_MAX_MESSAGE_SIZE;
var fbs = std.io.fixedBufferStream(&buffer); var fbs = std.io.fixedBufferStream(&buffer);
@ -268,16 +254,16 @@ pub const Context = struct {
_ = try m.write(writer); // returns paylen _ = try m.write(writer); // returns paylen
_ = try stream.write (fbs.getWritten()); _ = try stream.write(fbs.getWritten());
} }
pub fn schedule (self: *Self, m: Message) !void { pub fn schedule(self: *Self, m: Message) !void {
try self.tx.append(m); try self.tx.append(m);
} }
/// Read from a client (indexed by a FD). /// Read from a client (indexed by a FD).
pub fn read_fd (self: *Self, fd: i32) !?Message { pub fn read_fd(self: *Self, fd: i32) !?Message {
return try self.read(try self.fd_to_index (fd)); return try self.read(try self.fd_to_index(fd));
} }
pub fn add_switch(self: *Self, fd1: i32, fd2: i32) !void { pub fn add_switch(self: *Self, fd1: i32, fd2: i32) !void {
@ -287,16 +273,14 @@ pub const Context = struct {
self.connections.items[index_origin].t = Connection.Type.SWITCHED; self.connections.items[index_origin].t = Connection.Type.SWITCHED;
self.connections.items[index_destinataire].t = Connection.Type.SWITCHED; self.connections.items[index_destinataire].t = Connection.Type.SWITCHED;
try self.switchdb.add_switch(fd1,fd2); try self.switchdb.add_switch(fd1, fd2);
} }
pub fn set_switch_callbacks(self: *Self, fd: i32 pub fn set_switch_callbacks(self: *Self, fd: i32, in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType, out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) !void {
, in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType try self.switchdb.set_callbacks(fd, in, out);
, out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) !void {
try self.switchdb.set_callbacks(fd,in, out);
} }
pub fn read (self: *Self, index: usize) !?Message { pub fn read(self: *Self, index: usize) !?Message {
if (index >= self.pollfd.items.len) { if (index >= self.pollfd.items.len) {
return error.IndexOutOfBounds; return error.IndexOutOfBounds;
} }
@ -326,7 +310,7 @@ pub const Context = struct {
/// Before closing the fd, test it via the 'fcntl' syscall. /// Before closing the fd, test it via the 'fcntl' syscall.
/// This is useful for switched connections: FDs could be closed without libipc being informed. /// This is useful for switched connections: FDs could be closed without libipc being informed.
fn safe_close_fd (self: *Self, fd: i32) void { fn safe_close_fd(self: *Self, fd: i32) void {
var should_close = true; var should_close = true;
_ = std.os.fcntl(fd, std.os.F.GETFD, 0) catch { _ = std.os.fcntl(fd, std.os.F.GETFD, 0) catch {
should_close = false; should_close = false;
@ -341,8 +325,12 @@ pub const Context = struct {
var current_event: Event = Event.init(Event.Type.ERROR, 0, 0, null); var current_event: Event = Event.init(Event.Type.ERROR, 0, 0, null);
var wait_duration: i32 = -1; // -1 == unlimited var wait_duration: i32 = -1; // -1 == unlimited
if (self.timer) |t| { log.debug("listening (timer: {} ms)", .{t}); wait_duration = t; } if (self.timer) |t| {
else { log.debug("listening (no timer)", .{}); } log.debug("listening (timer: {} ms)", .{t});
wait_duration = t;
} else {
log.debug("listening (no timer)", .{});
}
// Make sure we listen to the right file descriptors, // Make sure we listen to the right file descriptors,
// setting POLLIN & POLLOUT flags. // setting POLLIN & POLLOUT flags.
@ -376,8 +364,7 @@ pub const Context = struct {
if (count == 0) { if (count == 0) {
if (duration >= wait_duration) { if (duration >= wait_duration) {
current_event = Event.init(Event.Type.TIMER, 0, 0, null); current_event = Event.init(Event.Type.TIMER, 0, 0, null);
} } else {
else {
// In case nothing happened, and poll wasn't triggered by time out. // In case nothing happened, and poll wasn't triggered by time out.
current_event = Event.init(Event.Type.ERROR, 0, 0, null); current_event = Event.init(Event.Type.ERROR, 0, 0, null);
} }
@ -388,7 +375,7 @@ pub const Context = struct {
// => loop over self.pollfd.items // => loop over self.pollfd.items
for (self.pollfd.items) |*fd, i| { for (self.pollfd.items) |*fd, i| {
// .revents is POLLIN // .revents is POLLIN
if(fd.revents & std.os.linux.POLL.IN > 0) { if (fd.revents & std.os.linux.POLL.IN > 0) {
// SERVER = new connection // SERVER = new connection
if (self.connections.items[i].t == .SERVER) { if (self.connections.items[i].t == .SERVER) {
try self.accept_new_client(&current_event, i); try self.accept_new_client(&current_event, i);
@ -396,21 +383,21 @@ pub const Context = struct {
} }
// SWITCHED = send message to the right dest (or drop the switch) // SWITCHED = send message to the right dest (or drop the switch)
else if (self.connections.items[i].t == .SWITCHED) { else if (self.connections.items[i].t == .SWITCHED) {
current_event = self.switchdb.handle_event_read (i, fd.fd); current_event = self.switchdb.handle_event_read(i, fd.fd);
switch (current_event.t) { switch (current_event.t) {
.SWITCH_RX => { .SWITCH_RX => {
try self.schedule(current_event.m.?); try self.schedule(current_event.m.?);
}, },
.DISCONNECTION => { .DISCONNECTION => {
var dest = try self.switchdb.getDest(fd.fd); var dest = try self.switchdb.getDest(fd.fd);
log.debug("disconnection from {} -> removing {}, too", .{fd.fd, dest}); log.debug("disconnection from {} -> removing {}, too", .{ fd.fd, dest });
self.switchdb.nuke(fd.fd); self.switchdb.nuke(fd.fd);
self.safe_close_fd(fd.fd); self.safe_close_fd(fd.fd);
self.safe_close_fd(dest); self.safe_close_fd(dest);
}, },
.ERROR => { .ERROR => {
var dest = try self.switchdb.getDest(fd.fd); var dest = try self.switchdb.getDest(fd.fd);
log.warn("error from {} -> removing {}, too", .{fd.fd, dest}); log.warn("error from {} -> removing {}, too", .{ fd.fd, dest });
self.switchdb.nuke(fd.fd); self.switchdb.nuke(fd.fd);
self.safe_close_fd(fd.fd); self.safe_close_fd(fd.fd);
self.safe_close_fd(dest); self.safe_close_fd(dest);
@ -428,13 +415,15 @@ pub const Context = struct {
} }
// otherwise = new message or disconnection // otherwise = new message or disconnection
else { else {
var maybe_message = self.read(i) catch |err| switch(err) { var maybe_message = self.read(i) catch |err| switch (err) {
error.ConnectionResetByPeer => { error.ConnectionResetByPeer => {
log.warn("connection reset by peer", .{}); log.warn("connection reset by peer", .{});
try self.close(i); try self.close(i);
return Event.init(Event.Type.DISCONNECTION, i, fd.fd, null); return Event.init(Event.Type.DISCONNECTION, i, fd.fd, null);
}, },
else => { return err; }, else => {
return err;
},
}; };
if (maybe_message) |m| { if (maybe_message) |m| {
@ -447,8 +436,8 @@ pub const Context = struct {
} }
// .revent is POLLOUT // .revent is POLLOUT
if(fd.revents & std.os.linux.POLL.OUT > 0) { if (fd.revents & std.os.linux.POLL.OUT > 0) {
fd.events &= ~ @as(i16, std.os.linux.POLL.OUT); fd.events &= ~@as(i16, std.os.linux.POLL.OUT);
var index: usize = undefined; var index: usize = undefined;
for (self.tx.items) |m, index_| { for (self.tx.items) |m, index_| {
@ -462,14 +451,13 @@ pub const Context = struct {
// SWITCHED = write message for its switch buddy (callbacks) // SWITCHED = write message for its switch buddy (callbacks)
if (self.connections.items[i].t == .SWITCHED) { if (self.connections.items[i].t == .SWITCHED) {
current_event = self.switchdb.handle_event_write (i, m); current_event = self.switchdb.handle_event_write(i, m);
// Message inner memory is already freed. // Message inner memory is already freed.
switch (current_event.t) { switch (current_event.t) {
.SWITCH_TX => { .SWITCH_TX => {},
},
.ERROR => { .ERROR => {
var dest = try self.switchdb.getDest(fd.fd); var dest = try self.switchdb.getDest(fd.fd);
log.warn("error from {} -> removing {}, too", .{fd.fd, dest}); log.warn("error from {} -> removing {}, too", .{ fd.fd, dest });
self.switchdb.nuke(fd.fd); self.switchdb.nuke(fd.fd);
self.safe_close_fd(fd.fd); self.safe_close_fd(fd.fd);
self.safe_close_fd(dest); self.safe_close_fd(dest);
@ -480,24 +468,24 @@ pub const Context = struct {
}, },
} }
return current_event; return current_event;
} } else {
else {
// otherwise = write message for the msg.fd // otherwise = write message for the msg.fd
try self.write (m); try self.write(m);
m.deinit(); m.deinit();
return Event.init(Event.Type.MESSAGE_TX, i, fd.fd, null); return Event.init(Event.Type.MESSAGE_TX, i, fd.fd, null);
} }
} }
// .revent is POLLHUP // .revent is POLLHUP
if(fd.revents & std.os.linux.POLL.HUP > 0) { if (fd.revents & std.os.linux.POLL.HUP > 0) {
// handle disconnection // handle disconnection
current_event = Event.init(Event.Type.DISCONNECTION, i, fd.fd, null); current_event = Event.init(Event.Type.DISCONNECTION, i, fd.fd, null);
try self.close(i); try self.close(i);
return current_event; return current_event;
} }
// if fd revent is POLLERR or POLLNVAL // if fd revent is POLLERR or POLLNVAL
if ((fd.revents & std.os.linux.POLL.HUP > 0) or if ((fd.revents & std.os.linux.POLL.HUP > 0) or
(fd.revents & std.os.linux.POLL.NVAL > 0)) { (fd.revents & std.os.linux.POLL.NVAL > 0))
{
return Event.init(Event.Type.ERROR, i, fd.fd, null); return Event.init(Event.Type.ERROR, i, fd.fd, null);
} }
} }
@ -507,7 +495,7 @@ pub const Context = struct {
/// Remove a connection based on its file descriptor. /// Remove a connection based on its file descriptor.
pub fn close_fd(self: *Self, fd: i32) !void { pub fn close_fd(self: *Self, fd: i32) !void {
try self.close(try self.fd_to_index (fd)); try self.close(try self.fd_to_index(fd));
} }
pub fn close(self: *Self, index: usize) !void { pub fn close(self: *Self, index: usize) !void {
@ -543,13 +531,13 @@ pub const Context = struct {
} }
pub fn close_all(self: *Self) !void { pub fn close_all(self: *Self) !void {
while(self.connections.items.len > 0) { try self.close(0); } while (self.connections.items.len > 0) {
try self.close(0);
}
} }
pub fn format(self: Self, comptime form: []const u8, options: fmt.FormatOptions, out_stream: anytype) !void { pub fn format(self: Self, comptime form: []const u8, options: fmt.FormatOptions, out_stream: anytype) !void {
try fmt.format(out_stream try fmt.format(out_stream, "context ({} connections and {} messages):", .{ self.connections.items.len, self.tx.items.len });
, "context ({} connections and {} messages):"
, .{self.connections.items.len, self.tx.items.len});
for (self.connections.items) |con| { for (self.connections.items) |con| {
try fmt.format(out_stream, "\n- ", .{}); try fmt.format(out_stream, "\n- ", .{});
@ -568,7 +556,7 @@ pub const Context = struct {
// not an instance of Message. // not an instance of Message.
const CommunicationTestThread = struct { const CommunicationTestThread = struct {
fn clientFn() !void { fn clientFn() !void {
const config = .{.safety = true}; const config = .{ .safety = true };
var gpa = std.heap.GeneralPurposeAllocator(config){}; var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit(); defer _ = gpa.deinit();
const allocator = gpa.allocator(); const allocator = gpa.allocator();
@ -589,7 +577,7 @@ const CommunicationTestThread = struct {
}; };
test "Context - creation, display and memory check" { test "Context - creation, display and memory check" {
const config = .{.safety = true}; const config = .{ .safety = true };
var gpa = std.heap.GeneralPurposeAllocator(config){}; var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit(); defer _ = gpa.deinit();
@ -605,7 +593,7 @@ test "Context - creation, display and memory check" {
var path = fbs.getWritten(); var path = fbs.getWritten();
// SERVER SIDE: creating a service. // SERVER SIDE: creating a service.
var server = c.server_init("simple-context-test") catch |err| switch(err) { var server = c.server_init("simple-context-test") catch |err| switch (err) {
error.FileNotFound => { error.FileNotFound => {
log.err("cannot init server at {s}", .{path}); log.err("cannot init server at {s}", .{path});
return err; return err;
@ -631,7 +619,7 @@ test "Context - creation, display and memory check" {
// This is a client sending a an instance of Message. // This is a client sending a an instance of Message.
const ConnectThenSendMessageThread = struct { const ConnectThenSendMessageThread = struct {
fn clientFn() !void { fn clientFn() !void {
const config = .{.safety = true}; const config = .{ .safety = true };
var gpa = std.heap.GeneralPurposeAllocator(config){}; var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit(); defer _ = gpa.deinit();
const allocator = gpa.allocator(); const allocator = gpa.allocator();
@ -663,9 +651,8 @@ const ConnectThenSendMessageThread = struct {
} }
}; };
test "Context - creation, echo once" { test "Context - creation, echo once" {
const config = .{.safety = true}; const config = .{ .safety = true };
var gpa = std.heap.GeneralPurposeAllocator(config){}; var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit(); defer _ = gpa.deinit();
@ -681,7 +668,7 @@ test "Context - creation, echo once" {
var path = fbs.getWritten(); var path = fbs.getWritten();
// SERVER SIDE: creating a service. // SERVER SIDE: creating a service.
var server = c.server_init("simple-context-test") catch |err| switch(err) { var server = c.server_init("simple-context-test") catch |err| switch (err) {
error.FileNotFound => { error.FileNotFound => {
log.err("cannot init server at {s}", .{path}); log.err("cannot init server at {s}", .{path});
return err; return err;

View File

@ -34,26 +34,31 @@ pub const Event = struct {
// to it. This is a lookup. // to it. This is a lookup.
pub const Type = enum { pub const Type = enum {
ERROR, // A problem occured. ERROR, // A problem occured.
CONNECTION, // New user. CONNECTION, // New user.
DISCONNECTION, // User disconnected. DISCONNECTION, // User disconnected.
MESSAGE_RX, // New message. MESSAGE_RX, // New message.
MESSAGE_TX, // Message sent. MESSAGE_TX, // Message sent.
TIMER, // Timeout in the poll(2) function. TIMER, // Timeout in the poll(2) function.
EXTERNAL, // Message received from a non IPC socket. EXTERNAL, // Message received from a non IPC socket.
SWITCH_RX, // Message received from a switched FD. SWITCH_RX, // Message received from a switched FD.
SWITCH_TX, // Message sent to a switched fd. SWITCH_TX, // Message sent to a switched fd.
}; };
t: Event.Type, t: Event.Type,
index: usize, index: usize,
origin: i32, // socket fd origin: i32, // socket fd
m: ?Message, // message m: ?Message, // message
const Self = @This(); const Self = @This();
pub fn init(t: Event.Type, index: usize, origin: i32, m: ?Message) Self { pub fn init(t: Event.Type, index: usize, origin: i32, m: ?Message) Self {
return Self { .t = t, .index = index, .origin = origin, .m = m, }; return Self{
.t = t,
.index = index,
.origin = origin,
.m = m,
};
} }
pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, m: ?Message) void { pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, m: ?Message) void {
@ -65,8 +70,8 @@ pub const Event = struct {
pub fn clean(self: *Self) void { pub fn clean(self: *Self) void {
self.t = Event.Type.ERROR; self.t = Event.Type.ERROR;
self.index = @as(usize,0); self.index = @as(usize, 0);
self.origin = @as(i32,0); self.origin = @as(i32, 0);
if (self.m) |message| { if (self.m) |message| {
message.deinit(); message.deinit();
} }
@ -74,15 +79,12 @@ pub const Event = struct {
} }
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void { pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
try fmt.format(out_stream try fmt.format(out_stream, "{}, origin: {}, index {}, message: [{?}]", .{ self.t, self.origin, self.index, self.m });
, "{}, origin: {}, index {}, message: [{?}]"
, .{ self.t, self.origin, self.index, self.m} );
} }
}; };
test "Event - creation and display" { test "Event - creation and display" {
const config = .{.safety = true}; const config = .{ .safety = true };
var gpa = std.heap.GeneralPurposeAllocator(config){}; var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit(); defer _ = gpa.deinit();
const allocator = gpa.allocator(); const allocator = gpa.allocator();

View File

@ -5,7 +5,7 @@ const log = std.log.scoped(.libipc_exchangefd);
const builtin = @import("builtin"); const builtin = @import("builtin");
const windows = std.os.windows; const windows = std.os.windows;
const errno = std.os.errno; const errno = std.os.errno;
const system = std.os.system; const system = std.os.system;
const unexpectedErrno = std.os.unexpectedErrno; const unexpectedErrno = std.os.unexpectedErrno;
const SendMsgError = std.os.SendMsgError; const SendMsgError = std.os.SendMsgError;
@ -19,7 +19,7 @@ pub fn Cmsghdr(comptime T: type) type {
const Header = extern struct { const Header = extern struct {
len: usize, len: usize,
level: c_int, level: c_int,
@"type": c_int, type: c_int,
}; };
const data_align = @sizeOf(usize); const data_align = @sizeOf(usize);
@ -32,14 +32,14 @@ pub fn Cmsghdr(comptime T: type) type {
pub fn init(args: struct { pub fn init(args: struct {
level: c_int, level: c_int,
@"type": c_int, type: c_int,
data: T, data: T,
}) Self { }) Self {
var self: Self = undefined; var self: Self = undefined;
self.headerPtr().* = .{ self.headerPtr().* = .{
.len = data_offset + @sizeOf(T), .len = data_offset + @sizeOf(T),
.level = args.level, .level = args.level,
.@"type" = args.@"type", .type = args.type,
}; };
self.dataPtr().* = args.data; self.dataPtr().* = args.data;
return self; return self;
@ -48,13 +48,13 @@ pub fn Cmsghdr(comptime T: type) type {
// TODO: include this version if we submit a PR to add this to std // TODO: include this version if we submit a PR to add this to std
pub fn initNoData(args: struct { pub fn initNoData(args: struct {
level: c_int, level: c_int,
@"type": c_int, type: c_int,
}) Self { }) Self {
var self: Self = undefined; var self: Self = undefined;
self.headerPtr().* = .{ self.headerPtr().* = .{
.len = data_offset + @sizeOf(T), .len = data_offset + @sizeOf(T),
.level = args.level, .level = args.level,
.@"type" = args.@"type", .type = args.type,
}; };
return self; return self;
} }
@ -84,7 +84,7 @@ pub fn send_fd(sockfd: os.socket_t, msg: []const u8, fd: os.fd_t) void {
var cmsg = Cmsghdr(os.fd_t).init(.{ var cmsg = Cmsghdr(os.fd_t).init(.{
.level = os.SOL.SOCKET, .level = os.SOL.SOCKET,
.@"type" = SCM_RIGHTS, .type = SCM_RIGHTS,
.data = fd, .data = fd,
}); });
@ -96,14 +96,14 @@ pub fn send_fd(sockfd: os.socket_t, msg: []const u8, fd: os.fd_t) void {
.control = &cmsg, .control = &cmsg,
.controllen = @sizeOf(@TypeOf(cmsg)), .controllen = @sizeOf(@TypeOf(cmsg)),
.flags = 0, .flags = 0,
}, 0) catch |err| { }, 0) catch |err| {
log.err("error sendmsg failed with {s}", .{@errorName(err)}); log.err("error sendmsg failed with {s}", .{@errorName(err)});
return; return;
}; };
if (len != msg.len) { if (len != msg.len) {
// We don't have much choice but to exit here. // We don't have much choice but to exit here.
log.err("expected sendmsg to return {} but got {}", .{msg.len, len}); log.err("expected sendmsg to return {} but got {}", .{ msg.len, len });
os.exit(0xff); os.exit(0xff);
} }
} }
@ -187,31 +187,19 @@ pub fn recvmsg(
/// A message can be carried with it, copied into 'buffer'. /// A message can be carried with it, copied into 'buffer'.
/// WARNING: buffer must be at least 1500 bytes. /// WARNING: buffer must be at least 1500 bytes.
pub fn receive_fd(sockfd: os.socket_t, buffer: []u8, msg_size: *usize) !os.fd_t { pub fn receive_fd(sockfd: os.socket_t, buffer: []u8, msg_size: *usize) !os.fd_t {
var msg_buffer = [_]u8{0} ** 1500; var msg_buffer = [_]u8{0} ** 1500;
var iov = [_]os.iovec{ var iov = [_]os.iovec{
.{ .{ .iov_base = msg_buffer[0..], .iov_len = msg_buffer.len },
.iov_base = msg_buffer[0..]
, .iov_len = msg_buffer.len
},
}; };
var cmsg = Cmsghdr(os.fd_t).init(.{ var cmsg = Cmsghdr(os.fd_t).init(.{
.level = os.SOL.SOCKET, .level = os.SOL.SOCKET,
.@"type" = SCM_RIGHTS, .type = SCM_RIGHTS,
.data = 0, .data = 0,
}); });
var msg: std.os.msghdr = .{ var msg: std.os.msghdr = .{ .name = null, .namelen = 0, .iov = &iov, .iovlen = 1, .control = &cmsg, .controllen = @sizeOf(@TypeOf(cmsg)), .flags = 0 };
.name = null
, .namelen = 0
, .iov = &iov
, .iovlen = 1
, .control = &cmsg
, .controllen = @sizeOf(@TypeOf(cmsg))
, .flags = 0
};
var msglen = recvmsg(sockfd, msg, 0) catch |err| { var msglen = recvmsg(sockfd, msg, 0) catch |err| {
log.err("error recvmsg failed with {s}", .{@errorName(err)}); log.err("error recvmsg failed with {s}", .{@errorName(err)});

View File

@ -1,9 +1,9 @@
const std = @import("std"); const std = @import("std");
pub fn hexdump(stream: anytype, header: [] const u8, buffer: [] const u8) std.os.WriteError!void { pub fn hexdump(stream: anytype, header: []const u8, buffer: []const u8) std.os.WriteError!void {
// Print a header. // Print a header.
if (header.len > 0) { if (header.len > 0) {
var hdr: [64] u8 = undefined; var hdr: [64]u8 = undefined;
var offset: usize = (hdr.len / 2) - ((header.len / 2) - 1); var offset: usize = (hdr.len / 2) - ((header.len / 2) - 1);
std.mem.set(u8, hdr[0..hdr.len], ' '); std.mem.set(u8, hdr[0..hdr.len], ' ');
@ -14,15 +14,15 @@ pub fn hexdump(stream: anytype, header: [] const u8, buffer: [] const u8) std.os
} }
var hexb: u32 = 0; var hexb: u32 = 0;
var ascii: [16] u8 = undefined; var ascii: [16]u8 = undefined;
// First line, first left side (simple number). // First line, first left side (simple number).
try stream.print("\n {d:0>4}: ", .{ hexb }); try stream.print("\n {d:0>4}: ", .{hexb});
// Loop on all values in the buffer (i from 0 to buffer.len). // Loop on all values in the buffer (i from 0 to buffer.len).
var i: u32 = 0; var i: u32 = 0;
while (i < buffer.len) : (i += 1) { while (i < buffer.len) : (i += 1) {
// Print actual hexadecimal value. // Print actual hexadecimal value.
try stream.print("{X:0>2} ", .{ buffer[i] }); try stream.print("{X:0>2} ", .{buffer[i]});
// What to print (simple ascii text, right side). // What to print (simple ascii text, right side).
if (buffer[i] >= ' ' and buffer[i] <= '~') { if (buffer[i] >= ' ' and buffer[i] <= '~') {
@ -39,9 +39,11 @@ pub fn hexdump(stream: anytype, header: [] const u8, buffer: [] const u8) std.os
// No next input: print the right amount of spaces. // No next input: print the right amount of spaces.
if ((i + 1) == buffer.len) { if ((i + 1) == buffer.len) {
// Each line is 16 bytes to print, each byte takes 3 characters. // Each line is 16 bytes to print, each byte takes 3 characters.
var missing_spaces = 3 * (15 - (i%16)); var missing_spaces = 3 * (15 - (i % 16));
// Missing an extra space if the current index % 16 is less than 7. // Missing an extra space if the current index % 16 is less than 7.
if ((i%16) < 7) { missing_spaces += 1; } if ((i % 16) < 7) {
missing_spaces += 1;
}
while (missing_spaces > 0) : (missing_spaces -= 1) { while (missing_spaces > 0) : (missing_spaces -= 1) {
try stream.writeAll(" "); try stream.writeAll(" ");
} }
@ -51,17 +53,17 @@ pub fn hexdump(stream: anytype, header: [] const u8, buffer: [] const u8) std.os
// Case 1: it's been 16 bytes AND it's the last byte to print. // Case 1: it's been 16 bytes AND it's the last byte to print.
if ((i + 1) % 16 == 0 and (i + 1) == buffer.len) { if ((i + 1) % 16 == 0 and (i + 1) == buffer.len) {
try stream.print("{s}\n", .{ ascii[0..ascii.len] }); try stream.print("{s}\n", .{ascii[0..ascii.len]});
} }
// Case 2: it's been 16 bytes but it's not the end of the buffer. // Case 2: it's been 16 bytes but it's not the end of the buffer.
else if ((i + 1) % 16 == 0 and (i + 1) != buffer.len) { else if ((i + 1) % 16 == 0 and (i + 1) != buffer.len) {
try stream.print("{s}\n", .{ ascii[0..ascii.len] }); try stream.print("{s}\n", .{ascii[0..ascii.len]});
hexb += 16; hexb += 16;
try stream.print(" {d:0>4}: ", .{ hexb }); try stream.print(" {d:0>4}: ", .{hexb});
} }
// Case 3: not the end of the 16 bytes row but it's the end of the buffer. // Case 3: not the end of the 16 bytes row but it's the end of the buffer.
else if ((i + 1) % 16 != 0 and (i + 1) == buffer.len) { else if ((i + 1) % 16 != 0 and (i + 1) == buffer.len) {
try stream.print(" {s}\n", .{ ascii[0..((i+1) % 16)] }); try stream.print(" {s}\n", .{ascii[0..((i + 1) % 16)]});
} }
// Case 4: not the end of the 16 bytes row and not the end of the buffer. // Case 4: not the end of the 16 bytes row and not the end of the buffer.
// Do nothing. // Do nothing.

View File

@ -1,17 +1,17 @@
pub const CBEvent = @import("./callback.zig").CBEvent; pub const CBEvent = @import("./callback.zig").CBEvent;
pub const Connection = @import("./connection.zig").Connection; pub const Connection = @import("./connection.zig").Connection;
pub const Message = @import("./message.zig").Message; pub const Message = @import("./message.zig").Message;
pub const Event = @import("./event.zig").Event; pub const Event = @import("./event.zig").Event;
pub const Switch = @import("./switch.zig").Switch; pub const Switch = @import("./switch.zig").Switch;
pub const Messages = @import("./message.zig").Messages; pub const Messages = @import("./message.zig").Messages;
pub const Switches = @import("./switch.zig").Switches; pub const Switches = @import("./switch.zig").Switches;
pub const Connections = @import("./connection.zig").Connections; pub const Connections = @import("./connection.zig").Connections;
pub const Context = @import("./context.zig").Context; pub const Context = @import("./context.zig").Context;
pub const util = @import("./util.zig"); pub const util = @import("./util.zig");
pub const hexdump = @import("./hexdump.zig"); pub const hexdump = @import("./hexdump.zig");
pub const exchangefd = @import("./exchange-fd.zig"); pub const exchangefd = @import("./exchange-fd.zig");
test { test {
_ = @import("./callback.zig"); _ = @import("./callback.zig");

View File

@ -8,20 +8,15 @@ const print_eq = @import("./util.zig").print_eq;
pub const Messages = std.ArrayList(Message); pub const Messages = std.ArrayList(Message);
pub const Message = struct { pub const Message = struct {
fd: i32, // File descriptor concerned about this message.
fd: i32, // File descriptor concerned about this message.
payload: []const u8, payload: []const u8,
allocator: std.mem.Allocator, // Memory allocator. allocator: std.mem.Allocator, // Memory allocator.
const Self = @This(); const Self = @This();
pub fn init(fd: i32 pub fn init(fd: i32, allocator: std.mem.Allocator, payload: []const u8) !Self {
, allocator: std.mem.Allocator return Message{ .fd = fd, .allocator = allocator, .payload = try allocator.dupe(u8, payload) };
, payload: []const u8) !Self {
return Message { .fd = fd
, .allocator = allocator
, .payload = try allocator.dupe(u8, payload) };
} }
pub fn deinit(self: Self) void { pub fn deinit(self: Self) void {
@ -29,7 +24,6 @@ pub const Message = struct {
} }
pub fn read(fd: i32, buffer: []const u8, allocator: std.mem.Allocator) !Self { pub fn read(fd: i32, buffer: []const u8, allocator: std.mem.Allocator) !Self {
var fbs = std.io.fixedBufferStream(buffer); var fbs = std.io.fixedBufferStream(buffer);
var reader = fbs.reader(); var reader = fbs.reader();
@ -37,7 +31,7 @@ pub const Message = struct {
if (msg_len > buffer.len - 4) { if (msg_len > buffer.len - 4) {
return error.wrongMessageLength; return error.wrongMessageLength;
} }
const msg_payload = buffer[4..4+msg_len]; const msg_payload = buffer[4 .. 4 + msg_len];
return try Message.init(fd, allocator, msg_payload); return try Message.init(fd, allocator, msg_payload);
} }
@ -48,14 +42,13 @@ pub const Message = struct {
} }
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void { pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
try fmt.format(out_stream, "fd: {}, payload: [{s}]", try fmt.format(out_stream, "fd: {}, payload: [{s}]", .{ self.fd, self.payload });
.{self.fd, self.payload} );
} }
}; };
test "Message - creation and display" { test "Message - creation and display" {
// fd payload // fd payload
const config = .{.safety = true}; const config = .{ .safety = true };
var gpa = std.heap.GeneralPurposeAllocator(config){}; var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit(); defer _ = gpa.deinit();
const allocator = gpa.allocator(); const allocator = gpa.allocator();
@ -68,7 +61,7 @@ test "Message - creation and display" {
test "Message - read and write" { test "Message - read and write" {
// fd payload // fd payload
const config = .{.safety = true}; const config = .{ .safety = true };
var gpa = std.heap.GeneralPurposeAllocator(config){}; var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit(); defer _ = gpa.deinit();
const allocator = gpa.allocator(); const allocator = gpa.allocator();

View File

@ -5,7 +5,7 @@ const fmt = std.fmt;
const net = std.net; const net = std.net;
const ipc = @import("./main.zig"); const ipc = @import("./main.zig");
const Message = ipc.Message; const Message = ipc.Message;
const CBEventType = ipc.CBEvent.Type; const CBEventType = ipc.CBEvent.Type;
const Allocator = std.mem.Allocator; const Allocator = std.mem.Allocator;
@ -41,30 +41,28 @@ pub const SwitchDB = struct {
db: std.AutoArrayHashMap(i32, ManagedConnection), db: std.AutoArrayHashMap(i32, ManagedConnection),
pub fn init (allocator: Allocator) Self { pub fn init(allocator: Allocator) Self {
return Self { return Self{
.db = std.AutoArrayHashMap(i32, ManagedConnection).init(allocator), .db = std.AutoArrayHashMap(i32, ManagedConnection).init(allocator),
}; };
} }
pub fn deinit (self: *Self) void { pub fn deinit(self: *Self) void {
self.db.deinit(); self.db.deinit();
} }
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void { pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
for(self.db.keys()) |k,i| { for (self.db.keys()) |k, i| {
try fmt.format(out_stream, "({},{})", .{k, self.db.values()[i].dest}); try fmt.format(out_stream, "({},{})", .{ k, self.db.values()[i].dest });
} }
} }
pub fn add_switch(self: *Self, fd1: i32, fd2: i32) !void { pub fn add_switch(self: *Self, fd1: i32, fd2: i32) !void {
try self.db.put(fd1, ManagedConnection {.dest = fd2}); try self.db.put(fd1, ManagedConnection{ .dest = fd2 });
try self.db.put(fd2, ManagedConnection {.dest = fd1}); try self.db.put(fd2, ManagedConnection{ .dest = fd1 });
} }
pub fn set_callbacks(self: *Self, fd: i32 pub fn set_callbacks(self: *Self, fd: i32, in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType, out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) !void {
, in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType
, out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) !void {
var managedconnection = self.db.get(fd) orelse return error.unregisteredFD; var managedconnection = self.db.get(fd) orelse return error.unregisteredFD;
managedconnection.in = in; managedconnection.in = in;
managedconnection.out = out; managedconnection.out = out;
@ -72,7 +70,7 @@ pub const SwitchDB = struct {
/// Dig the "db" hashmap, perform "in" fn, may provide a message. /// Dig the "db" hashmap, perform "in" fn, may provide a message.
/// Errors from the "in" fn are reported as Zig errors. /// Errors from the "in" fn are reported as Zig errors.
pub fn read (self: *Self, fd: i32) !?Message { pub fn read(self: *Self, fd: i32) !?Message {
// assert there is an entry with this fd as a key. // assert there is an entry with this fd as a key.
var managedconnection = self.db.get(fd) orelse return error.unregisteredFD; var managedconnection = self.db.get(fd) orelse return error.unregisteredFD;
@ -82,22 +80,25 @@ pub const SwitchDB = struct {
switch (r) { switch (r) {
// The message should be ignored (protocol specific). // The message should be ignored (protocol specific).
CBEventType.IGNORE => { return null; }, CBEventType.IGNORE => {
CBEventType.NO_ERROR => { return null;
},
CBEventType.NO_ERROR => {
// TODO: read message // TODO: read message
// TODO: better allocator? // TODO: better allocator?
// TODO: better errors? // TODO: better errors?
var message: Message var message: Message = Message.read(managedconnection.dest, buffer[0..message_size], std.heap.c_allocator) catch {
= Message.read(managedconnection.dest
, buffer[0..message_size]
, std.heap.c_allocator) catch {
return error.generic; return error.generic;
}; };
return message; return message;
}, },
CBEventType.FD_CLOSING => { return error.closeFD; }, CBEventType.FD_CLOSING => {
return error.closeFD;
},
// Generic error, or the message was read but with errors. // Generic error, or the message was read but with errors.
CBEventType.ERROR => { return error.generic; }, CBEventType.ERROR => {
return error.generic;
},
} }
unreachable; unreachable;
@ -105,7 +106,7 @@ pub const SwitchDB = struct {
/// Dig the "db" hashmap and perform "out" fn. /// Dig the "db" hashmap and perform "out" fn.
/// Errors from the "out" fn are reported as Zig errors. /// Errors from the "out" fn are reported as Zig errors.
pub fn write (self: *Self, message: Message) !void { pub fn write(self: *Self, message: Message) !void {
// assert there is an entry with this fd as a key. // assert there is an entry with this fd as a key.
var managedconnection = self.db.get(message.fd) orelse return error.unregisteredFD; var managedconnection = self.db.get(message.fd) orelse return error.unregisteredFD;
@ -125,10 +126,11 @@ pub const SwitchDB = struct {
CBEventType.NO_ERROR => { CBEventType.NO_ERROR => {
return; return;
}, },
CBEventType.FD_CLOSING => { return error.closeFD; }, CBEventType.FD_CLOSING => {
return error.closeFD;
},
// Generic error, or the message was read but with errors. // Generic error, or the message was read but with errors.
CBEventType.IGNORE, CBEventType.IGNORE, CBEventType.ERROR => {
CBEventType.ERROR => {
return error.generic; return error.generic;
}, },
} }
@ -137,14 +139,13 @@ pub const SwitchDB = struct {
} }
/// From a message to read on a socket to an Event. /// From a message to read on a socket to an Event.
pub fn handle_event_read (self: *Self, index: usize, fd: i32) Event { pub fn handle_event_read(self: *Self, index: usize, fd: i32) Event {
var message: ?Message = null; var message: ?Message = null;
message = self.read (fd) catch |err| switch(err) { message = self.read(fd) catch |err| switch (err) {
error.closeFD => { error.closeFD => {
return Event.init(Event.Type.DISCONNECTION, index, fd, null); return Event.init(Event.Type.DISCONNECTION, index, fd, null);
}, },
error.unregisteredFD, error.unregisteredFD, error.generic => {
error.generic => {
return Event.init(Event.Type.ERROR, index, fd, null); return Event.init(Event.Type.ERROR, index, fd, null);
}, },
}; };
@ -152,15 +153,14 @@ pub const SwitchDB = struct {
} }
/// Message is free'd in any case. /// Message is free'd in any case.
pub fn handle_event_write (self: *Self, index: usize, message: Message) Event { pub fn handle_event_write(self: *Self, index: usize, message: Message) Event {
defer message.deinit(); defer message.deinit();
var fd = message.fd; var fd = message.fd;
self.write(message) catch |err| switch(err) { self.write(message) catch |err| switch (err) {
error.closeFD => { error.closeFD => {
return Event.init(Event.Type.DISCONNECTION, index, fd, null); return Event.init(Event.Type.DISCONNECTION, index, fd, null);
}, },
error.unregisteredFD, error.unregisteredFD, error.generic => {
error.generic => {
return Event.init(Event.Type.ERROR, index, fd, null); return Event.init(Event.Type.ERROR, index, fd, null);
}, },
}; };
@ -168,12 +168,12 @@ pub const SwitchDB = struct {
} }
/// Simple wrapper around self.db.get. /// Simple wrapper around self.db.get.
pub fn getDest (self: *Self, fd: i32) !i32 { pub fn getDest(self: *Self, fd: i32) !i32 {
return self.db.get(fd).?.dest; return self.db.get(fd).?.dest;
} }
/// Remove both entries (client and service) from the DB. /// Remove both entries (client and service) from the DB.
pub fn nuke (self: *Self, fd: i32) void { pub fn nuke(self: *Self, fd: i32) void {
if (self.db.fetchSwapRemove(fd)) |kv| { if (self.db.fetchSwapRemove(fd)) |kv| {
_ = self.db.swapRemove(kv.value.dest); _ = self.db.swapRemove(kv.value.dest);
} }
@ -181,43 +181,43 @@ pub const SwitchDB = struct {
}; };
const ManagedConnection = struct { const ManagedConnection = struct {
dest : i32, dest: i32,
in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType = default_in, in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType = default_in,
out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType = default_out, out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType = default_out,
}; };
test "creation and display" { test "creation and display" {
const config = .{.safety = true}; const config = .{ .safety = true };
var gpa = std.heap.GeneralPurposeAllocator(config){}; var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit(); defer _ = gpa.deinit();
const allocator = gpa.allocator(); const allocator = gpa.allocator();
var switchdb = SwitchDB.init(allocator); var switchdb = SwitchDB.init(allocator);
defer switchdb.deinit(); defer switchdb.deinit();
try switchdb.db.put(5, ManagedConnection {.dest = 6}); try switchdb.db.put(5, ManagedConnection{ .dest = 6 });
try switchdb.db.put(6, ManagedConnection {.dest = 5}); try switchdb.db.put(6, ManagedConnection{ .dest = 5 });
try print_eq("{ (5,6)(6,5) }", .{switchdb}); try print_eq("{ (5,6)(6,5) }", .{switchdb});
} }
fn successful_in (_: i32, mcontent: [*]u8, mlen: *u32) CBEventType { fn successful_in(_: i32, mcontent: [*]u8, mlen: *u32) CBEventType {
var m = Message.init(8, std.heap.c_allocator, "coucou") catch unreachable; var m = Message.init(8, std.heap.c_allocator, "coucou") catch unreachable;
defer m.deinit(); defer m.deinit();
var fbs = std.io.fixedBufferStream(mcontent[0..mlen.*]); var fbs = std.io.fixedBufferStream(mcontent[0..mlen.*]);
var writer = fbs.writer(); var writer = fbs.writer();
const bytes_written = m.write (writer) catch unreachable; const bytes_written = m.write(writer) catch unreachable;
mlen.* = @truncate(u32, bytes_written); mlen.* = @truncate(u32, bytes_written);
return CBEventType.NO_ERROR; return CBEventType.NO_ERROR;
} }
fn successful_out (_: i32, _: [*]const u8, _: u32) CBEventType { fn successful_out(_: i32, _: [*]const u8, _: u32) CBEventType {
return CBEventType.NO_ERROR; return CBEventType.NO_ERROR;
} }
test "successful exchanges" { test "successful exchanges" {
const config = .{.safety = true}; const config = .{ .safety = true };
var gpa = std.heap.GeneralPurposeAllocator(config){}; var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit(); defer _ = gpa.deinit();
const allocator = gpa.allocator(); const allocator = gpa.allocator();
@ -225,34 +225,42 @@ test "successful exchanges" {
var switchdb = SwitchDB.init(allocator); var switchdb = SwitchDB.init(allocator);
defer switchdb.deinit(); defer switchdb.deinit();
try switchdb.db.put(5, ManagedConnection {.dest = 6, .in = successful_in, .out = successful_out}); try switchdb.db.put(5, ManagedConnection{ .dest = 6, .in = successful_in, .out = successful_out });
try switchdb.db.put(6, ManagedConnection {.dest = 5, .in = successful_in, .out = successful_out}); try switchdb.db.put(6, ManagedConnection{ .dest = 5, .in = successful_in, .out = successful_out });
// should return a new message (hardcoded: fd 8, payload "coucou") // should return a new message (hardcoded: fd 8, payload "coucou")
var event_1: Event = switchdb.handle_event_read (1, 5); var event_1: Event = switchdb.handle_event_read(1, 5);
if (event_1.m) |m| { m.deinit(); } if (event_1.m) |m| {
else { return error.NoMessage; } m.deinit();
} else {
return error.NoMessage;
}
// should return a new message (hardcoded: fd 8, payload "coucou") // should return a new message (hardcoded: fd 8, payload "coucou")
var event_2: Event = switchdb.handle_event_read (1, 6); var event_2: Event = switchdb.handle_event_read(1, 6);
if (event_2.m) |m| { m.deinit(); } if (event_2.m) |m| {
else { return error.NoMessage; } m.deinit();
} else {
return error.NoMessage;
}
var message = try Message.init(6, allocator, "coucou"); var message = try Message.init(6, allocator, "coucou");
var event_3 = switchdb.handle_event_write (5, message); var event_3 = switchdb.handle_event_write(5, message);
if (event_3.m) |_| { return error.ShouldNotCarryMessage; } if (event_3.m) |_| {
return error.ShouldNotCarryMessage;
}
} }
fn unsuccessful_in (_: i32, _: [*]const u8, _: *u32) CBEventType { fn unsuccessful_in(_: i32, _: [*]const u8, _: *u32) CBEventType {
return CBEventType.ERROR; return CBEventType.ERROR;
} }
fn unsuccessful_out (_: i32, _: [*]const u8, _: u32) CBEventType { fn unsuccessful_out(_: i32, _: [*]const u8, _: u32) CBEventType {
return CBEventType.ERROR; return CBEventType.ERROR;
} }
test "unsuccessful exchanges" { test "unsuccessful exchanges" {
const config = .{.safety = true}; const config = .{ .safety = true };
var gpa = std.heap.GeneralPurposeAllocator(config){}; var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit(); defer _ = gpa.deinit();
const allocator = gpa.allocator(); const allocator = gpa.allocator();
@ -260,24 +268,30 @@ test "unsuccessful exchanges" {
var switchdb = SwitchDB.init(allocator); var switchdb = SwitchDB.init(allocator);
defer switchdb.deinit(); defer switchdb.deinit();
try switchdb.db.put(5, ManagedConnection {.dest = 6, .in = unsuccessful_in, .out = unsuccessful_out}); try switchdb.db.put(5, ManagedConnection{ .dest = 6, .in = unsuccessful_in, .out = unsuccessful_out });
try switchdb.db.put(6, ManagedConnection {.dest = 5, .in = unsuccessful_in, .out = unsuccessful_out}); try switchdb.db.put(6, ManagedConnection{ .dest = 5, .in = unsuccessful_in, .out = unsuccessful_out });
// should return a new message (hardcoded: fd 8, payload "coucou") // should return a new message (hardcoded: fd 8, payload "coucou")
var event_1: Event = switchdb.handle_event_read (1, 5); var event_1: Event = switchdb.handle_event_read(1, 5);
if (event_1.m) |_| { return error.ShouldNotCarryMessage; } if (event_1.m) |_| {
return error.ShouldNotCarryMessage;
}
// should return a new message (hardcoded: fd 8, payload "coucou") // should return a new message (hardcoded: fd 8, payload "coucou")
var event_2: Event = switchdb.handle_event_read (1, 6); var event_2: Event = switchdb.handle_event_read(1, 6);
if (event_2.m) |_| { return error.ShouldNotCarryMessage; } if (event_2.m) |_| {
return error.ShouldNotCarryMessage;
}
var message = try Message.init(6, allocator, "coucou"); var message = try Message.init(6, allocator, "coucou");
var event_3 = switchdb.handle_event_write (5, message); var event_3 = switchdb.handle_event_write(5, message);
if (event_3.m) |_| { return error.ShouldNotCarryMessage; } if (event_3.m) |_| {
return error.ShouldNotCarryMessage;
}
} }
test "nuke 'em" { test "nuke 'em" {
const config = .{.safety = true}; const config = .{ .safety = true };
var gpa = std.heap.GeneralPurposeAllocator(config){}; var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit(); defer _ = gpa.deinit();
const allocator = gpa.allocator(); const allocator = gpa.allocator();
@ -285,15 +299,15 @@ test "nuke 'em" {
var switchdb = SwitchDB.init(allocator); var switchdb = SwitchDB.init(allocator);
defer switchdb.deinit(); defer switchdb.deinit();
try switchdb.db.put(5, ManagedConnection {.dest = 6, .in = unsuccessful_in, .out = unsuccessful_out}); try switchdb.db.put(5, ManagedConnection{ .dest = 6, .in = unsuccessful_in, .out = unsuccessful_out });
try switchdb.db.put(6, ManagedConnection {.dest = 5, .in = unsuccessful_in, .out = unsuccessful_out}); try switchdb.db.put(6, ManagedConnection{ .dest = 5, .in = unsuccessful_in, .out = unsuccessful_out });
try testing.expect(switchdb.db.count() == 2); try testing.expect(switchdb.db.count() == 2);
switchdb.nuke(5); switchdb.nuke(5);
try testing.expect(switchdb.db.count() == 0); try testing.expect(switchdb.db.count() == 0);
} }
fn default_in (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType { fn default_in(origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType {
// This may be kinda hacky, idk. // This may be kinda hacky, idk.
var stream: net.Stream = .{ .handle = origin }; var stream: net.Stream = .{ .handle = origin };
var packet_size: usize = stream.read(mcontent[0..mlen.*]) catch return CBEventType.ERROR; var packet_size: usize = stream.read(mcontent[0..mlen.*]) catch return CBEventType.ERROR;
@ -309,12 +323,12 @@ fn default_in (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType {
return CBEventType.NO_ERROR; return CBEventType.NO_ERROR;
} }
fn default_out (fd: i32, mcontent: [*]const u8, mlen: u32) CBEventType { fn default_out(fd: i32, mcontent: [*]const u8, mlen: u32) CBEventType {
// Message contains the fd, no need to search for the right structure to copy, // Message contains the fd, no need to search for the right structure to copy,
// let's just recreate a Stream from the fd. // let's just recreate a Stream from the fd.
var to_send = mcontent[0..mlen]; var to_send = mcontent[0..mlen];
var stream = net.Stream { .handle = fd }; var stream = net.Stream{ .handle = fd };
_ = stream.write (to_send) catch return CBEventType.ERROR; _ = stream.write(to_send) catch return CBEventType.ERROR;
return CBEventType.NO_ERROR; return CBEventType.NO_ERROR;
} }

View File

@ -9,13 +9,12 @@ const Message = @import("./message.zig").Message;
/// DO NOT USE IT UNLESS YOU KNOW WHAT TO EXPECT. /// DO NOT USE IT UNLESS YOU KNOW WHAT TO EXPECT.
pub const URI = struct { pub const URI = struct {
protocol: []const u8, protocol: []const u8,
address: []const u8, address: []const u8,
path: []const u8, path: []const u8,
const Self = @This(); const Self = @This();
pub fn read(uri_to_decode: []const u8) Self { pub fn read(uri_to_decode: []const u8) Self {
var protocolit = std.mem.split(u8, uri_to_decode, "://"); var protocolit = std.mem.split(u8, uri_to_decode, "://");
var protocol = protocolit.first(); var protocol = protocolit.first();
@ -24,9 +23,7 @@ pub const URI = struct {
var path = addressit.rest(); var path = addressit.rest();
return Self { .protocol = protocol return Self{ .protocol = protocol, .address = address, .path = path };
, .address = address
, .path = path };
} }
}; };
@ -37,7 +34,7 @@ test "URI simple decoding" {
try testing.expectEqualSlices(u8, uri.path, "some-path"); try testing.expectEqualSlices(u8, uri.path, "some-path");
} }
pub fn print_buffer (header: []const u8, buffer: []const u8) void { pub fn print_buffer(header: []const u8, buffer: []const u8) void {
var hexbuf: [4000]u8 = undefined; var hexbuf: [4000]u8 = undefined;
var hexfbs = std.io.fixedBufferStream(&hexbuf); var hexfbs = std.io.fixedBufferStream(&hexbuf);
var hexwriter = hexfbs.writer(); var hexwriter = hexfbs.writer();
@ -45,8 +42,8 @@ pub fn print_buffer (header: []const u8, buffer: []const u8) void {
log.debug("{s}", .{hexfbs.getWritten()}); log.debug("{s}", .{hexfbs.getWritten()});
} }
pub fn print_message (header: []const u8, m: Message) void { pub fn print_message(header: []const u8, m: Message) void {
print_buffer (header, m.payload); print_buffer(header, m.payload);
} }
pub fn print_eq(expected: anytype, obj: anytype) !void { pub fn print_eq(expected: anytype, obj: anytype) !void {