From e636ae5c26158ee31fd48ccb13f96d96c830436a Mon Sep 17 00:00:00 2001 From: Karchnu Date: Fri, 10 Jul 2020 16:22:31 +0200 Subject: [PATCH] compiles --- src/websocketd.cr | 374 ++++++++++++++++++---------------------------- 1 file changed, 147 insertions(+), 227 deletions(-) diff --git a/src/websocketd.cr b/src/websocketd.cr index 7d91a9d..cc7e065 100644 --- a/src/websocketd.cr +++ b/src/websocketd.cr @@ -11,6 +11,8 @@ require "base64" require "digest" require "./utils" +require "./instance_storage.cr" + # All modifications to standard libraries go there. require "./lib_modifications.cr" @@ -52,28 +54,20 @@ OptionParser.parse do |parser| end def sending_ping_messages - puts "sending ping messages" + Context.context.fd_to_websocket.each do |fd, ws| + begin + ws.ping "hello from #{fd}" + rescue e + puts "#{CRED}Exception: #{e}#{CRESET}, already closed client #{fd}" + begin + Context.context.remove_fd fd + rescue e + puts "#{CRED}Cannot remove #{fd} from clients: #{e}#{CRESET}" + end + end + end end -# def sending_ping_messages(context : InstanceStorage) -# context.fd_to_websocket.each do |fd, ws| -# begin -# ws.ping "hello from #{fd}" -# rescue e -# puts "#{CRED}Exception: #{e}#{CRESET}, already closed client #{fd}" -# begin -# context.remove_fd fd -# rescue e -# puts "#{CRED}Cannot remove #{fd} from clients: #{e}#{CRESET}" -# end -# end -# end -# end - -# def closing_client (fdclient : Int, context : InstanceStorage) -# context.remove_fd fdclient -# end - def ws_http_upgrade(client) request = HTTP::Request.from_io client @@ -106,40 +100,27 @@ def ws_http_upgrade(client) # requested service, fd req_service = request.path.lchop - if req_service.empty? - puts "closing the client" - client.close - end - - return req_service -end - -def other_function(client, req_service, context : InstanceStorage) - # The client may ask to transcript JSON-based messages into IPC messages. - # To that end, the client may send the name of the service it wants to reach with the prefix ".JSON". if req_service.ends_with? ".JSON" - context.is_json[client.fd] = true + Context.context.is_json[client.fd] = true req_service = req_service.gsub /.JSON$/, "" else - context.is_json[client.fd] = false + Context.context.is_json[client.fd] = false end - websocket_connection_procedure req_service, client.fd, context - # TODO: if trackerd, send the IP address of the client - if req_service == "tracker" - puts "tracker - sending the IP address" - puts "connection from #{client.remote_address}" - sfd = context.switchtable[client.fd] - # message = IPC::Message.from_json(JSON).to_packet - # => JSON has to include these attributes: mtype, utype, payload - # message = IPC::Message.new mtype, utype, payload - remote_address = client.remote_address.address - message = IPC::Message.new 1, 1.to_u8, "{\"ipaddress\": \"#{remote_address}\"}" - serv = WrappedTCPFileDescriptor.new(fd: sfd, family: Socket::Family::INET) - serv.send message.to_packet - end + # if req_service == "tracker" + # puts "tracker - sending the IP address" + # puts "connection from #{client.remote_address}" + # sfd = Context.context.switchtable[client.fd] + # # message = IPC::Message.from_json(JSON).to_packet + # # => JSON has to include these attributes: mtype, utype, payload + # # message = IPC::Message.new mtype, utype, payload + # remote_address = client.remote_address.address + # message = IPC::Message.new 1, 1.to_u8, "{\"ipaddress\": \"#{remote_address}\"}" + # serv = WrappedTCPFileDescriptor.new(fd: sfd, family: Socket::Family::INET) + # serv.send message.to_packet + # end # puts "#{headers_header}\n#{headers.to_s}\r\n" client.send "#{headers_header}\n#{headers.to_s}\r\n" @@ -148,149 +129,13 @@ def other_function(client, req_service, context : InstanceStorage) wsclient.on_pong do |m| puts "pong #{m}" end - context.is_client[client.fd] = true - - # listen to the client's file descriptor - context.service << client.fd - # puts "#{CBLUE}new client: #{client.fd}#{CRESET}" # registering the client into storing structures to avoid being garbage collected - context.fd_to_tcpsocket[client.fd] = client - context.fd_to_websocket[client.fd] = wsclient + Context.context.fd_to_websocket[client.fd] = wsclient + + req_service end -# def websocket_switching_procedure (activefd : Int, context : InstanceStorage) -# begin -# if context.is_client[activefd] -# # The client is a WebSocket on top of a TCP connection -# client = context.fd_to_tcpsocket[activefd] -# wsclient = context.fd_to_websocket[activefd] -# loop do -# begin -# message = wsclient.run_once -# rescue e -# puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}" -# context.remove_fd activefd -# return -# end -# -# # Checking the internals of WebSocket, then the contained IO within, to know if there is still something to read in the socket. -# still_something_to_read = ! wsclient.ws.io.empty? -# -# if wsclient.closed? -# # puts "#{CBLUE}client is closed#{CRESET}" -# context.remove_fd client.fd -# return -# end -# -# if message.nil? -# puts "#{CRED}message is nil#{CRESET}" -# context.remove_fd client.fd -# return -# end -# -# case message -# when WebSocket::Error -# puts "#{CRED}An error occured#{CRESET}" -# context.remove_fd client.fd -# return -# when WebSocket::Ping -# # puts "#{CBLUE}Received a ping message#{CRESET}" -# next if still_something_to_read -# break -# when WebSocket::Pong -# # puts "#{CBLUE}Received a pong message#{CRESET}" -# next if still_something_to_read -# break -# when WebSocket::Close -# # puts "#{CBLUE}Received a close message#{CRESET}" -# context.remove_fd client.fd -# return -# when WebSocket::NotFinal -# # puts "#{CBLUE}Received only part of a message#{CRESET}" -# next if still_something_to_read -# break -# when Bytes -# # TODO: when receiving a binary message -# # we should test the format and maybe its content -# # puts "#{CBLUE}Received a binary message#{CRESET}" -# end -# -# if context.is_json[activefd] && message.is_a?(String) -# message = IPC::Message.from_json(message).to_packet -# end -# -# # client => service -# fdservice = context.switchtable[activefd] -# -# # XXX: this is not a TCP fd, but since behind the scene this is compatible, I'm hacking a bit -# # Also, I changed the "finalize" method of the TCPFileDescriptor class not to close the socket -# # when the object is GC. -# serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET) -# #serv = context.fd_to_ipcconnection[fdservice] -# serv.send message -# -# break unless still_something_to_read -# -# end # loop over the remaining messages to read on the websocket -# else -# # service => client -# fdclient = context.switchtable[activefd] -# wsclient = context.fd_to_websocket[fdclient] -# -# serv = context.fd_to_ipcconnection[activefd] -# message = serv.read -# -# if context.is_json[fdclient] -# buf = message.to_json -# else -# buf = message.to_packet -# end -# -# wsclient.send buf -# end -# rescue e -# puts "#{CRED}Exception during message transfer:#{CRESET} #{e}" -# if context.is_client[activefd] -# closing_client activefd, context -# else -# clientfd = context.switchtable[activefd] -# closing_client clientfd, context -# end -# end -# end - -# # first message from the client: requested service name -# # 1. connection to the service -# # 2. listening on the service fd -# # 3. bounding both file descriptors (through switchtable hash) -# # 4. indicating that the client is connected (in is_client) -# def websocket_connection_procedure (requested_service : String, clientfd : Int32, context : InstanceStorage) -# begin -# # 2. establishing a connection to the service -# newservice = IPC::Connection.new requested_service -# context.fd_to_ipcconnection[newservice.fd] = newservice -# -# # 3. listening on the client fd and the service fd -# context.service << newservice.fd -# -# # cannot perform automatic switching due to websockets headers -# # future version of the libipc lib should include some workaround, probably -# # service.switch.add fdclient, newservice.fd -# -# # 4. bounding the client and the service fd -# context.switchtable[clientfd] = newservice.fd -# context.switchtable[newservice.fd] = clientfd -# -# # 5. the client is then connected, send it a message -# context.is_client[clientfd] = true -# context.is_client[newservice.fd] = false -# rescue e -# puts "#{CRED}Exception during connection to the service:#{CRESET} #{e}" -# context.remove_fd clientfd -# end -# end - def handle_new_clients(service, server) # TODO: this is a lot of C-like notation, should be changed. # In the client accept function @@ -301,35 +146,30 @@ def handle_new_clients(service, server) req_service = ws_http_upgrade client puts "requested service: #{req_service}" - unless client.nil? - client.not_nil!.close - end # 3. connect to the service via ipc_connection_switched LibIPC.ipc_connection_switched service.pointer, req_service - puts "ipc_connection_switched: OK" + # puts "ipc_connection_switched: OK" + req_service_fd = service.context.pollfd[service.context.size - 1].fd # 4. add client fd in the context, type = switched via ipc_ctx_fd_type service << client.fd - service.pp + # service.pp LibIPC.ipc_ctx_fd_type service.pointer, client.fd, LibIPC::ConnectionType::Switched - puts "ipc_ctx_fd_type: OK" + # puts "ipc_ctx_fd_type: OK" # 5. add both fd in switched via ipc_switching_add - switchdb = service.context.switchdb - LibIPC.ipc_switching_add pointerof(switchdb), - client.fd, - service.context.pollfd[service.context.size - 1].fd - puts "ipc_switching_add: OK" + LibIPC.ipc_ctx_switching_add service.pointer, client.fd, req_service_fd + # service.pp + # puts "ipc_switching_add: OK" proc_cb_in = ->my_cb_in(Int32, Pointer(LibIPC::Message)) proc_cb_out = ->my_cb_out(Int32, Pointer(LibIPC::Message)) # 6. change client callbacks via ipc_switching_callbacks # only the client callbacks are changed, since the associated server is a simple libipc service - LibIPC.ipc_switching_callbacks service.pointer, client.fd, - proc_cb_in, - proc_cb_out + puts "#{CGREEN}BEFORE ipc_switching_callbacks#{CRESET}" + LibIPC.ipc_switching_callbacks service.pointer, client.fd, proc_cb_in, proc_cb_out puts "ipc_switching_callbacks: OK" puts "#{CBLUE}new client:#{CRESET} #{client.fd}" unless CLI.verbosity == 0 @@ -342,14 +182,109 @@ end def my_cb_out(fd : Int32, pm : Pointer(LibIPC::Message)) puts "OUT fd is #{fd}" + wsclient = Context.context.fd_to_websocket[fd] + + message = IPC::Message.new pm + pp! message + + if Context.context.is_json[fd] + puts "OUT: this is a JSON client" + buf = message.to_json + else + puts "OUT: this is NOT a JSON client (fake, always JSON right now)" + # buf = message.to_packet + buf = message.to_json + end + + puts "OUT: SENDING" + wsclient.send buf + return LibIPC::IPCCB::NoError +rescue e + puts "#{CRED}Exception during message transfer:#{CRESET} #{e}" + Context.context.remove_fd fd + return LibIPC::IPCCB::Error end -def my_cb_in(fd : Int32, pm : Pointer(LibIPC::Message)) +def my_cb_in(fd : Int32, pm : LibIPC::Message*) puts "IN fd is #{fd}" + + message = IPC::Message.new pm + + wsclient = Context.context.fd_to_websocket[fd] + + message = wsclient.run_once + + if wsclient.closed? + puts "#{CBLUE}client is closed#{CRESET}" + Context.context.remove_fd fd + return LibIPC::IPCCB::Closing + end + + if message.nil? + puts "#{CRED}message is nil#{CRESET}" + Context.context.remove_fd fd + return LibIPC::IPCCB::Closing + end + + case message + when WebSocket::Error + puts "#{CRED}An error occured#{CRESET}" + Context.context.remove_fd fd + return LibIPC::IPCCB::Error + when WebSocket::Ping + puts "#{CBLUE}Received a ping message#{CRESET}" + Context.context.remove_fd fd + return LibIPC::IPCCB::Ignore + when WebSocket::Pong + puts "#{CBLUE}Received a pong message#{CRESET}" + return LibIPC::IPCCB::Ignore + when WebSocket::Close + puts "#{CBLUE}Received a close message#{CRESET}" + Context.context.remove_fd fd + return LibIPC::IPCCB::Closing + when WebSocket::NotFinal + puts "#{CBLUE}Received only part of a message: NOT IMPLEMENTED#{CRESET}" + Context.context.remove_fd fd + return LibIPC::IPCCB::Error + when Bytes + # TODO: when receiving a binary message + # we should test the format and maybe its content + puts "#{CBLUE}Received a binary message: NOT IMPLEMENTED, YET#{CRESET}" + Context.context.remove_fd fd + return LibIPC::IPCCB::Error + when String + puts "#{CBLUE}Received a String message#{CRESET}" + end + + if Context.context.is_json[fd] && message.is_a?(String) + puts "#{CRED}CECI EST DU JSON#{CRESET}" + message = IPC::Message.from_json(message).to_packet + end + + pp! message + + # TODO: take the received message and convert it in pm + return LibIPC::IPCCB::NoError + +rescue e + puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}" + tcp = WrappedTCPFileDescriptor.new(fd: fd, family: Socket::Family::INET) + tcp.close + Context.context.remove_fd fd + return LibIPC::IPCCB::Error end +def info(str : String) + if CLI.verbosity > 0 + puts str + end +end + +class Context + class_property context = InstanceStorage.new +end def main # by default, listen on any IP address @@ -361,29 +296,20 @@ def main service.base_timer = CLI.timer_delay service.timer = CLI.timer_delay - # context = InstanceStorage.new service - service.loop do |event| - puts "current state of the context:" + info "current state of the context:" service.pp case event when IPC::Event::Timer - if CLI.verbosity > 0 - puts "#{CORANGE}IPC::Event::Timer#{CRESET}" - end - sending_ping_messages # context + info "#{CORANGE}IPC::Event::Timer#{CRESET}" + sending_ping_messages when IPC::Event::Connection - if CLI.verbosity > 0 - puts "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.fd}" - end + info "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.fd}" when IPC::Event::Disconnection - if CLI.verbosity > 0 - puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}: #{event.fd}" - end + info "#{CBLUE}IPC::Event::Disconnection#{CRESET}: #{event.fd}" + Context.context.remove_fd event.fd when IPC::Event::ExtraSocket - if CLI.verbosity > 0 - puts "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}: #{event.fd}" - end + info "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}: #{event.fd}" if server.fd != event.fd raise "Error: the only extra socket should be the TCP/WS server" @@ -392,21 +318,15 @@ def main handle_new_clients(service, server) when IPC::Event::Switch - if CLI.verbosity > 0 - puts "\033[36mIPC::Event::Switch#{CRESET}: from fd #{event.fd}" - end + info "\033[36mIPC::Event::Switch#{CRESET}: from fd #{event.fd}" - raise "Not implemented." + # raise "Not implemented." when IPC::Event::MessageSent - if CLI.verbosity > 0 - puts "Message sent, for #{event.fd}" - end + info "Message sent, for #{event.fd}" when IPC::Event::MessageReceived - if CLI.verbosity > 0 - puts "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.fd}" - end + info "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.fd}" raise "Not implemented." end