From cc8dcca121b04f3279b7ad36ea9cdde8667d5df5 Mon Sep 17 00:00:00 2001 From: Philippe PITTOLI Date: Sun, 16 Jun 2024 01:17:06 +0200 Subject: [PATCH] Basic proxy works. --- src/main.zig | 161 ++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 141 insertions(+), 20 deletions(-) diff --git a/src/main.zig b/src/main.zig index c0c3e75..60f55a4 100644 --- a/src/main.zig +++ b/src/main.zig @@ -1,4 +1,7 @@ const std = @import("std"); +const hexdump = @import("./hexdump.zig"); +const AutoArrayHashMap = std.AutoArrayHashMap; +const allocator = std.mem.Allocator; const ipc = @cImport({ // See https://github.com/ziglang/zig/issues/515 @cDefine("_NO_CRT_STDIO_INLINE", "1"); @@ -6,52 +9,170 @@ const ipc = @cImport({ @cInclude("libipc.h"); }); +// // Return type of callback functions when switching. +// enum cb_event_types { +// CB_NO_ERROR = 0 // No error. A message was generated. +// , CB_ERROR = 1 // Generic error. +// , CB_FD_CLOSING = 2 // The fd is closing. +// , CB_IGNORE = 3 // The message should be ignored (protocol specific). +// }; +// +// int ipc_write (void* ctx, int servicefd, char* mcontent, uint32_t mlen); +// int ipc_read_fd (void* ctx, int fd, char* buffer, size_t* buflen); +// int ipc_read (void* ctx, size_t index, char* buffer, size_t* buflen); +// void ipc_context_timer (void* ctx, int timer); +// int ipc_close (void* ctx, size_t index); +// int ipc_close_all (void* ctx); +// +// // Switch functions (for "protocol" services, such as TCPd). +// int ipc_add_external (void* ctx, int newfd); +// +// // Returned "char" is a cb_event_types enum. +// int ipc_set_switch_callbacks (void* ctx, int fd +// , char (*in (int orig, const char *payload, uint32_t *mlen)) +// , char (*out(int dest, char *payload, uint32_t mlen))); + // TODO: noreturn error function: print a error message then crash. // TODO: connection to the proxied service // TODO: handle connections and disconnections // TODO: handle messages (proxy) +var context : ?*anyopaque = undefined; +var hash : AutoArrayHashMap(i32, i32) = undefined; + +//pub fn in(origin : i32, payload : [*]u8, mlen : *u32) callconv(.C) u8 +//{ +//} +// +//pub fn out(dest_fd : i32, , payload : [*]u8, mlen : u32) callconv(.C) u8 +//{ +//} + +pub fn connection_and_link(client_fd : i32) !void +{ + const stdout = std.io.getStdOut().writer(); + + try stdout.print("Connection of a new user ({})\n", .{client_fd}); + + // Connection to the proxied service. + const proxied_service = "pong"; + var proxied_service_socket_fd : i32 = 0; + + const ret = ipc.ipc_connect_service (context, &proxied_service_socket_fd, proxied_service, proxied_service.len); + if (ret != 0) { + try stdout.print("Impossible to connect to the service {s}\n", .{proxied_service}); + return error.CannotConnectToService; + } + errdefer _ = ipc.ipc_close_fd(context, proxied_service_socket_fd); + + // Link the user to the proxied service. + //ret = ipc.ipc_add_switch (context, client_fd, proxied_service_socket_fd); + //if (ret != 0) { + // try stdout.print("Impossible switch both client {} and service {s}\n", .{client_fd, proxied_service}); + // return error.CannotAddSwitch; + //} + errdefer _ = ipc.ipc_close_fd(context, client_fd); + + try hash.put(client_fd, proxied_service_socket_fd); + try hash.put(proxied_service_socket_fd, client_fd); + + try stdout.print("Connection of a new user ({}): linked with '{s}' ({})\n", + .{client_fd, proxied_service, proxied_service_socket_fd}); +} + +pub fn disconnection_and_unlink(fd : i32) !void +{ + const stdout = std.io.getStdOut().writer(); + try stdout.print("Disconnection of user ({})\n", .{fd}); + + const related_fd = hash.get(fd) orelse return error.noRelatedFD; + _ = ipc.ipc_close_fd(context, related_fd); + _ = hash.swapRemove(related_fd); + _ = hash.swapRemove(fd); +} + +fn print_hex(buffer : [100_000]u8, buflen : u64) !void +{ + const stdout = std.io.getStdOut().writer(); + var hexbuf: [100_000]u8 = undefined; + var hexfbs = std.io.fixedBufferStream(&hexbuf); + const hexwriter = hexfbs.writer(); + try hexdump.hexdump(hexwriter, "Message received", buffer[0..buflen]); + try stdout.print("{s}\n", .{hexfbs.getWritten()}); +} + +pub fn message_rx(fd : i32, buffer : [100_000]u8, buflen : u64) !void +{ + const stdout = std.io.getStdOut().writer(); + + // There is no need to "read a message", it's already done. + // _ = ipc.ipc_read_fd (context, fd, &buffer, &buflen); + + try stdout.print("Message received from {}, size: {}\n", .{fd, buflen}); + try print_hex(buffer, buflen); + + // Search for the related fd. + const related_fd = hash.get(fd) orelse return error.no_related_fd; + + // Send a message to the related service or client. + _ = ipc.ipc_schedule(context, related_fd, &buffer, @intCast(buflen)); + try stdout.print("Message scheduled for {}\n", .{fd}); +} + +pub fn message_tx(fd : i32) !void +{ + const stdout = std.io.getStdOut().writer(); + try stdout.print("Message sent to ({})\n", .{fd}); +} + pub fn main() !void { - var context : ?*anyopaque = undefined; var service_socket_fd : i32 = 0; const stdout_file = std.io.getStdOut().writer(); var bw = std.io.bufferedWriter(stdout_file); const stdout = bw.writer(); + // Initialization of the hashmap. + hash = AutoArrayHashMap(i32, i32).init(std.heap.c_allocator); + defer hash.deinit(); // Free the hashmap structure memory. + + // Initialization of the proxy. switch(ipc.ipc_context_init(&context)) { 0 => try stdout.print("Context initialized\n", .{}), else => try stdout.print("Problem while initializing the context\n", .{}), } - defer { _ = ipc.ipc_context_deinit(&context); } + defer { _ = ipc.ipc_context_deinit(&context); } // Free the context structure memory. switch(ipc.ipc_service_init(context, &service_socket_fd, "proxy", 5)) { 0 => try stdout.print("Service initialized\n", .{}), else => try stdout.print("Problem while initializing the service\n", .{}), } - defer { _ = ipc.ipc_close_all(context); } + defer { _ = ipc.ipc_close_all(context); } // Close all established connections. try bw.flush(); // don't forget to flush! - // TODO: do some stuff while(true) { - var t : u8 = undefined; - var index : u64 = undefined; - var buffer : [100_000]u8 = undefined; - var buflen : u64 = 100_000; - var origin_fd : i32 = 0; + var t : u8 = undefined; // event type + var index : u64 = undefined; // index of the connection on which the action happens + var buffer : [100_000]u8 = undefined; // message buffer + var buflen : u64 = 100_000; // message buffer length + var origin_fd : i32 = 0; // fd on which the action happens + var new_fd : i32 = 0; // new client's fd (in case of a connection) - _ = ipc.ipc_wait_event(context, &t, &index, &origin_fd, &buffer, &buflen); + const ret = ipc.ipc_wait_event(context, &t, &index, &origin_fd, &new_fd, &buffer, &buflen); + if (ret != 0) { + return error.wait_event_failed; + } switch(t) { - ipc.ERROR => try stdout.print("ERROR\n", .{}), // A problem occured. - ipc.CONNECTION => try stdout.print("CONNECTION\n", .{}), // New user. - ipc.DISCONNECTION => try stdout.print("DISCONNECTION\n", .{}), // User disconnected. - ipc.MESSAGE_RX => try stdout.print("MESSAGE_RX\n", .{}), // New message. - ipc.MESSAGE_TX => try stdout.print("MESSAGE_TX\n", .{}), // Message sent. - ipc.TIMER => try stdout.print("TIMER\n", .{}), // Timeout in the poll(2) function. - ipc.EXTERNAL => try stdout.print("EXTERNAL\n", .{}), // Message received from a non IPC socket. - ipc.SWITCH_RX => try stdout.print("SWITCH_RX\n", .{}), // Message received from a switched FD. - ipc.SWITCH_TX => try stdout.print("SWITCH_TX\n", .{}), // Message sent to a switched fd. - else => try stdout.print("ELSE CASE\n", .{}), // Should never happen. + ipc.ERROR => try stdout.print("ERROR\n", .{}), // A problem occured. + ipc.CONNECTION => try connection_and_link(new_fd), // New user. + ipc.DISCONNECTION => try disconnection_and_unlink(origin_fd), // User disconnected. + ipc.MESSAGE_RX => try message_rx(origin_fd, buffer, buflen), // New message. + ipc.MESSAGE_TX => try message_tx(origin_fd), // Message sent. + ipc.TIMER => try stdout.print("TIMER\n", .{}), // Timeout in the poll(2) function. + ipc.EXTERNAL => try stdout.print("EXTERNAL\n", .{}), // Message received from a non IPC socket. + ipc.SWITCH_RX => try stdout.print("SWITCH_RX\n", .{}), // Message received from a switched FD. + ipc.SWITCH_TX => try stdout.print("SWITCH_TX\n", .{}), // Message sent to a switched fd. + else => try stdout.print("ELSE CASE\n", .{}), // Should never happen. } try bw.flush(); // don't forget to flush! }