diff --git a/src/websocketd.cr b/src/websocketd.cr index 89c63bd..134bd82 100644 --- a/src/websocketd.cr +++ b/src/websocketd.cr @@ -85,6 +85,7 @@ class InstanceStorage property fd_to_tcpsocket : Hash(Int32, TCPSocket) property fd_to_websocket : Hash(Int32, WebSocket) property fd_to_ipcclient : Hash(Int32, IPC::Client) + property fd_to_buffer : Hash(Int32, String) def initialize (@service : IPC::SwitchingService) # fdlist_client = [] of TCPSocket @@ -94,6 +95,7 @@ class InstanceStorage @fd_to_tcpsocket = Hash(Int32, TCPSocket).new @fd_to_websocket = Hash(Int32, WebSocket).new @fd_to_ipcclient = Hash(Int32, IPC::Client).new + @fd_to_buffer = Hash(Int32, String).new end def remove_fd (fdclient : Int32) @@ -125,6 +127,14 @@ class InstanceStorage fdc != fdclient && fds != fdclient end + @fd_to_buffer.select! do |fd, buffer| + if fdservice.nil? + fd != fdclient + else + fd != fdclient && fd != fdservice + end + end + # 6. removing the client and the service from is_client @is_client = @is_client.select do |fd,v| fd != fdclient end @is_json = @is_json.select do |fd,v| fd != fdclient end @@ -296,10 +306,22 @@ def websocket_connection_procedure (requested_service : String, clientfd : Int32 end end -class FragmentBuffer - property buffer : String = String.new +def print_message(message : String, origin : Int32) + return unless Context.print_messages + + client = if Context.context.is_client[origin] + "client" + else + "server" + end + Baguette::Log.info "received message from client #{origin} (#{client}):" + pp! JSON.parse message +rescue e + Baguette::Log.warning "cannot see the message from #{origin} (#{client}): #{e}" end +# WARNING: currently this is only covering String messages. +# Byte messages aren't addressed, yet. def websocket_switching_procedure (activefd : Int) begin # Baguette::Log.title "activefd is #{activefd}" @@ -309,8 +331,6 @@ def websocket_switching_procedure (activefd : Int) client = Context.context.fd_to_tcpsocket[activefd] wsclient = Context.context.fd_to_websocket[activefd] - fb = FragmentBuffer.new - loop do begin message = wsclient.run_once @@ -348,6 +368,7 @@ def websocket_switching_procedure (activefd : Int) next end return + when WebSocket::Ping Baguette::Log.debug "Received a ping message" if still_something_to_read @@ -355,6 +376,7 @@ def websocket_switching_procedure (activefd : Int) next end break + when WebSocket::Pong Baguette::Log.debug "Received a pong message" if still_something_to_read @@ -362,6 +384,7 @@ def websocket_switching_procedure (activefd : Int) next end break + when WebSocket::Close Baguette::Log.debug "Received a close message" Context.context.remove_fd activefd @@ -372,12 +395,16 @@ def websocket_switching_procedure (activefd : Int) break when WebSocket::NotFinal - Baguette::Log.warning "Received only part of a message" - # TODO: check if the message is OK when multiplexing - # pp! message - fb.buffer = fb.buffer + message.message + Baguette::Log.debug "Received only part of a message" + # There is a per-user buffer in case we receive "not final" websocket messages. + # In case the buffer isn't set yet. + unless Context.context.fd_to_buffer[activefd]? + Context.context.fd_to_buffer[activefd] = String.new + end + # Add the received message to the buffer. + Context.context.fd_to_buffer[activefd] += message.message if still_something_to_read - Baguette::Log.debug "Still something to read" + Baguette::Log.debug "and there is still something to read" next end break @@ -390,25 +417,24 @@ def websocket_switching_procedure (activefd : Int) Baguette::Log.info "Still something to read" next end + break end - # In case there was a previous messagee within a fragment. - if fb.buffer.size > 0 - Baguette::Log.warning "SHOULD reconstitute the message!!" + # Is there a (non empty) buffer for the active fd? + buffer_size = if Context.context.fd_to_buffer[activefd]? + Context.context.fd_to_buffer[activefd].size + else + 0 end - if message.is_a?(String) && fb.buffer.size > 0 - Baguette::Log.warning "Reconstitute the message!!" - message = fb.buffer + message - fb.buffer = String.new + # In case there was a previous message within a fragment. + if message.is_a?(String) && buffer_size > 0 + message = Context.context.fd_to_buffer[activefd] + message + Context.context.fd_to_buffer[activefd] = String.new end if Context.context.is_json[activefd] && message.is_a?(String) - if Context.print_messages - j = JSON.parse message - Baguette::Log.info "received from client #{activefd}" - pp! j["payload"] - end + print_message message, activefd message = IPC::Message.from_json(message).to_packet end @@ -420,7 +446,11 @@ def websocket_switching_procedure (activefd : Int) # when the object is GC. serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET) #serv = Context.context.fd_to_ipcclient[fdservice] - serv.send message + begin + serv.send message + rescue e + Baguette::Log.error "Sending a message failed: #{e}" + end break unless still_something_to_read @@ -435,11 +465,7 @@ def websocket_switching_procedure (activefd : Int) if Context.context.is_json[fdclient] buf = message.to_json - if Context.print_messages - j = JSON.parse buf - Baguette::Log.info "received from service #{activefd}" - pp! j["payload"] - end + print_message buf, activefd else buf = message.to_packet end