Obsolete
/
ipcd
Archived
3
0
Fork 0

websocketd: code simplifications (WIP)

master
Philippe PITTOLI 2020-02-04 02:06:13 +01:00
parent a022d25913
commit 794ee62603
11 changed files with 222 additions and 92 deletions

View File

@ -1,5 +1,5 @@
name: ipcd name: ipcd
version: 0.2.0 version: 0.3.0
authors: authors:
- karchnu <karchnu@karchnu.fr> - karchnu <karchnu@karchnu.fr>
@ -40,4 +40,18 @@ targets:
test-closing: test-closing:
main: tests/closing.cr main: tests/closing.cr
# performance tests
test-perf-messages:
main: tests/performances/ipc_message.cr
test-perf-connection:
main: tests/performances/ipc_connection.cr
test-perf-message-exchange:
main: tests/performances/ipc_message_exchange.cr
test-perf-puts:
main: tests/performances/puts.cr
license: ISC license: ISC

View File

@ -1,3 +1,4 @@
# Libraries modifications
require "http" require "http"
require "http/web_socket/protocol" require "http/web_socket/protocol"
@ -95,6 +96,7 @@ class HTTP::WebSocket
return NotFinal.new return NotFinal.new
end end
# Returns the websocket instance.
def ws def ws
@ws @ws
end end
@ -111,7 +113,15 @@ class WebSocket < HTTP::WebSocket
getter? closed = false getter? closed = false
def finalize def finalize
# puts "WrappedTCPFileDescriptor garbage collection!!" # puts "WrappedTCPFileDescriptor garbage collection!!"
# super # super
end
end
class WrappedTCPFileDescriptor < TCPSocket
# do not close the connection when garbage collected!!
def finalize
# puts "WrappedTCPFileDescriptor garbage collection!!"
# super
end end
end end

View File

@ -42,7 +42,7 @@ service.loop do |event|
puts "#{CGREEN}IPC::Event::Message#{CRESET}, client: #{event.connection.fd}" puts "#{CGREEN}IPC::Event::Message#{CRESET}, client: #{event.connection.fd}"
if verbosity >= 2 if verbosity >= 2
m = String.new event.message.payload m = String.new event.message.payload
puts "#{CBLUE}message type #{event.message.type}: #{m} #{CRESET}" puts "#{CBLUE}message type #{event.message.utype}: #{m} #{CRESET}"
end end
end end
event.connection.send event.message event.connection.send event.message

View File

@ -1,19 +1,5 @@
require "io/hexdump" require "io/hexdump"
def to_message (user_type : Int, message : String)
payload = Bytes.new (6 + message.to_slice.size)
# true start
payload[0] = 1.to_u8
IO::ByteFormat::NetworkEndian.encode message.to_slice.size, (payload + 1)
# second part: user message
payload[5] = user_type.to_u8
(payload + 6).copy_from message.to_slice
return payload
end
def print_hexa(message : String, aroundmsg : String) def print_hexa(message : String, aroundmsg : String)
puts "#{aroundmsg} [[" puts "#{aroundmsg} [["
m = IO::Memory.new(message) m = IO::Memory.new(message)

View File

