Compare commits

...

10 commits

14 changed files with 228 additions and 137 deletions

1
.gitignore vendored
View file

@ -1,4 +1,5 @@
zig-cache
zig-out
.zig-cache
docs
*.swp

View file

@ -24,5 +24,6 @@ See the [dedicated repository][examples].
LibIPC reached a stable state and is usable.
Performance is fine for most projects, but can be largely improved.
The `poll` syscall is used instead of more recent and *faster* syscalls (`epoll`, `kqueue`, etc.).
[examples]: https://git.baguette.netlib.re/Baguette/libipc-examples

View file

@ -9,7 +9,6 @@
### makefile
- release
- distribution
### documentation

View file

@ -1,6 +1,6 @@
const std = @import("std");
const VERSION = "0.1.1";
const VERSION = "0.2.0";
// Although this function looks imperative, note that its job is to
// declaratively construct a build graph that will be executed by an external
@ -21,7 +21,7 @@ pub fn build(b: *std.Build) void {
.name = "ipc",
// In this case the main source file is merely a path, however, in more
// complicated build scripts, this could be a generated file.
.root_source_file = .{ .path = "src/bindings.zig" },
.root_source_file = .{ .cwd_relative = "src/bindings.zig" },
.target = target,
.optimize = optimize,
});
@ -37,7 +37,7 @@ pub fn build(b: *std.Build) void {
const shared_lib = b.addSharedLibrary(.{
.name = "ipc",
.root_source_file = .{ .path = "src/bindings.zig" },
.root_source_file = .{ .cwd_relative = "src/bindings.zig" },
.version = comptime (try std.SemanticVersion.parse(VERSION)),
.target = target,
.optimize = optimize,
@ -47,7 +47,7 @@ pub fn build(b: *std.Build) void {
// Creates a step for unit testing.
const main_tests = b.addTest(.{
.root_source_file = .{ .path = "src/main.zig" },
.root_source_file = .{ .cwd_relative = "src/main.zig" },
.target = target,
.optimize = optimize,
});

2
ipc.pc
View file

@ -3,6 +3,6 @@ libdir=/usr/local/lib
Name: LibIPC
Description: The simplest Inter Process Communication library
Version: 0.1.1
Version: 0.2.0
Libs: -L${libdir} -lipc
Cflags: -I${includedir}

119
libipc.h
View file

@ -3,47 +3,112 @@
#include <stdint.h>
/**
The LibIPC API is designed to be simple.
For most applications, the usage can be summarized as:
- ipc_context_init to allocate the "context" structure;
- ipc_service_init or ipc_connect_service to instanciate the service or starts a connection to it;
- loop over ipc_wait_event to wait for events;
- ipc_schedule to send messages;
- ipc_close_all then ipc_context_deinit to close all connections then free the context.
The ipc_wait_event function is where the magic happens.
This function handles all the complexity of network management, juggling with file descriptors,
storing connection-related data, waiting for an event, handling connections and disconnections,
reading incoming messages and providing UNDERSTANDABLE notifications (see the `event_types` enumeration).
Basic events are:
- Connections and disconnections, they are already handled within LibIPC, but the API notifies the
user so they can perform additional operations;
- Messages (received or sent);
- Timers, to wake up the application on a regular basis (setup with ipc_context_timer).
Furthermore, the API can be used for non-IPC communications, meaning connections not using the LibIPC protocol.
This is particularly useful when dealing with any file descriptor that needs attention.
The ipc_add_external function adds a file descriptor marked as "external" which will be listened on by LibIPC.
In this case, disconnections are still handled, but messages aren't automatically read.
This way, almost any protocol can be used along with LibIPC communications and with the LibIPC API.
Related event: EXTERNAL, notifying users that an "external" file descriptor received a message.
Finally, a "switch" mechanism to automatically transfer messages between a pair of file descriptors (ipc_add_switch).
Messages are automatically exchanged between the two file descriptors by LibIPC;
eventually with user-provided callbacks (ipc_set_switch_callbacks) for non-IPC connections.
Thus, callbacks enable to handle websocket connections, non-IPC clients communicating with JSON messages, etc.
Related events: SWITCH RX and TX, notifying users that a "switched" file descriptor received (or sent) a message.
Switch and IO callbacks enable to easily create "protocol IPC services" (such as TCPd) to
bind IPC services to basically any available protocol through small, dedicated services
handling all the nitty-gritty details of non-IPC protocols.
*/
// Event types: the "t" parameter in "ipc_wait_event".
enum event_types {
ERROR = 0 // A problem occured.
, CONNECTION = 1 // New user.
, DISCONNECTION = 2 // User disconnected.
, MESSAGE_RX = 3 // New message.
, MESSAGE_TX = 4 // Message sent.
, TIMER = 5 // Timeout in the poll(2) function.
, EXTERNAL = 6 // Message received from a non IPC socket.
, SWITCH_RX = 7 // Message received from a switched FD.
, SWITCH_TX = 8 // Message sent to a switched fd.
ERROR = 0 // A problem occured.
, CONNECTION = 1 // New user.
, DISCONNECTION = 2 // User disconnected.
, MESSAGE_RX = 3 // New message.
, MESSAGE_TX = 4 // Message sent.
, TIMER = 5 // Timeout in the poll(2) function.
, EXTERNAL = 6 // Message received from a non-IPC socket.
, SWITCH_RX = 7 // Message received from a switched fd.
, SWITCH_TX = 8 // Message sent to a switched fd.
};
// Return type of callback functions when switching.
enum cb_event_types {
CB_NO_ERROR = 0 // No error. A message was generated.
, CB_ERROR = 1 // Generic error.
, CB_FD_CLOSING = 2 // The fd is closing.
, CB_IGNORE = 3 // The message should be ignored (protocol specific).
CB_NO_ERROR = 0 // No error. A message was generated.
, CB_ERROR = 1 // Generic error.
, CB_FD_CLOSING = 2 // The fd is closing.
, CB_IGNORE = 3 // The message should be ignored (protocol specific).
};
int ipc_context_init (void** ptr);
int ipc_service_init (void* ctx, int* servicefd, const char* service_name, uint16_t service_name_len);
int ipc_context_init (void** ptr); // Allocate memory for a context.
void ipc_context_deinit (void** ctx); // Free the context's memory.
void ipc_context_timer (void* ctx, int timer); // Change the timer (for timer events, in ms).
// Init (or connect to) a service. That's almost the same operation behind the scene.
int ipc_service_init (void* ctx, int* servicefd, const char* service_name, uint16_t service_name_len);
int ipc_connect_service (void* ctx, int* servicefd, const char* service_name, uint16_t service_name_len);
void ipc_context_deinit (void** ctx);
int ipc_write (void* ctx, int servicefd, char* mcontent, uint32_t mlen);
int ipc_schedule (void* ctx, int servicefd, const char* mcontent, uint32_t mlen);
int ipc_read_fd (void* ctx, int fd, char* buffer, size_t* buflen);
int ipc_read (void* ctx, size_t index, char* buffer, size_t* buflen);
int ipc_wait_event(void* ctx, char* t, size_t* index, int* originfd, char* buffer, size_t* buflen);
void ipc_context_timer (void* ctx, int timer);
int ipc_close_fd (void* ctx, int fd);
int ipc_close (void* ctx, size_t index);
// Write a message or schedule it.
int ipc_write (void* ctx, int servicefd, const char* mcontent, size_t mlen);
int ipc_schedule (void* ctx, int servicefd, const char* mcontent, size_t mlen);
// Read a message from a client; either selected by an index in the context structure or by its file descriptor.
int ipc_read_fd (void* ctx, int fd, char* buffer, size_t* buflen);
int ipc_read (void* ctx, size_t index, char* buffer, size_t* buflen);
// Wait for an event.
// The "t" parameter is the type of the event (enum event_types).
// The "index" parameter is the index in the context structure of the origin of the event (client or server).
// The "originfd" parameter is the file descriptor on which the event occurs.
// The "newfd" parameter is the file descriptor of the new connected client, in case of a connection event.
// The "buffer" and "buflen" parameters contain respectively a copy of the received message and its length.
int ipc_wait_event (void* ctx, char* t, size_t* index, int* originfd, int* newfd, char* buffer, size_t* buflen);
// Close a client (or server) based on its file descriptor or its index in the context structure.
int ipc_close_fd (void* ctx, int fd);
int ipc_close (void* ctx, size_t index);
// Close all connections (probably right before the processus is terminated).
int ipc_close_all (void* ctx);
// Switch functions (for "protocol" services, such as TCPd).
// Add a (possibly non-IPC) file descriptor to handle.
// Since it's not marked as an IPC connection, messages won't be automatically read;
// which enables to handle any communications for most protocols through the LibIPC API.
int ipc_add_external (void* ctx, int newfd);
// Add a new switch between a pair of file descriptors, enabling automatic exchange of messages
// between this pair of fds. Useful for "protocol services", such as TCPd.
int ipc_add_switch (void* ctx, int fd1, int fd2);
// Set IO callbacks for a file descriptor.
// Returned "char" is a cb_event_types enum.
// One of the callbacks can be "NULL" to keep the default callback, thus changing only input or output operations.
int ipc_set_switch_callbacks (void* ctx, int fd
, char (*in (int orig, const char *payload, uint32_t *mlen))
, char (*out(int dest, char *payload, uint32_t mlen)));
, char (*in (int orig, char *payload, size_t *mlen))
, char (*out(int dest, const char *payload, size_t mlen)));
#endif

View file

@ -15,25 +15,27 @@ PREFIX ?= /usr/local
LIBDIR ?= $(PREFIX)/lib
INCLUDEDIR ?= $(PREFIX)/include
PKGCONFIGDIR ?= /usr/share/pkgconfig
install-pkgconfig:
[ -d $(PKGCONFIGDIR) ] || install -m 0755 -d $(PKGCONFIGDIR)
install -m 0644 ipc.pc $(PKGCONFIGDIR)
install-library:
[ -d $(LIBDIR) ] || install -m 0755 -d $(LIBDIR)
install -m 0644 zig-out/lib/libipc.a $(LIBDIR)
install -m 0644 zig-out/lib/libipc.so $(LIBDIR)
install-header:
[ -d $(INCLUDEDIR) ] || install -m 0755 -d $(INCLUDEDIR)
install -m 0644 libipc.h $(INCLUDEDIR)
install: install-pkgconfig install-library install-header
uninstall-library:
rm $(LIBDIR)/libipc.a \
$(LIBDIR)/libipc.so \
$(LIBDIR)/libipc.so.*
uninstall-header:
rm $(INCLUDEDIR)/libipc.h
uninstall: uninstall-library uninstall-header
$(PKGCONFIGDIR):; install -m 0755 -d $(PKGCONFIGDIR)
$(PKGCONFIGDIR)/ipc.pc: ipc.pc; install -m 0644 ipc.pc $(PKGCONFIGDIR)
install-pkgconfig: $(PKGCONFIGDIR) $(PKGCONFIGDIR)/ipc.pc
$(LIBDIR):; install -m 0755 -d $(LIBDIR)
$(LIBDIR)/libipc.a: zig-out/lib/libipc.a; install -m 0644 zig-out/lib/libipc.a $(LIBDIR)
$(LIBDIR)/libipc.so: zig-out/lib/libipc.so; install -m 0644 zig-out/lib/libipc.so $(LIBDIR)
install-library: $(LIBDIR) $(LIBDIR)/libipc.a $(LIBDIR)/libipc.so
$(INCLUDEDIR):; install -m 0755 -d $(INCLUDEDIR)
$(INCLUDEDIR)/libipc.h: libipc.h; install -m 0644 libipc.h $(INCLUDEDIR)
install-header: $(INCLUDEDIR) $(INCLUDEDIR)/libipc.h
install: install-pkgconfig install-library install-header
@echo "Now that you have installed the library, you should (probably) run ldconfig."
uninstall-library:; rm $(LIBDIR)/libipc.a $(LIBDIR)/libipc.so*
uninstall-header:; rm $(INCLUDEDIR)/libipc.h
uninstall-pkgconfig:; rm $(PKGCONFIGDIR)/ipc.pc
uninstall: uninstall-pkgconfig uninstall-library uninstall-header
mrproper:
rm -r docs zig-cache zig-out 2>/dev/null || true
@ -48,21 +50,21 @@ serve-doc:
darkhttpd docs/ --addr $(DOC_HTTPD_ADDR) --port $(DOC_HTTPD_PORT) --log $(DOC_HTTPD_ACCESS_LOGS)
PACKAGE ?= libipc
VERSION ?= 0.1.0
VERSION ?= 0.2.0
PKG = $(PACKAGE)-$(VERSION)
dist-dir:
[ -d $(PKG) ] || ln -s . $(PKG)
$(PKG).tar.gz: dist-dir
tar zcf $@ \
$(PKG)/src \
$(PKG)/build.zig \
$(PKG)/build.zig* \
$(PKG)/libipc.h \
$(PKG)/ipc.pc \
$(PKG)/makefile* \
$(PKG)/README* \
$(PKG)/TODO*
dist-rm-dir:
rm $(PKG)
dist-gz: $(PACKAGE)-$(VERSION).tar.gz
$(PKG):; ln -s . $(PKG)
dist-dir: $(PKG)
dist-rm-dir:; rm $(PKG)
dist-gz: $(PKG).tar.gz
dist: dist-gz dist-rm-dir
# You can add your specific instructions there.

View file

@ -36,7 +36,7 @@ export fn ipc_context_deinit(ctx: **Context) callconv(.C) void {
}
/// Write a message (no waiting).
export fn ipc_write(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) i32 {
export fn ipc_write(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) i32 {
// TODO: better default length.
var buffer = [_]u8{0} ** 100000;
var fba = std.heap.FixedBufferAllocator.init(&buffer);
@ -48,7 +48,7 @@ export fn ipc_write(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen:
/// Schedule a message.
/// Use the same allocator as the context.
export fn ipc_schedule(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) i32 {
export fn ipc_schedule(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) i32 {
const message = Message.init(servicefd, ctx.allocator, mcontent[0..mlen]) catch return -1;
ctx.schedule(message) catch return -2;
return 0;
@ -90,7 +90,7 @@ export fn ipc_read(ctx: *Context, index: usize, buffer: [*]u8, buflen: *usize) c
/// Wait for an event.
/// Buffer length will be changed to the size of the received message.
export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 {
export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, newfd: *i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 {
const event = ctx.wait_event() catch |err| switch (err) {
else => {
log.warn("error while waiting for an event: {}\n", .{err});
@ -100,13 +100,17 @@ export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, b
t.* = @intFromEnum(event.t);
index.* = event.index;
originfd.* = event.origin;
newfd.* = event.newfd;
if (event.m) |m| {
var fbs = std.io.fixedBufferStream(buffer[0..buflen.*]);
var writer = fbs.writer();
_ = writer.write(m.payload) catch return -4;
buflen.* = m.payload.len;
m.deinit();
switch (event.t) {
.SWITCH_RX => {},
else => m.deinit(),
}
} else {
buflen.* = 0;
}
@ -148,8 +152,8 @@ export fn ipc_add_switch(ctx: *Context, fd1: i32, fd2: i32) callconv(.C) i32 {
}
export fn ipc_set_switch_callbacks(ctx: *Context, fd: i32,
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8,
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8) callconv(.C) i32 {
in: ?*const fn (origin: i32, mcontent: [*]u8, mlen: *usize) callconv(.C) u8,
out: ?*const fn (origin: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) u8) callconv(.C) i32 {
ctx.set_switch_callbacks(fd, in, out) catch return -1;
return 0;
}

View file

@ -1,17 +1,11 @@
const std = @import("std");
const testing = std.testing;
// const DEBUG = @import("./hexdump.zig");
const net = std.net;
const os = std.os;
const fmt = std.fmt;
const c = std.c;
const posix = std.posix;
// const print = std.debug.print;
// TODO: to remove once PR https://github.com/ziglang/zig/pull/14639 is accepted.
pub extern "c" fn umask(mode: c.mode_t) c.mode_t;
const log = std.log.scoped(.libipc_context);
const receive_fd = @import("./exchange-fd.zig").receive_fd;
@ -35,7 +29,6 @@ pub const PollFD = std.ArrayList(posix.pollfd);
pub const IPC_HEADER_SIZE = 4; // Size (4 bytes) then content.
pub const IPC_BASE_SIZE = 100000; // 100 KB, plenty enough space for messages
pub const IPC_MAX_MESSAGE_SIZE = IPC_BASE_SIZE - IPC_HEADER_SIZE;
pub const IPC_VERSION = 1;
// Context of the whole networking state.
pub const Context = struct {
@ -66,8 +59,8 @@ pub const Context = struct {
};
// Allow mkdir to create a directory with 0o770 permissions.
const previous_mask = umask(0o007);
defer _ = umask(previous_mask);
const previous_mask = c.umask(0o007);
defer _ = c.umask(previous_mask);
// Create the run directory, where all UNIX sockets will be.
posix.mkdir(rundir, 0o0770) catch |err| switch (err) {
@ -80,7 +73,12 @@ pub const Context = struct {
},
};
return Self{ .rundir = rundir, .connections = Connections.init(allocator), .pollfd = PollFD.init(allocator), .tx = Messages.init(allocator), .switchdb = SwitchDB.init(allocator), .allocator = allocator };
return Self{ .rundir = rundir
, .connections = Connections.init(allocator)
, .pollfd = PollFD.init(allocator)
, .tx = Messages.init(allocator)
, .switchdb = SwitchDB.init(allocator)
, .allocator = allocator };
}
pub fn deinit(self: *Self) void {
@ -218,6 +216,13 @@ pub const Context = struct {
try self.add_(newcon, newfd);
}
// /// Change the type of a file descriptor.
// /// Useful for protocol daemons (ex: TCPd) or proxies listening to local IPC connections for new clients.
// /// In this case, the client can be "switched" but needs to be marked as such.
// /// TODO
// pub fn change_fd_type (self: *Self, fd: i32, new_type: Connection.Type) !void {
// }
fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void {
// net.Server
const serverfd = self.pollfd.items[server_index].fd;
@ -233,7 +238,7 @@ pub const Context = struct {
const sfd = server.stream.handle;
// WARNING: imply every new item is last
event.set(Event.Type.CONNECTION, self.pollfd.items.len - 1, sfd, null);
event.set(Event.Type.CONNECTION, self.pollfd.items.len - 1, sfd, newfd, null);
}
// Create a unix socket.
@ -257,8 +262,8 @@ pub const Context = struct {
// Allow to create a unix socket with the right permissions.
// Group should include write permissions.
const previous_mask = umask(0o117);
defer _ = umask(previous_mask);
const previous_mask = c.umask(0o117);
defer _ = c.umask(previous_mask);
// Remove the old UNIX socket.
posix.unlink(path) catch |err| switch (err) {
@ -290,12 +295,6 @@ pub const Context = struct {
_ = try m.write(writer); // returns paylen
_ = try stream.write(fbs.getWritten());
// var hexbuf: [2000]u8 = undefined;
// var hexfbs = std.io.fixedBufferStream(&hexbuf);
// var hexwriter = hexfbs.writer();
// try DEBUG.hexdump(hexwriter, "MSG SENT", fbs.getWritten());
// print("{s}\n", .{hexfbs.getWritten()});
}
pub fn schedule(self: *Self, m: Message) !void {
@ -318,8 +317,8 @@ pub const Context = struct {
}
pub fn set_switch_callbacks(self: *Self, fd: i32,
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8,
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8) !void {
in: ?*const fn (origin: i32, mcontent: [*]u8, mlen: *usize) callconv(.C) u8,
out: ?*const fn (origin: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) u8) !void {
try self.switchdb.set_callbacks(fd, in, out);
}
@ -365,7 +364,7 @@ pub const Context = struct {
// Wait for an event.
pub fn wait_event(self: *Self) !Event {
var current_event: Event = Event.init(Event.Type.ERROR, 0, 0, null);
var current_event: Event = Event.init(Event.Type.ERROR, 0, 0, 0, null);
var wait_duration: i32 = -1; // -1 == unlimited
if (self.timer) |t| {
@ -399,17 +398,17 @@ pub const Context = struct {
if (count < 0) {
log.err("there is a problem: poll < 0", .{});
current_event = Event.init(Event.Type.ERROR, 0, 0, null);
current_event = Event.init(Event.Type.ERROR, 0, 0, 0, null);
return current_event;
}
const duration = timer.read() / 1000000; // ns -> ms
if (count == 0) {
if (duration >= wait_duration) {
current_event = Event.init(Event.Type.TIMER, 0, 0, null);
current_event = Event.init(Event.Type.TIMER, 0, 0, 0, null);
} else {
// In case nothing happened, and poll wasn't triggered by time out.
current_event = Event.init(Event.Type.ERROR, 0, 0, null);
current_event = Event.init(Event.Type.ERROR, 0, 0, 0, null);
}
return current_event;
}
@ -455,7 +454,7 @@ pub const Context = struct {
}
// EXTERNAL = user handles IO
else if (self.connections.items[i].t == .EXTERNAL) {
return Event.init(Event.Type.EXTERNAL, i, current_fd, null);
return Event.init(Event.Type.EXTERNAL, i, current_fd, 0, null);
}
// otherwise = new message or disconnection
else {
@ -463,26 +462,26 @@ pub const Context = struct {
error.ConnectionResetByPeer => {
log.warn("connection reset by peer", .{});
try self.close(i);
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
},
error.wrongMessageLength => {
log.warn("wrong message length, terminating the connection", .{});
try self.close(i);
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
},
else => {
log.warn("unmanaged error while reading a message ({})", .{err});
try self.close(i);
return Event.init(Event.Type.ERROR, i, current_fd, null);
return Event.init(Event.Type.ERROR, i, current_fd, 0, null);
},
};
if (maybe_message) |m| {
return Event.init(Event.Type.MESSAGE_RX, i, current_fd, m);
return Event.init(Event.Type.MESSAGE_RX, i, current_fd, 0, m);
}
try self.close(i);
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
}
}
@ -526,21 +525,21 @@ pub const Context = struct {
error.BrokenPipe => {
log.warn("cannot send message, dest probably closed the connection ({})", .{err});
try self.close(i);
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
},
else => {
log.warn("unmanaged error while sending a message ({})", .{err});
try self.close(i);
return Event.init(Event.Type.ERROR, i, current_fd, null);
return Event.init(Event.Type.ERROR, i, current_fd, 0, null);
},
};
return Event.init(Event.Type.MESSAGE_TX, i, current_fd, null);
return Event.init(Event.Type.MESSAGE_TX, i, current_fd, 0, null);
}
}
// .revent is POLLHUP
if (fd.revents & std.os.linux.POLL.HUP > 0) {
// handle disconnection
current_event = Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
current_event = Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
try self.close(i);
return current_event;
}
@ -548,7 +547,7 @@ pub const Context = struct {
if ((fd.revents & std.os.linux.POLL.HUP > 0) or
(fd.revents & std.os.linux.POLL.NVAL > 0))
{
return Event.init(Event.Type.ERROR, i, current_fd, null);
return Event.init(Event.Type.ERROR, i, current_fd, 0, null);
}
}

View file

@ -48,23 +48,26 @@ pub const Event = struct {
t: Event.Type,
index: usize,
origin: i32, // socket fd
newfd: i32, // on new connection, tell the new client's socket fd
m: ?Message, // message
const Self = @This();
pub fn init(t: Event.Type, index: usize, origin: i32, m: ?Message) Self {
pub fn init(t: Event.Type, index: usize, origin: i32, newfd: i32, m: ?Message) Self {
return Self{
.t = t,
.index = index,
.origin = origin,
.newfd = newfd,
.m = m,
};
}
pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, m: ?Message) void {
pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, newfd: i32, m: ?Message) void {
self.t = t;
self.index = index;
self.origin = origin;
self.newfd = newfd;
self.m = m;
}
@ -72,6 +75,7 @@ pub const Event = struct {
self.t = Event.Type.ERROR;
self.index = @as(usize, 0);
self.origin = @as(i32, 0);
self.newfd = @as(i32, 0);
if (self.m) |message| {
message.deinit();
}
@ -79,7 +83,7 @@ pub const Event = struct {
}
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
try fmt.format(out_stream, "{}, origin: {}, index {}, message: [{?}]", .{ self.t, self.origin, self.index, self.m });
try fmt.format(out_stream, "{}, origin: {}, newfd: {}, index {}, message: [{?}]", .{ self.t, self.origin, self.newfd, self.index, self.m });
}
};
@ -92,7 +96,7 @@ test "Event - creation and display" {
const s = "hello!!";
var m = try Message.init(1, allocator, s); // fd type payload
defer m.deinit();
const e = Event.init(Event.Type.CONNECTION, 5, 8, m); // type index origin message
const e = Event.init(Event.Type.CONNECTION, 5, 8, 4, m); // type index origin message
try print_eq("event.Event.Type.CONNECTION, origin: 8, index 5, message: [fd: 1, payload: [hello!!]]", e);
try print_eq("event.Event.Type.CONNECTION, origin: 8, newfd: 4, index 5, message: [fd: 1, payload: [hello!!]]", e);
}

View file

@ -78,8 +78,8 @@ test {
pub fn send_fd(sockfd: posix.socket_t, msg: []const u8, fd: posix.fd_t) void {
var iov = [_]posix.iovec_const{
.{
.iov_base = msg.ptr,
.iov_len = msg.len,
.base = msg.ptr,
.len = msg.len,
},
};
@ -191,7 +191,7 @@ pub fn receive_fd(sockfd: posix.socket_t, buffer: []u8, msg_size: *usize) !posix
var msg_buffer = [_]u8{0} ** 1500;
var iov = [_]posix.iovec{
.{ .iov_base = msg_buffer[0..], .iov_len = msg_buffer.len },
.{ .base = msg_buffer[0..], .len = msg_buffer.len },
};
var cmsg = Cmsghdr(posix.fd_t).init(.{

View file

@ -1,13 +1,13 @@
const std = @import("std");
pub fn hexdump(stream: anytype, header: []const u8, buffer: []const u8) std.os.WriteError!void {
pub fn hexdump(stream: anytype, header: []const u8, buffer: []const u8) std.posix.WriteError!void {
// Print a header.
if (header.len > 0) {
var hdr: [64]u8 = undefined;
const offset: usize = (hdr.len / 2) - ((header.len / 2) - 1);
@memset(hdr[0..hdr.len], ' ');
std.mem.copy(u8, hdr[offset..hdr.len], header);
std.mem.copyForwards(u8, hdr[offset..hdr.len], header);
try stream.writeAll(hdr[0..hdr.len]);
try stream.writeAll("\n");
@ -72,6 +72,17 @@ pub fn hexdump(stream: anytype, header: []const u8, buffer: []const u8) std.os.W
try stream.writeAll("\n");
}
/// Debug function: print some bytes' hexdump on standard output.
pub fn print_hex(title: []const u8, buffer : []const u8) !void
{
const stdout = std.io.getStdOut().writer();
var hexbuf: [100_000]u8 = undefined;
var hexfbs = std.io.fixedBufferStream(&hexbuf);
const hexwriter = hexfbs.writer();
try hexdump(hexwriter, title, buffer);
try stdout.print("{s}\n", .{hexfbs.getWritten()});
}
const print = std.debug.print;
test "36-byte hexdump test" {

View file

@ -5,6 +5,7 @@ const fmt = std.fmt;
const print_eq = @import("./util.zig").print_eq;
const payload_header_length = 4;
pub const Messages = std.ArrayList(Message);
pub const Message = struct {
@ -28,17 +29,17 @@ pub const Message = struct {
var reader = fbs.reader();
const msg_len = try reader.readInt(u32, .big);
if (msg_len > buffer.len - 4) {
if (msg_len > buffer.len - payload_header_length) {
return error.wrongMessageLength;
}
const msg_payload = buffer[4 .. 4 + msg_len];
const msg_payload = buffer[payload_header_length .. payload_header_length + msg_len];
return try Message.init(fd, allocator, msg_payload);
}
pub fn write(self: Self, writer: anytype) !usize {
try writer.writeInt(u32, @as(u32, @truncate(self.payload.len)), .big);
return 4 + try writer.write(self.payload);
return payload_header_length + try writer.write(self.payload);
}
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {

View file

@ -40,10 +40,12 @@ pub const SwitchDB = struct {
const Self = @This();
db: std.AutoArrayHashMap(i32, ManagedConnection),
allocator: std.mem.Allocator,
pub fn init(allocator: Allocator) Self {
return Self{
.db = std.AutoArrayHashMap(i32, ManagedConnection).init(allocator),
.allocator = allocator,
};
}
@ -63,11 +65,12 @@ pub const SwitchDB = struct {
}
pub fn set_callbacks(self: *Self, fd: i32,
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8,
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8) !void {
in: ?*const fn (origin: i32, mcontent: [*]u8, mlen: *usize) callconv(.C) u8,
out: ?*const fn (origin: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) u8) !void {
var managedconnection = self.db.get(fd) orelse return error.unregisteredFD;
managedconnection.in = in;
managedconnection.out = out;
if (in) |f| { managedconnection.in = f; }
if (out) |f| { managedconnection.out = f; }
try self.db.put(fd, managedconnection);
}
/// Dig the "db" hashmap, perform "in" fn, may provide a message.
@ -77,7 +80,7 @@ pub const SwitchDB = struct {
var managedconnection = self.db.get(fd) orelse return error.unregisteredFD;
var buffer = [_]u8{0} ** 100000; // TODO: buffer size
var message_size: u32 = @truncate(buffer.len);
var message_size: usize = buffer.len;
const r: CBEventType = @enumFromInt(managedconnection.in(fd, &buffer, &message_size));
switch (r) {
@ -87,9 +90,10 @@ pub const SwitchDB = struct {
},
CBEventType.NO_ERROR => {
// TODO: read message
// TODO: better allocator?
// TODO: better errors?
const message: Message = Message.read(managedconnection.dest, buffer[0..message_size], std.heap.c_allocator) catch {
const message: Message = Message.read(managedconnection.dest
, buffer[0..message_size]
, self.allocator) catch {
return error.generic;
};
return message;
@ -120,7 +124,7 @@ pub const SwitchDB = struct {
_ = message.write(writer) catch return error.generic;
const written = fbs.getWritten();
const r: CBEventType = @enumFromInt(managedconnection.out(message.fd, written.ptr, @truncate(written.len)));
const r: CBEventType = @enumFromInt(managedconnection.out(message.fd, written.ptr, written.len));
switch (r) {
// The message should be ignored (protocol specific).
@ -145,26 +149,26 @@ pub const SwitchDB = struct {
var message: ?Message = null;
message = self.read(fd) catch |err| switch (err) {
error.closeFD => {
return Event.init(Event.Type.DISCONNECTION, index, fd, null);
return Event.init(Event.Type.DISCONNECTION, index, fd, 0, null);
},
error.unregisteredFD, error.generic => {
return Event.init(Event.Type.ERROR, index, fd, null);
return Event.init(Event.Type.ERROR, index, fd, 0, null);
},
};
return Event.init(Event.Type.SWITCH_RX, index, fd, message);
return Event.init(Event.Type.SWITCH_RX, index, fd, 0, message);
}
pub fn handle_event_write(self: *Self, index: usize, message: Message) Event {
const fd = message.fd;
self.write(message) catch |err| switch (err) {
error.closeFD => {
return Event.init(Event.Type.DISCONNECTION, index, fd, null);
return Event.init(Event.Type.DISCONNECTION, index, fd, 0, null);
},
error.unregisteredFD, error.generic => {
return Event.init(Event.Type.ERROR, index, fd, null);
return Event.init(Event.Type.ERROR, index, fd, 0, null);
},
};
return Event.init(Event.Type.SWITCH_TX, index, fd, null);
return Event.init(Event.Type.SWITCH_TX, index, fd, 0, null);
}
/// Simple wrapper around self.db.get.
@ -182,8 +186,8 @@ pub const SwitchDB = struct {
const ManagedConnection = struct {
dest: i32,
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8 = default_in,
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8 = default_out,
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *usize) callconv(.C) u8 = default_in,
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) u8 = default_out,
};
test "creation and display" {
@ -201,18 +205,18 @@ test "creation and display" {
try print_eq("{ (5,6)(6,5) }", .{switchdb});
}
fn successful_in(_: i32, mcontent: [*]u8, mlen: *u32) CBEventType {
fn successful_in(_: i32, mcontent: [*]u8, mlen: *usize) CBEventType {
var m = Message.init(8, std.heap.c_allocator, "coucou") catch unreachable;
defer m.deinit();
var fbs = std.io.fixedBufferStream(mcontent[0..mlen.*]);
const writer = fbs.writer();
const bytes_written = m.write(writer) catch unreachable;
mlen.* = @truncate(bytes_written);
mlen.* = bytes_written;
return CBEventType.NO_ERROR;
}
fn successful_out(_: i32, _: [*]const u8, _: u32) CBEventType {
fn successful_out(_: i32, _: [*]const u8, _: usize) CBEventType {
return CBEventType.NO_ERROR;
}
@ -251,11 +255,11 @@ test "successful exchanges" {
}
}
fn unsuccessful_in(_: i32, _: [*]const u8, _: *u32) CBEventType {
fn unsuccessful_in(_: i32, _: [*]const u8, _: *usize) CBEventType {
return CBEventType.ERROR;
}
fn unsuccessful_out(_: i32, _: [*]const u8, _: u32) CBEventType {
fn unsuccessful_out(_: i32, _: [*]const u8, _: usize) CBEventType {
return CBEventType.ERROR;
}
@ -307,7 +311,7 @@ test "nuke 'em" {
try testing.expect(switchdb.db.count() == 0);
}
fn default_in(origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8 {
fn default_in(origin: i32, mcontent: [*]u8, mlen: *usize) callconv(.C) u8 {
// This may be kinda hacky, idk.
var stream: net.Stream = .{ .handle = origin };
const packet_size: usize = stream.read(mcontent[0..mlen.*]) catch return @intFromEnum(CBEventType.ERROR);
@ -318,12 +322,12 @@ fn default_in(origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8 {
return @intFromEnum(CBEventType.FD_CLOSING);
}
mlen.* = @truncate(packet_size);
mlen.* = packet_size;
return @intFromEnum(CBEventType.NO_ERROR);
}
fn default_out(fd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8 {
fn default_out(fd: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) u8 {
// Message contains the fd, no need to search for the right structure to copy,
// let's just recreate a Stream from the fd.