First LibIPC examples: zig applications (pong, pongd, tcpd, ipcd).

This commit is contained in:
Philippe Pittoli 2023-02-06 14:27:35 +01:00
commit dcc8e7bfd9
9 changed files with 744 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
zig-cache
zig-out

3
.gitmodules vendored Normal file
View File

@ -0,0 +1,3 @@
[submodule "zig/lib/ipc"]
path = zig/lib/ipc
url = https://git.baguette.netlib.re/Baguette/libipc.git

61
zig/build.zig Normal file
View File

@ -0,0 +1,61 @@
const std = @import("std");
fn link_and_install(exe : *std.Build.CompileStep) void {
const source_file = std.Build.FileSource.relative("./lib/ipc/src/main.zig");
// var empty : []const std.Build.ModuleDependency = &.{};
exe.addAnonymousModule("ipc", .{ .source_file = source_file, .dependencies = &.{} });
exe.linkLibC();
// This declares intent for the executable to be installed into the
// standard location when the user invokes the "install" step (the default
// step when running `zig build`).
exe.install();
}
// Although this function looks imperative, note that its job is to
// declaratively construct a build graph that will be executed by an external
// runner.
pub fn build(b: *std.Build) void {
// Standard target options allows the person running `zig build` to choose
// what target to build for. Here we do not override the defaults, which
// means any target is allowed, and the default is native. Other options
// for restricting supported target set are available.
const target = b.standardTargetOptions(.{});
// Standard optimization options allow the person running `zig build` to select
// between Debug, ReleaseSafe, ReleaseFast, and ReleaseSmall. Here we do not
// set a preferred release mode, allowing the user to decide how to optimize.
const optimize = b.standardOptimizeOption(.{});
const ipcd_exe = b.addExecutable(.{
.name = "ipcd",
.root_source_file = .{ .path = "src/ipcd.zig" },
.target = target,
.optimize = optimize,
});
link_and_install(ipcd_exe);
const tcpd_exe = b.addExecutable(.{
.name = "tcpd",
.root_source_file = .{ .path = "src/tcpd.zig" },
.target = target,
.optimize = optimize,
});
link_and_install(tcpd_exe);
const pong_exe = b.addExecutable(.{
.name = "pong",
.root_source_file = .{ .path = "src/pong.zig" },
.target = target,
.optimize = optimize,
});
link_and_install(pong_exe);
const pongd_exe = b.addExecutable(.{
.name = "pongd",
.root_source_file = .{ .path = "src/pongd.zig" },
.target = target,
.optimize = optimize,
});
link_and_install(pongd_exe);
}

1
zig/lib/ipc Submodule

@ -0,0 +1 @@
Subproject commit 64506bbee0d691b3a0c6d5d97894426c188bc3df

9
zig/makefile Normal file
View File

@ -0,0 +1,9 @@
all: build
update-libipc:
git submodule update
ZIG_OPTS ?=
ZIG_OPTIM ?= ReleaseSafe
build:
zig build -Doptimize=$(ZIG_OPTIM) $(ZIG_OPTS)

226
zig/src/ipcd.zig Normal file
View File