@ -2,7 +2,6 @@ require "option_parser"
require "ipc" require "ipc"
require "socket" require "socket"
require "./colors" require "./colors"
require "./ws"
require "json" require "json"
@ -12,34 +11,18 @@ require "base64"
require "digest" require "digest"
require "./utils" require "./utils"
class IPC::Connection # All modifications to standard libraries go there.
def initialize(fd : Int32) require "./lib_modifications.cr"
external_connection = LibIPC::Connection.new
external_connection.fd = fd
initialize(external_connection)
end
end
class IPC::Message
def to_buffer
to_message @type, String.new(@payload)
end
end
class WrappedTCPFileDescriptor < TCPSocket
# do not close the connection when garbage collected!!
def finalize
# puts "WrappedTCPFileDescriptor garbage collection!!"
# super
end
end
# service instance parameters
# they can be changed via the cli
service_name = "websocket" service_name = "websocket"
host = "0.0.0.0" host = "0.0.0.0"
port_to_listen = 1234 port_to_listen = 1234
timer_delay = 30.to_i64 timer_delay = 30.to_i64
verbosity = 1
OptionParser.parse do |parser| OptionParser.parse do |parser|
parser.on "-l host", "--l host", "IP address to listen on." do |h| parser.on "-l host", "--l host", "IP address to listen on." do |h|
host = h host = h
@ -57,12 +40,18 @@ OptionParser.parse do |parser|
timer_delay = t.to_i64 timer_delay = t.to_i64
end end
parser.on "-v verbosity-level", "--verbosity level", "Verbosity." do |opt|
verbosity = opt.to_i
end
parser.on "-h", "--help", "Show this help" do parser.on "-h", "--help", "Show this help" do
puts parser puts parser
exit 0 exit 0
end end
end end
# Link between fd and TCPSocket, WebSocket and IPC::Connection instances.
class InstanceStorage class InstanceStorage
property service : IPC::SwitchingService property service : IPC::SwitchingService
property switchtable : Hash(Int32, Int32) property switchtable : Hash(Int32, Int32)
@ -83,6 +72,7 @@ class InstanceStorage
end end
def remove_fd (fdclient : Int32) def remove_fd (fdclient : Int32)
puts "#{CBLUE}closing the client:#{CRESET} #{fdclient}"
# 1. closing both the client and the service # 1. closing both the client and the service
fdservice = @switchtable[fdclient]? fdservice = @switchtable[fdclient]?
tcpfdc = @fd_to_tcpsocket[fdclient] tcpfdc = @fd_to_tcpsocket[fdclient]
@ -199,7 +189,6 @@ end
def closing_client (fdclient : Int, context : InstanceStorage) def closing_client (fdclient : Int, context : InstanceStorage)
puts "#{CBLUE}closing the client:#{CRESET} #{fdclient}"
context.remove_fd fdclient context.remove_fd fdclient
end end
@ -230,23 +219,7 @@ def websocket_connection_procedure (requested_service : String, clientfd : Int32
context.is_client[newservice.fd] = false context.is_client[newservice.fd] = false
rescue e rescue e
puts "#{CRED}Exception during connection to the service:#{CRESET} #{e}" puts "#{CRED}Exception during connection to the service:#{CRESET} #{e}"
closing_client clientfd, context context.remove_fd clientfd
end
end
class JSONWSMessage
JSON.mapping(
mtype: Int32,
payload: String
)
def initialize (@mtype : Int32, @payload : String)
end
end
class IPC::Message
def to_json
JSONWSMessage.new(@type.to_i, String.new(@payload)).to_json
end end
end end
@ -261,28 +234,29 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
message = wsclient.run_once message = wsclient.run_once
rescue e rescue e
puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}" puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}"
closing_client activefd, context context.remove_fd activefd
return return
end end
# Checking the internals of WebSocket, then the contained IO within, to know if there is still something to read in the socket.
still_something_to_read = ! wsclient.ws.io.empty? still_something_to_read = ! wsclient.ws.io.empty?
if wsclient.closed? if wsclient.closed?
# puts "#{CBLUE}client is closed#{CRESET}" # puts "#{CBLUE}client is closed#{CRESET}"
closing_client client.fd, context context.remove_fd client.fd
return return
end end
if message.nil? if message.nil?
puts "#{CRED}message is nil#{CRESET}" puts "#{CRED}message is nil#{CRESET}"
closing_client client.fd, context context.remove_fd client.fd
return return
end end
case message case message
when WebSocket::Error when WebSocket::Error
puts "#{CRED}An error occured#{CRESET}" puts "#{CRED}An error occured#{CRESET}"
closing_client client.fd, context context.remove_fd client.fd
return return
when WebSocket::Ping when WebSocket::Ping
# puts "#{CBLUE}Received a ping message#{CRESET}" # puts "#{CBLUE}Received a ping message#{CRESET}"
@ -294,7 +268,7 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
break break
when WebSocket::Close when WebSocket::Close
# puts "#{CBLUE}Received a close message#{CRESET}" # puts "#{CBLUE}Received a close message#{CRESET}"
closing_client client.fd, context context.remove_fd client.fd
return return
when WebSocket::NotFinal when WebSocket::NotFinal
# puts "#{CBLUE}Received only part of a message#{CRESET}" # puts "#{CBLUE}Received only part of a message#{CRESET}"
@ -307,14 +281,15 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
end end
if context.is_json[activefd] && message.is_a?(String) if context.is_json[activefd] && message.is_a?(String)
jsonmessage = JSONWSMessage.from_json message message = IPC::Message.from_json(message).to_packet
message = to_message jsonmessage.mtype, jsonmessage.payload
end end
# client => service # client => service
fdservice = context.switchtable[activefd] fdservice = context.switchtable[activefd]
# XXX: this is not a TCP fd, but since behind the scene this is compatible, I'm hacking a bit # XXX: this is not a TCP fd, but since behind the scene this is compatible, I'm hacking a bit
# Also, I changed the "finalize" method of the TCPFileDescriptor class not to close the socket
# when the object is GC.
serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET) serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET)
#serv = context.fd_to_ipcconnection[fdservice] #serv = context.fd_to_ipcconnection[fdservice]
serv.send message serv.send message
@ -333,7 +308,7 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
if context.is_json[fdclient] if context.is_json[fdclient]
buf = message.to_json buf = message.to_json
else else
buf = message.to_buffer buf = message.to_packet
end end
wsclient.send buf wsclient.send buf
@ -371,21 +346,29 @@ service.base_timer = timer_delay
service.loop do |event| service.loop do |event|
case event case event
when IPC::Event::Timer when IPC::Event::Timer
puts "#{CORANGE}IPC::Event::Timer#{CRESET}" if verbosity > 0
puts "#{CORANGE}IPC::Event::Timer#{CRESET}"
end
sending_ping_messages context sending_ping_messages context
when IPC::Event::Connection when IPC::Event::Connection
puts "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.connection.fd}" if verbosity > 0
puts "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.connection.fd}"
end
when IPC::Event::Disconnection when IPC::Event::Disconnection
puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}: #{event.connection.fd}" if verbosity > 0
puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}: #{event.connection.fd}"
end
when IPC::Event::ExtraSocket when IPC::Event::ExtraSocket
puts "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}: #{event.connection.fd}" if verbosity > 0
puts "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}: #{event.connection.fd}"
end
# 1. accept new websocket clients # 1. accept new websocket clients
if server.fd == event.connection.fd if server.fd == event.connection.fd
client = server.accept client = server.accept
begin begin
websocket_client_connection client, context websocket_client_connection client, context
puts "#{CBLUE}new client:#{CRESET} #{client.fd}" puts "#{CBLUE}new client:#{CRESET} #{client.fd}" unless verbosity == 0
rescue e rescue e
puts "Exception: #{CRED}#{e}#{CRESET}" puts "Exception: #{CRED}#{e}#{CRESET}"
client.close client.close
@ -398,14 +381,18 @@ service.loop do |event|
websocket_switching_procedure activefd, context websocket_switching_procedure activefd, context
when IPC::Event::Switch when IPC::Event::Switch
puts "\033[36mIPC::Event::Switch#{CRESET}: from fd #{event.connection.fd}" if verbosity > 0
puts "\033[36mIPC::Event::Switch#{CRESET}: from fd #{event.connection.fd}"
end
raise "Not implemented." raise "Not implemented."
# IPC::Event::Message has to be the last entry # IPC::Event::Message has to be the last entry
# because ExtraSocket and Switch inherit from Message class # because ExtraSocket and Switch inherit from Message class
when IPC::Event::Message when IPC::Event::Message
puts "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.connection.fd}" if verbosity > 0
puts "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.connection.fd}"
end
raise "Not implemented." raise "Not implemented."
end end

