CBOR communications are working.
This commit is contained in:
parent
640284c067
commit
f2edaa5301
@ -191,6 +191,35 @@ def ws_cb_in(fd : Int32, pm : LibIPC::Message*, more_to_read : Int16*)
|
|||||||
relation.buffer_client += message.message
|
relation.buffer_client += message.message
|
||||||
return LibIPC::IPCCB::Ignore
|
return LibIPC::IPCCB::Ignore
|
||||||
|
|
||||||
|
when Bytes
|
||||||
|
# TODO: we should test the format and maybe its content
|
||||||
|
Baguette::Log.debug "Received a binary message"
|
||||||
|
unless relation.is_json
|
||||||
|
# Reassemble the message.
|
||||||
|
# m = relation.buffer_client.to_slice + message
|
||||||
|
# # Clean the buffer.
|
||||||
|
# relation.buffer_client = String.new
|
||||||
|
# TODO: FIXME: cannot have a buffer for large messages, yet
|
||||||
|
m = message
|
||||||
|
|
||||||
|
begin
|
||||||
|
ipc_message = IPC::Message.from_cbor m
|
||||||
|
ipc_message.copy_to_message_pointer pm
|
||||||
|
|
||||||
|
rescue e
|
||||||
|
Baguette::Log.error "cannot send message coming from #{fd}, message: #{m}"
|
||||||
|
Context.service.not_nil!.relations.remove fd
|
||||||
|
Baguette::Log.error "error: #{e}"
|
||||||
|
return LibIPC::IPCCB::Error
|
||||||
|
end
|
||||||
|
|
||||||
|
Baguette::Log.debug "no error reassembling the message"
|
||||||
|
return LibIPC::IPCCB::NoError
|
||||||
|
end
|
||||||
|
Baguette::Log.error "message received in bytes, yet it should be JSON"
|
||||||
|
Context.service.not_nil!.relations.remove fd
|
||||||
|
return LibIPC::IPCCB::Error
|
||||||
|
|
||||||
else
|
else
|
||||||
# every other option should be dropped
|
# every other option should be dropped
|
||||||
case message
|
case message
|
||||||
@ -205,12 +234,6 @@ def ws_cb_in(fd : Int32, pm : LibIPC::Message*, more_to_read : Int16*)
|
|||||||
Baguette::Log.debug "Received a close message"
|
Baguette::Log.debug "Received a close message"
|
||||||
Context.service.not_nil!.relations.remove fd
|
Context.service.not_nil!.relations.remove fd
|
||||||
return LibIPC::IPCCB::Closing
|
return LibIPC::IPCCB::Closing
|
||||||
when Bytes
|
|
||||||
# TODO: when receiving a binary message
|
|
||||||
# we should test the format and maybe its content
|
|
||||||
Baguette::Log.error "Received a binary message: NOT IMPLEMENTED, YET"
|
|
||||||
Context.service.not_nil!.relations.remove fd
|
|
||||||
return LibIPC::IPCCB::Error
|
|
||||||
else
|
else
|
||||||
Baguette::Log.error "Received a websocket message with unknown type"
|
Baguette::Log.error "Received a websocket message with unknown type"
|
||||||
end
|
end
|
||||||
@ -236,7 +259,7 @@ def ws_cb_out(fd : Int32, pm : Pointer(LibIPC::Message))
|
|||||||
if relation.is_json
|
if relation.is_json
|
||||||
buf = message.to_json
|
buf = message.to_json
|
||||||
else
|
else
|
||||||
buf = message.to_packet
|
buf = message.to_cbor
|
||||||
end
|
end
|
||||||
|
|
||||||
relation.ws.send buf
|
relation.ws.send buf
|
||||||
|
@ -76,6 +76,7 @@ OptionParser.parse do |parser|
|
|||||||
|
|
||||||
parser.on "-M", "--print-messages", "Print messages received and sent." do
|
parser.on "-M", "--print-messages", "Print messages received and sent." do
|
||||||
configuration.print_messages = true
|
configuration.print_messages = true
|
||||||
|
Baguette::Log.info "print messages"
|
||||||
end
|
end
|
||||||
|
|
||||||
parser.on "-h", "--help", "Show this help" do
|
parser.on "-h", "--help", "Show this help" do
|
||||||
@ -289,7 +290,6 @@ def websocket_client_connection(client)
|
|||||||
|
|
||||||
# listen to the client's file descriptor
|
# listen to the client's file descriptor
|
||||||
Context.context.service << client.fd
|
Context.context.service << client.fd
|
||||||
# puts "#{CBLUE}new client: #{client.fd}#{CRESET}"
|
|
||||||
|
|
||||||
# registering the client into storing structures to avoid being garbage collected
|
# registering the client into storing structures to avoid being garbage collected
|
||||||
Context.context.fd_to_tcpsocket[client.fd] = client
|
Context.context.fd_to_tcpsocket[client.fd] = client
|
||||||
@ -333,6 +333,18 @@ def websocket_connection_procedure (requested_service : String, clientfd : Int32
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def print_message(message : Bytes, origin : Int32)
|
||||||
|
return unless Context.print_messages
|
||||||
|
|
||||||
|
client = if Context.context.is_client[origin]
|
||||||
|
"client"
|
||||||
|
else
|
||||||
|
"server"
|
||||||
|
end
|
||||||
|
Baguette::Log.info "received message from #{origin} (#{client}):"
|
||||||
|
Baguette::Log.info "#{message.hexstring}"
|
||||||
|
end
|
||||||
|
|
||||||
def print_message(message : String, origin : Int32)
|
def print_message(message : String, origin : Int32)
|
||||||
return unless Context.print_messages
|
return unless Context.print_messages
|
||||||
|
|
||||||
@ -341,7 +353,7 @@ def print_message(message : String, origin : Int32)
|
|||||||
else
|
else
|
||||||
"server"
|
"server"
|
||||||
end
|
end
|
||||||
Baguette::Log.info "received message from client #{origin} (#{client}):"
|
Baguette::Log.info "received message from #{origin} (#{client}):"
|
||||||
pp! JSON.parse message
|
pp! JSON.parse message
|
||||||
rescue e
|
rescue e
|
||||||
Baguette::Log.warning "cannot see the message from #{origin} (#{client}): #{e}"
|
Baguette::Log.warning "cannot see the message from #{origin} (#{client}): #{e}"
|
||||||
@ -437,14 +449,8 @@ def websocket_switching_procedure (activefd : Int)
|
|||||||
break
|
break
|
||||||
|
|
||||||
when Bytes
|
when Bytes
|
||||||
# TODO: when receiving a binary message
|
# We should test the format and maybe its content when receiving a binary message.
|
||||||
# we should test the format and maybe its content
|
Baguette::Log.warning "Received a binary message"
|
||||||
Baguette::Log.warning "Received a binary message (not supported)"
|
|
||||||
if still_something_to_read
|
|
||||||
Baguette::Log.info "Still something to read"
|
|
||||||
next
|
|
||||||
end
|
|
||||||
break
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# Is there a (non empty) buffer for the active fd?
|
# Is there a (non empty) buffer for the active fd?
|
||||||
@ -454,6 +460,7 @@ def websocket_switching_procedure (activefd : Int)
|
|||||||
0
|
0
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# TODO: make this buffer work with bytes messages.
|
||||||
# In case there was a previous message within a fragment.
|
# In case there was a previous message within a fragment.
|
||||||
if message.is_a?(String) && buffer_size > 0
|
if message.is_a?(String) && buffer_size > 0
|
||||||
message = Context.context.fd_to_buffer[activefd] + message
|
message = Context.context.fd_to_buffer[activefd] + message
|
||||||
@ -463,6 +470,13 @@ def websocket_switching_procedure (activefd : Int)
|
|||||||
if Context.context.is_json[activefd] && message.is_a?(String)
|
if Context.context.is_json[activefd] && message.is_a?(String)
|
||||||
print_message message, activefd
|
print_message message, activefd
|
||||||
message = IPC::Message.from_json(message).to_packet
|
message = IPC::Message.from_json(message).to_packet
|
||||||
|
elsif ! Context.context.is_json[activefd] && message.is_a?(Bytes)
|
||||||
|
# not json and bytes
|
||||||
|
print_message message, activefd
|
||||||
|
message = IPC::Message.from_cbor(message).to_packet
|
||||||
|
else
|
||||||
|
Baguette::Log.warning "message is neither JSON and string, or not JSON and bytes."
|
||||||
|
Baguette::Log.warning "message is #{message.class.to_s}"
|
||||||
end
|
end
|
||||||
|
|
||||||
# client => service
|
# client => service
|
||||||
@ -493,8 +507,9 @@ def websocket_switching_procedure (activefd : Int)
|
|||||||
if Context.context.is_json[fdclient]
|
if Context.context.is_json[fdclient]
|
||||||
buf = message.to_json
|
buf = message.to_json
|
||||||
print_message buf, activefd
|
print_message buf, activefd
|
||||||
else
|
else # We assume that non-JSON clients are CBOR clients.
|
||||||
buf = message.to_packet
|
buf = message.to_cbor
|
||||||
|
print_message buf, activefd
|
||||||
end
|
end
|
||||||
|
|
||||||
wsclient.send buf
|
wsclient.send buf
|
||||||
|
Loading…
Reference in New Issue
Block a user