ws: first message exchange. Still WIP, but we are getting there.
This commit is contained in:
parent
59b701c03e
commit
e41c6878cf
@ -24,6 +24,10 @@ module Websocketc
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
class Context
|
||||||
|
class_property service : Websocketc::Service?
|
||||||
|
end
|
||||||
|
|
||||||
class Baguette::Configuration
|
class Baguette::Configuration
|
||||||
class Websocketc < Base
|
class Websocketc < Base
|
||||||
property verbosity : Int32 = 2
|
property verbosity : Int32 = 2
|
||||||
@ -42,6 +46,9 @@ class Relation
|
|||||||
# ws can send JSON messages instead of libipc-formated messages.
|
# ws can send JSON messages instead of libipc-formated messages.
|
||||||
property is_json : Bool
|
property is_json : Bool
|
||||||
|
|
||||||
|
property buffer_client : String = String.new
|
||||||
|
property buffer_service : String = String.new
|
||||||
|
|
||||||
def related? (fd : Int32)
|
def related? (fd : Int32)
|
||||||
fd == @fd_client || fd == @fd_service
|
fd == @fd_client || fd == @fd_service
|
||||||
end
|
end
|
||||||
@ -79,6 +86,16 @@ class Relations < Array(Relation)
|
|||||||
each do |r|
|
each do |r|
|
||||||
if r.related? fd
|
if r.related? fd
|
||||||
Baguette::Log.debug "TODO: closing this relation: #{r}"
|
Baguette::Log.debug "TODO: closing this relation: #{r}"
|
||||||
|
Baguette::Log.warning "Before removing, here the switching list"
|
||||||
|
|
||||||
|
sw = Context.service.not_nil!.context.switchdb
|
||||||
|
pointer_ctx = Context.service.not_nil!.pointer
|
||||||
|
|
||||||
|
LibIPC.ipc_switching_print pointerof(sw)
|
||||||
|
LibIPC.ipc_del_fd pointer_ctx, r.fd_client
|
||||||
|
LibIPC.ipc_del_fd pointer_ctx, r.fd_service
|
||||||
|
LibIPC.ipc_switching_del pointerof(sw), r.fd_client
|
||||||
|
|
||||||
all_fd.select! {|v| v != r.fd_client && v != r.fd_service }
|
all_fd.select! {|v| v != r.fd_client && v != r.fd_service }
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@ -88,109 +105,109 @@ end
|
|||||||
|
|
||||||
require "./network.cr"
|
require "./network.cr"
|
||||||
|
|
||||||
class Websocketc::Service < IPC::Server
|
def ws_cb_in(fd : Int32, pm : LibIPC::Message*, more_to_read : Int16*)
|
||||||
property relations : Relations = Relations.new
|
Baguette::Log.info "IN fd is #{fd}"
|
||||||
property config : Baguette::Configuration::Websocketc
|
Context.service.not_nil!.relations.search?(fd).try do |relation|
|
||||||
|
message = nil
|
||||||
def initialize(@config : Baguette::Configuration::Websocketc)
|
begin
|
||||||
super "ws"
|
message = relation.ws.run_once
|
||||||
|
rescue e
|
||||||
|
Baguette::Log.error "run_once FAILED: #{e}"
|
||||||
|
Context.service.not_nil!.relations.remove fd
|
||||||
|
return LibIPC::IPCCB::Error
|
||||||
end
|
end
|
||||||
|
|
||||||
#def handle_request(event : IPC::Event::MessageReceived)
|
if relation.ws.ws.io.empty?
|
||||||
# # send_now
|
more_to_read[0] = 0
|
||||||
#end
|
else
|
||||||
|
more_to_read[0] = 1
|
||||||
|
end
|
||||||
|
|
||||||
# def handle_request(event : IPC::Event::MessageReceived)
|
if relation.ws.closed?
|
||||||
# request_start = Time.utc
|
Baguette::Log.info "client closed the connection"
|
||||||
#
|
Context.service.not_nil!.relations.remove fd
|
||||||
# # TODO: The message can come from an already linked fd.
|
return LibIPC::IPCCB::Closing
|
||||||
#
|
end
|
||||||
# request = Websocketc.requests.parse_ipc_json event.message
|
|
||||||
#
|
|
||||||
# if request.nil?
|
|
||||||
# raise "unknown request type"
|
|
||||||
# end
|
|
||||||
#
|
|
||||||
# request_name = request.class.name.sub /^Websocketc::Request::/, ""
|
|
||||||
# Baguette::Log.debug "#{request_name}"
|
|
||||||
#
|
|
||||||
# response = begin
|
|
||||||
# request.handle self, event
|
|
||||||
# rescue e : Websocketc::Exception
|
|
||||||
# Baguette::Log.error "Websocketc::Exception: #{request_name} => #{e}"
|
|
||||||
# Websocketc::Response::Error.new "generic error"
|
|
||||||
# rescue e
|
|
||||||
# Baguette::Log.error "#{request_name} generic error #{e}"
|
|
||||||
# Websocketc::Response::Error.new "unknown error"
|
|
||||||
# end
|
|
||||||
#
|
|
||||||
# # If clients sent requests with an “id” field, it is copied
|
|
||||||
# # in the responses. Allows identifying responses easily.
|
|
||||||
# response.id = request.id
|
|
||||||
#
|
|
||||||
# send event.fd, response
|
|
||||||
#
|
|
||||||
# duration = Time.utc - request_start
|
|
||||||
#
|
|
||||||
# response_name = response.class.name.sub /^Websocketc::Response::/, ""
|
|
||||||
#
|
|
||||||
# if response.is_a? Websocketc::Response::Error
|
|
||||||
# Baguette::Log.warning "#{response_name} (#{response.reason})"
|
|
||||||
# else
|
|
||||||
# Baguette::Log.debug "#{response_name} (Total duration: #{duration})"
|
|
||||||
# end
|
|
||||||
# end
|
|
||||||
|
|
||||||
def first_connection(event : IPC::Event::MessageReceived)
|
if message.nil?
|
||||||
# First message format: "URI"
|
Baguette::Log.error "Reveiced a nil message"
|
||||||
payload = String.new event.message.payload
|
Context.service.not_nil!.relations.remove fd
|
||||||
Baguette::Log.info "First message received: #{payload}"
|
return LibIPC::IPCCB::Closing
|
||||||
|
end
|
||||||
|
|
||||||
# TODO: handle exceptions and errors
|
case message
|
||||||
|
when String
|
||||||
|
if relation.is_json
|
||||||
|
Baguette::Log.warning "reassembling the message!"
|
||||||
|
# Reassemble the message.
|
||||||
|
m = relation.buffer_client + message
|
||||||
|
pp! relation.buffer_client, message, m
|
||||||
|
# Clean the buffer.
|
||||||
|
relation.buffer_client = String.new
|
||||||
|
|
||||||
begin
|
begin
|
||||||
uri = URI.parse payload
|
ipc_message = IPC::Message.from_json m
|
||||||
ws = WebSocket.new uri
|
ipc_message.copy_to_message_pointer pm
|
||||||
|
|
||||||
is_json = uri.path.ends_with? ".JSON"
|
|
||||||
|
|
||||||
pp! ws.ws
|
|
||||||
# service_fd = ws.ws.io
|
|
||||||
service_fd = 0
|
|
||||||
|
|
||||||
relation = Relation.new event.fd, service_fd, ws, is_json
|
|
||||||
pp! relation
|
|
||||||
@relations << relation
|
|
||||||
|
|
||||||
# Change client fd status.
|
|
||||||
|
|
||||||
LibIPC.ipc_ctx_fd_type self.pointer, event.fd, LibIPC::ConnectionType::Switched
|
|
||||||
|
|
||||||
# Add service fd as switched.
|
|
||||||
LibIPC.ipc_add_fd_switched self.pointer, service_fd
|
|
||||||
|
|
||||||
# Change service fd callbacks: only the service callbacks are changed,
|
|
||||||
# since the associated client is a simple libipc client
|
|
||||||
|
|
||||||
proc_cb_in = ->ws_cb_in(Int32, Pointer(LibIPC::Message), Int16*)
|
|
||||||
proc_cb_out = ->ws_cb_out(Int32, Pointer(LibIPC::Message))
|
|
||||||
|
|
||||||
# self.remove_fd event.fd
|
|
||||||
LibIPC.ipc_switching_callbacks self.pointer, service_fd, proc_cb_in, proc_cb_out
|
|
||||||
|
|
||||||
Baguette::Log.debug "new client: #{event.fd}"
|
|
||||||
|
|
||||||
rescue e
|
rescue e
|
||||||
Baguette::Log.error "cannot connect to #{payload}: #{e}"
|
Baguette::Log.error "cannot send message coming from #{fd}"
|
||||||
|
Baguette::Log.error "message: #{m}"
|
||||||
|
Context.service.not_nil!.relations.remove fd
|
||||||
|
Context.service.not_nil!.print_self
|
||||||
|
Baguette::Log.error "error: #{e}"
|
||||||
|
return LibIPC::IPCCB::Error
|
||||||
end
|
end
|
||||||
|
|
||||||
Baguette::Log.info "Let's say it's OK"
|
return LibIPC::IPCCB::NoError
|
||||||
send_now event.fd, 1.to_u8, "OK"
|
end
|
||||||
|
Baguette::Log.error "cannot handle non-json messages!"
|
||||||
|
return LibIPC::IPCCB::Error
|
||||||
|
when WebSocket::Ping
|
||||||
|
Baguette::Log.debug "TODO: Received a ping message"
|
||||||
|
return LibIPC::IPCCB::Ignore
|
||||||
|
|
||||||
|
when WebSocket::NotFinal
|
||||||
|
Baguette::Log.warning "Received only part of a message."
|
||||||
|
relation.buffer_client += message.message
|
||||||
|
return LibIPC::IPCCB::Ignore
|
||||||
|
|
||||||
|
else
|
||||||
|
# every other option should be dropped
|
||||||
|
case message
|
||||||
|
when WebSocket::Error
|
||||||
|
Baguette::Log.error "An error occured"
|
||||||
|
Context.service.not_nil!.relations.remove fd
|
||||||
|
return LibIPC::IPCCB::Error
|
||||||
|
when WebSocket::Pong
|
||||||
|
Baguette::Log.error "Received a pong message"
|
||||||
|
return LibIPC::IPCCB::Ignore
|
||||||
|
when WebSocket::Close
|
||||||
|
Baguette::Log.debug "Received a close message"
|
||||||
|
Context.service.not_nil!.relations.remove fd
|
||||||
|
return LibIPC::IPCCB::Closing
|
||||||
|
when Bytes
|
||||||
|
# TODO: when receiving a binary message
|
||||||
|
# we should test the format and maybe its content
|
||||||
|
Baguette::Log.error "Received a binary message: NOT IMPLEMENTED, YET"
|
||||||
|
Context.service.not_nil!.relations.remove fd
|
||||||
|
return LibIPC::IPCCB::Error
|
||||||
|
else
|
||||||
|
Baguette::Log.error "Received a websocket message with unknown type"
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def ws_cb_out(fd : Int32, pm : Pointer(LibIPC::Message))
|
return LibIPC::IPCCB::Error
|
||||||
|
rescue e
|
||||||
|
Baguette::Log.error "Exception (receiving a message) #{e}"
|
||||||
|
# tcp = WrappedTCPFileDescriptor.new(fd: fd, family: Socket::Family::INET)
|
||||||
|
# tcp.close
|
||||||
|
Context.service.not_nil!.relations.remove fd
|
||||||
|
return LibIPC::IPCCB::Error
|
||||||
|
end
|
||||||
|
|
||||||
|
def ws_cb_out(fd : Int32, pm : Pointer(LibIPC::Message))
|
||||||
Baguette::Log.info "OUT fd is #{fd}"
|
Baguette::Log.info "OUT fd is #{fd}"
|
||||||
@relations.search?(fd).try do |relation|
|
Context.service.not_nil!.relations.search?(fd).try do |relation|
|
||||||
message = IPC::Message.new pm
|
message = IPC::Message.new pm
|
||||||
Baguette::Log.info "message to send: #{message}"
|
Baguette::Log.info "message to send: #{message}"
|
||||||
|
|
||||||
@ -207,94 +224,71 @@ class Websocketc::Service < IPC::Server
|
|||||||
Baguette::Log.error "Wait, not supposed to get here. No relation?"
|
Baguette::Log.error "Wait, not supposed to get here. No relation?"
|
||||||
return LibIPC::IPCCB::Error
|
return LibIPC::IPCCB::Error
|
||||||
|
|
||||||
rescue e
|
rescue e
|
||||||
Baguette::Log.error "Exception during message transfer: #{e}"
|
Baguette::Log.error "Exception during message transfer: #{e}"
|
||||||
@relations.remove fd
|
Context.service.not_nil!.relations.remove fd
|
||||||
return LibIPC::IPCCB::Error
|
return LibIPC::IPCCB::Error
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
|
class Websocketc::Service < IPC::Server
|
||||||
|
property relations : Relations = Relations.new
|
||||||
|
property config : Baguette::Configuration::Websocketc
|
||||||
|
|
||||||
|
def initialize(@config : Baguette::Configuration::Websocketc)
|
||||||
|
super "ws"
|
||||||
|
Context.service = self
|
||||||
end
|
end
|
||||||
|
|
||||||
def ws_cb_in(fd : Int32, pm : LibIPC::Message*, more_to_read : Int16*)
|
def print_self
|
||||||
Baguette::Log.info "IN fd is #{fd}"
|
LibIPC.ipc_ctx_print self.pointer
|
||||||
|
end
|
||||||
|
|
||||||
|
def first_connection(event : IPC::Event::MessageReceived)
|
||||||
|
# First message format: "URI"
|
||||||
|
payload = String.new event.message.payload
|
||||||
|
Baguette::Log.info "First message received: #{payload}"
|
||||||
|
|
||||||
|
# TODO: handle exceptions and errors
|
||||||
|
|
||||||
@relations.search?(fd).try do |relation|
|
|
||||||
message = nil
|
|
||||||
begin
|
begin
|
||||||
message = relation.ws.run_once
|
uri = URI.parse payload
|
||||||
|
ws = WebSocket.new uri
|
||||||
|
|
||||||
|
is_json = uri.path.ends_with? ".JSON"
|
||||||
|
|
||||||
|
service_fd = ws.ws.io.as(TCPSocket).fd
|
||||||
|
|
||||||
|
relation = Relation.new event.fd, service_fd, ws, is_json
|
||||||
|
@relations << relation
|
||||||
|
|
||||||
|
# Change client fd status.
|
||||||
|
LibIPC.ipc_ctx_fd_type self.pointer, event.fd, LibIPC::ConnectionType::Switched
|
||||||
|
|
||||||
|
# Add service fd as switched.
|
||||||
|
LibIPC.ipc_add_fd_switched self.pointer, service_fd
|
||||||
|
|
||||||
|
# Link both file descriptors
|
||||||
|
LibIPC.ipc_ctx_switching_add self.pointer, event.fd, service_fd
|
||||||
|
|
||||||
|
# Change service fd callbacks: only the service callbacks are changed,
|
||||||
|
# since the associated client is a simple libipc client
|
||||||
|
|
||||||
|
proc_cb_in = ->ws_cb_in(Int32, Pointer(LibIPC::Message), Int16*)
|
||||||
|
proc_cb_out = ->ws_cb_out(Int32, Pointer(LibIPC::Message))
|
||||||
|
|
||||||
|
LibIPC.ipc_switching_callbacks self.pointer, service_fd, proc_cb_in, proc_cb_out
|
||||||
|
|
||||||
|
Baguette::Log.debug "new client: #{event.fd}"
|
||||||
|
|
||||||
rescue e
|
rescue e
|
||||||
Baguette::Log.error "run_once FAILED: #{e}"
|
Baguette::Log.error "cannot connect to #{payload}: #{e}"
|
||||||
@relations.remove fd
|
|
||||||
return LibIPC::IPCCB::Error
|
|
||||||
end
|
end
|
||||||
|
|
||||||
if relation.ws.ws.io.empty?
|
Baguette::Log.info "Let's say it's OK"
|
||||||
more_to_read[0] = 0
|
send_now event.fd, 1.to_u8, "OK"
|
||||||
else
|
|
||||||
more_to_read[0] = 1
|
|
||||||
end
|
end
|
||||||
|
|
||||||
if relation.ws.closed?
|
|
||||||
Baguette::Log.info "client closed the connection"
|
|
||||||
@relations.remove fd
|
|
||||||
return LibIPC::IPCCB::Closing
|
|
||||||
end
|
|
||||||
|
|
||||||
if message.nil?
|
|
||||||
Baguette::Log.error "Reveiced a nil message"
|
|
||||||
@relations.remove fd
|
|
||||||
return LibIPC::IPCCB::Closing
|
|
||||||
end
|
|
||||||
|
|
||||||
case message
|
|
||||||
when String
|
|
||||||
if relation.is_json
|
|
||||||
ipc_message = IPC::Message.from_json(message)
|
|
||||||
ipc_message.copy_to_message_pointer pm
|
|
||||||
return LibIPC::IPCCB::NoError
|
|
||||||
end
|
|
||||||
when WebSocket::Ping
|
|
||||||
Baguette::Log.debug "TODO: Received a ping message"
|
|
||||||
return LibIPC::IPCCB::Ignore
|
|
||||||
else
|
|
||||||
# every other option should be dropped
|
|
||||||
case message
|
|
||||||
when WebSocket::Error
|
|
||||||
Baguette::Log.error "An error occured"
|
|
||||||
@relations.remove fd
|
|
||||||
return LibIPC::IPCCB::Error
|
|
||||||
when WebSocket::Pong
|
|
||||||
Baguette::Log.error "Received a pong message"
|
|
||||||
return LibIPC::IPCCB::Ignore
|
|
||||||
when WebSocket::Close
|
|
||||||
Baguette::Log.debug "Received a close message"
|
|
||||||
@relations.remove fd
|
|
||||||
return LibIPC::IPCCB::Closing
|
|
||||||
when WebSocket::NotFinal
|
|
||||||
Baguette::Log.warning "TODO: Received only part of a message."
|
|
||||||
@relations.remove fd
|
|
||||||
return LibIPC::IPCCB::Error
|
|
||||||
when Bytes
|
|
||||||
# TODO: when receiving a binary message
|
|
||||||
# we should test the format and maybe its content
|
|
||||||
Baguette::Log.error "Received a binary message: NOT IMPLEMENTED, YET"
|
|
||||||
@relations.remove fd
|
|
||||||
return LibIPC::IPCCB::Error
|
|
||||||
else
|
|
||||||
Baguette::Log.error "Received a websocket message with unknown type"
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
return LibIPC::IPCCB::Error
|
|
||||||
rescue e
|
|
||||||
Baguette::Log.error "Exception (receiving a message) #{e}"
|
|
||||||
# tcp = WrappedTCPFileDescriptor.new(fd: fd, family: Socket::Family::INET)
|
|
||||||
# tcp.close
|
|
||||||
@relations.remove fd
|
|
||||||
return LibIPC::IPCCB::Error
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def run
|
def run
|
||||||
Baguette::Log.title "Starting websocketc"
|
Baguette::Log.title "Starting websocketc"
|
||||||
|
|
||||||
@ -324,13 +318,19 @@ class Websocketc::Service < IPC::Server
|
|||||||
Baguette::Log.error "relation: #{r}"
|
Baguette::Log.error "relation: #{r}"
|
||||||
else
|
else
|
||||||
first_connection event
|
first_connection event
|
||||||
|
Baguette::Log.warning "context currently is"
|
||||||
|
print_self
|
||||||
end
|
end
|
||||||
|
|
||||||
|
when IPC::Event::Switch
|
||||||
|
Baguette::Log.debug "switched message from #{event.fd}"
|
||||||
|
|
||||||
when IPC::Exception
|
when IPC::Exception
|
||||||
Baguette::Log.warning "IPC::Exception: #{event.message}"
|
Baguette::Log.warning "IPC::Exception: #{event.message}"
|
||||||
|
|
||||||
else
|
else
|
||||||
Baguette::Log.warning "unhandled IPC event: #{event.class}"
|
Baguette::Log.warning "unhandled IPC event: #{event.class}"
|
||||||
|
exit 1
|
||||||
end
|
end
|
||||||
rescue e
|
rescue e
|
||||||
Baguette::Log.error "exception: #{typeof(e)} - #{e.message}"
|
Baguette::Log.error "exception: #{typeof(e)} - #{e.message}"
|
||||||
|
Loading…
Reference in New Issue
Block a user