require "http/web_socket" require "option_parser" require "ipc" require "baguette-crystal-base" require "./utils" require "./lib_modifications.cr" # Websocketc # - is the application allowing libipc communications through Web Socket # - is designed to connect to a Websocketd server, ask for a service, then exchange data # - is WIP # Currently, the exchange format is JSON, encapsulated in JSON: # type: Int32 # type of the message # payload: JSON::Any? # encapsulated payload # # Websocketd deserialize the type and the payload before sending it to the service. module Websocketc class Exception < ::Exception end end class Context class_property service : Websocketc::Service? end class Baguette::Configuration class Websocketc < IPC end end class Relation property fd_client : Int32 property fd_service : Int32 property ws : WebSocket # ws can send JSON messages instead of libipc-formated messages. property is_json : Bool property buffer_client : String = String.new property buffer_service : String = String.new def related? (fd : Int32) fd == @fd_client || fd == @fd_service end def initialize(@fd_client, @fd_service, @ws, @is_json) end def inspect(io) : Nil to_s io end def to_s(io) c = "%4d" % @fd_client s = "%4d" % @fd_service j = if @is_json "JSON" else "Not JSON" end io << "client #{c} service #{s} #{j}" end end # Hide the complexity of managing relations. class Relations < Array(Relation) property all_fd : Array(Int32) = Array(Int32).new def <<(relation : Relation) Baguette::Log.debug "Adding a new relation: #{relation}" @all_fd << relation.fd_client @all_fd << relation.fd_service super relation end def search?(fd : Int32) : Relation? find {|r| r.fd_client == fd || r.fd_service == fd } end def remove(fd : Int32) # Baguette::Log.warning "context before removing #{fd}:" # Context.service.not_nil!.print_self each do |r| if r.related? fd Baguette::Log.debug "Closing this relation: #{r}" # Removing relations and file descriptors from C structures. pointer_ctx = Context.service.not_nil!.pointer LibIPC.ipc_ctx_switching_del pointer_ctx, r.fd_client LibIPC.ipc_del_fd pointer_ctx, r.fd_client LibIPC.ipc_del_fd pointer_ctx, r.fd_service # Close these sockets. # Baguette::Log.debug "Closing both #{r.fd_client} and #{r.fd_service} (ws)" begin s = Socket.new r.fd_client, Socket::Family::UNIX, Socket::Type::RAW s.close rescue e Baguette::Log.error "(ignoring) closing the client socket: #{e}" end begin r.ws.close rescue e Baguette::Log.error "(ignoring) closing the ws: #{e}" end # Trying not to close this socket (ws already do that). #begin # s = Socket.new r.fd_service, Socket::Family::INET, Socket::Type::STREAM # s.close #rescue e # Baguette::Log.error "(ignoring) closing the service socket: #{e}" #end all_fd.select! {|v| v != r.fd_client && v != r.fd_service } end end select! {|r| ! r.related? fd } # Baguette::Log.warning "context after removing #{fd}" # Context.service.not_nil!.print_self end end require "./network.cr" def ws_cb_in(fd : Int32, pm : LibIPC::Message*, more_to_read : Int16*) Context.service.not_nil!.relations.search?(fd).try do |relation| # Baguette::Log.info "IN fd is #{fd} in relation #{relation}" # Context.service.not_nil!.print_self message = nil begin message = relation.ws.run_once rescue e Baguette::Log.error "run_once FAILED: #{e}" Context.service.not_nil!.relations.remove fd return LibIPC::IPCCB::Closing end if relation.ws.ws.io.empty? more_to_read[0] = 0 else more_to_read[0] = 1 end if relation.ws.closed? Baguette::Log.info "client closed the connection" Context.service.not_nil!.relations.remove fd return LibIPC::IPCCB::Closing end if message.nil? Baguette::Log.error "Reveiced a nil message" Context.service.not_nil!.relations.remove fd return LibIPC::IPCCB::Closing end case message when String if relation.is_json # Reassemble the message. m = relation.buffer_client + message # Clean the buffer. relation.buffer_client = String.new begin ipc_message = IPC::Message.from_json m ipc_message.copy_to_message_pointer pm rescue e Baguette::Log.error "cannot send message coming from #{fd}, message: #{m}" Context.service.not_nil!.relations.remove fd Baguette::Log.error "error: #{e}" return LibIPC::IPCCB::Error end Baguette::Log.debug "no error reassembling the message" return LibIPC::IPCCB::NoError end Baguette::Log.error "cannot handle non-json messages!" return LibIPC::IPCCB::Error when WebSocket::Ping Baguette::Log.debug "TODO: Received a ping message" return LibIPC::IPCCB::Ignore when WebSocket::NotFinal Baguette::Log.debug "Received only part of a message" relation.buffer_client += message.message return LibIPC::IPCCB::Ignore else # every other option should be dropped case message when WebSocket::Error Baguette::Log.error "An error occured" Context.service.not_nil!.relations.remove fd return LibIPC::IPCCB::Error when WebSocket::Pong Baguette::Log.error "Received a pong message" return LibIPC::IPCCB::Ignore when WebSocket::Close Baguette::Log.debug "Received a close message" Context.service.not_nil!.relations.remove fd return LibIPC::IPCCB::Closing when Bytes # TODO: when receiving a binary message # we should test the format and maybe its content Baguette::Log.error "Received a binary message: NOT IMPLEMENTED, YET" Context.service.not_nil!.relations.remove fd return LibIPC::IPCCB::Error else Baguette::Log.error "Received a websocket message with unknown type" end end end return LibIPC::IPCCB::Error rescue e Baguette::Log.error "Exception (receiving a message) #{e}" # tcp = WrappedTCPFileDescriptor.new(fd: fd, family: Socket::Family::INET) # tcp.close Context.service.not_nil!.relations.remove fd return LibIPC::IPCCB::Error end def ws_cb_out(fd : Int32, pm : Pointer(LibIPC::Message)) Context.service.not_nil!.relations.search?(fd).try do |relation| # Baguette::Log.info "OUT fd is #{fd} in relation #{relation}" # Context.service.not_nil!.print_self message = IPC::Message.new pm # Baguette::Log.info "message to send: #{message}" if relation.is_json buf = message.to_json else buf = message.to_packet end relation.ws.send buf return LibIPC::IPCCB::NoError end Baguette::Log.error "Wait, not supposed to get here. No relation?" return LibIPC::IPCCB::Error rescue e Baguette::Log.error "Exception during message transfer: #{e}" Context.service.not_nil!.relations.remove fd return LibIPC::IPCCB::Error end class Websocketc::Service < IPC::Server property relations : Relations = Relations.new property config : Baguette::Configuration::Websocketc def initialize(@config : Baguette::Configuration::Websocketc) super "ws" Context.service = self end def print_self Baguette::Log.warning "From C perspective" LibIPC.ipc_ctx_print self.pointer Baguette::Log.warning "From Crystal perspective" Baguette::Log.warning "all fd: #{@relations.all_fd.join ", "}" @relations.each do |r| pp! r end Baguette::Log.warning "===" end def first_connection(event : IPC::Event::MessageReceived) # First message format: "URI" payload = String.new event.message.payload # Baguette::Log.info "First message received: #{payload}" # TODO: handle exceptions and errors begin uri = URI.parse payload ws = WebSocket.new uri is_json = uri.path.ends_with? ".JSON" service_fd = ws.ws.io.as(TCPSocket).fd relation = Relation.new event.fd, service_fd, ws, is_json @relations << relation # Change client fd status. LibIPC.ipc_ctx_fd_type self.pointer, event.fd, LibIPC::ConnectionType::Switched # Add service fd as switched. LibIPC.ipc_add_fd_switched self.pointer, service_fd # Link both file descriptors LibIPC.ipc_ctx_switching_add self.pointer, event.fd, service_fd # Change service fd callbacks: only the service callbacks are changed, # since the associated client is a simple libipc client proc_cb_in = ->ws_cb_in(Int32, Pointer(LibIPC::Message), Int16*) proc_cb_out = ->ws_cb_out(Int32, Pointer(LibIPC::Message)) LibIPC.ipc_switching_callbacks self.pointer, service_fd, proc_cb_in, proc_cb_out # Baguette::Log.debug "new client: #{event.fd}" rescue e Baguette::Log.error "cannot connect to #{payload}: #{e}" end send_now event.fd, 1.to_u8, "OK" end def run Baguette::Log.title "Starting websocketc" @timer = @config.ipc_timer @base_timer = @config.ipc_timer self.loop do |event| begin case event when IPC::Event::Timer Baguette::Log.debug "Timer" if @config.print_ipc_timer # print_self when IPC::Event::Connection Baguette::Log.info "connection from #{event.fd}" if @config.print_ipc_connection when IPC::Event::Disconnection Baguette::Log.info "disconnection from #{event.fd}" if @config.print_ipc_disconnection @relations.remove event.fd when IPC::Event::MessageSent Baguette::Log.info "message sent to #{event.fd}" if @config.print_ipc_message_sent when IPC::Event::MessageReceived Baguette::Log.info "message received from #{event.fd}" if @config.print_ipc_message_received if r = @relations.search? event.fd Baguette::Log.error "MessageReceived but from an already existent relation" Baguette::Log.error "relation: #{r}" exit 1 else first_connection event end when IPC::Event::Switch Baguette::Log.debug "switched message from #{event.fd}" if @config.print_ipc_switch when IPC::Event::EventNotSet Baguette::Log.error "Event not set: #{event.fd}" @relations.remove event.fd begin s = Socket.new event.fd, Socket::Family::UNIX, Socket::Type::RAW s.close rescue e Baguette::Log.warning "cannot close the socket #{event.fd}: #{e} (ignoring)" end when IPC::Event::Error Baguette::Log.error "Event Error on fd #{event.fd} (removing it)" @relations.remove event.fd when IPC::Exception Baguette::Log.error "IPC::Exception: #{event.message}" if event.message == "closed recipient" Baguette::Log.error "Bloody closed recipient!" # @relations.remove # else Baguette::Log.error "CLOSING WS" exit 1 end else Baguette::Log.warning "unhandled IPC event: #{event.class}" exit 1 end rescue e Baguette::Log.error "Exception during event handling: #{typeof(e)} - #{e.message}" exit 1 end end end def self.from_cli # First option parsing. simulation, no_configuration, configuration_file = Baguette::Configuration.option_parser # Websocketc configuration. configuration = if no_configuration Baguette::Log.info "do not load a configuration file." Baguette::Configuration::Websocketc.new else # In case there is a configuration file helping with the parameters. Baguette::Configuration::Websocketc.get(configuration_file) || Baguette::Configuration::Websocketc.new end OptionParser.parse do |parser| parser.on "-h", "--help", "Show this help" do puts parser exit 0 end end Baguette::Context.verbosity = configuration.verbosity if simulation pp! configuration exit 0 end ::Websocketc::Service.new configuration end end Websocketc::Service.from_cli.run