diff --git a/shard.yml b/shard.yml index b5a26ef..4639ec1 100644 --- a/shard.yml +++ b/shard.yml @@ -1,5 +1,5 @@ name: ipcd -version: 0.2.0 +version: 0.3.0 authors: - karchnu @@ -40,4 +40,18 @@ targets: test-closing: main: tests/closing.cr + # performance tests + test-perf-messages: + main: tests/performances/ipc_message.cr + + test-perf-connection: + main: tests/performances/ipc_connection.cr + + test-perf-message-exchange: + main: tests/performances/ipc_message_exchange.cr + + test-perf-puts: + main: tests/performances/puts.cr + + license: ISC diff --git a/src/ws.cr b/src/lib_modifications.cr similarity index 89% rename from src/ws.cr rename to src/lib_modifications.cr index d75b211..cf3c8f6 100644 --- a/src/ws.cr +++ b/src/lib_modifications.cr @@ -1,3 +1,4 @@ +# Libraries modifications require "http" require "http/web_socket/protocol" @@ -95,6 +96,7 @@ class HTTP::WebSocket return NotFinal.new end + # Returns the websocket instance. def ws @ws end @@ -111,7 +113,15 @@ class WebSocket < HTTP::WebSocket getter? closed = false def finalize - # puts "WrappedTCPFileDescriptor garbage collection!!" - # super + # puts "WrappedTCPFileDescriptor garbage collection!!" + # super + end +end + +class WrappedTCPFileDescriptor < TCPSocket + # do not close the connection when garbage collected!! + def finalize + # puts "WrappedTCPFileDescriptor garbage collection!!" + # super end end diff --git a/src/pongd.cr b/src/pongd.cr index 74a8cdc..e60b1a7 100644 --- a/src/pongd.cr +++ b/src/pongd.cr @@ -42,7 +42,7 @@ service.loop do |event| puts "#{CGREEN}IPC::Event::Message#{CRESET}, client: #{event.connection.fd}" if verbosity >= 2 m = String.new event.message.payload - puts "#{CBLUE}message type #{event.message.type}: #{m} #{CRESET}" + puts "#{CBLUE}message type #{event.message.utype}: #{m} #{CRESET}" end end event.connection.send event.message diff --git a/src/utils.cr b/src/utils.cr index 43980bb..1c0153d 100644 --- a/src/utils.cr +++ b/src/utils.cr @@ -1,19 +1,5 @@ require "io/hexdump" -def to_message (user_type : Int, message : String) - payload = Bytes.new (6 + message.to_slice.size) - - # true start - payload[0] = 1.to_u8 - IO::ByteFormat::NetworkEndian.encode message.to_slice.size, (payload + 1) - - # second part: user message - payload[5] = user_type.to_u8 - (payload + 6).copy_from message.to_slice - - return payload -end - def print_hexa(message : String, aroundmsg : String) puts "#{aroundmsg} [[" m = IO::Memory.new(message) diff --git a/src/websocketd.cr b/src/websocketd.cr index 3dac0f7..d76e209 100644 --- a/src/websocketd.cr +++ b/src/websocketd.cr @@ -2,7 +2,6 @@ require "option_parser" require "ipc" require "socket" require "./colors" -require "./ws" require "json" @@ -12,39 +11,23 @@ require "base64" require "digest" require "./utils" -class IPC::Connection - def initialize(fd : Int32) - external_connection = LibIPC::Connection.new - external_connection.fd = fd - initialize(external_connection) - end -end - -class IPC::Message - def to_buffer - to_message @type, String.new(@payload) - end -end - - -class WrappedTCPFileDescriptor < TCPSocket - # do not close the connection when garbage collected!! - def finalize - # puts "WrappedTCPFileDescriptor garbage collection!!" - # super - end -end +# All modifications to standard libraries go there. +require "./lib_modifications.cr" +# service instance parameters +# they can be changed via the cli service_name = "websocket" host = "0.0.0.0" port_to_listen = 1234 timer_delay = 30.to_i64 +verbosity = 1 + OptionParser.parse do |parser| parser.on "-l host", "--l host", "IP address to listen on." do |h| host = h end - + parser.on "-p port", "--port port", "Port to listen on." do |port| port_to_listen = port.to_u16 end @@ -57,12 +40,18 @@ OptionParser.parse do |parser| timer_delay = t.to_i64 end + parser.on "-v verbosity-level", "--verbosity level", "Verbosity." do |opt| + verbosity = opt.to_i + end + parser.on "-h", "--help", "Show this help" do puts parser exit 0 end end + +# Link between fd and TCPSocket, WebSocket and IPC::Connection instances. class InstanceStorage property service : IPC::SwitchingService property switchtable : Hash(Int32, Int32) @@ -83,6 +72,7 @@ class InstanceStorage end def remove_fd (fdclient : Int32) + puts "#{CBLUE}closing the client:#{CRESET} #{fdclient}" # 1. closing both the client and the service fdservice = @switchtable[fdclient]? tcpfdc = @fd_to_tcpsocket[fdclient] @@ -199,7 +189,6 @@ end def closing_client (fdclient : Int, context : InstanceStorage) - puts "#{CBLUE}closing the client:#{CRESET} #{fdclient}" context.remove_fd fdclient end @@ -230,23 +219,7 @@ def websocket_connection_procedure (requested_service : String, clientfd : Int32 context.is_client[newservice.fd] = false rescue e puts "#{CRED}Exception during connection to the service:#{CRESET} #{e}" - closing_client clientfd, context - end -end - -class JSONWSMessage - JSON.mapping( - mtype: Int32, - payload: String - ) - - def initialize (@mtype : Int32, @payload : String) - end -end - -class IPC::Message - def to_json - JSONWSMessage.new(@type.to_i, String.new(@payload)).to_json + context.remove_fd clientfd end end @@ -261,28 +234,29 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage) message = wsclient.run_once rescue e puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}" - closing_client activefd, context + context.remove_fd activefd return end + # Checking the internals of WebSocket, then the contained IO within, to know if there is still something to read in the socket. still_something_to_read = ! wsclient.ws.io.empty? if wsclient.closed? # puts "#{CBLUE}client is closed#{CRESET}" - closing_client client.fd, context + context.remove_fd client.fd return end if message.nil? puts "#{CRED}message is nil#{CRESET}" - closing_client client.fd, context + context.remove_fd client.fd return end case message when WebSocket::Error puts "#{CRED}An error occured#{CRESET}" - closing_client client.fd, context + context.remove_fd client.fd return when WebSocket::Ping # puts "#{CBLUE}Received a ping message#{CRESET}" @@ -294,7 +268,7 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage) break when WebSocket::Close # puts "#{CBLUE}Received a close message#{CRESET}" - closing_client client.fd, context + context.remove_fd client.fd return when WebSocket::NotFinal # puts "#{CBLUE}Received only part of a message#{CRESET}" @@ -307,14 +281,15 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage) end if context.is_json[activefd] && message.is_a?(String) - jsonmessage = JSONWSMessage.from_json message - message = to_message jsonmessage.mtype, jsonmessage.payload + message = IPC::Message.from_json(message).to_packet end # client => service fdservice = context.switchtable[activefd] # XXX: this is not a TCP fd, but since behind the scene this is compatible, I'm hacking a bit + # Also, I changed the "finalize" method of the TCPFileDescriptor class not to close the socket + # when the object is GC. serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET) #serv = context.fd_to_ipcconnection[fdservice] serv.send message @@ -333,7 +308,7 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage) if context.is_json[fdclient] buf = message.to_json else - buf = message.to_buffer + buf = message.to_packet end wsclient.send buf @@ -371,21 +346,29 @@ service.base_timer = timer_delay service.loop do |event| case event when IPC::Event::Timer - puts "#{CORANGE}IPC::Event::Timer#{CRESET}" + if verbosity > 0 + puts "#{CORANGE}IPC::Event::Timer#{CRESET}" + end sending_ping_messages context when IPC::Event::Connection - puts "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.connection.fd}" + if verbosity > 0 + puts "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.connection.fd}" + end when IPC::Event::Disconnection - puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}: #{event.connection.fd}" + if verbosity > 0 + puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}: #{event.connection.fd}" + end when IPC::Event::ExtraSocket - puts "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}: #{event.connection.fd}" + if verbosity > 0 + puts "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}: #{event.connection.fd}" + end # 1. accept new websocket clients if server.fd == event.connection.fd client = server.accept begin websocket_client_connection client, context - puts "#{CBLUE}new client:#{CRESET} #{client.fd}" + puts "#{CBLUE}new client:#{CRESET} #{client.fd}" unless verbosity == 0 rescue e puts "Exception: #{CRED}#{e}#{CRESET}" client.close @@ -398,14 +381,18 @@ service.loop do |event| websocket_switching_procedure activefd, context when IPC::Event::Switch - puts "\033[36mIPC::Event::Switch#{CRESET}: from fd #{event.connection.fd}" + if verbosity > 0 + puts "\033[36mIPC::Event::Switch#{CRESET}: from fd #{event.connection.fd}" + end raise "Not implemented." # IPC::Event::Message has to be the last entry # because ExtraSocket and Switch inherit from Message class when IPC::Event::Message - puts "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.connection.fd}" + if verbosity > 0 + puts "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.connection.fd}" + end raise "Not implemented." end diff --git a/tests/performances/ipc_connection.cr b/tests/performances/ipc_connection.cr new file mode 100644 index 0000000..0056a1e --- /dev/null +++ b/tests/performances/ipc_connection.cr @@ -0,0 +1,23 @@ +require "benchmark" +require "ipc" +require "../test-ws" + +ws_uri = "ws://localhost:1234/pong.JSON" + +message = IPC::Message.from_json(%({ "mtype" : 3, "utype" : 30, "payload" : "coucou" })) +pong = IPC::Connection.new "pong" + +Benchmark.ips do |bm| + bm.report("connection") do + c = IPC::Connection.new "pong" + end +end + +#puts "sleeping 5 seconds" +#sleep 5 +# +#Benchmark.ips do |bm| +# bm.report("connection through websocket") do +# tws = TestWS.new ws_uri +# end +#end diff --git a/tests/performances/ipc_message.cr b/tests/performances/ipc_message.cr new file mode 100644 index 0000000..0bced7d --- /dev/null +++ b/tests/performances/ipc_message.cr @@ -0,0 +1,20 @@ +require "benchmark" +require "ipc" + +message = IPC::Message.from_json(%({ "mtype" : 3, "utype" : 30, "payload" : "coucou" })) + +str_message = %({ "mtype" : 3, "utype" : 30, "payload" : "coucou" }) + +Benchmark.ips do |bm| + bm.report("from json") do + m = IPC::Message.from_json str_message + end + + bm.report("to json") do + message.to_json + end + + bm.report("to_packet") do + message.to_packet + end +end diff --git a/tests/performances/ipc_message_exchange.cr b/tests/performances/ipc_message_exchange.cr new file mode 100644 index 0000000..6e1f588 --- /dev/null +++ b/tests/performances/ipc_message_exchange.cr @@ -0,0 +1,27 @@ +require "benchmark" +require "ipc" +require "../test-ws" + +ws_uri = "ws://localhost:1234/pong" +ws_uri_json = "ws://localhost:1234/pong.JSON" +ws_pong = TestWS.new ws_uri +ws_pong_json = TestWS.new ws_uri_json + +pong = IPC::Connection.new "pong" + +Benchmark.ips do |bm| + bm.report("direct: round trip time") do + pong.send 42, "coucou" + m = pong.read + end + + bm.report("web sockets: round trip time") do + ws_pong.send 42, "coucou" + m = ws_pong.read + end + + bm.report("web sockets + json: round trip time") do + ws_pong_json.send 42, "coucou" + m = ws_pong_json.read + end +end diff --git a/tests/performances/puts.cr b/tests/performances/puts.cr new file mode 100644 index 0000000..d3c4669 --- /dev/null +++ b/tests/performances/puts.cr @@ -0,0 +1,18 @@ +require "benchmark" +require "ipc" + +message = IPC::Message.from_json(%({ "mtype" : 3, "utype" : 30, "payload" : "coucou" })) + +Benchmark.ips do |bm| + bm.report("puts") do + puts "coucou" + end + + bm.report("puts vide") do + puts + end + + bm.report("pp! message") do + pp! message + end +end diff --git a/tests/performances/simple-stuff.cr b/tests/performances/simple-stuff.cr new file mode 100644 index 0000000..419fdef --- /dev/null +++ b/tests/performances/simple-stuff.cr @@ -0,0 +1,28 @@ +require "benchmark" + +hash_i = Hash(Int32, Int32).new +hash_str = Hash(String, String).new + +full_hash = Hash(Int32, String).new + +1000.times do |i| + full_hash[i] = "blah #{i}" +end + +Benchmark.ips do |bm| + bm.report("hash[i32] = i32") do + hash_i[1] = 3 + end + + bm.report("hash[String] = String") do + hash_str["coucou"] = "truc" + end + + bm.report("hash[500]") do + blah = full_hash[500] + end + + bm.report("hash[800]") do + blah = full_hash[800] + end +end diff --git a/tests/test-ws.cr b/tests/test-ws.cr index 41240b6..f4ccabb 100644 --- a/tests/test-ws.cr +++ b/tests/test-ws.cr @@ -3,10 +3,28 @@ require "http/web_socket" require "../src/colors" require "../src/utils" -require "../src/ws" +require "ipc" + +require "../src/lib_modifications.cr" require "json" +class TestIPC + property ipcc : IPC::Connection + property is_json : Bool + + def initialize(service_name : String) + @is_json = uri.ends_with? ".JSON" + @ipcc = IPC::Connection.new service_name + end + + # TODO + #def run + # yield @ipcc + # ipcc.close + #end +end + class TestWS property ws : WebSocket property is_json : Bool @@ -20,32 +38,31 @@ class TestWS end def read - m = @ws.read - if m.nil? - raise "empty message" - end - - # remove ping messages, they are not application-relevent - while m.is_a?(HTTP::WebSocket::Ping) - puts "received a ping message, skipping" - m = @ws.read + m = nil + loop do + m = @ws.run_once if m.nil? raise "empty message" end + + # remove ping messages, they are not application-relevent + unless m.is_a?(HTTP::WebSocket::Ping) + break + end + puts "received a ping message, skipping" end m end - def send(type : Int32, data : String | Slice) - m = to_message type, data + def send(type : Int32, data : String) + + m : String | Bytes - # quick hack to send json messages if @is_json - json_message = data.chomp - final_json_message = "{ \"mtype\": #{type}, \"payload\": \"#{json_message}\" }" - # puts "message: #{final_json_message}" - m = final_json_message + m = IPC::Message.new(1.to_u8, type.to_u8, data).to_json + else + m = IPC::Message.new(1.to_u8, type.to_u8, data).to_packet end @ws.send m