Archived
3
0

Messages can be received.

This commit is contained in:
Philippe Pittoli 2022-12-25 06:26:38 +01:00
parent 9f214180a7
commit 6819de1da5
4 changed files with 48 additions and 30 deletions

View File

@ -165,16 +165,28 @@ pub const Context = struct {
return error.IndexOutOfBounds; return error.IndexOutOfBounds;
} }
print("read index {}\n", .{index}); print("read index {}\n", .{index});
return self.read_fd(self.pollfd.items[index].fd);
}
pub fn read_fd (self: *Self, fd: i32) !Message { var buffer: [2000000]u8 = undefined; // TODO: FIXME??
print("read fd {}\n", .{fd});
// TODO: read the actual content. var origin: i32 = undefined;
var payload = "hello!!"; // TODO: this is a problem from the network API in Zig,
// servers and clients are different, they aren't just fds.
// Maybe there is something to change in the API.
if (self.connections.items[index].t == .IPC) {
var client = self.connections.items[index].client
orelse return error.NoClientHere;
var stream: net.Stream = client.stream;
origin = stream.handle;
_ = try stream.read(buffer[0..]);
}
else if (self.connections.items[index].t == .SERVER) {
return error.messageOnServer;
}
var m = Message.init(fd, Message.Type.DATA, self.allocator, payload); // var m = try self.allocator.create(Message);
// m.* = try Message.read(buffer[0..], self.allocator);
var m = try Message.read(buffer[0..], self.allocator);
m.fd = origin;
return m; return m;
} }
@ -242,22 +254,24 @@ pub const Context = struct {
// SERVER = new connection // SERVER = new connection
if (self.connections.items[i].t == .SERVER) { if (self.connections.items[i].t == .SERVER) {
try self.accept_new_client(&current_event, i); try self.accept_new_client(&current_event, i);
return current_event;
} }
// SWITCHED = send message to the right dest (or drop the switch) // SWITCHED = send message to the right dest (or drop the switch)
else if (self.connections.items[i].t == .SWITCHED) { else if (self.connections.items[i].t == .SWITCHED) {
// TODO: send message to SWITCH dest // TODO: send message to SWITCH dest
// TODO: handle_switched_message // TODO: handle_switched_message
current_event = Event.init(Event.Type.SWITCH, i, fd.fd, null); return Event.init(Event.Type.SWITCH, i, fd.fd, null);
} }
// EXTERNAL = user handles IO // EXTERNAL = user handles IO
else if (self.connections.items[i].t == .EXTERNAL) { else if (self.connections.items[i].t == .EXTERNAL) {
current_event = Event.init(Event.Type.EXTERNAL, i, fd.fd, null); return Event.init(Event.Type.EXTERNAL, i, fd.fd, null);
} }
// otherwise = new message or disconnection // otherwise = new message or disconnection
else { else {
// TODO: handle incoming message // TODO: handle incoming message
// TODO: handle_new_message // TODO: handle_new_message
current_event = Event.init(Event.Type.MESSAGE, i, fd.fd, null); var m = try self.read(i);
return Event.init(Event.Type.MESSAGE, i, fd.fd, m);
} }
} }
@ -268,12 +282,12 @@ pub const Context = struct {
// SWITCHED = write message for its switch buddy (callbacks) // SWITCHED = write message for its switch buddy (callbacks)
if (self.connections.items[i].t == .SWITCHED) { if (self.connections.items[i].t == .SWITCHED) {
// TODO: handle_writing_switched_message // TODO: handle_writing_switched_message
current_event = Event.init(Event.Type.SWITCH, i, fd.fd, null); return Event.init(Event.Type.SWITCH, i, fd.fd, null);
} }
else { else {
// otherwise = write message for the msg.fd // otherwise = write message for the msg.fd
// TODO: handle_writing_message // TODO: handle_writing_message
current_event = Event.init(Event.Type.TX, i, fd.fd, null); return Event.init(Event.Type.TX, i, fd.fd, null);
} }
} }
// .revent is POLLHUP // .revent is POLLHUP
@ -281,11 +295,12 @@ pub const Context = struct {
// handle disconnection // handle disconnection
current_event = Event.init(Event.Type.DISCONNECTION, i, fd.fd, null); current_event = Event.init(Event.Type.DISCONNECTION, i, fd.fd, null);
try self.close(i); try self.close(i);
return current_event;
} }
// if fd revent is POLLERR or POLLNVAL // if fd revent is POLLERR or POLLNVAL
if ((fd.revents & std.os.linux.POLL.HUP > 0) or if ((fd.revents & std.os.linux.POLL.HUP > 0) or
(fd.revents & std.os.linux.POLL.HUP > 0)) { (fd.revents & std.os.linux.POLL.NVAL > 0)) {
current_event = Event.init(Event.Type.ERROR, i, fd.fd, null); return Event.init(Event.Type.ERROR, i, fd.fd, null);
} }
} }

