ipcd/src/websocketd.cr

422 lines
12 KiB
Crystal
Raw Blame History

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

require "option_parser"
require "ipc"
require "socket"
require "./colors"
require "json"
require "socket"
require "http/server"
require "base64"
require "digest"
require "./utils"
require "baguette-crystal-base"
require "colorize"
# All modifications to standard libraries go there.
require "./lib_modifications.cr"
# Process-wide configuration of the websocket daemon.
# Every value below is a default that the command-line options may override.
class Context
  # Name handed to the IPC switching service.
  class_property service_name = "websocket"

  # Address the TCP server binds to.
  class_property host = "0.0.0.0"

  # TCP port the server listens on.
  class_property port_to_listen : UInt16 = 1234_u16

  # Delay handed to the service as its base timer (the `-t` option multiplies seconds by 1000).
  class_property timer_delay : Int32 = 30_000_i32
end
# Command-line handling: each option overrides one default stored in Context.
OptionParser.parse do |parser|
  # NOTE(review): the long form is spelled `--l`; presumably `--listen` was meant.
  # Left untouched since renaming it would change the command-line interface.
  parser.on("-l host", "--l host", "IP address to listen on.") { |h| Context.host = h }

  parser.on("-p port", "--port port", "Port to listen on.") { |port| Context.port_to_listen = port.to_u16 }

  parser.on("-s service-name", "--service-name service-name", "Service name.") { |name| Context.service_name = name }

  # The delay is given in seconds on the command line and stored multiplied by 1000.
  parser.on("-t timer-delay", "--timer-delay timer-delay", "Timer delay (in seconds)") { |t| Context.timer_delay = t.to_i32 * 1000 }

  parser.on("-v verbosity-level", "--verbosity level", "Verbosity.") { |opt| Baguette::Context.verbosity = opt.to_i }

  parser.on("-h", "--help", "Show this help") do
    puts parser
    exit 0
  end
end
# Link between fd and TCPSocket, WebSocket and IPC::Client instances.
# Bookkeeping shared by the whole daemon: which fd is bound to which,
# and which objects (TCP socket, WebSocket, IPC client) sit behind each fd.
class InstanceStorage
  # Switching service whose event loop watches the registered fds.
  property service : IPC::SwitchingService
  # Bidirectional binding: client fd => service fd and service fd => client fd.
  property switchtable : Hash(Int32, Int32)
  # true for a websocket client fd, false for a service fd.
  property is_client : Hash(Int32,Bool)
  # true when the client asked for JSON <=> IPC message translation.
  property is_json : Hash(Int32, Bool)
  property fd_to_tcpsocket : Hash(Int32, TCPSocket)
  property fd_to_websocket : Hash(Int32, WebSocket)
  property fd_to_ipcclient : Hash(Int32, IPC::Client)

  def initialize (@service : IPC::SwitchingService)
    @switchtable = Hash(Int32, Int32).new
    @is_client = Hash(Int32,Bool).new
    @is_json = Hash(Int32,Bool).new
    @fd_to_tcpsocket = Hash(Int32, TCPSocket).new
    @fd_to_websocket = Hash(Int32, WebSocket).new
    @fd_to_ipcclient = Hash(Int32, IPC::Client).new
  end

  # Closes the client `fdclient` along with the service bound to it (if any)
  # and forgets every entry related to both file descriptors.
  #
  # Raises `KeyError` when no TCP socket is registered for `fdclient`; in that case
  # nothing is modified. Callers rely on this to notice an unknown or stale fd.
  def remove_fd (fdclient : Int32)
    Baguette::Log.info "closing the client:#{CRESET} #{fdclient}"

    # 1. looking for the service bound to the client, before the switchtable is cleaned
    fdservice = @switchtable[fdclient]?
    tcpfdc = @fd_to_tcpsocket[fdclient]

    # 2. closing the TCP connection, then dropping the reference to the socket:
    #    a stale entry would keep the closed socket alive and be mistaken for
    #    the next client the OS gives the same fd number to
    tcpfdc.close unless tcpfdc.closed?
    @fd_to_tcpsocket.delete fdclient

    # 3. removing the client fd from the loop check
    @service.remove_fd (fdclient)

    # 4. removing both directions of the binding from the switchtable
    @switchtable.reject! { |fdc, fds| fdc == fdclient || fds == fdclient }

    # 5. removing the client from the remaining lookup tables
    @is_client.delete fdclient
    @is_json.delete fdclient
    @fd_to_websocket.delete fdclient

    # 6. same cleanup for the service side, when the client was bound to one
    unless fdservice.nil?
      if ipcclient = @fd_to_ipcclient.delete(fdservice)
        ipcclient.close
      end
      @service.remove_fd (fdservice)
      @is_client.delete fdservice
    end
  end
