diff --git a/zig-impl/src/tcpd.zig b/zig-impl/src/tcpd.zig index c511629..70f61b0 100644 --- a/zig-impl/src/tcpd.zig +++ b/zig-impl/src/tcpd.zig @@ -19,23 +19,16 @@ const print_eq = @import("./util.zig").print_eq; const URI = @import("./util.zig").URI; fn init_tcp_server(allocator: std.mem.Allocator, server: *net.StreamServer) !i32 { - var buffer: [1000]u8 = undefined; - var fbs = std.io.fixedBufferStream(&buffer); - var writer = fbs.writer(); - var address = std.process.getEnvVarOwned(allocator, "ADDRESS") catch |err| switch(err) { error.EnvironmentVariableNotFound => blk: { - print("no ADDRESS envvar: TCPd will listen on 127.0.0.1:9000\n", .{}); + print ("no ADDRESS envvar: TCPd will listen on 127.0.0.1:9000\n", .{}); break :blk try allocator.dupe(u8, "127.0.0.1:9000"); }, else => { return err; }, }; defer allocator.free(address); - try writer.print("{s}", .{address}); - var tcp_address = fbs.getWritten(); - - var iterator = std.mem.split(u8, tcp_address, ":"); + var iterator = std.mem.split(u8, address, ":"); var real_tcp_address = iterator.first(); var real_tcp_port = try std.fmt.parseUnsigned(u16, iterator.rest(), 10); @@ -61,7 +54,7 @@ fn create_service() !void { // SERVER SIDE: creating a service. var service_name = std.process.getEnvVarOwned(allocator, "IPC_SERVICE_NAME") catch |err| switch(err) { error.EnvironmentVariableNotFound => blk: { - print("no IPC_SERVICE_NAME envvar: TCPd will be named 'tcp'\n", .{}); + print ("no IPC_SERVICE_NAME envvar: TCPd will be named 'tcp'\n", .{}); break :blk try allocator.dupe(u8, "tcp"); }, else => { return err; }, @@ -100,10 +93,8 @@ fn create_service() !void { // Quit on SIGHUP (kill -1). try os.sigaction(os.SIG.HUP, &sa, null); - // TODO: create a TCP socket. var server: net.StreamServer = undefined; var serverfd = try init_tcp_server(allocator, &server); - // TODO: add the socket to the list of ctx FDs. try ctx.add_external (serverfd); var some_event: ipc.Event = undefined; @@ -113,57 +104,66 @@ fn create_service() !void { some_event = try ctx.wait_event(); switch (some_event.t) { .TIMER => { - print("\rTimer! ({})", .{count}); + print ("\rTimer! ({})", .{count}); count += 1; }, .CONNECTION => { - print("New connection: {} so far!\n", .{ctx.pollfd.items.len}); + print ("New connection: {} so far!\n", .{ctx.pollfd.items.len}); }, .DISCONNECTION => { - print("User {} disconnected, {} remainaing.\n" + print ("User {} disconnected, {} remaining.\n" , .{some_event.origin, ctx.pollfd.items.len}); }, .EXTERNAL => { - print("Message received from a non IPC socket.\n", .{}); + print ("Message received from a non IPC socket.\n", .{}); var client = try server.accept(); // net.StreamServer.Connection + errdefer client.stream.close(); // Receiving a new client from the EXTERNAL socket. // New client = new switch from a distant TCP connection to a // local libipc service. - var buffer: [1000]u8 = undefined; + var buffer: [10000]u8 = undefined; var size = try client.stream.read(&buffer); - print ("Asking to connect to service [{s}]\n", .{buffer[0..size]}); - var servicefd = try ctx.connect_service (buffer[0..size]); - // Send a message to inform remote TCPd that the connection is established. + var service_to_contact = buffer[0..size]; + + print ("Ask to connect to service [{s}].\n", .{service_to_contact}); + var servicefd = try ctx.connect_service (service_to_contact); + errdefer ctx.close_fd (servicefd) catch {}; + print ("Send a message to inform remote TCPd that the connection is established.\n", .{}); _ = try client.stream.write("ok"); - print("add current client as external connection (for now)\n", .{}); + print ("Add current client as external connection (for now).\n", .{}); try ctx.add_external (client.stream.handle); print ("Message sent, switching.\n", .{}); try ctx.add_switch(client.stream.handle, servicefd); + print ("DONE.\n", .{}); + + // Some protocols will require to change the default functions + // to read and to write on the client socket. + // Function to call: ctx.set_switch_callbacks(clientfd, infn, outfn); }, .SWITCH_RX => { - print("Message has been received (SWITCH fd {}).\n", .{some_event.origin}); + print ("Message has been received (SWITCH fd {}).\n", .{some_event.origin}); }, .SWITCH_TX => { - print("Message has been sent (SWITCH fd {}).\n", .{some_event.origin}); + print ("Message has been sent (SWITCH fd {}).\n", .{some_event.origin}); }, .MESSAGE => { - print("Client asking for a service through TCPd.\n", .{}); + print ("Client asking for a service through TCPd.\n", .{}); + errdefer ctx.close (some_event.index) catch {}; if (some_event.m) |m| { - print("{}\n", .{m}); defer m.deinit(); // Do not forget to free the message payload. - print ("URI to work with is {s}\n", .{m.payload}); + print ("URI to contact {s}\n", .{m.payload}); var uri = URI.read(m.payload); print ("proto [{s}] address [{s}] path [{s}]\n" , .{uri.protocol, uri.address, uri.path}); @@ -171,33 +171,34 @@ fn create_service() !void { var iterator = std.mem.split(u8, uri.address, ":"); var real_tcp_address = iterator.first(); var real_tcp_port = try std.fmt.parseUnsigned(u16, iterator.rest(), 10); + var socket_addr = try net.Address.parseIp(real_tcp_address, real_tcp_port); var stream = try net.tcpConnectToAddress(socket_addr); + errdefer stream.close(); - print("Writing URI PATH: {s}\n", .{uri.path}); + print ("Writing URI PATH: {s}\n", .{uri.path}); _ = try stream.write(uri.path); - print("Writing URI PATH - written, waiting for the final 'ok'\n", .{}); - var buffer: [1000]u8 = undefined; + print ("Writing URI PATH - written, waiting for the final 'ok'.\n", .{}); + var buffer: [10000]u8 = undefined; var size = try stream.read(&buffer); if (! std.mem.eql(u8, buffer[0..size], "ok")) { - print("didn't receive 'ok', let's kill the connection\n", .{}); + print ("didn't receive 'ok', let's kill the connection\n", .{}); stream.close(); - try ctx.close_fd(some_event.origin); + try ctx.close(some_event.index); continue; } - print("final 'ok' received, sending 'ok' to IPCd\n", .{}); + print ("Final 'ok' received, sending 'ok' to IPCd.\n", .{}); // Connection is established, inform IPCd. var response = try Message.init(some_event.origin, allocator, "ok"); defer response.deinit(); try ctx.write(response); - print("add current client as external connection (for now)\n", .{}); + print ("Add current client as external connection (for now).\n", .{}); try ctx.add_external (stream.handle); - print("Finally, add switching\n", .{}); - // Let's switch the connections! + print ("Finally, add switching\n", .{}); try ctx.add_switch(some_event.origin, stream.handle); // Could invoke ctx.set_switch_callbacks but TCP sockets are // working pretty well with default functions. @@ -212,17 +213,17 @@ fn create_service() !void { }, .TX => { - print("Message sent.\n", .{}); + print ("Message sent.\n", .{}); }, .ERROR => { - print("A problem occured, event: {}, let's suicide\n", .{some_event}); + print ("A problem occured, event: {}, let's suicide.\n", .{some_event}); break; }, } } - print("Goodbye\n", .{}); + print ("Goodbye\n", .{}); } pub fn main() !u8 {