Compare commits
10 commits
b0a29f5800
...
b2e811df19
| Author | SHA1 | Date | |
|---|---|---|---|
| b2e811df19 | |||
| ed86a638b5 | |||
| ad5fb32cfb | |||
| f59eb58e0b | |||
| 45614deacb | |||
| 8ffd0faba1 | |||
| c0d6404186 | |||
| 18a77fa2ef | |||
| 8e889ef242 | |||
| 31e05ef1c2 |
14 changed files with 228 additions and 137 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -1,4 +1,5 @@
|
||||||
zig-cache
|
zig-cache
|
||||||
zig-out
|
zig-out
|
||||||
|
.zig-cache
|
||||||
docs
|
docs
|
||||||
*.swp
|
*.swp
|
||||||
|
|
|
||||||
|
|
@ -24,5 +24,6 @@ See the [dedicated repository][examples].
|
||||||
|
|
||||||
LibIPC reached a stable state and is usable.
|
LibIPC reached a stable state and is usable.
|
||||||
Performance is fine for most projects, but can be largely improved.
|
Performance is fine for most projects, but can be largely improved.
|
||||||
|
The `poll` syscall is used instead of more recent and *faster* syscalls (`epoll`, `kqueue`, etc.).
|
||||||
|
|
||||||
[examples]: https://git.baguette.netlib.re/Baguette/libipc-examples
|
[examples]: https://git.baguette.netlib.re/Baguette/libipc-examples
|
||||||
|
|
|
||||||
1
TODO.md
1
TODO.md
|
|
@ -9,7 +9,6 @@
|
||||||
### makefile
|
### makefile
|
||||||
|
|
||||||
- release
|
- release
|
||||||
- distribution
|
|
||||||
|
|
||||||
### documentation
|
### documentation
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
|
|
||||||
const VERSION = "0.1.1";
|
const VERSION = "0.2.0";
|
||||||
|
|
||||||
// Although this function looks imperative, note that its job is to
|
// Although this function looks imperative, note that its job is to
|
||||||
// declaratively construct a build graph that will be executed by an external
|
// declaratively construct a build graph that will be executed by an external
|
||||||
|
|
@ -21,7 +21,7 @@ pub fn build(b: *std.Build) void {
|
||||||
.name = "ipc",
|
.name = "ipc",
|
||||||
// In this case the main source file is merely a path, however, in more
|
// In this case the main source file is merely a path, however, in more
|
||||||
// complicated build scripts, this could be a generated file.
|
// complicated build scripts, this could be a generated file.
|
||||||
.root_source_file = .{ .path = "src/bindings.zig" },
|
.root_source_file = .{ .cwd_relative = "src/bindings.zig" },
|
||||||
.target = target,
|
.target = target,
|
||||||
.optimize = optimize,
|
.optimize = optimize,
|
||||||
});
|
});
|
||||||
|
|
@ -37,7 +37,7 @@ pub fn build(b: *std.Build) void {
|
||||||
|
|
||||||
const shared_lib = b.addSharedLibrary(.{
|
const shared_lib = b.addSharedLibrary(.{
|
||||||
.name = "ipc",
|
.name = "ipc",
|
||||||
.root_source_file = .{ .path = "src/bindings.zig" },
|
.root_source_file = .{ .cwd_relative = "src/bindings.zig" },
|
||||||
.version = comptime (try std.SemanticVersion.parse(VERSION)),
|
.version = comptime (try std.SemanticVersion.parse(VERSION)),
|
||||||
.target = target,
|
.target = target,
|
||||||
.optimize = optimize,
|
.optimize = optimize,
|
||||||
|
|
@ -47,7 +47,7 @@ pub fn build(b: *std.Build) void {
|
||||||
|
|
||||||
// Creates a step for unit testing.
|
// Creates a step for unit testing.
|
||||||
const main_tests = b.addTest(.{
|
const main_tests = b.addTest(.{
|
||||||
.root_source_file = .{ .path = "src/main.zig" },
|
.root_source_file = .{ .cwd_relative = "src/main.zig" },
|
||||||
.target = target,
|
.target = target,
|
||||||
.optimize = optimize,
|
.optimize = optimize,
|
||||||
});
|
});
|
||||||
|
|
|
||||||
2
ipc.pc
2
ipc.pc
|
|
@ -3,6 +3,6 @@ libdir=/usr/local/lib
|
||||||
|
|
||||||
Name: LibIPC
|
Name: LibIPC
|
||||||
Description: The simplest Inter Process Communication library
|
Description: The simplest Inter Process Communication library
|
||||||
Version: 0.1.1
|
Version: 0.2.0
|
||||||
Libs: -L${libdir} -lipc
|
Libs: -L${libdir} -lipc
|
||||||
Cflags: -I${includedir}
|
Cflags: -I${includedir}
|
||||||
|
|
|
||||||
87
libipc.h
87
libipc.h
|
|
@ -3,6 +3,48 @@
|
||||||
|
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
|
||||||
|
/**
|
||||||
|
The LibIPC API is designed to be simple.
|
||||||
|
|
||||||
|
For most applications, the usage can be summarized as:
|
||||||
|
|
||||||
|
- ipc_context_init to allocate the "context" structure;
|
||||||
|
- ipc_service_init or ipc_connect_service to instanciate the service or starts a connection to it;
|
||||||
|
- loop over ipc_wait_event to wait for events;
|
||||||
|
- ipc_schedule to send messages;
|
||||||
|
- ipc_close_all then ipc_context_deinit to close all connections then free the context.
|
||||||
|
|
||||||
|
The ipc_wait_event function is where the magic happens.
|
||||||
|
This function handles all the complexity of network management, juggling with file descriptors,
|
||||||
|
storing connection-related data, waiting for an event, handling connections and disconnections,
|
||||||
|
reading incoming messages and providing UNDERSTANDABLE notifications (see the `event_types` enumeration).
|
||||||
|
|
||||||
|
Basic events are:
|
||||||
|
|
||||||
|
- Connections and disconnections, they are already handled within LibIPC, but the API notifies the
|
||||||
|
user so they can perform additional operations;
|
||||||
|
- Messages (received or sent);
|
||||||
|
- Timers, to wake up the application on a regular basis (setup with ipc_context_timer).
|
||||||
|
|
||||||
|
Furthermore, the API can be used for non-IPC communications, meaning connections not using the LibIPC protocol.
|
||||||
|
This is particularly useful when dealing with any file descriptor that needs attention.
|
||||||
|
The ipc_add_external function adds a file descriptor marked as "external" which will be listened on by LibIPC.
|
||||||
|
In this case, disconnections are still handled, but messages aren't automatically read.
|
||||||
|
This way, almost any protocol can be used along with LibIPC communications and with the LibIPC API.
|
||||||
|
Related event: EXTERNAL, notifying users that an "external" file descriptor received a message.
|
||||||
|
|
||||||
|
Finally, a "switch" mechanism to automatically transfer messages between a pair of file descriptors (ipc_add_switch).
|
||||||
|
Messages are automatically exchanged between the two file descriptors by LibIPC;
|
||||||
|
eventually with user-provided callbacks (ipc_set_switch_callbacks) for non-IPC connections.
|
||||||
|
Thus, callbacks enable to handle websocket connections, non-IPC clients communicating with JSON messages, etc.
|
||||||
|
Related events: SWITCH RX and TX, notifying users that a "switched" file descriptor received (or sent) a message.
|
||||||
|
|
||||||
|
Switch and IO callbacks enable to easily create "protocol IPC services" (such as TCPd) to
|
||||||
|
bind IPC services to basically any available protocol through small, dedicated services
|
||||||
|
handling all the nitty-gritty details of non-IPC protocols.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Event types: the "t" parameter in "ipc_wait_event".
|
||||||
enum event_types {
|
enum event_types {
|
||||||
ERROR = 0 // A problem occured.
|
ERROR = 0 // A problem occured.
|
||||||
, CONNECTION = 1 // New user.
|
, CONNECTION = 1 // New user.
|
||||||
|
|
@ -10,8 +52,8 @@ enum event_types {
|
||||||
, MESSAGE_RX = 3 // New message.
|
, MESSAGE_RX = 3 // New message.
|
||||||
, MESSAGE_TX = 4 // Message sent.
|
, MESSAGE_TX = 4 // Message sent.
|
||||||
, TIMER = 5 // Timeout in the poll(2) function.
|
, TIMER = 5 // Timeout in the poll(2) function.
|
||||||
, EXTERNAL = 6 // Message received from a non IPC socket.
|
, EXTERNAL = 6 // Message received from a non-IPC socket.
|
||||||
, SWITCH_RX = 7 // Message received from a switched FD.
|
, SWITCH_RX = 7 // Message received from a switched fd.
|
||||||
, SWITCH_TX = 8 // Message sent to a switched fd.
|
, SWITCH_TX = 8 // Message sent to a switched fd.
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -23,27 +65,50 @@ enum cb_event_types {
|
||||||
, CB_IGNORE = 3 // The message should be ignored (protocol specific).
|
, CB_IGNORE = 3 // The message should be ignored (protocol specific).
|
||||||
};
|
};
|
||||||
|
|
||||||
int ipc_context_init (void** ptr);
|
int ipc_context_init (void** ptr); // Allocate memory for a context.
|
||||||
|
void ipc_context_deinit (void** ctx); // Free the context's memory.
|
||||||
|
void ipc_context_timer (void* ctx, int timer); // Change the timer (for timer events, in ms).
|
||||||
|
|
||||||
|
// Init (or connect to) a service. That's almost the same operation behind the scene.
|
||||||
int ipc_service_init (void* ctx, int* servicefd, const char* service_name, uint16_t service_name_len);
|
int ipc_service_init (void* ctx, int* servicefd, const char* service_name, uint16_t service_name_len);
|
||||||
int ipc_connect_service (void* ctx, int* servicefd, const char* service_name, uint16_t service_name_len);
|
int ipc_connect_service (void* ctx, int* servicefd, const char* service_name, uint16_t service_name_len);
|
||||||
void ipc_context_deinit (void** ctx);
|
|
||||||
int ipc_write (void* ctx, int servicefd, char* mcontent, uint32_t mlen);
|
// Write a message or schedule it.
|
||||||
int ipc_schedule (void* ctx, int servicefd, const char* mcontent, uint32_t mlen);
|
int ipc_write (void* ctx, int servicefd, const char* mcontent, size_t mlen);
|
||||||
|
int ipc_schedule (void* ctx, int servicefd, const char* mcontent, size_t mlen);
|
||||||
|
|
||||||
|
// Read a message from a client; either selected by an index in the context structure or by its file descriptor.
|
||||||
int ipc_read_fd (void* ctx, int fd, char* buffer, size_t* buflen);
|
int ipc_read_fd (void* ctx, int fd, char* buffer, size_t* buflen);
|
||||||
int ipc_read (void* ctx, size_t index, char* buffer, size_t* buflen);
|
int ipc_read (void* ctx, size_t index, char* buffer, size_t* buflen);
|
||||||
int ipc_wait_event(void* ctx, char* t, size_t* index, int* originfd, char* buffer, size_t* buflen);
|
|
||||||
void ipc_context_timer (void* ctx, int timer);
|
// Wait for an event.
|
||||||
|
// The "t" parameter is the type of the event (enum event_types).
|
||||||
|
// The "index" parameter is the index in the context structure of the origin of the event (client or server).
|
||||||
|
// The "originfd" parameter is the file descriptor on which the event occurs.
|
||||||
|
// The "newfd" parameter is the file descriptor of the new connected client, in case of a connection event.
|
||||||
|
// The "buffer" and "buflen" parameters contain respectively a copy of the received message and its length.
|
||||||
|
int ipc_wait_event (void* ctx, char* t, size_t* index, int* originfd, int* newfd, char* buffer, size_t* buflen);
|
||||||
|
|
||||||
|
// Close a client (or server) based on its file descriptor or its index in the context structure.
|
||||||
int ipc_close_fd (void* ctx, int fd);
|
int ipc_close_fd (void* ctx, int fd);
|
||||||
int ipc_close (void* ctx, size_t index);
|
int ipc_close (void* ctx, size_t index);
|
||||||
|
// Close all connections (probably right before the processus is terminated).
|
||||||
int ipc_close_all (void* ctx);
|
int ipc_close_all (void* ctx);
|
||||||
|
|
||||||
// Switch functions (for "protocol" services, such as TCPd).
|
// Add a (possibly non-IPC) file descriptor to handle.
|
||||||
|
// Since it's not marked as an IPC connection, messages won't be automatically read;
|
||||||
|
// which enables to handle any communications for most protocols through the LibIPC API.
|
||||||
int ipc_add_external (void* ctx, int newfd);
|
int ipc_add_external (void* ctx, int newfd);
|
||||||
|
|
||||||
|
// Add a new switch between a pair of file descriptors, enabling automatic exchange of messages
|
||||||
|
// between this pair of fds. Useful for "protocol services", such as TCPd.
|
||||||
int ipc_add_switch (void* ctx, int fd1, int fd2);
|
int ipc_add_switch (void* ctx, int fd1, int fd2);
|
||||||
|
|
||||||
|
// Set IO callbacks for a file descriptor.
|
||||||
// Returned "char" is a cb_event_types enum.
|
// Returned "char" is a cb_event_types enum.
|
||||||
|
// One of the callbacks can be "NULL" to keep the default callback, thus changing only input or output operations.
|
||||||
int ipc_set_switch_callbacks (void* ctx, int fd
|
int ipc_set_switch_callbacks (void* ctx, int fd
|
||||||
, char (*in (int orig, const char *payload, uint32_t *mlen))
|
, char (*in (int orig, char *payload, size_t *mlen))
|
||||||
, char (*out(int dest, char *payload, uint32_t mlen)));
|
, char (*out(int dest, const char *payload, size_t mlen)));
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
||||||
52
makefile
52
makefile
|
|
@ -15,25 +15,27 @@ PREFIX ?= /usr/local
|
||||||
LIBDIR ?= $(PREFIX)/lib
|
LIBDIR ?= $(PREFIX)/lib
|
||||||
INCLUDEDIR ?= $(PREFIX)/include
|
INCLUDEDIR ?= $(PREFIX)/include
|
||||||
PKGCONFIGDIR ?= /usr/share/pkgconfig
|
PKGCONFIGDIR ?= /usr/share/pkgconfig
|
||||||
install-pkgconfig:
|
|
||||||
[ -d $(PKGCONFIGDIR) ] || install -m 0755 -d $(PKGCONFIGDIR)
|
|
||||||
install -m 0644 ipc.pc $(PKGCONFIGDIR)
|
|
||||||
install-library:
|
|
||||||
[ -d $(LIBDIR) ] || install -m 0755 -d $(LIBDIR)
|
|
||||||
install -m 0644 zig-out/lib/libipc.a $(LIBDIR)
|
|
||||||
install -m 0644 zig-out/lib/libipc.so $(LIBDIR)
|
|
||||||
install-header:
|
|
||||||
[ -d $(INCLUDEDIR) ] || install -m 0755 -d $(INCLUDEDIR)
|
|
||||||
install -m 0644 libipc.h $(INCLUDEDIR)
|
|
||||||
install: install-pkgconfig install-library install-header
|
|
||||||
|
|
||||||
uninstall-library:
|
$(PKGCONFIGDIR):; install -m 0755 -d $(PKGCONFIGDIR)
|
||||||
rm $(LIBDIR)/libipc.a \
|
$(PKGCONFIGDIR)/ipc.pc: ipc.pc; install -m 0644 ipc.pc $(PKGCONFIGDIR)
|
||||||
$(LIBDIR)/libipc.so \
|
install-pkgconfig: $(PKGCONFIGDIR) $(PKGCONFIGDIR)/ipc.pc
|
||||||
$(LIBDIR)/libipc.so.*
|
|
||||||
uninstall-header:
|
$(LIBDIR):; install -m 0755 -d $(LIBDIR)
|
||||||
rm $(INCLUDEDIR)/libipc.h
|
$(LIBDIR)/libipc.a: zig-out/lib/libipc.a; install -m 0644 zig-out/lib/libipc.a $(LIBDIR)
|
||||||
uninstall: uninstall-library uninstall-header
|
$(LIBDIR)/libipc.so: zig-out/lib/libipc.so; install -m 0644 zig-out/lib/libipc.so $(LIBDIR)
|
||||||
|
install-library: $(LIBDIR) $(LIBDIR)/libipc.a $(LIBDIR)/libipc.so
|
||||||
|
|
||||||
|
$(INCLUDEDIR):; install -m 0755 -d $(INCLUDEDIR)
|
||||||
|
$(INCLUDEDIR)/libipc.h: libipc.h; install -m 0644 libipc.h $(INCLUDEDIR)
|
||||||
|
install-header: $(INCLUDEDIR) $(INCLUDEDIR)/libipc.h
|
||||||
|
|
||||||
|
install: install-pkgconfig install-library install-header
|
||||||
|
@echo "Now that you have installed the library, you should (probably) run ldconfig."
|
||||||
|
|
||||||
|
uninstall-library:; rm $(LIBDIR)/libipc.a $(LIBDIR)/libipc.so*
|
||||||
|
uninstall-header:; rm $(INCLUDEDIR)/libipc.h
|
||||||
|
uninstall-pkgconfig:; rm $(PKGCONFIGDIR)/ipc.pc
|
||||||
|
uninstall: uninstall-pkgconfig uninstall-library uninstall-header
|
||||||
|
|
||||||
mrproper:
|
mrproper:
|
||||||
rm -r docs zig-cache zig-out 2>/dev/null || true
|
rm -r docs zig-cache zig-out 2>/dev/null || true
|
||||||
|
|
@ -48,21 +50,21 @@ serve-doc:
|
||||||
darkhttpd docs/ --addr $(DOC_HTTPD_ADDR) --port $(DOC_HTTPD_PORT) --log $(DOC_HTTPD_ACCESS_LOGS)
|
darkhttpd docs/ --addr $(DOC_HTTPD_ADDR) --port $(DOC_HTTPD_PORT) --log $(DOC_HTTPD_ACCESS_LOGS)
|
||||||
|
|
||||||
PACKAGE ?= libipc
|
PACKAGE ?= libipc
|
||||||
VERSION ?= 0.1.0
|
VERSION ?= 0.2.0
|
||||||
PKG = $(PACKAGE)-$(VERSION)
|
PKG = $(PACKAGE)-$(VERSION)
|
||||||
dist-dir:
|
|
||||||
[ -d $(PKG) ] || ln -s . $(PKG)
|
|
||||||
$(PKG).tar.gz: dist-dir
|
$(PKG).tar.gz: dist-dir
|
||||||
tar zcf $@ \
|
tar zcf $@ \
|
||||||
$(PKG)/src \
|
$(PKG)/src \
|
||||||
$(PKG)/build.zig \
|
$(PKG)/build.zig* \
|
||||||
$(PKG)/libipc.h \
|
$(PKG)/libipc.h \
|
||||||
|
$(PKG)/ipc.pc \
|
||||||
$(PKG)/makefile* \
|
$(PKG)/makefile* \
|
||||||
$(PKG)/README* \
|
$(PKG)/README* \
|
||||||
$(PKG)/TODO*
|
$(PKG)/TODO*
|
||||||
dist-rm-dir:
|
$(PKG):; ln -s . $(PKG)
|
||||||
rm $(PKG)
|
dist-dir: $(PKG)
|
||||||
dist-gz: $(PACKAGE)-$(VERSION).tar.gz
|
dist-rm-dir:; rm $(PKG)
|
||||||
|
dist-gz: $(PKG).tar.gz
|
||||||
dist: dist-gz dist-rm-dir
|
dist: dist-gz dist-rm-dir
|
||||||
|
|
||||||
# You can add your specific instructions there.
|
# You can add your specific instructions there.
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,7 @@ export fn ipc_context_deinit(ctx: **Context) callconv(.C) void {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Write a message (no waiting).
|
/// Write a message (no waiting).
|
||||||
export fn ipc_write(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) i32 {
|
export fn ipc_write(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) i32 {
|
||||||
// TODO: better default length.
|
// TODO: better default length.
|
||||||
var buffer = [_]u8{0} ** 100000;
|
var buffer = [_]u8{0} ** 100000;
|
||||||
var fba = std.heap.FixedBufferAllocator.init(&buffer);
|
var fba = std.heap.FixedBufferAllocator.init(&buffer);
|
||||||
|
|
@ -48,7 +48,7 @@ export fn ipc_write(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen:
|
||||||
|
|
||||||
/// Schedule a message.
|
/// Schedule a message.
|
||||||
/// Use the same allocator as the context.
|
/// Use the same allocator as the context.
|
||||||
export fn ipc_schedule(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) i32 {
|
export fn ipc_schedule(ctx: *Context, servicefd: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) i32 {
|
||||||
const message = Message.init(servicefd, ctx.allocator, mcontent[0..mlen]) catch return -1;
|
const message = Message.init(servicefd, ctx.allocator, mcontent[0..mlen]) catch return -1;
|
||||||
ctx.schedule(message) catch return -2;
|
ctx.schedule(message) catch return -2;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
@ -90,7 +90,7 @@ export fn ipc_read(ctx: *Context, index: usize, buffer: [*]u8, buflen: *usize) c
|
||||||
|
|
||||||
/// Wait for an event.
|
/// Wait for an event.
|
||||||
/// Buffer length will be changed to the size of the received message.
|
/// Buffer length will be changed to the size of the received message.
|
||||||
export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 {
|
export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, newfd: *i32, buffer: [*]u8, buflen: *usize) callconv(.C) i32 {
|
||||||
const event = ctx.wait_event() catch |err| switch (err) {
|
const event = ctx.wait_event() catch |err| switch (err) {
|
||||||
else => {
|
else => {
|
||||||
log.warn("error while waiting for an event: {}\n", .{err});
|
log.warn("error while waiting for an event: {}\n", .{err});
|
||||||
|
|
@ -100,13 +100,17 @@ export fn ipc_wait_event(ctx: *Context, t: *u8, index: *usize, originfd: *i32, b
|
||||||
t.* = @intFromEnum(event.t);
|
t.* = @intFromEnum(event.t);
|
||||||
index.* = event.index;
|
index.* = event.index;
|
||||||
originfd.* = event.origin;
|
originfd.* = event.origin;
|
||||||
|
newfd.* = event.newfd;
|
||||||
|
|
||||||
if (event.m) |m| {
|
if (event.m) |m| {
|
||||||
var fbs = std.io.fixedBufferStream(buffer[0..buflen.*]);
|
var fbs = std.io.fixedBufferStream(buffer[0..buflen.*]);
|
||||||
var writer = fbs.writer();
|
var writer = fbs.writer();
|
||||||
_ = writer.write(m.payload) catch return -4;
|
_ = writer.write(m.payload) catch return -4;
|
||||||
buflen.* = m.payload.len;
|
buflen.* = m.payload.len;
|
||||||
m.deinit();
|
switch (event.t) {
|
||||||
|
.SWITCH_RX => {},
|
||||||
|
else => m.deinit(),
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
buflen.* = 0;
|
buflen.* = 0;
|
||||||
}
|
}
|
||||||
|
|
@ -148,8 +152,8 @@ export fn ipc_add_switch(ctx: *Context, fd1: i32, fd2: i32) callconv(.C) i32 {
|
||||||
}
|
}
|
||||||
|
|
||||||
export fn ipc_set_switch_callbacks(ctx: *Context, fd: i32,
|
export fn ipc_set_switch_callbacks(ctx: *Context, fd: i32,
|
||||||
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8,
|
in: ?*const fn (origin: i32, mcontent: [*]u8, mlen: *usize) callconv(.C) u8,
|
||||||
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8) callconv(.C) i32 {
|
out: ?*const fn (origin: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) u8) callconv(.C) i32 {
|
||||||
ctx.set_switch_callbacks(fd, in, out) catch return -1;
|
ctx.set_switch_callbacks(fd, in, out) catch return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,17 +1,11 @@
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
const testing = std.testing;
|
const testing = std.testing;
|
||||||
// const DEBUG = @import("./hexdump.zig");
|
|
||||||
const net = std.net;
|
const net = std.net;
|
||||||
const os = std.os;
|
const os = std.os;
|
||||||
const fmt = std.fmt;
|
const fmt = std.fmt;
|
||||||
const c = std.c;
|
const c = std.c;
|
||||||
const posix = std.posix;
|
const posix = std.posix;
|
||||||
|
|
||||||
// const print = std.debug.print;
|
|
||||||
|
|
||||||
// TODO: to remove once PR https://github.com/ziglang/zig/pull/14639 is accepted.
|
|
||||||
pub extern "c" fn umask(mode: c.mode_t) c.mode_t;
|
|
||||||
|
|
||||||
const log = std.log.scoped(.libipc_context);
|
const log = std.log.scoped(.libipc_context);
|
||||||
|
|
||||||
const receive_fd = @import("./exchange-fd.zig").receive_fd;
|
const receive_fd = @import("./exchange-fd.zig").receive_fd;
|
||||||
|
|
@ -35,7 +29,6 @@ pub const PollFD = std.ArrayList(posix.pollfd);
|
||||||
pub const IPC_HEADER_SIZE = 4; // Size (4 bytes) then content.
|
pub const IPC_HEADER_SIZE = 4; // Size (4 bytes) then content.
|
||||||
pub const IPC_BASE_SIZE = 100000; // 100 KB, plenty enough space for messages
|
pub const IPC_BASE_SIZE = 100000; // 100 KB, plenty enough space for messages
|
||||||
pub const IPC_MAX_MESSAGE_SIZE = IPC_BASE_SIZE - IPC_HEADER_SIZE;
|
pub const IPC_MAX_MESSAGE_SIZE = IPC_BASE_SIZE - IPC_HEADER_SIZE;
|
||||||
pub const IPC_VERSION = 1;
|
|
||||||
|
|
||||||
// Context of the whole networking state.
|
// Context of the whole networking state.
|
||||||
pub const Context = struct {
|
pub const Context = struct {
|
||||||
|
|
@ -66,8 +59,8 @@ pub const Context = struct {
|
||||||
};
|
};
|
||||||
|
|
||||||
// Allow mkdir to create a directory with 0o770 permissions.
|
// Allow mkdir to create a directory with 0o770 permissions.
|
||||||
const previous_mask = umask(0o007);
|
const previous_mask = c.umask(0o007);
|
||||||
defer _ = umask(previous_mask);
|
defer _ = c.umask(previous_mask);
|
||||||
|
|
||||||
// Create the run directory, where all UNIX sockets will be.
|
// Create the run directory, where all UNIX sockets will be.
|
||||||
posix.mkdir(rundir, 0o0770) catch |err| switch (err) {
|
posix.mkdir(rundir, 0o0770) catch |err| switch (err) {
|
||||||
|
|
@ -80,7 +73,12 @@ pub const Context = struct {
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
return Self{ .rundir = rundir, .connections = Connections.init(allocator), .pollfd = PollFD.init(allocator), .tx = Messages.init(allocator), .switchdb = SwitchDB.init(allocator), .allocator = allocator };
|
return Self{ .rundir = rundir
|
||||||
|
, .connections = Connections.init(allocator)
|
||||||
|
, .pollfd = PollFD.init(allocator)
|
||||||
|
, .tx = Messages.init(allocator)
|
||||||
|
, .switchdb = SwitchDB.init(allocator)
|
||||||
|
, .allocator = allocator };
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn deinit(self: *Self) void {
|
pub fn deinit(self: *Self) void {
|
||||||
|
|
@ -218,6 +216,13 @@ pub const Context = struct {
|
||||||
try self.add_(newcon, newfd);
|
try self.add_(newcon, newfd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// /// Change the type of a file descriptor.
|
||||||
|
// /// Useful for protocol daemons (ex: TCPd) or proxies listening to local IPC connections for new clients.
|
||||||
|
// /// In this case, the client can be "switched" but needs to be marked as such.
|
||||||
|
// /// TODO
|
||||||
|
// pub fn change_fd_type (self: *Self, fd: i32, new_type: Connection.Type) !void {
|
||||||
|
// }
|
||||||
|
|
||||||
fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void {
|
fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void {
|
||||||
// net.Server
|
// net.Server
|
||||||
const serverfd = self.pollfd.items[server_index].fd;
|
const serverfd = self.pollfd.items[server_index].fd;
|
||||||
|
|
@ -233,7 +238,7 @@ pub const Context = struct {
|
||||||
|
|
||||||
const sfd = server.stream.handle;
|
const sfd = server.stream.handle;
|
||||||
// WARNING: imply every new item is last
|
// WARNING: imply every new item is last
|
||||||
event.set(Event.Type.CONNECTION, self.pollfd.items.len - 1, sfd, null);
|
event.set(Event.Type.CONNECTION, self.pollfd.items.len - 1, sfd, newfd, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a unix socket.
|
// Create a unix socket.
|
||||||
|
|
@ -257,8 +262,8 @@ pub const Context = struct {
|
||||||
|
|
||||||
// Allow to create a unix socket with the right permissions.
|
// Allow to create a unix socket with the right permissions.
|
||||||
// Group should include write permissions.
|
// Group should include write permissions.
|
||||||
const previous_mask = umask(0o117);
|
const previous_mask = c.umask(0o117);
|
||||||
defer _ = umask(previous_mask);
|
defer _ = c.umask(previous_mask);
|
||||||
|
|
||||||
// Remove the old UNIX socket.
|
// Remove the old UNIX socket.
|
||||||
posix.unlink(path) catch |err| switch (err) {
|
posix.unlink(path) catch |err| switch (err) {
|
||||||
|
|
@ -290,12 +295,6 @@ pub const Context = struct {
|
||||||
_ = try m.write(writer); // returns paylen
|
_ = try m.write(writer); // returns paylen
|
||||||
|
|
||||||
_ = try stream.write(fbs.getWritten());
|
_ = try stream.write(fbs.getWritten());
|
||||||
|
|
||||||
// var hexbuf: [2000]u8 = undefined;
|
|
||||||
// var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
|
||||||
// var hexwriter = hexfbs.writer();
|
|
||||||
// try DEBUG.hexdump(hexwriter, "MSG SENT", fbs.getWritten());
|
|
||||||
// print("{s}\n", .{hexfbs.getWritten()});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn schedule(self: *Self, m: Message) !void {
|
pub fn schedule(self: *Self, m: Message) !void {
|
||||||
|
|
@ -318,8 +317,8 @@ pub const Context = struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_switch_callbacks(self: *Self, fd: i32,
|
pub fn set_switch_callbacks(self: *Self, fd: i32,
|
||||||
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8,
|
in: ?*const fn (origin: i32, mcontent: [*]u8, mlen: *usize) callconv(.C) u8,
|
||||||
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8) !void {
|
out: ?*const fn (origin: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) u8) !void {
|
||||||
try self.switchdb.set_callbacks(fd, in, out);
|
try self.switchdb.set_callbacks(fd, in, out);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -365,7 +364,7 @@ pub const Context = struct {
|
||||||
|
|
||||||
// Wait for an event.
|
// Wait for an event.
|
||||||
pub fn wait_event(self: *Self) !Event {
|
pub fn wait_event(self: *Self) !Event {
|
||||||
var current_event: Event = Event.init(Event.Type.ERROR, 0, 0, null);
|
var current_event: Event = Event.init(Event.Type.ERROR, 0, 0, 0, null);
|
||||||
var wait_duration: i32 = -1; // -1 == unlimited
|
var wait_duration: i32 = -1; // -1 == unlimited
|
||||||
|
|
||||||
if (self.timer) |t| {
|
if (self.timer) |t| {
|
||||||
|
|
@ -399,17 +398,17 @@ pub const Context = struct {
|
||||||
|
|
||||||
if (count < 0) {
|
if (count < 0) {
|
||||||
log.err("there is a problem: poll < 0", .{});
|
log.err("there is a problem: poll < 0", .{});
|
||||||
current_event = Event.init(Event.Type.ERROR, 0, 0, null);
|
current_event = Event.init(Event.Type.ERROR, 0, 0, 0, null);
|
||||||
return current_event;
|
return current_event;
|
||||||
}
|
}
|
||||||
|
|
||||||
const duration = timer.read() / 1000000; // ns -> ms
|
const duration = timer.read() / 1000000; // ns -> ms
|
||||||
if (count == 0) {
|
if (count == 0) {
|
||||||
if (duration >= wait_duration) {
|
if (duration >= wait_duration) {
|
||||||
current_event = Event.init(Event.Type.TIMER, 0, 0, null);
|
current_event = Event.init(Event.Type.TIMER, 0, 0, 0, null);
|
||||||
} else {
|
} else {
|
||||||
// In case nothing happened, and poll wasn't triggered by time out.
|
// In case nothing happened, and poll wasn't triggered by time out.
|
||||||
current_event = Event.init(Event.Type.ERROR, 0, 0, null);
|
current_event = Event.init(Event.Type.ERROR, 0, 0, 0, null);
|
||||||
}
|
}
|
||||||
return current_event;
|
return current_event;
|
||||||
}
|
}
|
||||||
|
|
@ -455,7 +454,7 @@ pub const Context = struct {
|
||||||
}
|
}
|
||||||
// EXTERNAL = user handles IO
|
// EXTERNAL = user handles IO
|
||||||
else if (self.connections.items[i].t == .EXTERNAL) {
|
else if (self.connections.items[i].t == .EXTERNAL) {
|
||||||
return Event.init(Event.Type.EXTERNAL, i, current_fd, null);
|
return Event.init(Event.Type.EXTERNAL, i, current_fd, 0, null);
|
||||||
}
|
}
|
||||||
// otherwise = new message or disconnection
|
// otherwise = new message or disconnection
|
||||||
else {
|
else {
|
||||||
|
|
@ -463,26 +462,26 @@ pub const Context = struct {
|
||||||
error.ConnectionResetByPeer => {
|
error.ConnectionResetByPeer => {
|
||||||
log.warn("connection reset by peer", .{});
|
log.warn("connection reset by peer", .{});
|
||||||
try self.close(i);
|
try self.close(i);
|
||||||
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
|
return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
|
||||||
},
|
},
|
||||||
error.wrongMessageLength => {
|
error.wrongMessageLength => {
|
||||||
log.warn("wrong message length, terminating the connection", .{});
|
log.warn("wrong message length, terminating the connection", .{});
|
||||||
try self.close(i);
|
try self.close(i);
|
||||||
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
|
return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
|
||||||
},
|
},
|
||||||
else => {
|
else => {
|
||||||
log.warn("unmanaged error while reading a message ({})", .{err});
|
log.warn("unmanaged error while reading a message ({})", .{err});
|
||||||
try self.close(i);
|
try self.close(i);
|
||||||
return Event.init(Event.Type.ERROR, i, current_fd, null);
|
return Event.init(Event.Type.ERROR, i, current_fd, 0, null);
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
if (maybe_message) |m| {
|
if (maybe_message) |m| {
|
||||||
return Event.init(Event.Type.MESSAGE_RX, i, current_fd, m);
|
return Event.init(Event.Type.MESSAGE_RX, i, current_fd, 0, m);
|
||||||
}
|
}
|
||||||
|
|
||||||
try self.close(i);
|
try self.close(i);
|
||||||
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
|
return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -526,21 +525,21 @@ pub const Context = struct {
|
||||||
error.BrokenPipe => {
|
error.BrokenPipe => {
|
||||||
log.warn("cannot send message, dest probably closed the connection ({})", .{err});
|
log.warn("cannot send message, dest probably closed the connection ({})", .{err});
|
||||||
try self.close(i);
|
try self.close(i);
|
||||||
return Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
|
return Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
|
||||||
},
|
},
|
||||||
else => {
|
else => {
|
||||||
log.warn("unmanaged error while sending a message ({})", .{err});
|
log.warn("unmanaged error while sending a message ({})", .{err});
|
||||||
try self.close(i);
|
try self.close(i);
|
||||||
return Event.init(Event.Type.ERROR, i, current_fd, null);
|
return Event.init(Event.Type.ERROR, i, current_fd, 0, null);
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
return Event.init(Event.Type.MESSAGE_TX, i, current_fd, null);
|
return Event.init(Event.Type.MESSAGE_TX, i, current_fd, 0, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// .revent is POLLHUP
|
// .revent is POLLHUP
|
||||||
if (fd.revents & std.os.linux.POLL.HUP > 0) {
|
if (fd.revents & std.os.linux.POLL.HUP > 0) {
|
||||||
// handle disconnection
|
// handle disconnection
|
||||||
current_event = Event.init(Event.Type.DISCONNECTION, i, current_fd, null);
|
current_event = Event.init(Event.Type.DISCONNECTION, i, current_fd, 0, null);
|
||||||
try self.close(i);
|
try self.close(i);
|
||||||
return current_event;
|
return current_event;
|
||||||
}
|
}
|
||||||
|
|
@ -548,7 +547,7 @@ pub const Context = struct {
|
||||||
if ((fd.revents & std.os.linux.POLL.HUP > 0) or
|
if ((fd.revents & std.os.linux.POLL.HUP > 0) or
|
||||||
(fd.revents & std.os.linux.POLL.NVAL > 0))
|
(fd.revents & std.os.linux.POLL.NVAL > 0))
|
||||||
{
|
{
|
||||||
return Event.init(Event.Type.ERROR, i, current_fd, null);
|
return Event.init(Event.Type.ERROR, i, current_fd, 0, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -48,23 +48,26 @@ pub const Event = struct {
|
||||||
t: Event.Type,
|
t: Event.Type,
|
||||||
index: usize,
|
index: usize,
|
||||||
origin: i32, // socket fd
|
origin: i32, // socket fd
|
||||||
|
newfd: i32, // on new connection, tell the new client's socket fd
|
||||||
m: ?Message, // message
|
m: ?Message, // message
|
||||||
|
|
||||||
const Self = @This();
|
const Self = @This();
|
||||||
|
|
||||||
pub fn init(t: Event.Type, index: usize, origin: i32, m: ?Message) Self {
|
pub fn init(t: Event.Type, index: usize, origin: i32, newfd: i32, m: ?Message) Self {
|
||||||
return Self{
|
return Self{
|
||||||
.t = t,
|
.t = t,
|
||||||
.index = index,
|
.index = index,
|
||||||
.origin = origin,
|
.origin = origin,
|
||||||
|
.newfd = newfd,
|
||||||
.m = m,
|
.m = m,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, m: ?Message) void {
|
pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, newfd: i32, m: ?Message) void {
|
||||||
self.t = t;
|
self.t = t;
|
||||||
self.index = index;
|
self.index = index;
|
||||||
self.origin = origin;
|
self.origin = origin;
|
||||||
|
self.newfd = newfd;
|
||||||
self.m = m;
|
self.m = m;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -72,6 +75,7 @@ pub const Event = struct {
|
||||||
self.t = Event.Type.ERROR;
|
self.t = Event.Type.ERROR;
|
||||||
self.index = @as(usize, 0);
|
self.index = @as(usize, 0);
|
||||||
self.origin = @as(i32, 0);
|
self.origin = @as(i32, 0);
|
||||||
|
self.newfd = @as(i32, 0);
|
||||||
if (self.m) |message| {
|
if (self.m) |message| {
|
||||||
message.deinit();
|
message.deinit();
|
||||||
}
|
}
|
||||||
|
|
@ -79,7 +83,7 @@ pub const Event = struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
|
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
|
||||||
try fmt.format(out_stream, "{}, origin: {}, index {}, message: [{?}]", .{ self.t, self.origin, self.index, self.m });
|
try fmt.format(out_stream, "{}, origin: {}, newfd: {}, index {}, message: [{?}]", .{ self.t, self.origin, self.newfd, self.index, self.m });
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -92,7 +96,7 @@ test "Event - creation and display" {
|
||||||
const s = "hello!!";
|
const s = "hello!!";
|
||||||
var m = try Message.init(1, allocator, s); // fd type payload
|
var m = try Message.init(1, allocator, s); // fd type payload
|
||||||
defer m.deinit();
|
defer m.deinit();
|
||||||
const e = Event.init(Event.Type.CONNECTION, 5, 8, m); // type index origin message
|
const e = Event.init(Event.Type.CONNECTION, 5, 8, 4, m); // type index origin message
|
||||||
|
|
||||||
try print_eq("event.Event.Type.CONNECTION, origin: 8, index 5, message: [fd: 1, payload: [hello!!]]", e);
|
try print_eq("event.Event.Type.CONNECTION, origin: 8, newfd: 4, index 5, message: [fd: 1, payload: [hello!!]]", e);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -78,8 +78,8 @@ test {
|
||||||
pub fn send_fd(sockfd: posix.socket_t, msg: []const u8, fd: posix.fd_t) void {
|
pub fn send_fd(sockfd: posix.socket_t, msg: []const u8, fd: posix.fd_t) void {
|
||||||
var iov = [_]posix.iovec_const{
|
var iov = [_]posix.iovec_const{
|
||||||
.{
|
.{
|
||||||
.iov_base = msg.ptr,
|
.base = msg.ptr,
|
||||||
.iov_len = msg.len,
|
.len = msg.len,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -191,7 +191,7 @@ pub fn receive_fd(sockfd: posix.socket_t, buffer: []u8, msg_size: *usize) !posix
|
||||||
var msg_buffer = [_]u8{0} ** 1500;
|
var msg_buffer = [_]u8{0} ** 1500;
|
||||||
|
|
||||||
var iov = [_]posix.iovec{
|
var iov = [_]posix.iovec{
|
||||||
.{ .iov_base = msg_buffer[0..], .iov_len = msg_buffer.len },
|
.{ .base = msg_buffer[0..], .len = msg_buffer.len },
|
||||||
};
|
};
|
||||||
|
|
||||||
var cmsg = Cmsghdr(posix.fd_t).init(.{
|
var cmsg = Cmsghdr(posix.fd_t).init(.{
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,13 @@
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
|
|
||||||
pub fn hexdump(stream: anytype, header: []const u8, buffer: []const u8) std.os.WriteError!void {
|
pub fn hexdump(stream: anytype, header: []const u8, buffer: []const u8) std.posix.WriteError!void {
|
||||||
// Print a header.
|
// Print a header.
|
||||||
if (header.len > 0) {
|
if (header.len > 0) {
|
||||||
var hdr: [64]u8 = undefined;
|
var hdr: [64]u8 = undefined;
|
||||||
const offset: usize = (hdr.len / 2) - ((header.len / 2) - 1);
|
const offset: usize = (hdr.len / 2) - ((header.len / 2) - 1);
|
||||||
|
|
||||||
@memset(hdr[0..hdr.len], ' ');
|
@memset(hdr[0..hdr.len], ' ');
|
||||||
std.mem.copy(u8, hdr[offset..hdr.len], header);
|
std.mem.copyForwards(u8, hdr[offset..hdr.len], header);
|
||||||
|
|
||||||
try stream.writeAll(hdr[0..hdr.len]);
|
try stream.writeAll(hdr[0..hdr.len]);
|
||||||
try stream.writeAll("\n");
|
try stream.writeAll("\n");
|
||||||
|
|
@ -72,6 +72,17 @@ pub fn hexdump(stream: anytype, header: []const u8, buffer: []const u8) std.os.W
|
||||||
try stream.writeAll("\n");
|
try stream.writeAll("\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Debug function: print some bytes' hexdump on standard output.
|
||||||
|
pub fn print_hex(title: []const u8, buffer : []const u8) !void
|
||||||
|
{
|
||||||
|
const stdout = std.io.getStdOut().writer();
|
||||||
|
var hexbuf: [100_000]u8 = undefined;
|
||||||
|
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
||||||
|
const hexwriter = hexfbs.writer();
|
||||||
|
try hexdump(hexwriter, title, buffer);
|
||||||
|
try stdout.print("{s}\n", .{hexfbs.getWritten()});
|
||||||
|
}
|
||||||
|
|
||||||
const print = std.debug.print;
|
const print = std.debug.print;
|
||||||
|
|
||||||
test "36-byte hexdump test" {
|
test "36-byte hexdump test" {
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ const fmt = std.fmt;
|
||||||
|
|
||||||
const print_eq = @import("./util.zig").print_eq;
|
const print_eq = @import("./util.zig").print_eq;
|
||||||
|
|
||||||
|
const payload_header_length = 4;
|
||||||
pub const Messages = std.ArrayList(Message);
|
pub const Messages = std.ArrayList(Message);
|
||||||
|
|
||||||
pub const Message = struct {
|
pub const Message = struct {
|
||||||
|
|
@ -28,17 +29,17 @@ pub const Message = struct {
|
||||||
var reader = fbs.reader();
|
var reader = fbs.reader();
|
||||||
|
|
||||||
const msg_len = try reader.readInt(u32, .big);
|
const msg_len = try reader.readInt(u32, .big);
|
||||||
if (msg_len > buffer.len - 4) {
|
if (msg_len > buffer.len - payload_header_length) {
|
||||||
return error.wrongMessageLength;
|
return error.wrongMessageLength;
|
||||||
}
|
}
|
||||||
const msg_payload = buffer[4 .. 4 + msg_len];
|
const msg_payload = buffer[payload_header_length .. payload_header_length + msg_len];
|
||||||
|
|
||||||
return try Message.init(fd, allocator, msg_payload);
|
return try Message.init(fd, allocator, msg_payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write(self: Self, writer: anytype) !usize {
|
pub fn write(self: Self, writer: anytype) !usize {
|
||||||
try writer.writeInt(u32, @as(u32, @truncate(self.payload.len)), .big);
|
try writer.writeInt(u32, @as(u32, @truncate(self.payload.len)), .big);
|
||||||
return 4 + try writer.write(self.payload);
|
return payload_header_length + try writer.write(self.payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
|
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
|
||||||
|
|
|
||||||
|
|
@ -40,10 +40,12 @@ pub const SwitchDB = struct {
|
||||||
const Self = @This();
|
const Self = @This();
|
||||||
|
|
||||||
db: std.AutoArrayHashMap(i32, ManagedConnection),
|
db: std.AutoArrayHashMap(i32, ManagedConnection),
|
||||||
|
allocator: std.mem.Allocator,
|
||||||
|
|
||||||
pub fn init(allocator: Allocator) Self {
|
pub fn init(allocator: Allocator) Self {
|
||||||
return Self{
|
return Self{
|
||||||
.db = std.AutoArrayHashMap(i32, ManagedConnection).init(allocator),
|
.db = std.AutoArrayHashMap(i32, ManagedConnection).init(allocator),
|
||||||
|
.allocator = allocator,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -63,11 +65,12 @@ pub const SwitchDB = struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_callbacks(self: *Self, fd: i32,
|
pub fn set_callbacks(self: *Self, fd: i32,
|
||||||
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8,
|
in: ?*const fn (origin: i32, mcontent: [*]u8, mlen: *usize) callconv(.C) u8,
|
||||||
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8) !void {
|
out: ?*const fn (origin: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) u8) !void {
|
||||||
var managedconnection = self.db.get(fd) orelse return error.unregisteredFD;
|
var managedconnection = self.db.get(fd) orelse return error.unregisteredFD;
|
||||||
managedconnection.in = in;
|
if (in) |f| { managedconnection.in = f; }
|
||||||
managedconnection.out = out;
|
if (out) |f| { managedconnection.out = f; }
|
||||||
|
try self.db.put(fd, managedconnection);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Dig the "db" hashmap, perform "in" fn, may provide a message.
|
/// Dig the "db" hashmap, perform "in" fn, may provide a message.
|
||||||
|
|
@ -77,7 +80,7 @@ pub const SwitchDB = struct {
|
||||||
var managedconnection = self.db.get(fd) orelse return error.unregisteredFD;
|
var managedconnection = self.db.get(fd) orelse return error.unregisteredFD;
|
||||||
|
|
||||||
var buffer = [_]u8{0} ** 100000; // TODO: buffer size
|
var buffer = [_]u8{0} ** 100000; // TODO: buffer size
|
||||||
var message_size: u32 = @truncate(buffer.len);
|
var message_size: usize = buffer.len;
|
||||||
const r: CBEventType = @enumFromInt(managedconnection.in(fd, &buffer, &message_size));
|
const r: CBEventType = @enumFromInt(managedconnection.in(fd, &buffer, &message_size));
|
||||||
|
|
||||||
switch (r) {
|
switch (r) {
|
||||||
|
|
@ -87,9 +90,10 @@ pub const SwitchDB = struct {
|
||||||
},
|
},
|
||||||
CBEventType.NO_ERROR => {
|
CBEventType.NO_ERROR => {
|
||||||
// TODO: read message
|
// TODO: read message
|
||||||
// TODO: better allocator?
|
|
||||||
// TODO: better errors?
|
// TODO: better errors?
|
||||||
const message: Message = Message.read(managedconnection.dest, buffer[0..message_size], std.heap.c_allocator) catch {
|
const message: Message = Message.read(managedconnection.dest
|
||||||
|
, buffer[0..message_size]
|
||||||
|
, self.allocator) catch {
|
||||||
return error.generic;
|
return error.generic;
|
||||||
};
|
};
|
||||||
return message;
|
return message;
|
||||||
|
|
@ -120,7 +124,7 @@ pub const SwitchDB = struct {
|
||||||
_ = message.write(writer) catch return error.generic;
|
_ = message.write(writer) catch return error.generic;
|
||||||
const written = fbs.getWritten();
|
const written = fbs.getWritten();
|
||||||
|
|
||||||
const r: CBEventType = @enumFromInt(managedconnection.out(message.fd, written.ptr, @truncate(written.len)));
|
const r: CBEventType = @enumFromInt(managedconnection.out(message.fd, written.ptr, written.len));
|
||||||
|
|
||||||
switch (r) {
|
switch (r) {
|
||||||
// The message should be ignored (protocol specific).
|
// The message should be ignored (protocol specific).
|
||||||
|
|
@ -145,26 +149,26 @@ pub const SwitchDB = struct {
|
||||||
var message: ?Message = null;
|
var message: ?Message = null;
|
||||||
message = self.read(fd) catch |err| switch (err) {
|
message = self.read(fd) catch |err| switch (err) {
|
||||||
error.closeFD => {
|
error.closeFD => {
|
||||||
return Event.init(Event.Type.DISCONNECTION, index, fd, null);
|
return Event.init(Event.Type.DISCONNECTION, index, fd, 0, null);
|
||||||
},
|
},
|
||||||
error.unregisteredFD, error.generic => {
|
error.unregisteredFD, error.generic => {
|
||||||
return Event.init(Event.Type.ERROR, index, fd, null);
|
return Event.init(Event.Type.ERROR, index, fd, 0, null);
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
return Event.init(Event.Type.SWITCH_RX, index, fd, message);
|
return Event.init(Event.Type.SWITCH_RX, index, fd, 0, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn handle_event_write(self: *Self, index: usize, message: Message) Event {
|
pub fn handle_event_write(self: *Self, index: usize, message: Message) Event {
|
||||||
const fd = message.fd;
|
const fd = message.fd;
|
||||||
self.write(message) catch |err| switch (err) {
|
self.write(message) catch |err| switch (err) {
|
||||||
error.closeFD => {
|
error.closeFD => {
|
||||||
return Event.init(Event.Type.DISCONNECTION, index, fd, null);
|
return Event.init(Event.Type.DISCONNECTION, index, fd, 0, null);
|
||||||
},
|
},
|
||||||
error.unregisteredFD, error.generic => {
|
error.unregisteredFD, error.generic => {
|
||||||
return Event.init(Event.Type.ERROR, index, fd, null);
|
return Event.init(Event.Type.ERROR, index, fd, 0, null);
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
return Event.init(Event.Type.SWITCH_TX, index, fd, null);
|
return Event.init(Event.Type.SWITCH_TX, index, fd, 0, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Simple wrapper around self.db.get.
|
/// Simple wrapper around self.db.get.
|
||||||
|
|
@ -182,8 +186,8 @@ pub const SwitchDB = struct {
|
||||||
|
|
||||||
const ManagedConnection = struct {
|
const ManagedConnection = struct {
|
||||||
dest: i32,
|
dest: i32,
|
||||||
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8 = default_in,
|
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *usize) callconv(.C) u8 = default_in,
|
||||||
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8 = default_out,
|
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) u8 = default_out,
|
||||||
};
|
};
|
||||||
|
|
||||||
test "creation and display" {
|
test "creation and display" {
|
||||||
|
|
@ -201,18 +205,18 @@ test "creation and display" {
|
||||||
try print_eq("{ (5,6)(6,5) }", .{switchdb});
|
try print_eq("{ (5,6)(6,5) }", .{switchdb});
|
||||||
}
|
}
|
||||||
|
|
||||||
fn successful_in(_: i32, mcontent: [*]u8, mlen: *u32) CBEventType {
|
fn successful_in(_: i32, mcontent: [*]u8, mlen: *usize) CBEventType {
|
||||||
var m = Message.init(8, std.heap.c_allocator, "coucou") catch unreachable;
|
var m = Message.init(8, std.heap.c_allocator, "coucou") catch unreachable;
|
||||||
defer m.deinit();
|
defer m.deinit();
|
||||||
|
|
||||||
var fbs = std.io.fixedBufferStream(mcontent[0..mlen.*]);
|
var fbs = std.io.fixedBufferStream(mcontent[0..mlen.*]);
|
||||||
const writer = fbs.writer();
|
const writer = fbs.writer();
|
||||||
const bytes_written = m.write(writer) catch unreachable;
|
const bytes_written = m.write(writer) catch unreachable;
|
||||||
mlen.* = @truncate(bytes_written);
|
mlen.* = bytes_written;
|
||||||
return CBEventType.NO_ERROR;
|
return CBEventType.NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn successful_out(_: i32, _: [*]const u8, _: u32) CBEventType {
|
fn successful_out(_: i32, _: [*]const u8, _: usize) CBEventType {
|
||||||
return CBEventType.NO_ERROR;
|
return CBEventType.NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -251,11 +255,11 @@ test "successful exchanges" {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn unsuccessful_in(_: i32, _: [*]const u8, _: *u32) CBEventType {
|
fn unsuccessful_in(_: i32, _: [*]const u8, _: *usize) CBEventType {
|
||||||
return CBEventType.ERROR;
|
return CBEventType.ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn unsuccessful_out(_: i32, _: [*]const u8, _: u32) CBEventType {
|
fn unsuccessful_out(_: i32, _: [*]const u8, _: usize) CBEventType {
|
||||||
return CBEventType.ERROR;
|
return CBEventType.ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -307,7 +311,7 @@ test "nuke 'em" {
|
||||||
try testing.expect(switchdb.db.count() == 0);
|
try testing.expect(switchdb.db.count() == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_in(origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8 {
|
fn default_in(origin: i32, mcontent: [*]u8, mlen: *usize) callconv(.C) u8 {
|
||||||
// This may be kinda hacky, idk.
|
// This may be kinda hacky, idk.
|
||||||
var stream: net.Stream = .{ .handle = origin };
|
var stream: net.Stream = .{ .handle = origin };
|
||||||
const packet_size: usize = stream.read(mcontent[0..mlen.*]) catch return @intFromEnum(CBEventType.ERROR);
|
const packet_size: usize = stream.read(mcontent[0..mlen.*]) catch return @intFromEnum(CBEventType.ERROR);
|
||||||
|
|
@ -318,12 +322,12 @@ fn default_in(origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8 {
|
||||||
return @intFromEnum(CBEventType.FD_CLOSING);
|
return @intFromEnum(CBEventType.FD_CLOSING);
|
||||||
}
|
}
|
||||||
|
|
||||||
mlen.* = @truncate(packet_size);
|
mlen.* = packet_size;
|
||||||
|
|
||||||
return @intFromEnum(CBEventType.NO_ERROR);
|
return @intFromEnum(CBEventType.NO_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_out(fd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8 {
|
fn default_out(fd: i32, mcontent: [*]const u8, mlen: usize) callconv(.C) u8 {
|
||||||
// Message contains the fd, no need to search for the right structure to copy,
|
// Message contains the fd, no need to search for the right structure to copy,
|
||||||
// let's just recreate a Stream from the fd.
|
// let's just recreate a Stream from the fd.
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue