Grooming and fixing iov length (big WIP).
parent
1a6c13c85d
commit
49b1b3bab2
|
@ -50,7 +50,6 @@ pub const Context = struct {
|
||||||
pub fn init(allocator: std.mem.Allocator) !Self {
|
pub fn init(allocator: std.mem.Allocator) !Self {
|
||||||
var rundir = std.process.getEnvVarOwned(allocator, "RUNDIR") catch |err| switch(err) {
|
var rundir = std.process.getEnvVarOwned(allocator, "RUNDIR") catch |err| switch(err) {
|
||||||
error.EnvironmentVariableNotFound => blk: {
|
error.EnvironmentVariableNotFound => blk: {
|
||||||
// print("RUNTIME variable not set, using default /tmp/libipc-run/\n", .{});
|
|
||||||
break :blk try allocator.dupeZ(u8, "/tmp/libipc-run/");
|
break :blk try allocator.dupeZ(u8, "/tmp/libipc-run/");
|
||||||
},
|
},
|
||||||
else => {
|
else => {
|
||||||
|
@ -83,7 +82,6 @@ pub const Context = struct {
|
||||||
self.connections.deinit();
|
self.connections.deinit();
|
||||||
self.pollfd.deinit();
|
self.pollfd.deinit();
|
||||||
for (self.tx.items) |m| {
|
for (self.tx.items) |m| {
|
||||||
print("context deinit: removing message {}\n", .{m});
|
|
||||||
m.deinit();
|
m.deinit();
|
||||||
}
|
}
|
||||||
self.tx.deinit();
|
self.tx.deinit();
|
||||||
|
@ -145,17 +143,18 @@ pub const Context = struct {
|
||||||
// Read LOOKUP response
|
// Read LOOKUP response
|
||||||
// case error: ignore and move on (TODO)
|
// case error: ignore and move on (TODO)
|
||||||
// else: get fd sent by IPCd then close IPCd fd
|
// else: get fd sent by IPCd then close IPCd fd
|
||||||
var reception_buffer: [1500]u8 = undefined;
|
var reception_buffer: [2000]u8 = undefined;
|
||||||
var reception_size: usize = 0;
|
var reception_size: usize = 0;
|
||||||
var newfd = try receive_fd (ipcdfd, &reception_buffer, &reception_size);
|
var newfd = try receive_fd (ipcdfd, &reception_buffer, &reception_size);
|
||||||
var response = reception_buffer[0..reception_size - 1];
|
if (reception_size == 0) {
|
||||||
if (response.len > 0) {
|
return error.IPCdFailedNoMessage;
|
||||||
print ("receive_fd:message received: {s} (len: {}\n)\n", .{response, response.len});
|
|
||||||
}
|
}
|
||||||
const ok = "ok";
|
|
||||||
if (! std.mem.eql(u8, response[0..1], ok[0..1])) {
|
var response: []u8 = reception_buffer[0..reception_size];
|
||||||
print("THIS IS NOT OKAY :O\n", .{}); // DEBUG TODO FIXME
|
// print ("receive_fd:message received: {s} (len: {})\n", .{response, reception_size});
|
||||||
return error.IPCdFailed;
|
|
||||||
|
if (! std.mem.eql(u8, response, "ok")) {
|
||||||
|
return error.IPCdFailedNotOk;
|
||||||
}
|
}
|
||||||
var newcon = Connection.init(connection_type, null);
|
var newcon = Connection.init(connection_type, null);
|
||||||
try self.add_ (newcon, newfd);
|
try self.add_ (newcon, newfd);
|
||||||
|
@ -185,7 +184,6 @@ pub const Context = struct {
|
||||||
|
|
||||||
// Return the new fd. Can be useful to the caller.
|
// Return the new fd. Can be useful to the caller.
|
||||||
pub fn connect(self: *Self, path: []const u8) !i32 {
|
pub fn connect(self: *Self, path: []const u8) !i32 {
|
||||||
// print("connection to:\t{s}\n", .{path});
|
|
||||||
return self.connect_ (Connection.Type.IPC, path);
|
return self.connect_ (Connection.Type.IPC, path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -220,7 +218,6 @@ pub const Context = struct {
|
||||||
// return newfd;
|
// return newfd;
|
||||||
// }
|
// }
|
||||||
|
|
||||||
// TODO: find better error name
|
|
||||||
pub fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void {
|
pub fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void {
|
||||||
// net.StreamServer
|
// net.StreamServer
|
||||||
var serverfd = self.pollfd.items[server_index].fd;
|
var serverfd = self.pollfd.items[server_index].fd;
|
||||||
|
@ -244,7 +241,6 @@ pub const Context = struct {
|
||||||
|
|
||||||
// Create a unix socket.
|
// Create a unix socket.
|
||||||
// Store std lib structures in the context.
|
// Store std lib structures in the context.
|
||||||
// TODO: find better error name
|
|
||||||
pub fn server_init(self: *Self, service_name: [] const u8) !net.StreamServer {
|
pub fn server_init(self: *Self, service_name: [] const u8) !net.StreamServer {
|
||||||
var buffer: [1000]u8 = undefined;
|
var buffer: [1000]u8 = undefined;
|
||||||
var fbs = std.io.fixedBufferStream(&buffer);
|
var fbs = std.io.fixedBufferStream(&buffer);
|
||||||
|
@ -266,9 +262,6 @@ pub const Context = struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write (_: *Self, m: Message) !void {
|
pub fn write (_: *Self, m: Message) !void {
|
||||||
print("write msg on fd {}\n", .{m.fd});
|
|
||||||
|
|
||||||
// TODO
|
|
||||||
// Message contains the fd, no need to search for
|
// Message contains the fd, no need to search for
|
||||||
// the right structure to copy, let's just recreate
|
// the right structure to copy, let's just recreate
|
||||||
// a Stream from the fd.
|
// a Stream from the fd.
|
||||||
|
@ -284,7 +277,6 @@ pub const Context = struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn schedule (self: *Self, m: Message) !void {
|
pub fn schedule (self: *Self, m: Message) !void {
|
||||||
print("schedule msg for fd {}\n", .{m.fd});
|
|
||||||
try self.tx.append(m);
|
try self.tx.append(m);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -292,7 +284,6 @@ pub const Context = struct {
|
||||||
if (index >= self.pollfd.items.len) {
|
if (index >= self.pollfd.items.len) {
|
||||||
return error.IndexOutOfBounds;
|
return error.IndexOutOfBounds;
|
||||||
}
|
}
|
||||||
print("read index {}\n", .{index});
|
|
||||||
|
|
||||||
var buffer: [2000000]u8 = undefined; // TODO: FIXME??
|
var buffer: [2000000]u8 = undefined; // TODO: FIXME??
|
||||||
var packet_size: usize = undefined;
|
var packet_size: usize = undefined;
|
||||||
|
@ -325,17 +316,13 @@ pub const Context = struct {
|
||||||
if (self.timer) |t| { wait_duration = t; }
|
if (self.timer) |t| { wait_duration = t; }
|
||||||
else { print("listening (no timer)\n", .{}); }
|
else { print("listening (no timer)\n", .{}); }
|
||||||
|
|
||||||
// print("listening for MAXIMUM {} ms\n", .{wait_duration});
|
|
||||||
|
|
||||||
// Make sure we listen to the right file descriptors,
|
// Make sure we listen to the right file descriptors,
|
||||||
// setting POLLIN & POLLOUT flags.
|
// setting POLLIN & POLLOUT flags.
|
||||||
for (self.pollfd.items) |*fd| {
|
for (self.pollfd.items) |*fd| {
|
||||||
// print("listening to fd {}\n", .{fd.fd});
|
|
||||||
fd.events |= std.os.linux.POLL.IN; // just to make sure
|
fd.events |= std.os.linux.POLL.IN; // just to make sure
|
||||||
}
|
}
|
||||||
|
|
||||||
for (self.tx.items) |m| {
|
for (self.tx.items) |m| {
|
||||||
print("wait for writing a message to fd {}\n", .{m.fd});
|
|
||||||
for (self.pollfd.items) |*fd| {
|
for (self.pollfd.items) |*fd| {
|
||||||
if (fd.fd == m.fd) {
|
if (fd.fd == m.fd) {
|
||||||
fd.events |= std.os.linux.POLL.OUT; // just to make sure
|
fd.events |= std.os.linux.POLL.OUT; // just to make sure
|
||||||
|
@ -349,7 +336,6 @@ pub const Context = struct {
|
||||||
// Polling.
|
// Polling.
|
||||||
var count: usize = undefined;
|
var count: usize = undefined;
|
||||||
|
|
||||||
// print("Let's wait for an event (either stdin or unix socket)\n", .{});
|
|
||||||
// print("fds: {any}\n", .{self.pollfd.items});
|
// print("fds: {any}\n", .{self.pollfd.items});
|
||||||
count = try os.poll(self.pollfd.items, wait_duration);
|
count = try os.poll(self.pollfd.items, wait_duration);
|
||||||
// print("fds NOW: {any}\n", .{self.pollfd.items});
|
// print("fds NOW: {any}\n", .{self.pollfd.items});
|
||||||
|
@ -362,7 +348,6 @@ pub const Context = struct {
|
||||||
|
|
||||||
var duration = timer.read() / 1000000; // ns -> ms
|
var duration = timer.read() / 1000000; // ns -> ms
|
||||||
if (count == 0) {
|
if (count == 0) {
|
||||||
// print("wait: configured {} measured {}\n", .{wait_duration, duration});
|
|
||||||
if (duration >= wait_duration) {
|
if (duration >= wait_duration) {
|
||||||
current_event = Event.init(Event.Type.TIMER, 0, 0, null);
|
current_event = Event.init(Event.Type.TIMER, 0, 0, null);
|
||||||
}
|
}
|
||||||
|
@ -373,7 +358,7 @@ pub const Context = struct {
|
||||||
return current_event;
|
return current_event;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: handle messages
|
// handle messages
|
||||||
// => loop over self.pollfd.items
|
// => loop over self.pollfd.items
|
||||||
for (self.pollfd.items) |*fd, i| {
|
for (self.pollfd.items) |*fd, i| {
|
||||||
// .revents is POLLIN
|
// .revents is POLLIN
|
||||||
|
@ -386,7 +371,6 @@ pub const Context = struct {
|
||||||
// SWITCHED = send message to the right dest (or drop the switch)
|
// SWITCHED = send message to the right dest (or drop the switch)
|
||||||
else if (self.connections.items[i].t == .SWITCHED) {
|
else if (self.connections.items[i].t == .SWITCHED) {
|
||||||
// TODO: send message to SWITCH dest
|
// TODO: send message to SWITCH dest
|
||||||
// TODO: handle_switched_message
|
|
||||||
return Event.init(Event.Type.SWITCH, i, fd.fd, null);
|
return Event.init(Event.Type.SWITCH, i, fd.fd, null);
|
||||||
}
|
}
|
||||||
// EXTERNAL = user handles IO
|
// EXTERNAL = user handles IO
|
||||||
|
@ -427,8 +411,6 @@ pub const Context = struct {
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// otherwise = write message for the msg.fd
|
// otherwise = write message for the msg.fd
|
||||||
// TODO: handle_writing_message
|
|
||||||
|
|
||||||
var index: usize = undefined;
|
var index: usize = undefined;
|
||||||
for (self.tx.items) |m, index_| {
|
for (self.tx.items) |m, index_| {
|
||||||
if (m.fd == self.pollfd.items[i].fd) {
|
if (m.fd == self.pollfd.items[i].fd) {
|
||||||
|
@ -471,8 +453,6 @@ pub const Context = struct {
|
||||||
return error.IndexOutOfBounds;
|
return error.IndexOutOfBounds;
|
||||||
}
|
}
|
||||||
|
|
||||||
print("closing client/server at index {}\n", .{index});
|
|
||||||
|
|
||||||
// close the connection and remove it from the two structures
|
// close the connection and remove it from the two structures
|
||||||
var con = self.connections.swapRemove(index);
|
var con = self.connections.swapRemove(index);
|
||||||
// Remove service's UNIX socket file.
|
// Remove service's UNIX socket file.
|
||||||
|
@ -483,8 +463,6 @@ pub const Context = struct {
|
||||||
var pollfd = self.pollfd.swapRemove(index);
|
var pollfd = self.pollfd.swapRemove(index);
|
||||||
std.os.close(pollfd.fd);
|
std.os.close(pollfd.fd);
|
||||||
|
|
||||||
print("closing client at index {}\n", .{index});
|
|
||||||
|
|
||||||
// Remove all its non-sent messages.
|
// Remove all its non-sent messages.
|
||||||
var i: usize = 0;
|
var i: usize = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -591,8 +569,6 @@ test "Context - creation, display and memory check" {
|
||||||
defer server.deinit();
|
defer server.deinit();
|
||||||
defer std.fs.cwd().deleteFile(path) catch {}; // Once done, remove file.
|
defer std.fs.cwd().deleteFile(path) catch {}; // Once done, remove file.
|
||||||
|
|
||||||
// print ("Context: {}\n", .{c});
|
|
||||||
// print("\n", .{});
|
|
||||||
const t = try std.Thread.spawn(.{}, CommunicationTestThread.clientFn, .{});
|
const t = try std.Thread.spawn(.{}, CommunicationTestThread.clientFn, .{});
|
||||||
defer t.join();
|
defer t.join();
|
||||||
|
|
||||||
|
@ -638,7 +614,6 @@ const ConnectThenSendMessageThread = struct {
|
||||||
defer m.deinit();
|
defer m.deinit();
|
||||||
_ = try m.write(message_writer);
|
_ = try m.write(message_writer);
|
||||||
|
|
||||||
// print("So we're a client now... path: {s}\n", .{path});
|
|
||||||
_ = try socket.writer().writeAll(message_fbs.getWritten());
|
_ = try socket.writer().writeAll(message_fbs.getWritten());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -194,7 +194,7 @@ pub fn receive_fd(sockfd: os.socket_t, buffer: []u8, msg_size: *usize) !os.fd_t
|
||||||
var iov = [1]os.iovec{
|
var iov = [1]os.iovec{
|
||||||
.{
|
.{
|
||||||
.iov_base = msg_buffer[0..]
|
.iov_base = msg_buffer[0..]
|
||||||
, .iov_len = buffer.len
|
, .iov_len = msg_buffer.len
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -205,13 +205,13 @@ pub fn receive_fd(sockfd: os.socket_t, buffer: []u8, msg_size: *usize) !os.fd_t
|
||||||
});
|
});
|
||||||
|
|
||||||
var msg: std.os.msghdr = .{
|
var msg: std.os.msghdr = .{
|
||||||
.name = undefined,
|
.name = undefined
|
||||||
.namelen = 0,
|
, .namelen = 0
|
||||||
.iov = &iov,
|
, .iov = &iov
|
||||||
.iovlen = 1,
|
, .iovlen = 2
|
||||||
.control = &cmsg,
|
, .control = &cmsg
|
||||||
.controllen = @sizeOf(@TypeOf(cmsg)),
|
, .controllen = @sizeOf(@TypeOf(cmsg))
|
||||||
.flags = 0,
|
, .flags = 0
|
||||||
};
|
};
|
||||||
|
|
||||||
var msglen = recvmsg(sockfd, msg, 0) catch |err| {
|
var msglen = recvmsg(sockfd, msg, 0) catch |err| {
|
||||||
|
|
|
@ -71,7 +71,7 @@ fn create_service() !void {
|
||||||
try os.sigaction(os.SIG.HUP, &sa, null);
|
try os.sigaction(os.SIG.HUP, &sa, null);
|
||||||
|
|
||||||
var some_event: ipc.Event = undefined;
|
var some_event: ipc.Event = undefined;
|
||||||
ctx.timer = 10000; // 10 seconds
|
ctx.timer = 1000; // 1 second
|
||||||
var count: u32 = 0;
|
var count: u32 = 0;
|
||||||
while(! S.should_quit) {
|
while(! S.should_quit) {
|
||||||
some_event = try ctx.wait_event();
|
some_event = try ctx.wait_event();
|
||||||
|
@ -117,23 +117,22 @@ fn create_service() !void {
|
||||||
.LOOKUP => {
|
.LOOKUP => {
|
||||||
print("Client asking for a service through ipcd.\n", .{});
|
print("Client asking for a service through ipcd.\n", .{});
|
||||||
if (some_event.m) |m| {
|
if (some_event.m) |m| {
|
||||||
print("Message: {}\n", .{m});
|
print("{}\n", .{m});
|
||||||
|
|
||||||
// 1. split message
|
// 1. split message
|
||||||
print("payload is: {s}\n", .{m.payload});
|
|
||||||
var iterator = std.mem.split(u8, m.payload, ";");
|
var iterator = std.mem.split(u8, m.payload, ";");
|
||||||
var service_to_contact = iterator.first();
|
var service_to_contact = iterator.first();
|
||||||
print("service to contact: {s}\n", .{service_to_contact});
|
// print("service to contact: {s}\n", .{service_to_contact});
|
||||||
var final_destination: ?[]const u8 = null;
|
var final_destination: ?[]const u8 = null;
|
||||||
|
|
||||||
// 2. find relevant part of the message
|
// 2. find relevant part of the message
|
||||||
while (iterator.next()) |next| {
|
while (iterator.next()) |next| {
|
||||||
print("next part: {s}\n", .{next});
|
// print("next part: {s}\n", .{next});
|
||||||
var iterator2 = std.mem.split(u8, next, " ");
|
var iterator2 = std.mem.split(u8, next, " ");
|
||||||
var sname = iterator2.first();
|
var sname = iterator2.first();
|
||||||
var target = iterator2.next();
|
var target = iterator2.next();
|
||||||
if (target) |t| {
|
if (target) |t| {
|
||||||
print ("sname: {s} - target: {s}\n", .{sname, t});
|
// print ("sname: {s} - target: {s}\n", .{sname, t});
|
||||||
if (std.mem.eql(u8, service_to_contact, sname)) {
|
if (std.mem.eql(u8, service_to_contact, sname)) {
|
||||||
final_destination = t;
|
final_destination = t;
|
||||||
}
|
}
|
||||||
|
@ -147,16 +146,13 @@ fn create_service() !void {
|
||||||
// Should include TCP connections in a near future.
|
// Should include TCP connections in a near future.
|
||||||
|
|
||||||
if (final_destination) |dest| {
|
if (final_destination) |dest| {
|
||||||
print("service IPCd should contact for the client: {s}, via {s}\n"
|
print("Let's contact {s} (original service requested: {s})\n"
|
||||||
, .{service_to_contact, dest});
|
, .{dest, service_to_contact});
|
||||||
|
|
||||||
var newfd = try ctx.connect_service (dest);
|
var newfd = try ctx.connect_service (dest);
|
||||||
send_fd (some_event.origin, "ok", newfd);
|
send_fd (some_event.origin, "ok", newfd);
|
||||||
print("fd sent\n" , .{});
|
|
||||||
try ctx.close_fd (some_event.origin);
|
try ctx.close_fd (some_event.origin);
|
||||||
print("FD 1 removed\n" , .{});
|
|
||||||
try ctx.close_fd (newfd);
|
try ctx.close_fd (newfd);
|
||||||
print("FDs removed\n" , .{});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
m.deinit();
|
m.deinit();
|
||||||
|
|
Reference in New Issue