end
# Runtime state of the daemon, declared after the option parsing above.
class Context
  # The switching service, named after the configured service name.
  class_property service = IPC::SwitchingService.new(service_name)

  # Storage of the per-fd bookkeeping, built around the switching service.
  class_property context = InstanceStorage.new(service)
end
# The listening socket: unless overridden on the command line, it accepts on any IP address.
listen_host = Context.host
listen_port = Context.port_to_listen
server = TCPServer.new(listen_host, listen_port)

# The server fd is watched by the switching service, so new connections show up in the event loop.
Context.service << server.fd
# Handles a freshly accepted TCP client: reads its HTTP upgrade request, connects it
# to the service named in the request path, answers with the WebSocket handshake and
# registers the client in the storage structures.
#
# client: the accepted socket (its `fd`, `remote_address`, `send` and `close` are used).
#
# When the request path names no service, the client is closed and the method returns.
# Raises when the request cannot be parsed, and lets propagate any error coming from a
# missing `Sec-WebSocket-Key` header or from the connection to the service.
def websocket_client_connection(client)
  request = HTTP::Request.from_io client

  if request.nil?
    raise "#REQUEST IS NIL"
  end

  # from_io hands back an HTTP::Status instead of a request when parsing failed
  if request.is_a? HTTP::Status
    if request == HTTP::Status::BAD_REQUEST
      raise "BAD REQUEST DAZE~"
    end
    raise "Not bad request but still pretty bad: #{request.to_s}"
  end

  # FIXME: check they actually wanted to upgrade to websocket
  key = request.headers["Sec-WebSocket-Key"]
  response_key = Digest::SHA1.base64digest key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

  # Handshake response, sent once the service side is ready
  status_line = "HTTP/1.1 101 Switching Protocols"
  headers = HTTP::Headers {
    "Upgrade" => "websocket",
    "Connection" => "Upgrade",
    "Sec-WebSocket-Accept" => response_key
  }
  header_lines = headers.map { |name, values| "#{name}: #{values[0]}\r\n" }.join

  # requested service, taken from the request path
  req_service = request.path.lchop.sub "ws/", ""

  if req_service.empty?
    client.close
    return
  end

  # The client may ask to transcript JSON-based messages into IPC messages.
  # To that end, it appends the suffix ".JSON" to the name of the service it wants to reach.
  json_requested = req_service.ends_with? ".JSON"
  req_service = req_service.rchop ".JSON"
  Context.context.is_json[client.fd] = json_requested

  # Hack to preserve the daemon naming convention.
  req_service = "tracking" if req_service == "tracker"

  websocket_connection_procedure req_service, client.fd

  # If the requested service is trackingd, send the IP address of the client
  if req_service == "tracking"
    # `[]?` is required here: `[]` raises on a missing header, so the fallback
    # to the peer address would never be reached.
    real_ip_address = request.headers["X-Real-IP"]? || client.remote_address.address

    sfd = Context.context.switchtable[client.fd]
    Baguette::Log.info "trackingd - sending the IP address #{real_ip_address} to fd #{sfd}"

    # The address may come from a request header: it is serialized with to_json so
    # that it cannot break out of the JSON string.
    message = IPC::Message.new sfd, 1, 1.to_u8, "{\"ipaddress\": #{real_ip_address.to_json}}"
    serv = WrappedTCPFileDescriptor.new(fd: sfd, family: Socket::Family::INET)
    serv.send message.to_packet
  end

  # Status line and header lines are all CRLF-terminated; an empty line ends the response.
  client.send "#{status_line}\r\n#{header_lines}\r\n"

  wsclient = WebSocket.new client
  wsclient.on_pong do |m|
    Baguette::Log.debug "pong #{m}"
  end

  Context.context.is_client[client.fd] = true

  # listen to the client's file descriptor
  Context.context.service << client.fd

  # registering the client into storing structures to avoid being garbage collected
  Context.context.fd_to_tcpsocket[client.fd] = client
  Context.context.fd_to_websocket[client.fd] = wsclient
