ipcd/src/websocketd.cr

524 lines
15 KiB
Crystal
Raw Permalink Blame History

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

require "option_parser"
require "ipc"
require "socket"
require "./colors"
require "json"
require "socket"
require "http/server"
require "base64"
require "digest"
require "./utils"
require "baguette-crystal-base"
require "colorize"
# All modifications to standard libraries go there.
require "./lib_modifications.cr"
class Context
# service instance parameters
# they can be changed via the cli
class_property service_name = "websocket"
class_property host = "0.0.0.0"
class_property port_to_listen : UInt16 = 1234
class_property timer_delay : Int32 = 30_000.to_i32
class_property print_messages = false
class_property print_timer = false
end
OptionParser.parse do |parser|
parser.on "-l host", "--l host", "IP address to listen on." do |h|
Context.host = h
end
parser.on "-p port", "--port port", "Port to listen on." do |port|
Context.port_to_listen = port.to_u16
end
parser.on "-s service-name", "--service-name service-name", "Service name." do |name|
Context.service_name = name
end
parser.on "-t timer-delay", "--timer-delay timer-delay", "Timer delay (in seconds)" do |t|
Context.timer_delay = t.to_i32 * 1000
end
parser.on "-v verbosity-level", "--verbosity level", "Verbosity." do |opt|
Baguette::Context.verbosity = opt.to_i
end
parser.on "-T", "--print-timer", "Print timer." do
Context.print_timer = true
end
parser.on "-M", "--print-messages", "Print messages received and sent." do
Context.print_messages = true
end
parser.on "-h", "--help", "Show this help" do
puts parser
exit 0
end
end
# Link between fd and TCPSocket, WebSocket and IPC::Client instances.
class InstanceStorage
property service : IPC::SwitchingService
property switchtable : Hash(Int32, Int32)
property is_client : Hash(Int32,Bool)
property is_json : Hash(Int32, Bool)
property fd_to_tcpsocket : Hash(Int32, TCPSocket)
property fd_to_websocket : Hash(Int32, WebSocket)
property fd_to_ipcclient : Hash(Int32, IPC::Client)
def initialize (@service : IPC::SwitchingService)
# fdlist_client = [] of TCPSocket
@switchtable = Hash(Int32, Int32).new
@is_client = Hash(Int32,Bool).new
@is_json = Hash(Int32,Bool).new
@fd_to_tcpsocket = Hash(Int32, TCPSocket).new
@fd_to_websocket = Hash(Int32, WebSocket).new
@fd_to_ipcclient = Hash(Int32, IPC::Client).new
end
def remove_fd (fdclient : Int32)
Baguette::Log.info "closing the client:#{CRESET} #{fdclient}"
if fdclient == -1
raise "fdclient is -1, nothing to do with it"
end
begin
# 1. closing both the client and the service
tcpfdc = @fd_to_tcpsocket[fdclient]
# 2. closing the TCP connections
tcpfdc.close unless tcpfdc.closed?
rescue e
Baguette::Log.error "remove_fd: 1 #{e}"
end
# 3. removing the client and the service fds from the loop check
begin
@service.remove_fd (fdclient)
rescue e
Baguette::Log.error "remove_fd: 2 #{e}"
end
# 5. removing both the client and the service from the switchtable
fdservice = @switchtable[fdclient]?
@switchtable = @switchtable.select do |fdc, fds|
fdc != fdclient && fds != fdclient
end
# 6. removing the client and the service from is_client
@is_client = @is_client.select do |fd,v| fd != fdclient end
@is_json = @is_json.select do |fd,v| fd != fdclient end
@fd_to_websocket.select! do |fd, ws|
fd != fdclient
end
begin
if fdservice.nil?
Baguette::Log.debug "client #{fdclient} aleady has its service removed"
else
@service.remove_fd (fdservice)
service = @fd_to_ipcclient[fdservice]
@fd_to_ipcclient = @fd_to_ipcclient.select do |k, v|
k != fdservice
end
@is_client = @is_client.select do |fd,v| fd != fdservice end
# perform the close at the end
# on crash, this service still is removed from the list of listened fd
service.close
end
rescue e
Baguette::Log.error "remove_fd: 3 #{e}"
end
rescue e
Baguette::Log.error "in InstanceStorage#remove_fd: #{e}"
end
end
class Context
class_property service = IPC::SwitchingService.new service_name
class_property context = InstanceStorage.new service
end
# by default, listen on any IP address
server = TCPServer.new(Context.host, Context.port_to_listen)
Context.service << server.fd
def websocket_client_connection(client)
request = HTTP::Request.from_io client
if Context.print_messages
pp! request
end
if request.nil?
raise "#REQUEST IS NIL"
end
if request.is_a? HTTP::Status && request == HTTP::Status::BAD_REQUEST
raise "BAD REQUEST DAZE~"
end
if request.is_a? HTTP::Status
raise "Not bad request but still pretty bad: #{request.to_s}"
end
# FIXME: check they actually wanted to upgrade to websocket
key = request.headers["Sec-WebSocket-Key"]
response_key = Digest::SHA1.base64digest key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
# puts response_key
# HTTP inside bru
headers_header = "HTTP/1.1 101 Switching Protocols"
headers = HTTP::Headers {
"Upgrade" => "websocket",
"Connection" => "Upgrade",
"Sec-WebSocket-Accept" => response_key
}
headers = headers.map { |key, value| "#{key}: #{value[0]}\r\n" }.join
# requested service, fd
req_service = request.path.lchop.sub "ws/", ""
if req_service.empty?
client.close
return
end
# The client may ask to transcript JSON-based messages into IPC messages.
# To that end, the client may send the name of the service it wants to reach with the prefix ".JSON".
if req_service.ends_with? ".JSON"
Context.context.is_json[client.fd] = true
req_service = req_service.gsub /.JSON$/, ""
else
Context.context.is_json[client.fd] = false
end
# Hack to preserve the daemon naming convention.
if req_service == "tracker"
req_service = "tracking"
end
websocket_connection_procedure req_service, client.fd
# If the requested service is trackingd, send the IP address of the client
if req_service == "tracking"
real_ip_address = request.headers["X-Real-IP"]? || client.remote_address.address
sfd = Context.context.switchtable[client.fd]
Baguette::Log.info "trackingd - sending the IP address #{real_ip_address} to fd #{sfd}"
# message = IPC::Message.from_json(JSON).to_packet
# => JSON has to include these attributes: mtype, utype, payload
# message = IPC::Message.new mtype, utype, payload
message = IPC::Message.new sfd, 1, 1.to_u8, "{\"ipaddress\": \"#{real_ip_address}\"}"
if Context.print_messages
Baguette::Log.info "to trackingd: #{message.to_s}"
end
serv = WrappedTCPFileDescriptor.new(fd: sfd, family: Socket::Family::INET)
serv.send message.to_packet
end
if Context.print_messages
Baguette::Log.info "to client: #{headers_header}\n#{headers.to_s}"
end
client.send "#{headers_header}\n#{headers.to_s}\r\n"
wsclient = WebSocket.new client
wsclient.on_pong do |m|
Baguette::Log.debug "pong #{m}"
end
Context.context.is_client[client.fd] = true
# listen to the client's file descriptor
Context.context.service << client.fd
# puts "#{CBLUE}new client: #{client.fd}#{CRESET}"
# registering the client into storing structures to avoid being garbage collected
Context.context.fd_to_tcpsocket[client.fd] = client
Context.context.fd_to_websocket[client.fd] = wsclient
end
def closing_client (fdclient : Int)
Baguette::Log.warning "Closing client #{fdclient}"
Context.context.remove_fd fdclient
end
# first message from the client: requested service name
# 1. connection to the service
# 2. listening on the service fd
# 3. bounding both file descriptors (through switchtable hash)
# 4. indicating that the client is connected (in is_client)
def websocket_connection_procedure (requested_service : String, clientfd : Int32)
begin
# 1. establishing a connection to the service
newservice = IPC::Client.new requested_service
new_service_fd = newservice.fd.not_nil!
Context.context.fd_to_ipcclient[new_service_fd] = newservice
# 2. listening on the client fd and the service fd
Context.context.service << new_service_fd
# cannot perform automatic switching due to websockets headers
# future version of the libipc lib should include some workaround, probably
# service.switch.add fdclient, newservice.fd
# 3. bounding the client and the service fd
Context.context.switchtable[clientfd] = new_service_fd
Context.context.switchtable[new_service_fd] = clientfd
# 4. the client is then connected, send it a message
Context.context.is_client[clientfd] = true
Context.context.is_client[new_service_fd] = false
rescue e
Baguette::Log.error "Exception during connection to the service: #{e}"
Context.context.remove_fd clientfd
end
end
class FragmentBuffer
property buffer : String = String.new
end
def websocket_switching_procedure (activefd : Int)
begin
# Baguette::Log.title "activefd is #{activefd}"
if Context.context.is_client[activefd]
# Baguette::Log.title "activefd #{activefd} is a client"
# The client is a WebSocket on top of a TCP connection
client = Context.context.fd_to_tcpsocket[activefd]
wsclient = Context.context.fd_to_websocket[activefd]
fb = FragmentBuffer.new
loop do
begin
message = wsclient.run_once
rescue e
Baguette::Log.error "Exception (receiving a message) #{e}"
Context.context.remove_fd activefd
break
end
# Checking the internals of WebSocket, then the contained IO within, to know if there is still something to read in the socket.
still_something_to_read = ! wsclient.ws.io.empty?
if wsclient.closed?
Baguette::Log.info "client #{activefd} is closing"
Context.context.remove_fd activefd
break
end
if message.nil?
Baguette::Log.error "message received from #{activefd} is nil"
Context.context.remove_fd activefd
if still_something_to_read
Baguette::Log.info "Still something to read, but #{activefd} was removed"
next
end
break
end
case message
when WebSocket::Error
Baguette::Log.error "An error occured with client #{activefd}"
Context.context.remove_fd activefd
if still_something_to_read
Baguette::Log.debug "Still something to read, but #{activefd} was removed"
next
end
return
when WebSocket::Ping
Baguette::Log.debug "Received a ping message"
if still_something_to_read
Baguette::Log.debug "Still something to read"
next
end
break
when WebSocket::Pong
Baguette::Log.debug "Received a pong message"
if still_something_to_read
Baguette::Log.debug "Still something to read"
next
end
break
when WebSocket::Close
Baguette::Log.debug "Received a close message"
Context.context.remove_fd activefd
if still_something_to_read
Baguette::Log.debug "Still something to read"
next
end
break
when WebSocket::NotFinal
Baguette::Log.warning "Received only part of a message"
# TODO: check if the message is OK when multiplexing
# pp! message
fb.buffer = fb.buffer + message.message
if still_something_to_read
Baguette::Log.debug "Still something to read"
next
end
break
when Bytes
# TODO: when receiving a binary message
# we should test the format and maybe its content
Baguette::Log.warning "Received a binary message (not supported)"
if still_something_to_read
Baguette::Log.info "Still something to read"
next
end
end
# In case there was a previous messagee within a fragment.
if fb.buffer.size > 0
Baguette::Log.warning "SHOULD reconstitute the message!!"
end
if message.is_a?(String) && fb.buffer.size > 0
Baguette::Log.warning "Reconstitute the message!!"
message = fb.buffer + message
fb.buffer = String.new
end
if Context.context.is_json[activefd] && message.is_a?(String)
if Context.print_messages
j = JSON.parse message
Baguette::Log.info "received from client #{activefd}"
pp! j["payload"]
end
message = IPC::Message.from_json(message).to_packet
end
# client => service
fdservice = Context.context.switchtable[activefd]
# XXX: this is not a TCP fd, but since behind the scene this is compatible, I'm hacking a bit
# Also, I changed the "finalize" method of the TCPFileDescriptor class not to close the socket
# when the object is GC.
serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET)
#serv = Context.context.fd_to_ipcclient[fdservice]
serv.send message
break unless still_something_to_read
end # loop over the remaining messages to read on the websocket
else
# service => client
fdclient = Context.context.switchtable[activefd]
wsclient = Context.context.fd_to_websocket[fdclient]
serv = Context.context.fd_to_ipcclient[activefd]
message = serv.read
if Context.context.is_json[fdclient]
buf = message.to_json
if Context.print_messages
j = JSON.parse buf
Baguette::Log.info "received from service #{activefd}"
pp! j["payload"]
end
else
buf = message.to_packet
end
wsclient.send buf
end
rescue e
Baguette::Log.error "Exception during message transfer: #{e}"
if Context.context.is_client[activefd]
closing_client activefd
else
clientfd = Context.context.switchtable[activefd]
closing_client clientfd
end
end
end
def sending_ping_messages
Context.context.fd_to_websocket.each do |fd, ws|
begin
ws.ping "hello from #{fd}"
rescue e
Baguette::Log.error "#{CRED}Exception: #{e}#{CRESET}, already closed client #{fd}"
begin
Context.context.remove_fd fd
rescue e
Baguette::Log.error "#{CRED}Cannot remove #{fd} from clients: #{e}#{CRESET}"
end
end
end
end
# Every few seconds, the service should trigger the timer
# Allowing the sending of Ping messages to clients
Context.service.base_timer = Context.timer_delay
Context.service.loop do |event|
begin
case event
when IPC::Event::Timer
Baguette::Log.info "IPC::Event::Timer" if Context.print_timer
sending_ping_messages
when IPC::Event::Connection
Baguette::Log.debug "IPC::Event::Connection: #{event.fd}"
when IPC::Event::Disconnection
Baguette::Log.debug "IPC::Event::Disconnection: #{event.fd}"
when IPC::Event::ExtraSocket
Baguette::Log.debug "IPC::Event::ExtraSocket: #{event.fd}"
# 1. accept new websocket clients
if server.fd == event.fd
client = server.accept
begin
websocket_client_connection client
Baguette::Log.info "new client: #{client.fd}"
rescue e
Baguette::Log.error "Exception: #{e}"
client.close
end
next
end
# 2. active fd != server fd
activefd = event.fd
if activefd <= 0
Baguette::Log.error "faulty activefd: #{activefd}"
end
websocket_switching_procedure activefd
when IPC::Event::Switch
Baguette::Log.debug "IPC::Event::Switch: from fd #{event.fd}"
raise "Not implemented."
# IPC::Event::Message has to be the last entry
# because ExtraSocket and Switch inherit from Message class
when IPC::Event::MessageSent
Baguette::Log.error "IPC::Event::MessageSent: #{event.fd}"
when IPC::Event::MessageReceived
Baguette::Log.debug "IPC::Event::MessageReceived: #{event.fd}"
raise "Not implemented."
end
rescue e
Baguette::Log.error "IPC loop final catch: #{e}"
end
end