Compare commits

..

14 Commits
dev ... master

6 changed files with 761 additions and 124 deletions

View File

@ -22,6 +22,8 @@ targets:
main: src/websocketd-dev.cr
websocketc:
main: src/websocketc.cr
oldwsc:
main: src/old-websocketc.cr
# tests
test-ws-pong:

31
src/network.cr Normal file
View File

@ -0,0 +1,31 @@
require "ipc"
require "json"
require "ipc/json"
class IPC::JSON
def handle(service : IPC::Server, event : IPC::Event::Events)
raise "unimplemented"
end
end
module Websocketc
class_getter requests = [] of IPC::JSON.class
class_getter responses = [] of IPC::JSON.class
end
class Websocketc::Response
IPC::JSON.message Error, 0 do
property reason : String | Array(String)
def initialize(@reason)
end
end
IPC::JSON.message Success, 1 do
def initialize
end
end
end
require "./requests/*"

83
src/old-websocketc.cr Normal file
View File

@ -0,0 +1,83 @@
require "http/web_socket"
require "option_parser"
require "ipc"
require "./colors"
require "./utils"
require "./lib_modifications.cr"
class CLI
class_property uri = "ws://localhost:1234/pong"
class_property rounds = 1
end
OptionParser.parse do |parser|
parser.on "-u uri", "--uri uri", "URI" do |opturi|
CLI.uri = opturi
end
parser.on "-r rounds", "--rounds nb-messages", "Nb messages to send." do |opt|
CLI.rounds = opt.to_i
end
parser.on "-h", "--help", "Show this help" do
puts parser
exit 0
end
end
def read_then_print(ws : WebSocket)
m = read ws
puts "message: #{String.new(m)}"
end
def read_then_print_hexa(ws : WebSocket)
m = read ws
print_hexa(String.new(m), "#{CBLUE}Received message hexa#{CRESET}")
end
def read(ws : WebSocket)
puts "reading a message"
m = ws.run_once
if m.nil?
raise "empty message"
end
m
end
def send_with_announce(ws : WebSocket, m : String)
puts "sending #{m}"
send ws, m
end
def send(ws : WebSocket, m : String | Slice)
ws.send m
end
begin
ws = WebSocket.new(URI.parse(CLI.uri))
puts "connection done: sending pong"
ws.on_close do |socket|
puts "socket is closing"
exit 0
end
message = if CLI.uri.ends_with? ".JSON"
IPC::Message.new(0, 2.to_u8, 3.to_u8, STDIN.gets_to_end).to_json
else
IPC::Message.new(0, 2.to_u8, 3.to_u8, STDIN.gets_to_end).to_packet
end
puts "final message: #{message}"
CLI.rounds.times do |i|
send ws, message
pp! read ws
end
ws.close
# pp! ws.run_once
rescue e
puts "Exception: #{e}"
end

81
src/requests/admin.cr Normal file
View File

@ -0,0 +1,81 @@
class Websocketc::Request
IPC::JSON.message Test, 2 do
def initialize
end
# def read_then_print(ws : WebSocket)
# m = read ws
# puts "message: #{String.new(m)}"
# end
#
# def read_then_print_hexa(ws : WebSocket)
# m = read ws
# print_hexa(String.new(m), "#{CBLUE}Received message hexa#{CRESET}")
# end
#
# def read(ws : WebSocket)
# puts "reading a message"
# m = ws.run_once
# if m.nil?
# raise "empty message"
# end
#
# m
# end
#
# def send_with_announce(ws : WebSocket, m : String)
# puts "sending #{m}"
# send ws, m
# end
#
# def send(ws : WebSocket, m : String | Slice)
# ws.send m
# end
def handle(websocketc : Websocketc::Service, event : IPC::Event::Events)
Baguette::Log.warning "within the test class"
# ws = WebSocket.new(URI.parse(Websocketc::Context.uri))
# puts "connection done: sending pong"
#
# ws.on_close do |socket|
# puts "socket is closing"
# exit 0
# end
#
# message = if Websocketc::Context.uri.ends_with? ".JSON"
# IPC::Message.new(0, 2.to_u8, 3.to_u8, STDIN.gets_to_end).to_json
# else
# IPC::Message.new(0, 2.to_u8, 3.to_u8, STDIN.gets_to_end).to_packet
# end
#
# puts "final message: #{message}"
# Websocketc::Context.rounds.times do |i|
# send ws, message
# pp! read ws
# end
#
# ws.close
## pp! ws.run_once
# At the end of this function, we should get something like this.
# connections[event.fd] = ws
Response::Success.new
rescue e
Baguette::Log.error "during ws init #{e}"
Response::Error.new "not implemented"
end
end
Websocketc.requests << Test
end
class Websocketc::Response
#IPC::JSON.message TestResponse, 2 do
# def initialize
# end
#end
end