@ -0,0 +1,226 @@
const std = @import("std");
const net = std.net;
const fmt = std.fmt;
const os = std.os;
const ipc = @import("ipc");
const hexdump = ipc.hexdump;
const Message = ipc.Message;
// Import send_fd this way in order to produce docs for exchange-fd functions.
const exchange_fd = ipc.exchangefd;
const send_fd = exchange_fd.send_fd;
const builtin = @import("builtin");
const native_os = builtin.target.os.tag;
const print = std.debug.print;
const testing = std.testing;
const print_eq = ipc.util.print_eq;
const URI = ipc.util.URI;
// Standard library is unecessary complex regarding networking.
// libipc drops it and uses plain old file descriptors instead.
// API should completely obfuscate the inner structures.
// Only libipc structures should be necessary to write any networking code,
// users should only work with Context and Message, mostly.
// QUESTION: should libipc use std.fs.path and not simple [] const u8?
fn create_service() !void {
const config = .{.safety = true};
var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
var ctx = try ipc.Context.init(allocator);
defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?
// SERVER SIDE: creating a service.
_ = try ctx.server_init("ipc");
// signal handler, to quit when asked
const S = struct {
var should_quit: bool = false;
fn handler(sig: i32, info: *const os.siginfo_t, _: ?*const anyopaque) callconv(.C) void {
print ("A signal has been received: {}\n", .{sig});
// Check that we received the correct signal.
switch (native_os) {
.netbsd => {
if (sig != os.SIG.HUP or sig != info.info.signo)
return;
},
else => {
if (sig != os.SIG.HUP and sig != info.signo)
return;
},
}
should_quit = true;
}
};
var sa = os.Sigaction{
.handler = .{ .sigaction = &S.handler },
.mask = os.empty_sigset, // Do not mask any signal.
.flags = os.SA.SIGINFO,
};
// Quit on SIGHUP (kill -1).
try os.sigaction(os.SIG.HUP, &sa, null);
var some_event: ipc.Event = undefined;
ctx.timer = 1000; // 1 second
var count: u32 = 0;
while(! S.should_quit) {
some_event = try ctx.wait_event();
switch (some_event.t) {
.TIMER => {
print("\rTimer! ({})", .{count});
count += 1;
},
.CONNECTION => {
print("New connection: {} so far!\n", .{ctx.pollfd.items.len});
},
.DISCONNECTION => {
print("User {} disconnected, {} remainaing.\n"
, .{some_event.origin, ctx.pollfd.items.len});
},
.EXTERNAL => {
print("Message received from a non IPC socket.\n", .{});
print("NOT IMPLEMENTED, YET. It's a suicide, then.\n", .{});
break;
},
.SWITCH_RX => {
print("Message has been received (SWITCH).\n", .{});
print("NOT IMPLEMENTED, YET. It's a suicide, then.\n", .{});
break;
},
.SWITCH_TX => {
print("Message has been sent (SWITCH).\n", .{});
print("NOT IMPLEMENTED, YET. It's a suicide, then.\n", .{});
break;
},
.MESSAGE_RX => {
print("Client asking for a service through ipcd.\n", .{});
defer ctx.close_fd (some_event.origin) catch {};
if (some_event.m) |m| {
print("{}\n", .{m});
defer m.deinit(); // Do not forget to free the message payload.
// 1. split message
var iterator = std.mem.split(u8, m.payload, ";");
var service_to_contact = iterator.first();
// print("service to contact: {s}\n", .{service_to_contact});
var final_destination: ?[]const u8 = null;
// 2. find relevant part of the message
while (iterator.next()) |next| {
// print("next part: {s}\n", .{next});
var iterator2 = std.mem.split(u8, next, " ");
var sname = iterator2.first();
var target = iterator2.next();
if (target) |t| {
// print ("sname: {s} - target: {s}\n", .{sname, t});
if (std.mem.eql(u8, service_to_contact, sname)) {
final_destination = t;
}
}
else {
print("ERROR: no target in: {s}\n", .{next});
}
}
// 3. connect whether asked to and send a message
if (final_destination) |dest| {
print("Connecting to {s} (service requested: {s})\n"
, .{dest, service_to_contact});
var uri = URI.read(dest);
// 1. in case there is no URI
if (std.mem.eql(u8, uri.protocol, dest)) {
var newfd = try ctx.connect_service (dest);
send_fd (some_event.origin, "ok", newfd);
try ctx.close_fd (newfd);
}
else if (std.mem.eql(u8, uri.protocol, "unix")) {
var newfd = try ctx.connect_service (uri.address);
send_fd (some_event.origin, "ok", newfd);
try ctx.close_fd (newfd);
}
// 2. else, contact <protocol>d or directly the dest in case there is none.
else {
var servicefd = try ctx.connect_service (uri.protocol);
defer ctx.close_fd (servicefd) catch {};
// TODO: make a simple protocol between IPCd and <protocol>d
// NEED inform about the connection (success or fail)
// FIRST DRAFT:
// - IPCd: send a message containing the destination
// - PROTOCOLd: send "ok" to inform the connection is established
// - PROTOCOLd: send "no" in case there was an error
var message = try Message.init(servicefd, allocator, dest);
defer message.deinit();
try ctx.write(message);
var response_from_service = try ctx.read_fd(servicefd);
if (response_from_service) |r| {
defer r.deinit();
if (std.mem.eql(u8, r.payload, "ok")) {
// OK
// print("service has established the connection\n", .{});
send_fd (some_event.origin, "ok", servicefd);
}
else if (std.mem.eql(u8, r.payload, "ne")) {
// PROBLEM
print("service cannot establish the connection\n", .{});
// TODO
}
else {
print("service isn't working properly, its response is: {s}\n", .{r.payload});
// TODO
}
}
else {
// No message = should be handled as a disconnection.
print("No response from service: let's drop everything\n", .{});
}
}
}
}
else {
// There is a problem: ipcd was contacted without providing
// a message, meaning there is nothing to do. This should be
// explicitely warned about.
var response = try Message.init(some_event.origin
, allocator
, "lookup message without data");
defer response.deinit();
try ctx.write(response);
}
},
.MESSAGE_TX => {
print("Message sent.\n", .{});
},
.ERROR => {
print("A problem occured, event: {}, let's suicide\n", .{some_event});
break;
},
}
}
print("Goodbye\n", .{});
}
pub fn main() !u8 {
try create_service();
return 0;
}

