diff --git a/src/instance_storage.cr b/src/instance_storage.cr index a066d56..6822ac3 100644 --- a/src/instance_storage.cr +++ b/src/instance_storage.cr @@ -3,10 +3,13 @@ class InstanceStorage property is_json : Hash(Int32, Bool) property fd_to_websocket : Hash(Int32, WebSocket) + # So the GC won't close it. + property fd_to_socket : Hash(Int32, TCPSocket) + def initialize - # fdlist_client = [] of TCPSocket @is_json = Hash(Int32,Bool).new @fd_to_websocket = Hash(Int32, WebSocket).new + @fd_to_socket = Hash(Int32, TCPSocket).new end def remove_fd (fdclient : Int32) @@ -16,5 +19,8 @@ class InstanceStorage # 2. remove the client from fd_to_websocket @fd_to_websocket.select! do |fd, ws| fd != fdclient end + + # 3. remove the client from fd_to_socket + @fd_to_socket.select! do |fd, ws| fd != fdclient end end end diff --git a/src/lib_modifications.cr b/src/lib_modifications.cr index bc69f83..484f9e8 100644 --- a/src/lib_modifications.cr +++ b/src/lib_modifications.cr @@ -47,6 +47,8 @@ class HTTP::WebSocket rescue IO::EOFError close return Error.new + rescue + return Error.new end case info.opcode @@ -113,7 +115,7 @@ class WebSocket < HTTP::WebSocket getter? closed = false def finalize - # puts "WrappedTCPFileDescriptor garbage collection!!" + puts "WebSocket garbage collection!!" # super end end @@ -121,7 +123,7 @@ end class WrappedTCPFileDescriptor < TCPSocket # do not close the connection when garbage collected!! def finalize - # puts "WrappedTCPFileDescriptor garbage collection!!" + puts "WrappedTCPFileDescriptor garbage collection!!" # super end end diff --git a/src/websocketd.cr b/src/websocketd.cr index cc7e065..305290b 100644 --- a/src/websocketd.cr +++ b/src/websocketd.cr @@ -131,9 +131,13 @@ def ws_http_upgrade(client) end # registering the client into storing structures to avoid being garbage collected + Context.context.fd_to_socket[client.fd] = client Context.context.fd_to_websocket[client.fd] = wsclient req_service +rescue e + puts "#{CRED}Exception in ws_http_upgrade#{CRESET}: #{CBLUE}#{e}#{CRESET}" + raise "DROP IT" end def handle_new_clients(service, server) @@ -145,58 +149,39 @@ def handle_new_clients(service, server) # 2. upgrade HTTP connections and get the service name in the URI path. req_service = ws_http_upgrade client - puts "requested service: #{req_service}" - # 3. connect to the service via ipc_connection_switched - LibIPC.ipc_connection_switched service.pointer, req_service - # puts "ipc_connection_switched: OK" - req_service_fd = service.context.pollfd[service.context.size - 1].fd + serverfd = 0 + # LibIPC.ipc_connection_switched service.pointer, req_service, client.fd, Pointer(Libc::Int).null + LibIPC.ipc_connection_switched service.pointer, req_service, client.fd, pointerof (serverfd) - # 4. add client fd in the context, type = switched via ipc_ctx_fd_type - service << client.fd - # service.pp - LibIPC.ipc_ctx_fd_type service.pointer, client.fd, LibIPC::ConnectionType::Switched - # puts "ipc_ctx_fd_type: OK" + proc_cb_in = ->ws_cb_in(Int32, Pointer(LibIPC::Message)) + proc_cb_out = ->ws_cb_out(Int32, Pointer(LibIPC::Message)) - # 5. add both fd in switched via ipc_switching_add - LibIPC.ipc_ctx_switching_add service.pointer, client.fd, req_service_fd - # service.pp - # puts "ipc_switching_add: OK" - - proc_cb_in = ->my_cb_in(Int32, Pointer(LibIPC::Message)) - proc_cb_out = ->my_cb_out(Int32, Pointer(LibIPC::Message)) - - # 6. change client callbacks via ipc_switching_callbacks + # 4. change client callbacks via ipc_switching_callbacks # only the client callbacks are changed, since the associated server is a simple libipc service - puts "#{CGREEN}BEFORE ipc_switching_callbacks#{CRESET}" LibIPC.ipc_switching_callbacks service.pointer, client.fd, proc_cb_in, proc_cb_out - puts "ipc_switching_callbacks: OK" puts "#{CBLUE}new client:#{CRESET} #{client.fd}" unless CLI.verbosity == 0 rescue e - puts "Exception: #{CRED}#{e}#{CRESET}" + puts "Exception in handle_new_client: #{CRED}#{e}#{CRESET}" unless client.nil? - client.not_nil!.close + client.close end end -def my_cb_out(fd : Int32, pm : Pointer(LibIPC::Message)) +def ws_cb_out(fd : Int32, pm : Pointer(LibIPC::Message)) puts "OUT fd is #{fd}" wsclient = Context.context.fd_to_websocket[fd] message = IPC::Message.new pm - pp! message + # pp! message if Context.context.is_json[fd] - puts "OUT: this is a JSON client" buf = message.to_json else - puts "OUT: this is NOT a JSON client (fake, always JSON right now)" - # buf = message.to_packet - buf = message.to_json + buf = message.to_packet end - puts "OUT: SENDING" wsclient.send buf return LibIPC::IPCCB::NoError @@ -206,72 +191,83 @@ rescue e return LibIPC::IPCCB::Error end -def my_cb_in(fd : Int32, pm : LibIPC::Message*) +def ws_cb_in(fd : Int32, pm : LibIPC::Message*) puts "IN fd is #{fd}" - message = IPC::Message.new pm - wsclient = Context.context.fd_to_websocket[fd] - message = wsclient.run_once + puts + puts + puts "before run_once" + + message = nil + begin + message = wsclient.run_once + puts "after the run_once" + rescue e + puts "#{CRED}run_once FAILED#{CRESET}: #{e}" + # Context.context.remove_fd fd + return LibIPC::IPCCB::Error + end if wsclient.closed? puts "#{CBLUE}client is closed#{CRESET}" - Context.context.remove_fd fd + # Context.context.remove_fd fd return LibIPC::IPCCB::Closing end if message.nil? puts "#{CRED}message is nil#{CRESET}" - Context.context.remove_fd fd + # Context.context.remove_fd fd return LibIPC::IPCCB::Closing end case message - when WebSocket::Error - puts "#{CRED}An error occured#{CRESET}" - Context.context.remove_fd fd - return LibIPC::IPCCB::Error - when WebSocket::Ping - puts "#{CBLUE}Received a ping message#{CRESET}" - Context.context.remove_fd fd - return LibIPC::IPCCB::Ignore - when WebSocket::Pong - puts "#{CBLUE}Received a pong message#{CRESET}" - return LibIPC::IPCCB::Ignore - when WebSocket::Close - puts "#{CBLUE}Received a close message#{CRESET}" - Context.context.remove_fd fd - return LibIPC::IPCCB::Closing - when WebSocket::NotFinal - puts "#{CBLUE}Received only part of a message: NOT IMPLEMENTED#{CRESET}" - Context.context.remove_fd fd - return LibIPC::IPCCB::Error - when Bytes - # TODO: when receiving a binary message - # we should test the format and maybe its content - puts "#{CBLUE}Received a binary message: NOT IMPLEMENTED, YET#{CRESET}" - Context.context.remove_fd fd - return LibIPC::IPCCB::Error when String - puts "#{CBLUE}Received a String message#{CRESET}" + if Context.context.is_json[fd] + ipc_message = IPC::Message.from_json(message) + ipc_message.copy_to_message_pointer pm + return LibIPC::IPCCB::NoError + end + else + # every other option should be dropped + case message + when WebSocket::Error + puts "#{CRED}An error occured#{CRESET}" + # Context.context.remove_fd fd + return LibIPC::IPCCB::Error + when WebSocket::Ping + puts "#{CBLUE}Received a ping message#{CRESET}" + # Context.context.remove_fd fd + return LibIPC::IPCCB::Ignore + when WebSocket::Pong + puts "#{CBLUE}Received a pong message#{CRESET}" + return LibIPC::IPCCB::Ignore + when WebSocket::Close + puts "#{CBLUE}Received a close message#{CRESET}" + # Context.context.remove_fd fd + return LibIPC::IPCCB::Closing + when WebSocket::NotFinal + puts "#{CBLUE}Received only part of a message: NOT IMPLEMENTED#{CRESET}" + # Context.context.remove_fd fd + return LibIPC::IPCCB::Error + when Bytes + # TODO: when receiving a binary message + # we should test the format and maybe its content + puts "#{CBLUE}Received a binary message: NOT IMPLEMENTED, YET#{CRESET}" + # Context.context.remove_fd fd + return LibIPC::IPCCB::Error + else + puts "#{CRED}Received a websocket message with unknown type#{CRESET}" + end end - if Context.context.is_json[fd] && message.is_a?(String) - puts "#{CRED}CECI EST DU JSON#{CRESET}" - message = IPC::Message.from_json(message).to_packet - end - - pp! message - - # TODO: take the received message and convert it in pm - - return LibIPC::IPCCB::NoError + return LibIPC::IPCCB::Error rescue e puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}" - tcp = WrappedTCPFileDescriptor.new(fd: fd, family: Socket::Family::INET) - tcp.close + # tcp = WrappedTCPFileDescriptor.new(fd: fd, family: Socket::Family::INET) + # tcp.close Context.context.remove_fd fd return LibIPC::IPCCB::Error end