From 6a871e82674d4f1d59bd12350ac8f07f6defc8f7 Mon Sep 17 00:00:00 2001 From: Philippe PITTOLI Date: Thu, 16 Jan 2020 17:41:07 +0100 Subject: [PATCH] websocketd: bin msg, conf. timer delay, loop on buffered msg --- src/websocketc.cr | 7 +-- src/websocketd.cr | 136 +++++++++++++++++++++------------------------- src/ws.cr | 34 ++++++++++++ 3 files changed, 97 insertions(+), 80 deletions(-) diff --git a/src/websocketc.cr b/src/websocketc.cr index e6979b3..3ad80d6 100644 --- a/src/websocketc.cr +++ b/src/websocketc.cr @@ -43,15 +43,10 @@ def send_with_announce(ws : WebSocket, m : String) send ws, m end -def send(ws : WebSocket, m : String) +def send(ws : WebSocket, m : String | Slice) ws.send m end -def send(ws : WebSocket, m : Slice) - ws.send m -end - - begin ws = WebSocket.new(URI.parse(uri)) diff --git a/src/websocketd.cr b/src/websocketd.cr index c2a1178..4b67e90 100644 --- a/src/websocketd.cr +++ b/src/websocketd.cr @@ -7,7 +7,6 @@ require "./ws" require "json" require "socket" -require "http" require "http/server" require "base64" require "digest" @@ -39,6 +38,7 @@ end service_name = "websocket" host = "0.0.0.0" port_to_listen = 1234 +timer_delay = 30.to_i64 OptionParser.parse do |parser| parser.on "-l host", "--l host", "IP address to listen on." do |h| @@ -53,6 +53,10 @@ OptionParser.parse do |parser| service_name = name end + parser.on "-t timer-delay", "--timer-delay timer-delay", "Timer delay (in seconds)" do |t| + timer_delay = t.to_i64 + end + parser.on "-h", "--help", "Show this help" do puts parser exit 0 @@ -159,7 +163,6 @@ def websocket_client_connection(client, context : InstanceStorage) req_service = request.path.lchop if req_service.empty? client.close - puts "should send a PATH" return end @@ -181,7 +184,7 @@ def websocket_client_connection(client, context : InstanceStorage) wsclient = WebSocket.new client wsclient.on_pong do |m| - puts "Received a pong message: #{m}" + puts "pong #{m}" end context.is_client[client.fd] = true @@ -248,102 +251,87 @@ class IPC::Message end def websocket_switching_procedure (activefd : Int, context : InstanceStorage) - # FIXME: debugging purposes begin if context.is_client[activefd] # The client is a WebSocket on top of a TCP connection client = context.fd_to_tcpsocket[activefd] wsclient = context.fd_to_websocket[activefd] - begin - message = wsclient.read + loop do + begin + message = wsclient.read + rescue e + puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}" + closing_client activefd, context + return + end - # puts "RECEIVING A MESSAGE from #{activefd}: #{message}" - rescue e - puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}" - closing_client activefd, context - return - end + still_something_to_read = wsclient.ws.io.still_something_to_read? - if wsclient.closed? - # puts "#{CBLUE}client is closed#{CRESET}" - closing_client client.fd, context - return - end + if wsclient.closed? + # puts "#{CBLUE}client is closed#{CRESET}" + closing_client client.fd, context + return + end - if message.nil? - puts "#{CRED}message is nil#{CRESET}" - closing_client client.fd, context - return - end + if message.nil? + puts "#{CRED}message is nil#{CRESET}" + closing_client client.fd, context + return + end - if message.is_a?(WebSocket::Ping) - # puts "#{CBLUE}This is a ping message#{CRESET}" - return - end + if message.is_a?(WebSocket::Ping) + # puts "#{CBLUE}Received a ping message#{CRESET}" + next if still_something_to_read + break + end - if message.is_a?(WebSocket::Pong) - # puts "#{CBLUE}This is a pong message#{CRESET}" - return - end + if message.is_a?(WebSocket::Pong) + # puts "#{CBLUE}Received a pong message#{CRESET}" + next if still_something_to_read + break + end - if message.is_a?(WebSocket::Close) - # puts "#{CRED}This is a close message#{CRESET}" - return - end + if message.is_a?(WebSocket::Close) + # puts "#{CBLUE}Received a close message#{CRESET}" + closing_client client.fd, context + return + end - if message.is_a?(Slice(UInt8)) - # puts "#{CRED}This is a binary message: not yet implemented#{CRESET}" - return - end + # TODO: when receiving a binary message + # we should test the format and maybe its content + if message.is_a?(Slice(UInt8)) + # puts "#{CBLUE}Received a binary message#{CRESET}" + elsif context.is_json[activefd] + jsonmessage = JSONWSMessage.from_json message + message = to_message jsonmessage.mtype, jsonmessage.payload + end - # TODO: verify this - if context.is_json[activefd] - jsonmessage = JSONWSMessage.from_json message - message = to_message jsonmessage.mtype, jsonmessage.payload + # client => service + fdservice = context.switchtable[activefd] - # puts "JSON TYPE !!!" - # pp! jsonmessage - # pp! message - end + # XXX: this is not a TCP fd, but since behind the scene this is compatible, I'm hacking a bit + serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET) + #serv = context.fd_to_ipcconnection[fdservice] + serv.send message - # puts "switching: client to service" - # print_hexa(String.new(message), "#{CBLUE}Received message hexa#{CRESET}") - # client => service - fdservice = context.switchtable[activefd] - - # XXX: this is not a TCP fd, but since behind the scene this is compatible, I'm hacking a bit - serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET) - #serv = context.fd_to_ipcconnection[fdservice] - serv.send message - # puts "SENT MESSAGE TO SERVER: #{message}" + break unless still_something_to_read + puts "#{CRED}STILL SOMETHING TO READ#{CRESET}" + end # loop over the remaining messages to read on the websocket else - # puts "switching: service to client" # service => client - - # puts "RECEIVING A MESSAGE FROM THE SERVER #{activefd}" - fdclient = context.switchtable[activefd] wsclient = context.fd_to_websocket[fdclient] serv = context.fd_to_ipcconnection[activefd] message = serv.read - # puts "RECEIVING A MESSAGE FROM THE SERVER #{activefd}: #{message}" - # puts "received message from service: #{message.to_s}" - if context.is_json[fdclient] buf = message.to_json - # puts "JSON TYPE !!!" - # pp! buf else buf = message.to_buffer end - # TODO: REMOVE THIS - # buf = message.to_buffer - # print_hexa String.new(buf), "\033[31m Message to send to the client \033[00m " - wsclient.send buf end rescue e @@ -358,12 +346,12 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage) end # Every few seconds, the service should trigger the timer -service.base_timer = 30 +service.base_timer = timer_delay service.loop do |event| case event when IPC::Event::Timer - # puts "#{CORANGE}IPC::Event::Timer#{CRESET}" + puts "#{CORANGE}IPC::Event::Timer#{CRESET}" context.fd_to_websocket.each do |fd, ws| begin @@ -378,11 +366,11 @@ service.loop do |event| end end when IPC::Event::Connection - puts "#{CBLUE}IPC::Event::Connection: #{event.connection.fd}#{CRESET}" + puts "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.connection.fd}" when IPC::Event::Disconnection - puts "#{CBLUE}IPC::Event::Disconnection: #{event.connection.fd}#{CRESET}" + puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}: #{event.connection.fd}" when IPC::Event::ExtraSocket - puts "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}" + puts "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}: #{event.connection.fd}" # 1. accept new websocket clients if server.fd == event.connection.fd diff --git a/src/ws.cr b/src/ws.cr index 67f44f5..89d942e 100644 --- a/src/ws.cr +++ b/src/ws.cr @@ -2,6 +2,29 @@ require "http" require "http/web_socket/protocol" +# The Socket library uses the IO::Buffered module. +# IO::Buffered needs a new function to check if there is still something to read +# in its buffer. Since WebSocket is compatible with all IO instances, then IO +# needs this function as well, even if it doesn't really makes sense here. +class IO + def still_something_to_read? : Bool + false + end +end + +# +module IO::Buffered + def still_something_to_read? : Bool + @in_buffer_rem.size > 0 + end +end + +# class Socket +# def still_something_to_read? : Bool +# @in_buffer_rem.size > 0 +# end +# end + class HTTP::WebSocket record Pong record Ping @@ -11,6 +34,7 @@ class HTTP::WebSocket size = 0 begin info = @ws.receive(@buffer) + # puts "receiving a message size #{info.size}, #{info.final ? "final" : "non final"}" rescue IO::EOFError close return nil @@ -61,6 +85,16 @@ class HTTP::WebSocket end end + def ws + @ws + end + +end + +class HTTP::WebSocket::Protocol + def io + @io + end end