websocketd/src/websocketd.cr

327 lines
8.8 KiB
Crystal
Raw Blame History

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

require "option_parser"
require "ipc"
require "socket"
require "./colors"
require "json"
require "socket"
require "http/server"
require "base64"
require "digest"
require "./utils"
require "./instance_storage.cr"
# All modifications to standard libraries live in lib_modifications.cr.
require "./lib_modifications.cr"
# Settings of this service instance.
# Each property holds a default that the command-line parser may overwrite.
class CLI
  # Name given to the IPC server.
  class_property service_name : String = "websocket"

  # Address the TCP server binds to.
  class_property host : String = "0.0.0.0"

  # Port the TCP server binds to.
  class_property port_to_listen : UInt16 = 1234

  # Timer period, in milliseconds (default: 30 seconds).
  class_property timer_delay : Int32 = 30_000

  # `info` prints its argument only when this value is above zero.
  class_property verbosity : Int32 = 1
end
# Command-line parsing: every recognised flag overwrites the matching CLI default.
OptionParser.parse do |opts|
  opts.on "-l host", "--l host", "IP address to listen on." do |address|
    CLI.host = address
  end

  opts.on "-p port", "--port port", "Port to listen on." do |value|
    CLI.port_to_listen = value.to_u16
  end

  opts.on "-s service-name", "--service-name service-name", "Service name." do |service|
    CLI.service_name = service
  end

  opts.on "-t timer-delay", "--timer-delay timer-delay", "Timer delay (in seconds)" do |seconds|
    # Given in seconds on the command line, stored in milliseconds.
    CLI.timer_delay = seconds.to_i32 * 1000
  end

  opts.on "-v verbosity-level", "--verbosity level", "Verbosity." do |level|
    CLI.verbosity = level.to_i
  end

  opts.on "-h", "--help", "Show this help" do
    puts opts
    exit 0
  end
end
# Sends a WebSocket ping to every client registered in the Context.
#
# A client whose ping raises is reported and unregistered through
# `Context.context.remove_fd`. A failure of the removal itself is reported
# and otherwise ignored, so one faulty client never interrupts the others.
def sending_ping_messages
  # Walk a snapshot of the table: remove_fd is called from inside the loop and
  # presumably deletes entries from fd_to_websocket (InstanceStorage is not
  # visible from this file), which must not happen to the collection being
  # iterated.
  Context.context.fd_to_websocket.to_a.each do |(fd, ws)|
    begin
      ws.ping "hello from #{fd}"
    rescue ping_error
      puts "#{CRED}Exception: #{ping_error}#{CRESET}, already closed client #{fd}"
      begin
        Context.context.remove_fd fd
      rescue removal_error
        puts "#{CRED}Cannot remove #{fd} from clients: #{removal_error}#{CRESET}"
      end
    end
  end
end
# Performs the server side of the WebSocket opening handshake on `client`.
#
# Reads one HTTP request from the socket, answers "101 Switching Protocols"
# with the computed Sec-WebSocket-Accept value, wraps the socket in a
# WebSocket and registers both objects in the Context.
#
# Returns the requested service name: the request path without its first
# character and without a trailing ".JSON". A ".JSON" suffix flags the client
# as exchanging JSON messages (Context.context.is_json).
#
# Any failure (unparsable request, missing Sec-WebSocket-Key header, socket
# error, ...) is logged and re-raised as a generic "DROP IT" exception.
def ws_http_upgrade(client)
  request = HTTP::Request.from_io client

  raise "#REQUEST IS NIL" if request.nil?

  # from_io hands back an HTTP::Status instead of a request when parsing failed.
  if request.is_a? HTTP::Status
    if request == HTTP::Status::BAD_REQUEST
      raise "BAD REQUEST DAZE~"
    end
    raise "Not bad request but still pretty bad: #{request.to_s}"
  end

  # FIXME: check they actually wanted to upgrade to websocket
  # A missing header raises here and is handled by the rescue below.
  key = request.headers["Sec-WebSocket-Key"]
  response_key = Digest::SHA1.base64digest key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

  status_line = "HTTP/1.1 101 Switching Protocols"
  headers = HTTP::Headers{
    "Upgrade"              => "websocket",
    "Connection"           => "Upgrade",
    "Sec-WebSocket-Accept" => response_key,
  }
  # One "Name: value\r\n" line per header (each header has a single value here).
  header_lines = headers.map { |name, values| "#{name}: #{values[0]}\r\n" }.join

  # requested service, fd
  req_service = request.path.lchop
  json_client = req_service.ends_with? ".JSON"
  # Strip the literal suffix only (the previous regex had an unescaped dot).
  req_service = req_service.rchop ".JSON" if json_client
  Context.context.is_json[client.fd] = json_client

  # TODO: if trackerd, send the IP address of the client
  # if req_service == "tracker"
  #   puts "tracker - sending the IP address"
  #   puts "connection from #{client.remote_address}"
  #   sfd = Context.context.switchtable[client.fd]
  #   # message = IPC::Message.from_json(JSON).to_packet
  #   # => JSON has to include these attributes: mtype, utype, payload
  #   # message = IPC::Message.new mtype, utype, payload
  #   remote_address = client.remote_address.address
  #   message = IPC::Message.new 1, 1.to_u8, "{\"ipaddress\": \"#{remote_address}\"}"
  #   serv = WrappedTCPFileDescriptor.new(fd: sfd, family: Socket::Family::INET)
  #   serv.send message.to_packet
  # end

  # Every line of the response head, the status line included, ends with CRLF;
  # the final empty line closes the header section.
  client.send "#{status_line}\r\n#{header_lines}\r\n"

  wsclient = WebSocket.new client
  wsclient.on_pong do |m|
    puts "pong #{m}"
  end

  # registering the client into storing structures to avoid being garbage collected
  Context.context.fd_to_socket[client.fd] = client
  Context.context.fd_to_websocket[client.fd] = wsclient

  req_service