end
# Shortcut: asks the shared storage to close and forget the client `fdclient`.
def closing_client (fdclient : Int)
  storage = Context.context
  storage.remove_fd(fdclient)
end
# first message from the client: requested service name
# 1. connecting to the service
# 2. listening on the service fd
# 3. binding both file descriptors (through the switchtable hash)
# 4. indicating that the client is connected (in is_client)
# Connects the client `clientfd` to the service named `requested_service`.
#
# On failure, whatever was registered for the client is cleaned up and the original
# exception is raised again, so that the caller does not go on with a client that
# has no service behind it.
def websocket_connection_procedure (requested_service : String, clientfd : Int32)
  begin
    # 1. establishing a connection to the service
    newservice = IPC::Client.new requested_service
    new_service_fd = newservice.fd.not_nil!
    Context.context.fd_to_ipcclient[new_service_fd] = newservice

    # 2. listening on the service fd
    Context.context.service << new_service_fd

    # cannot perform automatic switching due to websockets headers
    # future version of the libipc lib should include some workaround, probably
    # service.switch.add fdclient, newservice.fd

    # 3. binding the client and the service fd, in both directions
    Context.context.switchtable[clientfd] = new_service_fd
    Context.context.switchtable[new_service_fd] = clientfd

    # 4. flagging which side is the client and which is the service
    Context.context.is_client[clientfd] = true
    Context.context.is_client[new_service_fd] = false
  rescue e
    puts "#{CRED}Exception during connection to the service:#{CRESET} #{e}"

    begin
      Context.context.remove_fd clientfd
    rescue cleanup_error
      # The caller registers the client socket only after this procedure, so the
      # cleanup may fail; that must not hide the actual connection error.
      Baguette::Log.debug "cleanup after failed connection: #{cleanup_error}"
    end

    raise e
  end
