diff --git a/src/websocketd.cr b/src/websocketd.cr index 4b67e90..3dac0f7 100644 --- a/src/websocketd.cr +++ b/src/websocketd.cr @@ -258,14 +258,14 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage) wsclient = context.fd_to_websocket[activefd] loop do begin - message = wsclient.read + message = wsclient.run_once rescue e puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}" closing_client activefd, context return end - still_something_to_read = wsclient.ws.io.still_something_to_read? + still_something_to_read = ! wsclient.ws.io.empty? if wsclient.closed? # puts "#{CBLUE}client is closed#{CRESET}" @@ -279,29 +279,34 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage) return end - if message.is_a?(WebSocket::Ping) + case message + when WebSocket::Error + puts "#{CRED}An error occured#{CRESET}" + closing_client client.fd, context + return + when WebSocket::Ping # puts "#{CBLUE}Received a ping message#{CRESET}" next if still_something_to_read break - end - - if message.is_a?(WebSocket::Pong) + when WebSocket::Pong # puts "#{CBLUE}Received a pong message#{CRESET}" next if still_something_to_read break - end - - if message.is_a?(WebSocket::Close) + when WebSocket::Close # puts "#{CBLUE}Received a close message#{CRESET}" closing_client client.fd, context return + when WebSocket::NotFinal + # puts "#{CBLUE}Received only part of a message#{CRESET}" + next if still_something_to_read + break + when Bytes + # TODO: when receiving a binary message + # we should test the format and maybe its content + # puts "#{CBLUE}Received a binary message#{CRESET}" end - # TODO: when receiving a binary message - # we should test the format and maybe its content - if message.is_a?(Slice(UInt8)) - # puts "#{CBLUE}Received a binary message#{CRESET}" - elsif context.is_json[activefd] + if context.is_json[activefd] && message.is_a?(String) jsonmessage = JSONWSMessage.from_json message message = to_message jsonmessage.mtype, jsonmessage.payload end @@ -315,7 +320,6 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage) serv.send message break unless still_something_to_read - puts "#{CRED}STILL SOMETHING TO READ#{CRESET}" end # loop over the remaining messages to read on the websocket else @@ -345,26 +349,30 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage) end end +def sending_ping_messages(context : InstanceStorage) + context.fd_to_websocket.each do |fd, ws| + begin + ws.ping "hello from #{fd}" + rescue e + puts "#{CRED}Exception: #{e}#{CRESET}, already closed client #{fd}" + begin + context.remove_fd fd + rescue e + puts "#{CRED}Cannot remove #{fd} from clients: #{e}#{CRESET}" + end + end + end +end + # Every few seconds, the service should trigger the timer +# Allowing the sending of Ping messages to clients service.base_timer = timer_delay service.loop do |event| case event when IPC::Event::Timer puts "#{CORANGE}IPC::Event::Timer#{CRESET}" - context.fd_to_websocket.each do |fd, ws| - - begin - ws.ping "coucou from #{fd}" - rescue e - puts "#{CRED}Exception: #{e}#{CRESET}, already closed client #{fd}" - begin - context.remove_fd fd - rescue e - puts "#{CRED}Cannot remove #{fd} from clients: #{e}#{CRESET}" - end - end - end + sending_ping_messages context when IPC::Event::Connection puts "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.connection.fd}" when IPC::Event::Disconnection diff --git a/src/ws.cr b/src/ws.cr index 89d942e..d75b211 100644 --- a/src/ws.cr +++ b/src/ws.cr @@ -7,37 +7,45 @@ require "http/web_socket/protocol" # in its buffer. Since WebSocket is compatible with all IO instances, then IO # needs this function as well, even if it doesn't really makes sense here. class IO - def still_something_to_read? : Bool - false + def empty? : Bool + true end end -# module IO::Buffered - def still_something_to_read? : Bool - @in_buffer_rem.size > 0 + # :nodoc: + def empty? : Bool + @in_buffer_rem.size == 0 end end -# class Socket -# def still_something_to_read? : Bool -# @in_buffer_rem.size > 0 -# end -# end - class HTTP::WebSocket - record Pong - record Ping - record Close + # Infinite loop over `run_once`. + # Stops when the socket closes or an error occurs. + def run + loop do + ret = run_once + case ret + when Error | Close + break + end + end + end - def read : Slice(UInt8) | String | Close | Ping | Pong | Nil - size = 0 + # `run_once` returns the type of the message which could be a Ping or Pong message, a closing socket, an error (when an exception occurs) or a message that is not yet entirely received. + record Ping + record Pong + record Close + record Error + record NotFinal + + # :nodoc: + def run_once : Bytes | String | Close | Ping | Pong | NotFinal | Error begin info = @ws.receive(@buffer) - # puts "receiving a message size #{info.size}, #{info.final ? "final" : "non final"}" rescue IO::EOFError close - return nil + return Error.new end case info.opcode @@ -48,31 +56,31 @@ class HTTP::WebSocket @on_ping.try &.call(message) pong(message) unless closed? @current_message.clear + return Ping.new end - return Ping.new when Protocol::Opcode::PONG @current_message.write @buffer[0, info.size] if info.final @on_pong.try &.call(@current_message.to_s) @current_message.clear + return Pong.new end - return Pong.new when Protocol::Opcode::TEXT message = @buffer[0, info.size] @current_message.write message if info.final @on_message.try &.call(@current_message.to_s) @current_message.clear + return String.new message end - return String.new message when Protocol::Opcode::BINARY message = @buffer[0, info.size] @current_message.write message if info.final @on_binary.try &.call(@current_message.to_slice) @current_message.clear + return message end - return message when Protocol::Opcode::CLOSE @current_message.write @buffer[0, info.size] if info.final @@ -80,15 +88,16 @@ class HTTP::WebSocket @on_close.try &.call(message) close(message) unless closed? @current_message.clear + return Close.new end - return Close.new end + + return NotFinal.new end def ws @ws end - end class HTTP::WebSocket::Protocol