rescue e
  puts "#{CRED}Exception in ws_http_upgrade#{CRESET}: #{CBLUE}#{e}#{CRESET}"
  raise "DROP IT"
end
# Accepts one pending connection on `server` and plugs it into `service`.
#
# Steps: accept the TCP connection, run the WebSocket handshake (which yields
# the requested service name), ask libipc to switch the client to that
# service, then install the WebSocket-aware input/output callbacks.
# On any exception the error is printed and the accepted socket, if there is
# one, is closed.
def handle_new_clients(service, server)
  # TODO: this is a lot of C-like notation, should be changed.
  # 1. accept TCP/WS connection
  client = server.accept

  # 2. upgrade HTTP connections and get the service name in the URI path.
  requested_service = ws_http_upgrade client

  # 3. connect to the service via ipc_connection_switched
  #    (the last argument is a pointer to an Int32 initialised to 0)
  switched_fd = 0
  LibIPC.ipc_connection_switched(service.pointer, requested_service, client.fd, pointerof(switched_fd))

  # 4. change client callbacks via ipc_switching_callbacks
  # only the client callbacks are changed, since the associated server is a simple libipc service
  on_input = ->ws_cb_in(Int32, Pointer(LibIPC::Message))
  on_output = ->ws_cb_out(Int32, Pointer(LibIPC::Message))
  LibIPC.ipc_switching_callbacks(service.pointer, client.fd, on_input, on_output)

  info "#{CBLUE}new client:#{CRESET} #{client.fd}"
rescue e
  puts "Exception in handle_new_client: #{CRED}#{e}#{CRESET}"
  # `client` is nil when the failure happened in accept itself.
  client.try &.close
end
# libipc output callback: forwards the IPC message behind `pm` to the
# WebSocket client registered for `fd`.
#
# The message is serialised with `to_json` for clients flagged as JSON and
# with `to_packet` otherwise. Returns IPCCB::NoError on success; on any
# exception the client is unregistered and IPCCB::Error is returned.
def ws_cb_out(fd : Int32, pm : Pointer(LibIPC::Message))
  info "OUT fd is #{fd}"

  socket = Context.context.fd_to_websocket[fd]
  ipc_message = IPC::Message.new pm

  payload = Context.context.is_json[fd] ? ipc_message.to_json : ipc_message.to_packet
  socket.send payload

  LibIPC::IPCCB::NoError
rescue e
  puts "#{CRED}Exception during message transfer:#{CRESET} #{e}"
  Context.context.remove_fd fd
  LibIPC::IPCCB::Error
