From 158cc47897421998c94377fa0c6de73f684d8d34 Mon Sep 17 00:00:00 2001 From: Karchnu Date: Fri, 3 Jul 2020 13:42:19 +0200 Subject: [PATCH] Context --- src/ipc/client.cr | 15 +++- src/ipc/connection.cr | 193 ++---------------------------------------- src/ipc/context.cr | 178 ++++++++++++++++++++++++++++++++++++++ src/ipc/lowlevel.cr | 1 + src/ipc/message.cr | 12 +-- src/ipc/service.cr | 46 +--------- 6 files changed, 205 insertions(+), 240 deletions(-) create mode 100644 src/ipc/context.cr diff --git a/src/ipc/client.cr b/src/ipc/client.cr index 124df1f..b3729a8 100644 --- a/src/ipc/client.cr +++ b/src/ipc/client.cr @@ -2,11 +2,20 @@ require "./lowlevel" require "./message" require "./event" require "./service" -require "./connection" +require "./context" +class IPC::Client < IPC::Context + property connection : IPC::Connection -class IPC::Client < IPC::Connections - @connection : IPC::Connection + # By default, this is a client. + def initialize(service_name : String) + super() + r = LibIPC.ipc_connection(self.pointer, service_name) + if r.error_code != 0 + m = String.new r.error_message.to_slice + raise Exception.new "error during connection establishment: #{m}" + end + end def initialize(name : String) super() diff --git a/src/ipc/connection.cr b/src/ipc/connection.cr index a6d44b0..5b2a04e 100644 --- a/src/ipc/connection.cr +++ b/src/ipc/connection.cr @@ -3,90 +3,13 @@ require "./message" require "./event" class IPC::Connection - getter ctx : LibIPC::Ctx getter connection : LibIPC::Connection + getter pollfd : LibIPC::Pollfd getter closed = false - # connection already established - def initialize(c : LibIPC::Ctx) - @ctx = c - end - # def initialize(c : LibIPC::Connection) - # @connection = c - # end - - def initialize(service_name : String) - @ctx = LibIPC::Ctx.new - - r = LibIPC.ipc_connection(self.pointer, service_name) - if r.error_code != 0 - m = String.new r.error_message.to_slice - raise Exception.new "error during connection establishment: #{m}" - end - end - - def initialize(name, &block) - initialize(name) - - yield self - - close - end - - # Adds a new connection based on the socket file descriptor - def initialize(fd : Int32) - external_connection = LibIPC::Connection.new - external_connection.fd = fd - initialize(external_connection) - end - - # sanitizer - def fd - @connection.fd - end - - def send(utype : UInt8, payload : Bytes) - message = LibIPC::Message.new type: LibIPC::MessageType::Data.to_u8, - user_type: utype, - length: payload.bytesize, - payload: payload.to_unsafe - - r = LibIPC.ipc_write(self.pointer, pointerof(message)) - if r.error_code != 0 - m = String.new r.error_message.to_slice - raise Exception.new "error writing a message: #{m}" - end - end - - def send(utype : UInt8, payload : String) - send(utype, Bytes.new(payload.to_unsafe, payload.bytesize)) - end - - def send(message : IPC::Message) - send(message.utype, message.payload) - end - - def read - message = LibIPC::Message.new - r = LibIPC.ipc_read(pointerof(@connection), pointerof(message)) - if r.error_code != 0 - m = String.new r.error_message.to_slice - raise Exception.new "error reading a message: #{m}" - end - - IPC::Message.new pointerof(message) - end - - def close - return if @closed - - r = LibIPC.ipc_close(self.pointer) - if r.error_code != 0 - m = String.new r.error_message.to_slice - raise Exception.new "cannot correctly close the connection: #{m}" - end - - @closed = true + def initialize + @connection = LibIPC::Connection.new + @pollfd = LibIPC::Pollfd.new end def pointer @@ -98,7 +21,7 @@ class IPC::Connection end end -# This class is designed for stand alone connections, where the StandAloneConnection object +# This class is designed for stand alone context, where the StandAloneConnection object # should NOT be garbage collected (which means the end of the communication) class IPC::StandAloneConnection # close the connection in case the object is garbage collected @@ -106,109 +29,3 @@ class IPC::StandAloneConnection close end end - -class IPC::Connections - property base_timer : Float64 = 0.0 - property timer : Float64 = 0.0 - getter connections : LibIPC::Connections - - def initialize - @connections = LibIPC::Connections.new - end - - def initialize(@connections : LibIPC::Connections) - end - - def << (client : IPC::Connection) - r = LibIPC.ipc_add(self.pointer, client.pointer) - if r.error_code != 0 - m = String.new r.error_message.to_slice - raise Exception.new "cannot add an arbitrary file descriptor: #{m}" - end - end - - def << (fd : Int) - r = LibIPC.ipc_add_fd(self.pointer, fd) - if r.error_code != 0 - m = String.new r.error_message.to_slice - raise Exception.new "cannot add an arbitrary file descriptor: #{m}" - end - end - - def remove (client : IPC::Connection) - c = client.connection - r = LibIPC.ipc_del(self.pointer, pointerof(c)) - if r.error_code != 0 - m = String.new r.error_message.to_slice - raise Exception.new "cannot remove a client: #{m}" - end - end - - def remove_fd (fd : Int) - r = LibIPC.ipc_del_fd(self.pointer, fd) - if r.error_code != 0 - m = String.new r.error_message.to_slice - raise Exception.new "cannot remove an arbitrary file descriptor: #{m}" - end - end - - def wait_event(&block) : IPC::Event::Events | Exception - event = LibIPC::Event.new - - r = LibIPC.ipc_events_loop self.pointer, pointerof(event), pointerof(@timer) - if r.error_code != 0 - m = String.new r.error_message.to_slice - yield IPC::Exception.new "error waiting for a new event: #{m}" - end - - eventtype = event.type.unsafe_as(LibIPC::EventType) - - # if event type is Timer, there is no connection nor message - case eventtype - when LibIPC::EventType::NotSet - return Exception.new "'Event type: not set" - when LibIPC::EventType::Error - return IPC::Event::Error.new event.index, event.origin - when LibIPC::EventType::ExtraSocket # Message received from a non IPC socket. - return IPC::Event::ExtraSocket.new event.origin, event.index - when LibIPC::EventType::Switch # Message to send to a corresponding fd. - return IPC::Event::Switch.new event.origin, event.index - when LibIPC::EventType::Connection # New user. - return IPC::Event::Connection.new event.origin, event.index - when LibIPC::EventType::Disconnection # User disconnected. - return IPC::Event::Disconnection.new event.origin, event.index - when LibIPC::EventType::Message # New message. - message = event.message.unsafe_as(Pointer(LibIPC::Message)) - return IPC::Event::MessageReceived.new event.origin, event.index, message - when LibIPC::EventType::LookUp # Client asking for a service through ipcd. - # for now, the libipc does not provide lookup events - # ipcd uses a simple LibIPC::EventType::Message - return IPC::Event::LookUp.new event.origin, event.index - when LibIPC::EventType::Timer # Timeout in the poll(2) function. - return IPC::Event::Timer.new - when LibIPC::EventType::Tx # Message sent. - return IPC::Event::Tx.new event.origin, event.index - end - - yield Exception.new "Cannot understand the event type: #{eventtype}" - end - - def loop(&block : Proc(IPC::Event::Events|Exception, Nil)) - if @base_timer > 0 && @timer == 0 - @timer = @base_timer - end - - ::loop do - yield wait_event &block - end - end - - # sanitizer - def pointer - pointerof(@ctx) - end - - # def pp - # LibIPC.ipc_connections_print @connections - # end -end diff --git a/src/ipc/context.cr b/src/ipc/context.cr new file mode 100644 index 0000000..4687bf6 --- /dev/null +++ b/src/ipc/context.cr @@ -0,0 +1,178 @@ +require "./lowlevel" +require "./message" +require "./event" + +class IPC::Context + property base_timer : Float64 = 0.0 + property timer : Float64 = 0.0 + getter context : LibIPC::Ctx + + def initialize + @context = LibIPC::Ctx.new + end + + def initialize(@context : LibIPC::Ctx) + end + + # By default, this is a client. + def initialize(service_name : String, &block) + initialize(name) + yield self + close + end + + def << (client : IPC::Connection) + r = LibIPC.ipc_add(self.pointer, client.pointer, pointerof(client.pollfd)) + if r.error_code != 0 + m = String.new r.error_message.to_slice + raise Exception.new "cannot add an arbitrary file descriptor: #{m}" + end + end + + def << (fd : Int) + r = LibIPC.ipc_add_fd(self.pointer, fd) + if r.error_code != 0 + m = String.new r.error_message.to_slice + raise Exception.new "cannot add an arbitrary file descriptor: #{m}" + end + end + + def remove (client : IPC::Connection) + c = client.connection + r = LibIPC.ipc_del(self.pointer, pointerof(c)) + if r.error_code != 0 + m = String.new r.error_message.to_slice + raise Exception.new "cannot remove a client: #{m}" + end + end + + def remove_fd (fd : Int) + r = LibIPC.ipc_del_fd(self.pointer, fd) + if r.error_code != 0 + m = String.new r.error_message.to_slice + raise Exception.new "cannot remove an arbitrary file descriptor: #{m}" + end + end + + def wait_event(&block) : IPC::Event::Events | Exception + event = LibIPC::Event.new + + r = LibIPC.ipc_events_loop self.pointer, pointerof(event), pointerof(@timer) + if r.error_code != 0 + m = String.new r.error_message.to_slice + yield IPC::Exception.new "error waiting for a new event: #{m}" + end + + eventtype = event.type.unsafe_as(LibIPC::EventType) + + # if event type is Timer, there is no connection nor message + case eventtype + when LibIPC::EventType::NotSet + return Exception.new "'Event type: not set" + when LibIPC::EventType::Error + return IPC::Event::Error.new event.index, event.origin + when LibIPC::EventType::ExtraSocket # Message received from a non IPC socket. + return IPC::Event::ExtraSocket.new event.origin, event.index + when LibIPC::EventType::Switch # Message to send to a corresponding fd. + return IPC::Event::Switch.new event.origin, event.index + when LibIPC::EventType::Connection # New user. + return IPC::Event::Connection.new event.origin, event.index + when LibIPC::EventType::Disconnection # User disconnected. + return IPC::Event::Disconnection.new event.origin, event.index + when LibIPC::EventType::Message # New message. + message = event.message.unsafe_as(Pointer(LibIPC::Message)) + return IPC::Event::MessageReceived.new event.origin, event.index, message + when LibIPC::EventType::LookUp # Client asking for a service through ipcd. + # for now, the libipc does not provide lookup events + # ipcd uses a simple LibIPC::EventType::Message + return IPC::Event::LookUp.new event.origin, event.index + when LibIPC::EventType::Timer # Timeout in the poll(2) function. + return IPC::Event::Timer.new + when LibIPC::EventType::Tx # Message sent. + return IPC::Event::Tx.new event.origin, event.index + end + + yield Exception.new "Cannot understand the event type: #{eventtype}" + end + + def loop(&block : Proc(IPC::Event::Events|Exception, Nil)) + if @base_timer > 0 && @timer == 0 + @timer = @base_timer + end + + ::loop do + yield wait_event &block + end + end + + def send(fd : Int32, utype : UInt8, payload : Bytes) + message = LibIPC::Message.new fd: fd, + type: LibIPC::MessageType::Data.to_u8, + user_type: utype, + length: payload.bytesize, + payload: payload.to_unsafe + + r = LibIPC.ipc_write(self.pointer, pointerof(message)) + if r.error_code != 0 + m = String.new r.error_message.to_slice + raise Exception.new "error writing a message: #{m}" + end + end + + def send(fd : Int32, utype : UInt8, payload : String) + send(fd, utype, Bytes.new(payload.to_unsafe, payload.bytesize)) + end + + def send(fd : Int32, message : IPC::Message) + send(fd, message.utype, message.payload) + end + + def read(index : UInt32) + message = LibIPC::Message.new + r = LibIPC.ipc_read(self.pointer, index, pointerof(message)) + if r.error_code != 0 + m = String.new r.error_message.to_slice + raise Exception.new "error reading a message: #{m}" + end + + IPC::Message.new pointerof(message) + end + + def close + return if @closed + + r = LibIPC.ipc_close(self.pointer) + if r.error_code != 0 + m = String.new r.error_message.to_slice + raise Exception.new "cannot correctly close the connection: #{m}" + end + + @closed = true + end + + # sanitizer + def pointer + pointerof(@context) + end + + # sanitizer + def fd + @connection.fd + end + + def close + return if @closed + + r = LibIPC.ipc_close_all(self.pointer) + if r.error_code != 0 + m = String.new r.error_message.to_slice + raise Exception.new "cannot correctly close the connection: #{m}" + end + + @closed = true + end + + # def pp + # LibIPC.ipc_connections_print @context + # end +end diff --git a/src/ipc/lowlevel.cr b/src/ipc/lowlevel.cr index 615abf9..99d1f72 100644 --- a/src/ipc/lowlevel.cr +++ b/src/ipc/lowlevel.cr @@ -103,6 +103,7 @@ lib LibIPC # The error message is contained in the IPCError structure, this function should not be used, in most cases. fun ipc_errors_get (LibC::Int) : LibC::Char* + # Exchanging file descriptors (used with ipcd on connection). fun ipc_receive_fd (sock : LibC::Int, fd : LibC::Int*) : IPCError fun ipc_provide_fd (sock : LibC::Int, fd : LibC::Int ) : IPCError diff --git a/src/ipc/message.cr b/src/ipc/message.cr index 0276e9f..a5c25ab 100644 --- a/src/ipc/message.cr +++ b/src/ipc/message.cr @@ -5,7 +5,7 @@ require "json" # At some point, this will be replaced by the CBOR format class IPC::Message - + property fd : Int32 # file descriptor property mtype : UInt8 # libipc message type property utype : UInt8 # libipc user message type property payload : Bytes @@ -34,10 +34,12 @@ class IPC::Message def initialize(message : Pointer(LibIPC::Message)) if message.null? @mtype = LibIPC::MessageType::Error.to_u8 + @fd = 0 @utype = 0 @payload = Bytes.new "".to_unsafe, 0 else m = message.value + @fd = m.fd @mtype = m.type @utype = m.user_type @payload = Bytes.new m.payload, m.length @@ -48,14 +50,12 @@ class IPC::Message initialize pointerof(message) end - def initialize(mtype, utype, payload : Bytes) + def initialize(@fd, mtype, @utype, @payload : Bytes) @mtype = mtype.to_u8 - @utype = utype - @payload = payload end - def initialize(mtype, utype, payload : String) - initialize(mtype, utype, Bytes.new(payload.to_unsafe, payload.bytesize)) + def initialize(fd, mtype, utype, payload : String) + initialize(fd, mtype, utype, Bytes.new(payload.to_unsafe, payload.bytesize)) end def self.to_packet (user_type : Int, message : String) diff --git a/src/ipc/service.cr b/src/ipc/service.cr index c7094bd..ff64136 100644 --- a/src/ipc/service.cr +++ b/src/ipc/service.cr @@ -3,13 +3,11 @@ require "./message" require "./event" require "./connection" -# the server is a connection with two different function calls +# the server is a connection with a different function call for the connection # ipc_connection => ipc_server_init -# ipc_close => ipc_server_close class IPC::Server < IPC::Connection def initialize(name : String) - @connection = LibIPC::Connection.new - r = LibIPC.ipc_server_init(LibC.environ, self.pointer, name) + r = LibIPC.ipc_server_init(self.pointer, name) if r.error_code != 0 m = String.new r.error_message.to_slice raise Exception.new "cannot initialize the server named #{name}: #{m}" @@ -18,21 +16,9 @@ class IPC::Server < IPC::Connection # Very important as there are filesystem side-effects. at_exit { close } end - - def close - return if @closed - - r = LibIPC.ipc_server_close(self.pointer) - if r.error_code != 0 - m = String.new r.error_message.to_slice - raise Exception.new "cannot close the server correctly: #{m}" - end - - @closed = true - end end -class IPC::Service < IPC::Connections +class IPC::Service < IPC::Context @service_info : IPC::Server def initialize(name : String) @@ -69,31 +55,5 @@ class IPC::SwitchingService < IPC::Service super @switch.del fd end - - def wait_event(server : IPC::Connection , &block) : Tuple(LibIPC::EventType, IPC::Message, IPC::Connection) | Tuple(LibIPC::EventType, Nil, Nil) - event = LibIPC::Event.new - - serverp = server.pointer - r = LibIPC.ipc_wait_event_networkd self.pointer, serverp, pointerof(event), @switch.pointer, pointerof(@timer) - - if r.error_code != 0 - m = String.new r.error_message.to_slice - yield IPC::Exception.new "error waiting for a new event: #{m}" - end - - eventtype = event.type.unsafe_as(LibIPC::EventType) - - # if event type is Timer, there is no connection nor message - case eventtype - when LibIPC::EventType::Timer - return eventtype, nil, nil - end - - connection = IPC::Connection.new event.origin.unsafe_as(Pointer(LibIPC::Connection)).value - - message = event.message.unsafe_as(Pointer(LibIPC::Message)) - - return eventtype, IPC::Message.new(message), connection - end end