View File

@ -49,15 +49,15 @@ pub const Event = struct {
t: Event.Type, t: Event.Type,
index: usize, index: usize,
origin: i32, // socket fd origin: i32, // socket fd
m: ?*Message, // message pointer m: ?Message, // message
const Self = @This(); const Self = @This();
pub fn init(t: Event.Type, index: usize, origin: i32, m: ?*Message) Self { pub fn init(t: Event.Type, index: usize, origin: i32, m: ?Message) Self {
return Self { .t = t, .index = index, .origin = origin, .m = m, }; return Self { .t = t, .index = index, .origin = origin, .m = m, };
} }
pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, m: ?*Message) void { pub fn set(self: *Self, t: Event.Type, index: usize, origin: i32, m: ?Message) void {
self.t = t; self.t = t;
self.index = index; self.index = index;
self.origin = origin; self.origin = origin;

View File

@ -60,7 +60,6 @@ fn create_service() !void {
switch (some_event.t) { switch (some_event.t) {
.CONNECTION => { .CONNECTION => {
print("New connection: {}!\n", .{some_event}); print("New connection: {}!\n", .{some_event});
break;
}, },
.TIMER => { .TIMER => {
print("Timer! ({})\n", .{count_down}); print("Timer! ({})\n", .{count_down});
@ -71,27 +70,31 @@ fn create_service() !void {
} }
}, },
.EXTERNAL => { .EXTERNAL => {
print("Message received from a non IPC socket.", .{}); print("Message received from a non IPC socket.\n", .{});
break; break;
}, },
.SWITCH => { .SWITCH => {
print("Message to send to a corresponding fd.", .{}); print("Message to send to a corresponding fd.\n", .{});
break; break;
}, },
.DISCONNECTION => { .DISCONNECTION => {
print("User disconnected.", .{}); print("User disconnected.\n", .{});
break; break;
}, },
.MESSAGE => { .MESSAGE => {
print("New message.", .{}); print("New message. {}\n", .{some_event});
if (some_event.m) |m| {
print("message: {}\n", .{m});
m.deinit();
}
break; break;
}, },
.LOOKUP => { .LOOKUP => {
print("Client asking for a service through ipcd.", .{}); print("Client asking for a service through ipcd.\n", .{});
break; break;
}, },
.TX => { .TX => {
print("Message sent.", .{}); print("Message sent.\n", .{});
break; break;
}, },
.NOT_SET => { .NOT_SET => {
@ -99,7 +102,7 @@ fn create_service() !void {
break; break;
}, },
.ERROR => { .ERROR => {
print("A problem occured.\n", .{}); print("A problem occured, event: {}\n", .{some_event});
break; break;
}, },
} }

View File

@ -18,7 +18,7 @@ pub const Message = struct {
}; };
t: Message.Type, // Internal message type. t: Message.Type, // Internal message type.
fd: usize, // File descriptor concerned about this message. fd: i32, // File descriptor concerned about this message.
payload: []const u8, payload: []const u8,
allocator: std.mem.Allocator, // Memory allocator. allocator: std.mem.Allocator, // Memory allocator.
@ -26,7 +26,7 @@ pub const Message = struct {
const Self = @This(); const Self = @This();
// TODO // TODO
//pub fn initFromConnection(fd: usize) Self { //pub fn initFromConnection(fd: i32) Self {
// return Self{ // return Self{
// .t = Message.Type.ERROR, // .t = Message.Type.ERROR,
// .fd = fd, // .fd = fd,
@ -34,7 +34,7 @@ pub const Message = struct {
// }; // };
//} //}
pub fn init(fd: usize, t: Message.Type pub fn init(fd: i32, t: Message.Type
, allocator: std.mem.Allocator , allocator: std.mem.Allocator
, payload: []const u8) !Self { , payload: []const u8) !Self {
return Message { .fd = fd, .t = t return Message { .fd = fd, .t = t
@ -42,7 +42,7 @@ pub const Message = struct {
, .payload = try allocator.dupe(u8, payload) }; , .payload = try allocator.dupe(u8, payload) };
} }
pub fn deinit(self: *Self) void { pub fn deinit(self: Self) void {
self.allocator.free(self.payload); self.allocator.free(self.payload);
} }