TCPd: code simplification & (most important) error management.
This commit is contained in:
parent
614e972b95
commit
aca3f2183d
@ -19,23 +19,16 @@ const print_eq = @import("./util.zig").print_eq;
|
||||
const URI = @import("./util.zig").URI;
|
||||
|
||||
fn init_tcp_server(allocator: std.mem.Allocator, server: *net.StreamServer) !i32 {
|
||||
var buffer: [1000]u8 = undefined;
|
||||
var fbs = std.io.fixedBufferStream(&buffer);
|
||||
var writer = fbs.writer();
|
||||
|
||||
var address = std.process.getEnvVarOwned(allocator, "ADDRESS") catch |err| switch(err) {
|
||||
error.EnvironmentVariableNotFound => blk: {
|
||||
print("no ADDRESS envvar: TCPd will listen on 127.0.0.1:9000\n", .{});
|
||||
print ("no ADDRESS envvar: TCPd will listen on 127.0.0.1:9000\n", .{});
|
||||
break :blk try allocator.dupe(u8, "127.0.0.1:9000");
|
||||
},
|
||||
else => { return err; },
|
||||
};
|
||||
defer allocator.free(address);
|
||||
|
||||
try writer.print("{s}", .{address});
|
||||
var tcp_address = fbs.getWritten();
|
||||
|
||||
var iterator = std.mem.split(u8, tcp_address, ":");
|
||||
var iterator = std.mem.split(u8, address, ":");
|
||||
var real_tcp_address = iterator.first();
|
||||
var real_tcp_port = try std.fmt.parseUnsigned(u16, iterator.rest(), 10);
|
||||
|
||||
@ -61,7 +54,7 @@ fn create_service() !void {
|
||||
// SERVER SIDE: creating a service.
|
||||
var service_name = std.process.getEnvVarOwned(allocator, "IPC_SERVICE_NAME") catch |err| switch(err) {
|
||||
error.EnvironmentVariableNotFound => blk: {
|
||||
print("no IPC_SERVICE_NAME envvar: TCPd will be named 'tcp'\n", .{});
|
||||
print ("no IPC_SERVICE_NAME envvar: TCPd will be named 'tcp'\n", .{});
|
||||
break :blk try allocator.dupe(u8, "tcp");
|
||||
},
|
||||
else => { return err; },
|
||||
@ -100,10 +93,8 @@ fn create_service() !void {
|
||||
// Quit on SIGHUP (kill -1).
|
||||
try os.sigaction(os.SIG.HUP, &sa, null);
|
||||
|
||||
// TODO: create a TCP socket.
|
||||
var server: net.StreamServer = undefined;
|
||||
var serverfd = try init_tcp_server(allocator, &server);
|
||||
// TODO: add the socket to the list of ctx FDs.
|
||||
try ctx.add_external (serverfd);
|
||||
|
||||
var some_event: ipc.Event = undefined;
|
||||
@ -113,57 +104,66 @@ fn create_service() !void {
|
||||
some_event = try ctx.wait_event();
|
||||
switch (some_event.t) {
|
||||
.TIMER => {
|
||||
print("\rTimer! ({})", .{count});
|
||||
print ("\rTimer! ({})", .{count});
|
||||
count += 1;
|
||||
},
|
||||
|
||||
.CONNECTION => {
|
||||
print("New connection: {} so far!\n", .{ctx.pollfd.items.len});
|
||||
print ("New connection: {} so far!\n", .{ctx.pollfd.items.len});
|
||||
},
|
||||
|
||||
.DISCONNECTION => {
|
||||
print("User {} disconnected, {} remainaing.\n"
|
||||
print ("User {} disconnected, {} remaining.\n"
|
||||
, .{some_event.origin, ctx.pollfd.items.len});
|
||||
},
|
||||
|
||||
.EXTERNAL => {
|
||||
print("Message received from a non IPC socket.\n", .{});
|
||||
print ("Message received from a non IPC socket.\n", .{});
|
||||
var client = try server.accept(); // net.StreamServer.Connection
|
||||
errdefer client.stream.close();
|
||||
// Receiving a new client from the EXTERNAL socket.
|
||||
// New client = new switch from a distant TCP connection to a
|
||||
// local libipc service.
|
||||
|
||||
var buffer: [1000]u8 = undefined;
|
||||
var buffer: [10000]u8 = undefined;
|
||||
var size = try client.stream.read(&buffer);
|
||||
print ("Asking to connect to service [{s}]\n", .{buffer[0..size]});
|
||||
var servicefd = try ctx.connect_service (buffer[0..size]);
|
||||
// Send a message to inform remote TCPd that the connection is established.
|
||||
var service_to_contact = buffer[0..size];
|
||||
|
||||
print ("Ask to connect to service [{s}].\n", .{service_to_contact});
|
||||
var servicefd = try ctx.connect_service (service_to_contact);
|
||||
errdefer ctx.close_fd (servicefd) catch {};
|
||||
|
||||
print ("Send a message to inform remote TCPd that the connection is established.\n", .{});
|
||||
_ = try client.stream.write("ok");
|
||||
|
||||
print("add current client as external connection (for now)\n", .{});
|
||||
print ("Add current client as external connection (for now).\n", .{});
|
||||
try ctx.add_external (client.stream.handle);
|
||||
|
||||
print ("Message sent, switching.\n", .{});
|
||||
try ctx.add_switch(client.stream.handle, servicefd);
|
||||
|
||||
print ("DONE.\n", .{});
|
||||
|
||||
// Some protocols will require to change the default functions
|
||||
// to read and to write on the client socket.
|
||||
// Function to call: ctx.set_switch_callbacks(clientfd, infn, outfn);
|
||||
},
|
||||
|
||||
.SWITCH_RX => {
|
||||
print("Message has been received (SWITCH fd {}).\n", .{some_event.origin});
|
||||
print ("Message has been received (SWITCH fd {}).\n", .{some_event.origin});
|
||||
},
|
||||
|
||||
.SWITCH_TX => {
|
||||
print("Message has been sent (SWITCH fd {}).\n", .{some_event.origin});
|
||||
print ("Message has been sent (SWITCH fd {}).\n", .{some_event.origin});
|
||||
},
|
||||
|
||||
.MESSAGE => {
|
||||
print("Client asking for a service through TCPd.\n", .{});
|
||||
print ("Client asking for a service through TCPd.\n", .{});
|
||||
errdefer ctx.close (some_event.index) catch {};
|
||||
if (some_event.m) |m| {
|
||||
print("{}\n", .{m});
|
||||
defer m.deinit(); // Do not forget to free the message payload.
|
||||
|
||||
print ("URI to work with is {s}\n", .{m.payload});
|
||||
print ("URI to contact {s}\n", .{m.payload});
|
||||
var uri = URI.read(m.payload);
|
||||
print ("proto [{s}] address [{s}] path [{s}]\n"
|
||||
, .{uri.protocol, uri.address, uri.path});
|
||||
@ -171,33 +171,34 @@ fn create_service() !void {
|
||||
var iterator = std.mem.split(u8, uri.address, ":");
|
||||
var real_tcp_address = iterator.first();
|
||||
var real_tcp_port = try std.fmt.parseUnsigned(u16, iterator.rest(), 10);
|
||||
|
||||
var socket_addr = try net.Address.parseIp(real_tcp_address, real_tcp_port);
|
||||
var stream = try net.tcpConnectToAddress(socket_addr);
|
||||
errdefer stream.close();
|
||||
|
||||
print("Writing URI PATH: {s}\n", .{uri.path});
|
||||
print ("Writing URI PATH: {s}\n", .{uri.path});
|
||||
_ = try stream.write(uri.path);
|
||||
|
||||
print("Writing URI PATH - written, waiting for the final 'ok'\n", .{});
|
||||
var buffer: [1000]u8 = undefined;
|
||||
print ("Writing URI PATH - written, waiting for the final 'ok'.\n", .{});
|
||||
var buffer: [10000]u8 = undefined;
|
||||
var size = try stream.read(&buffer);
|
||||
if (! std.mem.eql(u8, buffer[0..size], "ok")) {
|
||||
print("didn't receive 'ok', let's kill the connection\n", .{});
|
||||
print ("didn't receive 'ok', let's kill the connection\n", .{});
|
||||
stream.close();
|
||||
try ctx.close_fd(some_event.origin);
|
||||
try ctx.close(some_event.index);
|
||||
continue;
|
||||
}
|
||||
print("final 'ok' received, sending 'ok' to IPCd\n", .{});
|
||||
print ("Final 'ok' received, sending 'ok' to IPCd.\n", .{});
|
||||
|
||||
// Connection is established, inform IPCd.
|
||||
var response = try Message.init(some_event.origin, allocator, "ok");
|
||||
defer response.deinit();
|
||||
try ctx.write(response);
|
||||
|
||||
print("add current client as external connection (for now)\n", .{});
|
||||
print ("Add current client as external connection (for now).\n", .{});
|
||||
try ctx.add_external (stream.handle);
|
||||
|
||||
print("Finally, add switching\n", .{});
|
||||
// Let's switch the connections!
|
||||
print ("Finally, add switching\n", .{});
|
||||
try ctx.add_switch(some_event.origin, stream.handle);
|
||||
// Could invoke ctx.set_switch_callbacks but TCP sockets are
|
||||
// working pretty well with default functions.
|
||||
@ -212,17 +213,17 @@ fn create_service() !void {
|
||||
},
|
||||
|
||||
.TX => {
|
||||
print("Message sent.\n", .{});
|
||||
print ("Message sent.\n", .{});
|
||||
},
|
||||
|
||||
.ERROR => {
|
||||
print("A problem occured, event: {}, let's suicide\n", .{some_event});
|
||||
print ("A problem occured, event: {}, let's suicide.\n", .{some_event});
|
||||
break;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
print("Goodbye\n", .{});
|
||||
print ("Goodbye\n", .{});
|
||||
}
|
||||
|
||||
pub fn main() !u8 {
|
||||
|
Reference in New Issue
Block a user