View File

@ -0,0 +1,23 @@
require "benchmark"
require "ipc"
require "../test-ws"
ws_uri = "ws://localhost:1234/pong.JSON"
message = IPC::Message.from_json(%({ "mtype" : 3, "utype" : 30, "payload" : "coucou" }))
pong = IPC::Connection.new "pong"
Benchmark.ips do |bm|
bm.report("connection") do
c = IPC::Connection.new "pong"
end
end
#puts "sleeping 5 seconds"
#sleep 5
#
#Benchmark.ips do |bm|
# bm.report("connection through websocket") do
# tws = TestWS.new ws_uri
# end
#end

View File

@ -0,0 +1,20 @@
require "benchmark"
require "ipc"
message = IPC::Message.from_json(%({ "mtype" : 3, "utype" : 30, "payload" : "coucou" }))
str_message = %({ "mtype" : 3, "utype" : 30, "payload" : "coucou" })
Benchmark.ips do |bm|
bm.report("from json") do
m = IPC::Message.from_json str_message
end
bm.report("to json") do
message.to_json
end
bm.report("to_packet") do
message.to_packet
end
end

View File

@ -0,0 +1,27 @@
require "benchmark"
require "ipc"
require "../test-ws"
ws_uri = "ws://localhost:1234/pong"
ws_uri_json = "ws://localhost:1234/pong.JSON"
ws_pong = TestWS.new ws_uri
ws_pong_json = TestWS.new ws_uri_json
pong = IPC::Connection.new "pong"
Benchmark.ips do |bm|
bm.report("direct: round trip time") do
pong.send 42, "coucou"
m = pong.read
end
bm.report("web sockets: round trip time") do
ws_pong.send 42, "coucou"
m = ws_pong.read
end
bm.report("web sockets + json: round trip time") do
ws_pong_json.send 42, "coucou"
m = ws_pong_json.read
end
end