View File

@ -2,82 +2,447 @@ require "http/web_socket"
require "option_parser"
require "ipc"
require "./colors"
require "baguette-crystal-base"
require "./utils"
require "./lib_modifications.cr"
class CLI
class_property uri = "ws://localhost:1234/pong"
class_property rounds = 1
end
OptionParser.parse do |parser|
parser.on "-u uri", "--uri uri", "URI" do |opturi|
CLI.uri = opturi
end
# Websocketc
# - is the application allowing libipc communications through Web Socket
# - is designed to connect to a Websocketd server, ask for a service, then exchange data
# - is WIP
parser.on "-r rounds", "--rounds nb-messages", "Nb messages to send." do |opt|
CLI.rounds = opt.to_i
end
# Currently, the exchange format is JSON, encapsulated in JSON:
# type: Int32 # type of the message
# payload: JSON::Any? # encapsulated payload
#
# Websocketd deserialize the type and the payload before sending it to the service.
parser.on "-h", "--help", "Show this help" do
puts parser
exit 0
module Websocketc
class Exception < ::Exception
end
end
def read_then_print(ws : WebSocket)
m = read ws
puts "message: #{String.new(m)}"
class Context
class_property service : Websocketc::Service?
end
def read_then_print_hexa(ws : WebSocket)
m = read ws
print_hexa(String.new(m), "#{CBLUE}Received message hexa#{CRESET}")
class Baguette::Configuration
class Websocketc < IPC
end
end
def read(ws : WebSocket)
puts "reading a message"
m = ws.run_once
if m.nil?
raise "empty message"
class Relation
property fd_client : Int32
property fd_service : Int32
property ws : WebSocket
# ws can send JSON messages instead of libipc-formated messages.
property is_json : Bool
property buffer_client : String = String.new
property buffer_service : String = String.new
def related? (fd : Int32)
fd == @fd_client || fd == @fd_service
end
m
end
def send_with_announce(ws : WebSocket, m : String)
puts "sending #{m}"
send ws, m
end
def send(ws : WebSocket, m : String | Slice)
ws.send m
end
begin
ws = WebSocket.new(URI.parse(CLI.uri))
puts "connection done: sending pong"
ws.on_close do |socket|
puts "socket is closing"
exit 0
def initialize(@fd_client, @fd_service, @ws, @is_json)
end
message = if CLI.uri.ends_with? ".JSON"
IPC::Message.new(0, 2.to_u8, 3.to_u8, STDIN.gets_to_end).to_json
else
IPC::Message.new(0, 2.to_u8, 3.to_u8, STDIN.gets_to_end).to_packet
def inspect(io) : Nil
to_s io
end
puts "final message: #{message}"
CLI.rounds.times do |i|
send ws, message
pp! read ws
def to_s(io)
c = "%4d" % @fd_client
s = "%4d" % @fd_service
j = if @is_json
"JSON"
else
"Not JSON"
end
io << "client #{c} service #{s} #{j}"
end
end
# Hide the complexity of managing relations.
class Relations < Array(Relation)
property all_fd : Array(Int32) = Array(Int32).new
def <<(relation : Relation)
Baguette::Log.debug "Adding a new relation: #{relation}"
@all_fd << relation.fd_client
@all_fd << relation.fd_service
super relation
end
ws.close
# pp! ws.run_once
def search?(fd : Int32) : Relation?
find {|r| r.fd_client == fd || r.fd_service == fd }
end
def remove(fd : Int32)
# Baguette::Log.warning "context before removing #{fd}:"
# Context.service.not_nil!.print_self
each do |r|
if r.related? fd
Baguette::Log.debug "Closing this relation: #{r}"
# Removing relations and file descriptors from C structures.
pointer_ctx = Context.service.not_nil!.pointer
LibIPC.ipc_ctx_switching_del pointer_ctx, r.fd_client
LibIPC.ipc_del_fd pointer_ctx, r.fd_client
LibIPC.ipc_del_fd pointer_ctx, r.fd_service
# Close these sockets.
# Baguette::Log.debug "Closing both #{r.fd_client} and #{r.fd_service} (ws)"
begin
s = Socket.new r.fd_client, Socket::Family::UNIX, Socket::Type::RAW
s.close
rescue e
Baguette::Log.error "(ignoring) closing the client socket: #{e}"
end
begin
r.ws.close
rescue e
Baguette::Log.error "(ignoring) closing the ws: #{e}"
end
# Trying not to close this socket (ws already do that).
#begin
# s = Socket.new r.fd_service, Socket::Family::INET, Socket::Type::STREAM
# s.close
#rescue e
# Baguette::Log.error "(ignoring) closing the service socket: #{e}"
#end
all_fd.select! {|v| v != r.fd_client && v != r.fd_service }
end
end
select! {|r| ! r.related? fd }
# Baguette::Log.warning "context after removing #{fd}"
# Context.service.not_nil!.print_self
end
end
require "./network.cr"
def ws_cb_in(fd : Int32, pm : LibIPC::Message*, more_to_read : Int16*)
Context.service.not_nil!.relations.search?(fd).try do |relation|
# Baguette::Log.info "IN fd is #{fd} in relation #{relation}"
# Context.service.not_nil!.print_self
message = nil
begin
message = relation.ws.run_once
rescue e
Baguette::Log.error "run_once FAILED: #{e}"
Context.service.not_nil!.relations.remove fd
return LibIPC::IPCCB::Closing
end
if relation.ws.ws.io.empty?
more_to_read[0] = 0
else
more_to_read[0] = 1
end
if relation.ws.closed?
Baguette::Log.info "client closed the connection"
Context.service.not_nil!.relations.remove fd
return LibIPC::IPCCB::Closing
end
if message.nil?
Baguette::Log.error "Reveiced a nil message"
Context.service.not_nil!.relations.remove fd
return LibIPC::IPCCB::Closing
end
case message
when String
if relation.is_json
# Reassemble the message.
m = relation.buffer_client + message
# Clean the buffer.
relation.buffer_client = String.new
begin
ipc_message = IPC::Message.from_json m
ipc_message.copy_to_message_pointer pm
rescue e
Baguette::Log.error "cannot send message coming from #{fd}, message: #{m}"
Context.service.not_nil!.relations.remove fd
Baguette::Log.error "error: #{e}"
return LibIPC::IPCCB::Error
end
Baguette::Log.debug "no error reassembling the message"
return LibIPC::IPCCB::NoError
end
Baguette::Log.error "cannot handle non-json messages!"
return LibIPC::IPCCB::Error
when WebSocket::Ping
Baguette::Log.debug "TODO: Received a ping message"
return LibIPC::IPCCB::Ignore
when WebSocket::NotFinal
Baguette::Log.debug "Received only part of a message"
relation.buffer_client += message.message
return LibIPC::IPCCB::Ignore
when Bytes
# TODO: we should test the format and maybe its content
Baguette::Log.debug "Received a binary message"
unless relation.is_json
# Reassemble the message.
# m = relation.buffer_client.to_slice + message
# # Clean the buffer.
# relation.buffer_client = String.new
# TODO: FIXME: cannot have a buffer for large messages, yet
m = message
begin
ipc_message = IPC::Message.from_cbor m
ipc_message.copy_to_message_pointer pm
rescue e
Baguette::Log.error "cannot send message coming from #{fd}, message: #{m}"
Context.service.not_nil!.relations.remove fd
Baguette::Log.error "error: #{e}"
return LibIPC::IPCCB::Error
end
Baguette::Log.debug "no error reassembling the message"
return LibIPC::IPCCB::NoError
end
Baguette::Log.error "message received in bytes, yet it should be JSON"
Context.service.not_nil!.relations.remove fd
return LibIPC::IPCCB::Error
else
# every other option should be dropped
case message
when WebSocket::Error
Baguette::Log.error "An error occured"
Context.service.not_nil!.relations.remove fd
return LibIPC::IPCCB::Error
when WebSocket::Pong
Baguette::Log.error "Received a pong message"
return LibIPC::IPCCB::Ignore
when WebSocket::Close
Baguette::Log.debug "Received a close message"
Context.service.not_nil!.relations.remove fd
return LibIPC::IPCCB::Closing
else
Baguette::Log.error "Received a websocket message with unknown type"
end
end
end
return LibIPC::IPCCB::Error
rescue e
puts "Exception: #{e}"
Baguette::Log.error "Exception (receiving a message) #{e}"
# tcp = WrappedTCPFileDescriptor.new(fd: fd, family: Socket::Family::INET)
# tcp.close
Context.service.not_nil!.relations.remove fd
return LibIPC::IPCCB::Error
end
def ws_cb_out(fd : Int32, pm : Pointer(LibIPC::Message))
Context.service.not_nil!.relations.search?(fd).try do |relation|
# Baguette::Log.info "OUT fd is #{fd} in relation #{relation}"
# Context.service.not_nil!.print_self
message = IPC::Message.new pm
# Baguette::Log.info "message to send: #{message}"
if relation.is_json
buf = message.to_json
else
buf = message.to_cbor
end
relation.ws.send buf
return LibIPC::IPCCB::NoError
end
Baguette::Log.error "Wait, not supposed to get here. No relation?"
return LibIPC::IPCCB::Error
rescue e
Baguette::Log.error "Exception during message transfer: #{e}"
Context.service.not_nil!.relations.remove fd
return LibIPC::IPCCB::Error
end
class Websocketc::Service < IPC::Server
property relations : Relations = Relations.new
property config : Baguette::Configuration::Websocketc
def initialize(@config : Baguette::Configuration::Websocketc)
super "ws"
Context.service = self
end
def print_self
Baguette::Log.warning "From C perspective"
LibIPC.ipc_ctx_print self.pointer
Baguette::Log.warning "From Crystal perspective"
Baguette::Log.warning "all fd: #{@relations.all_fd.join ", "}"
@relations.each do |r| pp! r end
Baguette::Log.warning "==="
end
def first_connection(event : IPC::Event::MessageReceived)
# First message format: "URI"
payload = String.new event.message.payload
# Baguette::Log.info "First message received: #{payload}"
# TODO: handle exceptions and errors
begin
uri = URI.parse payload
ws = WebSocket.new uri
is_json = uri.path.ends_with? ".JSON"
service_fd = ws.ws.io.as(TCPSocket).fd
relation = Relation.new event.fd, service_fd, ws, is_json
@relations << relation
# Change client fd status.
LibIPC.ipc_ctx_fd_type self.pointer, event.fd, LibIPC::ConnectionType::Switched
# Add service fd as switched.
LibIPC.ipc_add_fd_switched self.pointer, service_fd
# Link both file descriptors
LibIPC.ipc_ctx_switching_add self.pointer, event.fd, service_fd
# Change service fd callbacks: only the service callbacks are changed,
# since the associated client is a simple libipc client
proc_cb_in = ->ws_cb_in(Int32, Pointer(LibIPC::Message), Int16*)
proc_cb_out = ->ws_cb_out(Int32, Pointer(LibIPC::Message))
LibIPC.ipc_switching_callbacks self.pointer, service_fd, proc_cb_in, proc_cb_out
# Baguette::Log.debug "new client: #{event.fd}"
rescue e
Baguette::Log.error "cannot connect to #{payload}: #{e}"
end
send_now event.fd, 1.to_u8, "OK"
end
def run
Baguette::Log.title "Starting websocketc"
@timer = @config.ipc_timer
@base_timer = @config.ipc_timer
self.loop do |event|
begin
case event
when IPC::Event::Timer
Baguette::Log.debug "Timer" if @config.print_ipc_timer
# print_self
when IPC::Event::Connection
Baguette::Log.info "connection from #{event.fd}" if @config.print_ipc_connection
when IPC::Event::Disconnection
Baguette::Log.info "disconnection from #{event.fd}" if @config.print_ipc_disconnection
@relations.remove event.fd
when IPC::Event::MessageSent
Baguette::Log.info "message sent to #{event.fd}" if @config.print_ipc_message_sent
when IPC::Event::MessageReceived
Baguette::Log.info "message received from #{event.fd}" if @config.print_ipc_message_received
if r = @relations.search? event.fd
Baguette::Log.error "MessageReceived but from an already existent relation"
Baguette::Log.error "relation: #{r}"
exit 1
else
first_connection event
end
when IPC::Event::Switch
Baguette::Log.debug "switched message from #{event.fd}" if @config.print_ipc_switch
when IPC::Event::EventNotSet
Baguette::Log.error "Event not set: #{event.fd}"
@relations.remove event.fd
begin
s = Socket.new event.fd, Socket::Family::UNIX, Socket::Type::RAW
s.close
rescue e
Baguette::Log.warning "cannot close the socket #{event.fd}: #{e} (ignoring)"
end
when IPC::Event::Error
Baguette::Log.error "Event Error on fd #{event.fd} (removing it)"
@relations.remove event.fd
when IPC::Exception
Baguette::Log.error "IPC::Exception: #{event.message}"
if event.message == "closed recipient"
Baguette::Log.error "Bloody closed recipient!"
# @relations.remove #
else
Baguette::Log.error "CLOSING WS"
exit 1
end
else
Baguette::Log.warning "unhandled IPC event: #{event.class}"
exit 1
end
rescue e
Baguette::Log.error "Exception during event handling: #{typeof(e)} - #{e.message}"
exit 1
end
end
end
def self.from_cli
# First option parsing.
simulation, no_configuration, configuration_file = Baguette::Configuration.option_parser
# Websocketc configuration.
configuration = if no_configuration
Baguette::Log.info "do not load a configuration file."
Baguette::Configuration::Websocketc.new
else
# In case there is a configuration file helping with the parameters.
Baguette::Configuration::Websocketc.get(configuration_file) ||
Baguette::Configuration::Websocketc.new
end
OptionParser.parse do |parser|
parser.on "-h", "--help", "Show this help" do
puts parser
exit 0
end
end
Baguette::Context.verbosity = configuration.verbosity
if simulation
pp! configuration
exit 0
end
::Websocketc::Service.new configuration
end
end
Websocketc::Service.from_cli.run