71
zig/src/pong.zig Normal file
View File

@ -0,0 +1,71 @@
const std = @import("std");
const net = std.net;
const fmt = std.fmt;
const os = std.os;
const print = std.debug.print;
const ipc = @import("ipc");
const hexdump = ipc.hexdump;
const Message = ipc.Message;
pub fn main() !u8 {
const config = .{.safety = true};
var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
var ctx = try ipc.Context.init(allocator);
defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?
// The service to contact, either provided with the SERVICE envvar
// or simply using "pong".
var should_free_service_to_contact: bool = true;
var service_to_contact = std.process.getEnvVarOwned(allocator, "SERVICE") catch blk: {
should_free_service_to_contact = false;
break :blk "pong";
};
defer {
if (should_free_service_to_contact)
allocator.free(service_to_contact);
}
var pongfd = try ctx.connect_ipc(service_to_contact);
var message = try Message.init(pongfd, allocator, "bounce me");
try ctx.schedule(message);
var some_event: ipc.Event = undefined;
ctx.timer = 2000; // 2 seconds
while(true) {
some_event = try ctx.wait_event();
switch (some_event.t) {
.TIMER => {
print("Timer!\n", .{});
},
.MESSAGE_RX => {
if (some_event.m) |m| {
print("message has been bounced: {}\n", .{m});
m.deinit();
break;
}
else {
print("Received empty message, ERROR.\n", .{});
break;
}
},
.MESSAGE_TX => {
print("Message sent.\n", .{});
},
else => {
print("Unexpected event: {}, let's suicide\n", .{some_event});
break;
},
}
}
print("Goodbye\n", .{});
return 0;
}

111
zig/src/pongd.zig Normal file
View File

@ -0,0 +1,111 @@
const std = @import("std");
const os = std.os;
const ipc = @import("ipc");
const hexdump = ipc.hexdump;
const Message = ipc.Message;
const util = ipc.util;
// Import send_fd this way in order to produce docs for exchange-fd functions.
const exchange_fd = ipc.exchangefd;
const send_fd = exchange_fd.send_fd;
const builtin = @import("builtin");
const native_os = builtin.target.os.tag;
const print = std.debug.print;
const testing = std.testing;
fn create_service() !void {
const config = .{.safety = true};
var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
var ctx = try ipc.Context.init(allocator);
defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?
// SERVER SIDE: creating a service.
_ = try ctx.server_init("pong");
// signal handler, to quit when asked
const S = struct {
var should_quit: bool = false;
fn handler(sig: i32, info: *const os.siginfo_t, _: ?*const anyopaque) callconv(.C) void {
print ("A signal has been received: {}\n", .{sig});
// Check that we received the correct signal.
switch (native_os) {
.netbsd => {
if (sig != os.SIG.HUP or sig != info.info.signo)
return;
},
else => {
if (sig != os.SIG.HUP and sig != info.signo)
return;
},
}
should_quit = true;
}
};
var sa = os.Sigaction{
.handler = .{ .sigaction = &S.handler },
.mask = os.empty_sigset, // Do not mask any signal.
.flags = os.SA.SIGINFO,
};
// Quit on SIGHUP (kill -1).
try os.sigaction(os.SIG.HUP, &sa, null);
var some_event: ipc.Event = undefined;
ctx.timer = 20000; // 2 seconds
var count: u32 = 0;
while(! S.should_quit) {
some_event = try ctx.wait_event();
switch (some_event.t) {
.TIMER => {
print("\rTimer! ({})", .{count});
count += 1;
},
.CONNECTION => {
print("New connection: {} so far!\n", .{ctx.pollfd.items.len});
},
.DISCONNECTION => {
print("User {} disconnected, {} remainaing.\n"
, .{some_event.origin, ctx.pollfd.items.len});
},
.MESSAGE_RX => {
if (some_event.m) |m| {
print("New message ({} bytes)\n", .{m.payload.len});
util.print_message ("RECEIVED MESSAGE", m);
print("Echoing it...\n", .{});
try ctx.schedule(m);
}
else {
print("Error while receiving new message.\n", .{});
print("Ignoring...\n", .{});
}
},
.MESSAGE_TX => {
print("Message sent.\n", .{});
},
else => {
print("Error: unexpected event: {}\n", .{some_event});
print("Let's suicide.\n", .{});
break;
},
}
}
print("Goodbye\n", .{});
}
pub fn main() !u8 {
try create_service();
return 0;
}