View File

@ -0,0 +1,18 @@
require "benchmark"
require "ipc"
message = IPC::Message.from_json(%({ "mtype" : 3, "utype" : 30, "payload" : "coucou" }))
Benchmark.ips do |bm|
bm.report("puts") do
puts "coucou"
end
bm.report("puts vide") do
puts
end
bm.report("pp! message") do
pp! message
end
end

View File

@ -0,0 +1,28 @@
require "benchmark"
hash_i = Hash(Int32, Int32).new
hash_str = Hash(String, String).new
full_hash = Hash(Int32, String).new
1000.times do |i|
full_hash[i] = "blah #{i}"
end
Benchmark.ips do |bm|
bm.report("hash[i32] = i32") do
hash_i[1] = 3
end
bm.report("hash[String] = String") do
hash_str["coucou"] = "truc"
end
bm.report("hash[500]") do
blah = full_hash[500]
end
bm.report("hash[800]") do
blah = full_hash[800]
end
end

View File

@ -3,10 +3,28 @@ require "http/web_socket"
require "../src/colors" require "../src/colors"
require "../src/utils" require "../src/utils"
require "../src/ws" require "ipc"
require "../src/lib_modifications.cr"
require "json" require "json"
class TestIPC
property ipcc : IPC::Connection
property is_json : Bool
def initialize(service_name : String)
@is_json = uri.ends_with? ".JSON"
@ipcc = IPC::Connection.new service_name
end
# TODO
#def run
# yield @ipcc
# ipcc.close
#end
end
class TestWS class TestWS
property ws : WebSocket property ws : WebSocket
property is_json : Bool property is_json : Bool
@ -20,32 +38,31 @@ class TestWS
end end
def read def read
m = @ws.read m = nil
if m.nil? loop do
raise "empty message" m = @ws.run_once
end
# remove ping messages, they are not application-relevent
while m.is_a?(HTTP::WebSocket::Ping)
puts "received a ping message, skipping"
m = @ws.read
if m.nil? if m.nil?
raise "empty message" raise "empty message"
end end
# remove ping messages, they are not application-relevent
unless m.is_a?(HTTP::WebSocket::Ping)
break
end
puts "received a ping message, skipping"
end end
m m
end end
def send(type : Int32, data : String | Slice) def send(type : Int32, data : String)
m = to_message type, data
m : String | Bytes
# quick hack to send json messages
if @is_json if @is_json
json_message = data.chomp m = IPC::Message.new(1.to_u8, type.to_u8, data).to_json
final_json_message = "{ \"mtype\": #{type}, \"payload\": \"#{json_message}\" }" else
# puts "message: #{final_json_message}" m = IPC::Message.new(1.to_u8, type.to_u8, data).to_packet
m = final_json_message
end end
@ws.send m @ws.send m