From 59b701c03ebeecc6b2da8f534de6fd7919d63428 Mon Sep 17 00:00:00 2001 From: Karchnu Date: Thu, 5 Nov 2020 04:40:23 +0100 Subject: [PATCH] ws: compilable, still very WIP. --- src/websocketc.cr | 306 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 252 insertions(+), 54 deletions(-) diff --git a/src/websocketc.cr b/src/websocketc.cr index 937435b..9e4b470 100644 --- a/src/websocketc.cr +++ b/src/websocketc.cr @@ -22,11 +22,6 @@ require "./lib_modifications.cr" module Websocketc class Exception < ::Exception end - - class Context - class_property uri = "ws://localhost:1234/pong" - class_property rounds = 1 - end end class Baguette::Configuration @@ -40,64 +35,272 @@ class Baguette::Configuration end end +class Relation + property fd_client : Int32 + property fd_service : Int32 + property ws : WebSocket + # ws can send JSON messages instead of libipc-formated messages. + property is_json : Bool + + def related? (fd : Int32) + fd == @fd_client || fd == @fd_service + end + + def initialize(@fd_client, @fd_service, @ws, @is_json) + end + + def to_s(io) + c = "%4d" % @fd_client + s = "%4d" % @fd_service + j = if @is_json + "JSON" + else + "Not JSON" + end + io << "client #{c} service #{s} #{j}" + end +end + +# Hide the complexity of managing relations. +class Relations < Array(Relation) + property all_fd : Array(Int32) = Array(Int32).new + def <<(relation : Relation) + Baguette::Log.debug "Adding a new relation: #{relation}" + @all_fd << relation.fd_client + @all_fd << relation.fd_service + super relation + end + + def search?(fd : Int32) : Relation? + find {|r| r.fd_client == fd || r.fd_service == fd } + end + + def remove(fd : Int32) + each do |r| + if r.related? fd + Baguette::Log.debug "TODO: closing this relation: #{r}" + all_fd.select! {|v| v != r.fd_client && v != r.fd_service } + end + end + select! {|r| ! r.related? fd } + end +end require "./network.cr" - class Websocketc::Service < IPC::Server - property connections : Hash(Int32, WebSocket) = {} of Int32 => WebSocket - getter all_fd : Array(Int32) = Array(Int32).new - - property config : Baguette::Configuration::Websocketc + property relations : Relations = Relations.new + property config : Baguette::Configuration::Websocketc def initialize(@config : Baguette::Configuration::Websocketc) - super "websocketc" + super "ws" end - def handle_request(event : IPC::Event::MessageReceived) - request_start = Time.utc + #def handle_request(event : IPC::Event::MessageReceived) + # # send_now + #end - # TODO: The message can come from an already linked fd. +# def handle_request(event : IPC::Event::MessageReceived) +# request_start = Time.utc +# +# # TODO: The message can come from an already linked fd. +# +# request = Websocketc.requests.parse_ipc_json event.message +# +# if request.nil? +# raise "unknown request type" +# end +# +# request_name = request.class.name.sub /^Websocketc::Request::/, "" +# Baguette::Log.debug "#{request_name}" +# +# response = begin +# request.handle self, event +# rescue e : Websocketc::Exception +# Baguette::Log.error "Websocketc::Exception: #{request_name} => #{e}" +# Websocketc::Response::Error.new "generic error" +# rescue e +# Baguette::Log.error "#{request_name} generic error #{e}" +# Websocketc::Response::Error.new "unknown error" +# end +# +# # If clients sent requests with an “id” field, it is copied +# # in the responses. Allows identifying responses easily. +# response.id = request.id +# +# send event.fd, response +# +# duration = Time.utc - request_start +# +# response_name = response.class.name.sub /^Websocketc::Response::/, "" +# +# if response.is_a? Websocketc::Response::Error +# Baguette::Log.warning "#{response_name} (#{response.reason})" +# else +# Baguette::Log.debug "#{response_name} (Total duration: #{duration})" +# end +# end - request = Websocketc.requests.parse_ipc_json event.message + def first_connection(event : IPC::Event::MessageReceived) + # First message format: "URI" + payload = String.new event.message.payload + Baguette::Log.info "First message received: #{payload}" - if request.nil? - raise "unknown request type" - end + # TODO: handle exceptions and errors - request_name = request.class.name.sub /^Websocketc::Request::/, "" - Baguette::Log.debug "<< #{request_name}" + begin + uri = URI.parse payload + ws = WebSocket.new uri + + is_json = uri.path.ends_with? ".JSON" + + pp! ws.ws + # service_fd = ws.ws.io + service_fd = 0 + + relation = Relation.new event.fd, service_fd, ws, is_json + pp! relation + @relations << relation + + # Change client fd status. + + LibIPC.ipc_ctx_fd_type self.pointer, event.fd, LibIPC::ConnectionType::Switched + + # Add service fd as switched. + LibIPC.ipc_add_fd_switched self.pointer, service_fd + + # Change service fd callbacks: only the service callbacks are changed, + # since the associated client is a simple libipc client + + proc_cb_in = ->ws_cb_in(Int32, Pointer(LibIPC::Message), Int16*) + proc_cb_out = ->ws_cb_out(Int32, Pointer(LibIPC::Message)) + + # self.remove_fd event.fd + LibIPC.ipc_switching_callbacks self.pointer, service_fd, proc_cb_in, proc_cb_out + + Baguette::Log.debug "new client: #{event.fd}" - response = begin - request.handle self, event - rescue e : Websocketc::Exception - Baguette::Log.error "Websocketc::Exception: #{request_name} => #{e}" - Websocketc::Response::Error.new "generic error" rescue e - Baguette::Log.error "#{request_name} generic error #{e}" - Websocketc::Response::Error.new "unknown error" + Baguette::Log.error "cannot connect to #{payload}: #{e}" end - # If clients sent requests with an “id” field, it is copied - # in the responses. Allows identifying responses easily. - response.id = request.id - - send event.fd, response - - duration = Time.utc - request_start - - response_name = response.class.name.sub /^Websocketc::Response::/, "" - - if response.is_a? Websocketc::Response::Error - Baguette::Log.warning ">> #{response_name} (#{response.reason})" - else - Baguette::Log.debug ">> #{response_name} (Total duration: #{duration})" - end + Baguette::Log.info "Let's say it's OK" + send_now event.fd, 1.to_u8, "OK" end + def ws_cb_out(fd : Int32, pm : Pointer(LibIPC::Message)) + Baguette::Log.info "OUT fd is #{fd}" + @relations.search?(fd).try do |relation| + message = IPC::Message.new pm + Baguette::Log.info "message to send: #{message}" + + if relation.is_json + buf = message.to_json + else + buf = message.to_packet + end + + relation.ws.send buf + return LibIPC::IPCCB::NoError + end + + Baguette::Log.error "Wait, not supposed to get here. No relation?" + return LibIPC::IPCCB::Error + + rescue e + Baguette::Log.error "Exception during message transfer: #{e}" + @relations.remove fd + return LibIPC::IPCCB::Error + end + + def ws_cb_in(fd : Int32, pm : LibIPC::Message*, more_to_read : Int16*) + Baguette::Log.info "IN fd is #{fd}" + + @relations.search?(fd).try do |relation| + message = nil + begin + message = relation.ws.run_once + rescue e + Baguette::Log.error "run_once FAILED: #{e}" + @relations.remove fd + return LibIPC::IPCCB::Error + end + + if relation.ws.ws.io.empty? + more_to_read[0] = 0 + else + more_to_read[0] = 1 + end + + if relation.ws.closed? + Baguette::Log.info "client closed the connection" + @relations.remove fd + return LibIPC::IPCCB::Closing + end + + if message.nil? + Baguette::Log.error "Reveiced a nil message" + @relations.remove fd + return LibIPC::IPCCB::Closing + end + + case message + when String + if relation.is_json + ipc_message = IPC::Message.from_json(message) + ipc_message.copy_to_message_pointer pm + return LibIPC::IPCCB::NoError + end + when WebSocket::Ping + Baguette::Log.debug "TODO: Received a ping message" + return LibIPC::IPCCB::Ignore + else + # every other option should be dropped + case message + when WebSocket::Error + Baguette::Log.error "An error occured" + @relations.remove fd + return LibIPC::IPCCB::Error + when WebSocket::Pong + Baguette::Log.error "Received a pong message" + return LibIPC::IPCCB::Ignore + when WebSocket::Close + Baguette::Log.debug "Received a close message" + @relations.remove fd + return LibIPC::IPCCB::Closing + when WebSocket::NotFinal + Baguette::Log.warning "TODO: Received only part of a message." + @relations.remove fd + return LibIPC::IPCCB::Error + when Bytes + # TODO: when receiving a binary message + # we should test the format and maybe its content + Baguette::Log.error "Received a binary message: NOT IMPLEMENTED, YET" + @relations.remove fd + return LibIPC::IPCCB::Error + else + Baguette::Log.error "Received a websocket message with unknown type" + end + end + end + + return LibIPC::IPCCB::Error + rescue e + Baguette::Log.error "Exception (receiving a message) #{e}" + # tcp = WrappedTCPFileDescriptor.new(fd: fd, family: Socket::Family::INET) + # tcp.close + @relations.remove fd + return LibIPC::IPCCB::Error + end + + + def run Baguette::Log.title "Starting websocketc" + @timer = @config.ipc_timer + @base_timer = @config.ipc_timer + self.loop do |event| begin case event @@ -106,19 +309,22 @@ class Websocketc::Service < IPC::Server when IPC::Event::Connection Baguette::Log.debug "connection from #{event.fd}" - @all_fd << event.fd when IPC::Event::Disconnection Baguette::Log.debug "disconnection from #{event.fd}" - @all_fd.select! &.!=(event.fd) - connections.delete event.fd + @relations.remove event.fd when IPC::Event::MessageSent Baguette::Log.debug "message sent to #{event.fd}" when IPC::Event::MessageReceived Baguette::Log.debug "message received from #{event.fd}" - handle_request event + if r = @relations.search? event.fd + Baguette::Log.error "MessageReceived but from an already existent relation" + Baguette::Log.error "relation: #{r}" + else + first_connection event + end when IPC::Exception Baguette::Log.warning "IPC::Exception: #{event.message}" @@ -149,14 +355,6 @@ class Websocketc::Service < IPC::Server OptionParser.parse do |parser| - parser.on "-u uri", "--uri uri", "URI" do |opturi| - Websocketc::Context.uri = opturi - end - - parser.on "-r rounds", "--rounds nb-messages", "Nb messages to send." do |opt| - Websocketc::Context.rounds = opt.to_i - end - parser.on "-h", "--help", "Show this help" do puts parser exit 0