From e41c6878cf52e31e114b6577d902b57a4cece76d Mon Sep 17 00:00:00 2001 From: Karchnu Date: Thu, 5 Nov 2020 17:01:27 +0100 Subject: [PATCH] ws: first message exchange. Still WIP, but we are getting there. --- src/websocketc.cr | 314 +++++++++++++++++++++++----------------------- 1 file changed, 157 insertions(+), 157 deletions(-) diff --git a/src/websocketc.cr b/src/websocketc.cr index 9e4b470..3b4eeae 100644 --- a/src/websocketc.cr +++ b/src/websocketc.cr @@ -24,6 +24,10 @@ module Websocketc end end +class Context + class_property service : Websocketc::Service? +end + class Baguette::Configuration class Websocketc < Base property verbosity : Int32 = 2 @@ -42,6 +46,9 @@ class Relation # ws can send JSON messages instead of libipc-formated messages. property is_json : Bool + property buffer_client : String = String.new + property buffer_service : String = String.new + def related? (fd : Int32) fd == @fd_client || fd == @fd_service end @@ -79,6 +86,16 @@ class Relations < Array(Relation) each do |r| if r.related? fd Baguette::Log.debug "TODO: closing this relation: #{r}" + Baguette::Log.warning "Before removing, here the switching list" + + sw = Context.service.not_nil!.context.switchdb + pointer_ctx = Context.service.not_nil!.pointer + + LibIPC.ipc_switching_print pointerof(sw) + LibIPC.ipc_del_fd pointer_ctx, r.fd_client + LibIPC.ipc_del_fd pointer_ctx, r.fd_service + LibIPC.ipc_switching_del pointerof(sw), r.fd_client + all_fd.select! {|v| v != r.fd_client && v != r.fd_service } end end @@ -88,58 +105,144 @@ end require "./network.cr" +def ws_cb_in(fd : Int32, pm : LibIPC::Message*, more_to_read : Int16*) + Baguette::Log.info "IN fd is #{fd}" + Context.service.not_nil!.relations.search?(fd).try do |relation| + message = nil + begin + message = relation.ws.run_once + rescue e + Baguette::Log.error "run_once FAILED: #{e}" + Context.service.not_nil!.relations.remove fd + return LibIPC::IPCCB::Error + end + + if relation.ws.ws.io.empty? + more_to_read[0] = 0 + else + more_to_read[0] = 1 + end + + if relation.ws.closed? + Baguette::Log.info "client closed the connection" + Context.service.not_nil!.relations.remove fd + return LibIPC::IPCCB::Closing + end + + if message.nil? + Baguette::Log.error "Reveiced a nil message" + Context.service.not_nil!.relations.remove fd + return LibIPC::IPCCB::Closing + end + + case message + when String + if relation.is_json + Baguette::Log.warning "reassembling the message!" + # Reassemble the message. + m = relation.buffer_client + message + pp! relation.buffer_client, message, m + # Clean the buffer. + relation.buffer_client = String.new + + begin + ipc_message = IPC::Message.from_json m + ipc_message.copy_to_message_pointer pm + rescue e + Baguette::Log.error "cannot send message coming from #{fd}" + Baguette::Log.error "message: #{m}" + Context.service.not_nil!.relations.remove fd + Context.service.not_nil!.print_self + Baguette::Log.error "error: #{e}" + return LibIPC::IPCCB::Error + end + + return LibIPC::IPCCB::NoError + end + Baguette::Log.error "cannot handle non-json messages!" + return LibIPC::IPCCB::Error + when WebSocket::Ping + Baguette::Log.debug "TODO: Received a ping message" + return LibIPC::IPCCB::Ignore + + when WebSocket::NotFinal + Baguette::Log.warning "Received only part of a message." + relation.buffer_client += message.message + return LibIPC::IPCCB::Ignore + + else + # every other option should be dropped + case message + when WebSocket::Error + Baguette::Log.error "An error occured" + Context.service.not_nil!.relations.remove fd + return LibIPC::IPCCB::Error + when WebSocket::Pong + Baguette::Log.error "Received a pong message" + return LibIPC::IPCCB::Ignore + when WebSocket::Close + Baguette::Log.debug "Received a close message" + Context.service.not_nil!.relations.remove fd + return LibIPC::IPCCB::Closing + when Bytes + # TODO: when receiving a binary message + # we should test the format and maybe its content + Baguette::Log.error "Received a binary message: NOT IMPLEMENTED, YET" + Context.service.not_nil!.relations.remove fd + return LibIPC::IPCCB::Error + else + Baguette::Log.error "Received a websocket message with unknown type" + end + end + end + + return LibIPC::IPCCB::Error +rescue e + Baguette::Log.error "Exception (receiving a message) #{e}" + # tcp = WrappedTCPFileDescriptor.new(fd: fd, family: Socket::Family::INET) + # tcp.close + Context.service.not_nil!.relations.remove fd + return LibIPC::IPCCB::Error +end + +def ws_cb_out(fd : Int32, pm : Pointer(LibIPC::Message)) + Baguette::Log.info "OUT fd is #{fd}" + Context.service.not_nil!.relations.search?(fd).try do |relation| + message = IPC::Message.new pm + Baguette::Log.info "message to send: #{message}" + + if relation.is_json + buf = message.to_json + else + buf = message.to_packet + end + + relation.ws.send buf + return LibIPC::IPCCB::NoError + end + + Baguette::Log.error "Wait, not supposed to get here. No relation?" + return LibIPC::IPCCB::Error + +rescue e + Baguette::Log.error "Exception during message transfer: #{e}" + Context.service.not_nil!.relations.remove fd + return LibIPC::IPCCB::Error +end + + class Websocketc::Service < IPC::Server property relations : Relations = Relations.new property config : Baguette::Configuration::Websocketc def initialize(@config : Baguette::Configuration::Websocketc) super "ws" + Context.service = self end - #def handle_request(event : IPC::Event::MessageReceived) - # # send_now - #end - -# def handle_request(event : IPC::Event::MessageReceived) -# request_start = Time.utc -# -# # TODO: The message can come from an already linked fd. -# -# request = Websocketc.requests.parse_ipc_json event.message -# -# if request.nil? -# raise "unknown request type" -# end -# -# request_name = request.class.name.sub /^Websocketc::Request::/, "" -# Baguette::Log.debug "#{request_name}" -# -# response = begin -# request.handle self, event -# rescue e : Websocketc::Exception -# Baguette::Log.error "Websocketc::Exception: #{request_name} => #{e}" -# Websocketc::Response::Error.new "generic error" -# rescue e -# Baguette::Log.error "#{request_name} generic error #{e}" -# Websocketc::Response::Error.new "unknown error" -# end -# -# # If clients sent requests with an “id” field, it is copied -# # in the responses. Allows identifying responses easily. -# response.id = request.id -# -# send event.fd, response -# -# duration = Time.utc - request_start -# -# response_name = response.class.name.sub /^Websocketc::Response::/, "" -# -# if response.is_a? Websocketc::Response::Error -# Baguette::Log.warning "#{response_name} (#{response.reason})" -# else -# Baguette::Log.debug "#{response_name} (Total duration: #{duration})" -# end -# end + def print_self + LibIPC.ipc_ctx_print self.pointer + end def first_connection(event : IPC::Event::MessageReceived) # First message format: "URI" @@ -154,28 +257,26 @@ class Websocketc::Service < IPC::Server is_json = uri.path.ends_with? ".JSON" - pp! ws.ws - # service_fd = ws.ws.io - service_fd = 0 + service_fd = ws.ws.io.as(TCPSocket).fd relation = Relation.new event.fd, service_fd, ws, is_json - pp! relation @relations << relation # Change client fd status. - LibIPC.ipc_ctx_fd_type self.pointer, event.fd, LibIPC::ConnectionType::Switched # Add service fd as switched. LibIPC.ipc_add_fd_switched self.pointer, service_fd + # Link both file descriptors + LibIPC.ipc_ctx_switching_add self.pointer, event.fd, service_fd + # Change service fd callbacks: only the service callbacks are changed, # since the associated client is a simple libipc client proc_cb_in = ->ws_cb_in(Int32, Pointer(LibIPC::Message), Int16*) proc_cb_out = ->ws_cb_out(Int32, Pointer(LibIPC::Message)) - # self.remove_fd event.fd LibIPC.ipc_switching_callbacks self.pointer, service_fd, proc_cb_in, proc_cb_out Baguette::Log.debug "new client: #{event.fd}" @@ -188,113 +289,6 @@ class Websocketc::Service < IPC::Server send_now event.fd, 1.to_u8, "OK" end - def ws_cb_out(fd : Int32, pm : Pointer(LibIPC::Message)) - Baguette::Log.info "OUT fd is #{fd}" - @relations.search?(fd).try do |relation| - message = IPC::Message.new pm - Baguette::Log.info "message to send: #{message}" - - if relation.is_json - buf = message.to_json - else - buf = message.to_packet - end - - relation.ws.send buf - return LibIPC::IPCCB::NoError - end - - Baguette::Log.error "Wait, not supposed to get here. No relation?" - return LibIPC::IPCCB::Error - - rescue e - Baguette::Log.error "Exception during message transfer: #{e}" - @relations.remove fd - return LibIPC::IPCCB::Error - end - - def ws_cb_in(fd : Int32, pm : LibIPC::Message*, more_to_read : Int16*) - Baguette::Log.info "IN fd is #{fd}" - - @relations.search?(fd).try do |relation| - message = nil - begin - message = relation.ws.run_once - rescue e - Baguette::Log.error "run_once FAILED: #{e}" - @relations.remove fd - return LibIPC::IPCCB::Error - end - - if relation.ws.ws.io.empty? - more_to_read[0] = 0 - else - more_to_read[0] = 1 - end - - if relation.ws.closed? - Baguette::Log.info "client closed the connection" - @relations.remove fd - return LibIPC::IPCCB::Closing - end - - if message.nil? - Baguette::Log.error "Reveiced a nil message" - @relations.remove fd - return LibIPC::IPCCB::Closing - end - - case message - when String - if relation.is_json - ipc_message = IPC::Message.from_json(message) - ipc_message.copy_to_message_pointer pm - return LibIPC::IPCCB::NoError - end - when WebSocket::Ping - Baguette::Log.debug "TODO: Received a ping message" - return LibIPC::IPCCB::Ignore - else - # every other option should be dropped - case message - when WebSocket::Error - Baguette::Log.error "An error occured" - @relations.remove fd - return LibIPC::IPCCB::Error - when WebSocket::Pong - Baguette::Log.error "Received a pong message" - return LibIPC::IPCCB::Ignore - when WebSocket::Close - Baguette::Log.debug "Received a close message" - @relations.remove fd - return LibIPC::IPCCB::Closing - when WebSocket::NotFinal - Baguette::Log.warning "TODO: Received only part of a message." - @relations.remove fd - return LibIPC::IPCCB::Error - when Bytes - # TODO: when receiving a binary message - # we should test the format and maybe its content - Baguette::Log.error "Received a binary message: NOT IMPLEMENTED, YET" - @relations.remove fd - return LibIPC::IPCCB::Error - else - Baguette::Log.error "Received a websocket message with unknown type" - end - end - end - - return LibIPC::IPCCB::Error - rescue e - Baguette::Log.error "Exception (receiving a message) #{e}" - # tcp = WrappedTCPFileDescriptor.new(fd: fd, family: Socket::Family::INET) - # tcp.close - @relations.remove fd - return LibIPC::IPCCB::Error - end - - - def run Baguette::Log.title "Starting websocketc" @@ -324,13 +318,19 @@ class Websocketc::Service < IPC::Server Baguette::Log.error "relation: #{r}" else first_connection event + Baguette::Log.warning "context currently is" + print_self end + when IPC::Event::Switch + Baguette::Log.debug "switched message from #{event.fd}" + when IPC::Exception Baguette::Log.warning "IPC::Exception: #{event.message}" else Baguette::Log.warning "unhandled IPC event: #{event.class}" + exit 1 end rescue e Baguette::Log.error "exception: #{typeof(e)} - #{e.message}"