From 794ee6260343391b72e24e726c275c86a1a83635 Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Tue, 4 Feb 2020 02:06:13 +0100
Subject: [PATCH] websocketd: code simplifications (WIP)
---
shard.yml | 16 +++-
src/{ws.cr => lib_modifications.cr} | 14 ++-
src/pongd.cr | 2 +-
src/utils.cr | 14 ---
src/websocketd.cr | 101 +++++++++------------
tests/performances/ipc_connection.cr | 23 +++++
tests/performances/ipc_message.cr | 20 ++++
tests/performances/ipc_message_exchange.cr | 27 ++++++
tests/performances/puts.cr | 18 ++++
tests/performances/simple-stuff.cr | 28 ++++++
tests/test-ws.cr | 51 +++++++----
11 files changed, 222 insertions(+), 92 deletions(-)
rename src/{ws.cr => lib_modifications.cr} (89%)
create mode 100644 tests/performances/ipc_connection.cr
create mode 100644 tests/performances/ipc_message.cr
create mode 100644 tests/performances/ipc_message_exchange.cr
create mode 100644 tests/performances/puts.cr
create mode 100644 tests/performances/simple-stuff.cr
diff --git a/shard.yml b/shard.yml
index b5a26ef..4639ec1 100644
--- a/shard.yml
+++ b/shard.yml
@@ -1,5 +1,5 @@
name: ipcd
-version: 0.2.0
+version: 0.3.0
authors:
- karchnu
@@ -40,4 +40,18 @@ targets:
test-closing:
main: tests/closing.cr
+ # performance tests
+ test-perf-messages:
+ main: tests/performances/ipc_message.cr
+
+ test-perf-connection:
+ main: tests/performances/ipc_connection.cr
+
+ test-perf-message-exchange:
+ main: tests/performances/ipc_message_exchange.cr
+
+ test-perf-puts:
+ main: tests/performances/puts.cr
+
+
license: ISC
diff --git a/src/ws.cr b/src/lib_modifications.cr
similarity index 89%
rename from src/ws.cr
rename to src/lib_modifications.cr
index d75b211..cf3c8f6 100644
--- a/src/ws.cr
+++ b/src/lib_modifications.cr
@@ -1,3 +1,4 @@
+# Libraries modifications
require "http"
require "http/web_socket/protocol"
@@ -95,6 +96,7 @@ class HTTP::WebSocket
return NotFinal.new
end
+ # Returns the websocket instance.
def ws
@ws
end
@@ -111,7 +113,15 @@ class WebSocket < HTTP::WebSocket
getter? closed = false
def finalize
- # puts "WrappedTCPFileDescriptor garbage collection!!"
- # super
+ # puts "WrappedTCPFileDescriptor garbage collection!!"
+ # super
+ end
+end
+
+class WrappedTCPFileDescriptor < TCPSocket
+ # do not close the connection when garbage collected!!
+ def finalize
+ # puts "WrappedTCPFileDescriptor garbage collection!!"
+ # super
end
end
diff --git a/src/pongd.cr b/src/pongd.cr
index 74a8cdc..e60b1a7 100644
--- a/src/pongd.cr
+++ b/src/pongd.cr
@@ -42,7 +42,7 @@ service.loop do |event|
puts "#{CGREEN}IPC::Event::Message#{CRESET}, client: #{event.connection.fd}"
if verbosity >= 2
m = String.new event.message.payload
- puts "#{CBLUE}message type #{event.message.type}: #{m} #{CRESET}"
+ puts "#{CBLUE}message type #{event.message.utype}: #{m} #{CRESET}"
end
end
event.connection.send event.message
diff --git a/src/utils.cr b/src/utils.cr
index 43980bb..1c0153d 100644
--- a/src/utils.cr
+++ b/src/utils.cr
@@ -1,19 +1,5 @@
require "io/hexdump"
-def to_message (user_type : Int, message : String)
- payload = Bytes.new (6 + message.to_slice.size)
-
- # true start
- payload[0] = 1.to_u8
- IO::ByteFormat::NetworkEndian.encode message.to_slice.size, (payload + 1)
-
- # second part: user message
- payload[5] = user_type.to_u8
- (payload + 6).copy_from message.to_slice
-
- return payload
-end
-
def print_hexa(message : String, aroundmsg : String)
puts "#{aroundmsg} [["
m = IO::Memory.new(message)
diff --git a/src/websocketd.cr b/src/websocketd.cr
index 3dac0f7..d76e209 100644
--- a/src/websocketd.cr
+++ b/src/websocketd.cr
@@ -2,7 +2,6 @@ require "option_parser"
require "ipc"
require "socket"
require "./colors"
-require "./ws"
require "json"
@@ -12,39 +11,23 @@ require "base64"
require "digest"
require "./utils"
-class IPC::Connection
- def initialize(fd : Int32)
- external_connection = LibIPC::Connection.new
- external_connection.fd = fd
- initialize(external_connection)
- end
-end
-
-class IPC::Message
- def to_buffer
- to_message @type, String.new(@payload)
- end
-end
-
-
-class WrappedTCPFileDescriptor < TCPSocket
- # do not close the connection when garbage collected!!
- def finalize
- # puts "WrappedTCPFileDescriptor garbage collection!!"
- # super
- end
-end
+# All modifications to standard libraries go there.
+require "./lib_modifications.cr"
+# service instance parameters
+# they can be changed via the cli
service_name = "websocket"
host = "0.0.0.0"
port_to_listen = 1234
timer_delay = 30.to_i64
+verbosity = 1
+
OptionParser.parse do |parser|
parser.on "-l host", "--l host", "IP address to listen on." do |h|
host = h
end
-
+
parser.on "-p port", "--port port", "Port to listen on." do |port|
port_to_listen = port.to_u16
end
@@ -57,12 +40,18 @@ OptionParser.parse do |parser|
timer_delay = t.to_i64
end
+ parser.on "-v verbosity-level", "--verbosity level", "Verbosity." do |opt|
+ verbosity = opt.to_i
+ end
+
parser.on "-h", "--help", "Show this help" do
puts parser
exit 0
end
end
+
+# Link between fd and TCPSocket, WebSocket and IPC::Connection instances.
class InstanceStorage
property service : IPC::SwitchingService
property switchtable : Hash(Int32, Int32)
@@ -83,6 +72,7 @@ class InstanceStorage
end
def remove_fd (fdclient : Int32)
+ puts "#{CBLUE}closing the client:#{CRESET} #{fdclient}"
# 1. closing both the client and the service
fdservice = @switchtable[fdclient]?
tcpfdc = @fd_to_tcpsocket[fdclient]
@@ -199,7 +189,6 @@ end
def closing_client (fdclient : Int, context : InstanceStorage)
- puts "#{CBLUE}closing the client:#{CRESET} #{fdclient}"
context.remove_fd fdclient
end
@@ -230,23 +219,7 @@ def websocket_connection_procedure (requested_service : String, clientfd : Int32
context.is_client[newservice.fd] = false
rescue e
puts "#{CRED}Exception during connection to the service:#{CRESET} #{e}"
- closing_client clientfd, context
- end
-end
-
-class JSONWSMessage
- JSON.mapping(
- mtype: Int32,
- payload: String
- )
-
- def initialize (@mtype : Int32, @payload : String)
- end
-end
-
-class IPC::Message
- def to_json
- JSONWSMessage.new(@type.to_i, String.new(@payload)).to_json
+ context.remove_fd clientfd
end
end
@@ -261,28 +234,29 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
message = wsclient.run_once
rescue e
puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}"
- closing_client activefd, context
+ context.remove_fd activefd
return
end
+ # Checking the internals of WebSocket, then the contained IO within, to know if there is still something to read in the socket.
still_something_to_read = ! wsclient.ws.io.empty?
if wsclient.closed?
# puts "#{CBLUE}client is closed#{CRESET}"
- closing_client client.fd, context
+ context.remove_fd client.fd
return
end
if message.nil?
puts "#{CRED}message is nil#{CRESET}"
- closing_client client.fd, context
+ context.remove_fd client.fd
return
end
case message
when WebSocket::Error
puts "#{CRED}An error occured#{CRESET}"
- closing_client client.fd, context
+ context.remove_fd client.fd
return
when WebSocket::Ping
# puts "#{CBLUE}Received a ping message#{CRESET}"
@@ -294,7 +268,7 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
break
when WebSocket::Close
# puts "#{CBLUE}Received a close message#{CRESET}"
- closing_client client.fd, context
+ context.remove_fd client.fd
return
when WebSocket::NotFinal
# puts "#{CBLUE}Received only part of a message#{CRESET}"
@@ -307,14 +281,15 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
end
if context.is_json[activefd] && message.is_a?(String)
- jsonmessage = JSONWSMessage.from_json message
- message = to_message jsonmessage.mtype, jsonmessage.payload
+ message = IPC::Message.from_json(message).to_packet
end
# client => service
fdservice = context.switchtable[activefd]
# XXX: this is not a TCP fd, but since behind the scene this is compatible, I'm hacking a bit
+ # Also, I changed the "finalize" method of the TCPFileDescriptor class not to close the socket
+ # when the object is GC.
serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET)
#serv = context.fd_to_ipcconnection[fdservice]
serv.send message
@@ -333,7 +308,7 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
if context.is_json[fdclient]
buf = message.to_json
else
- buf = message.to_buffer
+ buf = message.to_packet
end
wsclient.send buf
@@ -371,21 +346,29 @@ service.base_timer = timer_delay
service.loop do |event|
case event
when IPC::Event::Timer
- puts "#{CORANGE}IPC::Event::Timer#{CRESET}"
+ if verbosity > 0
+ puts "#{CORANGE}IPC::Event::Timer#{CRESET}"
+ end
sending_ping_messages context
when IPC::Event::Connection
- puts "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.connection.fd}"
+ if verbosity > 0
+ puts "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.connection.fd}"
+ end
when IPC::Event::Disconnection
- puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}: #{event.connection.fd}"
+ if verbosity > 0
+ puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}: #{event.connection.fd}"
+ end
when IPC::Event::ExtraSocket
- puts "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}: #{event.connection.fd}"
+ if verbosity > 0
+ puts "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}: #{event.connection.fd}"
+ end
# 1. accept new websocket clients
if server.fd == event.connection.fd
client = server.accept
begin
websocket_client_connection client, context
- puts "#{CBLUE}new client:#{CRESET} #{client.fd}"
+ puts "#{CBLUE}new client:#{CRESET} #{client.fd}" unless verbosity == 0
rescue e
puts "Exception: #{CRED}#{e}#{CRESET}"
client.close
@@ -398,14 +381,18 @@ service.loop do |event|
websocket_switching_procedure activefd, context
when IPC::Event::Switch
- puts "\033[36mIPC::Event::Switch#{CRESET}: from fd #{event.connection.fd}"
+ if verbosity > 0
+ puts "\033[36mIPC::Event::Switch#{CRESET}: from fd #{event.connection.fd}"
+ end
raise "Not implemented."
# IPC::Event::Message has to be the last entry
# because ExtraSocket and Switch inherit from Message class
when IPC::Event::Message
- puts "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.connection.fd}"
+ if verbosity > 0
+ puts "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.connection.fd}"
+ end
raise "Not implemented."
end
diff --git a/tests/performances/ipc_connection.cr b/tests/performances/ipc_connection.cr
new file mode 100644
index 0000000..0056a1e
--- /dev/null
+++ b/tests/performances/ipc_connection.cr
@@ -0,0 +1,23 @@
+require "benchmark"
+require "ipc"
+require "../test-ws"
+
+ws_uri = "ws://localhost:1234/pong.JSON"
+
+message = IPC::Message.from_json(%({ "mtype" : 3, "utype" : 30, "payload" : "coucou" }))
+pong = IPC::Connection.new "pong"
+
+Benchmark.ips do |bm|
+ bm.report("connection") do
+ c = IPC::Connection.new "pong"
+ end
+end
+
+#puts "sleeping 5 seconds"
+#sleep 5
+#
+#Benchmark.ips do |bm|
+# bm.report("connection through websocket") do
+# tws = TestWS.new ws_uri
+# end
+#end
diff --git a/tests/performances/ipc_message.cr b/tests/performances/ipc_message.cr
new file mode 100644
index 0000000..0bced7d
--- /dev/null
+++ b/tests/performances/ipc_message.cr
@@ -0,0 +1,20 @@
+require "benchmark"
+require "ipc"
+
+message = IPC::Message.from_json(%({ "mtype" : 3, "utype" : 30, "payload" : "coucou" }))
+
+str_message = %({ "mtype" : 3, "utype" : 30, "payload" : "coucou" })
+
+Benchmark.ips do |bm|
+ bm.report("from json") do
+ m = IPC::Message.from_json str_message
+ end
+
+ bm.report("to json") do
+ message.to_json
+ end
+
+ bm.report("to_packet") do
+ message.to_packet
+ end
+end
diff --git a/tests/performances/ipc_message_exchange.cr b/tests/performances/ipc_message_exchange.cr
new file mode 100644
index 0000000..6e1f588
--- /dev/null
+++ b/tests/performances/ipc_message_exchange.cr
@@ -0,0 +1,27 @@
+require "benchmark"
+require "ipc"
+require "../test-ws"
+
+ws_uri = "ws://localhost:1234/pong"
+ws_uri_json = "ws://localhost:1234/pong.JSON"
+ws_pong = TestWS.new ws_uri
+ws_pong_json = TestWS.new ws_uri_json
+
+pong = IPC::Connection.new "pong"
+
+Benchmark.ips do |bm|
+ bm.report("direct: round trip time") do
+ pong.send 42, "coucou"
+ m = pong.read
+ end
+
+ bm.report("web sockets: round trip time") do
+ ws_pong.send 42, "coucou"
+ m = ws_pong.read
+ end
+
+ bm.report("web sockets + json: round trip time") do
+ ws_pong_json.send 42, "coucou"
+ m = ws_pong_json.read
+ end
+end
diff --git a/tests/performances/puts.cr b/tests/performances/puts.cr
new file mode 100644
index 0000000..d3c4669
--- /dev/null
+++ b/tests/performances/puts.cr
@@ -0,0 +1,18 @@
+require "benchmark"
+require "ipc"
+
+message = IPC::Message.from_json(%({ "mtype" : 3, "utype" : 30, "payload" : "coucou" }))
+
+Benchmark.ips do |bm|
+ bm.report("puts") do
+ puts "coucou"
+ end
+
+ bm.report("puts vide") do
+ puts
+ end
+
+ bm.report("pp! message") do
+ pp! message
+ end
+end
diff --git a/tests/performances/simple-stuff.cr b/tests/performances/simple-stuff.cr
new file mode 100644
index 0000000..419fdef
--- /dev/null
+++ b/tests/performances/simple-stuff.cr
@@ -0,0 +1,28 @@
+require "benchmark"
+
+hash_i = Hash(Int32, Int32).new
+hash_str = Hash(String, String).new
+
+full_hash = Hash(Int32, String).new
+
+1000.times do |i|
+ full_hash[i] = "blah #{i}"
+end
+
+Benchmark.ips do |bm|
+ bm.report("hash[i32] = i32") do
+ hash_i[1] = 3
+ end
+
+ bm.report("hash[String] = String") do
+ hash_str["coucou"] = "truc"
+ end
+
+ bm.report("hash[500]") do
+ blah = full_hash[500]
+ end
+
+ bm.report("hash[800]") do
+ blah = full_hash[800]
+ end
+end
diff --git a/tests/test-ws.cr b/tests/test-ws.cr
index 41240b6..f4ccabb 100644
--- a/tests/test-ws.cr
+++ b/tests/test-ws.cr
@@ -3,10 +3,28 @@ require "http/web_socket"
require "../src/colors"
require "../src/utils"
-require "../src/ws"
+require "ipc"
+
+require "../src/lib_modifications.cr"
require "json"
+class TestIPC
+ property ipcc : IPC::Connection
+ property is_json : Bool
+
+ def initialize(service_name : String)
+ @is_json = uri.ends_with? ".JSON"
+ @ipcc = IPC::Connection.new service_name
+ end
+
+ # TODO
+ #def run
+ # yield @ipcc
+ # ipcc.close
+ #end
+end
+
class TestWS
property ws : WebSocket
property is_json : Bool
@@ -20,32 +38,31 @@ class TestWS
end
def read
- m = @ws.read
- if m.nil?
- raise "empty message"
- end
-
- # remove ping messages, they are not application-relevent
- while m.is_a?(HTTP::WebSocket::Ping)
- puts "received a ping message, skipping"
- m = @ws.read
+ m = nil
+ loop do
+ m = @ws.run_once
if m.nil?
raise "empty message"
end
+
+ # remove ping messages, they are not application-relevent
+ unless m.is_a?(HTTP::WebSocket::Ping)
+ break
+ end
+ puts "received a ping message, skipping"
end
m
end
- def send(type : Int32, data : String | Slice)
- m = to_message type, data
+ def send(type : Int32, data : String)
+
+ m : String | Bytes
- # quick hack to send json messages
if @is_json
- json_message = data.chomp
- final_json_message = "{ \"mtype\": #{type}, \"payload\": \"#{json_message}\" }"
- # puts "message: #{final_json_message}"
- m = final_json_message
+ m = IPC::Message.new(1.to_u8, type.to_u8, data).to_json
+ else
+ m = IPC::Message.new(1.to_u8, type.to_u8, data).to_packet
end
@ws.send m