websocketd/src/websocketd.cr

599 lines
17 KiB
Crystal
Raw Blame History

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

require "option_parser"
require "ipc"
require "ipc/json"
require "socket"
require "./colors"
require "json"
require "socket"
require "http/server"
require "base64"
require "digest"
require "./utils"
require "baguette-crystal-base"
require "colorize"
# All modifications to standard libraries go there.
require "./lib_modifications.cr"
class Tracking::Request
IPC::JSON.message IpAddress, 1 do
property ipaddress : String?
def initialize(@ipaddress = nil)
end
end
end
class Baguette::Configuration
class Websocket < IPC
# service instance parameters
# they can be changed via the cli
property service_name : String = "websocket"
property host : String = "0.0.0.0"
property port : UInt16 = 1234
property print_messages : Bool = false
end
end
simulation, no_configuration, configuration_file = Baguette::Configuration.option_parser
configuration = if no_configuration
Baguette::Log.info "do not load a configuration file."
Baguette::Configuration::Websocket.new
else
Baguette::Configuration::Websocket.get(configuration_file) ||
Baguette::Configuration::Websocket.new
end
Baguette::Context.verbosity = configuration.verbosity
OptionParser.parse do |parser|
parser.on "-l host", "--l host", "IP address to listen on." do |h|
configuration.host = h
end
parser.on "-p port", "--port port", "Port to listen on." do |port|
configuration.port = port.to_u16
end
parser.on "-s service-name", "--service-name service-name", "Service name." do |name|
configuration.service_name = name
end
parser.on "-t timer-delay", "--timer-delay timer-delay", "Timer delay (in seconds)" do |t|
configuration.ipc_timer = t.to_i32 * 1000
end
parser.on "-T", "--print-timer", "Print timer." do
configuration.print_ipc_timer = true
end
parser.on "-M", "--print-messages", "Print messages received and sent." do
configuration.print_messages = true
Baguette::Log.info "print messages"
end
parser.on "-h", "--help", "Show this help" do
puts parser
exit 0
end
end
# Link between fd and TCPSocket, WebSocket and IPC::Client instances.
class InstanceStorage
property service : IPC::SwitchingService
property switchtable : Hash(Int32, Int32)
property is_client : Hash(Int32,Bool)
property is_json : Hash(Int32, Bool)
property fd_to_tcpsocket : Hash(Int32, TCPSocket)
property fd_to_websocket : Hash(Int32, WebSocket)
property fd_to_ipcclient : Hash(Int32, IPC::Client)
property fd_to_buffer : Hash(Int32, String)
def initialize (@service : IPC::SwitchingService)
# fdlist_client = [] of TCPSocket
@switchtable = Hash(Int32, Int32).new
@is_client = Hash(Int32,Bool).new
@is_json = Hash(Int32,Bool).new
@fd_to_tcpsocket = Hash(Int32, TCPSocket).new
@fd_to_websocket = Hash(Int32, WebSocket).new
@fd_to_ipcclient = Hash(Int32, IPC::Client).new
@fd_to_buffer = Hash(Int32, String).new
end
def remove_fd (fdclient : Int32)
Baguette::Log.info "closing the client:#{CRESET} #{fdclient}"
if fdclient == -1
raise "fdclient is -1, nothing to do with it"
end
begin
# 1. closing both the client and the service
tcpfdc = @fd_to_tcpsocket[fdclient]
# 2. closing the TCP connections
tcpfdc.close unless tcpfdc.closed?
rescue e
Baguette::Log.error "remove_fd: 1 #{e}"
end
# 3. removing the client and the service fds from the loop check
begin
@service.remove_fd (fdclient)
rescue e
Baguette::Log.error "remove_fd: 2 #{e}"
end
# 5. removing both the client and the service from the switchtable
fdservice = @switchtable[fdclient]?
@switchtable = @switchtable.select do |fdc, fds|
fdc != fdclient && fds != fdclient
end
@fd_to_buffer.select! do |fd, buffer|
if fdservice.nil?
fd != fdclient
else
fd != fdclient && fd != fdservice
end
end
# 6. removing the client and the service from is_client
@is_client = @is_client.select do |fd,v| fd != fdclient end
@is_json = @is_json.select do |fd,v| fd != fdclient end
@fd_to_websocket.select! do |fd, ws|
fd != fdclient
end
begin
if fdservice.nil?
Baguette::Log.debug "client #{fdclient} aleady has its service removed"
else
@service.remove_fd (fdservice)
service = @fd_to_ipcclient[fdservice]
@fd_to_ipcclient = @fd_to_ipcclient.select do |k, v|
k != fdservice
end
@is_client = @is_client.select do |fd,v| fd != fdservice end
# perform the close at the end
# on crash, this service still is removed from the list of listened fd
service.close
end
rescue e
Baguette::Log.error "remove_fd: 3 #{e}"
end
rescue e
Baguette::Log.error "in InstanceStorage#remove_fd: #{e}"
end
end
class Context
class_property service_name = "websocketd"
end
Context.service_name = configuration.service_name
class Context
class_property service = IPC::SwitchingService.new service_name
class_property context = InstanceStorage.new service
class_property print_messages = false
class_property print_timer = false
end
Context.print_messages = configuration.print_messages
Context.print_timer = configuration.print_ipc_timer
if simulation
pp! configuration
exit 0
end
# by default, listen on any IP address
server = TCPServer.new configuration.host, configuration.port
Context.service << server.fd
def websocket_client_connection(client)
request = HTTP::Request.from_io client
if Context.print_messages
pp! request
end
if request.nil?
raise "#REQUEST IS NIL"
end
if request.is_a? HTTP::Status && request == HTTP::Status::BAD_REQUEST
raise "BAD REQUEST DAZE~"
end
if request.is_a? HTTP::Status
raise "Not bad request but still pretty bad: #{request.to_s}"
end
# FIXME: check they actually wanted to upgrade to websocket
key = request.headers["Sec-WebSocket-Key"]
response_key = Digest::SHA1.base64digest key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
# puts response_key
# HTTP inside bru
headers_header = "HTTP/1.1 101 Switching Protocols"
headers = HTTP::Headers {
"Upgrade" => "websocket",
"Connection" => "Upgrade",
"Sec-WebSocket-Accept" => response_key
}
headers = headers.map { |key, value| "#{key}: #{value[0]}\r\n" }.join
# requested service, fd
req_service = request.path.lchop.sub "ws/", ""
if req_service.empty?
client.close
return
end
# The client may ask to transcript JSON-based messages into IPC messages.
# To that end, the client may send the name of the service it wants to reach with the prefix ".JSON".
if req_service.ends_with? ".JSON"
Context.context.is_json[client.fd] = true
req_service = req_service.gsub /.JSON$/, ""
else
Context.context.is_json[client.fd] = false
end
# Hack to preserve the daemon naming convention.
if req_service == "tracker"
req_service = "tracking"
end
websocket_connection_procedure req_service, client.fd
# If the requested service is trackingd, send the IP address of the client
if req_service == "tracking"
real_ip_address = request.headers["X-Real-IP"]? || client.remote_address.address
sfd = Context.context.switchtable[client.fd]
Baguette::Log.info "trackingd - sending the IP address #{real_ip_address} to fd #{sfd}"
message = Tracking::Request::IpAddress.new real_ip_address
if Context.print_messages
Baguette::Log.info "to trackingd: #{message.to_s}"
end
Context.service.send_now sfd, message
end
if Context.print_messages
Baguette::Log.info "to client: #{headers_header}\n#{headers.to_s}"
end
client.send "#{headers_header}\n#{headers.to_s}\r\n"
wsclient = WebSocket.new client
wsclient.on_pong do |m|
Baguette::Log.debug "pong #{m}"
end
Context.context.is_client[client.fd] = true
# listen to the client's file descriptor
Context.context.service << client.fd
# registering the client into storing structures to avoid being garbage collected
Context.context.fd_to_tcpsocket[client.fd] = client
Context.context.fd_to_websocket[client.fd] = wsclient
end
def closing_client (fdclient : Int)
Baguette::Log.warning "Closing client #{fdclient}"
Context.context.remove_fd fdclient
end
# first message from the client: requested service name
# 1. connection to the service
# 2. listening on the service fd
# 3. bounding both file descriptors (through switchtable hash)
# 4. indicating that the client is connected (in is_client)
def websocket_connection_procedure (requested_service : String, clientfd : Int32)
begin
# 1. establishing a connection to the service
newservice = IPC::Client.new requested_service
new_service_fd = newservice.fd.not_nil!
Context.context.fd_to_ipcclient[new_service_fd] = newservice
# 2. listening on the client fd and the service fd
Context.context.service << new_service_fd
# cannot perform automatic switching due to websockets headers
# future version of the libipc lib should include some workaround, probably
# service.switch.add fdclient, newservice.fd
# 3. bounding the client and the service fd
Context.context.switchtable[clientfd] = new_service_fd
Context.context.switchtable[new_service_fd] = clientfd
# 4. the client is then connected, send it a message
Context.context.is_client[clientfd] = true
Context.context.is_client[new_service_fd] = false
rescue e
Baguette::Log.error "Exception during connection to the service: #{e}"
Context.context.remove_fd clientfd
end
end
def print_message(message : Bytes, origin : Int32)
return unless Context.print_messages
client = if Context.context.is_client[origin]
"client"
else
"server"
end
Baguette::Log.info "received message from #{origin} (#{client}):"
Baguette::Log.info "#{message.hexstring}"
end
def print_message(message : String, origin : Int32)
return unless Context.print_messages
client = if Context.context.is_client[origin]
"client"
else
"server"
end
Baguette::Log.info "received message from #{origin} (#{client}):"
pp! JSON.parse message
rescue e
Baguette::Log.warning "cannot see the message from #{origin} (#{client}): #{e}"
end
# WARNING: currently this is only covering String messages.
# Byte messages aren't addressed, yet.
def websocket_switching_procedure (activefd : Int)
begin
# Baguette::Log.title "activefd is #{activefd}"
if Context.context.is_client[activefd]
# Baguette::Log.title "activefd #{activefd} is a client"
# The client is a WebSocket on top of a TCP connection
client = Context.context.fd_to_tcpsocket[activefd]
wsclient = Context.context.fd_to_websocket[activefd]
loop do
begin
message = wsclient.run_once
rescue e
Baguette::Log.error "Exception (receiving a message) #{e}"
Context.context.remove_fd activefd
break
end
# Checking the internals of WebSocket, then the contained IO within, to know if there is still something to read in the socket.
still_something_to_read = ! wsclient.ws.io.empty?
if wsclient.closed?
Baguette::Log.info "client #{activefd} is closing"
Context.context.remove_fd activefd
break
end
if message.nil?
Baguette::Log.error "message received from #{activefd} is nil"
Context.context.remove_fd activefd
if still_something_to_read
Baguette::Log.info "Still something to read, but #{activefd} was removed"
next
end
break
end
case message
when WebSocket::Error
Baguette::Log.error "An error occured with client #{activefd}"
Context.context.remove_fd activefd
if still_something_to_read
Baguette::Log.debug "Still something to read, but #{activefd} was removed"
next
end
return
when WebSocket::Ping
Baguette::Log.debug "Received a ping message"
if still_something_to_read
Baguette::Log.debug "Still something to read"
next
end
break
when WebSocket::Pong
Baguette::Log.debug "Received a pong message"
if still_something_to_read
Baguette::Log.debug "Still something to read"
next
end
break
when WebSocket::Close
Baguette::Log.debug "Received a close message"
Context.context.remove_fd activefd
if still_something_to_read
Baguette::Log.debug "Still something to read"
next
end
break
when WebSocket::NotFinal
Baguette::Log.debug "Received only part of a message"
# There is a per-user buffer in case we receive "not final" websocket messages.
# In case the buffer isn't set yet.
unless Context.context.fd_to_buffer[activefd]?
Context.context.fd_to_buffer[activefd] = String.new
end
# Add the received message to the buffer.
Context.context.fd_to_buffer[activefd] += message.message
if still_something_to_read
Baguette::Log.debug "and there is still something to read"
next
end
break
when Bytes
# We should test the format and maybe its content when receiving a binary message.
Baguette::Log.warning "Received a binary message"
end
# Is there a (non empty) buffer for the active fd?
buffer_size = if Context.context.fd_to_buffer[activefd]?
Context.context.fd_to_buffer[activefd].size
else
0
end
# TODO: make this buffer work with bytes messages.
# In case there was a previous message within a fragment.
if message.is_a?(String) && buffer_size > 0
message = Context.context.fd_to_buffer[activefd] + message
Context.context.fd_to_buffer[activefd] = String.new
end
if Context.context.is_json[activefd] && message.is_a?(String)
print_message message, activefd
message = IPC::Message.from_json(message).to_packet
elsif ! Context.context.is_json[activefd] && message.is_a?(Bytes)
# not json and bytes
print_message message, activefd
message = IPC::Message.from_cbor(message).to_packet
else
Baguette::Log.warning "message is neither JSON and string, or not JSON and bytes."
Baguette::Log.warning "message is #{message.class.to_s}"
end
# client => service
fdservice = Context.context.switchtable[activefd]
# XXX: this is not a TCP fd, but since behind the scene this is compatible, I'm hacking a bit
# Also, I changed the "finalize" method of the TCPFileDescriptor class not to close the socket
# when the object is GC.
serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET)
#serv = Context.context.fd_to_ipcclient[fdservice]
begin
serv.send message
rescue e
Baguette::Log.error "Sending a message failed: #{e}"
end
break unless still_something_to_read
end # loop over the remaining messages to read on the websocket
else
# service => client
fdclient = Context.context.switchtable[activefd]
wsclient = Context.context.fd_to_websocket[fdclient]
serv = Context.context.fd_to_ipcclient[activefd]
message = serv.read
buf = if Context.context.is_json[fdclient]
message.to_json
else # We assume that non-JSON clients are CBOR clients.
message.to_cbor
end
print_message buf, activefd
wsclient.send buf
end
rescue e
Baguette::Log.error "Exception during message transfer: #{e}"
if Context.context.is_client[activefd]
closing_client activefd
else
clientfd = Context.context.switchtable[activefd]
closing_client clientfd
end
end
end
def sending_ping_messages
Context.context.fd_to_websocket.each do |fd, ws|
begin
ws.ping "hello from #{fd}"
rescue e
Baguette::Log.error "#{CRED}Exception: #{e}#{CRESET}, already closed client #{fd}"
begin
Context.context.remove_fd fd
rescue e
Baguette::Log.error "#{CRED}Cannot remove #{fd} from clients: #{e}#{CRESET}"
end
end
end
end
# Every few seconds, the service should trigger the timer
# Allowing the sending of Ping messages to clients
Context.service.base_timer = configuration.ipc_timer
Context.service.loop do |event|
begin
case event
when IPC::Event::Timer
Baguette::Log.info "IPC::Event::Timer" if Context.print_timer
sending_ping_messages
when IPC::Event::Connection
Baguette::Log.debug "IPC::Event::Connection: #{event.fd}"
when IPC::Event::Disconnection
Baguette::Log.debug "IPC::Event::Disconnection: #{event.fd}"
when IPC::Event::ExtraSocket
Baguette::Log.debug "IPC::Event::ExtraSocket: #{event.fd}"
# 1. accept new websocket clients
if server.fd == event.fd
client = server.accept
begin
websocket_client_connection client
Baguette::Log.info "new client: #{client.fd}"
rescue e
Baguette::Log.error "Exception: #{e}"
client.close
end
next
end
# 2. active fd != server fd
activefd = event.fd
if activefd <= 0
Baguette::Log.error "faulty activefd: #{activefd}"
end
websocket_switching_procedure activefd
when IPC::Event::Switch
Baguette::Log.debug "IPC::Event::Switch: from fd #{event.fd}"
raise "Not implemented."
# IPC::Event::Message has to be the last entry
# because ExtraSocket and Switch inherit from Message class
when IPC::Event::MessageSent
Baguette::Log.error "IPC::Event::MessageSent: #{event.fd}"
when IPC::Event::MessageReceived
Baguette::Log.debug "IPC::Event::MessageReceived: #{event.fd}"
raise "Not implemented."
end
rescue e
Baguette::Log.error "IPC loop final catch: #{e}"
end
end