From 131552e396243b51c2d2dbb4036de854f7e80a12 Mon Sep 17 00:00:00 2001 From: Karchnu Date: Thu, 9 Jul 2020 09:13:57 +0200 Subject: [PATCH] big chunks of websocketd OK, needs callbacks and debug --- src/websocketd.cr | 539 +++++++++++++++++++++++----------------------- 1 file changed, 271 insertions(+), 268 deletions(-) diff --git a/src/websocketd.cr b/src/websocketd.cr index d809c48..7d91a9d 100644 --- a/src/websocketd.cr +++ b/src/websocketd.cr @@ -14,34 +14,35 @@ require "./utils" # All modifications to standard libraries go there. require "./lib_modifications.cr" -# service instance parameters -# they can be changed via the cli -service_name = "websocket" -host = "0.0.0.0" -port_to_listen = 1234 -timer_delay = 30_000.to_f64 # 30 seconds - -verbosity = 1 +class CLI + # service instance parameters + # they can be changed via the cli + class_property service_name : String = "websocket" + class_property host : String = "0.0.0.0" + class_property port_to_listen : UInt16 = 1234 + class_property timer_delay : Int32 = 30_000 # 30 seconds + class_property verbosity : Int32 = 1 +end OptionParser.parse do |parser| parser.on "-l host", "--l host", "IP address to listen on." do |h| - host = h + CLI.host = h end parser.on "-p port", "--port port", "Port to listen on." do |port| - port_to_listen = port.to_u16 + CLI.port_to_listen = port.to_u16 end parser.on "-s service-name", "--service-name service-name", "Service name." do |name| - service_name = name + CLI.service_name = name end parser.on "-t timer-delay", "--timer-delay timer-delay", "Timer delay (in seconds)" do |t| - timer_delay = t.to_f64 * 1000 # stored in ms + CLI.timer_delay = t.to_i32 * 1000 # stored in ms end parser.on "-v verbosity-level", "--verbosity level", "Verbosity." do |opt| - verbosity = opt.to_i + CLI.verbosity = opt.to_i end parser.on "-h", "--help", "Show this help" do @@ -50,76 +51,31 @@ OptionParser.parse do |parser| end end - -# Link between fd and TCPSocket, WebSocket and IPC::Connection instances. -class InstanceStorage - property service : IPC::SwitchingService - property switchtable : Hash(Int32, Int32) - property is_client : Hash(Int32,Bool) - property is_json : Hash(Int32, Bool) - property fd_to_tcpsocket : Hash(Int32, TCPSocket) - property fd_to_websocket : Hash(Int32, WebSocket) - property fd_to_ipcconnection : Hash(Int32, IPC::Connection) - - def initialize (@service : IPC::SwitchingService) - # fdlist_client = [] of TCPSocket - @switchtable = Hash(Int32, Int32).new - @is_client = Hash(Int32,Bool).new - @is_json = Hash(Int32,Bool).new - @fd_to_tcpsocket = Hash(Int32, TCPSocket).new - @fd_to_websocket = Hash(Int32, WebSocket).new - @fd_to_ipcconnection = Hash(Int32, IPC::Connection).new - end - - def remove_fd (fdclient : Int32) - puts "#{CBLUE}closing the client:#{CRESET} #{fdclient}" - # 1. closing both the client and the service - fdservice = @switchtable[fdclient]? - tcpfdc = @fd_to_tcpsocket[fdclient] - - # 2. closing the TCP connections - tcpfdc.close unless tcpfdc.closed? - - # 3. removing the client and the service fds from the loop check - @service.remove_fd (fdclient) - - # 5. removing both the client and the service from the switchtable - @switchtable = @switchtable.select do |fdc, fds| - fdc != fdclient && fds != fdclient - end - - # 6. removing the client and the service from is_client - @is_client = @is_client.select do |fd,v| fd != fdclient end - @is_json = @is_json.select do |fd,v| fd != fdclient end - - @fd_to_websocket.select! do |fd, ws| - fd != fdclient - end - - unless fdservice.nil? - service = @fd_to_ipcconnection[fdservice] - service.close - @service.remove_fd (fdservice) - @fd_to_ipcconnection = @fd_to_ipcconnection.select do |k, v| - k != fdservice - end - - @is_client = @is_client.select do |fd,v| fd != fdservice end - end - end +def sending_ping_messages + puts "sending ping messages" end -# by default, listen on any IP address -server = TCPServer.new(host, port_to_listen) -service = IPC::SwitchingService.new service_name -service << server.fd -context = InstanceStorage.new service +# def sending_ping_messages(context : InstanceStorage) +# context.fd_to_websocket.each do |fd, ws| +# begin +# ws.ping "hello from #{fd}" +# rescue e +# puts "#{CRED}Exception: #{e}#{CRESET}, already closed client #{fd}" +# begin +# context.remove_fd fd +# rescue e +# puts "#{CRED}Cannot remove #{fd} from clients: #{e}#{CRESET}" +# end +# end +# end +# end +# def closing_client (fdclient : Int, context : InstanceStorage) +# context.remove_fd fdclient +# end - -def websocket_client_connection(client, context : InstanceStorage) +def ws_http_upgrade(client) request = HTTP::Request.from_io client - # pp! request if request.nil? raise "#REQUEST IS NIL" @@ -133,7 +89,6 @@ def websocket_client_connection(client, context : InstanceStorage) raise "Not bad request but still pretty bad: #{request.to_s}" end - # FIXME: check they actually wanted to upgrade to websocket key = request.headers["Sec-WebSocket-Key"] response_key = Digest::SHA1.base64digest key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" @@ -152,10 +107,14 @@ def websocket_client_connection(client, context : InstanceStorage) # requested service, fd req_service = request.path.lchop if req_service.empty? + puts "closing the client" client.close - return end + return req_service +end + +def other_function(client, req_service, context : InstanceStorage) # The client may ask to transcript JSON-based messages into IPC messages. # To that end, the client may send the name of the service it wants to reach with the prefix ".JSON". @@ -200,214 +159,258 @@ def websocket_client_connection(client, context : InstanceStorage) context.fd_to_websocket[client.fd] = wsclient end +# def websocket_switching_procedure (activefd : Int, context : InstanceStorage) +# begin +# if context.is_client[activefd] +# # The client is a WebSocket on top of a TCP connection +# client = context.fd_to_tcpsocket[activefd] +# wsclient = context.fd_to_websocket[activefd] +# loop do +# begin +# message = wsclient.run_once +# rescue e +# puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}" +# context.remove_fd activefd +# return +# end +# +# # Checking the internals of WebSocket, then the contained IO within, to know if there is still something to read in the socket. +# still_something_to_read = ! wsclient.ws.io.empty? +# +# if wsclient.closed? +# # puts "#{CBLUE}client is closed#{CRESET}" +# context.remove_fd client.fd +# return +# end +# +# if message.nil? +# puts "#{CRED}message is nil#{CRESET}" +# context.remove_fd client.fd +# return +# end +# +# case message +# when WebSocket::Error +# puts "#{CRED}An error occured#{CRESET}" +# context.remove_fd client.fd +# return +# when WebSocket::Ping +# # puts "#{CBLUE}Received a ping message#{CRESET}" +# next if still_something_to_read +# break +# when WebSocket::Pong +# # puts "#{CBLUE}Received a pong message#{CRESET}" +# next if still_something_to_read +# break +# when WebSocket::Close +# # puts "#{CBLUE}Received a close message#{CRESET}" +# context.remove_fd client.fd +# return +# when WebSocket::NotFinal +# # puts "#{CBLUE}Received only part of a message#{CRESET}" +# next if still_something_to_read +# break +# when Bytes +# # TODO: when receiving a binary message +# # we should test the format and maybe its content +# # puts "#{CBLUE}Received a binary message#{CRESET}" +# end +# +# if context.is_json[activefd] && message.is_a?(String) +# message = IPC::Message.from_json(message).to_packet +# end +# +# # client => service +# fdservice = context.switchtable[activefd] +# +# # XXX: this is not a TCP fd, but since behind the scene this is compatible, I'm hacking a bit +# # Also, I changed the "finalize" method of the TCPFileDescriptor class not to close the socket +# # when the object is GC. +# serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET) +# #serv = context.fd_to_ipcconnection[fdservice] +# serv.send message +# +# break unless still_something_to_read +# +# end # loop over the remaining messages to read on the websocket +# else +# # service => client +# fdclient = context.switchtable[activefd] +# wsclient = context.fd_to_websocket[fdclient] +# +# serv = context.fd_to_ipcconnection[activefd] +# message = serv.read +# +# if context.is_json[fdclient] +# buf = message.to_json +# else +# buf = message.to_packet +# end +# +# wsclient.send buf +# end +# rescue e +# puts "#{CRED}Exception during message transfer:#{CRESET} #{e}" +# if context.is_client[activefd] +# closing_client activefd, context +# else +# clientfd = context.switchtable[activefd] +# closing_client clientfd, context +# end +# end +# end -def closing_client (fdclient : Int, context : InstanceStorage) - context.remove_fd fdclient -end +# # first message from the client: requested service name +# # 1. connection to the service +# # 2. listening on the service fd +# # 3. bounding both file descriptors (through switchtable hash) +# # 4. indicating that the client is connected (in is_client) +# def websocket_connection_procedure (requested_service : String, clientfd : Int32, context : InstanceStorage) +# begin +# # 2. establishing a connection to the service +# newservice = IPC::Connection.new requested_service +# context.fd_to_ipcconnection[newservice.fd] = newservice +# +# # 3. listening on the client fd and the service fd +# context.service << newservice.fd +# +# # cannot perform automatic switching due to websockets headers +# # future version of the libipc lib should include some workaround, probably +# # service.switch.add fdclient, newservice.fd +# +# # 4. bounding the client and the service fd +# context.switchtable[clientfd] = newservice.fd +# context.switchtable[newservice.fd] = clientfd +# +# # 5. the client is then connected, send it a message +# context.is_client[clientfd] = true +# context.is_client[newservice.fd] = false +# rescue e +# puts "#{CRED}Exception during connection to the service:#{CRESET} #{e}" +# context.remove_fd clientfd +# end +# end -# first message from the client: requested service name -# 1. connection to the service -# 2. listening on the service fd -# 3. bounding both file descriptors (through switchtable hash) -# 4. indicating that the client is connected (in is_client) -def websocket_connection_procedure (requested_service : String, clientfd : Int32, context : InstanceStorage) - begin - # 2. establishing a connection to the service - newservice = IPC::Connection.new requested_service - context.fd_to_ipcconnection[newservice.fd] = newservice +def handle_new_clients(service, server) + # TODO: this is a lot of C-like notation, should be changed. + # In the client accept function + # 1. accept TCP/WS connection + client = server.accept - # 3. listening on the client fd and the service fd - context.service << newservice.fd + # 2. upgrade HTTP connections and get the service name in the URI path. + req_service = ws_http_upgrade client - # cannot perform automatic switching due to websockets headers - # future version of the libipc lib should include some workaround, probably - # service.switch.add fdclient, newservice.fd + puts "requested service: #{req_service}" + unless client.nil? + client.not_nil!.close + end - # 4. bounding the client and the service fd - context.switchtable[clientfd] = newservice.fd - context.switchtable[newservice.fd] = clientfd + # 3. connect to the service via ipc_connection_switched + LibIPC.ipc_connection_switched service.pointer, req_service + puts "ipc_connection_switched: OK" - # 5. the client is then connected, send it a message - context.is_client[clientfd] = true - context.is_client[newservice.fd] = false - rescue e - puts "#{CRED}Exception during connection to the service:#{CRESET} #{e}" - context.remove_fd clientfd + # 4. add client fd in the context, type = switched via ipc_ctx_fd_type + service << client.fd + service.pp + LibIPC.ipc_ctx_fd_type service.pointer, client.fd, LibIPC::ConnectionType::Switched + puts "ipc_ctx_fd_type: OK" + + # 5. add both fd in switched via ipc_switching_add + switchdb = service.context.switchdb + LibIPC.ipc_switching_add pointerof(switchdb), + client.fd, + service.context.pollfd[service.context.size - 1].fd + puts "ipc_switching_add: OK" + + proc_cb_in = ->my_cb_in(Int32, Pointer(LibIPC::Message)) + proc_cb_out = ->my_cb_out(Int32, Pointer(LibIPC::Message)) + + # 6. change client callbacks via ipc_switching_callbacks + # only the client callbacks are changed, since the associated server is a simple libipc service + LibIPC.ipc_switching_callbacks service.pointer, client.fd, + proc_cb_in, + proc_cb_out + puts "ipc_switching_callbacks: OK" + + puts "#{CBLUE}new client:#{CRESET} #{client.fd}" unless CLI.verbosity == 0 +rescue e + puts "Exception: #{CRED}#{e}#{CRESET}" + unless client.nil? + client.not_nil!.close end end -def websocket_switching_procedure (activefd : Int, context : InstanceStorage) - begin - if context.is_client[activefd] - # The client is a WebSocket on top of a TCP connection - client = context.fd_to_tcpsocket[activefd] - wsclient = context.fd_to_websocket[activefd] - loop do - begin - message = wsclient.run_once - rescue e - puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}" - context.remove_fd activefd - return - end +def my_cb_out(fd : Int32, pm : Pointer(LibIPC::Message)) + puts "OUT fd is #{fd}" + return LibIPC::IPCCB::NoError +end - # Checking the internals of WebSocket, then the contained IO within, to know if there is still something to read in the socket. - still_something_to_read = ! wsclient.ws.io.empty? +def my_cb_in(fd : Int32, pm : Pointer(LibIPC::Message)) + puts "IN fd is #{fd}" + return LibIPC::IPCCB::NoError +end - if wsclient.closed? - # puts "#{CBLUE}client is closed#{CRESET}" - context.remove_fd client.fd - return - end - if message.nil? - puts "#{CRED}message is nil#{CRESET}" - context.remove_fd client.fd - return - end +def main + # by default, listen on any IP address + server = TCPServer.new(CLI.host, CLI.port_to_listen) + service = IPC::Server.new CLI.service_name + service << server.fd + # Every few seconds, the service should trigger the timer + # Allowing the sending of Ping messages to clients + service.base_timer = CLI.timer_delay + service.timer = CLI.timer_delay - case message - when WebSocket::Error - puts "#{CRED}An error occured#{CRESET}" - context.remove_fd client.fd - return - when WebSocket::Ping - # puts "#{CBLUE}Received a ping message#{CRESET}" - next if still_something_to_read - break - when WebSocket::Pong - # puts "#{CBLUE}Received a pong message#{CRESET}" - next if still_something_to_read - break - when WebSocket::Close - # puts "#{CBLUE}Received a close message#{CRESET}" - context.remove_fd client.fd - return - when WebSocket::NotFinal - # puts "#{CBLUE}Received only part of a message#{CRESET}" - next if still_something_to_read - break - when Bytes - # TODO: when receiving a binary message - # we should test the format and maybe its content - # puts "#{CBLUE}Received a binary message#{CRESET}" - end + # context = InstanceStorage.new service - if context.is_json[activefd] && message.is_a?(String) - message = IPC::Message.from_json(message).to_packet - end - - # client => service - fdservice = context.switchtable[activefd] - - # XXX: this is not a TCP fd, but since behind the scene this is compatible, I'm hacking a bit - # Also, I changed the "finalize" method of the TCPFileDescriptor class not to close the socket - # when the object is GC. - serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET) - #serv = context.fd_to_ipcconnection[fdservice] - serv.send message - - break unless still_something_to_read - - end # loop over the remaining messages to read on the websocket - else - # service => client - fdclient = context.switchtable[activefd] - wsclient = context.fd_to_websocket[fdclient] - - serv = context.fd_to_ipcconnection[activefd] - message = serv.read - - if context.is_json[fdclient] - buf = message.to_json - else - buf = message.to_packet + service.loop do |event| + puts "current state of the context:" + service.pp + case event + when IPC::Event::Timer + if CLI.verbosity > 0 + puts "#{CORANGE}IPC::Event::Timer#{CRESET}" + end + sending_ping_messages # context + when IPC::Event::Connection + if CLI.verbosity > 0 + puts "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.fd}" + end + when IPC::Event::Disconnection + if CLI.verbosity > 0 + puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}: #{event.fd}" + end + when IPC::Event::ExtraSocket + if CLI.verbosity > 0 + puts "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}: #{event.fd}" end - wsclient.send buf - end - rescue e - puts "#{CRED}Exception during message transfer:#{CRESET} #{e}" - if context.is_client[activefd] - closing_client activefd, context - else - clientfd = context.switchtable[activefd] - closing_client clientfd, context - end - end -end - -def sending_ping_messages(context : InstanceStorage) - context.fd_to_websocket.each do |fd, ws| - begin - ws.ping "hello from #{fd}" - rescue e - puts "#{CRED}Exception: #{e}#{CRESET}, already closed client #{fd}" - begin - context.remove_fd fd - rescue e - puts "#{CRED}Cannot remove #{fd} from clients: #{e}#{CRESET}" + if server.fd != event.fd + raise "Error: the only extra socket should be the TCP/WS server" end - end - end -end -# Every few seconds, the service should trigger the timer -# Allowing the sending of Ping messages to clients -service.base_timer = timer_delay + handle_new_clients(service, server) -service.loop do |event| - case event - when IPC::Event::Timer - if verbosity > 0 - puts "#{CORANGE}IPC::Event::Timer#{CRESET}" - end - sending_ping_messages context - when IPC::Event::Connection - if verbosity > 0 - puts "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.fd}" - end - when IPC::Event::Disconnection - if verbosity > 0 - puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}: #{event.fd}" - end - when IPC::Event::ExtraSocket - if verbosity > 0 - puts "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}: #{event.fd}" - end - - # 1. accept new websocket clients - if server.fd == event.fd - client = server.accept - begin - websocket_client_connection client, context - puts "#{CBLUE}new client:#{CRESET} #{client.fd}" unless verbosity == 0 - rescue e - puts "Exception: #{CRED}#{e}#{CRESET}" - client.close + when IPC::Event::Switch + if CLI.verbosity > 0 + puts "\033[36mIPC::Event::Switch#{CRESET}: from fd #{event.fd}" end - next + + raise "Not implemented." + + when IPC::Event::MessageSent + if CLI.verbosity > 0 + puts "Message sent, for #{event.fd}" + end + + when IPC::Event::MessageReceived + if CLI.verbosity > 0 + puts "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.fd}" + end + + raise "Not implemented." end - - # 2. active fd != server fd - activefd = event.fd - websocket_switching_procedure activefd, context - - when IPC::Event::Switch - if verbosity > 0 - puts "\033[36mIPC::Event::Switch#{CRESET}: from fd #{event.fd}" - end - - raise "Not implemented." - - when IPC::Event::MessageSent - puts "Message sent, for #{event.origin}" - - when IPC::Event::MessageReceived - if verbosity > 0 - puts "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.fd}" - end - - raise "Not implemented." end end + +main