Fix upload when messages are fragmented.
This commit is contained in:
parent
9f6f711187
commit
cad5a776cc
@ -85,6 +85,7 @@ class InstanceStorage
|
|||||||
property fd_to_tcpsocket : Hash(Int32, TCPSocket)
|
property fd_to_tcpsocket : Hash(Int32, TCPSocket)
|
||||||
property fd_to_websocket : Hash(Int32, WebSocket)
|
property fd_to_websocket : Hash(Int32, WebSocket)
|
||||||
property fd_to_ipcclient : Hash(Int32, IPC::Client)
|
property fd_to_ipcclient : Hash(Int32, IPC::Client)
|
||||||
|
property fd_to_buffer : Hash(Int32, String)
|
||||||
|
|
||||||
def initialize (@service : IPC::SwitchingService)
|
def initialize (@service : IPC::SwitchingService)
|
||||||
# fdlist_client = [] of TCPSocket
|
# fdlist_client = [] of TCPSocket
|
||||||
@ -94,6 +95,7 @@ class InstanceStorage
|
|||||||
@fd_to_tcpsocket = Hash(Int32, TCPSocket).new
|
@fd_to_tcpsocket = Hash(Int32, TCPSocket).new
|
||||||
@fd_to_websocket = Hash(Int32, WebSocket).new
|
@fd_to_websocket = Hash(Int32, WebSocket).new
|
||||||
@fd_to_ipcclient = Hash(Int32, IPC::Client).new
|
@fd_to_ipcclient = Hash(Int32, IPC::Client).new
|
||||||
|
@fd_to_buffer = Hash(Int32, String).new
|
||||||
end
|
end
|
||||||
|
|
||||||
def remove_fd (fdclient : Int32)
|
def remove_fd (fdclient : Int32)
|
||||||
@ -125,6 +127,14 @@ class InstanceStorage
|
|||||||
fdc != fdclient && fds != fdclient
|
fdc != fdclient && fds != fdclient
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@fd_to_buffer.select! do |fd, buffer|
|
||||||
|
if fdservice.nil?
|
||||||
|
fd != fdclient
|
||||||
|
else
|
||||||
|
fd != fdclient && fd != fdservice
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
# 6. removing the client and the service from is_client
|
# 6. removing the client and the service from is_client
|
||||||
@is_client = @is_client.select do |fd,v| fd != fdclient end
|
@is_client = @is_client.select do |fd,v| fd != fdclient end
|
||||||
@is_json = @is_json.select do |fd,v| fd != fdclient end
|
@is_json = @is_json.select do |fd,v| fd != fdclient end
|
||||||
@ -296,10 +306,22 @@ def websocket_connection_procedure (requested_service : String, clientfd : Int32
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
class FragmentBuffer
|
def print_message(message : String, origin : Int32)
|
||||||
property buffer : String = String.new
|
return unless Context.print_messages
|
||||||
|
|
||||||
|
client = if Context.context.is_client[origin]
|
||||||
|
"client"
|
||||||
|
else
|
||||||
|
"server"
|
||||||
|
end
|
||||||
|
Baguette::Log.info "received message from client #{origin} (#{client}):"
|
||||||
|
pp! JSON.parse message
|
||||||
|
rescue e
|
||||||
|
Baguette::Log.warning "cannot see the message from #{origin} (#{client}): #{e}"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# WARNING: currently this is only covering String messages.
|
||||||
|
# Byte messages aren't addressed, yet.
|
||||||
def websocket_switching_procedure (activefd : Int)
|
def websocket_switching_procedure (activefd : Int)
|
||||||
begin
|
begin
|
||||||
# Baguette::Log.title "activefd is #{activefd}"
|
# Baguette::Log.title "activefd is #{activefd}"
|
||||||
@ -309,8 +331,6 @@ def websocket_switching_procedure (activefd : Int)
|
|||||||
client = Context.context.fd_to_tcpsocket[activefd]
|
client = Context.context.fd_to_tcpsocket[activefd]
|
||||||
wsclient = Context.context.fd_to_websocket[activefd]
|
wsclient = Context.context.fd_to_websocket[activefd]
|
||||||
|
|
||||||
fb = FragmentBuffer.new
|
|
||||||
|
|
||||||
loop do
|
loop do
|
||||||
begin
|
begin
|
||||||
message = wsclient.run_once
|
message = wsclient.run_once
|
||||||
@ -348,6 +368,7 @@ def websocket_switching_procedure (activefd : Int)
|
|||||||
next
|
next
|
||||||
end
|
end
|
||||||
return
|
return
|
||||||
|
|
||||||
when WebSocket::Ping
|
when WebSocket::Ping
|
||||||
Baguette::Log.debug "Received a ping message"
|
Baguette::Log.debug "Received a ping message"
|
||||||
if still_something_to_read
|
if still_something_to_read
|
||||||
@ -355,6 +376,7 @@ def websocket_switching_procedure (activefd : Int)
|
|||||||
next
|
next
|
||||||
end
|
end
|
||||||
break
|
break
|
||||||
|
|
||||||
when WebSocket::Pong
|
when WebSocket::Pong
|
||||||
Baguette::Log.debug "Received a pong message"
|
Baguette::Log.debug "Received a pong message"
|
||||||
if still_something_to_read
|
if still_something_to_read
|
||||||
@ -362,6 +384,7 @@ def websocket_switching_procedure (activefd : Int)
|
|||||||
next
|
next
|
||||||
end
|
end
|
||||||
break
|
break
|
||||||
|
|
||||||
when WebSocket::Close
|
when WebSocket::Close
|
||||||
Baguette::Log.debug "Received a close message"
|
Baguette::Log.debug "Received a close message"
|
||||||
Context.context.remove_fd activefd
|
Context.context.remove_fd activefd
|
||||||
@ -372,12 +395,16 @@ def websocket_switching_procedure (activefd : Int)
|
|||||||
break
|
break
|
||||||
|
|
||||||
when WebSocket::NotFinal
|
when WebSocket::NotFinal
|
||||||
Baguette::Log.warning "Received only part of a message"
|
Baguette::Log.debug "Received only part of a message"
|
||||||
# TODO: check if the message is OK when multiplexing
|
# There is a per-user buffer in case we receive "not final" websocket messages.
|
||||||
# pp! message
|
# In case the buffer isn't set yet.
|
||||||
fb.buffer = fb.buffer + message.message
|
unless Context.context.fd_to_buffer[activefd]?
|
||||||
|
Context.context.fd_to_buffer[activefd] = String.new
|
||||||
|
end
|
||||||
|
# Add the received message to the buffer.
|
||||||
|
Context.context.fd_to_buffer[activefd] += message.message
|
||||||
if still_something_to_read
|
if still_something_to_read
|
||||||
Baguette::Log.debug "Still something to read"
|
Baguette::Log.debug "and there is still something to read"
|
||||||
next
|
next
|
||||||
end
|
end
|
||||||
break
|
break
|
||||||
@ -390,25 +417,24 @@ def websocket_switching_procedure (activefd : Int)
|
|||||||
Baguette::Log.info "Still something to read"
|
Baguette::Log.info "Still something to read"
|
||||||
next
|
next
|
||||||
end
|
end
|
||||||
|
break
|
||||||
end
|
end
|
||||||
|
|
||||||
# In case there was a previous messagee within a fragment.
|
# Is there a (non empty) buffer for the active fd?
|
||||||
if fb.buffer.size > 0
|
buffer_size = if Context.context.fd_to_buffer[activefd]?
|
||||||
Baguette::Log.warning "SHOULD reconstitute the message!!"
|
Context.context.fd_to_buffer[activefd].size
|
||||||
|
else
|
||||||
|
0
|
||||||
end
|
end
|
||||||
if message.is_a?(String) && fb.buffer.size > 0
|
|
||||||
Baguette::Log.warning "Reconstitute the message!!"
|
|
||||||
message = fb.buffer + message
|
|
||||||
|
|
||||||
fb.buffer = String.new
|
# In case there was a previous message within a fragment.
|
||||||
|
if message.is_a?(String) && buffer_size > 0
|
||||||
|
message = Context.context.fd_to_buffer[activefd] + message
|
||||||
|
Context.context.fd_to_buffer[activefd] = String.new
|
||||||
end
|
end
|
||||||
|
|
||||||
if Context.context.is_json[activefd] && message.is_a?(String)
|
if Context.context.is_json[activefd] && message.is_a?(String)
|
||||||
if Context.print_messages
|
print_message message, activefd
|
||||||
j = JSON.parse message
|
|
||||||
Baguette::Log.info "received from client #{activefd}"
|
|
||||||
pp! j["payload"]
|
|
||||||
end
|
|
||||||
message = IPC::Message.from_json(message).to_packet
|
message = IPC::Message.from_json(message).to_packet
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -420,7 +446,11 @@ def websocket_switching_procedure (activefd : Int)
|
|||||||
# when the object is GC.
|
# when the object is GC.
|
||||||
serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET)
|
serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET)
|
||||||
#serv = Context.context.fd_to_ipcclient[fdservice]
|
#serv = Context.context.fd_to_ipcclient[fdservice]
|
||||||
|
begin
|
||||||
serv.send message
|
serv.send message
|
||||||
|
rescue e
|
||||||
|
Baguette::Log.error "Sending a message failed: #{e}"
|
||||||
|
end
|
||||||
|
|
||||||
break unless still_something_to_read
|
break unless still_something_to_read
|
||||||
|
|
||||||
@ -435,11 +465,7 @@ def websocket_switching_procedure (activefd : Int)
|
|||||||
|
|
||||||
if Context.context.is_json[fdclient]
|
if Context.context.is_json[fdclient]
|
||||||
buf = message.to_json
|
buf = message.to_json
|
||||||
if Context.print_messages
|
print_message buf, activefd
|
||||||
j = JSON.parse buf
|
|
||||||
Baguette::Log.info "received from service #{activefd}"
|
|
||||||
pp! j["payload"]
|
|
||||||
end
|
|
||||||
else
|
else
|
||||||
buf = message.to_packet
|
buf = message.to_packet
|
||||||
end
|
end
|
||||||
|
Loading…
Reference in New Issue
Block a user