New IO and WebSocket (private) API.
parent
7d554b0e67
commit
a022d25913
|
@ -258,14 +258,14 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
|
||||||
wsclient = context.fd_to_websocket[activefd]
|
wsclient = context.fd_to_websocket[activefd]
|
||||||
loop do
|
loop do
|
||||||
begin
|
begin
|
||||||
message = wsclient.read
|
message = wsclient.run_once
|
||||||
rescue e
|
rescue e
|
||||||
puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}"
|
puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}"
|
||||||
closing_client activefd, context
|
closing_client activefd, context
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
still_something_to_read = wsclient.ws.io.still_something_to_read?
|
still_something_to_read = ! wsclient.ws.io.empty?
|
||||||
|
|
||||||
if wsclient.closed?
|
if wsclient.closed?
|
||||||
# puts "#{CBLUE}client is closed#{CRESET}"
|
# puts "#{CBLUE}client is closed#{CRESET}"
|
||||||
|
@ -279,29 +279,34 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
if message.is_a?(WebSocket::Ping)
|
case message
|
||||||
|
when WebSocket::Error
|
||||||
|
puts "#{CRED}An error occured#{CRESET}"
|
||||||
|
closing_client client.fd, context
|
||||||
|
return
|
||||||
|
when WebSocket::Ping
|
||||||
# puts "#{CBLUE}Received a ping message#{CRESET}"
|
# puts "#{CBLUE}Received a ping message#{CRESET}"
|
||||||
next if still_something_to_read
|
next if still_something_to_read
|
||||||
break
|
break
|
||||||
end
|
when WebSocket::Pong
|
||||||
|
|
||||||
if message.is_a?(WebSocket::Pong)
|
|
||||||
# puts "#{CBLUE}Received a pong message#{CRESET}"
|
# puts "#{CBLUE}Received a pong message#{CRESET}"
|
||||||
next if still_something_to_read
|
next if still_something_to_read
|
||||||
break
|
break
|
||||||
end
|
when WebSocket::Close
|
||||||
|
|
||||||
if message.is_a?(WebSocket::Close)
|
|
||||||
# puts "#{CBLUE}Received a close message#{CRESET}"
|
# puts "#{CBLUE}Received a close message#{CRESET}"
|
||||||
closing_client client.fd, context
|
closing_client client.fd, context
|
||||||
return
|
return
|
||||||
end
|
when WebSocket::NotFinal
|
||||||
|
# puts "#{CBLUE}Received only part of a message#{CRESET}"
|
||||||
|
next if still_something_to_read
|
||||||
|
break
|
||||||
|
when Bytes
|
||||||
# TODO: when receiving a binary message
|
# TODO: when receiving a binary message
|
||||||
# we should test the format and maybe its content
|
# we should test the format and maybe its content
|
||||||
if message.is_a?(Slice(UInt8))
|
|
||||||
# puts "#{CBLUE}Received a binary message#{CRESET}"
|
# puts "#{CBLUE}Received a binary message#{CRESET}"
|
||||||
elsif context.is_json[activefd]
|
end
|
||||||
|
|
||||||
|
if context.is_json[activefd] && message.is_a?(String)
|
||||||
jsonmessage = JSONWSMessage.from_json message
|
jsonmessage = JSONWSMessage.from_json message
|
||||||
message = to_message jsonmessage.mtype, jsonmessage.payload
|
message = to_message jsonmessage.mtype, jsonmessage.payload
|
||||||
end
|
end
|
||||||
|
@ -315,7 +320,6 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
|
||||||
serv.send message
|
serv.send message
|
||||||
|
|
||||||
break unless still_something_to_read
|
break unless still_something_to_read
|
||||||
puts "#{CRED}STILL SOMETHING TO READ#{CRESET}"
|
|
||||||
|
|
||||||
end # loop over the remaining messages to read on the websocket
|
end # loop over the remaining messages to read on the websocket
|
||||||
else
|
else
|
||||||
|
@ -345,17 +349,10 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Every few seconds, the service should trigger the timer
|
def sending_ping_messages(context : InstanceStorage)
|
||||||
service.base_timer = timer_delay
|
|
||||||
|
|
||||||
service.loop do |event|
|
|
||||||
case event
|
|
||||||
when IPC::Event::Timer
|
|
||||||
puts "#{CORANGE}IPC::Event::Timer#{CRESET}"
|
|
||||||
context.fd_to_websocket.each do |fd, ws|
|
context.fd_to_websocket.each do |fd, ws|
|
||||||
|
|
||||||
begin
|
begin
|
||||||
ws.ping "coucou from #{fd}"
|
ws.ping "hello from #{fd}"
|
||||||
rescue e
|
rescue e
|
||||||
puts "#{CRED}Exception: #{e}#{CRESET}, already closed client #{fd}"
|
puts "#{CRED}Exception: #{e}#{CRESET}, already closed client #{fd}"
|
||||||
begin
|
begin
|
||||||
|
@ -365,6 +362,17 @@ service.loop do |event|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# Every few seconds, the service should trigger the timer
|
||||||
|
# Allowing the sending of Ping messages to clients
|
||||||
|
service.base_timer = timer_delay
|
||||||
|
|
||||||
|
service.loop do |event|
|
||||||
|
case event
|
||||||
|
when IPC::Event::Timer
|
||||||
|
puts "#{CORANGE}IPC::Event::Timer#{CRESET}"
|
||||||
|
sending_ping_messages context
|
||||||
when IPC::Event::Connection
|
when IPC::Event::Connection
|
||||||
puts "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.connection.fd}"
|
puts "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.connection.fd}"
|
||||||
when IPC::Event::Disconnection
|
when IPC::Event::Disconnection
|
||||||
|
|
57
src/ws.cr
57
src/ws.cr
|
@ -7,37 +7,45 @@ require "http/web_socket/protocol"
|
||||||
# in its buffer. Since WebSocket is compatible with all IO instances, then IO
|
# in its buffer. Since WebSocket is compatible with all IO instances, then IO
|
||||||
# needs this function as well, even if it doesn't really makes sense here.
|
# needs this function as well, even if it doesn't really makes sense here.
|
||||||
class IO
|
class IO
|
||||||
def still_something_to_read? : Bool
|
def empty? : Bool
|
||||||
false
|
true
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
#
|
|
||||||
module IO::Buffered
|
module IO::Buffered
|
||||||
def still_something_to_read? : Bool
|
# :nodoc:
|
||||||
@in_buffer_rem.size > 0
|
def empty? : Bool
|
||||||
|
@in_buffer_rem.size == 0
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# class Socket
|
|
||||||
# def still_something_to_read? : Bool
|
|
||||||
# @in_buffer_rem.size > 0
|
|
||||||
# end
|
|
||||||
# end
|
|
||||||
|
|
||||||
class HTTP::WebSocket
|
class HTTP::WebSocket
|
||||||
record Pong
|
# Infinite loop over `run_once`.
|
||||||
record Ping
|
# Stops when the socket closes or an error occurs.
|
||||||
record Close
|
def run
|
||||||
|
loop do
|
||||||
|
ret = run_once
|
||||||
|
case ret
|
||||||
|
when Error | Close
|
||||||
|
break
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def read : Slice(UInt8) | String | Close | Ping | Pong | Nil
|
# `run_once` returns the type of the message which could be a Ping or Pong message, a closing socket, an error (when an exception occurs) or a message that is not yet entirely received.
|
||||||
size = 0
|
record Ping
|
||||||
|
record Pong
|
||||||
|
record Close
|
||||||
|
record Error
|
||||||
|
record NotFinal
|
||||||
|
|
||||||
|
# :nodoc:
|
||||||
|
def run_once : Bytes | String | Close | Ping | Pong | NotFinal | Error
|
||||||
begin
|
begin
|
||||||
info = @ws.receive(@buffer)
|
info = @ws.receive(@buffer)
|
||||||
# puts "receiving a message size #{info.size}, #{info.final ? "final" : "non final"}"
|
|
||||||
rescue IO::EOFError
|
rescue IO::EOFError
|
||||||
close
|
close
|
||||||
return nil
|
return Error.new
|
||||||
end
|
end
|
||||||
|
|
||||||
case info.opcode
|
case info.opcode
|
||||||
|
@ -48,31 +56,31 @@ class HTTP::WebSocket
|
||||||
@on_ping.try &.call(message)
|
@on_ping.try &.call(message)
|
||||||
pong(message) unless closed?
|
pong(message) unless closed?
|
||||||
@current_message.clear
|
@current_message.clear
|
||||||
end
|
|
||||||
return Ping.new
|
return Ping.new
|
||||||
|
end
|
||||||
when Protocol::Opcode::PONG
|
when Protocol::Opcode::PONG
|
||||||
@current_message.write @buffer[0, info.size]
|
@current_message.write @buffer[0, info.size]
|
||||||
if info.final
|
if info.final
|
||||||
@on_pong.try &.call(@current_message.to_s)
|
@on_pong.try &.call(@current_message.to_s)
|
||||||
@current_message.clear
|
@current_message.clear
|
||||||
end
|
|
||||||
return Pong.new
|
return Pong.new
|
||||||
|
end
|
||||||
when Protocol::Opcode::TEXT
|
when Protocol::Opcode::TEXT
|
||||||
message = @buffer[0, info.size]
|
message = @buffer[0, info.size]
|
||||||
@current_message.write message
|
@current_message.write message
|
||||||
if info.final
|
if info.final
|
||||||
@on_message.try &.call(@current_message.to_s)
|
@on_message.try &.call(@current_message.to_s)
|
||||||
@current_message.clear
|
@current_message.clear
|
||||||
end
|
|
||||||
return String.new message
|
return String.new message
|
||||||
|
end
|
||||||
when Protocol::Opcode::BINARY
|
when Protocol::Opcode::BINARY
|
||||||
message = @buffer[0, info.size]
|
message = @buffer[0, info.size]
|
||||||
@current_message.write message
|
@current_message.write message
|
||||||
if info.final
|
if info.final
|
||||||
@on_binary.try &.call(@current_message.to_slice)
|
@on_binary.try &.call(@current_message.to_slice)
|
||||||
@current_message.clear
|
@current_message.clear
|
||||||
end
|
|
||||||
return message
|
return message
|
||||||
|
end
|
||||||
when Protocol::Opcode::CLOSE
|
when Protocol::Opcode::CLOSE
|
||||||
@current_message.write @buffer[0, info.size]
|
@current_message.write @buffer[0, info.size]
|
||||||
if info.final
|
if info.final
|
||||||
|
@ -80,15 +88,16 @@ class HTTP::WebSocket
|
||||||
@on_close.try &.call(message)
|
@on_close.try &.call(message)
|
||||||
close(message) unless closed?
|
close(message) unless closed?
|
||||||
@current_message.clear
|
@current_message.clear
|
||||||
end
|
|
||||||
return Close.new
|
return Close.new
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
return NotFinal.new
|
||||||
|
end
|
||||||
|
|
||||||
def ws
|
def ws
|
||||||
@ws
|
@ws
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
class HTTP::WebSocket::Protocol
|
class HTTP::WebSocket::Protocol
|
||||||
|
|
Reference in New Issue