TCPd works. TODO: proper error management.
parent
1034b1aa5c
commit
bb9d397a40
|
@ -209,6 +209,14 @@ pub const Context = struct {
|
||||||
return try self.connect_service (service_name);
|
return try self.connect_service (service_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Add a new file descriptor to follow, labeled as EXTERNAL.
|
||||||
|
/// Useful for protocol daemons (ex: TCPd) listening to a socket for external connections,
|
||||||
|
/// clients trying to reach a libipc service.
|
||||||
|
pub fn add_external (self: *Self, newfd: i32) !void {
|
||||||
|
var newcon = Connection.init(Connection.Type.EXTERNAL, null);
|
||||||
|
try self.add_ (newcon, newfd);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void {
|
pub fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void {
|
||||||
// net.StreamServer
|
// net.StreamServer
|
||||||
var serverfd = self.pollfd.items[server_index].fd;
|
var serverfd = self.pollfd.items[server_index].fd;
|
||||||
|
|
|
@ -18,6 +18,37 @@ const testing = std.testing;
|
||||||
const print_eq = @import("./util.zig").print_eq;
|
const print_eq = @import("./util.zig").print_eq;
|
||||||
const URI = @import("./util.zig").URI;
|
const URI = @import("./util.zig").URI;
|
||||||
|
|
||||||
|
fn init_tcp_server(allocator: std.mem.Allocator, server: *net.StreamServer) !i32 {
|
||||||
|
var buffer: [1000]u8 = undefined;
|
||||||
|
var fbs = std.io.fixedBufferStream(&buffer);
|
||||||
|
var writer = fbs.writer();
|
||||||
|
|
||||||
|
var address = std.process.getEnvVarOwned(allocator, "ADDRESS") catch |err| switch(err) {
|
||||||
|
error.EnvironmentVariableNotFound => blk: {
|
||||||
|
print("no ADDRESS envvar: TCPd will listen on 127.0.0.1:9000\n", .{});
|
||||||
|
break :blk try allocator.dupe(u8, "127.0.0.1:9000");
|
||||||
|
},
|
||||||
|
else => { return err; },
|
||||||
|
};
|
||||||
|
defer allocator.free(address);
|
||||||
|
|
||||||
|
try writer.print("{s}", .{address});
|
||||||
|
var tcp_address = fbs.getWritten();
|
||||||
|
|
||||||
|
var iterator = std.mem.split(u8, tcp_address, ":");
|
||||||
|
var real_tcp_address = iterator.first();
|
||||||
|
var real_tcp_port = try std.fmt.parseUnsigned(u16, iterator.rest(), 10);
|
||||||
|
|
||||||
|
print ("TCP address [{s}] port [{}]\n", .{real_tcp_address, real_tcp_port});
|
||||||
|
|
||||||
|
server.* = net.StreamServer.init(.{.reuse_address = true});
|
||||||
|
var socket_addr = try net.Address.parseIp(real_tcp_address, real_tcp_port);
|
||||||
|
try server.listen(socket_addr);
|
||||||
|
|
||||||
|
const newfd = server.sockfd orelse return error.SocketLOL; // TODO
|
||||||
|
return newfd;
|
||||||
|
}
|
||||||
|
|
||||||
fn create_service() !void {
|
fn create_service() !void {
|
||||||
const config = .{.safety = true};
|
const config = .{.safety = true};
|
||||||
var gpa = std.heap.GeneralPurposeAllocator(config){};
|
var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||||
|
@ -28,7 +59,16 @@ fn create_service() !void {
|
||||||
defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
defer ctx.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
||||||
|
|
||||||
// SERVER SIDE: creating a service.
|
// SERVER SIDE: creating a service.
|
||||||
_ = try ctx.server_init("tcp");
|
var service_name = std.process.getEnvVarOwned(allocator, "IPC_SERVICE_NAME") catch |err| switch(err) {
|
||||||
|
error.EnvironmentVariableNotFound => blk: {
|
||||||
|
print("no IPC_SERVICE_NAME envvar: TCPd will be named 'tcp'\n", .{});
|
||||||
|
break :blk try allocator.dupe(u8, "tcp");
|
||||||
|
},
|
||||||
|
else => { return err; },
|
||||||
|
};
|
||||||
|
defer allocator.free(service_name);
|
||||||
|
|
||||||
|
_ = try ctx.server_init(service_name);
|
||||||
|
|
||||||
// signal handler, to quit when asked
|
// signal handler, to quit when asked
|
||||||
const S = struct {
|
const S = struct {
|
||||||
|
@ -60,6 +100,12 @@ fn create_service() !void {
|
||||||
// Quit on SIGHUP (kill -1).
|
// Quit on SIGHUP (kill -1).
|
||||||
try os.sigaction(os.SIG.HUP, &sa, null);
|
try os.sigaction(os.SIG.HUP, &sa, null);
|
||||||
|
|
||||||
|
// TODO: create a TCP socket.
|
||||||
|
var server: net.StreamServer = undefined;
|
||||||
|
var serverfd = try init_tcp_server(allocator, &server);
|
||||||
|
// TODO: add the socket to the list of ctx FDs.
|
||||||
|
try ctx.add_external (serverfd);
|
||||||
|
|
||||||
var some_event: ipc.Event = undefined;
|
var some_event: ipc.Event = undefined;
|
||||||
ctx.timer = 1000; // 1 second
|
ctx.timer = 1000; // 1 second
|
||||||
var count: u32 = 0;
|
var count: u32 = 0;
|
||||||
|
@ -82,16 +128,33 @@ fn create_service() !void {
|
||||||
|
|
||||||
.EXTERNAL => {
|
.EXTERNAL => {
|
||||||
print("Message received from a non IPC socket.\n", .{});
|
print("Message received from a non IPC socket.\n", .{});
|
||||||
print("NOT IMPLEMENTED, YET. It's a suicide, then.\n", .{});
|
var client = try server.accept(); // net.StreamServer.Connection
|
||||||
break;
|
// Receiving a new client from the EXTERNAL socket.
|
||||||
|
// New client = new switch from a distant TCP connection to a
|
||||||
|
// local libipc service.
|
||||||
|
|
||||||
|
var buffer: [1000]u8 = undefined;
|
||||||
|
var size = try client.stream.read(&buffer);
|
||||||
|
print ("Asking to connect to service [{s}]\n", .{buffer[0..size]});
|
||||||
|
var servicefd = try ctx.connect_service (buffer[0..size]);
|
||||||
|
// Send a message to inform remote TCPd that the connection is established.
|
||||||
|
print ("Send a message to inform remote TCPd that the connection is established.\n", .{});
|
||||||
|
_ = try client.stream.write("ok");
|
||||||
|
|
||||||
|
print("add current client as external connection (for now)\n", .{});
|
||||||
|
try ctx.add_external (client.stream.handle);
|
||||||
|
|
||||||
|
print ("Message sent, switching.\n", .{});
|
||||||
|
try ctx.add_switch(client.stream.handle, servicefd);
|
||||||
|
print ("DONE.\n", .{});
|
||||||
},
|
},
|
||||||
|
|
||||||
.SWITCH_RX => {
|
.SWITCH_RX => {
|
||||||
print("Message has been received (SWITCH).\n", .{});
|
print("Message has been received (SWITCH fd {}).\n", .{some_event.origin});
|
||||||
},
|
},
|
||||||
|
|
||||||
.SWITCH_TX => {
|
.SWITCH_TX => {
|
||||||
print("Message has been sent (SWITCH).\n", .{});
|
print("Message has been sent (SWITCH fd {}).\n", .{some_event.origin});
|
||||||
},
|
},
|
||||||
|
|
||||||
.MESSAGE => {
|
.MESSAGE => {
|
||||||
|
@ -105,18 +168,38 @@ fn create_service() !void {
|
||||||
print ("proto [{s}] address [{s}] path [{s}]\n"
|
print ("proto [{s}] address [{s}] path [{s}]\n"
|
||||||
, .{uri.protocol, uri.address, uri.path});
|
, .{uri.protocol, uri.address, uri.path});
|
||||||
|
|
||||||
// TODO FIXME: CURRENTLY ONLY CONNECT TO LOCAL UNIX SERVICE
|
var iterator = std.mem.split(u8, uri.address, ":");
|
||||||
// TODO: TCP!!
|
var real_tcp_address = iterator.first();
|
||||||
var servicefd = try ctx.connect_service (uri.path);
|
var real_tcp_port = try std.fmt.parseUnsigned(u16, iterator.rest(), 10);
|
||||||
|
var socket_addr = try net.Address.parseIp(real_tcp_address, real_tcp_port);
|
||||||
|
var stream = try net.tcpConnectToAddress(socket_addr);
|
||||||
|
|
||||||
|
print("Writing URI PATH: {s}\n", .{uri.path});
|
||||||
|
_ = try stream.write(uri.path);
|
||||||
|
|
||||||
|
print("Writing URI PATH - written, waiting for the final 'ok'\n", .{});
|
||||||
|
var buffer: [1000]u8 = undefined;
|
||||||
|
var size = try stream.read(&buffer);
|
||||||
|
if (! std.mem.eql(u8, buffer[0..size], "ok")) {
|
||||||
|
print("didn't receive 'ok', let's kill the connection\n", .{});
|
||||||
|
stream.close();
|
||||||
|
try ctx.close_fd(some_event.origin);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
print("final 'ok' received, sending 'ok' to IPCd\n", .{});
|
||||||
|
|
||||||
// Connection is established, inform IPCd.
|
// Connection is established, inform IPCd.
|
||||||
var response = try Message.init(some_event.origin, allocator, "ok");
|
var response = try Message.init(some_event.origin, allocator, "ok");
|
||||||
defer response.deinit();
|
defer response.deinit();
|
||||||
try ctx.write(response);
|
try ctx.write(response);
|
||||||
|
|
||||||
|
print("add current client as external connection (for now)\n", .{});
|
||||||
|
try ctx.add_external (stream.handle);
|
||||||
|
|
||||||
|
print("Finally, add switching\n", .{});
|
||||||
// Let's switch the connections!
|
// Let's switch the connections!
|
||||||
try ctx.add_switch(some_event.origin, servicefd);
|
try ctx.add_switch(some_event.origin, stream.handle);
|
||||||
// TODO: for real TCP code, invoke: ctx.set_switch_callbacks ();
|
// TODO: should probably invoke: ctx.set_switch_callbacks ();
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// TCPd was contacted without providing a message, nothing to do.
|
// TCPd was contacted without providing a message, nothing to do.
|
||||||
|
|
Reference in New Issue