websocketd/src/websocketd.cr

417 lines
12 KiB
Crystal
Raw Blame History

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

require "option_parser"
require "ipc"
require "socket"
require "./colors"
require "json"
require "socket"
require "http/server"
require "base64"
require "digest"
require "./utils"
# All modifications to standard libraries go there.
require "./lib_modifications.cr"
# Runtime configuration of this service instance.
#
# Each property holds a default value that the command-line options may
# overwrite.
class CLI
  # Name under which the IPC service is created.
  class_property service_name : String = "websocket"

  # Address the TCP/WebSocket server binds to.
  class_property host : String = "0.0.0.0"

  # Port the TCP/WebSocket server listens on.
  class_property port_to_listen : UInt16 = 1234

  # Timer delay, in milliseconds (30 seconds by default).
  class_property timer_delay : Int32 = 30_000

  # Verbosity level; several log lines are skipped when it is 0.
  class_property verbosity : Int32 = 1
end
# Command-line handling: every recognised flag overwrites the matching CLI default.
OptionParser.parse do |opts|
  opts.on("-l host", "--l host", "IP address to listen on.") { |address| CLI.host = address }

  opts.on("-p port", "--port port", "Port to listen on.") { |value| CLI.port_to_listen = value.to_u16 }

  opts.on("-s service-name", "--service-name service-name", "Service name.") { |value| CLI.service_name = value }

  # The delay is given in seconds on the command line but stored in milliseconds.
  opts.on("-t timer-delay", "--timer-delay timer-delay", "Timer delay (in seconds)") do |seconds|
    CLI.timer_delay = seconds.to_i32 * 1000
  end

  opts.on("-v verbosity-level", "--verbosity level", "Verbosity.") { |level| CLI.verbosity = level.to_i }

  # Print the usage text and stop right away.
  opts.on("-h", "--help", "Show this help") do
    puts opts
    exit 0
  end
end
# Timer hook. At the moment it only logs a line on standard output;
# no ping is actually sent.
def sending_ping_messages
  STDOUT.puts "sending ping messages"
end
# def sending_ping_messages(context : InstanceStorage)
# context.fd_to_websocket.each do |fd, ws|
# begin
# ws.ping "hello from #{fd}"
# rescue e
# puts "#{CRED}Exception: #{e}#{CRESET}, already closed client #{fd}"
# begin
# context.remove_fd fd
# rescue e
# puts "#{CRED}Cannot remove #{fd} from clients: #{e}#{CRESET}"
# end
# end
# end
# end
# def closing_client (fdclient : Int, context : InstanceStorage)
# context.remove_fd fdclient
# end
# Reads the HTTP upgrade request sent by `client` and returns the name of the
# requested service, i.e. the request path without its first character (the
# leading "/").
#
# `client` is the IO the request is parsed from. When the request names no
# service (empty path once the "/" is removed), `client` is closed here and an
# empty string is returned.
#
# Raises when no request could be read, when parsing yields an HTTP status
# instead of a request, or when the "Sec-WebSocket-Key" header is missing.
#
# NOTE(review): the "101 Switching Protocols" response is assembled below but
# is never written to `client` by this method - confirm where it must be sent.
def ws_http_upgrade(client)
  request = HTTP::Request.from_io client

  raise "#REQUEST IS NIL" if request.nil?

  # A request that could not be parsed comes back as an HTTP::Status.
  if request.is_a?(HTTP::Status)
    raise "BAD REQUEST DAZE~" if request == HTTP::Status::BAD_REQUEST
    raise "Not bad request but still pretty bad: #{request.to_s}"
  end

  # FIXME: check they actually wanted to upgrade to websocket
  # (the "Upgrade" and "Connection" request headers are not inspected yet).
  ws_key = request.headers["Sec-WebSocket-Key"]?
  if ws_key.nil?
    # Fail with an explicit message instead of an opaque missing-key error.
    raise "Missing Sec-WebSocket-Key header: not a WebSocket upgrade request"
  end

  # Accept key: base64 of the SHA-1 of the client key followed by the fixed GUID.
  response_key = Digest::SHA1.base64digest ws_key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

  # Handshake response (status line, then one "Name: value" line per header).
  headers_header = "HTTP/1.1 101 Switching Protocols"
  response_headers = HTTP::Headers{
    "Upgrade"              => "websocket",
    "Connection"           => "Upgrade",
    "Sec-WebSocket-Accept" => response_key,
  }
  headers = response_headers.map { |name, values| "#{name}: #{values[0]}\r\n" }.join

  # requested service: the request path without its leading "/"
  req_service = request.path.lchop
  if req_service.empty?
    puts "closing the client"
    client.close
  end

  req_service
