diff --git a/zig-impl/src/context.zig b/zig-impl/src/context.zig index b3ed0c3..f821bc0 100644 --- a/zig-impl/src/context.zig +++ b/zig-impl/src/context.zig @@ -5,6 +5,8 @@ const net = std.net; const os = std.os; const fmt = std.fmt; +const receive_fd = @import("./exchange-fd.zig").receive_fd; + const Timer = std.time.Timer; const print = std.debug.print; @@ -33,10 +35,7 @@ pub const Context = struct { allocator: std.mem.Allocator, // Memory allocator. connections: Connections, // Keep track of connections. - // TODO: List of "pollfd" structures within cinfos, - // so we can pass it to poll(2). Share indexes with 'connections'. - // For now, this list doesn't do anything. - // Can even be replaced in a near future. + // "pollfd" structures passed to poll(2). Same indexes as "connections". pollfd: PollFD, // .fd (fd_t) + .events (i16) + .revents (i16) tx: Messages, // Messages to send, once their fd is available. @@ -97,14 +96,12 @@ pub const Context = struct { const newfd = stream.handle; errdefer std.os.closeSocket(newfd); var newcon = Connection.init(ctype, null); - try self.connections.append(newcon); - try self.pollfd.append(.{ .fd = newfd - , .events = std.os.linux.POLL.IN - , .revents = 0 }); + try self.add_ (newcon, newfd); return newfd; } - fn connect_ipcd (self: *Self, service_name: []const u8) !?i32 { + fn connect_ipcd (self: *Self, service_name: []const u8 + , connection_type: Connection.Type) !?i32 { const buffer_size = 10000; var buffer: [buffer_size]u8 = undefined; @@ -138,15 +135,25 @@ pub const Context = struct { // content: target service name;${IPC_NETWORK} // example: pong;pong tls://example.com:8998/pong - var m = try Message.init (ipcdfd, fba, Message.Type.LOOKUP, lookupfbs.getWritten()); + var m = try Message.init (ipcdfd, Message.Type.LOOKUP, fba, lookupfbs.getWritten()); try self.write (m); - // TODO - // var response = something like "try os.recvmsg" // Read LOOKUP response - // case error: ignore and move on + // case error: ignore and move on (TODO) // else: get fd sent by IPCd then close IPCd fd + var newfd = try receive_fd (ipcdfd); + var newcon = Connection.init(connection_type, null); + try self.add_ (newcon, newfd); + } + /// TODO: Add a new connection, but takes care of memory problems: + /// in case one of the arrays cannot sustain another entry, the other + /// won't be added. + fn add_ (self: *Self, new_connection: Connection, fd: os.socket_t) !void { + try self.connections.append(new_connection); + try self.pollfd.append(.{ .fd = fd + , .events = std.os.linux.POLL.IN + , .revents = 0 }); } fn fd_to_index (self: Self, fd: i32) !usize { @@ -200,10 +207,7 @@ pub const Context = struct { const newfd = client.stream.handle; var newcon = Connection.init(Connection.Type.IPC, null); - try self.connections.append(newcon); - try self.pollfd.append(.{ .fd = newfd - , .events = std.os.linux.POLL.IN - , .revents = 0 }); + try self.add_ (newcon, newfd); const sfd = server.sockfd orelse return error.SocketLOL; // TODO // WARNING: imply every new item is last @@ -222,10 +226,7 @@ pub const Context = struct { const newfd = server.sockfd orelse return error.SocketLOL; // TODO // Store the path in the Connection structure, so the UNIX socket file can be removed later. var newcon = Connection.init(Connection.Type.SERVER, try self.allocator.dupeZ(u8, path)); - try self.connections.append(newcon); - try self.pollfd.append(.{ .fd = newfd - , .events = std.os.linux.POLL.IN - , .revents = 0 }); + try self.add_ (newcon, newfd); return server; } diff --git a/zig-impl/src/exchange-fd.zig b/zig-impl/src/exchange-fd.zig index 494bb2d..8f01a0a 100644 --- a/zig-impl/src/exchange-fd.zig +++ b/zig-impl/src/exchange-fd.zig @@ -72,6 +72,8 @@ test { std.testing.refAllDecls(Cmsghdr([3]std.os.fd_t)); } +/// Send a file descriptor and a message through a UNIX socket. +/// TODO: currently voluntarily crashes if data isn't sent properly, should return an error instead. pub fn send_fd(sockfd: os.socket_t, msg: []const u8, fd: os.fd_t) void { var iov = [_]os.iovec_const{ .{ @@ -107,7 +109,9 @@ pub fn send_fd(sockfd: os.socket_t, msg: []const u8, fd: os.fd_t) void { } } -// WARNING: errors aren't RECEPTION errors. +/// WARNING: recvmsg is a WIP. +/// WARNING: errors aren't RECEPTION errors. +/// WARNING: can only work on linux for now (recvmsg is lacking on other systems). pub fn recvmsg( /// The file descriptor of the sending socket. sockfd: os.socket_t, @@ -180,13 +184,17 @@ pub fn recvmsg( } } -pub fn receive_fd(sockfd: os.socket_t) !os.fd_t { - var buffer: [100]u8 = undefined; +/// Receive a file descriptor through a UNIX socket. +/// A message can be carried with it, copied into 'buffer'. +/// WARNING: buffer must be at least 1500 bytes. +pub fn receive_fd(sockfd: os.socket_t, buffer: []u8, msg_size: *usize) !os.fd_t { + + var msg_buffer: [1500]u8 = undefined; var iov = [1]os.iovec{ .{ - .iov_base = buffer[0..], - .iov_len = buffer.len, + .iov_base = msg_buffer[0..] + , .iov_len = buffer.len }, }; @@ -204,14 +212,17 @@ pub fn receive_fd(sockfd: os.socket_t) !os.fd_t { .control = &cmsg, .controllen = @sizeOf(@TypeOf(cmsg)), .flags = 0, - }; + }; - const len = recvmsg(sockfd, msg, 0) catch |err| { + _ = recvmsg(sockfd, msg, 0) catch |err| { print("error sendmsg failed with {s}", .{@errorName(err)}); return 0; }; - print("received {} bytes, fd is {}\n", .{len, @as(i32, cmsg.dataPtr().*)}); - print("iov base {s}\n", .{iov[0].iov_base[0..iov[0].iov_len - 1]}); - return @as(i32, cmsg.dataPtr().*); + var received_fd = @as(i32, cmsg.dataPtr().*); + // print("received {} bytes, fd is {}\n", .{len, received_fd}); + // print("payload (iov base) {s}\n", .{iov[0].iov_base[0..iov[0].iov_len - 1]}); + std.mem.copy(u8, buffer, &msg_buffer); + msg_size.* = iov[0].iov_len; + return received_fd; } diff --git a/zig-impl/src/ipcd.zig b/zig-impl/src/ipcd.zig index 0ba7a03..fc05b82 100644 --- a/zig-impl/src/ipcd.zig +++ b/zig-impl/src/ipcd.zig @@ -5,6 +5,11 @@ const fmt = std.fmt; const os = std.os; const ipc = @import("./main.zig"); +const Message = ipc.Message; + +// Import send_fd this way in order to produce docs for exchange-fd functions. +const exchange_fd = @import("./exchange-fd.zig"); +const send_fd = exchange_fd.send_fd; const builtin = @import("builtin"); const native_os = builtin.target.os.tag; @@ -111,8 +116,33 @@ fn create_service() !void { .LOOKUP => { print("Client asking for a service through ipcd.\n", .{}); - print("NOT IMPLEMENTED, YET. It's a suicide, then.\n", .{}); - break; + if (some_event.m) |m| { + print("Message: {}\n", .{m}); + // 1. split message + // TODO + print("payload is: {s}\n", .{m.payload}); + // 2. find relevant part of the message + // TODO + // 3. connect whether asked to + // TODO + // 4. send_fd or send an error + // TODO + var response = try Message.init(some_event.origin + , Message.Type.ERROR + , allocator + , "currently not implemented"); + try ctx.write(response); + } + else { + // There is a problem: ipcd was contacted without providing + // a message, meaning there is nothing to do. This should be + // explicitely warned about. + var m = try Message.init(some_event.origin + , Message.Type.ERROR + , allocator + , "lookup message without data"); + try ctx.write(m); + } }, .TX => {