end
# Relays pending data for `activefd`, the fd reported by the event loop.
#
# - `activefd` flagged as a client in `is_client`: messages are read from its WebSocket
#   and each payload is sent to the service fd bound to it in the switchtable.
# - otherwise: one message is read from the IPC service and sent to the WebSocket
#   of the client bound to it.
#
# Any exception raised during the transfer is printed, then the client side of the
# pair is closed through `closing_client`.
def websocket_switching_procedure (activefd : Int)
begin
if Context.context.is_client[activefd]
# The client is a WebSocket on top of a TCP connection
client = Context.context.fd_to_tcpsocket[activefd]
wsclient = Context.context.fd_to_websocket[activefd]
# Several messages may already be available: keep reading until nothing is left.
loop do
begin
message = wsclient.run_once
rescue e
puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}"
Context.context.remove_fd activefd
return
end
# Checking the internals of WebSocket, then the contained IO within, to know if there is still something to read in the socket.
# NOTE(review): `ws` and `io.empty?` are not defined in this file; presumably they come from lib_modifications.cr.
still_something_to_read = ! wsclient.ws.io.empty?
if wsclient.closed?
# puts "#{CBLUE}client is closed#{CRESET}"
Context.context.remove_fd client.fd
return
end
if message.nil?
puts "#{CRED}message is nil#{CRESET}"
Context.context.remove_fd client.fd
return
end
# Control results: errors and close requests remove the client; ping, pong and
# partial messages carry nothing to forward, so either read again or stop.
case message
when WebSocket::Error
puts "#{CRED}An error occured#{CRESET}"
Context.context.remove_fd client.fd
return
when WebSocket::Ping
# puts "#{CBLUE}Received a ping message#{CRESET}"
next if still_something_to_read
break
when WebSocket::Pong
# puts "#{CBLUE}Received a pong message#{CRESET}"
next if still_something_to_read
break
when WebSocket::Close
# puts "#{CBLUE}Received a close message#{CRESET}"
Context.context.remove_fd client.fd
return
when WebSocket::NotFinal
# puts "#{CBLUE}Received only part of a message#{CRESET}"
next if still_something_to_read
break
when Bytes
# TODO: when receiving a binary message
# we should test the format and maybe its content
# puts "#{CBLUE}Received a binary message#{CRESET}"
end
# Text messages of a JSON client are converted into an IPC packet; anything else is forwarded as received.
if Context.context.is_json[activefd] && message.is_a?(String)
message = IPC::Message.from_json(message).to_packet
end
# client => service
fdservice = Context.context.switchtable[activefd]
# XXX: this is not a TCP fd, but since behind the scene this is compatible, I'm hacking a bit
# Also, I changed the "finalize" method of the TCPFileDescriptor class not to close the socket
# when the object is GC.
serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET)
#serv = Context.context.fd_to_ipcclient[fdservice]
serv.send message
break unless still_something_to_read
end # loop over the remaining messages to read on the websocket
else
# service => client
fdclient = Context.context.switchtable[activefd]
wsclient = Context.context.fd_to_websocket[fdclient]
serv = Context.context.fd_to_ipcclient[activefd]
message = serv.read
# The format sent to the client depends on what it asked for at connection time.
if Context.context.is_json[fdclient]
buf = message.to_json
else
buf = message.to_packet
end
wsclient.send buf
end
rescue e
puts "#{CRED}Exception during message transfer:#{CRESET} #{e}"
# Whichever side failed, the client fd is the one handed to closing_client.
# NOTE(review): both lookups below use `[]` and raise when the entry is already gone;
# such an exception is not handled here and reaches the caller.
if Context.context.is_client[activefd]
closing_client activefd
else
clientfd = Context.context.switchtable[activefd]
closing_client clientfd
end
end
end
# Sends a WebSocket ping to every registered client.
# A client whose ping fails is considered gone and is removed; a failure of the
# removal itself is only reported, so the remaining clients are still pinged.
def sending_ping_messages
  # Work on a snapshot: remove_fd deletes entries of fd_to_websocket, which must
  # not happen to the very hash being iterated.
  Context.context.fd_to_websocket.to_a.each do |fd, ws|
    begin
      ws.ping "hello from #{fd}"
    rescue ping_error
      puts "#{CRED}Exception: #{ping_error}#{CRESET}, already closed client #{fd}"
      begin
        Context.context.remove_fd fd
      rescue removal_error
        puts "#{CRED}Cannot remove #{fd} from clients: #{removal_error}#{CRESET}"
      end
    end
  end
end
# Every few seconds, the service should trigger the timer
# Allowing the sending of Ping messages to clients
Context.service.base_timer = Context.timer_delay

# Main event loop: every event reported by the switching service is dispatched here.
# An exception raised while handling an event is logged and the loop goes on.
Context.service.loop do |event|
  begin
    case event
    when IPC::Event::Timer
      # Timer tick: time to ping the websocket clients.
      Baguette::Log.info "IPC::Event::Timer"
      sending_ping_messages
    when IPC::Event::Connection
      Baguette::Log.debug "IPC::Event::Connection: #{event.fd}"
    when IPC::Event::Disconnection
      Baguette::Log.debug "IPC::Event::Disconnection: #{event.fd}"
    when IPC::Event::ExtraSocket
      Baguette::Log.debug "IPC::Event::ExtraSocket: #{event.fd}"

      if server.fd == event.fd
        # 1. activity on the listening socket: accept a new websocket client
        client = server.accept
        begin
          websocket_client_connection client
          Baguette::Log.info "new client: #{client.fd}"
        rescue connection_error
          Baguette::Log.error "Exception: #{connection_error}"
          client.close
        end
      else
        # 2. activity on a client or a service fd: relay its data
        activefd = event.fd
        Baguette::Log.error "faulty activefd: #{activefd}" if activefd <= 0
        websocket_switching_procedure activefd
      end
    when IPC::Event::Switch
      Baguette::Log.debug "IPC::Event::Switch: from fd #{event.fd}"
      raise "Not implemented."
    # IPC::Event::Message has to be the last entry
    # because ExtraSocket and Switch inherit from Message class
    when IPC::Event::MessageSent
      Baguette::Log.error "IPC::Event::MessageSent: #{event.fd}"
    when IPC::Event::MessageReceived
      Baguette::Log.debug "IPC::Event::MessageReceived: #{event.fd}"
      raise "Not implemented."
    end
  rescue e
    Baguette::Log.error "IPC loop final catch: #{e}"
  end
end