end
# Registers an accepted client in `context`: records whether it asked for JSON
# transcription, connects it to the requested service, writes the handshake
# response, wraps the socket in a WebSocket and stores both objects.
#
# NOTE(review): as far as this file shows, this method cannot work as written:
# `websocket_connection_procedure` only appears commented out, and
# `headers_header` / `headers` are locals of `ws_http_upgrade`, not of this
# method. No call to `other_function` is visible here either - confirm whether
# it is still needed.
def other_function(client, req_service, context : InstanceStorage)
# The client may ask to have JSON-based messages transcribed into IPC messages.
# To that end, the client may send the name of the service it wants to reach with the suffix ".JSON".
if req_service.ends_with? ".JSON"
context.is_json[client.fd] = true
# The "." of the pattern is unescaped (matches any character); the suffix was
# checked just above, so only the trailing ".JSON" is removed here.
req_service = req_service.gsub /.JSON$/, ""
else
context.is_json[client.fd] = false
end
websocket_connection_procedure req_service, client.fd, context
# TODO: if trackerd, send the IP address of the client
if req_service == "tracker"
puts "tracker - sending the IP address"
puts "connection from #{client.remote_address}"
# fd paired with this client in the switch table
sfd = context.switchtable[client.fd]
# message = IPC::Message.from_json(JSON).to_packet
# => JSON has to include these attributes: mtype, utype, payload
# message = IPC::Message.new mtype, utype, payload
remote_address = client.remote_address.address
message = IPC::Message.new 1, 1.to_u8, "{\"ipaddress\": \"#{remote_address}\"}"
# The paired fd is wrapped so the packet can be written with `send`.
serv = WrappedTCPFileDescriptor.new(fd: sfd, family: Socket::Family::INET)
serv.send message.to_packet
end
# puts "#{headers_header}\n#{headers.to_s}\r\n"
# NOTE(review): the status line is terminated by "\n" only, not "\r\n" - verify.
client.send "#{headers_header}\n#{headers.to_s}\r\n"
wsclient = WebSocket.new client
wsclient.on_pong do |m|
puts "pong #{m}"
end
context.is_client[client.fd] = true
# listen to the client's file descriptor
context.service << client.fd
# puts "#{CBLUE}new client: #{client.fd}#{CRESET}"
# registering the client into storing structures to avoid being garbage collected
context.fd_to_tcpsocket[client.fd] = client
context.fd_to_websocket[client.fd] = wsclient
end
# def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
# begin
# if context.is_client[activefd]
# # The client is a WebSocket on top of a TCP connection
# client = context.fd_to_tcpsocket[activefd]
# wsclient = context.fd_to_websocket[activefd]
# loop do
# begin
# message = wsclient.run_once
# rescue e
# puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}"
# context.remove_fd activefd
# return
# end
#
# # Checking the internals of WebSocket, then the contained IO within, to know if there is still something to read in the socket.
# still_something_to_read = ! wsclient.ws.io.empty?
#
# if wsclient.closed?
# # puts "#{CBLUE}client is closed#{CRESET}"
# context.remove_fd client.fd
# return
# end
#
# if message.nil?
# puts "#{CRED}message is nil#{CRESET}"
# context.remove_fd client.fd
# return
# end
#
# case message
# when WebSocket::Error
# puts "#{CRED}An error occured#{CRESET}"
# context.remove_fd client.fd
# return
# when WebSocket::Ping
# # puts "#{CBLUE}Received a ping message#{CRESET}"
# next if still_something_to_read
# break
# when WebSocket::Pong
# # puts "#{CBLUE}Received a pong message#{CRESET}"
# next if still_something_to_read
# break
# when WebSocket::Close
# # puts "#{CBLUE}Received a close message#{CRESET}"
# context.remove_fd client.fd
# return
# when WebSocket::NotFinal
# # puts "#{CBLUE}Received only part of a message#{CRESET}"
# next if still_something_to_read
# break
# when Bytes
# # TODO: when receiving a binary message
# # we should test the format and maybe its content
# # puts "#{CBLUE}Received a binary message#{CRESET}"
# end
#
# if context.is_json[activefd] && message.is_a?(String)
# message = IPC::Message.from_json(message).to_packet
# end
#
# # client => service
# fdservice = context.switchtable[activefd]
#
# # XXX: this is not a TCP fd, but since behind the scene this is compatible, I'm hacking a bit
# # Also, I changed the "finalize" method of the TCPFileDescriptor class not to close the socket
# # when the object is GC.
# serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET)
# #serv = context.fd_to_ipcconnection[fdservice]
# serv.send message
#
# break unless still_something_to_read
#
# end # loop over the remaining messages to read on the websocket
# else
# # service => client
# fdclient = context.switchtable[activefd]
# wsclient = context.fd_to_websocket[fdclient]
#
# serv = context.fd_to_ipcconnection[activefd]
# message = serv.read
#
# if context.is_json[fdclient]
# buf = message.to_json
# else
# buf = message.to_packet
# end
#
# wsclient.send buf
# end
# rescue e
# puts "#{CRED}Exception during message transfer:#{CRESET} #{e}"
# if context.is_client[activefd]
# closing_client activefd, context
# else
# clientfd = context.switchtable[activefd]
# closing_client clientfd, context
# end
# end
# end
# # first message from the client: requested service name
# # 1. connection to the service
# # 2. listening on the service fd
# # 3. bounding both file descriptors (through switchtable hash)
# # 4. indicating that the client is connected (in is_client)
# def websocket_connection_procedure (requested_service : String, clientfd : Int32, context : InstanceStorage)
# begin
# # 2. establishing a connection to the service
# newservice = IPC::Connection.new requested_service
# context.fd_to_ipcconnection[newservice.fd] = newservice
#
# # 3. listening on the client fd and the service fd
# context.service << newservice.fd
#
# # cannot perform automatic switching due to websockets headers
# # future version of the libipc lib should include some workaround, probably
# # service.switch.add fdclient, newservice.fd
#
# # 4. bounding the client and the service fd
# context.switchtable[clientfd] = newservice.fd
# context.switchtable[newservice.fd] = clientfd
#
# # 5. the client is then connected, send it a message
# context.is_client[clientfd] = true
# context.is_client[newservice.fd] = false
# rescue e
# puts "#{CRED}Exception during connection to the service:#{CRESET} #{e}"
# context.remove_fd clientfd
# end
# end
# Accepts one pending connection on `server` and wires it to the requested IPC
# service through the libipc switching calls.
#
# Steps: accept the TCP connection, read the HTTP upgrade request to learn the
# service name, connect to that service, register the client fd as a switched
# fd, pair the fds, then install the client-side callbacks.
#
# Any exception raised along the way is caught here: it is printed and the
# client, if one was accepted, is closed.
#
# NOTE(review): the return values of the LibIPC calls are not inspected, so the
# "OK" lines only show that the calls returned - confirm whether they report
# errors through their return value.
# NOTE(review): `client` is only referenced by this method's local variable;
# presumably something must keep it alive after the method returns - verify.
def handle_new_clients(service, server)
  # TODO: this is a lot of C-like notation, should be changed.
  # 1. accept TCP/WS connection
  client = server.accept

  # 2. upgrade HTTP connections and get the service name in the URI path.
  req_service = ws_http_upgrade client
  puts "requested service: #{req_service}"

  # No service requested: there is nothing to switch to. ws_http_upgrade closes
  # the client in that case; closing again here is only a safeguard.
  if req_service.empty?
    client.close unless client.closed?
    return
  end

  # The client has to stay open from here on: its fd is handed to libipc in
  # the steps below.

  # 3. connect to the service via ipc_connection_switched
  LibIPC.ipc_connection_switched service.pointer, req_service
  puts "ipc_connection_switched: OK"

  # 4. add client fd in the context, type = switched via ipc_ctx_fd_type
  service << client.fd
  service.pp
  LibIPC.ipc_ctx_fd_type service.pointer, client.fd, LibIPC::ConnectionType::Switched
  puts "ipc_ctx_fd_type: OK"

  # 5. add both fd in switched via ipc_switching_add
  # NOTE(review): `switchdb` is a local variable; if `service.context.switchdb`
  # is returned by value, libipc updates a copy here - verify.
  # NOTE(review): the second fd is read from the last pollfd entry; if
  # `service << fd` appends to pollfd, that entry is the client fd added in
  # step 4 rather than the service connection from step 3 - verify.
  switchdb = service.context.switchdb
  LibIPC.ipc_switching_add pointerof(switchdb),
    client.fd,
    service.context.pollfd[service.context.size - 1].fd
  puts "ipc_switching_add: OK"

  proc_cb_in = ->my_cb_in(Int32, Pointer(LibIPC::Message))
  proc_cb_out = ->my_cb_out(Int32, Pointer(LibIPC::Message))

  # 6. change client callbacks via ipc_switching_callbacks
  # only the client callbacks are changed, since the associated server is a simple libipc service
  LibIPC.ipc_switching_callbacks service.pointer, client.fd,
    proc_cb_in,
    proc_cb_out
  puts "ipc_switching_callbacks: OK"

  puts "#{CBLUE}new client:#{CRESET} #{client.fd}" unless CLI.verbosity == 0
