Compare commits
10 commits
b0a29f5800
...
b2e811df19
Author | SHA1 | Date | |
---|---|---|---|
b2e811df19 | |||
ed86a638b5 | |||
ad5fb32cfb | |||
f59eb58e0b | |||
45614deacb | |||
8ffd0faba1 | |||
c0d6404186 | |||
18a77fa2ef | |||
8e889ef242 | |||
31e05ef1c2 |
14 changed files with 228 additions and 137 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -1,4 +1,5 @@
|
|||
zig-cache
|
||||
zig-out
|
||||
.zig-cache
|
||||
docs
|
||||
*.swp
|
||||
|
|
|
@ -24,5 +24,6 @@ See the [dedicated repository][examples].
|
|||
|
||||
LibIPC reached a stable state and is usable.
|
||||
Performance is fine for most projects, but can be largely improved.
|
||||
The `poll` syscall is used instead of more recent and *faster* syscalls (`epoll`, `kqueue`, etc.).
|
||||
|
||||
[examples]: https://git.baguette.netlib.re/Baguette/libipc-examples
|
||||
|
|
1
TODO.md
1
TODO.md
|
@ -9,7 +9,6 @@
|
|||
### makefile
|
||||
|
||||
- release
|
||||
- distribution
|
||||
|
||||
### documentation
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
const std = @import("std");
|
||||
|
||||
const VERSION = "0.1.1";
|
||||
const VERSION = "0.2.0";
|
||||
|
||||
// Although this function looks imperative, note that its job is to
|
||||
// declaratively construct a build graph that will be executed by an external
|
||||
|
@ -21,7 +21,7 @@ pub fn build(b: *std.Build) void {
|
|||
.name = "ipc",
|
||||
// In this case the main source file is merely a path, however, in more
|
||||
// complicated build scripts, this could be a generated file.
|
||||
.root_source_file = .{ .path = "src/bindings.zig" },
|
||||
.root_source_file = .{ .cwd_relative = "src/bindings.zig" },
|
||||
.target = target,
|
||||
.optimize = optimize,
|
||||
});
|
||||
|
@ -37,7 +37,7 @@ pub fn build(b: *std.Build) void {
|
|||
|
||||
const shared_lib = b.addSharedLibrary(.{
|
||||
.name = "ipc",
|
||||
.root_source_file = .{ .path = "src/bindings.zig" },
|
||||
.root_source_file = .{ .cwd_relative = "src/bindings.zig" },
|
||||
.version = comptime (try std.SemanticVersion.parse(VERSION)),
|
||||
.target = target,
|
||||
.optimize = optimize,
|
||||
|
@ -47,7 +47,7 @@ pub fn build(b: *std.Build) void {
|
|||
|
||||
// Creates a step for unit testing.
|
||||
const main_tests = b.addTest(.{
|
||||
.root_source_file = .{ .path = "src/main.zig" },
|
||||
.root_source_file = .{ .cwd_relative = "src/main.zig" },
|
||||
.target = target,
|
||||
.optimize = optimize,
|
||||
});
|
||||
|
|
2
ipc.pc
2
ipc.pc
|
@ -3,6 +3,6 @@ libdir=/usr/local/lib
|
|||
|
||||
Name: LibIPC
|
||||
Description: The simplest Inter Process Communication library
|
||||
Version: 0.1.1
|
||||
Version: 0.2.0
|
||||
Libs: -L${libdir} -lipc
|
||||
Cflags: -I${includedir}
|
||||
|
|
119
libipc.h
119
libipc.h
|
@ -3,47 +3,112 @@
|
|||
|
||||
#include <stdint.h>
|
||||
|
||||
/**
|
||||
The LibIPC API is designed to be simple.
|
||||
|
||||
For most applications, the usage can be summarized as:
|
||||
|
||||
- ipc_context_init to allocate the "context" structure;
|
||||
- ipc_service_init or ipc_connect_service to instanciate the service or starts a connection to it;
|
||||
- loop over ipc_wait_event to wait for events;
|
||||
- ipc_schedule to send messages;
|
||||
- ipc_close_all then ipc_context_deinit to close all connections then free the context.
|
||||
|
||||
The ipc_wait_event function is where the magic happens.
|
||||
This function handles all the complexity of network management, juggling with file descriptors,
|
||||
storing connection-related data, waiting for an event, handling connections and disconnections,
|
||||
reading incoming messages and providing UNDERSTANDABLE notifications (see the `event_types` enumeration).
|
||||
|
||||
Basic events are:
|
||||
|
||||
- Connections and disconnections, they are already handled within LibIPC, but the API notifies the
|
||||
user so they can perform additional operations;
|
||||
- Messages (received or sent);
|
||||
- Timers, to wake up the application on a regular basis (setup with ipc_context_timer).
|
||||
|
||||
Furthermore, the API can be used for non-IPC communications, meaning connections not using the LibIPC protocol.
|
||||
This is particularly useful when dealing with any file descriptor that needs attention.
|
||||
The ipc_add_external function adds a file descriptor marked as "external" which will be listened on by LibIPC.
|
||||
In this case, disconnections are still handled, but messages aren't automatically read.
|
||||
This way, almost any protocol can be used along with LibIPC communications and with the LibIPC API.
|
||||
Related event: EXTERNAL, notifying users that an "external" file descriptor received a message.
|
||||
|
||||
Finally, a "switch" mechanism to automatically transfer messages between a pair of file descriptors (ipc_add_switch).
|
||||
Messages are automatically exchanged between the two file descriptors by LibIPC;
|
||||
eventually with user-provided callbacks (ipc_set_switch_callbacks) for non-IPC connections.
|
||||
Thus, callbacks enable to handle websocket connections, non-IPC clients communicating with JSON messages, etc.
|
||||
Related events: SWITCH RX and TX, notifying users that a "switched" file descriptor received (or sent) a message.
|
||||
|
||||
Switch and IO callbacks enable to easily create "protocol IPC services" (such as TCPd) to
|
||||
bind IPC services to basically any available protocol through small, dedicated services
|
||||
handling all the nitty-gritty details of non-IPC protocols.
|
||||
*/
|
||||
|
||||
// Event types: the "t" parameter in "ipc_wait_event".
|
||||
enum event_types {
|
||||
ERROR = 0 // A problem occured.
|
||||
, CONNECTION = 1 // New user.
|
||||
, DISCONNECTION = 2 // User disconnected.
|
||||
, MESSAGE_RX = 3 // New message.
|
||||
, MESSAGE_TX = 4 // Message sent.
|
||||
, TIMER = 5 // Timeout in the poll(2) function.
|
||||
, EXTERNAL = 6 // Message received from a non IPC socket.
|
||||
, SWITCH_RX = 7 // Message received from a switched FD.
|
||||
, SWITCH_TX = 8 // Message sent to a switched fd.
|
||||
ERROR = 0 // A problem occured.
|
||||
, CONNECTION = 1 // New user.
|
||||
, DISCONNECTION = 2 // User disconnected.
|
||||
, MESSAGE_RX = 3 // New message.
|
||||
, MESSAGE_TX = 4 // Message sent.
|
||||
, TIMER = 5 // Timeout in the poll(2) function.
|
||||
, EXTERNAL = 6 // Message received from a non-IPC socket.
|
||||
, SWITCH_RX = 7 // Message received from a switched fd.
|
||||
, SWITCH_TX = 8 // Message sent to a switched fd.
|
||||
};
|
||||
|
||||
// Return type of callback functions when switching.
|
||||
enum cb_event_types {
|
||||
CB_NO_ERROR = 0 // No error. A message was generated.
|
||||
, CB_ERROR = 1 // Generic error.
|
||||
, CB_FD_CLOSING = 2 // The fd is closing.
|
||||
, CB_IGNORE = 3 // The message should be ignored (protocol specific).
|
||||
CB_NO_ERROR = 0 // No error. A message was generated.
|
||||
, CB_ERROR = 1 // Generic error.
|
||||
, CB_FD_CLOSING = 2 // The fd is closing.
|
||||
, CB_IGNORE = 3 // The message should be ignored (protocol specific).
|
||||
};
|
||||
|
||||
int ipc_context_init (void** ptr);
|
||||
int ipc_service_init (void* ctx, int* servicefd, const char* service_name, uint16_t service_name_len);
|
||||
int ipc_context_init (void** ptr); // Allocate memory for a context.
|
||||
void ipc_context_deinit (void** ctx); // Free the context's memory.
|
||||
void ipc_context_timer (void* ctx, int timer); // Change the timer (for timer events, in ms).
|
||||
|
||||
// Init (or connect to) a service. That's almost the same operation behind the scene.
|
||||
int ipc_service_init (void* ctx, int* servicefd, const char* service_name, uint16_t service_name_len);
|
||||
int ipc_connect_service (void* ctx, int* servicefd, const char* service_name, uint16_t service_name_len);
|
||||
void ipc_context_deinit (void** ctx);
|
||||
int ipc_write (void* ctx, int servicefd, char* mcontent, uint32_t mlen);
|
||||
int ipc_schedule (void* ctx, int servicefd, const char* mcontent, uint32_t mlen);
|
||||
int ipc_read_fd (void* ctx, int fd, char* buffer, size_t* buflen);
|
||||
int ipc_read (void* ctx, size_t index, char* buffer, size_t* buflen);
|
||||
int ipc_wait_event(void* ctx, char* t, size_t* index, int* originfd, char* buffer, size_t* buflen);
|
||||
void ipc_context_timer (void* ctx, int timer);
|
||||
int ipc_close_fd (void* ctx, int fd);
|
||||
int ipc_close (void* ctx, size_t index);
|
||||
|
||||
// Write a message or schedule it.
|
||||
int ipc_write (void* ctx, int servicefd, const char* mcontent, size_t mlen);
|
||||
int ipc_schedule (void* ctx, int servicefd, const char* mcontent, size_t mlen);
|
||||
|
||||
// Read a message from a client; either selected by an index in the context structure or by its file descriptor.
|
||||
int ipc_read_fd (void* ctx, int fd, char* buffer, size_t* buflen);
|
||||
int ipc_read (void* ctx, size_t index, char* buffer, size_t* buflen);
|
||||
|
||||
// Wait for an event.
|
||||
// The "t" parameter is the type of the event (enum event_types).
|
||||
// The "index" parameter is the index in the context structure of the origin of the event (client or server).
|
||||
// The "originfd" parameter is the file descriptor on which the event occurs.
|
||||
// The "newfd" parameter is the file descriptor of the new connected client, in case of a connection event.
|
||||
// The "buffer" and "buflen" parameters contain respectively a copy of the received message and its length.
|
||||
int ipc_wait_event (void* ctx, char* t, size_t* index, int* originfd, int* newfd, char* buffer, size_t* buflen);
|
||||
|
||||
// Close a client (or server) based on its file descriptor or its index in the context structure.
|
||||
int ipc_close_fd (void* ctx, int fd);
|
||||
int ipc_close (void* ctx, size_t index);
|
||||
// Close all connections (probably right before the processus is terminated).
|
||||
int ipc_close_all (void* ctx);
|
||||
|
||||
// Switch functions (for "protocol" services, such as TCPd).
|
||||
// Add a (possibly non-IPC) file descriptor to handle.
|
||||
// Since it's not marked as an IPC connection, messages won't be automatically read;
|
||||
// which enables to handle any communications for most protocols through the LibIPC API.
|
||||
int ipc_add_external (void* ctx, int newfd);
|
||||
|
||||
// Add a new switch between a pair of file descriptors, enabling automatic exchange of messages
|
||||
// between this pair of fds. Useful for "protocol services", such as TCPd.
|
||||
int ipc_add_switch (void* ctx, int fd1, int fd2);
|
||||
|
||||
// Set IO callbacks for a file descriptor.
|
||||
// Returned "char" is a cb_event_types enum.
|
||||
// One of the callbacks can be "NULL" to keep the default callback, thus changing only input or output operations.
|
||||
int ipc_set_switch_callbacks (void* ctx, int fd
|
||||
, char (*in (int orig, const char *payload, uint32_t *mlen))
|
||||
, char (*out(int dest, char *payload, uint32_t mlen)));
|
||||
, char (*in (int orig, char *payload, size_t *mlen))
|
||||
, char (*out(int dest, const char *payload, size_t mlen)));
|
||||
|
||||
#endif
|
||||
|
|
52
makefile
52
makefile
|
@ -15,25 +15,27 @@ PREFIX ?= /usr/local
|
|||
LIBDIR ?= $(PREFIX)/lib
|
||||
INCLUDEDIR ?= $(PREFIX)/include
|
||||
PKGCONFIGDIR ?= /usr/share/pkgconfig
|
||||
install-pkgconfig:
|
||||
[ -d $(PKGCONFIGDIR) ] || install -m 0755 -d $(PKGCONFIGDIR)
|
||||
install -m 0644 ipc.pc $(PKGCONFIGDIR)
|
||||
install-library:
|
||||
[ -d $(LIBDIR) ] || install -m 0755 -d $(LIBDIR)
|
||||
install -m 0644 zig-out/lib/libipc.a $(LIBDIR)
|
||||
install -m 0644 zig-out/lib/libipc.so $(LIBDIR)
|
||||
install-header:
|
||||
[ -d $(INCLUDEDIR) ] || install -m 0755 -d $(INCLUDEDIR)
|
||||
install -m 0644 libipc.h $(INCLUDEDIR)
|
||||
install: install-pkgconfig install-library install-header
|
||||
|
||||
uninstall-library:
|
||||
rm $(LIBDIR)/libipc.a \
|
||||
$(LIBDIR)/libipc.so \
|
||||
$(LIBDIR)/libipc.so.*
|
||||
uninstall-header:
|
||||
rm $(INCLUDEDIR)/libipc.h
|
||||
uninstall: uninstall-library uninstall-header
|
||||
$(PKGCONFIGDIR):; install -m 0755 -d $(PKGCONFIGDIR)
|
||||
$(PKGCONFIGDIR)/ipc.pc: ipc.pc; install -m 0644 ipc.pc $(PKGCONFIGDIR)
|
||||
install-pkgconfig: $(PKGCONFIGDIR) $(PKGCONFIGDIR)/ipc.pc
|
||||
|
||||
$(LIBDIR):; install -m 0755 -d $(LIBDIR)
|
||||
$(LIBDIR)/libipc.a: zig-out/lib/libipc.a; install -m 0644 zig-out/lib/libipc.a $(LIBDIR)
|
||||
$(LIBDIR)/libipc.so: zig-out/lib/libipc.so; install -m 0644 zig-out/lib/libipc.so $(LIBDIR)
|
||||
install-library: $(LIBDIR) $(LIBDIR)/libipc.a $(LIBDIR)/libipc.so
|
||||
|
||||
$(INCLUDEDIR):; install -m 0755 -d $(INCLUDEDIR)
|
||||
$(INCLUDEDIR)/libipc.h: libipc.h; install -m 0644 libipc.h $(INCLUDEDIR)
|
||||
install-header: $(INCLUDEDIR) $(INCLUDEDIR)/libipc.h
|
||||
|
||||
install: install-pkgconfig install-library install-header
|
||||
@echo "Now that you have installed the library, you should (probably) run ldconfig."
|
||||
|
||||
uninstall-library:; rm $(LIBDIR)/libipc.a $(LIBDIR)/libipc.so*
|
||||
uninstall-header:; rm $(INCLUDEDIR)/libipc.h
|
||||
uninstall-pkgconfig:; rm $(PKGCONFIGDIR)/ipc.pc
|
||||
uninstall: uninstall-pkgconfig uninstall-library uninstall-header
|
||||
|
||||
mrproper:
|
||||
rm -r docs zig-cache zig-out 2>/dev/null || true
|
||||
|
@ -48,21 +50,21 @@ serve-doc:
|
|||
darkhttpd docs/ --addr $(DOC_HTTPD_ADDR) --port $(DOC_HTTPD_PORT) --log $(DOC_HTTPD_ACCESS_LOGS)
|
||||
|
||||
PACKAGE ?= libipc
|
||||
VERSION ?= 0.1.0
|
||||
VERSION ?= 0.2.0
|
||||
PKG = $(PACKAGE)-$(VERSION)
|
||||
dist-dir:
|
||||
[ -d $(PKG) ] || ln -s . $(PKG)
|
||||
$(PKG).tar.gz: dist-dir
|
||||
tar zcf $@ \
|
||||
$(PKG)/src \
|
||||
$(PKG)/build.zig \
|
||||
$(PKG)/build.zig* \
|
||||
$(PKG)/libipc.h \
|
||||
$(PKG)/ipc.pc \
|
||||
$(PKG)/makefile* \
|
||||
$(PKG)/README* \
|
||||
$(PKG)/TODO*
|
||||
dist-rm-dir:
|
||||
rm $(PKG)
|
||||
dist-gz: $(PACKAGE)-$(VERSION).tar.gz
|
||||
$(PKG):; ln -s . $(PKG)
|
||||
dist-dir: $(PKG)
|
||||
dist-rm-dir:; rm $(PKG)
|
||||
dist-gz: $(PKG).tar.gz
|
||||
dist: dist-gz dist-rm-dir
|
||||
|
||||
# You can add your specific instructions there.
|
||||
|
|
|
@ -36,7 +36,7 @@ export fn ipc_context_deinit(ctx: **Context) callconv(.C) void {
|
|||
}
|
||||
|
||||
/// Write a message (no waiting).
|
||||
export fn ipc_write(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) i32 {
|
||||
export fn ipc_write(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) i32 {
|
||||
// TODO: better default length.
|
||||
var buffer = [_]u8{0} ** 100000;
|
||||
var fba = std.heap.FixedBufferAllocator.init(&buffer);
|
||||
|
@ -48,7 +48,7 @@ export fn ipc_write(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen:
|
|||
|
||||
/// Schedule a message.
|
||||
/// Use the same allocator as the context.
|
||||
export fn ipc_schedule(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) i32 {
|
||||
export fn ipc_schedule(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) i32 {
|
||||
const message = Message.init(servicefd, ctx.allocator, mcontent[0..mlen]) catch return -1;
|
||||
ctx.schedule(message) catch return -2;
|
||||
return 0;
|
||||
|
@ -90,7 +90,7 @@ export fn ipc_read(ctx: *Context, index: usize, buffer: [*]u8, buflen: *usize) c
|
|||
|
||||
/// Wait for an event.
|
||||
/// Buffer length will be changed to the size of the received message.
|
||||
export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 {
|
||||
export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, newfd: *i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 {
|
||||
const event = ctx.wait_event() catch |err| switch (err) {
|
||||
else => {
|
||||
log.warn("error while waiting for an event: {}\n", .{err});
|
||||
|
@ -100,13 +100,17 @@ export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, b
|
|||
t.* = @intFromEnum(event.t);
|
||||
index.* = event.index;
|
||||
originfd.* = event.origin;
|
||||
newfd.* = event.newfd;
|
||||
|
||||
if (event.m) |m| {
|
||||
var fbs = std.io.fixedBufferStream(buffer[0..buflen.*]);
|
||||
var writer = fbs.writer();
|
||||
_ = writer.write(m.payload) catch return -4;
|
||||
buflen.* = m.payload.len;
|
||||
m.deinit();
|
||||
switch (event.t) {
|
||||
.SWITCH_RX => {},
|
||||
else => m.deinit(),
|
||||
}
|
||||
} else {
|
||||
buflen.* = 0;
|
||||
}
|
||||
|
@ -148,8 +152,8 @@ export fn ipc_add_switch(ctx: *Context, fd1: i32, fd2: i32) callconv(.C) i32 {
|
|||
}
|
||||
|
||||
export fn ipc_set_switch_callbacks(ctx: *Context, fd: i32,
|
||||
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8,
|
||||
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8) callconv(.C) i32 {
|
||||
in: ?*const fn (origin: i32, mcontent: [*]u8, mlen: *usize) callconv(.C) u8,
|
||||
out: ?*const fn (origin: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) u8) callconv(.C) i32 {
|
||||
ctx.set_switch_callbacks(fd, in, out) catch return -1;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -1,17 +1,11 @@
|
|||
const std = @import("std");
|
||||
const testing = std.testing;
|
||||
// const DEBUG = @import("./hexdump.zig");
|
||||
const net = std.net;
|
||||
const os = std.os;
|
||||
const fmt = std.fmt;
|
||||
const c = std.c;
|
||||
const posix = std.posix;
|
||||
|
||||
// const print = std.debug.print;
|
||||
|
||||
// TODO: to remove once PR https://github.com/ziglang/zig/pull/14639 is accepted.
|
||||
pub extern "c" fn umask(mode: c.mode_t) c.mode_t;
|
||||
|
||||
const log = std.log.scoped(.libipc_context);
|
||||
|
||||
const receive_fd = @import("./exchange-fd.zig").receive_fd;
|
||||
|
@ -35,7 +29,6 @@ pub const PollFD = std.ArrayList(posix.pollfd);
|
|||
pub const IPC_HEADER_SIZE = 4; // Size (4 bytes) then content.
|
||||
pub const IPC_BASE_SIZE = 100000; // 100 KB, plenty enough space for messages
|
||||
pub const IPC_MAX_MESSAGE_SIZE = IPC_BASE_SIZE - IPC_HEADER_SIZE;
|
||||
pub const IPC_VERSION = 1;
|
||||
|
||||
// Context of the whole networking state.
|
||||
pub const Context = struct {
|
||||
|
@ -66,8 +59,8 @@ pub const Context = struct {
|
|||
};
|
||||
|
||||
// Allow mkdir to create a directory with 0o770 permissions.
|
||||
const previous_mask = umask(0o007);
|
||||
defer _ = umask(previous_mask);
|
||||
const previous_mask = c.umask(0o007);
|
||||
defer _ = c.umask(previous_mask);
|
||||
|
||||
// Create the run directory, where all UNIX sockets will be.
|
||||
posix.mkdir(rundir, 0o0770) catch |err| switch (err) {
|
||||
|
@ -80,7 +73,12 @@ pub const Context = struct {
|
|||
},
|
||||
};
|
||||
|
||||
return Self{ .rundir = rundir, .connections = Connections.init(allocator), .pollfd = PollFD.init(allocator), .tx = Messages.init(allocator), .switchdb = SwitchDB.init(allocator), .allocator = allocator };
|
||||
return Self{ .rundir = rundir
|
||||
, .connections = Connections.init(allocator)
|
||||
, .pollfd = PollFD.init(allocator)
|
||||
, .tx = Messages.init(allocator)
|
||||
, .switchdb = SwitchDB.init(allocator)
|
||||
, .allocator = allocator };
|
||||
}
|
||||
|
||||
pub fn deinit(self: *Self) void {
|
||||
|
@ -218,6 +216,13 @@ pub const Context = struct {
|
|||
try self.add_(newcon, newfd);
|
||||
}
|
||||
|
||||
// /// Change the type of a file descriptor.
|
||||
// /// Useful for protocol daemons (ex: TCPd) or proxies listening to local IPC connections for new clients.
|
||||
// /// In this case, the client can be "switched" but needs to be marked as such.
|
||||
// /// TODO
|
||||
// pub fn change_fd_type (self: *Self, fd: i32, new_type: Connection.Type) !void {
|
||||
// }
|
||||
|
||||
fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void {
|
||||
// net.Server
|
||||
const serverfd = self.pollfd.items[server_index].fd;
|
||||
|
@ -233,7 +238,7 @@ pub const Context = struct {
|
|||
|
||||
const sfd = server.stream.handle;
|
||||
// WARNING: imply every new item is last
|
||||
event.set(Event.Type.CONNECTION, self.pollfd.items.len - 1, sfd, null);
|
||||
event.set(Event.Type.CONNECTION, self.pollfd.items.len - 1, sfd, newfd, null);
|
||||
}
|
||||
|
||||
// Create a unix socket.
|
||||
|
@ -257,8 +262,8 @@ pub const Context = struct {
|
|||
|
||||
// Allow to create a unix socket with the right permissions.
|
||||
// Group should include write permissions.
|
||||
const previous_mask = umask(0o117);
|
||||
defer _ = umask(previous_mask);
|
||||
const previous_mask = c.umask(0o117);
|
||||
defer _ = c.umask(previous_mask);
|
||||
|
||||
// Remove the old UNIX socket.
|
||||
posix.unlink(path) catch |err| switch (err) {
|
||||
|
@ -290,12 +295,6 @@ pub const Context = struct {
|
|||
_ = try m.write(writer); // returns paylen
|
||||
|
||||
_ = try stream.write(fbs.getWritten());
|
||||
|
||||
// var hexbuf: [2000]u8 = undefined;
|
||||
// var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
||||
// var hexwriter = hexfbs.writer();
|
||||
// try DEBUG.hexdump(hexwriter, "MSG SENT", fbs.getWritten());
|
||||
// print("{s}\n", .{hexfbs.getWritten()});
|
||||
}
|
||||
|
||||
pub fn schedule(self: *Self, m: Message) !void {
|
||||
|
@ -318,8 +317,8 @@ pub const Context = struct {
|
|||
}
|
||||
|
||||
pub fn set_switch_callbacks(self: *Self, fd: i32,
|
||||
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8,
|
||||
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8) !void {
|
||||
in: ?*const fn (origin: i32, mcontent: [*]u8, mlen: *usize) callconv(.C) u8,
|
||||
out: ?*const fn (origin: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) u8) !void {
|
||||
try self.switchdb.set_callbacks(fd, in, out);
|
||||
}
|
||||
|
||||
|
@ -365,7 +364,7 @@ pub const Context = struct {
|
|||
|
||||
// Wait for an event.
|
||||
pub fn wait_event(self: *Self) !Event {
|
||||
var current_event: Event = Event.init(Event.Type.ERROR, 0, 0, null);
|
||||
var current_event: Event = Event.init(Event.Type.ERROR, 0, 0, 0, null);
|
||||
var wait_duration: i32 = -1; // -1 == unlimited
|
||||
|
||||
if (self.timer) |t| {
|
||||
|
@ -399,17 +398,17 @@ pub const Context = struct {
|
|||
|
||||
if (count < 0) {
|
||||
log.err("there is a problem: poll < 0", .{});
|
||||
current_event = Event.init(Event.Type.ERROR, 0, 0, null);
|
||||
current_event = Event.init(Event.Type.ERROR, 0, 0, 0, null);
|
||||
return current_event;
|
||||
}
|
||||
|
||||
const duration = timer.read() / 1000000; // ns -> ms
|
||||
if (count == 0) {
|
||||
if (duration >= wait_duration) {
|
||||
current_event = Event.init(Event.Type.TIMER, 0, 0, null);
|
||||
current_event = Event.init(Event.Type.TIMER, 0, 0, 0, null);
|
||||
} else {
|
||||
// In case nothing happened, and poll wasn't triggered by time out.
|
||||
current_event = Event.init(Event.Type.ERROR, 0, 0, null);
|
||||
current_event = Event.init(Event.Type.ERROR, 0, 0, 0, null);
|
||||
}
|
||||
return current_event;
|
||||
}
|
||||
|
@ -455,7 +454,7 @@ pub const Context = struct {
|
|||
}
|
||||
// EXTERNAL = user handles IO
|
||||
else if (self.connections.items[i].t == .EXTERNAL) {
|
||||
return Event.init(Event.Type.EXTERNAL, i, current_fd, null);
|
||||
return Event.init(Event.Type.EXTERNAL, i, current_fd, 0, null);
|
||||
}
|
||||
// otherwise = new message or disconnection
|
||||
else {
|
||||
|
@ -463,26 +462,26 @@ pub const Context = struct {
|
|||
error.ConnectionResetByPeer => {
|
||||
log.warn("connection reset by peer", .{});
|
||||
try self.close(i);
|
||||
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
|
||||
return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
|
||||
},
|
||||
error.wrongMessageLength => {
|
||||
log.warn("wrong message length, terminating the connection", .{});
|
||||
try self.close(i);
|
||||
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
|
||||
return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
|
||||
},
|
||||
else => {
|
||||
log.warn("unmanaged error while reading a message ({})", .{err});
|
||||
try self.close(i);
|
||||
return Event.init(Event.Type.ERROR, i, current_fd, null);
|
||||
return Event.init(Event.Type.ERROR, i, current_fd, 0, null);
|
||||
},
|
||||
};
|
||||
|
||||
if (maybe_message) |m| {
|
||||
return Event.init(Event.Type.MESSAGE_RX, i, current_fd, m);
|
||||
return Event.init(Event.Type.MESSAGE_RX, i, current_fd, 0, m);
|
||||
}
|
||||
|
||||
try self.close(i);
|
||||
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
|
||||
return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -526,21 +525,21 @@ pub const Context = struct {
|
|||
error.BrokenPipe => {
|
||||
log.warn("cannot send message, dest probably closed the connection ({})", .{err});
|
||||
try self.close(i);
|
||||
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
|
||||
return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
|
||||
},
|
||||
else => {
|
||||
log.warn("unmanaged error while sending a message ({})", .{err});
|
||||
try self.close(i);
|
||||
return Event.init(Event.Type.ERROR, i, current_fd, null);
|
||||
return Event.init(Event.Type.ERROR, i, current_fd, 0, null);
|
||||
},
|
||||
};
|
||||
return Event.init(Event.Type.MESSAGE_TX, i, current_fd, null);
|
||||
return Event.init(Event.Type.MESSAGE_TX, i, current_fd, 0, null);
|
||||
}
|
||||
}
|
||||
// .revent is POLLHUP
|
||||
if (fd.revents & std.os.linux.POLL.HUP > 0) {
|
||||
// handle disconnection
|
||||
current_event = Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
|
||||
current_event = Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
|
||||
try self.close(i);
|
||||
return current_event;
|
||||
}
|
||||
|
@ -548,7 +547,7 @@ pub const Context = struct {
|
|||
if ((fd.revents & std.os.linux.POLL.HUP > 0) or
|
||||
(fd.revents & std.os.linux.POLL.NVAL > 0))
|
||||
{
|
||||
return Event.init(Event.Type.ERROR, i, current_fd, null);
|
||||
return Event.init(Event.Type.ERROR, i, current_fd, 0, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -48,23 +48,26 @@ pub const Event = struct {
|
|||
t: Event.Type,
|
||||
index: usize,
|
||||
origin: i32, // socket fd
|
||||
newfd: i32, // on new connection, tell the new client's socket fd
|
||||
m: ?Message, // message
|
||||
|
||||
const Self = @This();
|
||||
|
||||
pub fn init(t: Event.Type, index: usize, origin: i32, m: ?Message) Self {
|
||||
pub fn init(t: Event.Type, index: usize, origin: i32, newfd: i32, m: ?Message) Self {
|
||||
return Self{
|
||||
.t = t,
|
||||
.index = index,
|
||||
.origin = origin,
|
||||
.newfd = newfd,
|
||||
.m = m,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, m: ?Message) void {
|
||||
pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, newfd: i32, m: ?Message) void {
|
||||
self.t = t;
|
||||
self.index = index;
|
||||
self.origin = origin;
|
||||
self.newfd = newfd;
|
||||
self.m = m;
|
||||
}
|
||||
|
||||
|
@ -72,6 +75,7 @@ pub const Event = struct {
|
|||
self.t = Event.Type.ERROR;
|
||||
self.index = @as(usize, 0);
|
||||
self.origin = @as(i32, 0);
|
||||
self.newfd = @as(i32, 0);
|
||||
if (self.m) |message| {
|
||||
message.deinit();
|
||||
}
|
||||
|
@ -79,7 +83,7 @@ pub const Event = struct {
|
|||
}
|
||||
|
||||
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
|
||||
try fmt.format(out_stream, "{}, origin: {}, index {}, message: [{?}]", .{ self.t, self.origin, self.index, self.m });
|
||||
try fmt.format(out_stream, "{}, origin: {}, newfd: {}, index {}, message: [{?}]", .{ self.t, self.origin, self.newfd, self.index, self.m });
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -92,7 +96,7 @@ test "Event - creation and display" {
|
|||
const s = "hello!!";
|
||||
var m = try Message.init(1, allocator, s); // fd type payload
|
||||
defer m.deinit();
|
||||
const e = Event.init(Event.Type.CONNECTION, 5, 8, m); // type index origin message
|
||||
const e = Event.init(Event.Type.CONNECTION, 5, 8, 4, m); // type index origin message
|
||||
|
||||
try print_eq("event.Event.Type.CONNECTION, origin: 8, index 5, message: [fd: 1, payload: [hello!!]]", e);
|
||||
try print_eq("event.Event.Type.CONNECTION, origin: 8, newfd: 4, index 5, message: [fd: 1, payload: [hello!!]]", e);
|
||||
}
|
||||
|
|
|
@ -78,8 +78,8 @@ test {
|
|||
pub fn send_fd(sockfd: posix.socket_t, msg: []const u8, fd: posix.fd_t) void {
|
||||
var iov = [_]posix.iovec_const{
|
||||
.{
|
||||
.iov_base = msg.ptr,
|
||||
.iov_len = msg.len,
|
||||
.base = msg.ptr,
|
||||
.len = msg.len,
|
||||
},
|
||||
};
|
||||
|
||||
|
@ -191,7 +191,7 @@ pub fn receive_fd(sockfd: posix.socket_t, buffer: []u8, msg_size: *usize) !posix
|
|||
var msg_buffer = [_]u8{0} ** 1500;
|
||||
|
||||
var iov = [_]posix.iovec{
|
||||
.{ .iov_base = msg_buffer[0..], .iov_len = msg_buffer.len },
|
||||
.{ .base = msg_buffer[0..], .len = msg_buffer.len },
|
||||
};
|
||||
|
||||
var cmsg = Cmsghdr(posix.fd_t).init(.{
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
const std = @import("std");
|
||||
|
||||
pub fn hexdump(stream: anytype, header: []const u8, buffer: []const u8) std.os.WriteError!void {
|
||||
pub fn hexdump(stream: anytype, header: []const u8, buffer: []const u8) std.posix.WriteError!void {
|
||||
// Print a header.
|
||||
if (header.len > 0) {
|
||||
var hdr: [64]u8 = undefined;
|
||||
const offset: usize = (hdr.len / 2) - ((header.len / 2) - 1);
|
||||
|
||||
@memset(hdr[0..hdr.len], ' ');
|
||||
std.mem.copy(u8, hdr[offset..hdr.len], header);
|
||||
std.mem.copyForwards(u8, hdr[offset..hdr.len], header);
|
||||
|
||||
try stream.writeAll(hdr[0..hdr.len]);
|
||||
try stream.writeAll("\n");
|
||||
|
@ -72,6 +72,17 @@ pub fn hexdump(stream: anytype, header: []const u8, buffer: []const u8) std.os.W
|
|||
try stream.writeAll("\n");
|
||||
}
|
||||
|
||||
/// Debug function: print some bytes' hexdump on standard output.
|
||||
pub fn print_hex(title: []const u8, buffer : []const u8) !void
|
||||
{
|
||||
const stdout = std.io.getStdOut().writer();
|
||||
var hexbuf: [100_000]u8 = undefined;
|
||||
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
||||
const hexwriter = hexfbs.writer();
|
||||
try hexdump(hexwriter, title, buffer);
|
||||
try stdout.print("{s}\n", .{hexfbs.getWritten()});
|
||||
}
|
||||
|
||||
const print = std.debug.print;
|
||||
|
||||
test "36-byte hexdump test" {
|
||||
|
|
|
@ -5,6 +5,7 @@ const fmt = std.fmt;
|
|||
|
||||
const print_eq = @import("./util.zig").print_eq;
|
||||
|
||||
const payload_header_length = 4;
|
||||
pub const Messages = std.ArrayList(Message);
|
||||
|
||||
pub const Message = struct {
|
||||
|
@ -28,17 +29,17 @@ pub const Message = struct {
|
|||
var reader = fbs.reader();
|
||||
|
||||
const msg_len = try reader.readInt(u32, .big);
|
||||
if (msg_len > buffer.len - 4) {
|
||||
if (msg_len > buffer.len - payload_header_length) {
|
||||
return error.wrongMessageLength;
|
||||
}
|
||||
const msg_payload = buffer[4 .. 4 + msg_len];
|
||||
const msg_payload = buffer[payload_header_length .. payload_header_length + msg_len];
|
||||
|
||||
return try Message.init(fd, allocator, msg_payload);
|
||||
}
|
||||
|
||||
pub fn write(self: Self, writer: anytype) !usize {
|
||||
try writer.writeInt(u32, @as(u32, @truncate(self.payload.len)), .big);
|
||||
return 4 + try writer.write(self.payload);
|
||||
return payload_header_length + try writer.write(self.payload);
|
||||
}
|
||||
|
||||
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
|
||||
|
|
|
@ -40,10 +40,12 @@ pub const SwitchDB = struct {
|
|||
const Self = @This();
|
||||
|
||||
db: std.AutoArrayHashMap(i32, ManagedConnection),
|
||||
allocator: std.mem.Allocator,
|
||||
|
||||
pub fn init(allocator: Allocator) Self {
|
||||
return Self{
|
||||
.db = std.AutoArrayHashMap(i32, ManagedConnection).init(allocator),
|
||||
.allocator = allocator,
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -63,11 +65,12 @@ pub const SwitchDB = struct {
|
|||
}
|
||||
|
||||
pub fn set_callbacks(self: *Self, fd: i32,
|
||||
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8,
|
||||
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8) !void {
|
||||
in: ?*const fn (origin: i32, mcontent: [*]u8, mlen: *usize) callconv(.C) u8,
|
||||
out: ?*const fn (origin: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) u8) !void {
|
||||
var managedconnection = self.db.get(fd) orelse return error.unregisteredFD;
|
||||
managedconnection.in = in;
|
||||
managedconnection.out = out;
|
||||
if (in) |f| { managedconnection.in = f; }
|
||||
if (out) |f| { managedconnection.out = f; }
|
||||
try self.db.put(fd, managedconnection);
|
||||
}
|
||||
|
||||
/// Dig the "db" hashmap, perform "in" fn, may provide a message.
|
||||
|
@ -77,7 +80,7 @@ pub const SwitchDB = struct {
|
|||
var managedconnection = self.db.get(fd) orelse return error.unregisteredFD;
|
||||
|
||||
var buffer = [_]u8{0} ** 100000; // TODO: buffer size
|
||||
var message_size: u32 = @truncate(buffer.len);
|
||||
var message_size: usize = buffer.len;
|
||||
const r: CBEventType = @enumFromInt(managedconnection.in(fd, &buffer, &message_size));
|
||||
|
||||
switch (r) {
|
||||
|
@ -87,9 +90,10 @@ pub const SwitchDB = struct {
|
|||
},
|
||||
CBEventType.NO_ERROR => {
|
||||
// TODO: read message
|
||||
// TODO: better allocator?
|
||||
// TODO: better errors?
|
||||
const message: Message = Message.read(managedconnection.dest, buffer[0..message_size], std.heap.c_allocator) catch {
|
||||
const message: Message = Message.read(managedconnection.dest
|
||||
, buffer[0..message_size]
|
||||
, self.allocator) catch {
|
||||
return error.generic;
|
||||
};
|
||||
return message;
|
||||
|
@ -120,7 +124,7 @@ pub const SwitchDB = struct {
|
|||
_ = message.write(writer) catch return error.generic;
|
||||
const written = fbs.getWritten();
|
||||
|
||||
const r: CBEventType = @enumFromInt(managedconnection.out(message.fd, written.ptr, @truncate(written.len)));
|
||||
const r: CBEventType = @enumFromInt(managedconnection.out(message.fd, written.ptr, written.len));
|
||||
|
||||
switch (r) {
|
||||
// The message should be ignored (protocol specific).
|
||||
|
@ -145,26 +149,26 @@ pub const SwitchDB = struct {
|
|||
var message: ?Message = null;
|
||||
message = self.read(fd) catch |err| switch (err) {
|
||||
error.closeFD => {
|
||||
return Event.init(Event.Type.DISCONNECTION, index, fd, null);
|
||||
return Event.init(Event.Type.DISCONNECTION, index, fd, 0, null);
|
||||
},
|
||||
error.unregisteredFD, error.generic => {
|
||||
return Event.init(Event.Type.ERROR, index, fd, null);
|
||||
return Event.init(Event.Type.ERROR, index, fd, 0, null);
|
||||
},
|
||||
};
|
||||
return Event.init(Event.Type.SWITCH_RX, index, fd, message);
|
||||
return Event.init(Event.Type.SWITCH_RX, index, fd, 0, message);
|
||||
}
|
||||
|
||||
pub fn handle_event_write(self: *Self, index: usize, message: Message) Event {
|
||||
const fd = message.fd;
|
||||
self.write(message) catch |err| switch (err) {
|
||||
error.closeFD => {
|
||||
return Event.init(Event.Type.DISCONNECTION, index, fd, null);
|
||||
return Event.init(Event.Type.DISCONNECTION, index, fd, 0, null);
|
||||
},
|
||||
error.unregisteredFD, error.generic => {
|
||||
return Event.init(Event.Type.ERROR, index, fd, null);
|
||||
return Event.init(Event.Type.ERROR, index, fd, 0, null);
|
||||
},
|
||||
};
|
||||
return Event.init(Event.Type.SWITCH_TX, index, fd, null);
|
||||
return Event.init(Event.Type.SWITCH_TX, index, fd, 0, null);
|
||||
}
|
||||
|
||||
/// Simple wrapper around self.db.get.
|
||||
|
@ -182,8 +186,8 @@ pub const SwitchDB = struct {
|
|||
|
||||
const ManagedConnection = struct {
|
||||
dest: i32,
|
||||
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8 = default_in,
|
||||
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8 = default_out,
|
||||
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *usize) callconv(.C) u8 = default_in,
|
||||
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) u8 = default_out,
|
||||
};
|
||||
|
||||
test "creation and display" {
|
||||
|
@ -201,18 +205,18 @@ test "creation and display" {
|
|||
try print_eq("{ (5,6)(6,5) }", .{switchdb});
|
||||
}
|
||||
|
||||
fn successful_in(_: i32, mcontent: [*]u8, mlen: *u32) CBEventType {
|
||||
fn successful_in(_: i32, mcontent: [*]u8, mlen: *usize) CBEventType {
|
||||
var m = Message.init(8, std.heap.c_allocator, "coucou") catch unreachable;
|
||||
defer m.deinit();
|
||||
|
||||
var fbs = std.io.fixedBufferStream(mcontent[0..mlen.*]);
|
||||
const writer = fbs.writer();
|
||||
const bytes_written = m.write(writer) catch unreachable;
|
||||
mlen.* = @truncate(bytes_written);
|
||||
mlen.* = bytes_written;
|
||||
return CBEventType.NO_ERROR;
|
||||
}
|
||||
|
||||
fn successful_out(_: i32, _: [*]const u8, _: u32) CBEventType {
|
||||
fn successful_out(_: i32, _: [*]const u8, _: usize) CBEventType {
|
||||
return CBEventType.NO_ERROR;
|
||||
}
|
||||
|
||||
|
@ -251,11 +255,11 @@ test "successful exchanges" {
|
|||
}
|
||||
}
|
||||
|
||||
fn unsuccessful_in(_: i32, _: [*]const u8, _: *u32) CBEventType {
|
||||
fn unsuccessful_in(_: i32, _: [*]const u8, _: *usize) CBEventType {
|
||||
return CBEventType.ERROR;
|
||||
}
|
||||
|
||||
fn unsuccessful_out(_: i32, _: [*]const u8, _: u32) CBEventType {
|
||||
fn unsuccessful_out(_: i32, _: [*]const u8, _: usize) CBEventType {
|
||||
return CBEventType.ERROR;
|
||||
}
|
||||
|
||||
|
@ -307,7 +311,7 @@ test "nuke 'em" {
|
|||
try testing.expect(switchdb.db.count() == 0);
|
||||
}
|
||||
|
||||
fn default_in(origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8 {
|
||||
fn default_in(origin: i32, mcontent: [*]u8, mlen: *usize) callconv(.C) u8 {
|
||||
// This may be kinda hacky, idk.
|
||||
var stream: net.Stream = .{ .handle = origin };
|
||||
const packet_size: usize = stream.read(mcontent[0..mlen.*]) catch return @intFromEnum(CBEventType.ERROR);
|
||||
|
@ -318,12 +322,12 @@ fn default_in(origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8 {
|
|||
return @intFromEnum(CBEventType.FD_CLOSING);
|
||||
}
|
||||
|
||||
mlen.* = @truncate(packet_size);
|
||||
mlen.* = packet_size;
|
||||
|
||||
return @intFromEnum(CBEventType.NO_ERROR);
|
||||
}
|
||||
|
||||
fn default_out(fd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8 {
|
||||
fn default_out(fd: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) u8 {
|
||||
// Message contains the fd, no need to search for the right structure to copy,
|
||||
// let's just recreate a Stream from the fd.
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue