Switch: almost there.
This commit is contained in:
parent
b73550bdcf
commit
871b2b249c
@ -21,6 +21,7 @@ const print_eq = @import("./util.zig").print_eq;
|
||||
const Messages = @import("./message.zig").Messages;
|
||||
const SwitchDB = @import("./switch.zig").SwitchDB;
|
||||
const Connections = @import("./connection.zig").Connections;
|
||||
const CBEventType = @import("./main.zig").CBEvent.Type;
|
||||
|
||||
pub const PollFD = std.ArrayList(std.os.pollfd);
|
||||
|
||||
@ -270,6 +271,29 @@ pub const Context = struct {
|
||||
try self.tx.append(m);
|
||||
}
|
||||
|
||||
/// Read from a client (indexed by a FD).
|
||||
pub fn read_fd (self: *Self, fd: i32) !?Message {
|
||||
return try self.read(try self.fd_to_index (fd));
|
||||
}
|
||||
|
||||
pub fn add_switch(self: *Self, fd1: i32, fd2: i32) !void {
|
||||
var index_origin = try self.fd_to_index(fd1);
|
||||
var index_destinataire = try self.fd_to_index(fd2);
|
||||
|
||||
self.connections.items[index_origin].t = Connection.Type.SWITCHED;
|
||||
self.connections.items[index_destinataire].t = Connection.Type.SWITCHED;
|
||||
|
||||
try self.switchdb.add_switch(fd1,fd2);
|
||||
print("ADD SWITCH\n", .{});
|
||||
print("self: {}\n", .{self});
|
||||
}
|
||||
|
||||
pub fn set_switch_callbacks(self: *Self, fd: i32
|
||||
, in : *const fn (origin: i32, m: *Message) CBEventType
|
||||
, out : *const fn (origin: i32, m: *const Message) CBEventType) !void {
|
||||
try self.switchdb.set_callbacks(fd,in, out);
|
||||
}
|
||||
|
||||
pub fn read (self: *Self, index: usize) !?Message {
|
||||
if (index >= self.pollfd.items.len) {
|
||||
return error.IndexOutOfBounds;
|
||||
|
@ -108,8 +108,10 @@ fn create_service() !void {
|
||||
|
||||
.MESSAGE => {
|
||||
print("Client asking for a service through ipcd.\n", .{});
|
||||
defer ctx.close_fd (some_event.origin) catch {};
|
||||
if (some_event.m) |m| {
|
||||
print("{}\n", .{m});
|
||||
defer m.deinit(); // Do not forget to free the message payload.
|
||||
|
||||
// 1. split message
|
||||
var iterator = std.mem.split(u8, m.payload, ";");
|
||||
@ -135,19 +137,65 @@ fn create_service() !void {
|
||||
}
|
||||
// 3. connect whether asked to and send a message
|
||||
// TODO: currently only switching with other UNIX sockets ^^'.
|
||||
// Should include TCP connections in a near future.
|
||||
// Should contact <protocol>d.
|
||||
|
||||
if (final_destination) |dest| {
|
||||
print("Let's contact {s} (original service requested: {s})\n"
|
||||
, .{dest, service_to_contact});
|
||||
|
||||
var newfd = try ctx.connect_service (dest);
|
||||
send_fd (some_event.origin, "ok", newfd);
|
||||
try ctx.close_fd (some_event.origin);
|
||||
try ctx.close_fd (newfd);
|
||||
}
|
||||
var iterator3 = std.mem.split(u8, dest, "://");
|
||||
var protocol = iterator3.first();
|
||||
print("Protocol: {s}\n" , .{protocol});
|
||||
|
||||
m.deinit();
|
||||
// 1. in case there is no URI
|
||||
if (std.mem.eql(u8, protocol, dest)) {
|
||||
var newfd = try ctx.connect_service (dest);
|
||||
send_fd (some_event.origin, "ok", newfd);
|
||||
try ctx.close_fd (newfd);
|
||||
}
|
||||
else if (std.mem.eql(u8, protocol, "unix")) {
|
||||
var newfd = try ctx.connect_service (iterator3.next().?);
|
||||
send_fd (some_event.origin, "ok", newfd);
|
||||
try ctx.close_fd (newfd);
|
||||
}
|
||||
// 2. else, contact <protocol>d or directly the dest in case there is none.
|
||||
else {
|
||||
print("should contact {s}d: TODO\n", .{protocol});
|
||||
var servicefd = try ctx.connect_service (protocol);
|
||||
defer ctx.close_fd (servicefd) catch {};
|
||||
// TODO: make a simple protocol between IPCd and <protocol>d
|
||||
// NEED inform about the connection (success or fail)
|
||||
// FIRST DRAFT:
|
||||
// - IPCd: send a message containing the destination
|
||||
// - PROTOCOLd: send "ok" to inform the connection is established
|
||||
// - PROTOCOLd: send "no" in case there was an error
|
||||
|
||||
var message = try Message.init(servicefd, allocator, dest);
|
||||
defer message.deinit();
|
||||
try ctx.write(message);
|
||||
var response_from_service = try ctx.read_fd(servicefd);
|
||||
if (response_from_service) |r| {
|
||||
if (std.mem.eql(u8, r.payload, "ok")) {
|
||||
// OK
|
||||
print("service has established the connection\n", .{});
|
||||
send_fd (some_event.origin, "ok", servicefd);
|
||||
}
|
||||
else if (std.mem.eql(u8, r.payload, "ne")) {
|
||||
// PROBLEM
|
||||
print("service cannot establish the connection\n", .{});
|
||||
// TODO
|
||||
}
|
||||
else {
|
||||
print("service isn't working properly, its response is: {s}\n", .{r.payload});
|
||||
// TODO
|
||||
}
|
||||
}
|
||||
else {
|
||||
// No message = should be handled as a disconnection.
|
||||
print("No response from service: let's drop everything\n", .{});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
// There is a problem: ipcd was contacted without providing
|
||||
@ -156,8 +204,8 @@ fn create_service() !void {
|
||||
var response = try Message.init(some_event.origin
|
||||
, allocator
|
||||
, "lookup message without data");
|
||||
defer response.deinit();
|
||||
try ctx.write(response);
|
||||
response.deinit();
|
||||
}
|
||||
},
|
||||
|
||||
|
@ -39,6 +39,11 @@ pub const SwitchDB = struct {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_switch(self: *Self, fd1: i32, fd2: i32) !void {
|
||||
try self.db.put(fd1, ManagedConnection {.dest = fd2});
|
||||
try self.db.put(fd2, ManagedConnection {.dest = fd1});
|
||||
}
|
||||
|
||||
pub fn set_callbacks(self: *Self, fd: i32
|
||||
, in : *const fn (origin: i32, m: *Message) CBEventType
|
||||
, out : *const fn (origin: i32, m: *const Message) CBEventType) !void {
|
||||
|
Reference in New Issue
Block a user