Fix switch read message + add print debug.
parent
9dc4cfa003
commit
3123051ef0
|
@ -681,7 +681,7 @@ test "Context - creation, echo once" {
|
||||||
try c.server_path("simple-context-test", writer);
|
try c.server_path("simple-context-test", writer);
|
||||||
var path = fbs.getWritten();
|
var path = fbs.getWritten();
|
||||||
|
|
||||||
// SERVER SIDE: creating a service.
|
// SERVER SIDE: creating a service.
|
||||||
var server = c.server_init("simple-context-test") catch |err| switch(err) {
|
var server = c.server_init("simple-context-test") catch |err| switch(err) {
|
||||||
error.FileNotFound => {
|
error.FileNotFound => {
|
||||||
print ("\nError: cannot init server at {s}\n", .{path});
|
print ("\nError: cannot init server at {s}\n", .{path});
|
||||||
|
|
|
@ -139,7 +139,7 @@ fn create_service() !void {
|
||||||
|
|
||||||
// 3. connect whether asked to and send a message
|
// 3. connect whether asked to and send a message
|
||||||
if (final_destination) |dest| {
|
if (final_destination) |dest| {
|
||||||
print("Let's contact {s} (service requested: {s})\n"
|
print("Connecting to {s} (service requested: {s})\n"
|
||||||
, .{dest, service_to_contact});
|
, .{dest, service_to_contact});
|
||||||
|
|
||||||
var uri = URI.read(dest);
|
var uri = URI.read(dest);
|
||||||
|
|
|
@ -15,7 +15,7 @@ const builtin = @import("builtin");
|
||||||
const native_os = builtin.target.os.tag;
|
const native_os = builtin.target.os.tag;
|
||||||
const print = std.debug.print;
|
const print = std.debug.print;
|
||||||
const testing = std.testing;
|
const testing = std.testing;
|
||||||
const print_eq = @import("./util.zig").print_eq;
|
const util = @import("./util.zig");
|
||||||
|
|
||||||
fn create_service() !void {
|
fn create_service() !void {
|
||||||
const config = .{.safety = true};
|
const config = .{.safety = true};
|
||||||
|
@ -81,8 +81,9 @@ fn create_service() !void {
|
||||||
|
|
||||||
.MESSAGE => {
|
.MESSAGE => {
|
||||||
if (some_event.m) |m| {
|
if (some_event.m) |m| {
|
||||||
print("New message: {}\n", .{m});
|
print("New message ({} bytes)\n", .{m.payload.len});
|
||||||
print("Let's echo it...\n", .{});
|
util.print_message ("RECEIVED MESSAGE", m);
|
||||||
|
print("Echoing it...\n", .{});
|
||||||
try ctx.schedule(m);
|
try ctx.schedule(m);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
|
|
@ -11,7 +11,8 @@ const CBEventType = ipc.CBEvent.Type;
|
||||||
|
|
||||||
const Allocator = std.mem.Allocator;
|
const Allocator = std.mem.Allocator;
|
||||||
|
|
||||||
const print_eq = @import("./util.zig").print_eq;
|
const util = @import("./util.zig");
|
||||||
|
const print_eq = util.print_eq;
|
||||||
const print = std.debug.print;
|
const print = std.debug.print;
|
||||||
|
|
||||||
const Event = ipc.Event;
|
const Event = ipc.Event;
|
||||||
|
@ -80,7 +81,6 @@ pub const SwitchDB = struct {
|
||||||
var message_size: u32 = @truncate(u32, buffer.len);
|
var message_size: u32 = @truncate(u32, buffer.len);
|
||||||
var r: CBEventType = managedconnection.in(fd, &buffer, &message_size);
|
var r: CBEventType = managedconnection.in(fd, &buffer, &message_size);
|
||||||
|
|
||||||
print ("MESSAGE READ: {} (from {} to {})\n", .{r, fd, managedconnection.dest});
|
|
||||||
switch (r) {
|
switch (r) {
|
||||||
// The message should be ignored (protocol specific).
|
// The message should be ignored (protocol specific).
|
||||||
CBEventType.IGNORE => { return null; },
|
CBEventType.IGNORE => { return null; },
|
||||||
|
@ -89,9 +89,9 @@ pub const SwitchDB = struct {
|
||||||
// TODO: better allocator?
|
// TODO: better allocator?
|
||||||
// TODO: better errors?
|
// TODO: better errors?
|
||||||
var message: Message
|
var message: Message
|
||||||
= Message.init(managedconnection.dest
|
= Message.read(managedconnection.dest
|
||||||
, std.heap.c_allocator
|
, buffer[0..message_size]
|
||||||
, buffer[0..message_size]) catch {
|
, std.heap.c_allocator) catch {
|
||||||
return error.generic;
|
return error.generic;
|
||||||
};
|
};
|
||||||
return message;
|
return message;
|
||||||
|
@ -295,44 +295,27 @@ test "nuke 'em" {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_in (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType {
|
fn default_in (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType {
|
||||||
// print ("receiving a message originated from {}\n", .{origin});
|
|
||||||
|
|
||||||
// This may be kinda hacky, idk.
|
// This may be kinda hacky, idk.
|
||||||
var stream: net.Stream = .{ .handle = origin };
|
var stream: net.Stream = .{ .handle = origin };
|
||||||
var packet_size: usize = stream.read(mcontent[0..mlen.*]) catch return CBEventType.ERROR;
|
var packet_size: usize = stream.read(mcontent[0..mlen.*]) catch return CBEventType.ERROR;
|
||||||
|
|
||||||
// Let's handle this as a disconnection.
|
// Let's handle this as a disconnection.
|
||||||
if (packet_size < 4) {
|
if (packet_size < 4) {
|
||||||
print("message is less than 4 bytes ({} bytes)\n", .{packet_size});
|
// print("message is less than 4 bytes ({} bytes)\n", .{packet_size});
|
||||||
return CBEventType.FD_CLOSING;
|
return CBEventType.FD_CLOSING;
|
||||||
}
|
}
|
||||||
|
|
||||||
mlen.* = @truncate(u32, packet_size);
|
mlen.* = @truncate(u32, packet_size);
|
||||||
|
|
||||||
var hexbuf: [4000]u8 = undefined;
|
|
||||||
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
|
||||||
var hexwriter = hexfbs.writer();
|
|
||||||
hexdump.hexdump(hexwriter, "DEFAULT IN: MESSAGE RECEIVED", mcontent[0..packet_size]) catch unreachable;
|
|
||||||
print("{s}\n", .{hexfbs.getWritten()});
|
|
||||||
print("packet_size {}, mlen.* {}\n", .{packet_size, mlen.*});
|
|
||||||
|
|
||||||
return CBEventType.NO_ERROR;
|
return CBEventType.NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_out (fd: i32, mcontent: [*]const u8, mlen: u32) CBEventType {
|
fn default_out (fd: i32, mcontent: [*]const u8, mlen: u32) CBEventType {
|
||||||
// print ("sending a message originated from {}\n", .{origin});
|
|
||||||
// Message contains the fd, no need to search for the right structure to copy,
|
// Message contains the fd, no need to search for the right structure to copy,
|
||||||
// let's just recreate a Stream from the fd.
|
// let's just recreate a Stream from the fd.
|
||||||
|
|
||||||
var to_send = mcontent[0..mlen];
|
var to_send = mcontent[0..mlen];
|
||||||
var hexbuf: [4000]u8 = undefined;
|
|
||||||
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
|
||||||
var hexwriter = hexfbs.writer();
|
|
||||||
hexdump.hexdump(hexwriter, "DEFAULT OUT: MESSAGE TO SEND", to_send) catch unreachable;
|
|
||||||
print("{s}\n", .{hexfbs.getWritten()});
|
|
||||||
|
|
||||||
var stream = net.Stream { .handle = fd };
|
var stream = net.Stream { .handle = fd };
|
||||||
var bytes_sent = stream.write (to_send) catch return CBEventType.ERROR;
|
_ = stream.write (to_send) catch return CBEventType.ERROR;
|
||||||
print("sent {} to {}\n", .{bytes_sent, fd});
|
|
||||||
return CBEventType.NO_ERROR;
|
return CBEventType.NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
|
@ -135,8 +135,20 @@ fn create_service() !void {
|
||||||
var size = try client.stream.read(&buffer);
|
var size = try client.stream.read(&buffer);
|
||||||
var service_to_contact = buffer[0..size];
|
var service_to_contact = buffer[0..size];
|
||||||
|
|
||||||
|
if (service_to_contact.len == 0) {
|
||||||
|
print("Error, no service provided, closing the connection.\n", .{});
|
||||||
|
client.stream.close();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
print ("Ask to connect to service [{s}].\n", .{service_to_contact});
|
print ("Ask to connect to service [{s}].\n", .{service_to_contact});
|
||||||
var servicefd = try ctx.connect_service (service_to_contact);
|
var servicefd = ctx.connect_service (service_to_contact) catch |err| {
|
||||||
|
print("Error while connecting to the service {s}: {}.\n"
|
||||||
|
, .{service_to_contact, err});
|
||||||
|
print ("Closing the connection.\n", .{});
|
||||||
|
client.stream.close();
|
||||||
|
continue;
|
||||||
|
};
|
||||||
errdefer ctx.close_fd (servicefd) catch {};
|
errdefer ctx.close_fd (servicefd) catch {};
|
||||||
|
|
||||||
print ("Send a message to inform remote TCPd that the connection is established.\n", .{});
|
print ("Send a message to inform remote TCPd that the connection is established.\n", .{});
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
// const hexdump = @import("./hexdump.zig");
|
const hexdump = @import("./hexdump.zig");
|
||||||
const testing = std.testing;
|
const testing = std.testing;
|
||||||
|
|
||||||
|
const print = std.debug.print;
|
||||||
|
const Message = @import("./message.zig").Message;
|
||||||
|
|
||||||
/// A VERY LIGHT and INCOMPLETE way of decoding URI.
|
/// A VERY LIGHT and INCOMPLETE way of decoding URI.
|
||||||
/// DO NOT USE IT UNLESS YOU KNOW WHAT TO EXPECT.
|
/// DO NOT USE IT UNLESS YOU KNOW WHAT TO EXPECT.
|
||||||
pub const URI = struct {
|
pub const URI = struct {
|
||||||
|
@ -34,6 +37,18 @@ test "URI simple decoding" {
|
||||||
try testing.expectEqualSlices(u8, uri.path, "some-path");
|
try testing.expectEqualSlices(u8, uri.path, "some-path");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn print_buffer (header: []const u8, buffer: []const u8) void {
|
||||||
|
var hexbuf: [4000]u8 = undefined;
|
||||||
|
var hexfbs = std.io.fixedBufferStream(&hexbuf);
|
||||||
|
var hexwriter = hexfbs.writer();
|
||||||
|
hexdump.hexdump(hexwriter, header, buffer) catch unreachable;
|
||||||
|
print("{s}\n", .{hexfbs.getWritten()});
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn print_message (header: []const u8, m: Message) void {
|
||||||
|
print_buffer (header, m.payload);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn print_eq(expected: anytype, obj: anytype) !void {
|
pub fn print_eq(expected: anytype, obj: anytype) !void {
|
||||||
var buffer: [4096]u8 = undefined;
|
var buffer: [4096]u8 = undefined;
|
||||||
var fbs = std.io.fixedBufferStream(&buffer);
|
var fbs = std.io.fixedBufferStream(&buffer);
|
||||||
|
|
Reference in New Issue