rescue e
  puts "Exception: #{CRED}#{e}#{CRESET}"
  # `client` is nil when the failure happened before or during accept.
  client.try &.close
end
# Callback installed for a switched client (the "out" one of the pair):
# logs the file descriptor and reports no error.
# The message pointer `pm` is not used.
def my_cb_out(fd : Int32, pm : Pointer(LibIPC::Message))
  STDOUT.puts "OUT fd is #{fd}"
  LibIPC::IPCCB::NoError
end
# Callback installed for a switched client (the "in" one of the pair):
# logs the file descriptor and reports no error.
# The message pointer `pm` is not used.
def my_cb_in(fd : Int32, pm : Pointer(LibIPC::Message))
  STDOUT.puts "IN fd is #{fd}"
  LibIPC::IPCCB::NoError
end
# Entry point of the daemon: opens the TCP/WebSocket server, registers its fd
# in the IPC service, then reacts to the events yielded by the service loop.
def main
  # by default, listen on any IP address
  tcp_server = TCPServer.new(CLI.host, CLI.port_to_listen)
  ipc_service = IPC::Server.new CLI.service_name
  ipc_service << tcp_server.fd

  # Every few seconds, the service should trigger the timer,
  # allowing the sending of Ping messages to clients.
  ipc_service.base_timer = CLI.timer_delay
  ipc_service.timer = CLI.timer_delay

  # context = InstanceStorage.new service
  ipc_service.loop do |event|
    puts "current state of the context:"
    ipc_service.pp

    case event
    when IPC::Event::Timer
      puts "#{CORANGE}IPC::Event::Timer#{CRESET}" if CLI.verbosity > 0
      sending_ping_messages # context
    when IPC::Event::Connection
      puts "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.fd}" if CLI.verbosity > 0
    when IPC::Event::Disconnection
      puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}: #{event.fd}" if CLI.verbosity > 0
    when IPC::Event::ExtraSocket
      puts "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}: #{event.fd}" if CLI.verbosity > 0
      # The TCP/WS server is the only extra socket registered above.
      unless tcp_server.fd == event.fd
        raise "Error: the only extra socket should be the TCP/WS server"
      end
      handle_new_clients(ipc_service, tcp_server)
    when IPC::Event::Switch
      puts "\033[36mIPC::Event::Switch#{CRESET}: from fd #{event.fd}" if CLI.verbosity > 0
      raise "Not implemented."
    when IPC::Event::MessageSent
      puts "Message sent, for #{event.fd}" if CLI.verbosity > 0
    when IPC::Event::MessageReceived
      puts "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.fd}" if CLI.verbosity > 0
      raise "Not implemented."
    end
  end
end

# Start the daemon.
main