Networkd => Plumberd, websocketd working.
parent
550f4e77ae
commit
176d8d1507
34
README.md
34
README.md
|
@ -1,15 +1,15 @@
|
|||
|
||||
Networkd is a program to handle networking for all other software.
|
||||
Plumberd is a program to handle networking for all other software.
|
||||
|
||||
# WARNING
|
||||
|
||||
Security is TBD. Currently, only TCPd is implemented, which means no communication security.
|
||||
|
||||
# Networkd functionalities
|
||||
# Plumberd functionalities
|
||||
|
||||
## firewall
|
||||
|
||||
`Networkd` has to filter the connections to local services.
|
||||
`Plumberd` has to filter the connections to local services.
|
||||
|
||||
```Warning
|
||||
WIP.
|
||||
|
@ -17,7 +17,7 @@ WIP.
|
|||
|
||||
## authentication
|
||||
|
||||
`Networkd` has to authenticate clients asking for a service.
|
||||
`Plumberd` has to authenticate clients asking for a service.
|
||||
|
||||
```Warning
|
||||
WIP.
|
||||
|
@ -49,31 +49,31 @@ This program can be used as follow:
|
|||
|
||||
```sh
|
||||
# with some static rules
|
||||
networkd --allow in authd tls:example.com --deny in * * --allow out pong tls:pong.example.com:9000
|
||||
networkd --redirect authd nextversion-authd
|
||||
plumberd --allow in authd tls:example.com --deny in * * --allow out pong tls:pong.example.com:9000
|
||||
plumberd --redirect authd nextversion-authd
|
||||
```
|
||||
|
||||
## usage examples
|
||||
|
||||
`networkd` is requested each time a client is launched when the right environment variable is used.
|
||||
`plumberd` is requested each time a client is launched when the right environment variable is used.
|
||||
For example, we want to connect to a distant `authd` service:
|
||||
|
||||
IPC_NETWORKD="authd tls://user@passwd:example.com:9000/authd"
|
||||
|
||||
|
||||
```Warning
|
||||
Currently, the networkd only works with tcp and unix routes.
|
||||
Currently, the plumberd only works with tcp and unix routes.
|
||||
```
|
||||
|
||||
IPC_NETWORKD="pongd tcp://example.com:9000/pongd"
|
||||
|
||||
# Changelog
|
||||
|
||||
* v0.1: (current) networkd (redirections), tcpd
|
||||
* v0.1: (current) plumberd (redirections), tcpd
|
||||
|
||||
* `networkd` understands URIs (`tcp://example.com/service` or `unix:///service`)
|
||||
* `tcp` scheme is understood: `networkd` contacts the `tcpd` service
|
||||
* `unix` scheme is understood: `networkd` performs a redirection
|
||||
* `plumberd` understands URIs (`tcp://example.com/service` or `unix:///service`)
|
||||
* `tcp` scheme is understood: `plumberd` contacts the `tcpd` service
|
||||
* `unix` scheme is understood: `plumberd` performs a redirection
|
||||
|
||||
|
||||
# Roadmap
|
||||
|
@ -87,16 +87,16 @@ Currently, the networkd only works with tcp and unix routes.
|
|||
* v1.0: TBD
|
||||
|
||||
|
||||
# Networkd explanations
|
||||
# Plumberd explanations
|
||||
|
||||
1. client contacts `networkd`
|
||||
1. `networkd` understand the request from the client then contacts the local service responsible for the communication protocol required
|
||||
1. once the distant connection is established (between the two `tlsd` services for example) `networkd` provides a file descriptor to the client
|
||||
1. client contacts `plumberd`
|
||||
1. `plumberd` understand the request from the client then contacts the local service responsible for the communication protocol required
|
||||
1. once the distant connection is established (between the two `tlsd` services for example) `plumberd` provides a file descriptor to the client
|
||||
1. finally, the client can perform requests to the distant service transparently
|
||||
|
||||
during the connection:
|
||||
|
||||
client <-> networkd <-> tlsd <=> tlsd <-> networkd <-> service
|
||||
client <-> plumberd <-> tlsd <=> tlsd <-> plumberd <-> service
|
||||
|
||||
then:
|
||||
|
||||
|
|
11
shard.yml
11
shard.yml
|
@ -1,11 +1,11 @@
|
|||
name: networkd
|
||||
version: 0.1.1
|
||||
name: plumberd
|
||||
version: 0.2.0
|
||||
|
||||
authors:
|
||||
- karchnu <karchnu@karchnu.fr>
|
||||
|
||||
description: |
|
||||
Networkd allows IPC clients to contact remote services.
|
||||
Plumberd allows IPC clients to contact remote services.
|
||||
|
||||
dependencies:
|
||||
ipc:
|
||||
|
@ -18,9 +18,12 @@ targets:
|
|||
pongd:
|
||||
main: src/pongd.cr
|
||||
|
||||
networkd:
|
||||
plumberd:
|
||||
main: src/main.cr
|
||||
|
||||
admind:
|
||||
main: src/admind.cr
|
||||
|
||||
tcpd:
|
||||
main: src/tcpd.cr
|
||||
tcp:
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
require "option_parser"
|
||||
require "ipc"
|
||||
require "./colors"
|
||||
|
||||
require "json"
|
||||
|
||||
verbosity = 1
|
||||
service_name = "admind"
|
||||
|
||||
OptionParser.parse! do |parser|
|
||||
parser.on "-s service_name", "--service-name service_name", "URI" do |optsn|
|
||||
service_name = optsn
|
||||
end
|
||||
|
||||
parser.on "-v verbosity", "--verbosity verbosity", "Verbosity (0 = nothing is printed, 1 = only events, 2 = events and messages). Default: 1" do |optsn|
|
||||
verbosity = optsn.to_i
|
||||
end
|
||||
|
||||
parser.on "-h", "--help", "Show this help" do
|
||||
puts parser
|
||||
exit 0
|
||||
end
|
||||
end
|
||||
|
||||
class Rectangle
|
||||
JSON.mapping(
|
||||
x: UInt32,
|
||||
y: UInt32
|
||||
)
|
||||
|
||||
def initialize(@x : UInt32, @y : UInt32)
|
||||
end
|
||||
end
|
||||
|
||||
class WidgetTree
|
||||
JSON.mapping(
|
||||
rectangles: Array(Rectangle)
|
||||
)
|
||||
|
||||
def initialize(@rectangles : Array(Rectangle))
|
||||
end
|
||||
end
|
||||
|
||||
class Page
|
||||
JSON.mapping(
|
||||
page: String,
|
||||
title: String,
|
||||
tree: WidgetTree
|
||||
)
|
||||
|
||||
def initialize(@page, @title, @tree)
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
IPC::Service.new (service_name) do |event|
|
||||
case event
|
||||
when IPC::Event::Timer
|
||||
if verbosity >= 1
|
||||
puts "#{CORANGE}IPC::Event::Timer#{CRESET}"
|
||||
end
|
||||
when IPC::Event::Connection
|
||||
if verbosity >= 1
|
||||
puts "#{CBLUE}IPC::Event::Connection#{CRESET}, client: #{event.connection.fd}"
|
||||
end
|
||||
when IPC::Event::Disconnection
|
||||
if verbosity >= 1
|
||||
puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}, client: #{event.connection.fd}"
|
||||
end
|
||||
when IPC::Event::Message
|
||||
if verbosity >= 1
|
||||
puts "#{CGREEN}IPC::Event::Message#{CRESET}, client: #{event.connection.fd}"
|
||||
if verbosity == 2
|
||||
puts "#{CBLUE}message: #{event.message} #{CRESET}"
|
||||
end
|
||||
end
|
||||
|
||||
# JSON message
|
||||
rectangles = Array(Rectangle).new
|
||||
rectangles << Rectangle.new 1, 2
|
||||
rectangles << Rectangle.new 2, 1
|
||||
rectangles << Rectangle.new 3, 1
|
||||
widgets = WidgetTree.new(rectangles)
|
||||
page = Page.new "admin", "This is the Admin Page", widgets
|
||||
|
||||
event.connection.send 2, page.to_json
|
||||
else
|
||||
if verbosity >= 1
|
||||
puts "#{CRED}Exception: message = #{event.message} #{CRESET}"
|
||||
end
|
||||
end
|
||||
end
|
12
src/main.cr
12
src/main.cr
|
@ -1,16 +1,16 @@
|
|||
require "./networkd"
|
||||
require "./plumberd"
|
||||
|
||||
networkd = NetworkD.new "network"
|
||||
plumberd = NetworkD.new "plumber"
|
||||
|
||||
# --deny <in|out> <service> <url>
|
||||
# --allow <in|ou> <service> <url>
|
||||
# --redirect <service> <service>
|
||||
# --redirect <service-name> <new-service-name> <origin-url> <dest-url>
|
||||
|
||||
networkd.parse_cli ARGV
|
||||
puts networkd.to_s
|
||||
plumberd.parse_cli ARGV
|
||||
puts plumberd.to_s
|
||||
|
||||
networkd.loop do |event|
|
||||
plumberd.loop do |event|
|
||||
puts "there is an event!"
|
||||
case event
|
||||
when IPC::Event::Connection
|
||||
|
@ -27,7 +27,7 @@ networkd.loop do |event|
|
|||
puts "\033[32mthere is a message\033[00m"
|
||||
puts event.message.to_s
|
||||
|
||||
networkd.service_lookup event.message, event.connection
|
||||
plumberd.service_lookup event.message, event.connection
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -1,15 +1,15 @@
|
|||
|
||||
require "./rules"
|
||||
require "./networkdcliparser"
|
||||
require "./plumberdcliparser"
|
||||
require "ipc"
|
||||
require "uri"
|
||||
|
||||
class IPC::NetworkD < IPC::Service
|
||||
|
||||
# The client asks to networkd to open a connection to a service
|
||||
# The client asks to plumberd to open a connection to a service
|
||||
# the service name is used unless there is a redirection that is provided to the client through
|
||||
# a IPC_NETWORK environment variable
|
||||
# This environment variable is sent from the client to networkd, that's what is parsed here
|
||||
# This environment variable is sent from the client to plumberd, that's what is parsed here
|
||||
|
||||
# parse_lookup_payload extract the right service URI to use from the IPC_NETWORK content
|
||||
# sent by the client in the form:
|
||||
|
@ -80,7 +80,7 @@ class IPC::NetworkD < IPC::Service
|
|||
# TODO: for remote services, we have to connect to communication service
|
||||
# these communication services need an URI to work on
|
||||
# The protocol:
|
||||
# networkd sends an URI to the communication service, which responds with a "OK" message
|
||||
# plumberd sends an URI to the communication service, which responds with a "OK" message
|
||||
if scheme != "unix"
|
||||
service.send 1.to_u8, "#{requested_service.to_s}\n"
|
||||
response = service.read
|
||||
|
@ -97,7 +97,7 @@ class IPC::NetworkD < IPC::Service
|
|||
raise Exception.new "cannot send the file descriptor of the requested service: #{m}"
|
||||
end
|
||||
|
||||
# finally, the service should be closed in networkd
|
||||
# finally, the service should be closed in plumberd
|
||||
service.close
|
||||
rescue e
|
||||
puts "\033[31mException during the connection to the requested service #{requested_service}: #{e}\033[00m"
|
||||
|
@ -118,8 +118,8 @@ class IPC::NetworkD < IPC::Service
|
|||
def wait_event(server : IPC::Connection?, &block) : Tuple(LibIPC::EventType, IPC::Message, IPC::Connection)
|
||||
event = LibIPC::Event.new
|
||||
|
||||
# TODO: networkd should be able to transfer messages???
|
||||
r = LibIPC.ipc_wait_event self.pointer, @service_info.pointer, pointerof(event)
|
||||
# TODO: plumberd should be able to transfer messages???
|
||||
r = LibIPC.ipc_wait_event self.pointer, @service_info.pointer, pointerof(event), pointerof(@timer)
|
||||
if r != 0
|
||||
m = String.new LibIPC.ipc_errors_get (r)
|
||||
yield IPC::Exception.new "error waiting for a new event: #{m}"
|
29
src/pongd.cr
29
src/pongd.cr
|
@ -2,6 +2,7 @@ require "option_parser"
|
|||
require "ipc"
|
||||
require "./colors"
|
||||
|
||||
verbosity = 1
|
||||
service_name = "pong"
|
||||
|
||||
OptionParser.parse! do |parser|
|
||||
|
@ -9,6 +10,10 @@ OptionParser.parse! do |parser|
|
|||
service_name = optsn
|
||||
end
|
||||
|
||||
parser.on "-v verbosity", "--verbosity verbosity", "Verbosity (0 = nothing is printed, 1 = only events, 2 = events and messages). Default: 1" do |optsn|
|
||||
verbosity = optsn.to_i
|
||||
end
|
||||
|
||||
parser.on "-h", "--help", "Show this help" do
|
||||
puts parser
|
||||
exit 0
|
||||
|
@ -18,13 +23,29 @@ end
|
|||
|
||||
IPC::Service.new (service_name) do |event|
|
||||
case event
|
||||
when IPC::Event::Timer
|
||||
if verbosity >= 1
|
||||
puts "#{CORANGE}IPC::Event::Timer#{CRESET}"
|
||||
end
|
||||
when IPC::Event::Connection
|
||||
puts "#{CBLUE}IPC::Event::Connection#{CRESET}, client: #{event.connection.fd}"
|
||||
if verbosity >= 1
|
||||
puts "#{CBLUE}IPC::Event::Connection#{CRESET}, client: #{event.connection.fd}"
|
||||
end
|
||||
when IPC::Event::Disconnection
|
||||
puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}, client: #{event.connection.fd}"
|
||||
if verbosity >= 1
|
||||
puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}, client: #{event.connection.fd}"
|
||||
end
|
||||
when IPC::Event::Message
|
||||
puts "#{CGREEN}IPC::Event::Message#{CRESET}, client: #{event.connection.fd}"
|
||||
# puts event.message.to_s
|
||||
if verbosity >= 1
|
||||
puts "#{CGREEN}IPC::Event::Message#{CRESET}, client: #{event.connection.fd}"
|
||||
if verbosity == 2
|
||||
puts "#{CBLUE}message: #{event.message} #{CRESET}"
|
||||
end
|
||||
end
|
||||
event.connection.send event.message
|
||||
else
|
||||
if verbosity >= 1
|
||||
puts "#{CRED}Exception: message = #{event.message} #{CRESET}"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
require "./utils.cr"
|
||||
|
||||
begin
|
||||
message = String.build do |str|
|
||||
i = 0
|
||||
80.times do
|
||||
str << "i=#{i}"
|
||||
i += 1
|
||||
end
|
||||
end
|
||||
|
||||
rescue e
|
||||
puts "Error: #{e}"
|
||||
exit 1
|
||||
end
|
||||
|
||||
begin
|
||||
bin = to_message 2, message
|
||||
|
||||
rescue e
|
||||
puts "error in to_message: #{e}"
|
||||
exit 2
|
||||
end
|
||||
|
||||
print_hexa String.new(bin), "message"
|
|
@ -1,14 +1,11 @@
|
|||
require "io/hexdump"
|
||||
|
||||
def to_message (user_type : Int, message : String)
|
||||
payload = Bytes.new (6 + message.size)
|
||||
payload = Bytes.new (6 + message.to_slice.size)
|
||||
|
||||
# true start
|
||||
payload[0] = 1.to_u8
|
||||
payload[1] = 0.to_u8
|
||||
payload[2] = 0.to_u8
|
||||
payload[3] = 0.to_u8
|
||||
payload[4] = message.size.to_u8
|
||||
IO::ByteFormat::NetworkEndian.encode message.to_slice.size, (payload + 1)
|
||||
|
||||
# second part: user message
|
||||
payload[5] = user_type.to_u8
|
||||
|
|
|
@ -55,16 +55,26 @@ end
|
|||
begin
|
||||
ws = WebSocket.new(URI.parse(uri))
|
||||
|
||||
puts "connection done: sending pong"
|
||||
|
||||
ws.on_close do |socket|
|
||||
puts "socket is closing"
|
||||
exit 0
|
||||
end
|
||||
|
||||
read ws
|
||||
send ws, "pong"
|
||||
read ws
|
||||
send ws, to_message(2, "coucou")
|
||||
read ws
|
||||
if uri.ends_with? ".JSON"
|
||||
json_message = STDIN.gets_to_end
|
||||
json_message = json_message.chomp
|
||||
puts "json_message: #{json_message}"
|
||||
final_json_message = "{ \"mtype\": 2, \"payload\": #{json_message} }"
|
||||
puts "final json message: #{final_json_message}"
|
||||
send ws, final_json_message
|
||||
# send ws, "{ \"mtype\": 2, \"payload\": \"coucou\" }"
|
||||
read ws
|
||||
else
|
||||
send ws, to_message(2, STDIN.gets_to_end)
|
||||
read ws
|
||||
end
|
||||
|
||||
ws.close
|
||||
ws.read
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
require "http/web_socket"
|
||||
require "option_parser"
|
||||
|
||||
require "./colors"
|
||||
require "./utils"
|
||||
|
||||
require "./ws"
|
||||
|
||||
uri = "ws://localhost:1234/pong"
|
||||
|
||||
OptionParser.parse! do |parser|
|
||||
parser.on "-u uri", "--uri uri", "URI" do |opturi|
|
||||
uri = opturi
|
||||
end
|
||||
|
||||
parser.on "-h", "--help", "Show this help" do
|
||||
puts parser
|
||||
exit 0
|
||||
end
|
||||
end
|
||||
|
||||
def read_then_print(ws : WebSocket)
|
||||
m = read ws
|
||||
puts "message: #{String.new(m)}"
|
||||
end
|
||||
|
||||
def read_then_print_hexa(ws : WebSocket)
|
||||
m = read ws
|
||||
print_hexa(String.new(m), "#{CBLUE}Received message hexa#{CRESET}")
|
||||
end
|
||||
|
||||
def read(ws : WebSocket)
|
||||
m = ws.read
|
||||
if m.nil?
|
||||
raise "empty message"
|
||||
end
|
||||
|
||||
m
|
||||
end
|
||||
|
||||
def send_with_announce(ws : WebSocket, m : String)
|
||||
puts "sending #{m}"
|
||||
send ws, m
|
||||
end
|
||||
|
||||
def send(ws : WebSocket, m : String)
|
||||
ws.send m
|
||||
end
|
||||
|
||||
def send(ws : WebSocket, m : Slice)
|
||||
ws.send m
|
||||
end
|
||||
|
||||
|
||||
begin
|
||||
ws = WebSocket.new(URI.parse(uri))
|
||||
|
||||
puts "connection done: sending pong"
|
||||
|
||||
ws.on_close do |socket|
|
||||
puts "socket is closing"
|
||||
exit 0
|
||||
end
|
||||
|
||||
if uri.ends_with? ".JSON"
|
||||
json_message = STDIN.gets_to_end
|
||||
json_message = json_message.chomp
|
||||
# puts "json_message: #{json_message}"
|
||||
final_json_message = "{ \"mtype\": 2, \"payload\": #{json_message} }"
|
||||
puts "sending 10 messages: #{final_json_message}"
|
||||
10.times do |i|
|
||||
send ws, final_json_message
|
||||
puts "sent message #{i}"
|
||||
# send ws, "{ \"mtype\": 2, \"payload\": \"coucou\" }"
|
||||
end
|
||||
|
||||
10.times do |i|
|
||||
read ws
|
||||
puts "read message #{i}"
|
||||
end
|
||||
else
|
||||
send ws, to_message(2, STDIN.gets_to_end)
|
||||
read ws
|
||||
end
|
||||
|
||||
|
||||
ws.read
|
||||
ws.close
|
||||
rescue e
|
||||
puts "Exception: #{e}"
|
||||
end
|
||||
|
|
@ -54,7 +54,6 @@ OptionParser.parse! do |parser|
|
|||
end
|
||||
end
|
||||
|
||||
|
||||
class InstanceStorage
|
||||
property service : IPC::SwitchingService
|
||||
property switchtable : Hash(Int32, Int32)
|
||||
|
@ -94,6 +93,10 @@ class InstanceStorage
|
|||
@is_client = @is_client.select do |fd,v| fd != fdclient end
|
||||
@is_json = @is_json.select do |fd,v| fd != fdclient end
|
||||
|
||||
@fd_to_websocket.select! do |fd, ws|
|
||||
fd != fdclient
|
||||
end
|
||||
|
||||
unless fdservice.nil?
|
||||
service = @fd_to_ipcconnection[fdservice]
|
||||
service.close
|
||||
|
@ -107,7 +110,8 @@ class InstanceStorage
|
|||
end
|
||||
end
|
||||
|
||||
server = TCPServer.new("localhost", port_to_listen)
|
||||
# by default, listen on any IP address
|
||||
server = TCPServer.new("0.0.0.0", port_to_listen)
|
||||
service = IPC::SwitchingService.new service_name
|
||||
service << server.fd
|
||||
context = InstanceStorage.new service
|
||||
|
@ -171,6 +175,9 @@ def websocket_client_connection(client, context : InstanceStorage)
|
|||
client.send "#{headers_header}\n#{headers.to_s}\r\n"
|
||||
|
||||
wsclient = WebSocket.new client
|
||||
wsclient.on_pong do |m|
|
||||
puts "Received a pong message: #{m}"
|
||||
end
|
||||
context.is_client[client.fd] = true
|
||||
|
||||
# listen to the client's file descriptor
|
||||
|
@ -236,6 +243,7 @@ class IPC::Message
|
|||
end
|
||||
|
||||
def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
|
||||
# FIXME: debugging purposes
|
||||
begin
|
||||
if context.is_client[activefd]
|
||||
# The client is a WebSocket on top of a TCP connection
|
||||
|
@ -244,6 +252,7 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
|
|||
begin
|
||||
message = wsclient.read
|
||||
|
||||
# puts "RECEIVING A MESSAGE from #{activefd}: #{message}"
|
||||
rescue e
|
||||
puts "#{CRED}Exception (receiving a message)#{CRESET} #{e}"
|
||||
closing_client activefd, context
|
||||
|
@ -262,9 +271,29 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
|
|||
return
|
||||
end
|
||||
|
||||
if message.is_a?(WebSocket::Ping)
|
||||
# puts "#{CBLUE}This is a ping message#{CRESET}"
|
||||
return
|
||||
end
|
||||
|
||||
if message.is_a?(WebSocket::Pong)
|
||||
# puts "#{CBLUE}This is a pong message#{CRESET}"
|
||||
return
|
||||
end
|
||||
|
||||
if message.is_a?(WebSocket::Close)
|
||||
# puts "#{CRED}This is a close message#{CRESET}"
|
||||
return
|
||||
end
|
||||
|
||||
if message.is_a?(Slice(UInt8))
|
||||
# puts "#{CRED}This is a binary message: not yet implemented#{CRESET}"
|
||||
return
|
||||
end
|
||||
|
||||
# TODO: verify this
|
||||
if context.is_json[activefd]
|
||||
jsonmessage = JSONWSMessage.from_json String.new(message)
|
||||
jsonmessage = JSONWSMessage.from_json message
|
||||
message = to_message jsonmessage.mtype, jsonmessage.payload
|
||||
|
||||
# puts "JSON TYPE !!!"
|
||||
|
@ -281,16 +310,21 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
|
|||
serv = WrappedTCPFileDescriptor.new(fd: fdservice, family: Socket::Family::INET)
|
||||
#serv = context.fd_to_ipcconnection[fdservice]
|
||||
serv.send message
|
||||
# puts "SENT MESSAGE TO SERVER: #{message}"
|
||||
|
||||
else
|
||||
# puts "switching: service to client"
|
||||
# service => client
|
||||
|
||||
# puts "RECEIVING A MESSAGE FROM THE SERVER #{activefd}"
|
||||
|
||||
fdclient = context.switchtable[activefd]
|
||||
wsclient = context.fd_to_websocket[fdclient]
|
||||
|
||||
serv = context.fd_to_ipcconnection[activefd]
|
||||
message = serv.read
|
||||
|
||||
# puts "RECEIVING A MESSAGE FROM THE SERVER #{activefd}: #{message}"
|
||||
# puts "received message from service: #{message.to_s}"
|
||||
|
||||
if context.is_json[fdclient]
|
||||
|
@ -316,11 +350,28 @@ def websocket_switching_procedure (activefd : Int, context : InstanceStorage)
|
|||
closing_client clientfd, context
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
# Every few seconds, the service should trigger the timer
|
||||
service.base_timer = 30
|
||||
|
||||
service.loop do |event|
|
||||
case event
|
||||
when IPC::Event::Timer
|
||||
# puts "#{CORANGE}IPC::Event::Timer#{CRESET}"
|
||||
context.fd_to_websocket.each do |fd, ws|
|
||||
|
||||
begin
|
||||
ws.ping "coucou from #{fd}"
|
||||
rescue e
|
||||
puts "#{CRED}Exception: #{e}#{CRESET}, already closed client #{fd}"
|
||||
begin
|
||||
context.remove_fd fd
|
||||
rescue e
|
||||
puts "#{CRED}Cannot remove #{fd} from clients: #{e}#{CRESET}"
|
||||
end
|
||||
end
|
||||
end
|
||||
when IPC::Event::Connection
|
||||
puts "#{CBLUE}IPC::Event::Connection: #{event.connection.fd}#{CRESET}"
|
||||
when IPC::Event::Disconnection
|
||||
|
|
86
src/ws.cr
86
src/ws.cr
|
@ -2,35 +2,71 @@
|
|||
require "http"
|
||||
require "http/web_socket/protocol"
|
||||
|
||||
class HTTP::WebSocket
|
||||
record Pong
|
||||
record Ping
|
||||
record Close
|
||||
|
||||
def read : Slice(UInt8) | String | Close | Ping | Pong | Nil
|
||||
size = 0
|
||||
begin
|
||||
info = @ws.receive(@buffer)
|
||||
rescue IO::EOFError
|
||||
close
|
||||
return nil
|
||||
end
|
||||
|
||||
case info.opcode
|
||||
when Protocol::Opcode::PING
|
||||
@current_message.write @buffer[0, info.size]
|
||||
if info.final
|
||||
message = @current_message.to_s
|
||||
@on_ping.try &.call(message)
|
||||
pong(message) unless closed?
|
||||
@current_message.clear
|
||||
end
|
||||
return Ping.new
|
||||
when Protocol::Opcode::PONG
|
||||
@current_message.write @buffer[0, info.size]
|
||||
if info.final
|
||||
@on_pong.try &.call(@current_message.to_s)
|
||||
@current_message.clear
|
||||
end
|
||||
return Pong.new
|
||||
when Protocol::Opcode::TEXT
|
||||
message = @buffer[0, info.size]
|
||||
@current_message.write message
|
||||
if info.final
|
||||
@on_message.try &.call(@current_message.to_s)
|
||||
@current_message.clear
|
||||
end
|
||||
return String.new message
|
||||
when Protocol::Opcode::BINARY
|
||||
message = @buffer[0, info.size]
|
||||
@current_message.write message
|
||||
if info.final
|
||||
@on_binary.try &.call(@current_message.to_slice)
|
||||
@current_message.clear
|
||||
end
|
||||
return message
|
||||
when Protocol::Opcode::CLOSE
|
||||
@current_message.write @buffer[0, info.size]
|
||||
if info.final
|
||||
message = @current_message.to_s
|
||||
@on_close.try &.call(message)
|
||||
close(message) unless closed?
|
||||
@current_message.clear
|
||||
end
|
||||
return Close.new
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
|
||||
class WebSocket < HTTP::WebSocket
|
||||
getter? closed = false
|
||||
|
||||
def read
|
||||
size = 0
|
||||
begin
|
||||
info = @ws.receive(@buffer)
|
||||
size = info.size
|
||||
rescue IO::EOFError
|
||||
close
|
||||
return nil
|
||||
end
|
||||
|
||||
case info.opcode
|
||||
when Protocol::Opcode::TEXT
|
||||
return @buffer[0..size-1]
|
||||
when Protocol::Opcode::BINARY
|
||||
return @buffer[0..size-1]
|
||||
when Protocol::Opcode::CLOSE
|
||||
begin
|
||||
close
|
||||
rescue e
|
||||
puts "\033[31mwebsocket failed to close properly\033[00m #{e}"
|
||||
end
|
||||
return nil
|
||||
end
|
||||
end
|
||||
|
||||
def finalize
|
||||
# puts "WrappedTCPFileDescriptor garbage collection!!"
|
||||
# super
|
||||
|
|
Reference in New Issue