commit dcc8e7bfd98ce2d0cc8c6387a793fcf9476be0f6 Author: Philippe Pittoli Date: Mon Feb 6 14:27:35 2023 +0100 First LibIPC examples: zig applications (pong, pongd, tcpd, ipcd). diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..44dbeed --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +zig-cache +zig-out \ No newline at end of file diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..a0e5d37 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "zig/lib/ipc"] + path = zig/lib/ipc + url = https://git.baguette.netlib.re/Baguette/libipc.git diff --git a/zig/build.zig b/zig/build.zig new file mode 100644 index 0000000..a85554f --- /dev/null +++ b/zig/build.zig @@ -0,0 +1,61 @@ +const std = @import("std"); + +fn link_and_install(exe : *std.Build.CompileStep) void { + const source_file = std.Build.FileSource.relative("./lib/ipc/src/main.zig"); + // var empty : []const std.Build.ModuleDependency = &.{}; + exe.addAnonymousModule("ipc", .{ .source_file = source_file, .dependencies = &.{} }); + exe.linkLibC(); + + // This declares intent for the executable to be installed into the + // standard location when the user invokes the "install" step (the default + // step when running `zig build`). + exe.install(); +} + +// Although this function looks imperative, note that its job is to +// declaratively construct a build graph that will be executed by an external +// runner. +pub fn build(b: *std.Build) void { + // Standard target options allows the person running `zig build` to choose + // what target to build for. Here we do not override the defaults, which + // means any target is allowed, and the default is native. Other options + // for restricting supported target set are available. + const target = b.standardTargetOptions(.{}); + + // Standard optimization options allow the person running `zig build` to select + // between Debug, ReleaseSafe, ReleaseFast, and ReleaseSmall. Here we do not + // set a preferred release mode, allowing the user to decide how to optimize. + const optimize = b.standardOptimizeOption(.{}); + + const ipcd_exe = b.addExecutable(.{ + .name = "ipcd", + .root_source_file = .{ .path = "src/ipcd.zig" }, + .target = target, + .optimize = optimize, + }); + link_and_install(ipcd_exe); + + const tcpd_exe = b.addExecutable(.{ + .name = "tcpd", + .root_source_file = .{ .path = "src/tcpd.zig" }, + .target = target, + .optimize = optimize, + }); + link_and_install(tcpd_exe); + + const pong_exe = b.addExecutable(.{ + .name = "pong", + .root_source_file = .{ .path = "src/pong.zig" }, + .target = target, + .optimize = optimize, + }); + link_and_install(pong_exe); + + const pongd_exe = b.addExecutable(.{ + .name = "pongd", + .root_source_file = .{ .path = "src/pongd.zig" }, + .target = target, + .optimize = optimize, + }); + link_and_install(pongd_exe); +} diff --git a/zig/lib/ipc b/zig/lib/ipc new file mode 160000 index 0000000..64506bb --- /dev/null +++ b/zig/lib/ipc @@ -0,0 +1 @@ +Subproject commit 64506bbee0d691b3a0c6d5d97894426c188bc3df diff --git a/zig/makefile b/zig/makefile new file mode 100644 index 0000000..def1897 --- /dev/null +++ b/zig/makefile @@ -0,0 +1,9 @@ +all: build + +update-libipc: + git submodule update + +ZIG_OPTS ?= +ZIG_OPTIM ?= ReleaseSafe +build: + zig build -Doptimize=$(ZIG_OPTIM) $(ZIG_OPTS) diff --git a/zig/src/ipcd.zig b/zig/src/ipcd.zig new file mode 100644 index 0000000..1ed09f1 --- /dev/null +++ b/zig/src/ipcd.zig @@ -0,0 +1,226 @@ +const std = @import("std"); +const net = std.net; +const fmt = std.fmt; +const os = std.os; + +const ipc = @import("ipc"); +const hexdump = ipc.hexdump; +const Message = ipc.Message; + +// Import send_fd this way in order to produce docs for exchange-fd functions. +const exchange_fd = ipc.exchangefd; +const send_fd = exchange_fd.send_fd; + +const builtin = @import("builtin"); +const native_os = builtin.target.os.tag; +const print = std.debug.print; +const testing = std.testing; +const print_eq = ipc.util.print_eq; +const URI = ipc.util.URI; + +// Standard library is unecessary complex regarding networking. +// libipc drops it and uses plain old file descriptors instead. + +// API should completely obfuscate the inner structures. +// Only libipc structures should be necessary to write any networking code, +// users should only work with Context and Message, mostly. + +// QUESTION: should libipc use std.fs.path and not simple [] const u8? + +fn create_service() !void { + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + var ctx = try ipc.Context.init(allocator); + defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful? + + // SERVER SIDE: creating a service. + _ = try ctx.server_init("ipc"); + + // signal handler, to quit when asked + const S = struct { + var should_quit: bool = false; + + fn handler(sig: i32, info: *const os.siginfo_t, _: ?*const anyopaque) callconv(.C) void { + print ("A signal has been received: {}\n", .{sig}); + // Check that we received the correct signal. + switch (native_os) { + .netbsd => { + if (sig != os.SIG.HUP or sig != info.info.signo) + return; + }, + else => { + if (sig != os.SIG.HUP and sig != info.signo) + return; + }, + } + should_quit = true; + } + }; + + var sa = os.Sigaction{ + .handler = .{ .sigaction = &S.handler }, + .mask = os.empty_sigset, // Do not mask any signal. + .flags = os.SA.SIGINFO, + }; + + // Quit on SIGHUP (kill -1). + try os.sigaction(os.SIG.HUP, &sa, null); + + var some_event: ipc.Event = undefined; + ctx.timer = 1000; // 1 second + var count: u32 = 0; + while(! S.should_quit) { + some_event = try ctx.wait_event(); + switch (some_event.t) { + .TIMER => { + print("\rTimer! ({})", .{count}); + count += 1; + }, + + .CONNECTION => { + print("New connection: {} so far!\n", .{ctx.pollfd.items.len}); + }, + + .DISCONNECTION => { + print("User {} disconnected, {} remainaing.\n" + , .{some_event.origin, ctx.pollfd.items.len}); + }, + + .EXTERNAL => { + print("Message received from a non IPC socket.\n", .{}); + print("NOT IMPLEMENTED, YET. It's a suicide, then.\n", .{}); + break; + }, + + .SWITCH_RX => { + print("Message has been received (SWITCH).\n", .{}); + print("NOT IMPLEMENTED, YET. It's a suicide, then.\n", .{}); + break; + }, + + .SWITCH_TX => { + print("Message has been sent (SWITCH).\n", .{}); + print("NOT IMPLEMENTED, YET. It's a suicide, then.\n", .{}); + break; + }, + + .MESSAGE_RX => { + print("Client asking for a service through ipcd.\n", .{}); + defer ctx.close_fd (some_event.origin) catch {}; + if (some_event.m) |m| { + print("{}\n", .{m}); + defer m.deinit(); // Do not forget to free the message payload. + + // 1. split message + var iterator = std.mem.split(u8, m.payload, ";"); + var service_to_contact = iterator.first(); + // print("service to contact: {s}\n", .{service_to_contact}); + var final_destination: ?[]const u8 = null; + + // 2. find relevant part of the message + while (iterator.next()) |next| { + // print("next part: {s}\n", .{next}); + var iterator2 = std.mem.split(u8, next, " "); + var sname = iterator2.first(); + var target = iterator2.next(); + if (target) |t| { + // print ("sname: {s} - target: {s}\n", .{sname, t}); + if (std.mem.eql(u8, service_to_contact, sname)) { + final_destination = t; + } + } + else { + print("ERROR: no target in: {s}\n", .{next}); + } + } + + // 3. connect whether asked to and send a message + if (final_destination) |dest| { + print("Connecting to {s} (service requested: {s})\n" + , .{dest, service_to_contact}); + + var uri = URI.read(dest); + + // 1. in case there is no URI + if (std.mem.eql(u8, uri.protocol, dest)) { + var newfd = try ctx.connect_service (dest); + send_fd (some_event.origin, "ok", newfd); + try ctx.close_fd (newfd); + } + else if (std.mem.eql(u8, uri.protocol, "unix")) { + var newfd = try ctx.connect_service (uri.address); + send_fd (some_event.origin, "ok", newfd); + try ctx.close_fd (newfd); + } + // 2. else, contact d or directly the dest in case there is none. + else { + var servicefd = try ctx.connect_service (uri.protocol); + defer ctx.close_fd (servicefd) catch {}; + // TODO: make a simple protocol between IPCd and d + // NEED inform about the connection (success or fail) + // FIRST DRAFT: + // - IPCd: send a message containing the destination + // - PROTOCOLd: send "ok" to inform the connection is established + // - PROTOCOLd: send "no" in case there was an error + + var message = try Message.init(servicefd, allocator, dest); + defer message.deinit(); + try ctx.write(message); + var response_from_service = try ctx.read_fd(servicefd); + if (response_from_service) |r| { + defer r.deinit(); + if (std.mem.eql(u8, r.payload, "ok")) { + // OK + // print("service has established the connection\n", .{}); + send_fd (some_event.origin, "ok", servicefd); + } + else if (std.mem.eql(u8, r.payload, "ne")) { + // PROBLEM + print("service cannot establish the connection\n", .{}); + // TODO + } + else { + print("service isn't working properly, its response is: {s}\n", .{r.payload}); + // TODO + } + } + else { + // No message = should be handled as a disconnection. + print("No response from service: let's drop everything\n", .{}); + } + } + } + } + else { + // There is a problem: ipcd was contacted without providing + // a message, meaning there is nothing to do. This should be + // explicitely warned about. + var response = try Message.init(some_event.origin + , allocator + , "lookup message without data"); + defer response.deinit(); + try ctx.write(response); + } + }, + + .MESSAGE_TX => { + print("Message sent.\n", .{}); + }, + + .ERROR => { + print("A problem occured, event: {}, let's suicide\n", .{some_event}); + break; + }, + } + } + + print("Goodbye\n", .{}); +} + +pub fn main() !u8 { + try create_service(); + return 0; +} diff --git a/zig/src/pong.zig b/zig/src/pong.zig new file mode 100644 index 0000000..9553b68 --- /dev/null +++ b/zig/src/pong.zig @@ -0,0 +1,71 @@ +const std = @import("std"); +const net = std.net; +const fmt = std.fmt; +const os = std.os; + +const print = std.debug.print; + +const ipc = @import("ipc"); +const hexdump = ipc.hexdump; +const Message = ipc.Message; + +pub fn main() !u8 { + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + var ctx = try ipc.Context.init(allocator); + defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful? + + // The service to contact, either provided with the SERVICE envvar + // or simply using "pong". + var should_free_service_to_contact: bool = true; + var service_to_contact = std.process.getEnvVarOwned(allocator, "SERVICE") catch blk: { + should_free_service_to_contact = false; + break :blk "pong"; + }; + defer { + if (should_free_service_to_contact) + allocator.free(service_to_contact); + } + + var pongfd = try ctx.connect_ipc(service_to_contact); + var message = try Message.init(pongfd, allocator, "bounce me"); + try ctx.schedule(message); + + var some_event: ipc.Event = undefined; + ctx.timer = 2000; // 2 seconds + while(true) { + some_event = try ctx.wait_event(); + switch (some_event.t) { + .TIMER => { + print("Timer!\n", .{}); + }, + + .MESSAGE_RX => { + if (some_event.m) |m| { + print("message has been bounced: {}\n", .{m}); + m.deinit(); + break; + } + else { + print("Received empty message, ERROR.\n", .{}); + break; + } + }, + + .MESSAGE_TX => { + print("Message sent.\n", .{}); + }, + + else => { + print("Unexpected event: {}, let's suicide\n", .{some_event}); + break; + }, + } + } + + print("Goodbye\n", .{}); + return 0; +} diff --git a/zig/src/pongd.zig b/zig/src/pongd.zig new file mode 100644 index 0000000..454c0b2 --- /dev/null +++ b/zig/src/pongd.zig @@ -0,0 +1,111 @@ +const std = @import("std"); +const os = std.os; + +const ipc = @import("ipc"); +const hexdump = ipc.hexdump; +const Message = ipc.Message; +const util = ipc.util; + +// Import send_fd this way in order to produce docs for exchange-fd functions. +const exchange_fd = ipc.exchangefd; +const send_fd = exchange_fd.send_fd; + +const builtin = @import("builtin"); +const native_os = builtin.target.os.tag; +const print = std.debug.print; +const testing = std.testing; + +fn create_service() !void { + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + var ctx = try ipc.Context.init(allocator); + defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful? + + // SERVER SIDE: creating a service. + _ = try ctx.server_init("pong"); + + // signal handler, to quit when asked + const S = struct { + var should_quit: bool = false; + + fn handler(sig: i32, info: *const os.siginfo_t, _: ?*const anyopaque) callconv(.C) void { + print ("A signal has been received: {}\n", .{sig}); + // Check that we received the correct signal. + switch (native_os) { + .netbsd => { + if (sig != os.SIG.HUP or sig != info.info.signo) + return; + }, + else => { + if (sig != os.SIG.HUP and sig != info.signo) + return; + }, + } + should_quit = true; + } + }; + + var sa = os.Sigaction{ + .handler = .{ .sigaction = &S.handler }, + .mask = os.empty_sigset, // Do not mask any signal. + .flags = os.SA.SIGINFO, + }; + + // Quit on SIGHUP (kill -1). + try os.sigaction(os.SIG.HUP, &sa, null); + + var some_event: ipc.Event = undefined; + ctx.timer = 20000; // 2 seconds + var count: u32 = 0; + while(! S.should_quit) { + some_event = try ctx.wait_event(); + switch (some_event.t) { + .TIMER => { + print("\rTimer! ({})", .{count}); + count += 1; + }, + + .CONNECTION => { + print("New connection: {} so far!\n", .{ctx.pollfd.items.len}); + }, + + .DISCONNECTION => { + print("User {} disconnected, {} remainaing.\n" + , .{some_event.origin, ctx.pollfd.items.len}); + }, + + .MESSAGE_RX => { + if (some_event.m) |m| { + print("New message ({} bytes)\n", .{m.payload.len}); + util.print_message ("RECEIVED MESSAGE", m); + print("Echoing it...\n", .{}); + try ctx.schedule(m); + } + else { + print("Error while receiving new message.\n", .{}); + print("Ignoring...\n", .{}); + } + }, + + .MESSAGE_TX => { + print("Message sent.\n", .{}); + }, + + else => { + print("Error: unexpected event: {}\n", .{some_event}); + print("Let's suicide.\n", .{}); + break; + }, + } + } + + print("Goodbye\n", .{}); +} + +pub fn main() !u8 { + try create_service(); + return 0; +} diff --git a/zig/src/tcpd.zig b/zig/src/tcpd.zig new file mode 100644 index 0000000..7505cf3 --- /dev/null +++ b/zig/src/tcpd.zig @@ -0,0 +1,260 @@ +const std = @import("std"); +const net = std.net; +const fmt = std.fmt; +const os = std.os; +const testing = std.testing; +const print = std.debug.print; + +const ipc = @import("ipc"); +const hexdump = ipc.hexdump; +const Message = ipc.Message; + +// Import send_fd this way in order to produce docs for exchange-fd functions. +const exchange_fd = ipc.exchangefd; +const send_fd = exchange_fd.send_fd; + +const builtin = @import("builtin"); +const native_os = builtin.target.os.tag; +const print_eq = ipc.util.print_eq; +const URI = ipc.util.URI; + +fn init_tcp_server(allocator: std.mem.Allocator, server: *net.StreamServer) !i32 { + var address = std.process.getEnvVarOwned(allocator, "ADDRESS") catch |err| switch(err) { + error.EnvironmentVariableNotFound => blk: { + print ("no ADDRESS envvar: TCPd will listen on 127.0.0.1:9000\n", .{}); + break :blk try allocator.dupe(u8, "127.0.0.1:9000"); + }, + else => { return err; }, + }; + defer allocator.free(address); + + var iterator = std.mem.split(u8, address, ":"); + var real_tcp_address = iterator.first(); + var real_tcp_port = try std.fmt.parseUnsigned(u16, iterator.rest(), 10); + + print ("TCP address [{s}] port [{}]\n", .{real_tcp_address, real_tcp_port}); + + server.* = net.StreamServer.init(.{.reuse_address = true}); + var socket_addr = try net.Address.parseIp(real_tcp_address, real_tcp_port); + try server.listen(socket_addr); + + const newfd = server.sockfd orelse return error.SocketLOL; // TODO + return newfd; +} + +fn create_service() !void { + const config = .{.safety = true}; + var gpa = std.heap.GeneralPurposeAllocator(config){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + var ctx = try ipc.Context.init(allocator); + defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful? + + // SERVER SIDE: creating a service. + var service_name = std.process.getEnvVarOwned(allocator, "IPC_SERVICE_NAME") catch |err| switch(err) { + error.EnvironmentVariableNotFound => blk: { + print ("no IPC_SERVICE_NAME envvar: TCPd will be named 'tcp'\n", .{}); + break :blk try allocator.dupe(u8, "tcp"); + }, + else => { return err; }, + }; + defer allocator.free(service_name); + + _ = try ctx.server_init(service_name); + + // signal handler, to quit when asked + const S = struct { + var should_quit: bool = false; + + fn handler(sig: i32, info: *const os.siginfo_t, _: ?*const anyopaque) callconv(.C) void { + print ("A signal has been received: {}\n", .{sig}); + // Check that we received the correct signal. + switch (native_os) { + .netbsd => { + if (sig != os.SIG.HUP or sig != info.info.signo) + return; + }, + else => { + if (sig != os.SIG.HUP and sig != info.signo) + return; + }, + } + should_quit = true; + } + }; + + var sa = os.Sigaction{ + .handler = .{ .sigaction = &S.handler }, + .mask = os.empty_sigset, // Do not mask any signal. + .flags = os.SA.SIGINFO, + }; + + // Quit on SIGHUP (kill -1). + try os.sigaction(os.SIG.HUP, &sa, null); + + var server: net.StreamServer = undefined; + var serverfd = try init_tcp_server(allocator, &server); + try ctx.add_external (serverfd); + + var some_event: ipc.Event = undefined; + var previous_event: ipc.Event.Type = ipc.Event.Type.ERROR; + ctx.timer = 1000; // 1 second + var count: u32 = 0; + while(! S.should_quit) { + some_event = try ctx.wait_event(); + + // For clarity in the output. + if (some_event.t != .TIMER and previous_event == .TIMER ) { print("\n", .{}); } + previous_event = some_event.t; + + switch (some_event.t) { + .TIMER => { + print ("\rTimer! ({})", .{count}); + count += 1; + }, + + .CONNECTION => { + print ("New connection: {} so far!\n", .{ctx.pollfd.items.len}); + }, + + .DISCONNECTION => { + print ("User {} disconnected, {} remaining.\n" + , .{some_event.origin, ctx.pollfd.items.len}); + }, + + .EXTERNAL => { + print ("Message received from a non IPC socket.\n", .{}); + var client = try server.accept(); // net.StreamServer.Connection + errdefer client.stream.close(); + // Receiving a new client from the EXTERNAL socket. + // New client = new switch from a distant TCP connection to a + // local libipc service. + + var buffer: [10000]u8 = undefined; + var size = try client.stream.read(&buffer); + var service_to_contact = buffer[0..size]; + + if (service_to_contact.len == 0) { + print("Error, no service provided, closing the connection.\n", .{}); + client.stream.close(); + continue; + } + + print ("Ask to connect to service [{s}].\n", .{service_to_contact}); + var servicefd = ctx.connect_service (service_to_contact) catch |err| { + print("Error while connecting to the service {s}: {}.\n" + , .{service_to_contact, err}); + print ("Closing the connection.\n", .{}); + client.stream.close(); + continue; + }; + errdefer ctx.close_fd (servicefd) catch {}; + + print ("Send a message to inform remote TCPd that the connection is established.\n", .{}); + _ = try client.stream.write("ok"); + + print ("Add current client as external connection (for now).\n", .{}); + try ctx.add_external (client.stream.handle); + + print ("Message sent, switching.\n", .{}); + try ctx.add_switch(client.stream.handle, servicefd); + + print ("DONE.\n", .{}); + + // Some protocols will require to change the default functions + // to read and to write on the client socket. + // Function to call: ctx.set_switch_callbacks(clientfd, infn, outfn); + }, + + .SWITCH_RX => { + print ("Message has been received (SWITCH fd {}).\n", .{some_event.origin}); + // if (some_event.m) |m| { + // var hexbuf: [4000]u8 = undefined; + // var hexfbs = std.io.fixedBufferStream(&hexbuf); + // var hexwriter = hexfbs.writer(); + // try hexdump.hexdump(hexwriter, "Received", m.payload); + // print("{s}\n", .{hexfbs.getWritten()}); + // } + // else { + // print ("Message received without actually a message?! {}", .{some_event}); + // } + }, + + .SWITCH_TX => { + print ("Message has been sent (SWITCH fd {}).\n", .{some_event.origin}); + }, + + .MESSAGE_RX => { + print ("Client asking for a service through TCPd.\n", .{}); + errdefer ctx.close (some_event.index) catch {}; + if (some_event.m) |m| { + defer m.deinit(); // Do not forget to free the message payload. + + print ("URI to contact {s}\n", .{m.payload}); + var uri = URI.read(m.payload); + print ("proto [{s}] address [{s}] path [{s}]\n" + , .{uri.protocol, uri.address, uri.path}); + + var iterator = std.mem.split(u8, uri.address, ":"); + var real_tcp_address = iterator.first(); + var real_tcp_port = try std.fmt.parseUnsigned(u16, iterator.rest(), 10); + + var socket_addr = try net.Address.parseIp(real_tcp_address, real_tcp_port); + var stream = try net.tcpConnectToAddress(socket_addr); + errdefer stream.close(); + + print ("Writing URI PATH: {s}\n", .{uri.path}); + _ = try stream.write(uri.path); + + print ("Writing URI PATH - written, waiting for the final 'ok'.\n", .{}); + var buffer: [10000]u8 = undefined; + var size = try stream.read(&buffer); + if (! std.mem.eql(u8, buffer[0..size], "ok")) { + print ("didn't receive 'ok', let's kill the connection\n", .{}); + stream.close(); + try ctx.close(some_event.index); + continue; + } + print ("Final 'ok' received, sending 'ok' to IPCd.\n", .{}); + + // Connection is established, inform IPCd. + var response = try Message.init(some_event.origin, allocator, "ok"); + defer response.deinit(); + try ctx.write(response); + + print ("Add current client as external connection (for now).\n", .{}); + try ctx.add_external (stream.handle); + + print ("Finally, add switching\n", .{}); + try ctx.add_switch(some_event.origin, stream.handle); + // Could invoke ctx.set_switch_callbacks but TCP sockets are + // working pretty well with default functions. + } + else { + // TCPd was contacted without providing a message, nothing to do. + var response = try Message.init(some_event.origin, allocator, "no"); + defer response.deinit(); + try ctx.write(response); + try ctx.close(some_event.index); + } + }, + + .MESSAGE_TX => { + print ("Message sent.\n", .{}); + }, + + .ERROR => { + print ("A problem occured, event: {}, let's suicide.\n", .{some_event}); + break; + }, + } + } + + print ("Goodbye\n", .{}); +} + +pub fn main() !u8 { + try create_service(); + return 0; +}