end
# libipc input callback: reads what the WebSocket client registered for `fd`
# sent and, when it is a usable message, copies it into `pm`.
#
# Returns:
# * IPCCB::NoError - a text message from a JSON client was parsed and copied
#                    into `pm`
# * IPCCB::Ignore  - a ping or pong was received; nothing to hand over
# * IPCCB::Closing - the client is closed, sent a close message, or
#                    `run_once` returned nil
# * IPCCB::Error   - everything else (read failure, WebSocket error,
#                    fragmented or binary message, text message from a
#                    non-JSON client, unknown type, any exception)
#
# The client is unregistered through `Context.context.remove_fd` on the
# Closing paths and on most Error paths. It is kept on ping/pong, and also on
# the two Error paths for a text message from a non-JSON client and for an
# unknown message type.
def ws_cb_in(fd : Int32, pm : LibIPC::Message*)
  info "IN fd is #{fd}"
  wsclient = Context.context.fd_to_websocket[fd]

  message = nil
  begin
    message = wsclient.run_once
  rescue read_error
    puts "#{CRED}run_once FAILED#{CRESET}: #{read_error}"
    Context.context.remove_fd fd
    return LibIPC::IPCCB::Error
  end

  if wsclient.closed?
    info "#{CBLUE}client is closed#{CRESET}"
    Context.context.remove_fd fd
    return LibIPC::IPCCB::Closing
  end

  if message.nil?
    puts "#{CRED}message is nil#{CRESET}"
    Context.context.remove_fd fd
    return LibIPC::IPCCB::Closing
  end

  case message
  when String
    if Context.context.is_json[fd]
      ipc_message = IPC::Message.from_json(message)
      ipc_message.copy_to_message_pointer pm
      return LibIPC::IPCCB::NoError
    end
    # A text message from a non-JSON client falls through to the final Error.
  when WebSocket::Error
    puts "#{CRED}An error occured#{CRESET}"
    Context.context.remove_fd fd
    return LibIPC::IPCCB::Error
  when WebSocket::Ping
    # A ping is a keep-alive from a live client: keep it registered (as is
    # done for pong) so that its next message can still be looked up.
    puts "#{CBLUE}Received a ping message#{CRESET}"
    return LibIPC::IPCCB::Ignore
  when WebSocket::Pong
    puts "#{CBLUE}Received a pong message#{CRESET}"
    return LibIPC::IPCCB::Ignore
  when WebSocket::Close
    puts "#{CBLUE}Received a close message#{CRESET}"
    Context.context.remove_fd fd
    return LibIPC::IPCCB::Closing
  when WebSocket::NotFinal
    puts "#{CBLUE}Received only part of a message: NOT IMPLEMENTED#{CRESET}"
    Context.context.remove_fd fd
    return LibIPC::IPCCB::Error
  when Bytes
    # TODO: when receiving a binary message
    # we should test the format and maybe its content
    puts "#{CBLUE}Received a binary message: NOT IMPLEMENTED, YET#{CRESET}"
    Context.context.remove_fd fd
    return LibIPC::IPCCB::Error
  else
    puts "#{CRED}Received a websocket message with unknown type#{CRESET}"
  end

  return LibIPC::IPCCB::Error
rescue e
  puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}"
  # tcp = WrappedTCPFileDescriptor.new(fd: fd, family: Socket::Family::INET)
  # tcp.close
  Context.context.remove_fd fd
  return LibIPC::IPCCB::Error
end
# Prints `str` on standard output, but only when CLI.verbosity is above zero.
def info(str : String)
  return unless CLI.verbosity > 0
  puts str
end
# Holds the single InstanceStorage shared by the callbacks and the main loop;
# it is reached everywhere as `Context.context`.
class Context
  class_property context : InstanceStorage = InstanceStorage.new
end
# Starts the TCP server and the IPC service, then dispatches IPC events
# until the loop ends.
def main
  # by default, listen on any IP address
  tcp_server = TCPServer.new(CLI.host, CLI.port_to_listen)
  ipc_service = IPC::Server.new CLI.service_name

  # The listening socket is handed to the IPC service as an extra descriptor.
  ipc_service << tcp_server.fd

  # Every few seconds, the service should trigger the timer
  # Allowing the sending of Ping messages to clients
  ipc_service.base_timer = CLI.timer_delay
  ipc_service.timer = CLI.timer_delay

  ipc_service.loop do |event|
    # info "current state of the context:"
    # ipc_service.pp

    case event
    when IPC::Event::Timer
      info "#{CORANGE}IPC::Event::Timer#{CRESET}"
      sending_ping_messages
    when IPC::Event::Connection
      info "#{CBLUE}IPC::Event::Connection#{CRESET}: #{event.fd}"
    when IPC::Event::Disconnection
      info "#{CBLUE}IPC::Event::Disconnection#{CRESET}: #{event.fd}"
      Context.context.remove_fd event.fd
    when IPC::Event::ExtraSocket
      info "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}: #{event.fd}"
      # Activity on an extra socket means a new WebSocket client is waiting.
      unless tcp_server.fd == event.fd
        raise "Error: the only extra socket should be the TCP/WS server"
      end
      handle_new_clients(ipc_service, tcp_server)
    when IPC::Event::Switch
      info "\033[36mIPC::Event::Switch#{CRESET}: from fd #{event.fd}"
      # raise "Not implemented."
    when IPC::Event::MessageSent
      info "Message sent, for #{event.fd}"
    when IPC::Event::MessageReceived
      info "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.fd}"
      raise "Not implemented."
    end
  end
end
# Script entry point: run the service.
main