View File

@ -1,5 +1,6 @@
require "option_parser"
require "ipc"
require "ipc/json"
require "socket"
require "./colors"
@ -18,45 +19,64 @@ require "colorize"
# All modifications to standard libraries go there.
require "./lib_modifications.cr"
class Context
# service instance parameters
# they can be changed via the cli
class_property service_name = "websocket"
class_property host = "0.0.0.0"
class_property port_to_listen : UInt16 = 1234
class_property timer_delay : Int32 = 30_000.to_i32
class Tracking::Request
IPC::JSON.message IpAddress, 1 do
property ipaddress : String?
class_property print_messages = false
class_property print_timer = false
def initialize(@ipaddress = nil)
end
end
end
class Baguette::Configuration
class Websocket < IPC
# service instance parameters
# they can be changed via the cli
property service_name : String = "websocket"
property host : String = "0.0.0.0"
property port : UInt16 = 1234
property print_messages : Bool = false
end
end
simulation, no_configuration, configuration_file = Baguette::Configuration.option_parser
configuration = if no_configuration
Baguette::Log.info "do not load a configuration file."
Baguette::Configuration::Websocket.new
else
Baguette::Configuration::Websocket.get(configuration_file) ||
Baguette::Configuration::Websocket.new
end
Baguette::Context.verbosity = configuration.verbosity
OptionParser.parse do |parser|
parser.on "-l host", "--l host", "IP address to listen on." do |h|
Context.host = h
configuration.host = h
end
parser.on "-p port", "--port port", "Port to listen on." do |port|
Context.port_to_listen = port.to_u16
configuration.port = port.to_u16
end
parser.on "-s service-name", "--service-name service-name", "Service name." do |name|
Context.service_name = name
configuration.service_name = name
end
parser.on "-t timer-delay", "--timer-delay timer-delay", "Timer delay (in seconds)" do |t|
Context.timer_delay = t.to_i32 * 1000
end
parser.on "-v verbosity-level", "--verbosity level", "Verbosity." do |opt|
Baguette::Context.verbosity = opt.to_i
configuration.ipc_timer = t.to_i32 * 1000
end
parser.on "-T", "--print-timer", "Print timer." do
Context.print_timer = true
configuration.print_ipc_timer = true
end
parser.on "-M", "--print-messages", "Print messages received and sent." do
Context.print_messages = true
configuration.print_messages = true
Baguette::Log.info "print messages"
end
parser.on "-h", "--help", "Show this help" do
@ -75,6 +95,7 @@ class InstanceStorage
property fd_to_tcpsocket : Hash(Int32, TCPSocket)
property fd_to_websocket : Hash(Int32, WebSocket)
property fd_to_ipcclient : Hash(Int32, IPC::Client)
property fd_to_buffer : Hash(Int32, String)
def initialize (@service : IPC::SwitchingService)
# fdlist_client = [] of TCPSocket
@ -84,6 +105,7 @@ class InstanceStorage
@fd_to_tcpsocket = Hash(Int32, TCPSocket).new
@fd_to_websocket = Hash(Int32, WebSocket).new
@fd_to_ipcclient = Hash(Int32, IPC::Client).new
@fd_to_buffer = Hash(Int32, String).new
end
def remove_fd (fdclient : Int32)
@ -115,6 +137,14 @@ class InstanceStorage
fdc != fdclient && fds != fdclient
end
@fd_to_buffer.select! do |fd, buffer|
if fdservice.nil?
fd != fdclient
else
fd != fdclient && fd != fdservice
end
end
# 6. removing the client and the service from is_client
@is_client = @is_client.select do |fd,v| fd != fdclient end
@is_json = @is_json.select do |fd,v| fd != fdclient end
@ -149,12 +179,30 @@ class InstanceStorage
end
class Context
class_property service = IPC::SwitchingService.new service_name
class_property context = InstanceStorage.new service
class_property service_name = "websocketd"
end
Context.service_name = configuration.service_name
class Context
class_property service = IPC::SwitchingService.new service_name
class_property context = InstanceStorage.new service
class_property print_messages = false
class_property print_timer = false
end
Context.print_messages = configuration.print_messages
Context.print_timer = configuration.print_ipc_timer
if simulation
pp! configuration
exit 0
end
# by default, listen on any IP address
server = TCPServer.new(Context.host, Context.port_to_listen)
server = TCPServer.new configuration.host, configuration.port
Context.service << server.fd
def websocket_client_connection(client)
@ -221,15 +269,12 @@ def websocket_client_connection(client)
sfd = Context.context.switchtable[client.fd]
Baguette::Log.info "trackingd - sending the IP address #{real_ip_address} to fd #{sfd}"
# message = IPC::Message.from_json(JSON).to_packet
# => JSON has to include these attributes: mtype, utype, payload
# message = IPC::Message.new mtype, utype, payload
message = IPC::Message.new sfd, 1, 1.to_u8, "{\"ipaddress\": \"#{real_ip_address}\"}"
message = Tracking::Request::IpAddress.new real_ip_address
if Context.print_messages
Baguette::Log.info "to trackingd: #{message.to_s}"
end
serv = WrappedTCPFileDescriptor.new(fd: sfd, family: Socket::Family::INET)
serv.send message.to_packet
Context.service.send_now sfd, message
end
if Context.print_messages
@ -245,7 +290,6 @@ def websocket_client_connection(client)
# listen to the client's file descriptor
Context.context.service << client.fd
# puts "#{CBLUE}new client: #{client.fd}#{CRESET}"
# registering the client into storing structures to avoid being garbage collected
Context.context.fd_to_tcpsocket[client.fd] = client
@ -289,10 +333,34 @@ def websocket_connection_procedure (requested_service : String, clientfd : Int32
end
end
class FragmentBuffer
property buffer : String = String.new
def print_message(message : Bytes, origin : Int32)
return unless Context.print_messages
client = if Context.context.is_client[origin]
"client"
else
"server"
end
Baguette::Log.info "received message from #{origin} (#{client}):"
Baguette::Log.info "#{message.hexstring}"
end
def print_message(message : String, origin : Int32)
return unless Context.print_messages
client = if Context.context.is_client[origin]
"client"
else
"server"
end
Baguette::Log.info "received message from #{origin} (#{client}):"
pp! JSON.parse message
rescue e
Baguette::Log.warning "cannot see the message from #{origin} (#{client}): #{e}"
end
# WARNING: currently this is only covering String messages.
# Byte messages aren't addressed, yet.
def websocket_switching_procedure (activefd : Int)
begin
# Baguette::Log.title "activefd is #{activefd}"
@ -302,8 +370,6 @@ def websocket_switching_procedure (activefd : Int)
client = Context.context.fd_to_tcpsocket[activefd]
wsclient = Context.context.fd_to_websocket[activefd]
fb = FragmentBuffer.new
loop do
begin
message = wsclient.run_once
@ -341,6 +407,7 @@ def websocket_switching_procedure (activefd : Int)
next
end
return
when WebSocket::Ping
Baguette::Log.debug "Received a ping message"
if still_something_to_read
@ -348,6 +415,7 @@ def websocket_switching_procedure (activefd : Int)
next
end
break
when WebSocket::Pong
Baguette::Log.debug "Received a pong message"
if still_something_to_read
@ -355,6 +423,7 @@ def websocket_switching_procedure (activefd : Int)
next
end
break
when WebSocket::Close
Baguette::Log.debug "Received a close message"
Context.context.remove_fd activefd
@ -365,44 +434,49 @@ def websocket_switching_procedure (activefd : Int)
break
when WebSocket::NotFinal
Baguette::Log.warning "Received only part of a message"
# TODO: check if the message is OK when multiplexing
# pp! message
fb.buffer = fb.buffer + message.message
Baguette::Log.debug "Received only part of a message"
# There is a per-user buffer in case we receive "not final" websocket messages.
# In case the buffer isn't set yet.
unless Context.context.fd_to_buffer[activefd]?
Context.context.fd_to_buffer[activefd] = String.new
end
# Add the received message to the buffer.
Context.context.fd_to_buffer[activefd] += message.message
if still_something_to_read
Baguette::Log.debug "Still something to read"
Baguette::Log.debug "and there is still something to read"
next
end
break
when Bytes
# TODO: when receiving a binary message
# we should test the format and maybe its content
Baguette::Log.warning "Received a binary message (not supported)"
if still_something_to_read
Baguette::Log.info "Still something to read"
next
end
# We should test the format and maybe its content when receiving a binary message.
Baguette::Log.warning "Received a binary message"
end
# In case there was a previous messagee within a fragment.
if fb.buffer.size > 0
Baguette::Log.warning "SHOULD reconstitute the message!!"
# Is there a (non empty) buffer for the active fd?
buffer_size = if Context.context.fd_to_buffer[activefd]?
Context.context.fd_to_buffer[activefd].size
else
0
end
if message.is_a?(String) && fb.buffer.size > 0
Baguette::Log.warning "Reconstitute the message!!"
message = fb.buffer + message
fb.buffer = String.new
# TODO: make this buffer work with bytes messages.
# In case there was a previous message within a fragment.
if message.is_a?(String) && buffer_size > 0
message = Context.context.fd_to_buffer[activefd] + message
Context.context.fd_to_buffer[activefd] = String.new
end
if Context.context.is_json[activefd] && message.is_a?(String)
if Context.print_messages
j = JSON.parse message
Baguette::Log.info "received from client #{activefd}"
pp! j["payload"]
end
print_message message, activefd
message = IPC::Message.from_json(message).to_packet
elsif ! Context.context.is_json[activefd] && message.is_a?(Bytes)
# not json and bytes
print_message message, activefd
message = IPC::Message.from_cbor(message).to_packet
else
Baguette::Log.warning "message is neither JSON and string, or not JSON and bytes."
Baguette::Log.warning "message is #{message.class.to_s}"
end
# client => service
@ -413,7 +487,11 @@ def websocket_switching_procedure (activefd : Int)
# when the object is GC.
serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET)
#serv = Context.context.fd_to_ipcclient[fdservice]
serv.send message
begin
serv.send message
rescue e
Baguette::Log.error "Sending a message failed: #{e}"
end
break unless still_something_to_read
@ -426,17 +504,14 @@ def websocket_switching_procedure (activefd : Int)
serv = Context.context.fd_to_ipcclient[activefd]
message = serv.read
if Context.context.is_json[fdclient]
buf = message.to_json
if Context.print_messages
j = JSON.parse buf
Baguette::Log.info "received from service #{activefd}"
pp! j["payload"]
end
else
buf = message.to_packet
buf = if Context.context.is_json[fdclient]
message.to_json
else # We assume that non-JSON clients are CBOR clients.
message.to_cbor
end
print_message buf, activefd
wsclient.send buf
end
rescue e
@ -467,7 +542,7 @@ end
# Every few seconds, the service should trigger the timer
# Allowing the sending of Ping messages to clients
Context.service.base_timer = Context.timer_delay
Context.service.base_timer = configuration.ipc_timer
Context.service.loop do |event|
begin