From a022d25913a9d650cf424cff374a41fde7922986 Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Fri, 17 Jan 2020 13:01:07 +0100
Subject: [PATCH] New IO and WebSocket (private) API.
---
src/websocketd.cr | 64 ++++++++++++++++++++++++++---------------------
src/ws.cr | 57 +++++++++++++++++++++++------------------
2 files changed, 69 insertions(+), 52 deletions(-)
diff --git a/src/websocketd.cr b/src/websocketd.cr
index 4b67e90..3dac0f7 100644
--- a/src/websocketd.cr
+++ b/src/websocketd.cr
@@ -258,14 +258,14 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
wsclient = context.fd_to_websocket[activefd]
loop do
begin
- message = wsclient.read
+ message = wsclient.run_once
rescue e
puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}"
closing_client activefd, context
return
end
- still_something_to_read = wsclient.ws.io.still_something_to_read?
+ still_something_to_read = ! wsclient.ws.io.empty?
if wsclient.closed?
# puts "#{CBLUE}client is closed#{CRESET}"
@@ -279,29 +279,34 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
return
end
- if message.is_a?(WebSocket::Ping)
+ case message
+ when WebSocket::Error
+ puts "#{CRED}An error occured#{CRESET}"
+ closing_client client.fd, context
+ return
+ when WebSocket::Ping
# puts "#{CBLUE}Received a ping message#{CRESET}"
next if still_something_to_read
break
- end
-
- if message.is_a?(WebSocket::Pong)
+ when WebSocket::Pong
# puts "#{CBLUE}Received a pong message#{CRESET}"
next if still_something_to_read
break
- end
-
- if message.is_a?(WebSocket::Close)
+ when WebSocket::Close
# puts "#{CBLUE}Received a close message#{CRESET}"
closing_client client.fd, context
return
+ when WebSocket::NotFinal
+ # puts "#{CBLUE}Received only part of a message#{CRESET}"
+ next if still_something_to_read
+ break
+ when Bytes
+ # TODO: when receiving a binary message
+ # we should test the format and maybe its content
+ # puts "#{CBLUE}Received a binary message#{CRESET}"
end
- # TODO: when receiving a binary message
- # we should test the format and maybe its content
- if message.is_a?(Slice(UInt8))
- # puts "#{CBLUE}Received a binary message#{CRESET}"
- elsif context.is_json[activefd]
+ if context.is_json[activefd] && message.is_a?(String)
jsonmessage = JSONWSMessage.from_json message
message = to_message jsonmessage.mtype, jsonmessage.payload
end
@@ -315,7 +320,6 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
serv.send message
break unless still_something_to_read
- puts "#{CRED}STILL SOMETHING TO READ#{CRESET}"
end # loop over the remaining messages to read on the websocket
else
@@ -345,26 +349,30 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
end
end
+def sending_ping_messages(context : InstanceStorage)
+ context.fd_to_websocket.each do |fd, ws|
+ begin
+ ws.ping "hello from #{fd}"
+ rescue e
+ puts "#{CRED}Exception: #{e}#{CRESET}, already closed client #{fd}"
+ begin
+ context.remove_fd fd
+ rescue e
+ puts "#{CRED}Cannot remove #{fd} from clients: #{e}#{CRESET}"
+ end
+ end
+ end
+end
+
# Every few seconds, the service should trigger the timer
+# Allowing the sending of Ping messages to clients
service.base_timer = timer_delay
service.loop do |event|
case event
when IPC::Event::Timer
puts "#{CORANGE}IPC::Event::Timer#{CRESET}"
- context.fd_to_websocket.each do |fd, ws|
-
- begin
- ws.ping "coucou from #{fd}"
- rescue e
- puts "#{CRED}Exception: #{e}#{CRESET}, already closed client #{fd}"
- begin
- context.remove_fd fd
- rescue e
- puts "#{CRED}Cannot remove #{fd} from clients: #{e}#{CRESET}"
- end
- end
- end
+ sending_ping_messages context
when IPC::Event::Connection
puts "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.connection.fd}"
when IPC::Event::Disconnection
diff --git a/src/ws.cr b/src/ws.cr
index 89d942e..d75b211 100644
--- a/src/ws.cr
+++ b/src/ws.cr
@@ -7,37 +7,45 @@ require "http/web_socket/protocol"
# in its buffer. Since WebSocket is compatible with all IO instances, then IO
# needs this function as well, even if it doesn't really makes sense here.
class IO
- def still_something_to_read? : Bool
- false
+ def empty? : Bool
+ true
end
end
-#
module IO::Buffered
- def still_something_to_read? : Bool
- @in_buffer_rem.size > 0
+ # :nodoc:
+ def empty? : Bool
+ @in_buffer_rem.size == 0
end
end
-# class Socket
-# def still_something_to_read? : Bool
-# @in_buffer_rem.size > 0
-# end
-# end
-
class HTTP::WebSocket
- record Pong
- record Ping
- record Close
+ # Infinite loop over `run_once`.
+ # Stops when the socket closes or an error occurs.
+ def run
+ loop do
+ ret = run_once
+ case ret
+ when Error | Close
+ break
+ end
+ end
+ end
- def read : Slice(UInt8) | String | Close | Ping | Pong | Nil
- size = 0
+ # `run_once` returns the type of the message which could be a Ping or Pong message, a closing socket, an error (when an exception occurs) or a message that is not yet entirely received.
+ record Ping
+ record Pong
+ record Close
+ record Error
+ record NotFinal
+
+ # :nodoc:
+ def run_once : Bytes | String | Close | Ping | Pong | NotFinal | Error
begin
info = @ws.receive(@buffer)
- # puts "receiving a message size #{info.size}, #{info.final ? "final" : "non final"}"
rescue IO::EOFError
close
- return nil
+ return Error.new
end
case info.opcode
@@ -48,31 +56,31 @@ class HTTP::WebSocket
@on_ping.try &.call(message)
pong(message) unless closed?
@current_message.clear
+ return Ping.new
end
- return Ping.new
when Protocol::Opcode::PONG
@current_message.write @buffer[0, info.size]
if info.final
@on_pong.try &.call(@current_message.to_s)
@current_message.clear
+ return Pong.new
end
- return Pong.new
when Protocol::Opcode::TEXT
message = @buffer[0, info.size]
@current_message.write message
if info.final
@on_message.try &.call(@current_message.to_s)
@current_message.clear
+ return String.new message
end
- return String.new message
when Protocol::Opcode::BINARY
message = @buffer[0, info.size]
@current_message.write message
if info.final
@on_binary.try &.call(@current_message.to_slice)
@current_message.clear
+ return message
end
- return message
when Protocol::Opcode::CLOSE
@current_message.write @buffer[0, info.size]
if info.final
@@ -80,15 +88,16 @@ class HTTP::WebSocket
@on_close.try &.call(message)
close(message) unless closed?
@current_message.clear
+ return Close.new
end
- return Close.new
end
+
+ return NotFinal.new
end
def ws
@ws
end
-
end
class HTTP::WebSocket::Protocol