diff --git a/zig-impl/src/context.zig b/zig-impl/src/context.zig index 950a7cd..bec0996 100644 --- a/zig-impl/src/context.zig +++ b/zig-impl/src/context.zig @@ -209,6 +209,14 @@ pub const Context = struct { return try self.connect_service (service_name); } + /// Add a new file descriptor to follow, labeled as EXTERNAL. + /// Useful for protocol daemons (ex: TCPd) listening to a socket for external connections, + /// clients trying to reach a libipc service. + pub fn add_external (self: *Self, newfd: i32) !void { + var newcon = Connection.init(Connection.Type.EXTERNAL, null); + try self.add_ (newcon, newfd); + } + pub fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void { // net.StreamServer var serverfd = self.pollfd.items[server_index].fd; diff --git a/zig-impl/src/tcpd.zig b/zig-impl/src/tcpd.zig index 791ad7d..0f2a052 100644 --- a/zig-impl/src/tcpd.zig +++ b/zig-impl/src/tcpd.zig @@ -18,6 +18,37 @@ const testing = std.testing; const print_eq = @import("./util.zig").print_eq; const URI = @import("./util.zig").URI; +fn init_tcp_server(allocator: std.mem.Allocator, server: *net.StreamServer) !i32 { + var buffer: [1000]u8 = undefined; + var fbs = std.io.fixedBufferStream(&buffer); + var writer = fbs.writer(); + + var address = std.process.getEnvVarOwned(allocator, "ADDRESS") catch |err| switch(err) { + error.EnvironmentVariableNotFound => blk: { + print("no ADDRESS envvar: TCPd will listen on 127.0.0.1:9000\n", .{}); + break :blk try allocator.dupe(u8, "127.0.0.1:9000"); + }, + else => { return err; }, + }; + defer allocator.free(address); + + try writer.print("{s}", .{address}); + var tcp_address = fbs.getWritten(); + + var iterator = std.mem.split(u8, tcp_address, ":"); + var real_tcp_address = iterator.first(); + var real_tcp_port = try std.fmt.parseUnsigned(u16, iterator.rest(), 10); + + print ("TCP address [{s}] port [{}]\n", .{real_tcp_address, real_tcp_port}); + + server.* = net.StreamServer.init(.{.reuse_address = true}); + var socket_addr = try net.Address.parseIp(real_tcp_address, real_tcp_port); + try server.listen(socket_addr); + + const newfd = server.sockfd orelse return error.SocketLOL; // TODO + return newfd; +} + fn create_service() !void { const config = .{.safety = true}; var gpa = std.heap.GeneralPurposeAllocator(config){}; @@ -28,7 +59,16 @@ fn create_service() !void { defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful? // SERVER SIDE: creating a service. - _ = try ctx.server_init("tcp"); + var service_name = std.process.getEnvVarOwned(allocator, "IPC_SERVICE_NAME") catch |err| switch(err) { + error.EnvironmentVariableNotFound => blk: { + print("no IPC_SERVICE_NAME envvar: TCPd will be named 'tcp'\n", .{}); + break :blk try allocator.dupe(u8, "tcp"); + }, + else => { return err; }, + }; + defer allocator.free(service_name); + + _ = try ctx.server_init(service_name); // signal handler, to quit when asked const S = struct { @@ -60,6 +100,12 @@ fn create_service() !void { // Quit on SIGHUP (kill -1). try os.sigaction(os.SIG.HUP, &sa, null); + // TODO: create a TCP socket. + var server: net.StreamServer = undefined; + var serverfd = try init_tcp_server(allocator, &server); + // TODO: add the socket to the list of ctx FDs. + try ctx.add_external (serverfd); + var some_event: ipc.Event = undefined; ctx.timer = 1000; // 1 second var count: u32 = 0; @@ -82,16 +128,33 @@ fn create_service() !void { .EXTERNAL => { print("Message received from a non IPC socket.\n", .{}); - print("NOT IMPLEMENTED, YET. It's a suicide, then.\n", .{}); - break; + var client = try server.accept(); // net.StreamServer.Connection + // Receiving a new client from the EXTERNAL socket. + // New client = new switch from a distant TCP connection to a + // local libipc service. + + var buffer: [1000]u8 = undefined; + var size = try client.stream.read(&buffer); + print ("Asking to connect to service [{s}]\n", .{buffer[0..size]}); + var servicefd = try ctx.connect_service (buffer[0..size]); + // Send a message to inform remote TCPd that the connection is established. + print ("Send a message to inform remote TCPd that the connection is established.\n", .{}); + _ = try client.stream.write("ok"); + + print("add current client as external connection (for now)\n", .{}); + try ctx.add_external (client.stream.handle); + + print ("Message sent, switching.\n", .{}); + try ctx.add_switch(client.stream.handle, servicefd); + print ("DONE.\n", .{}); }, .SWITCH_RX => { - print("Message has been received (SWITCH).\n", .{}); + print("Message has been received (SWITCH fd {}).\n", .{some_event.origin}); }, .SWITCH_TX => { - print("Message has been sent (SWITCH).\n", .{}); + print("Message has been sent (SWITCH fd {}).\n", .{some_event.origin}); }, .MESSAGE => { @@ -105,18 +168,38 @@ fn create_service() !void { print ("proto [{s}] address [{s}] path [{s}]\n" , .{uri.protocol, uri.address, uri.path}); - // TODO FIXME: CURRENTLY ONLY CONNECT TO LOCAL UNIX SERVICE - // TODO: TCP!! - var servicefd = try ctx.connect_service (uri.path); + var iterator = std.mem.split(u8, uri.address, ":"); + var real_tcp_address = iterator.first(); + var real_tcp_port = try std.fmt.parseUnsigned(u16, iterator.rest(), 10); + var socket_addr = try net.Address.parseIp(real_tcp_address, real_tcp_port); + var stream = try net.tcpConnectToAddress(socket_addr); + + print("Writing URI PATH: {s}\n", .{uri.path}); + _ = try stream.write(uri.path); + + print("Writing URI PATH - written, waiting for the final 'ok'\n", .{}); + var buffer: [1000]u8 = undefined; + var size = try stream.read(&buffer); + if (! std.mem.eql(u8, buffer[0..size], "ok")) { + print("didn't receive 'ok', let's kill the connection\n", .{}); + stream.close(); + try ctx.close_fd(some_event.origin); + continue; + } + print("final 'ok' received, sending 'ok' to IPCd\n", .{}); // Connection is established, inform IPCd. var response = try Message.init(some_event.origin, allocator, "ok"); defer response.deinit(); try ctx.write(response); + print("add current client as external connection (for now)\n", .{}); + try ctx.add_external (stream.handle); + + print("Finally, add switching\n", .{}); // Let's switch the connections! - try ctx.add_switch(some_event.origin, servicefd); - // TODO: for real TCP code, invoke: ctx.set_switch_callbacks (); + try ctx.add_switch(some_event.origin, stream.handle); + // TODO: should probably invoke: ctx.set_switch_callbacks (); } else { // TCPd was contacted without providing a message, nothing to do.