websocketd: bin msg, conf. timer delay, loop on buffered msg
parent
02a6ac97c1
commit
6a871e8267
|
@ -43,15 +43,10 @@ def send_with_announce(ws : WebSocket, m : String)
|
||||||
send ws, m
|
send ws, m
|
||||||
end
|
end
|
||||||
|
|
||||||
def send(ws : WebSocket, m : String)
|
def send(ws : WebSocket, m : String | Slice)
|
||||||
ws.send m
|
ws.send m
|
||||||
end
|
end
|
||||||
|
|
||||||
def send(ws : WebSocket, m : Slice)
|
|
||||||
ws.send m
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
begin
|
begin
|
||||||
ws = WebSocket.new(URI.parse(uri))
|
ws = WebSocket.new(URI.parse(uri))
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,6 @@ require "./ws"
|
||||||
require "json"
|
require "json"
|
||||||
|
|
||||||
require "socket"
|
require "socket"
|
||||||
require "http"
|
|
||||||
require "http/server"
|
require "http/server"
|
||||||
require "base64"
|
require "base64"
|
||||||
require "digest"
|
require "digest"
|
||||||
|
@ -39,6 +38,7 @@ end
|
||||||
service_name = "websocket"
|
service_name = "websocket"
|
||||||
host = "0.0.0.0"
|
host = "0.0.0.0"
|
||||||
port_to_listen = 1234
|
port_to_listen = 1234
|
||||||
|
timer_delay = 30.to_i64
|
||||||
|
|
||||||
OptionParser.parse do |parser|
|
OptionParser.parse do |parser|
|
||||||
parser.on "-l host", "--l host", "IP address to listen on." do |h|
|
parser.on "-l host", "--l host", "IP address to listen on." do |h|
|
||||||
|
@ -53,6 +53,10 @@ OptionParser.parse do |parser|
|
||||||
service_name = name
|
service_name = name
|
||||||
end
|
end
|
||||||
|
|
||||||
|
parser.on "-t timer-delay", "--timer-delay timer-delay", "Timer delay (in seconds)" do |t|
|
||||||
|
timer_delay = t.to_i64
|
||||||
|
end
|
||||||
|
|
||||||
parser.on "-h", "--help", "Show this help" do
|
parser.on "-h", "--help", "Show this help" do
|
||||||
puts parser
|
puts parser
|
||||||
exit 0
|
exit 0
|
||||||
|
@ -159,7 +163,6 @@ def websocket_client_connection(client, context : InstanceStorage)
|
||||||
req_service = request.path.lchop
|
req_service = request.path.lchop
|
||||||
if req_service.empty?
|
if req_service.empty?
|
||||||
client.close
|
client.close
|
||||||
puts "should send a PATH"
|
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -181,7 +184,7 @@ def websocket_client_connection(client, context : InstanceStorage)
|
||||||
|
|
||||||
wsclient = WebSocket.new client
|
wsclient = WebSocket.new client
|
||||||
wsclient.on_pong do |m|
|
wsclient.on_pong do |m|
|
||||||
puts "Received a pong message: #{m}"
|
puts "pong #{m}"
|
||||||
end
|
end
|
||||||
context.is_client[client.fd] = true
|
context.is_client[client.fd] = true
|
||||||
|
|
||||||
|
@ -248,22 +251,22 @@ class IPC::Message
|
||||||
end
|
end
|
||||||
|
|
||||||
def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
|
def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
|
||||||
# FIXME: debugging purposes
|
|
||||||
begin
|
begin
|
||||||
if context.is_client[activefd]
|
if context.is_client[activefd]
|
||||||
# The client is a WebSocket on top of a TCP connection
|
# The client is a WebSocket on top of a TCP connection
|
||||||
client = context.fd_to_tcpsocket[activefd]
|
client = context.fd_to_tcpsocket[activefd]
|
||||||
wsclient = context.fd_to_websocket[activefd]
|
wsclient = context.fd_to_websocket[activefd]
|
||||||
|
loop do
|
||||||
begin
|
begin
|
||||||
message = wsclient.read
|
message = wsclient.read
|
||||||
|
|
||||||
# puts "RECEIVING A MESSAGE from #{activefd}: #{message}"
|
|
||||||
rescue e
|
rescue e
|
||||||
puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}"
|
puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}"
|
||||||
closing_client activefd, context
|
closing_client activefd, context
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
|
still_something_to_read = wsclient.ws.io.still_something_to_read?
|
||||||
|
|
||||||
if wsclient.closed?
|
if wsclient.closed?
|
||||||
# puts "#{CBLUE}client is closed#{CRESET}"
|
# puts "#{CBLUE}client is closed#{CRESET}"
|
||||||
closing_client client.fd, context
|
closing_client client.fd, context
|
||||||
|
@ -277,37 +280,32 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
|
||||||
end
|
end
|
||||||
|
|
||||||
if message.is_a?(WebSocket::Ping)
|
if message.is_a?(WebSocket::Ping)
|
||||||
# puts "#{CBLUE}This is a ping message#{CRESET}"
|
# puts "#{CBLUE}Received a ping message#{CRESET}"
|
||||||
return
|
next if still_something_to_read
|
||||||
|
break
|
||||||
end
|
end
|
||||||
|
|
||||||
if message.is_a?(WebSocket::Pong)
|
if message.is_a?(WebSocket::Pong)
|
||||||
# puts "#{CBLUE}This is a pong message#{CRESET}"
|
# puts "#{CBLUE}Received a pong message#{CRESET}"
|
||||||
return
|
next if still_something_to_read
|
||||||
|
break
|
||||||
end
|
end
|
||||||
|
|
||||||
if message.is_a?(WebSocket::Close)
|
if message.is_a?(WebSocket::Close)
|
||||||
# puts "#{CRED}This is a close message#{CRESET}"
|
# puts "#{CBLUE}Received a close message#{CRESET}"
|
||||||
|
closing_client client.fd, context
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# TODO: when receiving a binary message
|
||||||
|
# we should test the format and maybe its content
|
||||||
if message.is_a?(Slice(UInt8))
|
if message.is_a?(Slice(UInt8))
|
||||||
# puts "#{CRED}This is a binary message: not yet implemented#{CRESET}"
|
# puts "#{CBLUE}Received a binary message#{CRESET}"
|
||||||
return
|
elsif context.is_json[activefd]
|
||||||
end
|
|
||||||
|
|
||||||
# TODO: verify this
|
|
||||||
if context.is_json[activefd]
|
|
||||||
jsonmessage = JSONWSMessage.from_json message
|
jsonmessage = JSONWSMessage.from_json message
|
||||||
message = to_message jsonmessage.mtype, jsonmessage.payload
|
message = to_message jsonmessage.mtype, jsonmessage.payload
|
||||||
|
|
||||||
# puts "JSON TYPE !!!"
|
|
||||||
# pp! jsonmessage
|
|
||||||
# pp! message
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# puts "switching: client to service"
|
|
||||||
# print_hexa(String.new(message), "#{CBLUE}Received message hexa#{CRESET}")
|
|
||||||
# client => service
|
# client => service
|
||||||
fdservice = context.switchtable[activefd]
|
fdservice = context.switchtable[activefd]
|
||||||
|
|
||||||
|
@ -315,35 +313,25 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
|
||||||
serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET)
|
serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET)
|
||||||
#serv = context.fd_to_ipcconnection[fdservice]
|
#serv = context.fd_to_ipcconnection[fdservice]
|
||||||
serv.send message
|
serv.send message
|
||||||
# puts "SENT MESSAGE TO SERVER: #{message}"
|
|
||||||
|
|
||||||
|
break unless still_something_to_read
|
||||||
|
puts "#{CRED}STILL SOMETHING TO READ#{CRESET}"
|
||||||
|
|
||||||
|
end # loop over the remaining messages to read on the websocket
|
||||||
else
|
else
|
||||||
# puts "switching: service to client"
|
|
||||||
# service => client
|
# service => client
|
||||||
|
|
||||||
# puts "RECEIVING A MESSAGE FROM THE SERVER #{activefd}"
|
|
||||||
|
|
||||||
fdclient = context.switchtable[activefd]
|
fdclient = context.switchtable[activefd]
|
||||||
wsclient = context.fd_to_websocket[fdclient]
|
wsclient = context.fd_to_websocket[fdclient]
|
||||||
|
|
||||||
serv = context.fd_to_ipcconnection[activefd]
|
serv = context.fd_to_ipcconnection[activefd]
|
||||||
message = serv.read
|
message = serv.read
|
||||||
|
|
||||||
# puts "RECEIVING A MESSAGE FROM THE SERVER #{activefd}: #{message}"
|
|
||||||
# puts "received message from service: #{message.to_s}"
|
|
||||||
|
|
||||||
if context.is_json[fdclient]
|
if context.is_json[fdclient]
|
||||||
buf = message.to_json
|
buf = message.to_json
|
||||||
# puts "JSON TYPE !!!"
|
|
||||||
# pp! buf
|
|
||||||
else
|
else
|
||||||
buf = message.to_buffer
|
buf = message.to_buffer
|
||||||
end
|
end
|
||||||
|
|
||||||
# TODO: REMOVE THIS
|
|
||||||
# buf = message.to_buffer
|
|
||||||
# print_hexa String.new(buf), "\033[31m Message to send to the client \033[00m "
|
|
||||||
|
|
||||||
wsclient.send buf
|
wsclient.send buf
|
||||||
end
|
end
|
||||||
rescue e
|
rescue e
|
||||||
|
@ -358,12 +346,12 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
|
||||||
end
|
end
|
||||||
|
|
||||||
# Every few seconds, the service should trigger the timer
|
# Every few seconds, the service should trigger the timer
|
||||||
service.base_timer = 30
|
service.base_timer = timer_delay
|
||||||
|
|
||||||
service.loop do |event|
|
service.loop do |event|
|
||||||
case event
|
case event
|
||||||
when IPC::Event::Timer
|
when IPC::Event::Timer
|
||||||
# puts "#{CORANGE}IPC::Event::Timer#{CRESET}"
|
puts "#{CORANGE}IPC::Event::Timer#{CRESET}"
|
||||||
context.fd_to_websocket.each do |fd, ws|
|
context.fd_to_websocket.each do |fd, ws|
|
||||||
|
|
||||||
begin
|
begin
|
||||||
|
@ -378,11 +366,11 @@ service.loop do |event|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
when IPC::Event::Connection
|
when IPC::Event::Connection
|
||||||
puts "#{CBLUE}IPC::Event::Connection: #{event.connection.fd}#{CRESET}"
|
puts "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.connection.fd}"
|
||||||
when IPC::Event::Disconnection
|
when IPC::Event::Disconnection
|
||||||
puts "#{CBLUE}IPC::Event::Disconnection: #{event.connection.fd}#{CRESET}"
|
puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}: #{event.connection.fd}"
|
||||||
when IPC::Event::ExtraSocket
|
when IPC::Event::ExtraSocket
|
||||||
puts "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}"
|
puts "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}: #{event.connection.fd}"
|
||||||
|
|
||||||
# 1. accept new websocket clients
|
# 1. accept new websocket clients
|
||||||
if server.fd == event.connection.fd
|
if server.fd == event.connection.fd
|
||||||
|
|
34
src/ws.cr
34
src/ws.cr
|
@ -2,6 +2,29 @@
|
||||||
require "http"
|
require "http"
|
||||||
require "http/web_socket/protocol"
|
require "http/web_socket/protocol"
|
||||||
|
|
||||||
|
# The Socket library uses the IO::Buffered module.
|
||||||
|
# IO::Buffered needs a new function to check if there is still something to read
|
||||||
|
# in its buffer. Since WebSocket is compatible with all IO instances, then IO
|
||||||
|
# needs this function as well, even if it doesn't really makes sense here.
|
||||||
|
class IO
|
||||||
|
def still_something_to_read? : Bool
|
||||||
|
false
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
#
|
||||||
|
module IO::Buffered
|
||||||
|
def still_something_to_read? : Bool
|
||||||
|
@in_buffer_rem.size > 0
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# class Socket
|
||||||
|
# def still_something_to_read? : Bool
|
||||||
|
# @in_buffer_rem.size > 0
|
||||||
|
# end
|
||||||
|
# end
|
||||||
|
|
||||||
class HTTP::WebSocket
|
class HTTP::WebSocket
|
||||||
record Pong
|
record Pong
|
||||||
record Ping
|
record Ping
|
||||||
|
@ -11,6 +34,7 @@ class HTTP::WebSocket
|
||||||
size = 0
|
size = 0
|
||||||
begin
|
begin
|
||||||
info = @ws.receive(@buffer)
|
info = @ws.receive(@buffer)
|
||||||
|
# puts "receiving a message size #{info.size}, #{info.final ? "final" : "non final"}"
|
||||||
rescue IO::EOFError
|
rescue IO::EOFError
|
||||||
close
|
close
|
||||||
return nil
|
return nil
|
||||||
|
@ -61,6 +85,16 @@ class HTTP::WebSocket
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def ws
|
||||||
|
@ws
|
||||||
|
end
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
|
class HTTP::WebSocket::Protocol
|
||||||
|
def io
|
||||||
|
@io
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
|
Reference in New Issue