Split source code into different files.
parent
200219d2fe
commit
ceafe4c84f
|
@ -0,0 +1,38 @@
|
|||
pub const CBEvent = struct {
|
||||
|
||||
// CallBack Event types.
|
||||
// In the main event loop, servers and clients can receive connections,
|
||||
// disconnections, errors or messages from their pairs. They also can
|
||||
// set a timer so the loop will allow a periodic routine (sending ping
|
||||
// messages for websockets, for instance).
|
||||
//
|
||||
// A few other events can occur.
|
||||
//
|
||||
// Extra socket
|
||||
// The main loop waiting for an event can be used as an unique entry
|
||||
// point for socket management. libipc users can register sockets via
|
||||
// ipc_add_fd allowing them to trigger an event, so events unrelated
|
||||
// to libipc are managed the same way.
|
||||
// Switch
|
||||
// libipc can be used to create protocol-related programs, such as a
|
||||
// websocket proxy allowing libipc services to be accessible online.
|
||||
// To help those programs (with TCP-complient sockets), two sockets
|
||||
// can be bound together, each message coming from one end will be
|
||||
// automatically transfered to the other socket and a Switch event
|
||||
// will be triggered.
|
||||
// Look Up
|
||||
// When a client establishes a connection to a service, it asks the
|
||||
// ipc daemon (ipcd) to locate the service and establish a connection
|
||||
// to it. This is a lookup.
|
||||
|
||||
// For IO callbacks (switching).
|
||||
pub const Type = enum {
|
||||
NO_ERROR, // No error. A message was generated.
|
||||
FD_CLOSING, // The fd is closing.
|
||||
FD_ERROR, // Generic error.
|
||||
PARSING_ERROR, // The message was read but with errors.
|
||||
IGNORE, // The message should be ignored (protocol specific).
|
||||
};
|
||||
|
||||
t: CBEvent.Type,
|
||||
};
|
|
@ -0,0 +1,66 @@
|
|||
const std = @import("std");
|
||||
const hexdump = @import("./hexdump.zig");
|
||||
const net = std.net;
|
||||
const fmt = std.fmt;
|
||||
|
||||
const print = std.debug.print;
|
||||
|
||||
const print_eq = @import("./util.zig").print_eq;
|
||||
|
||||
pub const Connections = std.ArrayList(Connection);
|
||||
|
||||
pub const Connection = struct {
|
||||
|
||||
pub const Type = enum {
|
||||
IPC, // Standard connection.
|
||||
EXTERNAL, // Non IPC connection (TCP, UDP, etc.).
|
||||
SERVER, // Messages received = new connections.
|
||||
SWITCHED, // IO operations should go through registered callbacks.
|
||||
};
|
||||
|
||||
t: Connection.Type,
|
||||
path: ?[] const u8, // Not always needed.
|
||||
|
||||
// TODO: use these connections
|
||||
server: ?net.StreamServer = null,
|
||||
client: ?net.StreamServer.Connection = null,
|
||||
|
||||
// more_to_read: bool, // useless for now
|
||||
|
||||
const Self = @This();
|
||||
|
||||
pub fn init(t: Connection.Type, path: ?[] const u8) Self {
|
||||
return Self {
|
||||
.t = t,
|
||||
.path = path,
|
||||
// .more_to_read = false, // TODO: maybe useless
|
||||
};
|
||||
}
|
||||
|
||||
pub fn deinit(self: *Self) void {
|
||||
if (self.server) |*s| { s.deinit(); }
|
||||
// if (self.client) |*c| { c.deinit(); }
|
||||
}
|
||||
|
||||
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
|
||||
try fmt.format(out_stream, "{}, path {?s}", .{ self.t, self.path});
|
||||
|
||||
if (self.server) |s| {
|
||||
try fmt.format(out_stream, "{}" , .{s});
|
||||
}
|
||||
if (self.client) |c| {
|
||||
try fmt.format(out_stream, "{}" , .{c});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
test "Connection - creation and display" {
|
||||
// origin destination
|
||||
var path = "/some/path";
|
||||
var c1 = Connection.init(Connection.Type.EXTERNAL, path);
|
||||
defer c1.deinit();
|
||||
var c2 = Connection.init(Connection.Type.IPC , null);
|
||||
defer c2.deinit();
|
||||
try print_eq("connection.Connection.Type.EXTERNAL, path /some/path", c1);
|
||||
try print_eq("connection.Connection.Type.IPC, path null", c2);
|
||||
}
|
|
@ -0,0 +1,369 @@
|
|||
const std = @import("std");
|
||||
const hexdump = @import("./hexdump.zig");
|
||||
const testing = std.testing;
|
||||
const net = std.net;
|
||||
const fmt = std.fmt;
|
||||
|
||||
const print = std.debug.print;
|
||||
|
||||
const CBEvent = @import("./callback.zig").CBEvent;
|
||||
const Connection = @import("./connection.zig").Connection;
|
||||
const Message = @import("./message.zig").Message;
|
||||
const Event = @import("./event.zig").Event;
|
||||
const Switch = @import("./switch.zig").Switch;
|
||||
const print_eq = @import("./util.zig").print_eq;
|
||||
|
||||
const Messages = @import("./message.zig").Messages;
|
||||
const Switches = @import("./switch.zig").Switches;
|
||||
const Connections = @import("./connection.zig").Connections;
|
||||
pub const PollFD = std.ArrayList(i32);
|
||||
|
||||
// Context of the whole networking state.
|
||||
pub const Context = struct {
|
||||
pub const IPC_HEADER_SIZE = 4; // Size (4 bytes) then content.
|
||||
pub const IPC_BASE_SIZE = 2000000; // 2 MB, plenty enough space for messages
|
||||
pub const IPC_MAX_MESSAGE_SIZE = IPC_BASE_SIZE-IPC_HEADER_SIZE;
|
||||
pub const IPC_VERSION = 1;
|
||||
|
||||
rundir: [] u8,
|
||||
allocator: std.mem.Allocator, // Memory allocator.
|
||||
connections: Connections, // Keep track of connections.
|
||||
|
||||
// TODO: List of "pollfd" structures within cinfos,
|
||||
// so we can pass it to poll(2). Share indexes with 'connections'.
|
||||
// For now, this list doesn't do anything.
|
||||
// Can even be replaced in a near future.
|
||||
pollfd: PollFD, // File descriptors.
|
||||
|
||||
tx: Messages, // Messages to send, once their fd is available.
|
||||
switchdb: ?Switches, // Relations between fd.
|
||||
|
||||
timer: ?i32 = null, // No timer by default (no TIMER event).
|
||||
|
||||
const Self = @This();
|
||||
|
||||
// Context initialization:
|
||||
// - init structures (provide the allocator)
|
||||
pub fn init(allocator: std.mem.Allocator) !Self {
|
||||
var rundir = std.process.getEnvVarOwned(allocator, "RUNDIR") catch |err| switch(err) {
|
||||
error.EnvironmentVariableNotFound => blk: {
|
||||
// print("RUNTIME variable not set, using default /tmp/libipc-run/\n", .{});
|
||||
break :blk try allocator.dupeZ(u8, "/tmp/libipc-run/");
|
||||
},
|
||||
else => {
|
||||
return err;
|
||||
},
|
||||
};
|
||||
|
||||
return Self {
|
||||
.rundir = rundir
|
||||
, .connections = Connections.init(allocator)
|
||||
, .pollfd = PollFD.init(allocator)
|
||||
, .tx = Messages.init(allocator)
|
||||
, .switchdb = null
|
||||
, .allocator = allocator
|
||||
};
|
||||
}
|
||||
|
||||
// create a server path for the UNIX socket based on the service name
|
||||
pub fn server_path(self: *Self, service_name: []const u8, writer: anytype) !void {
|
||||
try writer.print("{s}/{s}", .{self.rundir, service_name});
|
||||
}
|
||||
|
||||
pub fn deinit(self: *Self) void {
|
||||
self.close_all() catch |err| switch(err){
|
||||
error.IndexOutOfBounds => {
|
||||
print("context.deinit(): IndexOutOfBounds\n", .{});
|
||||
},
|
||||
};
|
||||
self.allocator.free(self.rundir);
|
||||
self.connections.deinit();
|
||||
self.pollfd.deinit();
|
||||
self.tx.deinit();
|
||||
if (self.switchdb) |sdb| { sdb.deinit(); }
|
||||
}
|
||||
|
||||
// Both simple connection and the switched one share this code.
|
||||
fn connect_ (self: *Self, ctype: Connection.Type, path: []const u8) !i32 {
|
||||
var stream = try net.connectUnixSocket(path);
|
||||
const newfd = stream.handle;
|
||||
errdefer std.os.closeSocket(newfd);
|
||||
var newcon = Connection.init(ctype, path);
|
||||
newcon.client = stream;
|
||||
try self.connections.append(newcon);
|
||||
try self.pollfd.append(newfd);
|
||||
return newfd;
|
||||
}
|
||||
|
||||
// Return the new fd. Can be useful to the caller.
|
||||
pub fn connect(self: *Self, path: []const u8) !i32 {
|
||||
// print("connection to:\t{s}\n", .{path});
|
||||
return self.connect_ (Connection.Type.IPC, path);
|
||||
}
|
||||
|
||||
// Connection to a service, but with switched with the client fd.
|
||||
// pub fn connection_switched(self: *Self
|
||||
// , path: [] const u8
|
||||
// , clientfd: i32) !i32 {
|
||||
// // print("connection switched from {} to path {s}\n", .{clientfd, path});
|
||||
// var newfd = try self.connect_ (Connection.Type.SWITCHED, path);
|
||||
// // TODO: record switch.
|
||||
// return newfd;
|
||||
// }
|
||||
|
||||
// Create a unix socket.
|
||||
// Store std lib structures in the context.
|
||||
pub fn server_init(self: *Self, path: [] const u8) !net.StreamServer {
|
||||
// print("context server init {s}\n", .{path});
|
||||
var server = net.StreamServer.init(.{});
|
||||
var socket_addr = try net.Address.initUnix(path);
|
||||
try server.listen(socket_addr);
|
||||
|
||||
const newfd = server.sockfd orelse return error.SocketLOL;
|
||||
var newcon = Connection.init(Connection.Type.SERVER, path);
|
||||
newcon.server = server;
|
||||
try self.connections.append(newcon);
|
||||
try self.pollfd.append(newfd);
|
||||
return server;
|
||||
}
|
||||
|
||||
pub fn write (self: *Self, m: Message) !void {
|
||||
print("write fd {}\n", .{m.fd});
|
||||
self.tx.append(m);
|
||||
}
|
||||
|
||||
pub fn read (self: *Self, index: u32) !Message {
|
||||
if (index >= self.pollfd.items.len) {
|
||||
return error.IndexOutOfBounds;
|
||||
}
|
||||
print("read index {}\n", .{index});
|
||||
var fd = self.pollfd[index];
|
||||
return self.read_fd(fd);
|
||||
}
|
||||
|
||||
pub fn read_fd (self: *Self, fd: i32) !Message {
|
||||
print("read fd {}\n", .{fd});
|
||||
|
||||
// TODO: read the actual content.
|
||||
var payload = "hello!!";
|
||||
|
||||
var m = Message.init(fd, Message.Type.DATA, self.allocator, payload);
|
||||
return m;
|
||||
}
|
||||
|
||||
// Wait an event.
|
||||
pub fn wait_event(self: *Self) !Event {
|
||||
// TODO: remove these debug prints.
|
||||
// for (self.pollfd.items) |fd| {
|
||||
// print("listening to fd {}\n", .{fd});
|
||||
// }
|
||||
if (self.timer) |t| { print("listening for MAXIMUM {} us\n", .{t}); }
|
||||
else { print("listening (no timer)\n", .{}); }
|
||||
|
||||
// TODO: listening to these file descriptors.
|
||||
var some_event = Event.init(Event.Type.CONNECTION, 5, 8, null);
|
||||
return some_event;
|
||||
}
|
||||
|
||||
pub fn close(self: *Self, index: usize) !void {
|
||||
// REMINDER: connections and pollfd have the same length
|
||||
if (index >= self.pollfd.items.len) {
|
||||
return error.IndexOutOfBounds;
|
||||
}
|
||||
|
||||
// close the connection and remove it from the two structures
|
||||
var con = self.connections.swapRemove(index);
|
||||
if (con.server) |s| {
|
||||
// Remove service's UNIX socket file.
|
||||
var addr = s.listen_address;
|
||||
var path = std.mem.sliceTo(&addr.un.path, 0);
|
||||
std.fs.cwd().deleteFile(path) catch {};
|
||||
}
|
||||
if (con.client) |c| {
|
||||
// Close the client's socket.
|
||||
c.stream.close();
|
||||
}
|
||||
_ = self.pollfd.swapRemove(index);
|
||||
}
|
||||
|
||||
pub fn close_all(self: *Self) !void {
|
||||
while(self.connections.items.len > 0) { try self.close(0); }
|
||||
}
|
||||
|
||||
pub fn format(self: Self, comptime form: []const u8, options: fmt.FormatOptions, out_stream: anytype) !void {
|
||||
try fmt.format(out_stream
|
||||
, "context ({} connections and {} messages):"
|
||||
, .{self.connections.items.len, self.tx.items.len});
|
||||
|
||||
for (self.connections.items) |con| {
|
||||
try fmt.format(out_stream, "\n- ", .{});
|
||||
try con.format(form, options, out_stream);
|
||||
}
|
||||
|
||||
for (self.tx.items) |tx| {
|
||||
try fmt.format(out_stream, "\n- ", .{});
|
||||
try tx.format(form, options, out_stream);
|
||||
}
|
||||
}
|
||||
|
||||
// PRIVATE API
|
||||
|
||||
fn read_ (_: *Self, client: net.StreamServer.Connection, buf: [] u8) !usize {
|
||||
return try client.stream.reader().read(buf);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
// Creating a new thread: testing UNIX communication.
|
||||
// This is a client sending a raw "Hello world!" bytestring,
|
||||
// not an instance of Message.
|
||||
const CommunicationTestThread = struct {
|
||||
fn clientFn() !void {
|
||||
const config = .{.safety = true};
|
||||
var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||
defer _ = gpa.deinit();
|
||||
const allocator = gpa.allocator();
|
||||
|
||||
var c = try Context.init(allocator);
|
||||
defer c.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
||||
|
||||
var buffer: [1000]u8 = undefined;
|
||||
var fbs = std.io.fixedBufferStream(&buffer);
|
||||
var writer = fbs.writer();
|
||||
|
||||
try c.server_path("simple-context-test", writer);
|
||||
var path = fbs.getWritten();
|
||||
const socket = try net.connectUnixSocket(path);
|
||||
defer socket.close();
|
||||
// print("So we're a client now... path: {s}\n", .{path});
|
||||
_ = try socket.writer().writeAll("Hello world!");
|
||||
}
|
||||
};
|
||||
|
||||
test "Simple structures - init, display and memory check" {
|
||||
// origin destination
|
||||
// var s = Switch.init(3,8);
|
||||
// var payload = "hello!!";
|
||||
// // fd type payload
|
||||
// var m = Message.init(0, Message.Type.DATA, payload);
|
||||
//
|
||||
// // type index origin message
|
||||
// var e = Event.init(Event.Type.CONNECTION, 5, 8, &m);
|
||||
|
||||
// // CLIENT SIDE: connection to a service.
|
||||
// _ = try c.connect(path);
|
||||
|
||||
// // TODO: connection to a server, but switched with clientfd "3".
|
||||
// _ = try c.connection_switched(path, 3);
|
||||
}
|
||||
|
||||
test "Context - creation, display and memory check" {
|
||||
const config = .{.safety = true};
|
||||
var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||
defer _ = gpa.deinit();
|
||||
|
||||
const allocator = gpa.allocator();
|
||||
|
||||
var c = try Context.init(allocator);
|
||||
defer c.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
||||
|
||||
var buffer: [1000]u8 = undefined;
|
||||
var fbs = std.io.fixedBufferStream(&buffer);
|
||||
var writer = fbs.writer();
|
||||
try c.server_path("simple-context-test", writer);
|
||||
var path = fbs.getWritten();
|
||||
|
||||
// SERVER SIDE: creating a service.
|
||||
var server = try c.server_init(path);
|
||||
defer server.deinit();
|
||||
defer std.fs.cwd().deleteFile(path) catch {}; // Once done, remove file.
|
||||
|
||||
// print ("Context: {}\n", .{c});
|
||||
// print("\n", .{});
|
||||
const t = try std.Thread.spawn(.{}, CommunicationTestThread.clientFn, .{});
|
||||
defer t.join();
|
||||
|
||||
// Server.accept returns a net.StreamServer.Connection.
|
||||
var client = try server.accept();
|
||||
defer client.stream.close();
|
||||
var buf: [16]u8 = undefined;
|
||||
const n = try client.stream.reader().read(&buf);
|
||||
|
||||
try testing.expectEqual(@as(usize, 12), n);
|
||||
try testing.expectEqualSlices(u8, "Hello world!", buf[0..n]);
|
||||
}
|
||||
|
||||
// // TODO:
|
||||
// // Creating a new thread: testing UNIX communication.
|
||||
// // This is a client sending a raw "Hello world!" bytestring,
|
||||
// // not an instance of Message.
|
||||
// const ConnectThenSendMessageThread = struct {
|
||||
// fn clientFn() !void {
|
||||
// const config = .{.safety = true};
|
||||
// var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||
// defer _ = gpa.deinit();
|
||||
// const allocator = gpa.allocator();
|
||||
//
|
||||
// var c = try Context.init(allocator);
|
||||
// defer c.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
||||
//
|
||||
// var path_buffer: [1000]u8 = undefined;
|
||||
// var path_fbs = std.io.fixedBufferStream(&path_buffer);
|
||||
// var path_writer = path_fbs.writer();
|
||||
// try c.server_path("simple-context-test", path_writer);
|
||||
// var path = path_fbs.getWritten();
|
||||
//
|
||||
// // Actual UNIX socket connection.
|
||||
// const socket = try net.connectUnixSocket(path);
|
||||
// defer socket.close();
|
||||
//
|
||||
// // Writing message into a buffer.
|
||||
// var message_buffer: [1000]u8 = undefined;
|
||||
// var message_fbs = std.io.fixedBufferStream(&message_buffer);
|
||||
// var message_writer = message_fbs.writer();
|
||||
// // 'fd' parameter is not taken into account here (no loop)
|
||||
//
|
||||
// var m = try Message.init(0, Message.Type.DATA, allocator, "Hello world!");
|
||||
// try m.write(message_writer);
|
||||
//
|
||||
// // print("So we're a client now... path: {s}\n", .{path});
|
||||
// _ = try socket.writer().writeAll(message_fbs.getWritten());
|
||||
// }
|
||||
// };
|
||||
//
|
||||
//
|
||||
// test "Context - creation, echo once" {
|
||||
// const config = .{.safety = true};
|
||||
// var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||
// defer _ = gpa.deinit();
|
||||
//
|
||||
// const allocator = gpa.allocator();
|
||||
//
|
||||
// var c = try Context.init(allocator);
|
||||
// defer c.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
||||
//
|
||||
// var buffer: [1000]u8 = undefined;
|
||||
// var fbs = std.io.fixedBufferStream(&buffer);
|
||||
// var writer = fbs.writer();
|
||||
// try c.server_path("simple-context-test", writer);
|
||||
// var path = fbs.getWritten();
|
||||
//
|
||||
// // SERVER SIDE: creating a service.
|
||||
// var server = try c.server_init(path);
|
||||
// defer server.deinit();
|
||||
// defer std.fs.cwd().deleteFile(path) catch {}; // Once done, remove file.
|
||||
//
|
||||
// const t = try std.Thread.spawn(.{}, ConnectThenSendMessageThread.clientFn, .{});
|
||||
// defer t.join();
|
||||
//
|
||||
// // Server.accept returns a net.StreamServer.Connection.
|
||||
// var client = try server.accept();
|
||||
// defer client.stream.close();
|
||||
// var buf: [1000]u8 = undefined;
|
||||
// const n = try client.stream.reader().read(&buf);
|
||||
// var m = try Message.read(buf[0..n], allocator);
|
||||
//
|
||||
// try testing.expectEqual(@as(usize, 12), m.payload.len);
|
||||
// try testing.expectEqualSlices(u8, m.payload, "Hello world!");
|
||||
// }
|
||||
|
|
@ -0,0 +1,97 @@
|
|||
const std = @import("std");
|
||||
const testing = std.testing;
|
||||
const fmt = std.fmt;
|
||||
|
||||
const Message = @import("./message.zig").Message;
|
||||
|
||||
const print_eq = @import("./util.zig").print_eq;
|
||||
|
||||
pub const Event = struct {
|
||||
|
||||
// Event types.
|
||||
// In the main event loop, servers and clients can receive connections,
|
||||
// disconnections, errors or messages from their pairs. They also can
|
||||
// set a timer so the loop will allow a periodic routine (sending ping
|
||||
// messages for websockets, for instance).
|
||||
//
|
||||
// A few other events can occur.
|
||||
//
|
||||
// Extra socket
|
||||
// The main loop waiting for an event can be used as an unique entry
|
||||
// point for socket management. libipc users can register sockets via
|
||||
// ipc_add_fd allowing them to trigger an event, so events unrelated
|
||||
// to libipc are managed the same way.
|
||||
// Switch
|
||||
// libipc can be used to create protocol-related programs, such as a
|
||||
// websocket proxy allowing libipc services to be accessible online.
|
||||
// To help those programs (with TCP-complient sockets), two sockets
|
||||
// can be bound together, each message coming from one end will be
|
||||
// automatically transfered to the other socket and a Switch event
|
||||
// will be triggered.
|
||||
// Look Up
|
||||
// When a client establishes a connection to a service, it asks the
|
||||
// ipc daemon (ipcd) to locate the service and establish a connection
|
||||
// to it. This is a lookup.
|
||||
|
||||
pub const Type = enum {
|
||||
NOT_SET, // Default. TODO: should we keep this?
|
||||
ERROR, // A problem occured.
|
||||
EXTRA_SOCKET, // Message received from a non IPC socket.
|
||||
SWITCH, // Message to send to a corresponding fd.
|
||||
CONNECTION, // New user.
|
||||
DISCONNECTION, // User disconnected.
|
||||
MESSAGE, // New message.
|
||||
LOOKUP, // Client asking for a service through ipcd.
|
||||
TIMER, // Timeout in the poll(2) function.
|
||||
TX, // Message sent.
|
||||
};
|
||||
|
||||
t: Event.Type,
|
||||
index: u32,
|
||||
origin: usize, // socket fd
|
||||
m: ?*Message, // message pointer
|
||||
|
||||
const Self = @This();
|
||||
|
||||
pub fn init(t: Event.Type, index: u32, origin: usize, m: ?*Message) Self {
|
||||
return Self { .t = t, .index = index, .origin = origin, .m = m, };
|
||||
}
|
||||
|
||||
pub fn set(self: *Self, t: Event.Type, index: u32, origin: usize, m: ?*Message) void {
|
||||
self.t = t;
|
||||
self.index = index;
|
||||
self.origin = origin;
|
||||
self.m = m;
|
||||
}
|
||||
|
||||
pub fn clean(self: *Self) void {
|
||||
self.t = Event.Type.NOT_SET;
|
||||
self.index = @as(u8,0);
|
||||
self.origin = @as(usize,0);
|
||||
if (self.m) |message| {
|
||||
message.deinit();
|
||||
}
|
||||
self.m = null;
|
||||
}
|
||||
|
||||
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
|
||||
try fmt.format(out_stream
|
||||
, "{}, origin: {}, index {}, message: [{?}]"
|
||||
, .{ self.t, self.origin, self.index, self.m} );
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
test "Event - creation and display" {
|
||||
const config = .{.safety = true};
|
||||
var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||
defer _ = gpa.deinit();
|
||||
const allocator = gpa.allocator();
|
||||
|
||||
var s = "hello!!";
|
||||
var m = try Message.init(1, Message.Type.DATA, allocator, s); // fd type payload
|
||||
defer m.deinit();
|
||||
var e = Event.init(Event.Type.CONNECTION, 5, 8, &m); // type index origin message
|
||||
|
||||
try print_eq("event.Event.Type.CONNECTION, origin: 8, index 5, message: [fd: 1, message.Message.Type.DATA, payload: [hello!!]]", e);
|
||||
}
|
|
@ -14,738 +14,31 @@ const fmt = std.fmt;
|
|||
// TODO: API should completely obfuscate the inner structures.
|
||||
// Only structures in this file should be necessary.
|
||||
|
||||
const CBEvent = @import("./callback.zig").CBEvent;
|
||||
const Connection = @import("./connection.zig").Connection;
|
||||
const Message = @import("./message.zig").Message;
|
||||
const Event = @import("./event.zig").Event;
|
||||
const Switch = @import("./switch.zig").Switch;
|
||||
const print_eq = @import("./util.zig").print_eq;
|
||||
|
||||
const print = std.debug.print;
|
||||
|
||||
pub const Messages = std.ArrayList(Message);
|
||||
pub const Switches = std.ArrayList(Switch);
|
||||
pub const Connections = std.ArrayList(Connection);
|
||||
const Messages = @import("./message.zig").Messages;
|
||||
const Switches = @import("./switch.zig").Switches;
|
||||
const Connections = @import("./connection.zig").Connections;
|
||||
const Context = @import("./context.zig").Context;
|
||||
pub const PollFD = std.ArrayList(i32);
|
||||
|
||||
pub const IPC_TYPE = enum {
|
||||
UNIX_SOCKETS
|
||||
};
|
||||
|
||||
pub const Message = struct {
|
||||
|
||||
pub const Type = enum {
|
||||
SERVER_CLOSE,
|
||||
ERROR,
|
||||
DATA,
|
||||
NETWORK_LOOKUP,
|
||||
};
|
||||
|
||||
t: Message.Type, // Internal message type.
|
||||
fd: usize, // File descriptor concerned about this message.
|
||||
payload: []const u8,
|
||||
|
||||
allocator: std.mem.Allocator, // Memory allocator.
|
||||
|
||||
const Self = @This();
|
||||
|
||||
// TODO
|
||||
//pub fn initFromConnection(fd: usize) Self {
|
||||
// return Self{
|
||||
// .t = Message.Type.ERROR,
|
||||
// .fd = fd,
|
||||
// .payload = "hello",
|
||||
// };
|
||||
//}
|
||||
|
||||
pub fn init(fd: usize, t: Message.Type
|
||||
, allocator: std.mem.Allocator
|
||||
, payload: []const u8) !Self {
|
||||
return Message { .fd = fd, .t = t
|
||||
, .allocator = allocator
|
||||
, .payload = try allocator.dupe(u8, payload) };
|
||||
}
|
||||
|
||||
pub fn deinit(self: *Self) void {
|
||||
self.allocator.free(self.payload);
|
||||
}
|
||||
|
||||
pub fn read(buffer: []const u8, allocator: std.mem.Allocator) !Self {
|
||||
|
||||
// var hexbuf: [4000]u8 = undefined;
|
||||
// var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
||||
// var hexwriter = hexfbs.writer();
|
||||
// try hexdump.hexdump(hexwriter, "Message.read input buffer", buffer);
|
||||
// print("{s}\n", .{hexfbs.getWritten()});
|
||||
|
||||
// var payload = allocator.
|
||||
// defer allocator.free(payload);
|
||||
var fbs = std.io.fixedBufferStream(buffer);
|
||||
var reader = fbs.reader();
|
||||
|
||||
const msg_type = @intToEnum(Message.Type, try reader.readByte());
|
||||
const msg_len = try reader.readIntBig(u32);
|
||||
const msg_payload = buffer[5..5+msg_len];
|
||||
|
||||
return try Message.init(0, msg_type, allocator, msg_payload);
|
||||
}
|
||||
|
||||
pub fn write(self: Self, writer: anytype) !usize {
|
||||
try writer.writeByte(@enumToInt(self.t));
|
||||
try writer.writeIntBig(u32, @truncate(u32, self.payload.len));
|
||||
return 5 + try writer.write(self.payload);
|
||||
}
|
||||
|
||||
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
|
||||
try fmt.format(out_stream, "fd: {}, {}, payload: [{s}]",
|
||||
.{self.fd, self.t, self.payload} );
|
||||
}
|
||||
};
|
||||
|
||||
fn print_eq(expected: anytype, obj: anytype) !void {
|
||||
var buffer: [4096]u8 = undefined;
|
||||
var fbs = std.io.fixedBufferStream(&buffer);
|
||||
var writer = fbs.writer();
|
||||
|
||||
try writer.print("{}", .{obj});
|
||||
// print("print_eq, expected: {s}\n", .{expected});
|
||||
// print("print_eq: {s}\n", .{fbs.getWritten()});
|
||||
|
||||
// typing workaround
|
||||
var secbuffer: [4096]u8 = undefined;
|
||||
var secfbs = std.io.fixedBufferStream(&secbuffer);
|
||||
var secwriter = secfbs.writer();
|
||||
|
||||
try secwriter.print("{s}", .{expected});
|
||||
|
||||
try std.testing.expectEqualSlices(u8, secfbs.getWritten(), fbs.getWritten());
|
||||
test {
|
||||
_ = @import("./callback.zig");
|
||||
_ = @import("./connection.zig");
|
||||
_ = @import("./context.zig");
|
||||
_ = @import("./event.zig");
|
||||
_ = @import("./message.zig");
|
||||
_ = @import("./switch.zig");
|
||||
_ = @import("./util.zig");
|
||||
}
|
||||
|
||||
test "Message - creation and display" {
|
||||
// fd type payload
|
||||
const config = .{.safety = true};
|
||||
var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||
defer _ = gpa.deinit();
|
||||
const allocator = gpa.allocator();
|
||||
|
||||
var s = "hello!!";
|
||||
var m = try Message.init(1, Message.Type.DATA, allocator, s);
|
||||
defer m.deinit();
|
||||
|
||||
try print_eq("fd: 1, main.Message.Type.DATA, payload: [hello!!]", m);
|
||||
}
|
||||
|
||||
test "Message - read and write" {
|
||||
// fd type payload
|
||||
const config = .{.safety = true};
|
||||
var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||
defer _ = gpa.deinit();
|
||||
const allocator = gpa.allocator();
|
||||
|
||||
// First, create a message.
|
||||
var s = "hello!!";
|
||||
var first_message = try Message.init(1, Message.Type.DATA, allocator, s);
|
||||
defer first_message.deinit();
|
||||
|
||||
// Test its content.
|
||||
try std.testing.expect(first_message.fd == 1);
|
||||
try std.testing.expect(first_message.payload.len == 7);
|
||||
try std.testing.expectEqualSlices(u8, first_message.payload, "hello!!");
|
||||
|
||||
// Write it in a buffer, similar to sending it on the network.
|
||||
var buffer: [1000]u8 = undefined;
|
||||
var fbs = std.io.fixedBufferStream(&buffer);
|
||||
var writer = fbs.writer();
|
||||
|
||||
var count = try first_message.write(writer);
|
||||
|
||||
var second_buffer: [2000]u8 = undefined;
|
||||
var fba = std.heap.FixedBufferAllocator.init(&second_buffer);
|
||||
var second_allocator = fba.allocator();
|
||||
|
||||
// Read the buffer, similar to receiving a message on the network.
|
||||
var second_message = try Message.read(buffer[0..count], second_allocator);
|
||||
// var second_message = try Message.read(fbs.getWritten(), second_allocator);
|
||||
defer second_message.deinit();
|
||||
|
||||
// Test its content, should be equal to the first.
|
||||
try std.testing.expect(second_message.payload.len == first_message.payload.len);
|
||||
try std.testing.expectEqualSlices(u8, second_message.payload, first_message.payload);
|
||||
}
|
||||
|
||||
pub const Event = struct {
|
||||
|
||||
// Event types.
|
||||
// In the main event loop, servers and clients can receive connections,
|
||||
// disconnections, errors or messages from their pairs. They also can
|
||||
// set a timer so the loop will allow a periodic routine (sending ping
|
||||
// messages for websockets, for instance).
|
||||
//
|
||||
// A few other events can occur.
|
||||
//
|
||||
// Extra socket
|
||||
// The main loop waiting for an event can be used as an unique entry
|
||||
// point for socket management. libipc users can register sockets via
|
||||
// ipc_add_fd allowing them to trigger an event, so events unrelated
|
||||
// to libipc are managed the same way.
|
||||
// Switch
|
||||
// libipc can be used to create protocol-related programs, such as a
|
||||
// websocket proxy allowing libipc services to be accessible online.
|
||||
// To help those programs (with TCP-complient sockets), two sockets
|
||||
// can be bound together, each message coming from one end will be
|
||||
// automatically transfered to the other socket and a Switch event
|
||||
// will be triggered.
|
||||
// Look Up
|
||||
// When a client establishes a connection to a service, it asks the
|
||||
// ipc daemon (ipcd) to locate the service and establish a connection
|
||||
// to it. This is a lookup.
|
||||
|
||||
pub const Type = enum {
|
||||
NOT_SET, // Default. TODO: should we keep this?
|
||||
ERROR, // A problem occured.
|
||||
EXTRA_SOCKET, // Message received from a non IPC socket.
|
||||
SWITCH, // Message to send to a corresponding fd.
|
||||
CONNECTION, // New user.
|
||||
DISCONNECTION, // User disconnected.
|
||||
MESSAGE, // New message.
|
||||
LOOKUP, // Client asking for a service through ipcd.
|
||||
TIMER, // Timeout in the poll(2) function.
|
||||
TX, // Message sent.
|
||||
};
|
||||
|
||||
t: Event.Type,
|
||||
index: u32,
|
||||
origin: usize, // socket fd
|
||||
m: ?*Message, // message pointer
|
||||
|
||||
const Self = @This();
|
||||
|
||||
pub fn init(t: Event.Type, index: u32, origin: usize, m: ?*Message) Self {
|
||||
return Self { .t = t, .index = index, .origin = origin, .m = m, };
|
||||
}
|
||||
|
||||
pub fn set(self: *Self, t: Event.Type, index: u32, origin: usize, m: ?*Message) void {
|
||||
self.t = t;
|
||||
self.index = index;
|
||||
self.origin = origin;
|
||||
self.m = m;
|
||||
}
|
||||
|
||||
pub fn clean(self: *Self) void {
|
||||
self.t = Event.Type.NOT_SET;
|
||||
self.index = @as(u8,0);
|
||||
self.origin = @as(usize,0);
|
||||
if (self.m) |message| {
|
||||
message.deinit();
|
||||
}
|
||||
self.m = null;
|
||||
}
|
||||
|
||||
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
|
||||
try fmt.format(out_stream
|
||||
, "{}, origin: {}, index {}, message: [{?}]"
|
||||
, .{ self.t, self.origin, self.index, self.m} );
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
test "Event - creation and display" {
|
||||
const config = .{.safety = true};
|
||||
var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||
defer _ = gpa.deinit();
|
||||
const allocator = gpa.allocator();
|
||||
|
||||
var s = "hello!!";
|
||||
var m = try Message.init(1, Message.Type.DATA, allocator, s); // fd type payload
|
||||
defer m.deinit();
|
||||
var e = Event.init(Event.Type.CONNECTION, 5, 8, &m); // type index origin message
|
||||
|
||||
try print_eq("main.Event.Type.CONNECTION, origin: 8, index 5, message: [fd: 1, main.Message.Type.DATA, payload: [hello!!]]", e);
|
||||
}
|
||||
|
||||
pub const CBEvent = struct {
|
||||
|
||||
// CallBack Event types.
|
||||
// In the main event loop, servers and clients can receive connections,
|
||||
// disconnections, errors or messages from their pairs. They also can
|
||||
// set a timer so the loop will allow a periodic routine (sending ping
|
||||
// messages for websockets, for instance).
|
||||
//
|
||||
// A few other events can occur.
|
||||
//
|
||||
// Extra socket
|
||||
// The main loop waiting for an event can be used as an unique entry
|
||||
// point for socket management. libipc users can register sockets via
|
||||
// ipc_add_fd allowing them to trigger an event, so events unrelated
|
||||
// to libipc are managed the same way.
|
||||
// Switch
|
||||
// libipc can be used to create protocol-related programs, such as a
|
||||
// websocket proxy allowing libipc services to be accessible online.
|
||||
// To help those programs (with TCP-complient sockets), two sockets
|
||||
// can be bound together, each message coming from one end will be
|
||||
// automatically transfered to the other socket and a Switch event
|
||||
// will be triggered.
|
||||
// Look Up
|
||||
// When a client establishes a connection to a service, it asks the
|
||||
// ipc daemon (ipcd) to locate the service and establish a connection
|
||||
// to it. This is a lookup.
|
||||
|
||||
// For IO callbacks (switching).
|
||||
pub const Type = enum {
|
||||
NO_ERROR, // No error. A message was generated.
|
||||
FD_CLOSING, // The fd is closing.
|
||||
FD_ERROR, // Generic error.
|
||||
PARSING_ERROR, // The message was read but with errors.
|
||||
IGNORE, // The message should be ignored (protocol specific).
|
||||
};
|
||||
|
||||
t: CBEvent.Type,
|
||||
};
|
||||
|
||||
pub const Connection = struct {
|
||||
|
||||
pub const Type = enum {
|
||||
IPC, // Standard connection.
|
||||
EXTERNAL, // Non IPC connection (TCP, UDP, etc.).
|
||||
SERVER, // Messages received = new connections.
|
||||
SWITCHED, // IO operations should go through registered callbacks.
|
||||
};
|
||||
|
||||
t: Connection.Type,
|
||||
path: ?[] const u8, // Not always needed.
|
||||
|
||||
// TODO: use these connections
|
||||
server: ?net.StreamServer = null,
|
||||
client: ?net.StreamServer.Connection = null,
|
||||
|
||||
// more_to_read: bool, // useless for now
|
||||
|
||||
const Self = @This();
|
||||
|
||||
pub fn init(t: Connection.Type, path: ?[] const u8) Self {
|
||||
return Self {
|
||||
.t = t,
|
||||
.path = path,
|
||||
// .more_to_read = false, // TODO: maybe useless
|
||||
};
|
||||
}
|
||||
|
||||
pub fn deinit(self: *Self) void {
|
||||
if (self.server) |*s| { s.deinit(); }
|
||||
// if (self.client) |*c| { c.deinit(); }
|
||||
}
|
||||
|
||||
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
|
||||
try fmt.format(out_stream, "{}, path {?s}", .{ self.t, self.path});
|
||||
|
||||
if (self.server) |s| {
|
||||
try fmt.format(out_stream, "{}" , .{s});
|
||||
}
|
||||
if (self.client) |c| {
|
||||
try fmt.format(out_stream, "{}" , .{c});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
test "Connection - creation and display" {
|
||||
// origin destination
|
||||
var path = "/some/path";
|
||||
var c1 = Connection.init(Connection.Type.EXTERNAL, path);
|
||||
defer c1.deinit();
|
||||
var c2 = Connection.init(Connection.Type.IPC , null);
|
||||
defer c2.deinit();
|
||||
try print_eq("main.Connection.Type.EXTERNAL, path /some/path", c1);
|
||||
try print_eq("main.Connection.Type.IPC, path null", c2);
|
||||
}
|
||||
|
||||
// TODO: default callbacks, actual switching.
|
||||
pub const Switch = struct {
|
||||
origin : usize,
|
||||
destination : usize,
|
||||
|
||||
// orig_in: ?fn (origin: usize, m: Message) CBEvent,
|
||||
// orig_out: ?fn (origin: usize, m: Message) CBEvent,
|
||||
// dest_in: ?fn (origin: usize, m: Message) CBEvent,
|
||||
// dest_out: ?fn (origin: usize, m: Message) CBEvent,
|
||||
|
||||
const Self = @This();
|
||||
|
||||
pub fn init(origin: usize, destination: usize) Self {
|
||||
return Self {
|
||||
.origin = origin,
|
||||
.destination = destination,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
|
||||
try fmt.format(out_stream
|
||||
, "switch {} <-> {}"
|
||||
, .{ self.origin, self.destination} );
|
||||
}
|
||||
};
|
||||
|
||||
test "Switch - creation and display" {
|
||||
const config = .{.safety = true};
|
||||
var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||
defer _ = gpa.deinit();
|
||||
const allocator = gpa.allocator();
|
||||
|
||||
var switchdb = Switches.init(allocator);
|
||||
defer switchdb.deinit();
|
||||
|
||||
var first = Switch.init(3,8); // origin destination
|
||||
var second = Switch.init(2,4); // origin destination
|
||||
try switchdb.append(first);
|
||||
try switchdb.append(second);
|
||||
|
||||
try print_eq("switch 3 <-> 8", first);
|
||||
try print_eq("switch 2 <-> 4", second);
|
||||
|
||||
try std.testing.expect(2 == switchdb.items.len);
|
||||
}
|
||||
|
||||
// Context of the whole networking state.
|
||||
pub const Context = struct {
|
||||
pub const IPC_HEADER_SIZE = 4; // Size (4 bytes) then content.
|
||||
pub const IPC_BASE_SIZE = 2000000; // 2 MB, plenty enough space for messages
|
||||
pub const IPC_MAX_MESSAGE_SIZE = IPC_BASE_SIZE-IPC_HEADER_SIZE;
|
||||
pub const IPC_VERSION = 1;
|
||||
|
||||
rundir: [] u8,
|
||||
allocator: std.mem.Allocator, // Memory allocator.
|
||||
connections: Connections, // Keep track of connections.
|
||||
|
||||
// TODO: List of "pollfd" structures within cinfos,
|
||||
// so we can pass it to poll(2). Share indexes with 'connections'.
|
||||
// For now, this list doesn't do anything.
|
||||
// Can even be replaced in a near future.
|
||||
pollfd: PollFD, // File descriptors.
|
||||
|
||||
tx: Messages, // Messages to send, once their fd is available.
|
||||
switchdb: ?Switches, // Relations between fd.
|
||||
|
||||
timer: ?i32 = null, // No timer by default (no TIMER event).
|
||||
|
||||
const Self = @This();
|
||||
|
||||
// Context initialization:
|
||||
// - init structures (provide the allocator)
|
||||
pub fn init(allocator: std.mem.Allocator) !Self {
|
||||
var rundir = std.process.getEnvVarOwned(allocator, "RUNDIR") catch |err| switch(err) {
|
||||
error.EnvironmentVariableNotFound => blk: {
|
||||
// print("RUNTIME variable not set, using default /tmp/libipc-run/\n", .{});
|
||||
break :blk try allocator.dupeZ(u8, "/tmp/libipc-run/");
|
||||
},
|
||||
else => {
|
||||
return err;
|
||||
},
|
||||
};
|
||||
|
||||
return Self {
|
||||
.rundir = rundir
|
||||
, .connections = Connections.init(allocator)
|
||||
, .pollfd = PollFD.init(allocator)
|
||||
, .tx = Messages.init(allocator)
|
||||
, .switchdb = null
|
||||
, .allocator = allocator
|
||||
};
|
||||
}
|
||||
|
||||
// create a server path for the UNIX socket based on the service name
|
||||
pub fn server_path(self: *Self, service_name: []const u8, writer: anytype) !void {
|
||||
try writer.print("{s}/{s}", .{self.rundir, service_name});
|
||||
}
|
||||
|
||||
pub fn deinit(self: *Self) void {
|
||||
self.close_all() catch |err| switch(err){
|
||||
error.IndexOutOfBounds => {
|
||||
print("context.deinit(): IndexOutOfBounds\n", .{});
|
||||
},
|
||||
};
|
||||
self.allocator.free(self.rundir);
|
||||
self.connections.deinit();
|
||||
self.pollfd.deinit();
|
||||
self.tx.deinit();
|
||||
if (self.switchdb) |sdb| { sdb.deinit(); }
|
||||
}
|
||||
|
||||
// Both simple connection and the switched one share this code.
|
||||
fn connect_ (self: *Self, ctype: Connection.Type, path: []const u8) !i32 {
|
||||
var stream = try net.connectUnixSocket(path);
|
||||
const newfd = stream.handle;
|
||||
errdefer std.os.closeSocket(newfd);
|
||||
var newcon = Connection.init(ctype, path);
|
||||
newcon.client = stream;
|
||||
try self.connections.append(newcon);
|
||||
try self.pollfd.append(newfd);
|
||||
return newfd;
|
||||
}
|
||||
|
||||
// Return the new fd. Can be useful to the caller.
|
||||
pub fn connect(self: *Self, path: []const u8) !i32 {
|
||||
// print("connection to:\t{s}\n", .{path});
|
||||
return self.connect_ (Connection.Type.IPC, path);
|
||||
}
|
||||
|
||||
// Connection to a service, but with switched with the client fd.
|
||||
// pub fn connection_switched(self: *Self
|
||||
// , path: [] const u8
|
||||
// , clientfd: i32) !i32 {
|
||||
// // print("connection switched from {} to path {s}\n", .{clientfd, path});
|
||||
// var newfd = try self.connect_ (Connection.Type.SWITCHED, path);
|
||||
// // TODO: record switch.
|
||||
// return newfd;
|
||||
// }
|
||||
|
||||
// Create a unix socket.
|
||||
// Store std lib structures in the context.
|
||||
pub fn server_init(self: *Self, path: [] const u8) !net.StreamServer {
|
||||
// print("context server init {s}\n", .{path});
|
||||
var server = net.StreamServer.init(.{});
|
||||
var socket_addr = try net.Address.initUnix(path);
|
||||
try server.listen(socket_addr);
|
||||
|
||||
const newfd = server.sockfd orelse return error.SocketLOL;
|
||||
var newcon = Connection.init(Connection.Type.SERVER, path);
|
||||
newcon.server = server;
|
||||
try self.connections.append(newcon);
|
||||
try self.pollfd.append(newfd);
|
||||
return server;
|
||||
}
|
||||
|
||||
pub fn write (self: *Self, m: Message) !void {
|
||||
print("write fd {}\n", .{m.fd});
|
||||
self.tx.append(m);
|
||||
}
|
||||
|
||||
pub fn read (self: *Self, index: u32) !Message {
|
||||
if (index >= self.pollfd.items.len) {
|
||||
return error.IndexOutOfBounds;
|
||||
}
|
||||
print("read index {}\n", .{index});
|
||||
var fd = self.pollfd[index];
|
||||
return self.read_fd(fd);
|
||||
}
|
||||
|
||||
pub fn read_fd (self: *Self, fd: i32) !Message {
|
||||
print("read fd {}\n", .{fd});
|
||||
|
||||
// TODO: read the actual content.
|
||||
var payload = "hello!!";
|
||||
|
||||
var m = Message.init(fd, Message.Type.DATA, self.allocator, payload);
|
||||
return m;
|
||||
}
|
||||
|
||||
// Wait an event.
|
||||
pub fn wait_event(self: *Self) !Event {
|
||||
// TODO: remove these debug prints.
|
||||
// for (self.pollfd.items) |fd| {
|
||||
// print("listening to fd {}\n", .{fd});
|
||||
// }
|
||||
if (self.timer) |t| { print("listening for MAXIMUM {} us\n", .{t}); }
|
||||
else { print("listening (no timer)\n", .{}); }
|
||||
|
||||
// TODO: listening to these file descriptors.
|
||||
var event = Event.init(Event.Type.CONNECTION, 5, 8, null);
|
||||
return event;
|
||||
}
|
||||
|
||||
pub fn close(self: *Self, index: usize) !void {
|
||||
// REMINDER: connections and pollfd have the same length
|
||||
if (index >= self.pollfd.items.len) {
|
||||
return error.IndexOutOfBounds;
|
||||
}
|
||||
|
||||
// close the connection and remove it from the two structures
|
||||
var con = self.connections.swapRemove(index);
|
||||
if (con.server) |s| {
|
||||
// Remove service's UNIX socket file.
|
||||
var addr = s.listen_address;
|
||||
var path = std.mem.sliceTo(&addr.un.path, 0);
|
||||
std.fs.cwd().deleteFile(path) catch {};
|
||||
}
|
||||
if (con.client) |c| {
|
||||
// Close the client's socket.
|
||||
c.stream.close();
|
||||
}
|
||||
_ = self.pollfd.swapRemove(index);
|
||||
}
|
||||
|
||||
pub fn close_all(self: *Self) !void {
|
||||
while(self.connections.items.len > 0) { try self.close(0); }
|
||||
}
|
||||
|
||||
pub fn format(self: Self, comptime form: []const u8, options: fmt.FormatOptions, out_stream: anytype) !void {
|
||||
try fmt.format(out_stream
|
||||
, "context ({} connections and {} messages):"
|
||||
, .{self.connections.items.len, self.tx.items.len});
|
||||
|
||||
for (self.connections.items) |con| {
|
||||
try fmt.format(out_stream, "\n- ", .{});
|
||||
try con.format(form, options, out_stream);
|
||||
}
|
||||
|
||||
for (self.tx.items) |tx| {
|
||||
try fmt.format(out_stream, "\n- ", .{});
|
||||
try tx.format(form, options, out_stream);
|
||||
}
|
||||
}
|
||||
|
||||
// PRIVATE API
|
||||
|
||||
fn read_ (_: *Self, client: net.StreamServer.Connection, buf: [] u8) !usize {
|
||||
return try client.stream.reader().read(buf);
|
||||
}
|
||||
};
|
||||
|
||||
// Creating a new thread: testing UNIX communication.
|
||||
// This is a client sending a raw "Hello world!" bytestring,
|
||||
// not an instance of Message.
|
||||
const CommunicationTestThread = struct {
|
||||
fn clientFn() !void {
|
||||
const config = .{.safety = true};
|
||||
var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||
defer _ = gpa.deinit();
|
||||
const allocator = gpa.allocator();
|
||||
|
||||
var c = try Context.init(allocator);
|
||||
defer c.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
||||
|
||||
var buffer: [1000]u8 = undefined;
|
||||
var fbs = std.io.fixedBufferStream(&buffer);
|
||||
var writer = fbs.writer();
|
||||
|
||||
try c.server_path("simple-context-test", writer);
|
||||
var path = fbs.getWritten();
|
||||
const socket = try net.connectUnixSocket(path);
|
||||
defer socket.close();
|
||||
// print("So we're a client now... path: {s}\n", .{path});
|
||||
_ = try socket.writer().writeAll("Hello world!");
|
||||
}
|
||||
};
|
||||
|
||||
test "Simple structures - init, display and memory check" {
|
||||
// origin destination
|
||||
// var s = Switch.init(3,8);
|
||||
// var payload = "hello!!";
|
||||
// // fd type payload
|
||||
// var m = Message.init(0, Message.Type.DATA, payload);
|
||||
//
|
||||
// // type index origin message
|
||||
// var e = Event.init(Event.Type.CONNECTION, 5, 8, &m);
|
||||
|
||||
// // CLIENT SIDE: connection to a service.
|
||||
// _ = try c.connect(path);
|
||||
|
||||
// // TODO: connection to a server, but switched with clientfd "3".
|
||||
// _ = try c.connection_switched(path, 3);
|
||||
}
|
||||
|
||||
test "Context - creation, display and memory check" {
|
||||
const config = .{.safety = true};
|
||||
var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||
defer _ = gpa.deinit();
|
||||
|
||||
const allocator = gpa.allocator();
|
||||
|
||||
var c = try Context.init(allocator);
|
||||
defer c.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
||||
|
||||
var buffer: [1000]u8 = undefined;
|
||||
var fbs = std.io.fixedBufferStream(&buffer);
|
||||
var writer = fbs.writer();
|
||||
try c.server_path("simple-context-test", writer);
|
||||
var path = fbs.getWritten();
|
||||
|
||||
// SERVER SIDE: creating a service.
|
||||
var server = try c.server_init(path);
|
||||
defer server.deinit();
|
||||
defer std.fs.cwd().deleteFile(path) catch {}; // Once done, remove file.
|
||||
|
||||
// print ("Context: {}\n", .{c});
|
||||
// print("\n", .{});
|
||||
const t = try std.Thread.spawn(.{}, CommunicationTestThread.clientFn, .{});
|
||||
defer t.join();
|
||||
|
||||
// Server.accept returns a net.StreamServer.Connection.
|
||||
var client = try server.accept();
|
||||
defer client.stream.close();
|
||||
var buf: [16]u8 = undefined;
|
||||
const n = try client.stream.reader().read(&buf);
|
||||
|
||||
try testing.expectEqual(@as(usize, 12), n);
|
||||
try testing.expectEqualSlices(u8, "Hello world!", buf[0..n]);
|
||||
}
|
||||
|
||||
// // TODO:
|
||||
// // Creating a new thread: testing UNIX communication.
|
||||
// // This is a client sending a raw "Hello world!" bytestring,
|
||||
// // not an instance of Message.
|
||||
// const ConnectThenSendMessageThread = struct {
|
||||
// fn clientFn() !void {
|
||||
// const config = .{.safety = true};
|
||||
// var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||
// defer _ = gpa.deinit();
|
||||
// const allocator = gpa.allocator();
|
||||
//
|
||||
// var c = try Context.init(allocator);
|
||||
// defer c.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
||||
//
|
||||
// var path_buffer: [1000]u8 = undefined;
|
||||
// var path_fbs = std.io.fixedBufferStream(&path_buffer);
|
||||
// var path_writer = path_fbs.writer();
|
||||
// try c.server_path("simple-context-test", path_writer);
|
||||
// var path = path_fbs.getWritten();
|
||||
//
|
||||
// // Actual UNIX socket connection.
|
||||
// const socket = try net.connectUnixSocket(path);
|
||||
// defer socket.close();
|
||||
//
|
||||
// // Writing message into a buffer.
|
||||
// var message_buffer: [1000]u8 = undefined;
|
||||
// var message_fbs = std.io.fixedBufferStream(&message_buffer);
|
||||
// var message_writer = message_fbs.writer();
|
||||
// // 'fd' parameter is not taken into account here (no loop)
|
||||
//
|
||||
// var m = try Message.init(0, Message.Type.DATA, allocator, "Hello world!");
|
||||
// try m.write(message_writer);
|
||||
//
|
||||
// // print("So we're a client now... path: {s}\n", .{path});
|
||||
// _ = try socket.writer().writeAll(message_fbs.getWritten());
|
||||
// }
|
||||
// };
|
||||
//
|
||||
//
|
||||
// test "Context - creation, echo once" {
|
||||
// const config = .{.safety = true};
|
||||
// var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||
// defer _ = gpa.deinit();
|
||||
//
|
||||
// const allocator = gpa.allocator();
|
||||
//
|
||||
// var c = try Context.init(allocator);
|
||||
// defer c.deinit(); // There. Can't leak. Isn't Zig wonderful?
|
||||
//
|
||||
// var buffer: [1000]u8 = undefined;
|
||||
// var fbs = std.io.fixedBufferStream(&buffer);
|
||||
// var writer = fbs.writer();
|
||||
// try c.server_path("simple-context-test", writer);
|
||||
// var path = fbs.getWritten();
|
||||
//
|
||||
// // SERVER SIDE: creating a service.
|
||||
// var server = try c.server_init(path);
|
||||
// defer server.deinit();
|
||||
// defer std.fs.cwd().deleteFile(path) catch {}; // Once done, remove file.
|
||||
//
|
||||
// const t = try std.Thread.spawn(.{}, ConnectThenSendMessageThread.clientFn, .{});
|
||||
// defer t.join();
|
||||
//
|
||||
// // Server.accept returns a net.StreamServer.Connection.
|
||||
// var client = try server.accept();
|
||||
// defer client.stream.close();
|
||||
// var buf: [1000]u8 = undefined;
|
||||
// const n = try client.stream.reader().read(&buf);
|
||||
// var m = try Message.read(buf[0..n], allocator);
|
||||
//
|
||||
// try testing.expectEqual(@as(usize, 12), m.payload.len);
|
||||
// try testing.expectEqualSlices(u8, m.payload, "Hello world!");
|
||||
// }
|
||||
|
||||
|
||||
// FIRST
|
||||
fn create_service() !void {
|
||||
const config = .{.safety = true};
|
||||
|
@ -760,13 +53,13 @@ fn create_service() !void {
|
|||
|
||||
// SERVER SIDE: creating a service.
|
||||
_ = try ctx.server_init(path);
|
||||
var event = try ctx.wait_event();
|
||||
switch (event.t) {
|
||||
var some_event = try ctx.wait_event();
|
||||
switch (some_event.t) {
|
||||
.CONNECTION => {
|
||||
print("New connection!\n", .{});
|
||||
},
|
||||
else => {
|
||||
print("New event: {}\n", .{event.t});
|
||||
print("New event: {}\n", .{some_event.t});
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,131 @@
|
|||
const std = @import("std");
|
||||
// const hexdump = @import("./hexdump.zig");
|
||||
const testing = std.testing;
|
||||
const net = std.net;
|
||||
const fmt = std.fmt;
|
||||
|
||||
const print_eq = @import("./util.zig").print_eq;
|
||||
|
||||
pub const Messages = std.ArrayList(Message);
|
||||
|
||||
pub const Message = struct {
|
||||
|
||||
pub const Type = enum {
|
||||
SERVER_CLOSE,
|
||||
ERROR,
|
||||
DATA,
|
||||
NETWORK_LOOKUP,
|
||||
};
|
||||
|
||||
t: Message.Type, // Internal message type.
|
||||
fd: usize, // File descriptor concerned about this message.
|
||||
payload: []const u8,
|
||||
|
||||
allocator: std.mem.Allocator, // Memory allocator.
|
||||
|
||||
const Self = @This();
|
||||
|
||||
// TODO
|
||||
//pub fn initFromConnection(fd: usize) Self {
|
||||
// return Self{
|
||||
// .t = Message.Type.ERROR,
|
||||
// .fd = fd,
|
||||
// .payload = "hello",
|
||||
// };
|
||||
//}
|
||||
|
||||
pub fn init(fd: usize, t: Message.Type
|
||||
, allocator: std.mem.Allocator
|
||||
, payload: []const u8) !Self {
|
||||
return Message { .fd = fd, .t = t
|
||||
, .allocator = allocator
|
||||
, .payload = try allocator.dupe(u8, payload) };
|
||||
}
|
||||
|
||||
pub fn deinit(self: *Self) void {
|
||||
self.allocator.free(self.payload);
|
||||
}
|
||||
|
||||
pub fn read(buffer: []const u8, allocator: std.mem.Allocator) !Self {
|
||||
|
||||
// var hexbuf: [4000]u8 = undefined;
|
||||
// var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
||||
// var hexwriter = hexfbs.writer();
|
||||
// try hexdump.hexdump(hexwriter, "Message.read input buffer", buffer);
|
||||
// print("{s}\n", .{hexfbs.getWritten()});
|
||||
|
||||
// var payload = allocator.
|
||||
// defer allocator.free(payload);
|
||||
var fbs = std.io.fixedBufferStream(buffer);
|
||||
var reader = fbs.reader();
|
||||
|
||||
const msg_type = @intToEnum(Message.Type, try reader.readByte());
|
||||
const msg_len = try reader.readIntBig(u32);
|
||||
const msg_payload = buffer[5..5+msg_len];
|
||||
|
||||
return try Message.init(0, msg_type, allocator, msg_payload);
|
||||
}
|
||||
|
||||
pub fn write(self: Self, writer: anytype) !usize {
|
||||
try writer.writeByte(@enumToInt(self.t));
|
||||
try writer.writeIntBig(u32, @truncate(u32, self.payload.len));
|
||||
return 5 + try writer.write(self.payload);
|
||||
}
|
||||
|
||||
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
|
||||
try fmt.format(out_stream, "fd: {}, {}, payload: [{s}]",
|
||||
.{self.fd, self.t, self.payload} );
|
||||
}
|
||||
};
|
||||
|
||||
test "Message - creation and display" {
|
||||
// fd type payload
|
||||
const config = .{.safety = true};
|
||||
var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||
defer _ = gpa.deinit();
|
||||
const allocator = gpa.allocator();
|
||||
|
||||
var s = "hello!!";
|
||||
var m = try Message.init(1, Message.Type.DATA, allocator, s);
|
||||
defer m.deinit();
|
||||
|
||||
try print_eq("fd: 1, message.Message.Type.DATA, payload: [hello!!]", m);
|
||||
}
|
||||
|
||||
test "Message - read and write" {
|
||||
// fd type payload
|
||||
const config = .{.safety = true};
|
||||
var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||
defer _ = gpa.deinit();
|
||||
const allocator = gpa.allocator();
|
||||
|
||||
// First, create a message.
|
||||
var s = "hello!!";
|
||||
var first_message = try Message.init(1, Message.Type.DATA, allocator, s);
|
||||
defer first_message.deinit();
|
||||
|
||||
// Test its content.
|
||||
try testing.expect(first_message.fd == 1);
|
||||
try testing.expect(first_message.payload.len == 7);
|
||||
try testing.expectEqualSlices(u8, first_message.payload, "hello!!");
|
||||
|
||||
// Write it in a buffer, similar to sending it on the network.
|
||||
var buffer: [1000]u8 = undefined;
|
||||
var fbs = std.io.fixedBufferStream(&buffer);
|
||||
var writer = fbs.writer();
|
||||
|
||||
var count = try first_message.write(writer);
|
||||
|
||||
var second_buffer: [2000]u8 = undefined;
|
||||
var fba = std.heap.FixedBufferAllocator.init(&second_buffer);
|
||||
var second_allocator = fba.allocator();
|
||||
|
||||
// Read the buffer, similar to receiving a message on the network.
|
||||
var second_message = try Message.read(buffer[0..count], second_allocator);
|
||||
// var second_message = try Message.read(fbs.getWritten(), second_allocator);
|
||||
defer second_message.deinit();
|
||||
|
||||
// Test its content, should be equal to the first.
|
||||
try testing.expect(second_message.payload.len == first_message.payload.len);
|
||||
try testing.expectEqualSlices(u8, second_message.payload, first_message.payload);
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
const std = @import("std");
|
||||
const testing = std.testing;
|
||||
const fmt = std.fmt;
|
||||
|
||||
pub const Switches = std.ArrayList(Switch);
|
||||
|
||||
const print_eq = @import("./util.zig").print_eq;
|
||||
|
||||
// TODO: default callbacks, actual switching.
|
||||
pub const Switch = struct {
|
||||
origin : usize,
|
||||
destination : usize,
|
||||
|
||||
// orig_in: ?fn (origin: usize, m: Message) CBEvent,
|
||||
// orig_out: ?fn (origin: usize, m: Message) CBEvent,
|
||||
// dest_in: ?fn (origin: usize, m: Message) CBEvent,
|
||||
// dest_out: ?fn (origin: usize, m: Message) CBEvent,
|
||||
|
||||
const Self = @This();
|
||||
|
||||
pub fn init(origin: usize, destination: usize) Self {
|
||||
return Self {
|
||||
.origin = origin,
|
||||
.destination = destination,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn format(self: Self, comptime _: []const u8, _: fmt.FormatOptions, out_stream: anytype) !void {
|
||||
try fmt.format(out_stream
|
||||
, "switch {} <-> {}"
|
||||
, .{ self.origin, self.destination} );
|
||||
}
|
||||
};
|
||||
|
||||
test "Switch - creation and display" {
|
||||
const config = .{.safety = true};
|
||||
var gpa = std.heap.GeneralPurposeAllocator(config){};
|
||||
defer _ = gpa.deinit();
|
||||
const allocator = gpa.allocator();
|
||||
|
||||
var switchdb = Switches.init(allocator);
|
||||
defer switchdb.deinit();
|
||||
|
||||
var first = Switch.init(3,8); // origin destination
|
||||
var second = Switch.init(2,4); // origin destination
|
||||
try switchdb.append(first);
|
||||
try switchdb.append(second);
|
||||
|
||||
try print_eq("switch 3 <-> 8", first);
|
||||
try print_eq("switch 2 <-> 4", second);
|
||||
|
||||
try testing.expect(2 == switchdb.items.len);
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
const std = @import("std");
|
||||
// const hexdump = @import("./hexdump.zig");
|
||||
const testing = std.testing;
|
||||
|
||||
|
||||
pub fn print_eq(expected: anytype, obj: anytype) !void {
|
||||
var buffer: [4096]u8 = undefined;
|
||||
var fbs = std.io.fixedBufferStream(&buffer);
|
||||
var writer = fbs.writer();
|
||||
|
||||
try writer.print("{}", .{obj});
|
||||
// print("print_eq, expected: {s}\n", .{expected});
|
||||
// print("print_eq: {s}\n", .{fbs.getWritten()});
|
||||
|
||||
// typing workaround
|
||||
var secbuffer: [4096]u8 = undefined;
|
||||
var secfbs = std.io.fixedBufferStream(&secbuffer);
|
||||
var secwriter = secfbs.writer();
|
||||
|
||||
try secwriter.print("{s}", .{expected});
|
||||
|
||||
try testing.expectEqualSlices(u8, secfbs.getWritten(), fbs.getWritten());
|
||||
}
|
Loading…
Reference in New Issue