parent
aa6bc4952e
commit
d441b3bf88
|
@ -59,11 +59,11 @@ pub fn build(b: *std.Build) void {
|
||||||
const test_step = b.step("test", "Run library tests");
|
const test_step = b.step("test", "Run library tests");
|
||||||
test_step.dependOn(&main_tests.step);
|
test_step.dependOn(&main_tests.step);
|
||||||
|
|
||||||
const install_static_lib = b.addInstallArtifact(static_lib);
|
const install_static_lib = b.addInstallArtifact(static_lib, .{});
|
||||||
const static_lib_step = b.step("static", "Compile LibIPC as a static library.");
|
const static_lib_step = b.step("static", "Compile LibIPC as a static library.");
|
||||||
static_lib_step.dependOn(&install_static_lib.step);
|
static_lib_step.dependOn(&install_static_lib.step);
|
||||||
|
|
||||||
const install_shared_lib = b.addInstallArtifact(shared_lib);
|
const install_shared_lib = b.addInstallArtifact(shared_lib, .{});
|
||||||
// b.getInstallStep().dependOn(&install_shared_lib.step);
|
// b.getInstallStep().dependOn(&install_shared_lib.step);
|
||||||
const shared_lib_step = b.step("shared", "Compile LibIPC as a shared library.");
|
const shared_lib_step = b.step("shared", "Compile LibIPC as a shared library.");
|
||||||
shared_lib_step.dependOn(&install_shared_lib.step);
|
shared_lib_step.dependOn(&install_shared_lib.step);
|
||||||
|
|
|
@ -17,14 +17,14 @@ export fn ipc_context_init(ptr: **Context) callconv(.C) i32 {
|
||||||
|
|
||||||
/// Start a libipc service.
|
/// Start a libipc service.
|
||||||
export fn ipc_service_init(ctx: *Context, servicefd: *i32, service_name: [*]const u8, service_name_len: u16) callconv(.C) i32 {
|
export fn ipc_service_init(ctx: *Context, servicefd: *i32, service_name: [*]const u8, service_name_len: u16) callconv(.C) i32 {
|
||||||
var streamserver = ctx.server_init(service_name[0..service_name_len]) catch return -1;
|
const streamserver = ctx.server_init(service_name[0..service_name_len]) catch return -1;
|
||||||
servicefd.* = streamserver.sockfd.?;
|
servicefd.* = streamserver.sockfd.?;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Connect to a libipc service, possibly through IPCd.
|
/// Connect to a libipc service, possibly through IPCd.
|
||||||
export fn ipc_connect_service(ctx: *Context, servicefd: *i32, service_name: [*]const u8, service_name_len: u16) callconv(.C) i32 {
|
export fn ipc_connect_service(ctx: *Context, servicefd: *i32, service_name: [*]const u8, service_name_len: u16) callconv(.C) i32 {
|
||||||
var fd = ctx.connect_ipc(service_name[0..service_name_len]) catch return -1;
|
const fd = ctx.connect_ipc(service_name[0..service_name_len]) catch return -1;
|
||||||
servicefd.* = fd;
|
servicefd.* = fd;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -41,7 +41,7 @@ export fn ipc_write(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen:
|
||||||
var buffer = [_]u8{0} ** 100000;
|
var buffer = [_]u8{0} ** 100000;
|
||||||
var fba = std.heap.FixedBufferAllocator.init(&buffer);
|
var fba = std.heap.FixedBufferAllocator.init(&buffer);
|
||||||
|
|
||||||
var message = Message.init(servicefd, fba.allocator(), mcontent[0..mlen]) catch return -1;
|
const message = Message.init(servicefd, fba.allocator(), mcontent[0..mlen]) catch return -1;
|
||||||
ctx.write(message) catch return -1;
|
ctx.write(message) catch return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -49,7 +49,7 @@ export fn ipc_write(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen:
|
||||||
/// Schedule a message.
|
/// Schedule a message.
|
||||||
/// Use the same allocator as the context.
|
/// Use the same allocator as the context.
|
||||||
export fn ipc_schedule(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) i32 {
|
export fn ipc_schedule(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) i32 {
|
||||||
var message = Message.init(servicefd, ctx.allocator, mcontent[0..mlen]) catch return -1;
|
const message = Message.init(servicefd, ctx.allocator, mcontent[0..mlen]) catch return -1;
|
||||||
ctx.schedule(message) catch return -2;
|
ctx.schedule(message) catch return -2;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -91,8 +91,11 @@ export fn ipc_read(ctx: *Context, index: usize, buffer: [*]u8, buflen: *usize) c
|
||||||
/// Wait for an event.
|
/// Wait for an event.
|
||||||
/// Buffer length will be changed to the size of the received message.
|
/// Buffer length will be changed to the size of the received message.
|
||||||
export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 {
|
export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 {
|
||||||
var event = ctx.wait_event() catch |err| switch(err) {
|
const event = ctx.wait_event() catch |err| switch (err) {
|
||||||
else => { log.warn("error while waiting for an event: {}\n", .{err}); return -1; },
|
else => {
|
||||||
|
log.warn("error while waiting for an event: {}\n", .{err});
|
||||||
|
return -1;
|
||||||
|
},
|
||||||
};
|
};
|
||||||
t.* = @intFromEnum(event.t);
|
t.* = @intFromEnum(event.t);
|
||||||
index.* = event.index;
|
index.* = event.index;
|
||||||
|
|
|
@ -33,9 +33,9 @@ pub const Connection = struct {
|
||||||
|
|
||||||
test "Connection - creation and display" {
|
test "Connection - creation and display" {
|
||||||
// origin destination
|
// origin destination
|
||||||
var path = "/some/path";
|
const path = "/some/path";
|
||||||
var c1 = Connection.init(Connection.Type.EXTERNAL, path);
|
const c1 = Connection.init(Connection.Type.EXTERNAL, path);
|
||||||
var c2 = Connection.init(Connection.Type.IPC, null);
|
const c2 = Connection.init(Connection.Type.IPC, null);
|
||||||
try print_eq("connection.Connection.Type.EXTERNAL, path /some/path", c1);
|
try print_eq("connection.Connection.Type.EXTERNAL, path /some/path", c1);
|
||||||
try print_eq("connection.Connection.Type.IPC, path null", c2);
|
try print_eq("connection.Connection.Type.IPC, path null", c2);
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,7 +55,7 @@ pub const Context = struct {
|
||||||
// Context initialization:
|
// Context initialization:
|
||||||
// - init structures (provide the allocator)
|
// - init structures (provide the allocator)
|
||||||
pub fn init(allocator: std.mem.Allocator) !Self {
|
pub fn init(allocator: std.mem.Allocator) !Self {
|
||||||
var rundir = std.process.getEnvVarOwned(allocator, "RUNDIR") catch |err| switch (err) {
|
const rundir = std.process.getEnvVarOwned(allocator, "RUNDIR") catch |err| switch (err) {
|
||||||
error.EnvironmentVariableNotFound => blk: {
|
error.EnvironmentVariableNotFound => blk: {
|
||||||
break :blk try allocator.dupeZ(u8, "/tmp/.libipc-run/");
|
break :blk try allocator.dupeZ(u8, "/tmp/.libipc-run/");
|
||||||
},
|
},
|
||||||
|
@ -65,7 +65,7 @@ pub const Context = struct {
|
||||||
};
|
};
|
||||||
|
|
||||||
// Allow mkdir to create a directory with 0o770 permissions.
|
// Allow mkdir to create a directory with 0o770 permissions.
|
||||||
var previous_mask = umask(0o007);
|
const previous_mask = umask(0o007);
|
||||||
defer _ = umask(previous_mask);
|
defer _ = umask(previous_mask);
|
||||||
|
|
||||||
// Create the run directory, where all UNIX sockets will be.
|
// Create the run directory, where all UNIX sockets will be.
|
||||||
|
@ -100,10 +100,10 @@ pub const Context = struct {
|
||||||
|
|
||||||
// Both simple connection and the switched one share this code.
|
// Both simple connection and the switched one share this code.
|
||||||
fn connect_(self: *Self, ctype: Connection.Type, path: []const u8) !i32 {
|
fn connect_(self: *Self, ctype: Connection.Type, path: []const u8) !i32 {
|
||||||
var stream = try net.connectUnixSocket(path);
|
const stream = try net.connectUnixSocket(path);
|
||||||
const newfd = stream.handle;
|
const newfd = stream.handle;
|
||||||
errdefer std.os.closeSocket(newfd);
|
errdefer std.os.closeSocket(newfd);
|
||||||
var newcon = Connection.init(ctype, null);
|
const newcon = Connection.init(ctype, null);
|
||||||
try self.add_(newcon, newfd);
|
try self.add_(newcon, newfd);
|
||||||
return newfd;
|
return newfd;
|
||||||
}
|
}
|
||||||
|
@ -112,7 +112,7 @@ pub const Context = struct {
|
||||||
const buffer_size = 10000;
|
const buffer_size = 10000;
|
||||||
var buffer: [buffer_size]u8 = undefined;
|
var buffer: [buffer_size]u8 = undefined;
|
||||||
var fba = std.heap.FixedBufferAllocator.init(&buffer);
|
var fba = std.heap.FixedBufferAllocator.init(&buffer);
|
||||||
var allocator = fba.allocator();
|
const allocator = fba.allocator();
|
||||||
|
|
||||||
// Get IPC_NETWORK environment variable
|
// Get IPC_NETWORK environment variable
|
||||||
// IPC_NETWORK is shared with the network service to choose the protocol stack,
|
// IPC_NETWORK is shared with the network service to choose the protocol stack,
|
||||||
|
@ -123,7 +123,7 @@ pub const Context = struct {
|
||||||
//
|
//
|
||||||
// Routing directives can be chained using " ;" separator:
|
// Routing directives can be chained using " ;" separator:
|
||||||
// IPC_NETWORK="audio https://example.com/audio ;pong tls://pong.example.com/pong"
|
// IPC_NETWORK="audio https://example.com/audio ;pong tls://pong.example.com/pong"
|
||||||
var network_envvar = std.process.getEnvVarOwned(allocator, "IPC_NETWORK") catch |err| switch (err) {
|
const network_envvar = std.process.getEnvVarOwned(allocator, "IPC_NETWORK") catch |err| switch (err) {
|
||||||
// error{ OutOfMemory, EnvironmentVariableNotFound, InvalidUtf8 } (ErrorSet)
|
// error{ OutOfMemory, EnvironmentVariableNotFound, InvalidUtf8 } (ErrorSet)
|
||||||
error.EnvironmentVariableNotFound => {
|
error.EnvironmentVariableNotFound => {
|
||||||
log.debug("no IPC_NETWORK envvar: IPCd won't be contacted", .{});
|
log.debug("no IPC_NETWORK envvar: IPCd won't be contacted", .{});
|
||||||
|
@ -140,14 +140,14 @@ pub const Context = struct {
|
||||||
try lookupwriter.print("{s};{s}", .{ service_name, network_envvar });
|
try lookupwriter.print("{s};{s}", .{ service_name, network_envvar });
|
||||||
|
|
||||||
// Try to connect to the IPCd service
|
// Try to connect to the IPCd service
|
||||||
var ipcdfd = try self.connect_service("ipc");
|
const ipcdfd = try self.connect_service("ipc");
|
||||||
defer self.close_fd(ipcdfd) catch {}; // in any case, connection should be closed
|
defer self.close_fd(ipcdfd) catch {}; // in any case, connection should be closed
|
||||||
|
|
||||||
// Send LOOKUP message
|
// Send LOOKUP message
|
||||||
// content: target service name;${IPC_NETWORK}
|
// content: target service name;${IPC_NETWORK}
|
||||||
// example: pong;pong tls://example.com:8998/pong
|
// example: pong;pong tls://example.com:8998/pong
|
||||||
|
|
||||||
var m = try Message.init(ipcdfd, allocator, lookupfbs.getWritten());
|
const m = try Message.init(ipcdfd, allocator, lookupfbs.getWritten());
|
||||||
try self.write(m);
|
try self.write(m);
|
||||||
|
|
||||||
// Read LOOKUP response
|
// Read LOOKUP response
|
||||||
|
@ -155,17 +155,17 @@ pub const Context = struct {
|
||||||
// else: get fd sent by IPCd then close IPCd fd
|
// else: get fd sent by IPCd then close IPCd fd
|
||||||
var reception_buffer: [2000]u8 = undefined;
|
var reception_buffer: [2000]u8 = undefined;
|
||||||
var reception_size: usize = 0;
|
var reception_size: usize = 0;
|
||||||
var newfd = try receive_fd(ipcdfd, &reception_buffer, &reception_size);
|
const newfd = try receive_fd(ipcdfd, &reception_buffer, &reception_size);
|
||||||
if (reception_size == 0) {
|
if (reception_size == 0) {
|
||||||
return error.IPCdFailedNoMessage;
|
return error.IPCdFailedNoMessage;
|
||||||
}
|
}
|
||||||
|
|
||||||
var response: []u8 = reception_buffer[0..reception_size];
|
const response: []u8 = reception_buffer[0..reception_size];
|
||||||
|
|
||||||
if (!std.mem.eql(u8, response, "ok")) {
|
if (!std.mem.eql(u8, response, "ok")) {
|
||||||
return error.IPCdFailedNotOk;
|
return error.IPCdFailedNotOk;
|
||||||
}
|
}
|
||||||
var newcon = Connection.init(connection_type, null);
|
const newcon = Connection.init(connection_type, null);
|
||||||
try self.add_(newcon, newfd);
|
try self.add_(newcon, newfd);
|
||||||
return newfd;
|
return newfd;
|
||||||
}
|
}
|
||||||
|
@ -193,7 +193,7 @@ pub const Context = struct {
|
||||||
/// Return the connection FD.
|
/// Return the connection FD.
|
||||||
pub fn connect_service(self: *Self, service_name: []const u8) !i32 {
|
pub fn connect_service(self: *Self, service_name: []const u8) !i32 {
|
||||||
var buffer: [1000]u8 = undefined;
|
var buffer: [1000]u8 = undefined;
|
||||||
var path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ self.rundir, service_name });
|
const path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ self.rundir, service_name });
|
||||||
return self.connect_(Connection.Type.IPC, path);
|
return self.connect_(Connection.Type.IPC, path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -213,19 +213,19 @@ pub const Context = struct {
|
||||||
/// Useful for protocol daemons (ex: TCPd) listening to a socket for external connections,
|
/// Useful for protocol daemons (ex: TCPd) listening to a socket for external connections,
|
||||||
/// clients trying to reach a libipc service.
|
/// clients trying to reach a libipc service.
|
||||||
pub fn add_external(self: *Self, newfd: i32) !void {
|
pub fn add_external(self: *Self, newfd: i32) !void {
|
||||||
var newcon = Connection.init(Connection.Type.EXTERNAL, null);
|
const newcon = Connection.init(Connection.Type.EXTERNAL, null);
|
||||||
try self.add_(newcon, newfd);
|
try self.add_(newcon, newfd);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void {
|
fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void {
|
||||||
// net.StreamServer
|
// net.StreamServer
|
||||||
var serverfd = self.pollfd.items[server_index].fd;
|
const serverfd = self.pollfd.items[server_index].fd;
|
||||||
var path = self.connections.items[server_index].path orelse return error.ServerWithNoPath;
|
const path = self.connections.items[server_index].path orelse return error.ServerWithNoPath;
|
||||||
var server = net.StreamServer{ .sockfd = serverfd, .kernel_backlog = 100, .reuse_address = false, .reuse_port = false, .listen_address = try net.Address.initUnix(path) };
|
var server = net.StreamServer{ .sockfd = serverfd, .kernel_backlog = 100, .reuse_address = false, .reuse_port = false, .listen_address = try net.Address.initUnix(path) };
|
||||||
var client = try server.accept(); // net.StreamServer.Connection
|
const client = try server.accept(); // net.StreamServer.Connection
|
||||||
|
|
||||||
const newfd = client.stream.handle;
|
const newfd = client.stream.handle;
|
||||||
var newcon = Connection.init(Connection.Type.IPC, null);
|
const newcon = Connection.init(Connection.Type.IPC, null);
|
||||||
try self.add_(newcon, newfd);
|
try self.add_(newcon, newfd);
|
||||||
|
|
||||||
const sfd = server.sockfd orelse return error.SocketLOL; // TODO
|
const sfd = server.sockfd orelse return error.SocketLOL; // TODO
|
||||||
|
@ -238,8 +238,8 @@ pub const Context = struct {
|
||||||
pub fn server_init(self: *Self, service_name: []const u8) !net.StreamServer {
|
pub fn server_init(self: *Self, service_name: []const u8) !net.StreamServer {
|
||||||
var buffer: [1000]u8 = undefined;
|
var buffer: [1000]u8 = undefined;
|
||||||
var buffer_lock: [1000]u8 = undefined;
|
var buffer_lock: [1000]u8 = undefined;
|
||||||
var path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ self.rundir, service_name });
|
const path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ self.rundir, service_name });
|
||||||
var lock = try std.fmt.bufPrint(&buffer_lock, "{s}.lock", .{path});
|
const lock = try std.fmt.bufPrint(&buffer_lock, "{s}.lock", .{path});
|
||||||
|
|
||||||
// Create a lock file (and lock it) in order to prevent a race condition.
|
// Create a lock file (and lock it) in order to prevent a race condition.
|
||||||
// While the program is running, the lock is enabled.
|
// While the program is running, the lock is enabled.
|
||||||
|
@ -254,7 +254,7 @@ pub const Context = struct {
|
||||||
|
|
||||||
// Allow to create a unix socket with the right permissions.
|
// Allow to create a unix socket with the right permissions.
|
||||||
// Group should include write permissions.
|
// Group should include write permissions.
|
||||||
var previous_mask = umask(0o117);
|
const previous_mask = umask(0o117);
|
||||||
defer _ = umask(previous_mask);
|
defer _ = umask(previous_mask);
|
||||||
|
|
||||||
// Remove the old UNIX socket.
|
// Remove the old UNIX socket.
|
||||||
|
@ -264,12 +264,12 @@ pub const Context = struct {
|
||||||
};
|
};
|
||||||
|
|
||||||
var server = net.StreamServer.init(.{});
|
var server = net.StreamServer.init(.{});
|
||||||
var socket_addr = try net.Address.initUnix(path);
|
const socket_addr = try net.Address.initUnix(path);
|
||||||
try server.listen(socket_addr);
|
try server.listen(socket_addr);
|
||||||
|
|
||||||
const newfd = server.sockfd orelse return error.SocketLOL; // TODO
|
const newfd = server.sockfd orelse return error.SocketLOL; // TODO
|
||||||
// Store the path in the Connection structure, so the UNIX socket file can be removed later.
|
// Store the path in the Connection structure, so the UNIX socket file can be removed later.
|
||||||
var newcon = Connection.init(Connection.Type.SERVER, try self.allocator.dupeZ(u8, path));
|
const newcon = Connection.init(Connection.Type.SERVER, try self.allocator.dupeZ(u8, path));
|
||||||
try self.add_(newcon, newfd);
|
try self.add_(newcon, newfd);
|
||||||
|
|
||||||
return server;
|
return server;
|
||||||
|
@ -283,7 +283,7 @@ pub const Context = struct {
|
||||||
|
|
||||||
var buffer = [_]u8{0} ** IPC_MAX_MESSAGE_SIZE;
|
var buffer = [_]u8{0} ** IPC_MAX_MESSAGE_SIZE;
|
||||||
var fbs = std.io.fixedBufferStream(&buffer);
|
var fbs = std.io.fixedBufferStream(&buffer);
|
||||||
var writer = fbs.writer();
|
const writer = fbs.writer();
|
||||||
|
|
||||||
_ = try m.write(writer); // returns paylen
|
_ = try m.write(writer); // returns paylen
|
||||||
|
|
||||||
|
@ -306,8 +306,8 @@ pub const Context = struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_switch(self: *Self, fd1: i32, fd2: i32) !void {
|
pub fn add_switch(self: *Self, fd1: i32, fd2: i32) !void {
|
||||||
var index_origin = try self.fd_to_index(fd1);
|
const index_origin = try self.fd_to_index(fd1);
|
||||||
var index_destinataire = try self.fd_to_index(fd2);
|
const index_destinataire = try self.fd_to_index(fd2);
|
||||||
|
|
||||||
self.connections.items[index_origin].t = Connection.Type.SWITCHED;
|
self.connections.items[index_origin].t = Connection.Type.SWITCHED;
|
||||||
self.connections.items[index_destinataire].t = Connection.Type.SWITCHED;
|
self.connections.items[index_destinataire].t = Connection.Type.SWITCHED;
|
||||||
|
@ -335,7 +335,7 @@ pub const Context = struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// This may be kinda hacky, idk.
|
// This may be kinda hacky, idk.
|
||||||
var fd = self.pollfd.items[index].fd;
|
const fd = self.pollfd.items[index].fd;
|
||||||
var stream: net.Stream = .{ .handle = fd };
|
var stream: net.Stream = .{ .handle = fd };
|
||||||
packet_size = try stream.read(buffer[0..]);
|
packet_size = try stream.read(buffer[0..]);
|
||||||
|
|
||||||
|
@ -399,7 +399,7 @@ pub const Context = struct {
|
||||||
return current_event;
|
return current_event;
|
||||||
}
|
}
|
||||||
|
|
||||||
var duration = timer.read() / 1000000; // ns -> ms
|
const duration = timer.read() / 1000000; // ns -> ms
|
||||||
if (count == 0) {
|
if (count == 0) {
|
||||||
if (duration >= wait_duration) {
|
if (duration >= wait_duration) {
|
||||||
current_event = Event.init(Event.Type.TIMER, 0, 0, null);
|
current_event = Event.init(Event.Type.TIMER, 0, 0, null);
|
||||||
|
@ -413,7 +413,7 @@ pub const Context = struct {
|
||||||
// handle messages
|
// handle messages
|
||||||
// => loop over self.pollfd.items
|
// => loop over self.pollfd.items
|
||||||
for (self.pollfd.items, 0..) |*fd, i| {
|
for (self.pollfd.items, 0..) |*fd, i| {
|
||||||
var current_fd = fd.fd;
|
const current_fd = fd.fd;
|
||||||
// .revents is POLLIN
|
// .revents is POLLIN
|
||||||
if (fd.revents & std.os.linux.POLL.IN > 0) {
|
if (fd.revents & std.os.linux.POLL.IN > 0) {
|
||||||
// SERVER = new connection
|
// SERVER = new connection
|
||||||
|
@ -429,14 +429,14 @@ pub const Context = struct {
|
||||||
try self.schedule(current_event.m.?);
|
try self.schedule(current_event.m.?);
|
||||||
},
|
},
|
||||||
.DISCONNECTION => {
|
.DISCONNECTION => {
|
||||||
var dest = try self.switchdb.getDest(current_fd);
|
const dest = try self.switchdb.getDest(current_fd);
|
||||||
log.debug("disconnection from {} -> removing {}, too", .{ current_fd, dest });
|
log.debug("disconnection from {} -> removing {}, too", .{ current_fd, dest });
|
||||||
self.switchdb.nuke(current_fd);
|
self.switchdb.nuke(current_fd);
|
||||||
self.safe_close_fd(current_fd);
|
self.safe_close_fd(current_fd);
|
||||||
self.safe_close_fd(dest);
|
self.safe_close_fd(dest);
|
||||||
},
|
},
|
||||||
.ERROR => {
|
.ERROR => {
|
||||||
var dest = try self.switchdb.getDest(current_fd);
|
const dest = try self.switchdb.getDest(current_fd);
|
||||||
log.warn("error from {} -> removing {}, too", .{ current_fd, dest });
|
log.warn("error from {} -> removing {}, too", .{ current_fd, dest });
|
||||||
self.switchdb.nuke(current_fd);
|
self.switchdb.nuke(current_fd);
|
||||||
self.safe_close_fd(current_fd);
|
self.safe_close_fd(current_fd);
|
||||||
|
@ -455,7 +455,7 @@ pub const Context = struct {
|
||||||
}
|
}
|
||||||
// otherwise = new message or disconnection
|
// otherwise = new message or disconnection
|
||||||
else {
|
else {
|
||||||
var maybe_message = self.read(i) catch |err| switch (err) {
|
const maybe_message = self.read(i) catch |err| switch (err) {
|
||||||
error.ConnectionResetByPeer => {
|
error.ConnectionResetByPeer => {
|
||||||
log.warn("connection reset by peer", .{});
|
log.warn("connection reset by peer", .{});
|
||||||
try self.close(i);
|
try self.close(i);
|
||||||
|
@ -504,7 +504,7 @@ pub const Context = struct {
|
||||||
switch (current_event.t) {
|
switch (current_event.t) {
|
||||||
.SWITCH_TX => {},
|
.SWITCH_TX => {},
|
||||||
.ERROR => {
|
.ERROR => {
|
||||||
var dest = try self.switchdb.getDest(current_fd);
|
const dest = try self.switchdb.getDest(current_fd);
|
||||||
log.warn("error from {} -> removing {}, too", .{ current_fd, dest });
|
log.warn("error from {} -> removing {}, too", .{ current_fd, dest });
|
||||||
self.switchdb.nuke(current_fd);
|
self.switchdb.nuke(current_fd);
|
||||||
self.safe_close_fd(current_fd);
|
self.safe_close_fd(current_fd);
|
||||||
|
@ -518,7 +518,7 @@ pub const Context = struct {
|
||||||
return current_event;
|
return current_event;
|
||||||
} else {
|
} else {
|
||||||
// otherwise = write message for the msg.fd
|
// otherwise = write message for the msg.fd
|
||||||
self.write(m) catch |err| switch(err) {
|
self.write(m) catch |err| switch (err) {
|
||||||
error.BrokenPipe => {
|
error.BrokenPipe => {
|
||||||
log.warn("cannot send message, dest probably closed the connection ({})", .{err});
|
log.warn("cannot send message, dest probably closed the connection ({})", .{err});
|
||||||
try self.close(i);
|
try self.close(i);
|
||||||
|
@ -563,13 +563,13 @@ pub const Context = struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// close the connection and remove it from the two structures
|
// close the connection and remove it from the two structures
|
||||||
var con = self.connections.swapRemove(index);
|
const con = self.connections.swapRemove(index);
|
||||||
// Remove service's UNIX socket file.
|
// Remove service's UNIX socket file.
|
||||||
if (con.path) |path| {
|
if (con.path) |path| {
|
||||||
std.fs.cwd().deleteFile(path) catch {};
|
std.fs.cwd().deleteFile(path) catch {};
|
||||||
self.allocator.free(path);
|
self.allocator.free(path);
|
||||||
}
|
}
|
||||||
var pollfd = self.pollfd.swapRemove(index);
|
const pollfd = self.pollfd.swapRemove(index);
|
||||||
log.debug("closed client index {} (fd = {})", .{ index, pollfd.fd });
|
log.debug("closed client index {} (fd = {})", .{ index, pollfd.fd });
|
||||||
std.os.close(pollfd.fd);
|
std.os.close(pollfd.fd);
|
||||||
|
|
||||||
|
@ -624,7 +624,7 @@ const CommunicationTestThread = struct {
|
||||||
defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
||||||
|
|
||||||
var buffer: [1000]u8 = undefined;
|
var buffer: [1000]u8 = undefined;
|
||||||
var path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ ctx.rundir, "simple-context-test" });
|
const path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ ctx.rundir, "simple-context-test" });
|
||||||
|
|
||||||
const socket = try net.connectUnixSocket(path);
|
const socket = try net.connectUnixSocket(path);
|
||||||
defer socket.close();
|
defer socket.close();
|
||||||
|
@ -643,7 +643,7 @@ test "Context - creation, display and memory check" {
|
||||||
defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
||||||
|
|
||||||
var buffer: [1000]u8 = undefined;
|
var buffer: [1000]u8 = undefined;
|
||||||
var path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ ctx.rundir, "simple-context-test" });
|
const path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ ctx.rundir, "simple-context-test" });
|
||||||
|
|
||||||
// SERVER SIDE: creating a service.
|
// SERVER SIDE: creating a service.
|
||||||
var server = ctx.server_init("simple-context-test") catch |err| switch (err) {
|
var server = ctx.server_init("simple-context-test") catch |err| switch (err) {
|
||||||
|
@ -681,7 +681,7 @@ const ConnectThenSendMessageThread = struct {
|
||||||
defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
||||||
|
|
||||||
var buffer: [1000]u8 = undefined;
|
var buffer: [1000]u8 = undefined;
|
||||||
var path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ ctx.rundir, "simple-context-test" });
|
const path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ ctx.rundir, "simple-context-test" });
|
||||||
|
|
||||||
// Actual UNIX socket connection.
|
// Actual UNIX socket connection.
|
||||||
const socket = try net.connectUnixSocket(path);
|
const socket = try net.connectUnixSocket(path);
|
||||||
|
@ -690,7 +690,7 @@ const ConnectThenSendMessageThread = struct {
|
||||||
// Writing message into a buffer.
|
// Writing message into a buffer.
|
||||||
var message_buffer: [1000]u8 = undefined;
|
var message_buffer: [1000]u8 = undefined;
|
||||||
var message_fbs = std.io.fixedBufferStream(&message_buffer);
|
var message_fbs = std.io.fixedBufferStream(&message_buffer);
|
||||||
var message_writer = message_fbs.writer();
|
const message_writer = message_fbs.writer();
|
||||||
// 'fd' parameter is not taken into account here (no loop)
|
// 'fd' parameter is not taken into account here (no loop)
|
||||||
|
|
||||||
var m = try Message.init(0, allocator, "Hello world!");
|
var m = try Message.init(0, allocator, "Hello world!");
|
||||||
|
@ -712,7 +712,7 @@ test "Context - creation, echo once" {
|
||||||
defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
||||||
|
|
||||||
var buffer: [1000]u8 = undefined;
|
var buffer: [1000]u8 = undefined;
|
||||||
var path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ ctx.rundir, "simple-context-test" });
|
const path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ ctx.rundir, "simple-context-test" });
|
||||||
|
|
||||||
// SERVER SIDE: creating a service.
|
// SERVER SIDE: creating a service.
|
||||||
var server = ctx.server_init("simple-context-test") catch |err| switch (err) {
|
var server = ctx.server_init("simple-context-test") catch |err| switch (err) {
|
||||||
|
|
|
@ -89,10 +89,10 @@ test "Event - creation and display" {
|
||||||
defer _ = gpa.deinit();
|
defer _ = gpa.deinit();
|
||||||
const allocator = gpa.allocator();
|
const allocator = gpa.allocator();
|
||||||
|
|
||||||
var s = "hello!!";
|
const s = "hello!!";
|
||||||
var m = try Message.init(1, allocator, s); // fd type payload
|
var m = try Message.init(1, allocator, s); // fd type payload
|
||||||
defer m.deinit();
|
defer m.deinit();
|
||||||
var e = Event.init(Event.Type.CONNECTION, 5, 8, m); // type index origin message
|
const e = Event.init(Event.Type.CONNECTION, 5, 8, m); // type index origin message
|
||||||
|
|
||||||
try print_eq("event.Event.Type.CONNECTION, origin: 8, index 5, message: [fd: 1, payload: [hello!!]]", e);
|
try print_eq("event.Event.Type.CONNECTION, origin: 8, index 5, message: [fd: 1, payload: [hello!!]]", e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -199,14 +199,14 @@ pub fn receive_fd(sockfd: os.socket_t, buffer: []u8, msg_size: *usize) !os.fd_t
|
||||||
.data = 0,
|
.data = 0,
|
||||||
});
|
});
|
||||||
|
|
||||||
var msg: std.os.msghdr = .{ .name = null, .namelen = 0, .iov = &iov, .iovlen = 1, .control = &cmsg, .controllen = @sizeOf(@TypeOf(cmsg)), .flags = 0 };
|
const msg: std.os.msghdr = .{ .name = null, .namelen = 0, .iov = &iov, .iovlen = 1, .control = &cmsg, .controllen = @sizeOf(@TypeOf(cmsg)), .flags = 0 };
|
||||||
|
|
||||||
var msglen = recvmsg(sockfd, msg, 0) catch |err| {
|
const msglen = recvmsg(sockfd, msg, 0) catch |err| {
|
||||||
log.err("error recvmsg failed with {s}", .{@errorName(err)});
|
log.err("error recvmsg failed with {s}", .{@errorName(err)});
|
||||||
return 0;
|
return 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
var received_fd = @as(i32, cmsg.dataPtr().*);
|
const received_fd = @as(i32, cmsg.dataPtr().*);
|
||||||
std.mem.copy(u8, buffer, &msg_buffer);
|
std.mem.copy(u8, buffer, &msg_buffer);
|
||||||
msg_size.* = msglen;
|
msg_size.* = msglen;
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,7 @@ pub fn hexdump(stream: anytype, header: []const u8, buffer: []const u8) std.os.W
|
||||||
// Print a header.
|
// Print a header.
|
||||||
if (header.len > 0) {
|
if (header.len > 0) {
|
||||||
var hdr: [64]u8 = undefined;
|
var hdr: [64]u8 = undefined;
|
||||||
var offset: usize = (hdr.len / 2) - ((header.len / 2) - 1);
|
const offset: usize = (hdr.len / 2) - ((header.len / 2) - 1);
|
||||||
|
|
||||||
@memset(hdr[0..hdr.len], ' ');
|
@memset(hdr[0..hdr.len], ' ');
|
||||||
std.mem.copy(u8, hdr[offset..hdr.len], header);
|
std.mem.copy(u8, hdr[offset..hdr.len], header);
|
||||||
|
@ -77,10 +77,10 @@ const print = std.debug.print;
|
||||||
test "36-byte hexdump test" {
|
test "36-byte hexdump test" {
|
||||||
print("\nPrint hexdump, NO AUTOMATIC VERIFICATION, READ SOURCE CODE\n", .{});
|
print("\nPrint hexdump, NO AUTOMATIC VERIFICATION, READ SOURCE CODE\n", .{});
|
||||||
|
|
||||||
var buffer = "hello this is a simple text to print";
|
const buffer = "hello this is a simple text to print";
|
||||||
var hexbuf: [2000]u8 = undefined;
|
var hexbuf: [2000]u8 = undefined;
|
||||||
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
||||||
var hexwriter = hexfbs.writer();
|
const hexwriter = hexfbs.writer();
|
||||||
try hexdump(hexwriter, "Hello World", buffer);
|
try hexdump(hexwriter, "Hello World", buffer);
|
||||||
print("{s}\n", .{hexfbs.getWritten()});
|
print("{s}\n", .{hexfbs.getWritten()});
|
||||||
}
|
}
|
||||||
|
@ -88,10 +88,10 @@ test "36-byte hexdump test" {
|
||||||
test "32-byte hexdump test" {
|
test "32-byte hexdump test" {
|
||||||
print("\nPrint hexdump, NO AUTOMATIC VERIFICATION, READ SOURCE CODE\n", .{});
|
print("\nPrint hexdump, NO AUTOMATIC VERIFICATION, READ SOURCE CODE\n", .{});
|
||||||
|
|
||||||
var buffer = "THIS IS THE END, MY ONLY... END";
|
const buffer = "THIS IS THE END, MY ONLY... END";
|
||||||
var hexbuf: [2000]u8 = undefined;
|
var hexbuf: [2000]u8 = undefined;
|
||||||
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
||||||
var hexwriter = hexfbs.writer();
|
const hexwriter = hexfbs.writer();
|
||||||
try hexdump(hexwriter, "Hello World", buffer);
|
try hexdump(hexwriter, "Hello World", buffer);
|
||||||
print("{s}\n", .{hexfbs.getWritten()});
|
print("{s}\n", .{hexfbs.getWritten()});
|
||||||
}
|
}
|
||||||
|
@ -99,10 +99,10 @@ test "32-byte hexdump test" {
|
||||||
test "26-byte hexdump test" {
|
test "26-byte hexdump test" {
|
||||||
print("\nPrint hexdump, NO AUTOMATIC VERIFICATION, READ SOURCE CODE\n", .{});
|
print("\nPrint hexdump, NO AUTOMATIC VERIFICATION, READ SOURCE CODE\n", .{});
|
||||||
|
|
||||||
var buffer = "hello this is another text";
|
const buffer = "hello this is another text";
|
||||||
var hexbuf: [2000]u8 = undefined;
|
var hexbuf: [2000]u8 = undefined;
|
||||||
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
||||||
var hexwriter = hexfbs.writer();
|
const hexwriter = hexfbs.writer();
|
||||||
try hexdump(hexwriter, "Hello World", buffer);
|
try hexdump(hexwriter, "Hello World", buffer);
|
||||||
print("{s}\n", .{hexfbs.getWritten()});
|
print("{s}\n", .{hexfbs.getWritten()});
|
||||||
}
|
}
|
||||||
|
@ -110,10 +110,10 @@ test "26-byte hexdump test" {
|
||||||
test "1-byte hexdump test" {
|
test "1-byte hexdump test" {
|
||||||
print("\nPrint hexdump, NO AUTOMATIC VERIFICATION, READ SOURCE CODE\n", .{});
|
print("\nPrint hexdump, NO AUTOMATIC VERIFICATION, READ SOURCE CODE\n", .{});
|
||||||
|
|
||||||
var buffer = "h";
|
const buffer = "h";
|
||||||
var hexbuf: [2000]u8 = undefined;
|
var hexbuf: [2000]u8 = undefined;
|
||||||
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
||||||
var hexwriter = hexfbs.writer();
|
const hexwriter = hexfbs.writer();
|
||||||
try hexdump(hexwriter, "Hello World", buffer);
|
try hexdump(hexwriter, "Hello World", buffer);
|
||||||
print("{s}\n", .{hexfbs.getWritten()});
|
print("{s}\n", .{hexfbs.getWritten()});
|
||||||
}
|
}
|
||||||
|
@ -121,10 +121,10 @@ test "1-byte hexdump test" {
|
||||||
test "0-byte hexdump test" {
|
test "0-byte hexdump test" {
|
||||||
print("\nPrint hexdump, NO AUTOMATIC VERIFICATION, READ SOURCE CODE\n", .{});
|
print("\nPrint hexdump, NO AUTOMATIC VERIFICATION, READ SOURCE CODE\n", .{});
|
||||||
|
|
||||||
var buffer = "";
|
const buffer = "";
|
||||||
var hexbuf: [2000]u8 = undefined;
|
var hexbuf: [2000]u8 = undefined;
|
||||||
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
||||||
var hexwriter = hexfbs.writer();
|
const hexwriter = hexfbs.writer();
|
||||||
try hexdump(hexwriter, "Hello World", buffer);
|
try hexdump(hexwriter, "Hello World", buffer);
|
||||||
print("{s}\n", .{hexfbs.getWritten()});
|
print("{s}\n", .{hexfbs.getWritten()});
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,7 +67,7 @@ test "Message - read and write" {
|
||||||
const allocator = gpa.allocator();
|
const allocator = gpa.allocator();
|
||||||
|
|
||||||
// First, create a message.
|
// First, create a message.
|
||||||
var s = "hello!!";
|
const s = "hello!!";
|
||||||
var first_message = try Message.init(1, allocator, s);
|
var first_message = try Message.init(1, allocator, s);
|
||||||
defer first_message.deinit();
|
defer first_message.deinit();
|
||||||
|
|
||||||
|
@ -79,13 +79,13 @@ test "Message - read and write" {
|
||||||
// Write it in a buffer, similar to sending it on the network.
|
// Write it in a buffer, similar to sending it on the network.
|
||||||
var buffer: [1000]u8 = undefined;
|
var buffer: [1000]u8 = undefined;
|
||||||
var fbs = std.io.fixedBufferStream(&buffer);
|
var fbs = std.io.fixedBufferStream(&buffer);
|
||||||
var writer = fbs.writer();
|
const writer = fbs.writer();
|
||||||
|
|
||||||
var count = try first_message.write(writer);
|
const count = try first_message.write(writer);
|
||||||
|
|
||||||
var second_buffer: [2000]u8 = undefined;
|
var second_buffer: [2000]u8 = undefined;
|
||||||
var fba = std.heap.FixedBufferAllocator.init(&second_buffer);
|
var fba = std.heap.FixedBufferAllocator.init(&second_buffer);
|
||||||
var second_allocator = fba.allocator();
|
const second_allocator = fba.allocator();
|
||||||
|
|
||||||
// Read the buffer, similar to receiving a message on the network.
|
// Read the buffer, similar to receiving a message on the network.
|
||||||
// (8 == random client's fd number)
|
// (8 == random client's fd number)
|
||||||
|
|
|
@ -76,7 +76,7 @@ pub const SwitchDB = struct {
|
||||||
|
|
||||||
var buffer = [_]u8{0} ** 100000; // TODO: buffer size
|
var buffer = [_]u8{0} ** 100000; // TODO: buffer size
|
||||||
var message_size: u32 = @truncate(buffer.len);
|
var message_size: u32 = @truncate(buffer.len);
|
||||||
var r: CBEventType = managedconnection.in(fd, &buffer, &message_size);
|
const r: CBEventType = managedconnection.in(fd, &buffer, &message_size);
|
||||||
|
|
||||||
switch (r) {
|
switch (r) {
|
||||||
// The message should be ignored (protocol specific).
|
// The message should be ignored (protocol specific).
|
||||||
|
@ -87,7 +87,7 @@ pub const SwitchDB = struct {
|
||||||
// TODO: read message
|
// TODO: read message
|
||||||
// TODO: better allocator?
|
// TODO: better allocator?
|
||||||
// TODO: better errors?
|
// TODO: better errors?
|
||||||
var message: Message = Message.read(managedconnection.dest, buffer[0..message_size], std.heap.c_allocator) catch {
|
const message: Message = Message.read(managedconnection.dest, buffer[0..message_size], std.heap.c_allocator) catch {
|
||||||
return error.generic;
|
return error.generic;
|
||||||
};
|
};
|
||||||
return message;
|
return message;
|
||||||
|
@ -112,13 +112,13 @@ pub const SwitchDB = struct {
|
||||||
|
|
||||||
var buffer = [_]u8{0} ** 100000; // TODO: buffer size
|
var buffer = [_]u8{0} ** 100000; // TODO: buffer size
|
||||||
var fbs = std.io.fixedBufferStream(&buffer);
|
var fbs = std.io.fixedBufferStream(&buffer);
|
||||||
var writer = fbs.writer();
|
const writer = fbs.writer();
|
||||||
|
|
||||||
// returning basic errors, no details.
|
// returning basic errors, no details.
|
||||||
_ = message.write(writer) catch return error.generic;
|
_ = message.write(writer) catch return error.generic;
|
||||||
var written = fbs.getWritten();
|
const written = fbs.getWritten();
|
||||||
|
|
||||||
var r = managedconnection.out(message.fd, written.ptr, @truncate(written.len));
|
const r = managedconnection.out(message.fd, written.ptr, @truncate(written.len));
|
||||||
|
|
||||||
switch (r) {
|
switch (r) {
|
||||||
// The message should be ignored (protocol specific).
|
// The message should be ignored (protocol specific).
|
||||||
|
@ -153,7 +153,7 @@ pub const SwitchDB = struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn handle_event_write(self: *Self, index: usize, message: Message) Event {
|
pub fn handle_event_write(self: *Self, index: usize, message: Message) Event {
|
||||||
var fd = message.fd;
|
const fd = message.fd;
|
||||||
self.write(message) catch |err| switch (err) {
|
self.write(message) catch |err| switch (err) {
|
||||||
error.closeFD => {
|
error.closeFD => {
|
||||||
return Event.init(Event.Type.DISCONNECTION, index, fd, null);
|
return Event.init(Event.Type.DISCONNECTION, index, fd, null);
|
||||||
|
@ -204,7 +204,7 @@ fn successful_in(_: i32, mcontent: [*]u8, mlen: *u32) CBEventType {
|
||||||
defer m.deinit();
|
defer m.deinit();
|
||||||
|
|
||||||
var fbs = std.io.fixedBufferStream(mcontent[0..mlen.*]);
|
var fbs = std.io.fixedBufferStream(mcontent[0..mlen.*]);
|
||||||
var writer = fbs.writer();
|
const writer = fbs.writer();
|
||||||
const bytes_written = m.write(writer) catch unreachable;
|
const bytes_written = m.write(writer) catch unreachable;
|
||||||
mlen.* = @truncate(bytes_written);
|
mlen.* = @truncate(bytes_written);
|
||||||
return CBEventType.NO_ERROR;
|
return CBEventType.NO_ERROR;
|
||||||
|
@ -227,7 +227,7 @@ test "successful exchanges" {
|
||||||
try switchdb.db.put(6, ManagedConnection{ .dest = 5, .in = successful_in, .out = successful_out });
|
try switchdb.db.put(6, ManagedConnection{ .dest = 5, .in = successful_in, .out = successful_out });
|
||||||
|
|
||||||
// should return a new message (hardcoded: fd 8, payload "coucou")
|
// should return a new message (hardcoded: fd 8, payload "coucou")
|
||||||
var event_1: Event = switchdb.handle_event_read(1, 5);
|
const event_1: Event = switchdb.handle_event_read(1, 5);
|
||||||
if (event_1.m) |m| {
|
if (event_1.m) |m| {
|
||||||
m.deinit();
|
m.deinit();
|
||||||
} else {
|
} else {
|
||||||
|
@ -235,15 +235,15 @@ test "successful exchanges" {
|
||||||
}
|
}
|
||||||
|
|
||||||
// should return a new message (hardcoded: fd 8, payload "coucou")
|
// should return a new message (hardcoded: fd 8, payload "coucou")
|
||||||
var event_2: Event = switchdb.handle_event_read(1, 6);
|
const event_2: Event = switchdb.handle_event_read(1, 6);
|
||||||
if (event_2.m) |m| {
|
if (event_2.m) |m| {
|
||||||
m.deinit();
|
m.deinit();
|
||||||
} else {
|
} else {
|
||||||
return error.NoMessage;
|
return error.NoMessage;
|
||||||
}
|
}
|
||||||
|
|
||||||
var message = try Message.init(6, allocator, "coucou");
|
const message = try Message.init(6, allocator, "coucou");
|
||||||
var event_3 = switchdb.handle_event_write(5, message);
|
const event_3 = switchdb.handle_event_write(5, message);
|
||||||
if (event_3.m) |_| {
|
if (event_3.m) |_| {
|
||||||
return error.ShouldNotCarryMessage;
|
return error.ShouldNotCarryMessage;
|
||||||
}
|
}
|
||||||
|
@ -270,19 +270,19 @@ test "unsuccessful exchanges" {
|
||||||
try switchdb.db.put(6, ManagedConnection{ .dest = 5, .in = unsuccessful_in, .out = unsuccessful_out });
|
try switchdb.db.put(6, ManagedConnection{ .dest = 5, .in = unsuccessful_in, .out = unsuccessful_out });
|
||||||
|
|
||||||
// should return a new message (hardcoded: fd 8, payload "coucou")
|
// should return a new message (hardcoded: fd 8, payload "coucou")
|
||||||
var event_1: Event = switchdb.handle_event_read(1, 5);
|
const event_1: Event = switchdb.handle_event_read(1, 5);
|
||||||
if (event_1.m) |_| {
|
if (event_1.m) |_| {
|
||||||
return error.ShouldNotCarryMessage;
|
return error.ShouldNotCarryMessage;
|
||||||
}
|
}
|
||||||
|
|
||||||
// should return a new message (hardcoded: fd 8, payload "coucou")
|
// should return a new message (hardcoded: fd 8, payload "coucou")
|
||||||
var event_2: Event = switchdb.handle_event_read(1, 6);
|
const event_2: Event = switchdb.handle_event_read(1, 6);
|
||||||
if (event_2.m) |_| {
|
if (event_2.m) |_| {
|
||||||
return error.ShouldNotCarryMessage;
|
return error.ShouldNotCarryMessage;
|
||||||
}
|
}
|
||||||
|
|
||||||
var message = try Message.init(6, allocator, "coucou");
|
const message = try Message.init(6, allocator, "coucou");
|
||||||
var event_3 = switchdb.handle_event_write(5, message);
|
const event_3 = switchdb.handle_event_write(5, message);
|
||||||
if (event_3.m) |_| {
|
if (event_3.m) |_| {
|
||||||
return error.ShouldNotCarryMessage;
|
return error.ShouldNotCarryMessage;
|
||||||
}
|
}
|
||||||
|
@ -308,7 +308,7 @@ test "nuke 'em" {
|
||||||
fn default_in(origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType {
|
fn default_in(origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType {
|
||||||
// This may be kinda hacky, idk.
|
// This may be kinda hacky, idk.
|
||||||
var stream: net.Stream = .{ .handle = origin };
|
var stream: net.Stream = .{ .handle = origin };
|
||||||
var packet_size: usize = stream.read(mcontent[0..mlen.*]) catch return CBEventType.ERROR;
|
const packet_size: usize = stream.read(mcontent[0..mlen.*]) catch return CBEventType.ERROR;
|
||||||
|
|
||||||
// Let's handle this as a disconnection.
|
// Let's handle this as a disconnection.
|
||||||
if (packet_size < 4) {
|
if (packet_size < 4) {
|
||||||
|
@ -325,7 +325,7 @@ fn default_out(fd: i32, mcontent: [*]const u8, mlen: u32) CBEventType {
|
||||||
// Message contains the fd, no need to search for the right structure to copy,
|
// Message contains the fd, no need to search for the right structure to copy,
|
||||||
// let's just recreate a Stream from the fd.
|
// let's just recreate a Stream from the fd.
|
||||||
|
|
||||||
var to_send = mcontent[0..mlen];
|
const to_send = mcontent[0..mlen];
|
||||||
var stream = net.Stream{ .handle = fd };
|
var stream = net.Stream{ .handle = fd };
|
||||||
_ = stream.write(to_send) catch return CBEventType.ERROR;
|
_ = stream.write(to_send) catch return CBEventType.ERROR;
|
||||||
return CBEventType.NO_ERROR;
|
return CBEventType.NO_ERROR;
|
||||||
|
|
10
src/util.zig
10
src/util.zig
|
@ -16,19 +16,19 @@ pub const URI = struct {
|
||||||
|
|
||||||
pub fn read(uri_to_decode: []const u8) Self {
|
pub fn read(uri_to_decode: []const u8) Self {
|
||||||
var protocolit = std.mem.split(u8, uri_to_decode, "://");
|
var protocolit = std.mem.split(u8, uri_to_decode, "://");
|
||||||
var protocol = protocolit.first();
|
const protocol = protocolit.first();
|
||||||
|
|
||||||
var addressit = std.mem.split(u8, protocolit.next().?, "/");
|
var addressit = std.mem.split(u8, protocolit.next().?, "/");
|
||||||
var address = addressit.first();
|
const address = addressit.first();
|
||||||
|
|
||||||
var path = addressit.rest();
|
const path = addressit.rest();
|
||||||
|
|
||||||
return Self{ .protocol = protocol, .address = address, .path = path };
|
return Self{ .protocol = protocol, .address = address, .path = path };
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
test "URI simple decoding" {
|
test "URI simple decoding" {
|
||||||
var uri = URI.read("tcp://127.0.0.1:8000/some-path");
|
const uri = URI.read("tcp://127.0.0.1:8000/some-path");
|
||||||
try testing.expectEqualSlices(u8, uri.protocol, "tcp");
|
try testing.expectEqualSlices(u8, uri.protocol, "tcp");
|
||||||
try testing.expectEqualSlices(u8, uri.address, "127.0.0.1:8000");
|
try testing.expectEqualSlices(u8, uri.address, "127.0.0.1:8000");
|
||||||
try testing.expectEqualSlices(u8, uri.path, "some-path");
|
try testing.expectEqualSlices(u8, uri.path, "some-path");
|
||||||
|
@ -37,7 +37,7 @@ test "URI simple decoding" {
|
||||||
pub fn print_buffer(header: []const u8, buffer: []const u8) void {
|
pub fn print_buffer(header: []const u8, buffer: []const u8) void {
|
||||||
var hexbuf: [4000]u8 = undefined;
|
var hexbuf: [4000]u8 = undefined;
|
||||||
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
||||||
var hexwriter = hexfbs.writer();
|
const hexwriter = hexfbs.writer();
|
||||||
hexdump.hexdump(hexwriter, header, buffer) catch unreachable;
|
hexdump.hexdump(hexwriter, header, buffer) catch unreachable;
|
||||||
log.debug("{s}", .{hexfbs.getWritten()});
|
log.debug("{s}", .{hexfbs.getWritten()});
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue