ws: compilable, still very WIP.
This commit is contained in:
parent
772389ab41
commit
59b701c03e
@ -22,11 +22,6 @@ require "./lib_modifications.cr"
|
||||
module Websocketc
|
||||
class Exception < ::Exception
|
||||
end
|
||||
|
||||
class Context
|
||||
class_property uri = "ws://localhost:1234/pong"
|
||||
class_property rounds = 1
|
||||
end
|
||||
end
|
||||
|
||||
class Baguette::Configuration
|
||||
@ -40,64 +35,272 @@ class Baguette::Configuration
|
||||
end
|
||||
end
|
||||
|
||||
class Relation
|
||||
property fd_client : Int32
|
||||
property fd_service : Int32
|
||||
property ws : WebSocket
|
||||
# ws can send JSON messages instead of libipc-formated messages.
|
||||
property is_json : Bool
|
||||
|
||||
def related? (fd : Int32)
|
||||
fd == @fd_client || fd == @fd_service
|
||||
end
|
||||
|
||||
def initialize(@fd_client, @fd_service, @ws, @is_json)
|
||||
end
|
||||
|
||||
def to_s(io)
|
||||
c = "%4d" % @fd_client
|
||||
s = "%4d" % @fd_service
|
||||
j = if @is_json
|
||||
"JSON"
|
||||
else
|
||||
"Not JSON"
|
||||
end
|
||||
io << "client #{c} service #{s} #{j}"
|
||||
end
|
||||
end
|
||||
|
||||
# Hide the complexity of managing relations.
|
||||
class Relations < Array(Relation)
|
||||
property all_fd : Array(Int32) = Array(Int32).new
|
||||
def <<(relation : Relation)
|
||||
Baguette::Log.debug "Adding a new relation: #{relation}"
|
||||
@all_fd << relation.fd_client
|
||||
@all_fd << relation.fd_service
|
||||
super relation
|
||||
end
|
||||
|
||||
def search?(fd : Int32) : Relation?
|
||||
find {|r| r.fd_client == fd || r.fd_service == fd }
|
||||
end
|
||||
|
||||
def remove(fd : Int32)
|
||||
each do |r|
|
||||
if r.related? fd
|
||||
Baguette::Log.debug "TODO: closing this relation: #{r}"
|
||||
all_fd.select! {|v| v != r.fd_client && v != r.fd_service }
|
||||
end
|
||||
end
|
||||
select! {|r| ! r.related? fd }
|
||||
end
|
||||
end
|
||||
|
||||
require "./network.cr"
|
||||
|
||||
|
||||
class Websocketc::Service < IPC::Server
|
||||
property connections : Hash(Int32, WebSocket) = {} of Int32 => WebSocket
|
||||
getter all_fd : Array(Int32) = Array(Int32).new
|
||||
|
||||
property relations : Relations = Relations.new
|
||||
property config : Baguette::Configuration::Websocketc
|
||||
|
||||
def initialize(@config : Baguette::Configuration::Websocketc)
|
||||
super "websocketc"
|
||||
super "ws"
|
||||
end
|
||||
|
||||
def handle_request(event : IPC::Event::MessageReceived)
|
||||
request_start = Time.utc
|
||||
#def handle_request(event : IPC::Event::MessageReceived)
|
||||
# # send_now
|
||||
#end
|
||||
|
||||
# TODO: The message can come from an already linked fd.
|
||||
# def handle_request(event : IPC::Event::MessageReceived)
|
||||
# request_start = Time.utc
|
||||
#
|
||||
# # TODO: The message can come from an already linked fd.
|
||||
#
|
||||
# request = Websocketc.requests.parse_ipc_json event.message
|
||||
#
|
||||
# if request.nil?
|
||||
# raise "unknown request type"
|
||||
# end
|
||||
#
|
||||
# request_name = request.class.name.sub /^Websocketc::Request::/, ""
|
||||
# Baguette::Log.debug "#{request_name}"
|
||||
#
|
||||
# response = begin
|
||||
# request.handle self, event
|
||||
# rescue e : Websocketc::Exception
|
||||
# Baguette::Log.error "Websocketc::Exception: #{request_name} => #{e}"
|
||||
# Websocketc::Response::Error.new "generic error"
|
||||
# rescue e
|
||||
# Baguette::Log.error "#{request_name} generic error #{e}"
|
||||
# Websocketc::Response::Error.new "unknown error"
|
||||
# end
|
||||
#
|
||||
# # If clients sent requests with an “id” field, it is copied
|
||||
# # in the responses. Allows identifying responses easily.
|
||||
# response.id = request.id
|
||||
#
|
||||
# send event.fd, response
|
||||
#
|
||||
# duration = Time.utc - request_start
|
||||
#
|
||||
# response_name = response.class.name.sub /^Websocketc::Response::/, ""
|
||||
#
|
||||
# if response.is_a? Websocketc::Response::Error
|
||||
# Baguette::Log.warning "#{response_name} (#{response.reason})"
|
||||
# else
|
||||
# Baguette::Log.debug "#{response_name} (Total duration: #{duration})"
|
||||
# end
|
||||
# end
|
||||
|
||||
request = Websocketc.requests.parse_ipc_json event.message
|
||||
def first_connection(event : IPC::Event::MessageReceived)
|
||||
# First message format: "URI"
|
||||
payload = String.new event.message.payload
|
||||
Baguette::Log.info "First message received: #{payload}"
|
||||
|
||||
if request.nil?
|
||||
raise "unknown request type"
|
||||
end
|
||||
# TODO: handle exceptions and errors
|
||||
|
||||
request_name = request.class.name.sub /^Websocketc::Request::/, ""
|
||||
Baguette::Log.debug "<< #{request_name}"
|
||||
begin
|
||||
uri = URI.parse payload
|
||||
ws = WebSocket.new uri
|
||||
|
||||
is_json = uri.path.ends_with? ".JSON"
|
||||
|
||||
pp! ws.ws
|
||||
# service_fd = ws.ws.io
|
||||
service_fd = 0
|
||||
|
||||
relation = Relation.new event.fd, service_fd, ws, is_json
|
||||
pp! relation
|
||||
@relations << relation
|
||||
|
||||
# Change client fd status.
|
||||
|
||||
LibIPC.ipc_ctx_fd_type self.pointer, event.fd, LibIPC::ConnectionType::Switched
|
||||
|
||||
# Add service fd as switched.
|
||||
LibIPC.ipc_add_fd_switched self.pointer, service_fd
|
||||
|
||||
# Change service fd callbacks: only the service callbacks are changed,
|
||||
# since the associated client is a simple libipc client
|
||||
|
||||
proc_cb_in = ->ws_cb_in(Int32, Pointer(LibIPC::Message), Int16*)
|
||||
proc_cb_out = ->ws_cb_out(Int32, Pointer(LibIPC::Message))
|
||||
|
||||
# self.remove_fd event.fd
|
||||
LibIPC.ipc_switching_callbacks self.pointer, service_fd, proc_cb_in, proc_cb_out
|
||||
|
||||
Baguette::Log.debug "new client: #{event.fd}"
|
||||
|
||||
response = begin
|
||||
request.handle self, event
|
||||
rescue e : Websocketc::Exception
|
||||
Baguette::Log.error "Websocketc::Exception: #{request_name} => #{e}"
|
||||
Websocketc::Response::Error.new "generic error"
|
||||
rescue e
|
||||
Baguette::Log.error "#{request_name} generic error #{e}"
|
||||
Websocketc::Response::Error.new "unknown error"
|
||||
Baguette::Log.error "cannot connect to #{payload}: #{e}"
|
||||
end
|
||||
|
||||
# If clients sent requests with an “id” field, it is copied
|
||||
# in the responses. Allows identifying responses easily.
|
||||
response.id = request.id
|
||||
Baguette::Log.info "Let's say it's OK"
|
||||
send_now event.fd, 1.to_u8, "OK"
|
||||
end
|
||||
|
||||
send event.fd, response
|
||||
def ws_cb_out(fd : Int32, pm : Pointer(LibIPC::Message))
|
||||
Baguette::Log.info "OUT fd is #{fd}"
|
||||
@relations.search?(fd).try do |relation|
|
||||
message = IPC::Message.new pm
|
||||
Baguette::Log.info "message to send: #{message}"
|
||||
|
||||
duration = Time.utc - request_start
|
||||
|
||||
response_name = response.class.name.sub /^Websocketc::Response::/, ""
|
||||
|
||||
if response.is_a? Websocketc::Response::Error
|
||||
Baguette::Log.warning ">> #{response_name} (#{response.reason})"
|
||||
if relation.is_json
|
||||
buf = message.to_json
|
||||
else
|
||||
Baguette::Log.debug ">> #{response_name} (Total duration: #{duration})"
|
||||
buf = message.to_packet
|
||||
end
|
||||
|
||||
relation.ws.send buf
|
||||
return LibIPC::IPCCB::NoError
|
||||
end
|
||||
|
||||
Baguette::Log.error "Wait, not supposed to get here. No relation?"
|
||||
return LibIPC::IPCCB::Error
|
||||
|
||||
rescue e
|
||||
Baguette::Log.error "Exception during message transfer: #{e}"
|
||||
@relations.remove fd
|
||||
return LibIPC::IPCCB::Error
|
||||
end
|
||||
|
||||
def ws_cb_in(fd : Int32, pm : LibIPC::Message*, more_to_read : Int16*)
|
||||
Baguette::Log.info "IN fd is #{fd}"
|
||||
|
||||
@relations.search?(fd).try do |relation|
|
||||
message = nil
|
||||
begin
|
||||
message = relation.ws.run_once
|
||||
rescue e
|
||||
Baguette::Log.error "run_once FAILED: #{e}"
|
||||
@relations.remove fd
|
||||
return LibIPC::IPCCB::Error
|
||||
end
|
||||
|
||||
if relation.ws.ws.io.empty?
|
||||
more_to_read[0] = 0
|
||||
else
|
||||
more_to_read[0] = 1
|
||||
end
|
||||
|
||||
if relation.ws.closed?
|
||||
Baguette::Log.info "client closed the connection"
|
||||
@relations.remove fd
|
||||
return LibIPC::IPCCB::Closing
|
||||
end
|
||||
|
||||
if message.nil?
|
||||
Baguette::Log.error "Reveiced a nil message"
|
||||
@relations.remove fd
|
||||
return LibIPC::IPCCB::Closing
|
||||
end
|
||||
|
||||
case message
|
||||
when String
|
||||
if relation.is_json
|
||||
ipc_message = IPC::Message.from_json(message)
|
||||
ipc_message.copy_to_message_pointer pm
|
||||
return LibIPC::IPCCB::NoError
|
||||
end
|
||||
when WebSocket::Ping
|
||||
Baguette::Log.debug "TODO: Received a ping message"
|
||||
return LibIPC::IPCCB::Ignore
|
||||
else
|
||||
# every other option should be dropped
|
||||
case message
|
||||
when WebSocket::Error
|
||||
Baguette::Log.error "An error occured"
|
||||
@relations.remove fd
|
||||
return LibIPC::IPCCB::Error
|
||||
when WebSocket::Pong
|
||||
Baguette::Log.error "Received a pong message"
|
||||
return LibIPC::IPCCB::Ignore
|
||||
when WebSocket::Close
|
||||
Baguette::Log.debug "Received a close message"
|
||||
@relations.remove fd
|
||||
return LibIPC::IPCCB::Closing
|
||||
when WebSocket::NotFinal
|
||||
Baguette::Log.warning "TODO: Received only part of a message."
|
||||
@relations.remove fd
|
||||
return LibIPC::IPCCB::Error
|
||||
when Bytes
|
||||
# TODO: when receiving a binary message
|
||||
# we should test the format and maybe its content
|
||||
Baguette::Log.error "Received a binary message: NOT IMPLEMENTED, YET"
|
||||
@relations.remove fd
|
||||
return LibIPC::IPCCB::Error
|
||||
else
|
||||
Baguette::Log.error "Received a websocket message with unknown type"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
return LibIPC::IPCCB::Error
|
||||
rescue e
|
||||
Baguette::Log.error "Exception (receiving a message) #{e}"
|
||||
# tcp = WrappedTCPFileDescriptor.new(fd: fd, family: Socket::Family::INET)
|
||||
# tcp.close
|
||||
@relations.remove fd
|
||||
return LibIPC::IPCCB::Error
|
||||
end
|
||||
|
||||
|
||||
|
||||
def run
|
||||
Baguette::Log.title "Starting websocketc"
|
||||
|
||||
@timer = @config.ipc_timer
|
||||
@base_timer = @config.ipc_timer
|
||||
|
||||
self.loop do |event|
|
||||
begin
|
||||
case event
|
||||
@ -106,19 +309,22 @@ class Websocketc::Service < IPC::Server
|
||||
|
||||
when IPC::Event::Connection
|
||||
Baguette::Log.debug "connection from #{event.fd}"
|
||||
@all_fd << event.fd
|
||||
|
||||
when IPC::Event::Disconnection
|
||||
Baguette::Log.debug "disconnection from #{event.fd}"
|
||||
@all_fd.select! &.!=(event.fd)
|
||||
connections.delete event.fd
|
||||
@relations.remove event.fd
|
||||
|
||||
when IPC::Event::MessageSent
|
||||
Baguette::Log.debug "message sent to #{event.fd}"
|
||||
|
||||
when IPC::Event::MessageReceived
|
||||
Baguette::Log.debug "message received from #{event.fd}"
|
||||
handle_request event
|
||||
if r = @relations.search? event.fd
|
||||
Baguette::Log.error "MessageReceived but from an already existent relation"
|
||||
Baguette::Log.error "relation: #{r}"
|
||||
else
|
||||
first_connection event
|
||||
end
|
||||
|
||||
when IPC::Exception
|
||||
Baguette::Log.warning "IPC::Exception: #{event.message}"
|
||||
@ -149,14 +355,6 @@ class Websocketc::Service < IPC::Server
|
||||
|
||||
|
||||
OptionParser.parse do |parser|
|
||||
parser.on "-u uri", "--uri uri", "URI" do |opturi|
|
||||
Websocketc::Context.uri = opturi
|
||||
end
|
||||
|
||||
parser.on "-r rounds", "--rounds nb-messages", "Nb messages to send." do |opt|
|
||||
Websocketc::Context.rounds = opt.to_i
|
||||
end
|
||||
|
||||
parser.on "-h", "--help", "Show this help" do
|
||||
puts parser
|
||||
exit 0
|
||||
|
Loading…
Reference in New Issue
Block a user