Better error management for ws.
This commit is contained in:
parent
e41c6878cf
commit
5e5de4a9f1
@ -56,6 +56,10 @@ class Relation
|
|||||||
def initialize(@fd_client, @fd_service, @ws, @is_json)
|
def initialize(@fd_client, @fd_service, @ws, @is_json)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def inspect(io) : Nil
|
||||||
|
to_s io
|
||||||
|
end
|
||||||
|
|
||||||
def to_s(io)
|
def to_s(io)
|
||||||
c = "%4d" % @fd_client
|
c = "%4d" % @fd_client
|
||||||
s = "%4d" % @fd_service
|
s = "%4d" % @fd_service
|
||||||
@ -66,6 +70,10 @@ class Relation
|
|||||||
end
|
end
|
||||||
io << "client #{c} service #{s} #{j}"
|
io << "client #{c} service #{s} #{j}"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def finalize
|
||||||
|
Baguette::Log.warning "TADAAAAA"
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Hide the complexity of managing relations.
|
# Hide the complexity of managing relations.
|
||||||
@ -83,38 +91,56 @@ class Relations < Array(Relation)
|
|||||||
end
|
end
|
||||||
|
|
||||||
def remove(fd : Int32)
|
def remove(fd : Int32)
|
||||||
|
Baguette::Log.warning "context before removing #{fd}:"
|
||||||
|
Context.service.not_nil!.print_self
|
||||||
|
|
||||||
each do |r|
|
each do |r|
|
||||||
if r.related? fd
|
if r.related? fd
|
||||||
Baguette::Log.debug "TODO: closing this relation: #{r}"
|
Baguette::Log.debug "== Closing this relation: #{r}"
|
||||||
Baguette::Log.warning "Before removing, here the switching list"
|
|
||||||
|
|
||||||
sw = Context.service.not_nil!.context.switchdb
|
# Removing relations and file descriptors from C structures.
|
||||||
pointer_ctx = Context.service.not_nil!.pointer
|
pointer_ctx = Context.service.not_nil!.pointer
|
||||||
|
LibIPC.ipc_ctx_switching_del pointer_ctx, r.fd_client
|
||||||
LibIPC.ipc_switching_print pointerof(sw)
|
|
||||||
LibIPC.ipc_del_fd pointer_ctx, r.fd_client
|
LibIPC.ipc_del_fd pointer_ctx, r.fd_client
|
||||||
LibIPC.ipc_del_fd pointer_ctx, r.fd_service
|
LibIPC.ipc_del_fd pointer_ctx, r.fd_service
|
||||||
LibIPC.ipc_switching_del pointerof(sw), r.fd_client
|
|
||||||
|
# Close these sockets.
|
||||||
|
begin
|
||||||
|
s = Socket.new r.fd_client, Socket::Family::UNIX, Socket::Type::RAW
|
||||||
|
s.close
|
||||||
|
rescue e
|
||||||
|
Baguette::Log.error "(ignoring) closing the client socket: #{e}"
|
||||||
|
end
|
||||||
|
begin
|
||||||
|
s = Socket.new r.fd_service, Socket::Family::INET, Socket::Type::STREAM
|
||||||
|
s.close
|
||||||
|
rescue e
|
||||||
|
Baguette::Log.error "(ignoring) closing the service socket: #{e}"
|
||||||
|
end
|
||||||
|
|
||||||
all_fd.select! {|v| v != r.fd_client && v != r.fd_service }
|
all_fd.select! {|v| v != r.fd_client && v != r.fd_service }
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
select! {|r| ! r.related? fd }
|
select! {|r| ! r.related? fd }
|
||||||
|
|
||||||
|
Baguette::Log.warning "context after remove"
|
||||||
|
Context.service.not_nil!.print_self
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
require "./network.cr"
|
require "./network.cr"
|
||||||
|
|
||||||
def ws_cb_in(fd : Int32, pm : LibIPC::Message*, more_to_read : Int16*)
|
def ws_cb_in(fd : Int32, pm : LibIPC::Message*, more_to_read : Int16*)
|
||||||
Baguette::Log.info "IN fd is #{fd}"
|
|
||||||
Context.service.not_nil!.relations.search?(fd).try do |relation|
|
Context.service.not_nil!.relations.search?(fd).try do |relation|
|
||||||
|
Baguette::Log.info "IN fd is #{fd} in relation #{relation}"
|
||||||
|
Context.service.not_nil!.print_self
|
||||||
message = nil
|
message = nil
|
||||||
begin
|
begin
|
||||||
message = relation.ws.run_once
|
message = relation.ws.run_once
|
||||||
rescue e
|
rescue e
|
||||||
Baguette::Log.error "run_once FAILED: #{e}"
|
Baguette::Log.error "run_once FAILED: #{e}"
|
||||||
Context.service.not_nil!.relations.remove fd
|
Context.service.not_nil!.relations.remove fd
|
||||||
return LibIPC::IPCCB::Error
|
return LibIPC::IPCCB::Closing
|
||||||
end
|
end
|
||||||
|
|
||||||
if relation.ws.ws.io.empty?
|
if relation.ws.ws.io.empty?
|
||||||
@ -138,10 +164,8 @@ def ws_cb_in(fd : Int32, pm : LibIPC::Message*, more_to_read : Int16*)
|
|||||||
case message
|
case message
|
||||||
when String
|
when String
|
||||||
if relation.is_json
|
if relation.is_json
|
||||||
Baguette::Log.warning "reassembling the message!"
|
|
||||||
# Reassemble the message.
|
# Reassemble the message.
|
||||||
m = relation.buffer_client + message
|
m = relation.buffer_client + message
|
||||||
pp! relation.buffer_client, message, m
|
|
||||||
# Clean the buffer.
|
# Clean the buffer.
|
||||||
relation.buffer_client = String.new
|
relation.buffer_client = String.new
|
||||||
|
|
||||||
@ -157,6 +181,8 @@ def ws_cb_in(fd : Int32, pm : LibIPC::Message*, more_to_read : Int16*)
|
|||||||
return LibIPC::IPCCB::Error
|
return LibIPC::IPCCB::Error
|
||||||
end
|
end
|
||||||
|
|
||||||
|
Baguette::Log.warning "no error reassembling the message"
|
||||||
|
Context.service.not_nil!.print_self
|
||||||
return LibIPC::IPCCB::NoError
|
return LibIPC::IPCCB::NoError
|
||||||
end
|
end
|
||||||
Baguette::Log.error "cannot handle non-json messages!"
|
Baguette::Log.error "cannot handle non-json messages!"
|
||||||
@ -206,8 +232,9 @@ rescue e
|
|||||||
end
|
end
|
||||||
|
|
||||||
def ws_cb_out(fd : Int32, pm : Pointer(LibIPC::Message))
|
def ws_cb_out(fd : Int32, pm : Pointer(LibIPC::Message))
|
||||||
Baguette::Log.info "OUT fd is #{fd}"
|
|
||||||
Context.service.not_nil!.relations.search?(fd).try do |relation|
|
Context.service.not_nil!.relations.search?(fd).try do |relation|
|
||||||
|
Baguette::Log.info "OUT fd is #{fd} in relation #{relation}"
|
||||||
|
Context.service.not_nil!.print_self
|
||||||
message = IPC::Message.new pm
|
message = IPC::Message.new pm
|
||||||
Baguette::Log.info "message to send: #{message}"
|
Baguette::Log.info "message to send: #{message}"
|
||||||
|
|
||||||
@ -241,13 +268,19 @@ class Websocketc::Service < IPC::Server
|
|||||||
end
|
end
|
||||||
|
|
||||||
def print_self
|
def print_self
|
||||||
|
Baguette::Log.warning "From C perspective"
|
||||||
LibIPC.ipc_ctx_print self.pointer
|
LibIPC.ipc_ctx_print self.pointer
|
||||||
|
|
||||||
|
Baguette::Log.warning "From Crystal perspective"
|
||||||
|
Baguette::Log.warning "all fd: #{@relations.all_fd.join ", "}"
|
||||||
|
@relations.each do |r| pp! r end
|
||||||
|
Baguette::Log.warning "==="
|
||||||
end
|
end
|
||||||
|
|
||||||
def first_connection(event : IPC::Event::MessageReceived)
|
def first_connection(event : IPC::Event::MessageReceived)
|
||||||
# First message format: "URI"
|
# First message format: "URI"
|
||||||
payload = String.new event.message.payload
|
payload = String.new event.message.payload
|
||||||
Baguette::Log.info "First message received: #{payload}"
|
# Baguette::Log.info "First message received: #{payload}"
|
||||||
|
|
||||||
# TODO: handle exceptions and errors
|
# TODO: handle exceptions and errors
|
||||||
|
|
||||||
@ -279,13 +312,12 @@ class Websocketc::Service < IPC::Server
|
|||||||
|
|
||||||
LibIPC.ipc_switching_callbacks self.pointer, service_fd, proc_cb_in, proc_cb_out
|
LibIPC.ipc_switching_callbacks self.pointer, service_fd, proc_cb_in, proc_cb_out
|
||||||
|
|
||||||
Baguette::Log.debug "new client: #{event.fd}"
|
# Baguette::Log.debug "new client: #{event.fd}"
|
||||||
|
|
||||||
rescue e
|
rescue e
|
||||||
Baguette::Log.error "cannot connect to #{payload}: #{e}"
|
Baguette::Log.error "cannot connect to #{payload}: #{e}"
|
||||||
end
|
end
|
||||||
|
|
||||||
Baguette::Log.info "Let's say it's OK"
|
|
||||||
send_now event.fd, 1.to_u8, "OK"
|
send_now event.fd, 1.to_u8, "OK"
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -300,22 +332,31 @@ class Websocketc::Service < IPC::Server
|
|||||||
case event
|
case event
|
||||||
when IPC::Event::Timer
|
when IPC::Event::Timer
|
||||||
Baguette::Log.debug "Timer" if @config.print_ipc_timer
|
Baguette::Log.debug "Timer" if @config.print_ipc_timer
|
||||||
|
Baguette::Log.debug "Timer: see the context"
|
||||||
|
print_self
|
||||||
|
|
||||||
when IPC::Event::Connection
|
when IPC::Event::Connection
|
||||||
Baguette::Log.debug "connection from #{event.fd}"
|
Baguette::Log.info "connection from #{event.fd}"
|
||||||
|
|
||||||
when IPC::Event::Disconnection
|
when IPC::Event::Disconnection
|
||||||
Baguette::Log.debug "disconnection from #{event.fd}"
|
Baguette::Log.info "disconnection from #{event.fd}"
|
||||||
@relations.remove event.fd
|
@relations.remove event.fd
|
||||||
|
# begin
|
||||||
|
# s = Socket.new event.fd, Socket::Family::UNIX, Socket::Type::RAW
|
||||||
|
# s.close
|
||||||
|
# rescue e
|
||||||
|
# Baguette::Log.warning "cannot close the socket #{event.fd}: #{e} (ignoring the problem)"
|
||||||
|
# end
|
||||||
|
|
||||||
when IPC::Event::MessageSent
|
when IPC::Event::MessageSent
|
||||||
Baguette::Log.debug "message sent to #{event.fd}"
|
Baguette::Log.info "message sent to #{event.fd}"
|
||||||
|
|
||||||
when IPC::Event::MessageReceived
|
when IPC::Event::MessageReceived
|
||||||
Baguette::Log.debug "message received from #{event.fd}"
|
Baguette::Log.info "message received from #{event.fd}"
|
||||||
if r = @relations.search? event.fd
|
if r = @relations.search? event.fd
|
||||||
Baguette::Log.error "MessageReceived but from an already existent relation"
|
Baguette::Log.error "MessageReceived but from an already existent relation"
|
||||||
Baguette::Log.error "relation: #{r}"
|
Baguette::Log.error "relation: #{r}"
|
||||||
|
exit 1
|
||||||
else
|
else
|
||||||
first_connection event
|
first_connection event
|
||||||
Baguette::Log.warning "context currently is"
|
Baguette::Log.warning "context currently is"
|
||||||
@ -323,17 +364,40 @@ class Websocketc::Service < IPC::Server
|
|||||||
end
|
end
|
||||||
|
|
||||||
when IPC::Event::Switch
|
when IPC::Event::Switch
|
||||||
Baguette::Log.debug "switched message from #{event.fd}"
|
# Baguette::Log.debug "switched message from #{event.fd}"
|
||||||
|
|
||||||
|
when IPC::Event::EventNotSet
|
||||||
|
Baguette::Log.error "Event not set: #{event.fd}"
|
||||||
|
@relations.remove event.fd
|
||||||
|
begin
|
||||||
|
s = Socket.new event.fd, Socket::Family::UNIX, Socket::Type::RAW
|
||||||
|
s.close
|
||||||
|
rescue e
|
||||||
|
Baguette::Log.warning "cannot close the socket #{event.fd}: #{e} (ignoring)"
|
||||||
|
end
|
||||||
|
|
||||||
|
when IPC::Event::Error
|
||||||
|
Baguette::Log.error "Event Error on fd #{event.fd} (removing it)"
|
||||||
|
@relations.remove event.fd
|
||||||
|
|
||||||
when IPC::Exception
|
when IPC::Exception
|
||||||
Baguette::Log.warning "IPC::Exception: #{event.message}"
|
Baguette::Log.error "IPC::Exception: #{event.message}"
|
||||||
|
|
||||||
|
if event.message == "closed recipient"
|
||||||
|
Baguette::Log.error "Bloody closed recipient!"
|
||||||
|
# @relations.remove #
|
||||||
|
else
|
||||||
|
Baguette::Log.error "CLOSING WS"
|
||||||
|
exit 1
|
||||||
|
end
|
||||||
|
|
||||||
else
|
else
|
||||||
Baguette::Log.warning "unhandled IPC event: #{event.class}"
|
Baguette::Log.warning "unhandled IPC event: #{event.class}"
|
||||||
exit 1
|
exit 1
|
||||||
end
|
end
|
||||||
rescue e
|
rescue e
|
||||||
Baguette::Log.error "exception: #{typeof(e)} - #{e.message}"
|
Baguette::Log.error "Exception during event handling: #{typeof(e)} - #{e.message}"
|
||||||
|
exit 1
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
Loading…
Reference in New Issue
Block a user