API change introduced bugs in switch: fixed.
This commit is contained in:
parent
eafdc3749a
commit
f07b915124
@ -18,6 +18,14 @@ enum event_types {
|
||||
, TX = 8 // Message sent.
|
||||
};
|
||||
|
||||
// Return type of callback functions when switching.
|
||||
enum cb_event_types {
|
||||
NO_ERROR = 0, // No error. A message was generated.
|
||||
, ERROR = 1, // Generic error.
|
||||
, FD_CLOSING = 2, // The fd is closing.
|
||||
, IGNORE = 3, // The message should be ignored (protocol specific).
|
||||
};
|
||||
|
||||
int ipc_context_init (void** ptr);
|
||||
int ipc_service_init (void* ctx, int* servicefd, const char* service_name, unsigned short service_name_len);
|
||||
int ipc_connect_service (void* ctx, int* servicefd, const char* service_name, unsigned short service_name_len);
|
||||
@ -30,7 +38,13 @@ int ipc_wait_event(void* ctx, char* t, size_t* index, int* originfd, char* buffe
|
||||
void ipc_context_timer (void* ctx, int timer);
|
||||
int ipc_close_fd (void* ctx, int fd);
|
||||
int ipc_close (void* ctx, size_t index);
|
||||
|
||||
// Switch functions (for "protocol" services, such as TCPd).
|
||||
int ipc_add_external (void* ctx, int newfd);
|
||||
int ipc_add_switch (void* ctx, int fd1, int fd2);
|
||||
|
||||
int ipc_set_switch_callbacks (void* ctx, int fd
|
||||
, enum cb_event_types (*in (int orig, const char *payload, unsigned int *mlen))
|
||||
, enum cb_event_types (*out(int dest, char *payload, unsigned int mlen)));
|
||||
|
||||
#endif
|
||||
|
@ -132,7 +132,6 @@ export fn ipc_add_switch (ctx: *Context, fd1: i32, fd2: i32) callconv(.C) i32 {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/// TODO: change the functions in the switch code, not to take a Message as a parameter.
|
||||
export fn ipc_set_switch_callbacks(ctx: *Context, fd: i32
|
||||
, in : *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType
|
||||
, out : *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) callconv(.C) i32 {
|
||||
|
@ -275,6 +275,7 @@ pub const Context = struct {
|
||||
}
|
||||
|
||||
pub fn schedule (self: *Self, m: Message) !void {
|
||||
print ("scheduling new message {}\n", .{m});
|
||||
try self.tx.append(m);
|
||||
}
|
||||
|
||||
@ -426,7 +427,7 @@ pub const Context = struct {
|
||||
return error.incoherentSwitchError;
|
||||
},
|
||||
}
|
||||
return Event.init(Event.Type.SWITCH_RX, i, fd.fd, null);
|
||||
return current_event;
|
||||
}
|
||||
// EXTERNAL = user handles IO
|
||||
else if (self.connections.items[i].t == .EXTERNAL) {
|
||||
|
@ -1,4 +1,5 @@
|
||||
const std = @import("std");
|
||||
const hexdump = @import("./hexdump.zig");
|
||||
const testing = std.testing;
|
||||
const fmt = std.fmt;
|
||||
|
||||
@ -79,19 +80,22 @@ pub const SwitchDB = struct {
|
||||
var message_size: u32 = @truncate(u32, buffer.len);
|
||||
var r: CBEventType = managedconnection.in(fd, &buffer, &message_size);
|
||||
|
||||
// TODO: read message
|
||||
// TODO: better allocator?
|
||||
// TODO: better errors?
|
||||
var message: Message = Message.init(managedconnection.dest
|
||||
, std.heap.c_allocator
|
||||
, buffer[0..message_size]) catch {
|
||||
return error.generic;
|
||||
};
|
||||
|
||||
print ("MESSAGE READ: {} (from {} to {})\n", .{r, fd, managedconnection.dest});
|
||||
switch (r) {
|
||||
// The message should be ignored (protocol specific).
|
||||
CBEventType.IGNORE => { return null; },
|
||||
CBEventType.NO_ERROR => { return message; },
|
||||
CBEventType.NO_ERROR => {
|
||||
// TODO: read message
|
||||
// TODO: better allocator?
|
||||
// TODO: better errors?
|
||||
var message: Message
|
||||
= Message.init(managedconnection.dest
|
||||
, std.heap.c_allocator
|
||||
, buffer[0..message_size]) catch {
|
||||
return error.generic;
|
||||
};
|
||||
return message;
|
||||
},
|
||||
CBEventType.FD_CLOSING => { return error.closeFD; },
|
||||
// Generic error, or the message was read but with errors.
|
||||
CBEventType.ERROR => { return error.generic; },
|
||||
@ -113,7 +117,8 @@ pub const SwitchDB = struct {
|
||||
// returning basic errors, no details.
|
||||
_ = message.write(writer) catch return error.generic;
|
||||
var written = fbs.getWritten();
|
||||
var r = managedconnection.out(managedconnection.dest, written.ptr, @truncate(u32, written.len));
|
||||
|
||||
var r = managedconnection.out(message.fd, written.ptr, @truncate(u32, written.len));
|
||||
|
||||
switch (r) {
|
||||
// The message should be ignored (protocol specific).
|
||||
@ -298,11 +303,19 @@ fn default_in (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType {
|
||||
|
||||
// Let's handle this as a disconnection.
|
||||
if (packet_size < 4) {
|
||||
print("message is less than 4 bytes ({} bytes)\n", .{packet_size});
|
||||
return CBEventType.FD_CLOSING;
|
||||
}
|
||||
|
||||
mlen.* = @truncate(u32, packet_size);
|
||||
|
||||
var hexbuf: [4000]u8 = undefined;
|
||||
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
||||
var hexwriter = hexfbs.writer();
|
||||
hexdump.hexdump(hexwriter, "DEFAULT IN: MESSAGE RECEIVED", mcontent[0..packet_size]) catch unreachable;
|
||||
print("{s}\n", .{hexfbs.getWritten()});
|
||||
print("packet_size {}, mlen.* {}\n", .{packet_size, mlen.*});
|
||||
|
||||
return CBEventType.NO_ERROR;
|
||||
}
|
||||
|
||||
@ -310,7 +323,16 @@ fn default_out (fd: i32, mcontent: [*]const u8, mlen: u32) CBEventType {
|
||||
// print ("sending a message originated from {}\n", .{origin});
|
||||
// Message contains the fd, no need to search for the right structure to copy,
|
||||
// let's just recreate a Stream from the fd.
|
||||
|
||||
var to_send = mcontent[0..mlen];
|
||||
var hexbuf: [4000]u8 = undefined;
|
||||
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
||||
var hexwriter = hexfbs.writer();
|
||||
hexdump.hexdump(hexwriter, "DEFAULT OUT: MESSAGE TO SEND", to_send) catch unreachable;
|
||||
print("{s}\n", .{hexfbs.getWritten()});
|
||||
|
||||
var stream = net.Stream { .handle = fd };
|
||||
_ = stream.write (mcontent[0..mlen]) catch return CBEventType.ERROR;
|
||||
var bytes_sent = stream.write (to_send) catch return CBEventType.ERROR;
|
||||
print("sent {} to {}\n", .{bytes_sent, fd});
|
||||
return CBEventType.NO_ERROR;
|
||||
}
|
||||
|
@ -98,10 +98,16 @@ fn create_service() !void {
|
||||
try ctx.add_external (serverfd);
|
||||
|
||||
var some_event: ipc.Event = undefined;
|
||||
var previous_event: ipc.Event.Type = ipc.Event.Type.ERROR;
|
||||
ctx.timer = 1000; // 1 second
|
||||
var count: u32 = 0;
|
||||
while(! S.should_quit) {
|
||||
some_event = try ctx.wait_event();
|
||||
|
||||
// For clarity in the output.
|
||||
if (some_event.t != .TIMER and previous_event == .TIMER ) { print("\n", .{}); }
|
||||
previous_event = some_event.t;
|
||||
|
||||
switch (some_event.t) {
|
||||
.TIMER => {
|
||||
print ("\rTimer! ({})", .{count});
|
||||
@ -151,6 +157,16 @@ fn create_service() !void {
|
||||
|
||||
.SWITCH_RX => {
|
||||
print ("Message has been received (SWITCH fd {}).\n", .{some_event.origin});
|
||||
// if (some_event.m) |m| {
|
||||
// var hexbuf: [4000]u8 = undefined;
|
||||
// var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
||||
// var hexwriter = hexfbs.writer();
|
||||
// try hexdump.hexdump(hexwriter, "Received", m.payload);
|
||||
// print("{s}\n", .{hexfbs.getWritten()});
|
||||
// }
|
||||
// else {
|
||||
// print ("Message received without actually a message?! {}", .{some_event});
|
||||
// }
|
||||
},
|
||||
|
||||
.SWITCH_TX => {
|
||||
|
@ -7,6 +7,8 @@
|
||||
#define SERVICE "pong"
|
||||
#define SERVICE_LEN 4
|
||||
|
||||
#define MAX_MSG_SIZE 10000
|
||||
|
||||
int direct_write_then_read(void);
|
||||
int wait_event(void);
|
||||
|
||||
@ -50,8 +52,8 @@ int direct_write_then_read(void) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
char message[10000];
|
||||
size_t size = 10000;
|
||||
char message[MAX_MSG_SIZE];
|
||||
size_t size = MAX_MSG_SIZE;
|
||||
|
||||
ret = ipc_read_fd (ctx, servicefd, message, &size);
|
||||
|
||||
@ -86,9 +88,9 @@ int direct_write_then_read(void) {
|
||||
int wait_event(void) {
|
||||
int ret = 0;
|
||||
int servicefd = 0;
|
||||
char message[10000];
|
||||
char message[MAX_MSG_SIZE];
|
||||
memset (message, 0, 1000);
|
||||
size_t size = 10000;
|
||||
size_t size = MAX_MSG_SIZE;
|
||||
char event_type;
|
||||
size_t index = 0;
|
||||
int originfd = 0;
|
||||
@ -134,7 +136,7 @@ int wait_event(void) {
|
||||
char should_continue = 1;
|
||||
size_t count = 0;
|
||||
while(should_continue) {
|
||||
size = 10000;
|
||||
size = MAX_MSG_SIZE;
|
||||
ret = ipc_wait_event (ctx, &event_type, &index, &originfd, message, &size);
|
||||
if (ret != 0) {
|
||||
printf ("Error while waiting for an event.\n");
|
||||
|
@ -6,12 +6,13 @@
|
||||
#define SERVICE "pong"
|
||||
#define SERVICE_LEN 4
|
||||
|
||||
#define MAX_MSG_SIZE 10000
|
||||
|
||||
int main(int argc, char**argv) {
|
||||
int ret = 0;
|
||||
int servicefd = 0;
|
||||
char message[10000];
|
||||
size_t size = 10000;
|
||||
char message[MAX_MSG_SIZE];
|
||||
size_t size = MAX_MSG_SIZE;
|
||||
char event_type;
|
||||
size_t index = 0;
|
||||
int originfd = 0;
|
||||
@ -48,7 +49,7 @@ int main(int argc, char**argv) {
|
||||
size_t count = 0;
|
||||
size_t count_timer = 0;
|
||||
while(should_continue) {
|
||||
size = 10000;
|
||||
size = MAX_MSG_SIZE;
|
||||
ret = ipc_wait_event (ctx, &event_type, &index, &originfd, message, &size);
|
||||
if (ret != 0) {
|
||||
printf ("Error while waiting for an event.\n");
|
||||
|
Reference in New Issue
Block a user