From 15db08db0bf068f235a82298cd145f0c1bf48d1d Mon Sep 17 00:00:00 2001 From: Philippe PITTOLI Date: Thu, 1 Aug 2019 00:44:38 +0200 Subject: [PATCH] websocketd: v0.1 (no authentication) --- shard.yml | 14 +- src/pongc.cr | 8 +- src/pongd.cr | 4 +- src/utils.cr | 31 +++++ src/websocketc.cr | 80 +++++++++++ src/websocketd.cr | 329 ++++++++++++++++++++++++++++++++++++++++++++++ src/ws.cr | 38 ++++++ 7 files changed, 491 insertions(+), 13 deletions(-) create mode 100644 src/utils.cr create mode 100644 src/websocketc.cr create mode 100644 src/websocketd.cr create mode 100644 src/ws.cr diff --git a/shard.yml b/shard.yml index 6f5ffa0..20485a2 100644 --- a/shard.yml +++ b/shard.yml @@ -10,25 +10,25 @@ description: | dependencies: ipc: git: https://git.karchnu.fr/JunkOS/ipc.cr - branch: v0.2 + branch: master targets: pongc: main: src/pongc.cr pongd: main: src/pongd.cr + networkd: main: src/main.cr + tcpd: main: src/tcpd.cr tcp: main: src/tcp.cr - # webipc: - # main: src/webipcd.cr - # websocketclient: - # main: src/websocket-client.cr - # websocketserver: - # main: src/websocket-server.cr + websocketd: + main: src/websocketd.cr + websocketc: + main: src/websocketc.cr license: ISC diff --git a/src/pongc.cr b/src/pongc.cr index 06813f3..b4c5626 100644 --- a/src/pongc.cr +++ b/src/pongc.cr @@ -2,20 +2,20 @@ require "ipc" client = IPC::Client.new("pong") -# client.send(LibIPC::MessageType::Data, 42, "salut ça va ?") +# client.send(42, "salut ça va ?") client.send(42.to_u8, "salut ça va ?") -# # client.send(LibIPC::MessageType::Data, 42, "salut ça va ?") +# # client.send(42, "salut ça va ?") # m = client.read # # puts "message received: #{m.to_s}" # # sleep 1 # -# # client.send(LibIPC::MessageType::Data, 42, "salut ça va ?") +# # client.send(42, "salut ça va ?") # client.send(42.to_u8, "autre truc") -# # client.send(LibIPC::MessageType::Data, 42, "salut ça va ?") +# # client.send(42, "salut ça va ?") # m = client.read # # puts "message received: #{m.to_s}" diff --git a/src/pongd.cr b/src/pongd.cr index 8b7f078..23727f2 100644 --- a/src/pongd.cr +++ b/src/pongd.cr @@ -8,8 +8,8 @@ IPC::Service.new ("pong") do |event| when IPC::Event::Disconnection puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}, client: #{event.connection.fd}" when IPC::Event::Message - puts "#{CGREEN}IPC::Event::Message#{CRESET}" - puts event.message.to_s + puts "#{CGREEN}IPC::Event::Message#{CRESET}, client: #{event.connection.fd}" + # puts event.message.to_s event.connection.send event.message end end diff --git a/src/utils.cr b/src/utils.cr new file mode 100644 index 0000000..9937485 --- /dev/null +++ b/src/utils.cr @@ -0,0 +1,31 @@ +require "io/hexdump" + +def to_message (user_type : Int, message : String) + payload = Bytes.new (6 + message.size) + + # true start + payload[0] = 1.to_u8 + payload[1] = 0.to_u8 + payload[2] = 0.to_u8 + payload[3] = 0.to_u8 + payload[4] = message.size.to_u8 + + # second part: user message + payload[5] = user_type.to_u8 + (payload + 6).copy_from message.to_slice + + return payload +end + +def print_hexa(message : String, aroundmsg : String) + puts "#{aroundmsg} [[" + m = IO::Memory.new(message) + io = IO::Hexdump.new(m, output: STDERR, read: true) + buffer = Bytes.new 4000 + io.read (buffer) # reading = should print + puts "]] #{aroundmsg}" +end + +# str = to_message(42, "coucou") +# print_hexa str, "message sent" +# ws.send str diff --git a/src/websocketc.cr b/src/websocketc.cr new file mode 100644 index 0000000..583a840 --- /dev/null +++ b/src/websocketc.cr @@ -0,0 +1,80 @@ +require "http/web_socket" +require "option_parser" + +require "./colors" +require "./utils" + +require "./ws" + +uri = "ws://localhost:1234/pong" + +OptionParser.parse! do |parser| + parser.on "-u uri", "--uri uri", "URI" do |opturi| + uri = opturi + end + + parser.on "-h", "--help", "Show this help" do + puts parser + exit 0 + end +end + +ws = WebSocket.new(URI.parse(uri)) + +# HTTP::WebSocket.new(URI.parse("wss://websocket.example.com/chat")) # Creates a new WebSocket with TLS to `websocket.example.com` +# HTTP::WebSocket.new(URI.parse("http://websocket.example.com:8080/chat")) # Creates a new WebSocket to `websocket.example.com` on port `8080` +# HTTP::WebSocket.new(URI.parse("ws://websocket.example.com/chat"), # Creates a new WebSocket to `websocket.example.com` with an Authorization header +# HTTP::Headers{"Authorization" => "Bearer authtoken"}) + +m = ws.read +if m.nil? + puts "oh no" + exit +end +# puts "message: #{String.new(m)}" + +# puts "sending pong" +ws.send "pong".to_slice +m = ws.read +if m.nil? + puts "oh no" + exit +end +# puts "message: #{String.new(m)}" + +# puts "sending coucou" +tosend = to_message 2, "coucou" +# print_hexa(String.new(tosend), "#{CBLUE}Sending message hexa#{CRESET}") +ws.send tosend + +m = ws.read +if m.nil? + puts "oh no" + exit +end +puts "message: #{String.new(m)}" + +# ws.on_message do |message| +# if message == "are we websocket yet?" +# puts "sending pong" +# ws.send "pong\n" +# elsif message.chomp == "OK" +# puts "sending coucou" +# m = to_message 2, "coucou" +# print_hexa(String.new(m), "#{CBLUE}Sending message hexa#{CRESET}") +# ws.send m +# else +# print_hexa message, "#{CORANGE}Receving a message#{CRESET}" +# end +# end + +ws.on_close do |socket| + puts "socket is closing" + # socket.close +end + +# ws.run + +ws.close + +ws.read diff --git a/src/websocketd.cr b/src/websocketd.cr new file mode 100644 index 0000000..74ff4f4 --- /dev/null +++ b/src/websocketd.cr @@ -0,0 +1,329 @@ +require "option_parser" +require "ipc" +require "socket" +require "./colors" +require "./ws" + +require "socket" +require "http" +require "http/server" +require "base64" +require "digest" +require "./utils" + +class IPC::Connection + def initialize(fd : Int32) + external_connection = LibIPC::Connection.new + external_connection.fd = fd + initialize(external_connection) + end +end + +class IPC::Message + def to_buffer + to_message @type, String.new(@payload) + end +end + + +class WrappedTCPFileDescriptor < TCPSocket + # do not close the connection when garbage collected!! + def finalize + # puts "WrappedTCPFileDescriptor garbage collection!!" + # super + end +end + +service_name = "websocket" +port_to_listen = 1234 + +OptionParser.parse! do |parser| + parser.on "-p port", "--port port", "Port to listen on." do |port| + port_to_listen = port.to_u16 + end + + parser.on "-s service-name", "--service-name service-name", "Service name." do |name| + service_name = name + end + + parser.on "-h", "--help", "Show this help" do + puts parser + exit 0 + end +end + + +class InstanceStorage + property service : IPC::SwitchingService + property fdlist : Hash(Int32,String) + property socklist : Hash(Int32, TCPSocket) + property wssocklist : Hash(Int32, WebSocket) + property switchtable : Hash(Int32, Int32) + property connectionlist : Hash(Int32, IPC::Connection) + + def initialize (@service : IPC::SwitchingService) + # fdlist_client = [] of TCPSocket + @fdlist = Hash(Int32,String).new + @socklist = Hash(Int32, TCPSocket).new + @wssocklist = Hash(Int32, WebSocket).new + @switchtable = Hash(Int32, Int32).new + @connectionlist = Hash(Int32, IPC::Connection).new + end + + def remove_fd (fdclient : Int32) + # 1. closing both the client and the service + fdservice = @switchtable[fdclient] + # puts "1. fds to close: #{fdclient} and #{fdservice}" + tcpfdc = @socklist[fdclient] + service = @connectionlist[fdservice] + + # 2. closing the TCP connections + # puts "2.1 closing fd #{fdclient}" + tcpfdc.close unless tcpfdc.closed? + # puts "2.2 closing fd #{fdservice}" + service.close + + # 3. removing the client and the service fds from the loop check + # puts "3.1. removing #{fdclient} from @service" + @service.remove_fd (fdclient) + # puts "3.2. removing #{fdservice} from @service" + @service.remove_fd (fdservice) + + # 4. removing the service IPC::Connection from connectionlist + # puts "4. removing fdservice from @connectionlist" + @connectionlist.select do |k, v| + k != fdservice + end + + # 5. removing both the client and the service from the switchtable + # puts "5. removing both fd from @switchable" + @switchtable.select do |fdc, fds| + fdc != fdclient && fds != fdclient + end + + # 6. removing the client and the service from fdlist + # puts "6. removing both fd from fdlist" + @fdlist.select do |fd,v| fd != fdclient end + @fdlist.select do |fd,v| fd != fdservice end + end +end + +server = TCPServer.new("localhost", port_to_listen) +service = IPC::SwitchingService.new service_name +service << server.fd +context = InstanceStorage.new service + + + +def websocket_client_connection(client, context : InstanceStorage) + request = HTTP::Request.from_io client + # pp! request + + if request.nil? + puts "#{CRED}eEQUEST IS NIL#{CRESET}" + return + end + + if request.is_a? HTTP::Request::BadRequest + puts "#{CRED}BAD REQUEST DAZE~#{CRESET}" + return + end + + # FIXME: check they actually wanted to upgrade to websocket + + key = request.headers["Sec-WebSocket-Key"] + + response_key = Digest::SHA1.base64digest key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + # puts response_key + + # HTTP inside bru + headers_header = "HTTP/1.1 101 Switching Protocols" + headers = HTTP::Headers { + "Upgrade" => "websocket", + "Connection" => "Upgrade", + "Sec-WebSocket-Accept" => response_key + } + + headers = headers.map { |key, value| "#{key}: #{value[0]}\r\n" }.join + + # puts "#{headers_header}\n#{headers.to_s}\r\n" + client.send "#{headers_header}\n#{headers.to_s}\r\n" + + wsclient = WebSocket.new client + wsclient.send "are we websocket yet?" + + # the client is still not connected to a service + context.fdlist[client.fd] = "not connected" + + # listen to the client's file descriptor + context.service << client.fd + # puts "#{CBLUE}new client: #{client.fd}#{CRESET}" + + # registering the client into storing structures to avoid being garbage collected + context.socklist[client.fd] = client + context.wssocklist[client.fd] = wsclient +end + + +def closing_client (fdclient : Int, context : InstanceStorage) + # puts "closing the client #{fdclient}" + context.remove_fd fdclient +end + +# first message from the client: requested service name +# 1. extracting the requested service from the message +# 2. connection to the service +# 3. listening on the service fd +# 4. bounding both file descriptors (through switchtable hash) +# 5. indicating that the client is connected (in fdlist) +# 6. sending an ack to the client +def websocket_connection_procedure (message : String, clientfd : Int32, context : InstanceStorage) + begin + # 1. reading the service name from the received message + requested_service = message + + # 2. establishing a connection to the service + newservice = IPC::Connection.new requested_service + context.connectionlist[newservice.fd] = newservice + + # 3. listening on the service fd + context.service << newservice.fd + + # cannot perform automatic switching due to websockets headers + # service.switch.add fdclient, newservice.fd + + # 4. bounding the client and the service fd + context.switchtable[clientfd] = newservice.fd + context.switchtable[newservice.fd] = clientfd + + # 5. the client is then connected, send it a message + context.fdlist[clientfd] = "connected" + context.fdlist[newservice.fd] = "service" + + # 6. ack + wsclient = context.wssocklist[clientfd] + wsclient.send "OK\n" + rescue e + puts "#{CRED}Exception during connection to the service:#{CRESET} #{e}" + closing_client clientfd, context + end +end + +def websocket_switching_procedure (activefd : Int, context : InstanceStorage) + begin + + if context.fdlist[activefd] == "connected" + # The client is a WebSocket on top of a TCP connection + client = context.socklist[activefd] + wsclient = context.wssocklist[activefd] + message = wsclient.read + + if wsclient.closed? + # puts "#{CBLUE}client is closed#{CRESET}" + closing_client client.fd, context + return + end + + if message.nil? + puts "#{CRED}message is nil#{CRESET}" + closing_client client.fd, context + return + end + + # puts "switching: client to service" + # print_hexa(String.new(message), "#{CBLUE}Received message hexa#{CRESET}") + # client => service + fdservice = context.switchtable[activefd] + + # XXX: this is not a TCP fd, but since behind the scene this is compatible, I'm hacking a bit + serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET) + #serv = context.connectionlist[fdservice] + serv.send message + + elsif context.fdlist[activefd] == "service" + # puts "switching: service to client" + # service => client + + fdclient = context.switchtable[activefd] + wsclient = context.wssocklist[fdclient] + + serv = context.connectionlist[activefd] + message = serv.read + # puts "received message from service: #{message.to_s}" + buf = message.to_buffer + # print_hexa String.new(buf), "\033[31m Message to send to the client \033[00m " + wsclient.send buf + else + raise "cannot understand if the fd is a client or a service" + end + rescue e + puts "#{CRED}Exception during message transfer:#{CRESET} #{e}" + if context.fdlist[activefd] == "service" + clientfd = context.switchtable[activefd] + closing_client clientfd, context + elsif context.fdlist[activefd] == "service" + closing_client activefd, context + end + end + +end + +service.loop do |event| + case event + when IPC::Event::Connection + puts "#{CBLUE}IPC::Event::Connection: #{event.connection.fd}#{CRESET}" + when IPC::Event::Disconnection + puts "#{CBLUE}IPC::Event::Disconnection: #{event.connection.fd}#{CRESET}" + when IPC::Event::ExtraSocket + puts "#{CBLUE}IPC::Event::ExtraSocket#{CRESET}" + + # 1. accept new websocket clients + if server.fd == event.connection.fd + client = server.accept + websocket_client_connection client, context + puts "#{CBLUE}new client:#{CRESET} #{client.fd}" + next + end + + # 2. active fd != server fd + # is this a service or a client? + activefd = event.connection.fd + if context.fdlist[activefd] == "not connected" + + # since it's an external communication + # we have to read the message here, it's not handled by default in libipc + wsclient = context.wssocklist[activefd] + message = wsclient.read + + if message.nil? + puts "#{CBLUE}disconnection of client #{event.connection.fd}#{CRESET}" + closing_client activefd, context + next + end + + # requested service, fd + websocket_connection_procedure String.new(message), activefd, context + + elsif context.fdlist[activefd] == "connected" + # the service is sending a message + websocket_switching_procedure activefd, context + elsif context.fdlist[activefd] == "service" + # the service is sending a message + websocket_switching_procedure activefd, context + else + raise "should not happen: client not recorded in fdlist" + end + + when IPC::Event::Switch + puts "\033[36mIPC::Event::Switch#{CRESET}: from fd #{event.connection.fd}" + + raise "Not implemented." + + # IPC::Event::Message has to be the last entry + # because ExtraSocket and Switch inherit from Message class + when IPC::Event::Message + puts "#{CBLUE}IPC::Event::Message#{CRESET}: #{event.connection.fd}" + + raise "Not implemented." + end +end diff --git a/src/ws.cr b/src/ws.cr new file mode 100644 index 0000000..454aa82 --- /dev/null +++ b/src/ws.cr @@ -0,0 +1,38 @@ + +require "http" +require "http/web_socket/protocol" + + +class WebSocket < HTTP::WebSocket + getter? closed = false + + def read + size = 0 + begin + info = @ws.receive(@buffer) + size = info.size + rescue IO::EOFError + close + return nil + end + + case info.opcode + when Protocol::Opcode::TEXT + return @buffer[0..size-1] + when Protocol::Opcode::BINARY + return @buffer[0..size-1] + when Protocol::Opcode::CLOSE + begin + close + rescue e + puts "\033[31mwebsocket failed to close properly\033[00m #{e}" + end + return nil + end + end + + def finalize + # puts "WrappedTCPFileDescriptor garbage collection!!" + # super + end +end