260
zig/src/tcpd.zig Normal file
View File

@ -0,0 +1,260 @@
const std = @import("std");
const net = std.net;
const fmt = std.fmt;
const os = std.os;
const testing = std.testing;
const print = std.debug.print;
const ipc = @import("ipc");
const hexdump = ipc.hexdump;
const Message = ipc.Message;
// Import send_fd this way in order to produce docs for exchange-fd functions.
const exchange_fd = ipc.exchangefd;
const send_fd = exchange_fd.send_fd;
const builtin = @import("builtin");
const native_os = builtin.target.os.tag;
const print_eq = ipc.util.print_eq;
const URI = ipc.util.URI;
fn init_tcp_server(allocator: std.mem.Allocator, server: *net.StreamServer) !i32 {
var address = std.process.getEnvVarOwned(allocator, "ADDRESS") catch |err| switch(err) {
error.EnvironmentVariableNotFound => blk: {
print ("no ADDRESS envvar: TCPd will listen on 127.0.0.1:9000\n", .{});
break :blk try allocator.dupe(u8, "127.0.0.1:9000");
},
else => { return err; },
};
defer allocator.free(address);
var iterator = std.mem.split(u8, address, ":");
var real_tcp_address = iterator.first();
var real_tcp_port = try std.fmt.parseUnsigned(u16, iterator.rest(), 10);
print ("TCP address [{s}] port [{}]\n", .{real_tcp_address, real_tcp_port});
server.* = net.StreamServer.init(.{.reuse_address = true});
var socket_addr = try net.Address.parseIp(real_tcp_address, real_tcp_port);
try server.listen(socket_addr);
const newfd = server.sockfd orelse return error.SocketLOL; // TODO
return newfd;
}
fn create_service() !void {
const config = .{.safety = true};
var gpa = std.heap.GeneralPurposeAllocator(config){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
var ctx = try ipc.Context.init(allocator);
defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?
// SERVER SIDE: creating a service.
var service_name = std.process.getEnvVarOwned(allocator, "IPC_SERVICE_NAME") catch |err| switch(err) {
error.EnvironmentVariableNotFound => blk: {
print ("no IPC_SERVICE_NAME envvar: TCPd will be named 'tcp'\n", .{});
break :blk try allocator.dupe(u8, "tcp");
},
else => { return err; },
};
defer allocator.free(service_name);
_ = try ctx.server_init(service_name);
// signal handler, to quit when asked
const S = struct {
var should_quit: bool = false;
fn handler(sig: i32, info: *const os.siginfo_t, _: ?*const anyopaque) callconv(.C) void {
print ("A signal has been received: {}\n", .{sig});
// Check that we received the correct signal.
switch (native_os) {
.netbsd => {
if (sig != os.SIG.HUP or sig != info.info.signo)
return;
},
else => {
if (sig != os.SIG.HUP and sig != info.signo)
return;
},
}
should_quit = true;
}
};
var sa = os.Sigaction{
.handler = .{ .sigaction = &S.handler },
.mask = os.empty_sigset, // Do not mask any signal.
.flags = os.SA.SIGINFO,
};
// Quit on SIGHUP (kill -1).
try os.sigaction(os.SIG.HUP, &sa, null);
var server: net.StreamServer = undefined;
var serverfd = try init_tcp_server(allocator, &server);
try ctx.add_external (serverfd);
var some_event: ipc.Event = undefined;
var previous_event: ipc.Event.Type = ipc.Event.Type.ERROR;
ctx.timer = 1000; // 1 second
var count: u32 = 0;
while(! S.should_quit) {
some_event = try ctx.wait_event();
// For clarity in the output.
if (some_event.t != .TIMER and previous_event == .TIMER ) { print("\n", .{}); }
previous_event = some_event.t;
switch (some_event.t) {
.TIMER => {
print ("\rTimer! ({})", .{count});
count += 1;
},
.CONNECTION => {
print ("New connection: {} so far!\n", .{ctx.pollfd.items.len});
},
.DISCONNECTION => {
print ("User {} disconnected, {} remaining.\n"
, .{some_event.origin, ctx.pollfd.items.len});
},
.EXTERNAL => {
print ("Message received from a non IPC socket.\n", .{});
var client = try server.accept(); // net.StreamServer.Connection
errdefer client.stream.close();
// Receiving a new client from the EXTERNAL socket.
// New client = new switch from a distant TCP connection to a
// local libipc service.
var buffer: [10000]u8 = undefined;
var size = try client.stream.read(&buffer);
var service_to_contact = buffer[0..size];
if (service_to_contact.len == 0) {
print("Error, no service provided, closing the connection.\n", .{});
client.stream.close();
continue;
}
print ("Ask to connect to service [{s}].\n", .{service_to_contact});
var servicefd = ctx.connect_service (service_to_contact) catch |err| {
print("Error while connecting to the service {s}: {}.\n"
, .{service_to_contact, err});
print ("Closing the connection.\n", .{});
client.stream.close();
continue;
};
errdefer ctx.close_fd (servicefd) catch {};
print ("Send a message to inform remote TCPd that the connection is established.\n", .{});
_ = try client.stream.write("ok");
print ("Add current client as external connection (for now).\n", .{});
try ctx.add_external (client.stream.handle);
print ("Message sent, switching.\n", .{});
try ctx.add_switch(client.stream.handle, servicefd);
print ("DONE.\n", .{});
// Some protocols will require to change the default functions
// to read and to write on the client socket.
// Function to call: ctx.set_switch_callbacks(clientfd, infn, outfn);
},
.SWITCH_RX => {
print ("Message has been received (SWITCH fd {}).\n", .{some_event.origin});
// if (some_event.m) |m| {
// var hexbuf: [4000]u8 = undefined;
// var hexfbs = std.io.fixedBufferStream(&hexbuf);
// var hexwriter = hexfbs.writer();
// try hexdump.hexdump(hexwriter, "Received", m.payload);
// print("{s}\n", .{hexfbs.getWritten()});
// }
// else {
// print ("Message received without actually a message?! {}", .{some_event});
// }
},
.SWITCH_TX => {
print ("Message has been sent (SWITCH fd {}).\n", .{some_event.origin});
},
.MESSAGE_RX => {
print ("Client asking for a service through TCPd.\n", .{});
errdefer ctx.close (some_event.index) catch {};
if (some_event.m) |m| {
defer m.deinit(); // Do not forget to free the message payload.
print ("URI to contact {s}\n", .{m.payload});
var uri = URI.read(m.payload);
print ("proto [{s}] address [{s}] path [{s}]\n"
, .{uri.protocol, uri.address, uri.path});
var iterator = std.mem.split(u8, uri.address, ":");
var real_tcp_address = iterator.first();
var real_tcp_port = try std.fmt.parseUnsigned(u16, iterator.rest(), 10);
var socket_addr = try net.Address.parseIp(real_tcp_address, real_tcp_port);
var stream = try net.tcpConnectToAddress(socket_addr);
errdefer stream.close();
print ("Writing URI PATH: {s}\n", .{uri.path});
_ = try stream.write(uri.path);
print ("Writing URI PATH - written, waiting for the final 'ok'.\n", .{});
var buffer: [10000]u8 = undefined;
var size = try stream.read(&buffer);
if (! std.mem.eql(u8, buffer[0..size], "ok")) {
print ("didn't receive 'ok', let's kill the connection\n", .{});
stream.close();
try ctx.close(some_event.index);
continue;
}
print ("Final 'ok' received, sending 'ok' to IPCd.\n", .{});
// Connection is established, inform IPCd.
var response = try Message.init(some_event.origin, allocator, "ok");
defer response.deinit();
try ctx.write(response);
print ("Add current client as external connection (for now).\n", .{});
try ctx.add_external (stream.handle);
print ("Finally, add switching\n", .{});
try ctx.add_switch(some_event.origin, stream.handle);
// Could invoke ctx.set_switch_callbacks but TCP sockets are
// working pretty well with default functions.
}
else {
// TCPd was contacted without providing a message, nothing to do.
var response = try Message.init(some_event.origin, allocator, "no");
defer response.deinit();
try ctx.write(response);
try ctx.close(some_event.index);
}
},
.MESSAGE_TX => {
print ("Message sent.\n", .{});
},
.ERROR => {
print ("A problem occured, event: {}, let's suicide.\n", .{some_event});
break;
},
}
}
print ("Goodbye\n", .{});
}
pub fn main() !u8 {
try create_service();
return 0;
}