128 lines
3.0 KiB
Crystal
128 lines
3.0 KiB
Crystal
|
# Libraries modifications
|
||
|
|
||
|
require "http"
|
||
|
require "http/web_socket/protocol"
|
||
|
|
||
|
# The Socket library uses the IO::Buffered module.
|
||
|
# IO::Buffered needs a new function to check if there is still something to read
|
||
|
# in its buffer. Since WebSocket is compatible with all IO instances, then IO
|
||
|
# needs this function as well, even if it doesn't really makes sense here.
|
||
|
class IO
|
||
|
def empty? : Bool
|
||
|
true
|
||
|
end
|
||
|
end
|
||
|
|
||
|
module IO::Buffered
|
||
|
# :nodoc:
|
||
|
def empty? : Bool
|
||
|
@in_buffer_rem.size == 0
|
||
|
end
|
||
|
end
|
||
|
|
||
|
class HTTP::WebSocket
|
||
|
# Infinite loop over `run_once`.
|
||
|
# Stops when the socket closes or an error occurs.
|
||
|
def run
|
||
|
loop do
|
||
|
ret = run_once
|
||
|
case ret
|
||
|
when Error | Close
|
||
|
break
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
|
||
|
# `run_once` returns the type of the message which could be a Ping or Pong message, a closing socket, an error (when an exception occurs) or a message that is not yet entirely received.
|
||
|
record Ping
|
||
|
record Pong
|
||
|
record Close
|
||
|
record Error
|
||
|
record NotFinal
|
||
|
|
||
|
# :nodoc:
|
||
|
def run_once : Bytes | String | Close | Ping | Pong | NotFinal | Error
|
||
|
begin
|
||
|
info = @ws.receive(@buffer)
|
||
|
rescue IO::EOFError
|
||
|
close
|
||
|
return Error.new
|
||
|
end
|
||
|
|
||
|
case info.opcode
|
||
|
when Protocol::Opcode::PING
|
||
|
@current_message.write @buffer[0, info.size]
|
||
|
if info.final
|
||
|
message = @current_message.to_s
|
||
|
@on_ping.try &.call(message)
|
||
|
pong(message) unless closed?
|
||
|
@current_message.clear
|
||
|
return Ping.new
|
||
|
end
|
||
|
when Protocol::Opcode::PONG
|
||
|
@current_message.write @buffer[0, info.size]
|
||
|
if info.final
|
||
|
@on_pong.try &.call(@current_message.to_s)
|
||
|
@current_message.clear
|
||
|
return Pong.new
|
||
|
end
|
||
|
when Protocol::Opcode::TEXT
|
||
|
message = @buffer[0, info.size]
|
||
|
@current_message.write message
|
||
|
if info.final
|
||
|
@on_message.try &.call(@current_message.to_s)
|
||
|
@current_message.clear
|
||
|
return String.new message
|
||
|
end
|
||
|
when Protocol::Opcode::BINARY
|
||
|
message = @buffer[0, info.size]
|
||
|
@current_message.write message
|
||
|
if info.final
|
||
|
@on_binary.try &.call(@current_message.to_slice)
|
||
|
@current_message.clear
|
||
|
return message
|
||
|
end
|
||
|
when Protocol::Opcode::CLOSE
|
||
|
@current_message.write @buffer[0, info.size]
|
||
|
if info.final
|
||
|
message = @current_message.to_s
|
||
|
@on_close.try &.call(HTTP::WebSocket::CloseCode::NormalClosure, message)
|
||
|
close(HTTP::WebSocket::CloseCode::NormalClosure, message) unless closed?
|
||
|
@current_message.clear
|
||
|
return Close.new
|
||
|
end
|
||
|
end
|
||
|
|
||
|
return NotFinal.new
|
||
|
end
|
||
|
|
||
|
# Returns the websocket instance.
|
||
|
def ws
|
||
|
@ws
|
||
|
end
|
||
|
end
|
||
|
|
||
|
class HTTP::WebSocket::Protocol
|
||
|
def io
|
||
|
@io
|
||
|
end
|
||
|
end
|
||
|
|
||
|
|
||
|
class WebSocket < HTTP::WebSocket
|
||
|
getter? closed = false
|
||
|
|
||
|
def finalize
|
||
|
# puts "WrappedTCPFileDescriptor garbage collection!!"
|
||
|
# super
|
||
|
end
|
||
|
end
|
||
|
|
||
|
class WrappedTCPFileDescriptor < TCPSocket
|
||
|
# do not close the connection when garbage collected!!
|
||
|
def finalize
|
||
|
# puts "WrappedTCPFileDescriptor garbage collection!!"
|
||
|
# super
|
||
|
end
|
||
|
end
|