Compiles with zig-0.12

master
Philippe PITTOLI 2024-06-06 03:39:52 +02:00
parent c5aad3535a
commit 13081f3612
6 changed files with 64 additions and 55 deletions

View File

@ -18,7 +18,7 @@ export fn ipc_context_init(ptr: **Context) callconv(.C) i32 {
/// Start a libipc service.
export fn ipc_service_init(ctx: *Context, servicefd: *i32, service_name: [*]const u8, service_name_len: u16) callconv(.C) i32 {
const streamserver = ctx.server_init(service_name[0..service_name_len]) catch return -1;
servicefd.* = streamserver.sockfd.?;
servicefd.* = streamserver.stream.handle;
return 0;
}
@ -147,7 +147,9 @@ export fn ipc_add_switch(ctx: *Context, fd1: i32, fd2: i32) callconv(.C) i32 {
return 0;
}
export fn ipc_set_switch_callbacks(ctx: *Context, fd: i32, in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType, out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) callconv(.C) i32 {
export fn ipc_set_switch_callbacks(ctx: *Context, fd: i32,
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8,
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8) callconv(.C) i32 {
ctx.set_switch_callbacks(fd, in, out) catch return -1;
return 0;
}

View File

@ -26,7 +26,7 @@ pub const CBEvent = struct {
// to it. This is a lookup.
// For IO callbacks (switching).
pub const Type = enum {
pub const Type = enum(u8) {
NO_ERROR, // No error. A message was generated.
ERROR, // Generic error.
FD_CLOSING, // The fd is closing.

View File

@ -5,6 +5,7 @@ const net = std.net;
const os = std.os;
const fmt = std.fmt;
const c = std.c;
const posix = std.posix;
// const print = std.debug.print;
@ -29,7 +30,7 @@ const SwitchDB = @import("./switch.zig").SwitchDB;
const Connections = @import("./connection.zig").Connections;
const CBEventType = @import("./main.zig").CBEvent.Type;
pub const PollFD = std.ArrayList(std.os.pollfd);
pub const PollFD = std.ArrayList(posix.pollfd);
pub const IPC_HEADER_SIZE = 4; // Size (4 bytes) then content.
pub const IPC_BASE_SIZE = 100000; // 100 KB, plenty enough space for messages
@ -69,7 +70,7 @@ pub const Context = struct {
defer _ = umask(previous_mask);
// Create the run directory, where all UNIX sockets will be.
std.os.mkdir(rundir, 0o0770) catch |err| switch (err) {
posix.mkdir(rundir, 0o0770) catch |err| switch (err) {
error.PathAlreadyExists => {
log.warn("runtime directory ({s}) already exists, (everything is fine, ignoring)", .{rundir});
},
@ -102,7 +103,7 @@ pub const Context = struct {
fn connect_(self: *Self, ctype: Connection.Type, path: []const u8) !i32 {
const stream = try net.connectUnixSocket(path);
const newfd = stream.handle;
errdefer std.os.closeSocket(newfd);
errdefer std.posix.close(newfd);
const newcon = Connection.init(ctype, null);
try self.add_(newcon, newfd);
return newfd;
@ -173,7 +174,7 @@ pub const Context = struct {
/// TODO: Add a new connection, but takes care of memory problems:
/// in case one of the arrays cannot sustain another entry, the other
/// won't be added.
fn add_(self: *Self, new_connection: Connection, fd: os.socket_t) !void {
fn add_(self: *Self, new_connection: Connection, fd: posix.socket_t) !void {
try self.connections.append(new_connection);
try self.pollfd.append(.{ .fd = fd, .events = std.os.linux.POLL.IN, .revents = 0 });
}
@ -218,24 +219,26 @@ pub const Context = struct {
}
fn accept_new_client(self: *Self, event: *Event, server_index: usize) !void {
// net.StreamServer
// net.Server
const serverfd = self.pollfd.items[server_index].fd;
const path = self.connections.items[server_index].path orelse return error.ServerWithNoPath;
var server = net.StreamServer{ .sockfd = serverfd, .kernel_backlog = 100, .reuse_address = false, .reuse_port = false, .listen_address = try net.Address.initUnix(path) };
const client = try server.accept(); // net.StreamServer.Connection
var server = net.Server{ .stream = .{ .handle = serverfd },
.listen_address = try net.Address.initUnix(path)
};
const client = try server.accept(); // net.Server.Connection
const newfd = client.stream.handle;
const newcon = Connection.init(Connection.Type.IPC, null);
try self.add_(newcon, newfd);
const sfd = server.sockfd orelse return error.SocketLOL; // TODO
const sfd = server.stream.handle;
// WARNING: imply every new item is last
event.set(Event.Type.CONNECTION, self.pollfd.items.len - 1, sfd, null);
}
// Create a unix socket.
// Store std lib structures in the context.
pub fn server_init(self: *Self, service_name: []const u8) !net.StreamServer {
pub fn server_init(self: *Self, service_name: []const u8) !net.Server {
var buffer: [1000]u8 = undefined;
var buffer_lock: [1000]u8 = undefined;
const path = try std.fmt.bufPrint(&buffer, "{s}/{s}", .{ self.rundir, service_name });
@ -258,16 +261,15 @@ pub const Context = struct {
defer _ = umask(previous_mask);
// Remove the old UNIX socket.
std.os.unlink(path) catch |err| switch (err) {
posix.unlink(path) catch |err| switch (err) {
error.FileNotFound => log.debug("no unlink necessary for {s}", .{path}),
else => return err,
};
var server = net.StreamServer.init(.{});
const socket_addr = try net.Address.initUnix(path);
try server.listen(socket_addr);
const server = try net.Address.listen(socket_addr, .{});
const newfd = server.sockfd orelse return error.SocketLOL; // TODO
const newfd = server.stream.handle;
// Store the path in the Connection structure, so the UNIX socket file can be removed later.
const newcon = Connection.init(Connection.Type.SERVER, try self.allocator.dupeZ(u8, path));
try self.add_(newcon, newfd);
@ -315,7 +317,9 @@ pub const Context = struct {
try self.switchdb.add_switch(fd1, fd2);
}
pub fn set_switch_callbacks(self: *Self, fd: i32, in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType, out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) !void {
pub fn set_switch_callbacks(self: *Self, fd: i32,
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8,
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8) !void {
try self.switchdb.set_callbacks(fd, in, out);
}
@ -351,7 +355,7 @@ pub const Context = struct {
/// This is useful for switched connections: FDs could be closed without libipc being informed.
fn safe_close_fd(self: *Self, fd: i32) void {
var should_close = true;
_ = std.os.fcntl(fd, std.os.F.GETFD, 0) catch {
_ = posix.fcntl(fd, posix.F.GETFD, 0) catch {
should_close = false;
};
if (should_close) {
@ -391,7 +395,7 @@ pub const Context = struct {
// Polling.
var count: usize = undefined;
count = try os.poll(self.pollfd.items, wait_duration);
count = try posix.poll(self.pollfd.items, wait_duration);
if (count < 0) {
log.err("there is a problem: poll < 0", .{});
@ -571,7 +575,7 @@ pub const Context = struct {
}
const pollfd = self.pollfd.swapRemove(index);
log.debug("closed client index {} (fd = {})", .{ index, pollfd.fd });
std.os.close(pollfd.fd);
posix.close(pollfd.fd);
// Remove all its non-sent messages.
var i: usize = 0;
@ -658,7 +662,7 @@ test "Context - creation, display and memory check" {
const t = try std.Thread.spawn(.{}, CommunicationTestThread.clientFn, .{});
defer t.join();
// Server.accept returns a net.StreamServer.Connection.
// Server.accept returns a net.Server.Connection.
var client = try server.accept();
defer client.stream.close();
var buf: [16]u8 = undefined;
@ -727,7 +731,7 @@ test "Context - creation, echo once" {
const t = try std.Thread.spawn(.{}, ConnectThenSendMessageThread.clientFn, .{});
defer t.join();
// Server.accept returns a net.StreamServer.Connection.
// Server.accept returns a net.Server.Connection.
var client = try server.accept();
defer client.stream.close();
var buf: [1000]u8 = undefined;

View File

@ -1,14 +1,15 @@
const std = @import("std");
const testing = std.testing;
const os = std.os;
const posix = std.posix;
const log = std.log.scoped(.libipc_exchangefd);
const builtin = @import("builtin");
const windows = std.os.windows;
const errno = std.os.errno;
const system = std.os.system;
const unexpectedErrno = std.os.unexpectedErrno;
const SendMsgError = std.os.SendMsgError;
const errno = posix.errno;
const system = posix.system;
const unexpectedErrno = posix.unexpectedErrno;
const SendMsgError = posix.SendMsgError;
const SCM_RIGHTS: c_int = 1;
@ -69,26 +70,26 @@ pub fn Cmsghdr(comptime T: type) type {
}
test {
std.testing.refAllDecls(Cmsghdr([3]std.os.fd_t));
std.testing.refAllDecls(Cmsghdr([3]posix.fd_t));
}
/// Send a file descriptor and a message through a UNIX socket.
/// TODO: currently voluntarily crashes if data isn't sent properly, should return an error instead.
pub fn send_fd(sockfd: os.socket_t, msg: []const u8, fd: os.fd_t) void {
var iov = [_]os.iovec_const{
pub fn send_fd(sockfd: posix.socket_t, msg: []const u8, fd: posix.fd_t) void {
var iov = [_]posix.iovec_const{
.{
.iov_base = msg.ptr,
.iov_len = msg.len,
},
};
var cmsg = Cmsghdr(os.fd_t).init(.{
.level = os.SOL.SOCKET,
var cmsg = Cmsghdr(posix.fd_t).init(.{
.level = posix.SOL.SOCKET,
.type = SCM_RIGHTS,
.data = fd,
});
const len = os.sendmsg(sockfd, &std.os.msghdr_const{
const len = posix.sendmsg(sockfd, &posix.msghdr_const{
.name = null,
.namelen = 0,
.iov = &iov,
@ -104,7 +105,7 @@ pub fn send_fd(sockfd: os.socket_t, msg: []const u8, fd: os.fd_t) void {
if (len != msg.len) {
// We don't have much choice but to exit here.
log.err("expected sendmsg to return {} but got {}", .{ msg.len, len });
os.exit(0xff);
posix.exit(0xff);
}
}
@ -113,9 +114,9 @@ pub fn send_fd(sockfd: os.socket_t, msg: []const u8, fd: os.fd_t) void {
/// WARNING: can only work on linux for now (recvmsg is lacking on other systems).
pub fn recvmsg(
/// The file descriptor of the sending socket.
sockfd: os.socket_t,
sockfd: posix.socket_t,
/// Message header and iovecs
msg: std.os.msghdr,
msg: posix.msghdr,
flags: u32,
) SendMsgError!usize {
while (true) {
@ -186,20 +187,20 @@ pub fn recvmsg(
/// Receive a file descriptor through a UNIX socket.
/// A message can be carried with it, copied into 'buffer'.
/// WARNING: buffer must be at least 1500 bytes.
pub fn receive_fd(sockfd: os.socket_t, buffer: []u8, msg_size: *usize) !os.fd_t {
pub fn receive_fd(sockfd: posix.socket_t, buffer: []u8, msg_size: *usize) !posix.fd_t {
var msg_buffer = [_]u8{0} ** 1500;
var iov = [_]os.iovec{
var iov = [_]posix.iovec{
.{ .iov_base = msg_buffer[0..], .iov_len = msg_buffer.len },
};
var cmsg = Cmsghdr(os.fd_t).init(.{
.level = os.SOL.SOCKET,
var cmsg = Cmsghdr(posix.fd_t).init(.{
.level = posix.SOL.SOCKET,
.type = SCM_RIGHTS,
.data = 0,
});
const msg: std.os.msghdr = .{ .name = null, .namelen = 0, .iov = &iov, .iovlen = 1, .control = &cmsg, .controllen = @sizeOf(@TypeOf(cmsg)), .flags = 0 };
const msg: posix.msghdr = .{ .name = null, .namelen = 0, .iov = &iov, .iovlen = 1, .control = &cmsg, .controllen = @sizeOf(@TypeOf(cmsg)), .flags = 0 };
const msglen = recvmsg(sockfd, msg, 0) catch |err| {
log.err("error recvmsg failed with {s}", .{@errorName(err)});
@ -207,7 +208,7 @@ pub fn receive_fd(sockfd: os.socket_t, buffer: []u8, msg_size: *usize) !os.fd_t
};
const received_fd = @as(i32, cmsg.dataPtr().*);
std.mem.copy(u8, buffer, &msg_buffer);
std.mem.copyForwards(u8, buffer, &msg_buffer);
msg_size.* = msglen;
return received_fd;

View File

@ -27,7 +27,7 @@ pub const Message = struct {
var fbs = std.io.fixedBufferStream(buffer);
var reader = fbs.reader();
const msg_len = try reader.readIntBig(u32);
const msg_len = try reader.readInt(u32, .big);
if (msg_len > buffer.len - 4) {
return error.wrongMessageLength;
}
@ -37,7 +37,7 @@ pub const Message = struct {
}
pub fn write(self: Self, writer: anytype) !usize {
try writer.writeIntBig(u32, @as(u32, @truncate(self.payload.len)));
try writer.writeInt(u32, @as(u32, @truncate(self.payload.len)), .big);
return 4 + try writer.write(self.payload);
}

View File

@ -62,7 +62,9 @@ pub const SwitchDB = struct {
try self.db.put(fd2, ManagedConnection{ .dest = fd1 });
}
pub fn set_callbacks(self: *Self, fd: i32, in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType, out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType) !void {
pub fn set_callbacks(self: *Self, fd: i32,
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8,
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8) !void {
var managedconnection = self.db.get(fd) orelse return error.unregisteredFD;
managedconnection.in = in;
managedconnection.out = out;
@ -76,7 +78,7 @@ pub const SwitchDB = struct {
var buffer = [_]u8{0} ** 100000; // TODO: buffer size
var message_size: u32 = @truncate(buffer.len);
const r: CBEventType = managedconnection.in(fd, &buffer, &message_size);
const r: CBEventType = @enumFromInt(managedconnection.in(fd, &buffer, &message_size));
switch (r) {
// The message should be ignored (protocol specific).
@ -118,7 +120,7 @@ pub const SwitchDB = struct {
_ = message.write(writer) catch return error.generic;
const written = fbs.getWritten();
const r = managedconnection.out(message.fd, written.ptr, @truncate(written.len));
const r: CBEventType = @enumFromInt(managedconnection.out(message.fd, written.ptr, @truncate(written.len)));
switch (r) {
// The message should be ignored (protocol specific).
@ -180,8 +182,8 @@ pub const SwitchDB = struct {
const ManagedConnection = struct {
dest: i32,
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType = default_in,
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) CBEventType = default_out,
in: *const fn (origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8 = default_in,
out: *const fn (origin: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8 = default_out,
};
test "creation and display" {
@ -305,28 +307,28 @@ test "nuke 'em" {
try testing.expect(switchdb.db.count() == 0);
}
fn default_in(origin: i32, mcontent: [*]u8, mlen: *u32) CBEventType {
fn default_in(origin: i32, mcontent: [*]u8, mlen: *u32) callconv(.C) u8 {
// This may be kinda hacky, idk.
var stream: net.Stream = .{ .handle = origin };
const packet_size: usize = stream.read(mcontent[0..mlen.*]) catch return CBEventType.ERROR;
const packet_size: usize = stream.read(mcontent[0..mlen.*]) catch return @intFromEnum(CBEventType.ERROR);
// Let's handle this as a disconnection.
if (packet_size < 4) {
log.debug("message is less than 4 bytes ({} bytes)", .{packet_size});
return CBEventType.FD_CLOSING;
return @intFromEnum(CBEventType.FD_CLOSING);
}
mlen.* = @truncate(packet_size);
return CBEventType.NO_ERROR;
return @intFromEnum(CBEventType.NO_ERROR);
}
fn default_out(fd: i32, mcontent: [*]const u8, mlen: u32) CBEventType {
fn default_out(fd: i32, mcontent: [*]const u8, mlen: u32) callconv(.C) u8 {
// Message contains the fd, no need to search for the right structure to copy,
// let's just recreate a Stream from the fd.
const to_send = mcontent[0..mlen];
var stream = net.Stream{ .handle = fd };
_ = stream.write(to_send) catch return CBEventType.ERROR;
return CBEventType.NO_ERROR;
_ = stream.write(to_send) catch return @intFromEnum(CBEventType.ERROR);
return @intFromEnum(CBEventType.NO_ERROR);
}