Basic proxy works.

dev
Philippe PITTOLI 2024-06-16 01:17:06 +02:00
parent 7415cca852
commit cc8dcca121
1 changed files with 141 additions and 20 deletions

View File

@ -1,4 +1,7 @@
const std = @import("std");
const hexdump = @import("./hexdump.zig");
const AutoArrayHashMap = std.AutoArrayHashMap;
const allocator = std.mem.Allocator;
const ipc = @cImport({
// See https://github.com/ziglang/zig/issues/515
@cDefine("_NO_CRT_STDIO_INLINE", "1");
@ -6,47 +9,165 @@ const ipc = @cImport({
@cInclude("libipc.h");
});
// // Return type of callback functions when switching.
// enum cb_event_types {
// CB_NO_ERROR = 0 // No error. A message was generated.
// , CB_ERROR = 1 // Generic error.
// , CB_FD_CLOSING = 2 // The fd is closing.
// , CB_IGNORE = 3 // The message should be ignored (protocol specific).
// };
//
// int ipc_write (void* ctx, int servicefd, char* mcontent, uint32_t mlen);
// int ipc_read_fd (void* ctx, int fd, char* buffer, size_t* buflen);
// int ipc_read (void* ctx, size_t index, char* buffer, size_t* buflen);
// void ipc_context_timer (void* ctx, int timer);
// int ipc_close (void* ctx, size_t index);
// int ipc_close_all (void* ctx);
//
// // Switch functions (for "protocol" services, such as TCPd).
// int ipc_add_external (void* ctx, int newfd);
//
// // Returned "char" is a cb_event_types enum.
// int ipc_set_switch_callbacks (void* ctx, int fd
// , char (*in (int orig, const char *payload, uint32_t *mlen))
// , char (*out(int dest, char *payload, uint32_t mlen)));
// TODO: noreturn error function: print a error message then crash.
// TODO: connection to the proxied service
// TODO: handle connections and disconnections
// TODO: handle messages (proxy)
var context : ?*anyopaque = undefined;
var hash : AutoArrayHashMap(i32, i32) = undefined;
//pub fn in(origin : i32, payload : [*]u8, mlen : *u32) callconv(.C) u8
//{
//}
//
//pub fn out(dest_fd : i32, , payload : [*]u8, mlen : u32) callconv(.C) u8
//{
//}
pub fn connection_and_link(client_fd : i32) !void
{
const stdout = std.io.getStdOut().writer();
try stdout.print("Connection of a new user ({})\n", .{client_fd});
// Connection to the proxied service.
const proxied_service = "pong";
var proxied_service_socket_fd : i32 = 0;
const ret = ipc.ipc_connect_service (context, &proxied_service_socket_fd, proxied_service, proxied_service.len);
if (ret != 0) {
try stdout.print("Impossible to connect to the service {s}\n", .{proxied_service});
return error.CannotConnectToService;
}
errdefer _ = ipc.ipc_close_fd(context, proxied_service_socket_fd);
// Link the user to the proxied service.
//ret = ipc.ipc_add_switch (context, client_fd, proxied_service_socket_fd);
//if (ret != 0) {
// try stdout.print("Impossible switch both client {} and service {s}\n", .{client_fd, proxied_service});
// return error.CannotAddSwitch;
//}
errdefer _ = ipc.ipc_close_fd(context, client_fd);
try hash.put(client_fd, proxied_service_socket_fd);
try hash.put(proxied_service_socket_fd, client_fd);
try stdout.print("Connection of a new user ({}): linked with '{s}' ({})\n",
.{client_fd, proxied_service, proxied_service_socket_fd});
}
pub fn disconnection_and_unlink(fd : i32) !void
{
const stdout = std.io.getStdOut().writer();
try stdout.print("Disconnection of user ({})\n", .{fd});
const related_fd = hash.get(fd) orelse return error.noRelatedFD;
_ = ipc.ipc_close_fd(context, related_fd);
_ = hash.swapRemove(related_fd);
_ = hash.swapRemove(fd);
}
fn print_hex(buffer : [100_000]u8, buflen : u64) !void
{
const stdout = std.io.getStdOut().writer();
var hexbuf: [100_000]u8 = undefined;
var hexfbs = std.io.fixedBufferStream(&hexbuf);
const hexwriter = hexfbs.writer();
try hexdump.hexdump(hexwriter, "Message received", buffer[0..buflen]);
try stdout.print("{s}\n", .{hexfbs.getWritten()});
}
pub fn message_rx(fd : i32, buffer : [100_000]u8, buflen : u64) !void
{
const stdout = std.io.getStdOut().writer();
// There is no need to "read a message", it's already done.
// _ = ipc.ipc_read_fd (context, fd, &buffer, &buflen);
try stdout.print("Message received from {}, size: {}\n", .{fd, buflen});
try print_hex(buffer, buflen);
// Search for the related fd.
const related_fd = hash.get(fd) orelse return error.no_related_fd;
// Send a message to the related service or client.
_ = ipc.ipc_schedule(context, related_fd, &buffer, @intCast(buflen));
try stdout.print("Message scheduled for {}\n", .{fd});
}
pub fn message_tx(fd : i32) !void
{
const stdout = std.io.getStdOut().writer();
try stdout.print("Message sent to ({})\n", .{fd});
}
pub fn main() !void {
var context : ?*anyopaque = undefined;
var service_socket_fd : i32 = 0;
const stdout_file = std.io.getStdOut().writer();
var bw = std.io.bufferedWriter(stdout_file);
const stdout = bw.writer();
// Initialization of the hashmap.
hash = AutoArrayHashMap(i32, i32).init(std.heap.c_allocator);
defer hash.deinit(); // Free the hashmap structure memory.
// Initialization of the proxy.
switch(ipc.ipc_context_init(&context)) {
0 => try stdout.print("Context initialized\n", .{}),
else => try stdout.print("Problem while initializing the context\n", .{}),
}
defer { _ = ipc.ipc_context_deinit(&context); }
defer { _ = ipc.ipc_context_deinit(&context); } // Free the context structure memory.
switch(ipc.ipc_service_init(context, &service_socket_fd, "proxy", 5)) {
0 => try stdout.print("Service initialized\n", .{}),
else => try stdout.print("Problem while initializing the service\n", .{}),
}
defer { _ = ipc.ipc_close_all(context); }
defer { _ = ipc.ipc_close_all(context); } // Close all established connections.
try bw.flush(); // don't forget to flush!
// TODO: do some stuff
while(true) {
var t : u8 = undefined;
var index : u64 = undefined;
var buffer : [100_000]u8 = undefined;
var buflen : u64 = 100_000;
var origin_fd : i32 = 0;
var t : u8 = undefined; // event type
var index : u64 = undefined; // index of the connection on which the action happens
var buffer : [100_000]u8 = undefined; // message buffer
var buflen : u64 = 100_000; // message buffer length
var origin_fd : i32 = 0; // fd on which the action happens
var new_fd : i32 = 0; // new client's fd (in case of a connection)
_ = ipc.ipc_wait_event(context, &t, &index, &origin_fd, &buffer, &buflen);
const ret = ipc.ipc_wait_event(context, &t, &index, &origin_fd, &new_fd, &buffer, &buflen);
if (ret != 0) {
return error.wait_event_failed;
}
switch(t) {
ipc.ERROR => try stdout.print("ERROR\n", .{}), // A problem occured.
ipc.CONNECTION => try stdout.print("CONNECTION\n", .{}), // New user.
ipc.DISCONNECTION => try stdout.print("DISCONNECTION\n", .{}), // User disconnected.
ipc.MESSAGE_RX => try stdout.print("MESSAGE_RX\n", .{}), // New message.
ipc.MESSAGE_TX => try stdout.print("MESSAGE_TX\n", .{}), // Message sent.
ipc.CONNECTION => try connection_and_link(new_fd), // New user.
ipc.DISCONNECTION => try disconnection_and_unlink(origin_fd), // User disconnected.
ipc.MESSAGE_RX => try message_rx(origin_fd, buffer, buflen), // New message.
ipc.MESSAGE_TX => try message_tx(origin_fd), // Message sent.
ipc.TIMER => try stdout.print("TIMER\n", .{}), // Timeout in the poll(2) function.
ipc.EXTERNAL => try stdout.print("EXTERNAL\n", .{}), // Message received from a non IPC socket.
ipc.SWITCH_RX => try stdout.print("SWITCH_RX\n", .{}), // Message received from a switched FD.