261 lines
10 KiB
Zig
261 lines
10 KiB
Zig
const std = @import("std");
|
|
const net = std.net;
|
|
const fmt = std.fmt;
|
|
const os = std.os;
|
|
const testing = std.testing;
|
|
const print = std.debug.print;
|
|
|
|
const ipc = @import("ipc");
|
|
const hexdump = ipc.hexdump;
|
|
const Message = ipc.Message;
|
|
|
|
// Import send_fd this way in order to produce docs for exchange-fd functions.
|
|
const exchange_fd = ipc.exchangefd;
|
|
const send_fd = exchange_fd.send_fd;
|
|
|
|
const builtin = @import("builtin");
|
|
const native_os = builtin.target.os.tag;
|
|
const print_eq = ipc.util.print_eq;
|
|
const URI = ipc.util.URI;
|
|
|
|
/// Opens the TCP listening socket on which TCPd accepts remote connections.
///
/// The endpoint is read from the `ADDRESS` environment variable, formatted as
/// `<ip>:<port>`. When the variable is not set, `127.0.0.1:9000` is used.
/// The port is whatever follows the LAST colon, so unbracketed IPv6 endpoints
/// such as `::1:9000` are accepted in addition to IPv4 ones.
///
/// `server` is overwritten with a new `net.StreamServer` (address reuse
/// enabled) and put in listening state. `allocator` is only used for the
/// temporary copy of the endpoint string, which is freed before returning.
///
/// Returns the socket descriptor of the listening server.
///
/// Errors:
/// - any environment lookup error other than "variable not found",
/// - allocation failure while duplicating the default endpoint,
/// - `error.InvalidCharacter` when the endpoint has no `:<port>` part,
/// - errors from parsing the port or the IP address,
/// - errors from `server.listen`,
/// - `error.SocketLOL` when the server reports no descriptor after listening.
fn init_tcp_server(allocator: std.mem.Allocator, server: *net.StreamServer) !i32 {
    const address = std.process.getEnvVarOwned(allocator, "ADDRESS") catch |err| switch (err) {
        error.EnvironmentVariableNotFound => blk: {
            print("no ADDRESS envvar: TCPd will listen on 127.0.0.1:9000\n", .{});
            // Duplicate the literal so the `defer` below can free both cases uniformly.
            break :blk try allocator.dupe(u8, "127.0.0.1:9000");
        },
        else => return err,
    };
    defer allocator.free(address);

    // Split on the last colon: an IPv6 address contains colons itself, the
    // port separator is always the final one.
    const separator = std.mem.lastIndexOfScalar(u8, address, ':') orelse {
        print("invalid ADDRESS [{s}]: expected <ip>:<port>\n", .{address});
        // Same error an empty port string produces when parsed.
        return error.InvalidCharacter;
    };
    const real_tcp_address = address[0..separator];
    const real_tcp_port = try std.fmt.parseUnsigned(u16, address[separator + 1 ..], 10);

    print("TCP address [{s}] port [{}]\n", .{ real_tcp_address, real_tcp_port });

    server.* = net.StreamServer.init(.{ .reuse_address = true });
    const socket_addr = try net.Address.parseIp(real_tcp_address, real_tcp_port);
    try server.listen(socket_addr);

    // `sockfd` is optional; it is expected to be set once `listen` succeeded.
    return server.sockfd orelse error.SocketLOL; // TODO: use a meaningful error name.
}
|
|
|
|
/// Runs the TCPd daemon: a bridge between libipc services and TCP sockets.
///
/// Steps performed:
/// 1. Create the allocator and the libipc context.
/// 2. Register the IPC service, named after the `IPC_SERVICE_NAME`
///    environment variable (`tcp` when the variable is not set).
/// 3. Install a SIGHUP handler that asks the main loop to stop.
/// 4. Open the TCP listening socket (see `init_tcp_server`) and hand its
///    descriptor to the context as an external descriptor.
/// 5. Loop on `ctx.wait_event()` until SIGHUP is received or an `ERROR`
///    event occurs:
///    - `EXTERNAL`: a remote TCP peer connected; its first read is the name
///      of the local service to contact. On success "ok" is written back and
///      the TCP socket is switched with the service descriptor.
///    - `MESSAGE`: a local IPC client sent a URI; TCPd connects to the
///      `<ip>:<port>` of the URI, writes the URI path, waits for "ok",
///      answers "ok" to the client, and switches both descriptors.
///    - other events are only logged.
///
/// Any error raised through `try` ends the service and is returned to the
/// caller.
fn create_service() !void {
    const config = .{ .safety = true };
    var gpa = std.heap.GeneralPurposeAllocator(config){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    var ctx = try ipc.Context.init(allocator);
    defer ctx.deinit(); // Released on every exit path.

    // SERVER SIDE: creating a service.
    const service_name = std.process.getEnvVarOwned(allocator, "IPC_SERVICE_NAME") catch |err| switch (err) {
        error.EnvironmentVariableNotFound => blk: {
            print("no IPC_SERVICE_NAME envvar: TCPd will be named 'tcp'\n", .{});
            // Duplicate the literal so the `defer` below can free both cases uniformly.
            break :blk try allocator.dupe(u8, "tcp");
        },
        else => return err,
    };
    defer allocator.free(service_name);

    _ = try ctx.server_init(service_name);

    // Signal handler, to quit when asked.
    const S = struct {
        /// Set by `handler`, polled by the main loop.
        var should_quit: bool = false;

        fn handler(sig: i32, info: *const os.siginfo_t, _: ?*const anyopaque) callconv(.C) void {
            // NOTE(review): printing from a signal handler is presumably not
            // async-signal-safe; kept because the original did so.
            print("A signal has been received: {}\n", .{sig});

            // The signal number is stored at a different place in the
            // siginfo structure on NetBSD.
            const reported_signo = switch (native_os) {
                .netbsd => info.info.signo,
                else => info.signo,
            };

            // Check that we received the correct signal: both the handler
            // argument and the siginfo structure must designate SIGHUP.
            if (sig != os.SIG.HUP or sig != reported_signo) return;

            should_quit = true;
        }
    };

    const sa = os.Sigaction{
        .handler = .{ .sigaction = &S.handler },
        .mask = os.empty_sigset, // Do not mask any signal.
        .flags = os.SA.SIGINFO,
    };

    // Quit on SIGHUP (kill -1).
    try os.sigaction(os.SIG.HUP, &sa, null);

    var server: net.StreamServer = undefined; // Initialised by init_tcp_server.
    const serverfd = try init_tcp_server(allocator, &server);
    try ctx.add_external(serverfd);

    var previous_event: ipc.Event.Type = .ERROR;
    ctx.timer = 1000; // 1 second
    var count: u32 = 0;

    while (!S.should_quit) {
        const some_event: ipc.Event = try ctx.wait_event();

        // For clarity in the output: timer messages are written on a single
        // line (leading "\r"), so end that line before printing anything else.
        if (some_event.t != .TIMER and previous_event == .TIMER) {
            print("\n", .{});
        }
        previous_event = some_event.t;

        switch (some_event.t) {
            .TIMER => {
                print("\rTimer! ({})", .{count});
                count += 1;
            },

            .CONNECTION => {
                print("New connection: {} so far!\n", .{ctx.pollfd.items.len});
            },

            .DISCONNECTION => {
                print("User {} disconnected, {} remaining.\n", .{ some_event.origin, ctx.pollfd.items.len });
            },

            .EXTERNAL => {
                // Receiving a new client from the EXTERNAL socket.
                // New client = new switch from a distant TCP connection to a
                // local libipc service.
                print("Message received from a non IPC socket.\n", .{});
                const client = try server.accept(); // net.StreamServer.Connection
                errdefer client.stream.close();

                // The first bytes sent by the remote peer are the name of
                // the local service it wants to reach.
                var buffer: [10000]u8 = undefined;
                const size = try client.stream.read(&buffer);
                const service_to_contact = buffer[0..size];

                if (service_to_contact.len == 0) {
                    print("Error, no service provided, closing the connection.\n", .{});
                    client.stream.close();
                    continue;
                }

                print("Ask to connect to service [{s}].\n", .{service_to_contact});
                const servicefd = ctx.connect_service(service_to_contact) catch |err| {
                    print("Error while connecting to the service {s}: {}.\n", .{ service_to_contact, err });
                    print("Closing the connection.\n", .{});
                    client.stream.close();
                    continue;
                };
                errdefer ctx.close_fd(servicefd) catch {};

                print("Send a message to inform remote TCPd that the connection is established.\n", .{});
                _ = try client.stream.write("ok");

                print("Add current client as external connection (for now).\n", .{});
                try ctx.add_external(client.stream.handle);

                print("Message sent, switching.\n", .{});
                try ctx.add_switch(client.stream.handle, servicefd);

                print("DONE.\n", .{});

                // Some protocols will require to change the default functions
                // to read and to write on the client socket.
                // Function to call: ctx.set_switch_callbacks(clientfd, infn, outfn);
            },

            .SWITCH_RX => {
                print("Message has been received (SWITCH fd {}).\n", .{some_event.origin});
                // Debugging aid, disabled: dump the received payload.
                // if (some_event.m) |m| {
                //     var hexbuf: [4000]u8 = undefined;
                //     var hexfbs = std.io.fixedBufferStream(&hexbuf);
                //     var hexwriter = hexfbs.writer();
                //     try hexdump.hexdump(hexwriter, "Received", m.payload);
                //     print("{s}\n", .{hexfbs.getWritten()});
                // }
                // else {
                //     print ("Message received without actually a message?! {}", .{some_event});
                // }
            },

            .SWITCH_TX => {
                print("Message has been sent (SWITCH fd {}).\n", .{some_event.origin});
            },

            .MESSAGE => {
                print("Client asking for a service through TCPd.\n", .{});
                errdefer ctx.close(some_event.index) catch {};
                if (some_event.m) |m| {
                    defer m.deinit(); // Do not forget to free the message payload.

                    print("URI to contact {s}\n", .{m.payload});
                    const uri = URI.read(m.payload);
                    print("proto [{s}] address [{s}] path [{s}]\n", .{ uri.protocol, uri.address, uri.path });

                    // The URI address is expected to be "<ip>:<port>".
                    var iterator = std.mem.split(u8, uri.address, ":");
                    const real_tcp_address = iterator.first();
                    const real_tcp_port = try std.fmt.parseUnsigned(u16, iterator.rest(), 10);

                    const socket_addr = try net.Address.parseIp(real_tcp_address, real_tcp_port);
                    const stream = try net.tcpConnectToAddress(socket_addr);
                    errdefer stream.close();

                    print("Writing URI PATH: {s}\n", .{uri.path});
                    _ = try stream.write(uri.path);

                    print("Writing URI PATH - written, waiting for the final 'ok'.\n", .{});
                    var buffer: [10000]u8 = undefined;
                    const size = try stream.read(&buffer);
                    if (!std.mem.eql(u8, buffer[0..size], "ok")) {
                        print("didn't receive 'ok', let's kill the connection\n", .{});
                        stream.close();
                        try ctx.close(some_event.index);
                        continue;
                    }
                    print("Final 'ok' received, sending 'ok' to IPCd.\n", .{});

                    // Connection is established, inform IPCd.
                    const response = try Message.init(some_event.origin, allocator, "ok");
                    defer response.deinit();
                    try ctx.write(response);

                    print("Add current client as external connection (for now).\n", .{});
                    try ctx.add_external(stream.handle);

                    print("Finally, add switching\n", .{});
                    try ctx.add_switch(some_event.origin, stream.handle);
                    // Could invoke ctx.set_switch_callbacks but TCP sockets are
                    // working pretty well with default functions.
                } else {
                    // TCPd was contacted without providing a message, nothing to do.
                    const response = try Message.init(some_event.origin, allocator, "no");
                    defer response.deinit();
                    try ctx.write(response);
                    try ctx.close(some_event.index);
                }
            },

            .TX => {
                print("Message sent.\n", .{});
            },

            .ERROR => {
                print("A problem occured, event: {}, let's suicide.\n", .{some_event});
                break; // Leaves the main loop.
            },
        }
    }

    print("Goodbye\n", .{});
}
|
|
|
|
/// Program entry point: runs the TCPd service until it stops.
///
/// Returns 0 as exit code when the service ends without error; any error
/// raised by `create_service` is propagated.
pub fn main() !u8 {
    const exit_success: u8 = 0;

    try create_service();

    return exit_success;
}
|