send_now + simpler wait_event
This commit is contained in:
parent
d791528008
commit
7870346afb
@ -19,6 +19,10 @@ class IPC::Client < IPC::Context
|
|||||||
at_exit { close }
|
at_exit { close }
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def fd
|
||||||
|
@server_fd
|
||||||
|
end
|
||||||
|
|
||||||
def read
|
def read
|
||||||
unless (fd = @server_fd).nil?
|
unless (fd = @server_fd).nil?
|
||||||
message = LibIPC::Message.new
|
message = LibIPC::Message.new
|
||||||
|
@ -40,13 +40,13 @@ class IPC::Context
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def wait_event(&block) : IPC::Event::Events | Exception
|
def wait_event : IPC::Event::Events | Exception
|
||||||
event = LibIPC::Event.new
|
event = LibIPC::Event.new
|
||||||
|
|
||||||
r = LibIPC.ipc_wait_event self.pointer, pointerof(event), pointerof(@timer)
|
r = LibIPC.ipc_wait_event self.pointer, pointerof(event), pointerof(@timer)
|
||||||
if r.error_code != 0
|
if r.error_code != 0
|
||||||
m = String.new r.error_message.to_slice
|
m = String.new r.error_message.to_slice
|
||||||
yield IPC::Exception.new "error waiting for a new event: #{m}"
|
return IPC::Exception.new "error waiting for a new event: #{m}"
|
||||||
end
|
end
|
||||||
|
|
||||||
eventtype = event.type.unsafe_as(LibIPC::EventType)
|
eventtype = event.type.unsafe_as(LibIPC::EventType)
|
||||||
@ -88,32 +88,55 @@ class IPC::Context
|
|||||||
@timer = @base_timer
|
@timer = @base_timer
|
||||||
end
|
end
|
||||||
|
|
||||||
yield wait_event &block
|
break if yield wait_event
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def send_now(message : LibIPC::Message)
|
||||||
|
r = LibIPC.ipc_write_fd(message.fd, pointerof(message))
|
||||||
|
if r.error_code != 0
|
||||||
|
m = String.new r.error_message.to_slice
|
||||||
|
raise Exception.new "error writing a message: #{m}"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def send_now(message : IPC::Message)
|
||||||
|
send_now fd: message.fd, utype: message.utype, payload: message.payload
|
||||||
|
end
|
||||||
|
|
||||||
|
def send_now(fd : Int32, utype : UInt8, payload : Bytes)
|
||||||
|
message = LibIPC::Message.new fd: fd,
|
||||||
|
type: LibIPC::MessageType::Data.to_u8,
|
||||||
|
user_type: utype,
|
||||||
|
length: payload.bytesize,
|
||||||
|
payload: payload.to_unsafe
|
||||||
|
send_now message
|
||||||
|
end
|
||||||
|
|
||||||
|
def send(message : LibIPC::Message)
|
||||||
|
r = LibIPC.ipc_write(self.pointer, pointerof(message))
|
||||||
|
if r.error_code != 0
|
||||||
|
m = String.new r.error_message.to_slice
|
||||||
|
raise Exception.new "error writing a message: #{m}"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
def send(message : IPC::Message)
|
||||||
|
send fd: message.fd, utype: message.utype, payload: message.payload
|
||||||
|
end
|
||||||
|
|
||||||
def send(fd : Int32, utype : UInt8, payload : Bytes)
|
def send(fd : Int32, utype : UInt8, payload : Bytes)
|
||||||
message = LibIPC::Message.new fd: fd,
|
message = LibIPC::Message.new fd: fd,
|
||||||
type: LibIPC::MessageType::Data.to_u8,
|
type: LibIPC::MessageType::Data.to_u8,
|
||||||
user_type: utype,
|
user_type: utype,
|
||||||
length: payload.bytesize,
|
length: payload.bytesize,
|
||||||
payload: payload.to_unsafe
|
payload: payload.to_unsafe
|
||||||
|
send message
|
||||||
r = LibIPC.ipc_write(self.pointer, pointerof(message))
|
|
||||||
if r.error_code != 0
|
|
||||||
m = String.new r.error_message.to_slice
|
|
||||||
raise Exception.new "error writing a message: #{m}"
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def send(fd : Int32, utype : UInt8, payload : String)
|
def send(fd : Int32, utype : UInt8, payload : String)
|
||||||
send(fd, utype, Bytes.new(payload.to_unsafe, payload.bytesize))
|
send(fd, utype, Bytes.new(payload.to_unsafe, payload.bytesize))
|
||||||
end
|
end
|
||||||
|
|
||||||
def send(message : IPC::Message)
|
|
||||||
send(message.fd, message.utype, message.payload)
|
|
||||||
end
|
|
||||||
|
|
||||||
def read(index : UInt32)
|
def read(index : UInt32)
|
||||||
message = LibIPC::Message.new
|
message = LibIPC::Message.new
|
||||||
r = LibIPC.ipc_read(self.pointer, index, pointerof(message))
|
r = LibIPC.ipc_read(self.pointer, index, pointerof(message))
|
||||||
@ -130,11 +153,6 @@ class IPC::Context
|
|||||||
pointerof(@context)
|
pointerof(@context)
|
||||||
end
|
end
|
||||||
|
|
||||||
# sanitizer
|
|
||||||
def fd
|
|
||||||
@connection.fd
|
|
||||||
end
|
|
||||||
|
|
||||||
def close
|
def close
|
||||||
return if @closed
|
return if @closed
|
||||||
r = LibIPC.ipc_close_all(self.pointer)
|
r = LibIPC.ipc_close_all(self.pointer)
|
||||||
|
@ -118,6 +118,10 @@ lib LibIPC
|
|||||||
# Sending a message (will wait the fd to become available for IO operations).
|
# Sending a message (will wait the fd to become available for IO operations).
|
||||||
fun ipc_write(Ctx*, Message*) : IPCError
|
fun ipc_write(Ctx*, Message*) : IPCError
|
||||||
|
|
||||||
|
# Sending a message NOW.
|
||||||
|
# WARNING: unbuffered send do not wait the fd to become available.
|
||||||
|
fun ipc_write_fd(Int32, Message*) : IPCError
|
||||||
|
|
||||||
# This function let the user get the default error message based on the error code.
|
# This function let the user get the default error message based on the error code.
|
||||||
# The error message is contained in the IPCError structure, this function should not be used, in most cases.
|
# The error message is contained in the IPCError structure, this function should not be used, in most cases.
|
||||||
fun ipc_errors_get (LibC::Int) : LibC::Char*
|
fun ipc_errors_get (LibC::Int) : LibC::Char*
|
||||||
|
109
tests/pongc.cr
109
tests/pongc.cr
@ -1,24 +1,101 @@
|
|||||||
|
require "option_parser"
|
||||||
require "../src/ipc.cr"
|
require "../src/ipc.cr"
|
||||||
|
require "./prints.cr"
|
||||||
|
|
||||||
client = IPC::Client.new "pong"
|
class CLI
|
||||||
|
class_property service_name = "pong"
|
||||||
server_fd = client.server_fd
|
class_property message : String? = nil
|
||||||
|
class_property type = 1
|
||||||
if server_fd.nil?
|
class_property user_type = 42
|
||||||
puts "there is no server_fd!!"
|
class_property verbosity = 1
|
||||||
exit 1
|
class_property rounds = 1
|
||||||
end
|
end
|
||||||
|
|
||||||
message = IPC::Message.new server_fd, 1, 42.to_u8, "salut ça va ?"
|
OptionParser.parse do |parser|
|
||||||
|
parser.on "-s service_name", "--service-name service_name", "URI" do |optsn|
|
||||||
|
CLI.service_name = optsn
|
||||||
|
end
|
||||||
|
|
||||||
client.send message
|
parser.on "-v verbosity", "--verbosity verbosity", "Verbosity (0 = nothing is printed, 1 = only events, 2 = events and messages). Default: 1" do |optsn|
|
||||||
|
CLI.verbosity = optsn.to_i
|
||||||
|
end
|
||||||
|
|
||||||
client.loop do |event|
|
parser.on "-t message_type",
|
||||||
case event
|
"--type message_type",
|
||||||
when IPC::Event::MessageReceived
|
"(internal) message type." do |opt|
|
||||||
puts "\033[32mthere is a message\033[00m"
|
CLI.type = opt.to_i
|
||||||
puts event.message.to_s
|
end
|
||||||
client.close
|
|
||||||
exit
|
parser.on "-u user_message_type",
|
||||||
|
"--user-type user_message_type",
|
||||||
|
"Message type." do |opt|
|
||||||
|
CLI.user_type = opt.to_i
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
|
parser.on "-r rounds", "--rounds count", "Number of messages sent." do |opt|
|
||||||
|
CLI.rounds = opt.to_i
|
||||||
|
end
|
||||||
|
|
||||||
|
parser.on "-m message", "--message m", "Message to sent." do |opt|
|
||||||
|
CLI.message = opt
|
||||||
|
end
|
||||||
|
|
||||||
|
parser.on "-h", "--help", "Show this help" do
|
||||||
|
puts parser
|
||||||
|
exit 0
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def main
|
||||||
|
client = IPC::Client.new CLI.service_name
|
||||||
|
client.base_timer = 30_000 # 30 seconds
|
||||||
|
client.timer = 30_000 # 30 seconds
|
||||||
|
|
||||||
|
server_fd = client.server_fd
|
||||||
|
if server_fd.nil?
|
||||||
|
puts "there is no server_fd!!"
|
||||||
|
exit 1
|
||||||
|
end
|
||||||
|
|
||||||
|
nb_messages_remaining = CLI.rounds
|
||||||
|
|
||||||
|
# Listening on STDIN.
|
||||||
|
client << 0
|
||||||
|
|
||||||
|
client.loop do |event|
|
||||||
|
case event
|
||||||
|
when IPC::Event::ExtraSocket
|
||||||
|
puts "extra socket fd #{event.fd}"
|
||||||
|
info "reading on #{event.fd}"
|
||||||
|
if event.fd == 0
|
||||||
|
puts "reading on STDIN"
|
||||||
|
end
|
||||||
|
|
||||||
|
mstr = if CLI.message.nil?
|
||||||
|
if event.fd == 0 STDIN.gets || "STDIN failed!" else "coucou" end
|
||||||
|
else
|
||||||
|
CLI.message.not_nil!
|
||||||
|
end
|
||||||
|
|
||||||
|
CLI.rounds.times do |i|
|
||||||
|
client.send server_fd.not_nil!, CLI.user_type.to_u8, mstr.to_slice
|
||||||
|
end
|
||||||
|
when IPC::Event::MessageReceived
|
||||||
|
nb_messages_remaining -= 1
|
||||||
|
info "new message from #{event.fd}: #{event.message.to_s}, remaining #{nb_messages_remaining}"
|
||||||
|
if nb_messages_remaining == 0
|
||||||
|
exit 0
|
||||||
|
end
|
||||||
|
when IPC::Event::Disconnection
|
||||||
|
info "Disconnection from #{event.fd}"
|
||||||
|
if event.fd == 0
|
||||||
|
client.remove_fd 0
|
||||||
|
end
|
||||||
|
else
|
||||||
|
info "unhandled event: #{event.class}"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
main
|
||||||
|
102
tests/pongd.cr
102
tests/pongd.cr
@ -1,22 +1,30 @@
|
|||||||
require "option_parser"
|
require "option_parser"
|
||||||
require "../src/ipc.cr"
|
require "../src/ipc.cr"
|
||||||
require "./colors"
|
require "./prints.cr"
|
||||||
|
|
||||||
verbosity = 1
|
class CLI
|
||||||
service_name = "pong"
|
class_property service_name = "pong"
|
||||||
no_response = false
|
class_property verbosity = 1
|
||||||
|
class_property timer = 30_000
|
||||||
|
class_property no_response = false
|
||||||
|
end
|
||||||
|
|
||||||
OptionParser.parse do |parser|
|
OptionParser.parse do |parser|
|
||||||
parser.on "-s service_name", "--service-name service_name", "URI" do |optsn|
|
parser.on "-s service_name", "--service-name service_name", "URI" do |optsn|
|
||||||
service_name = optsn
|
CLI.service_name = optsn
|
||||||
end
|
end
|
||||||
|
|
||||||
parser.on "-n", "--no-response", "Do not provide any response back." do
|
parser.on "-n", "--no-response", "Do not provide any response back." do
|
||||||
no_response = true
|
CLI.no_response = true
|
||||||
end
|
end
|
||||||
|
|
||||||
|
parser.on "-t timer", "--timer ms", "Timer in ms. Default: 30 000" do |optsn|
|
||||||
|
CLI.timer = optsn.to_i
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
parser.on "-v verbosity", "--verbosity verbosity", "Verbosity (0 = nothing is printed, 1 = only events, 2 = events and messages). Default: 1" do |optsn|
|
parser.on "-v verbosity", "--verbosity verbosity", "Verbosity (0 = nothing is printed, 1 = only events, 2 = events and messages). Default: 1" do |optsn|
|
||||||
verbosity = optsn.to_i
|
CLI.verbosity = optsn.to_i
|
||||||
end
|
end
|
||||||
|
|
||||||
parser.on "-h", "--help", "Show this help" do
|
parser.on "-h", "--help", "Show this help" do
|
||||||
@ -25,54 +33,46 @@ OptionParser.parse do |parser|
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
service = IPC::Server.new (service_name)
|
def main
|
||||||
service.base_timer = 5000 # 5 seconds
|
service = IPC::Server.new CLI.service_name
|
||||||
service.timer = 5000 # 5 seconds
|
service.base_timer = CLI.timer # default: 30 seconds
|
||||||
|
service.timer = CLI.timer # default: 30 seconds
|
||||||
|
|
||||||
service.loop do |event|
|
service.loop do |event|
|
||||||
case event
|
# service.pp
|
||||||
when IPC::Event::Timer
|
case event
|
||||||
if verbosity >= 1
|
when IPC::Event::Timer
|
||||||
puts "#{CORANGE}IPC::Event::Timer#{CRESET}"
|
info "IPC::Event::Timer"
|
||||||
end
|
when IPC::Event::Connection
|
||||||
when IPC::Event::Connection
|
info "IPC::Event::Connection, client: #{event.fd}"
|
||||||
if verbosity >= 1
|
when IPC::Event::Disconnection
|
||||||
puts "#{CBLUE}IPC::Event::Connection#{CRESET}, client: #{event.fd}"
|
info "IPC::Event::Disconnection, client: #{event.fd}"
|
||||||
end
|
when IPC::Event::MessageSent
|
||||||
when IPC::Event::Disconnection
|
begin
|
||||||
if verbosity >= 1
|
info "IPC::Event::MessageSent, client: #{event.fd}"
|
||||||
puts "#{CBLUE}IPC::Event::Disconnection#{CRESET}, client: #{event.fd}"
|
rescue e
|
||||||
end
|
important "#{e.message}"
|
||||||
when IPC::Event::MessageSent
|
service.remove_fd event.fd
|
||||||
begin
|
|
||||||
if verbosity >= 1
|
|
||||||
puts "#{CGREEN}IPC::Event::MessageSent#{CRESET}, client: #{event.fd}"
|
|
||||||
end
|
end
|
||||||
rescue e
|
when IPC::Event::MessageReceived
|
||||||
puts "#{CRED}#{e.message}#{CRESET}"
|
begin
|
||||||
service.remove_fd event.fd
|
info "IPC::Event::MessageReceived, client: #{event.fd}"
|
||||||
end
|
m = String.new event.message.payload
|
||||||
when IPC::Event::MessageReceived
|
debug "message type #{event.message.utype}: #{m}"
|
||||||
begin
|
|
||||||
if verbosity >= 1
|
unless CLI.no_response
|
||||||
puts "#{CGREEN}IPC::Event::MessageReceived#{CRESET}, client: #{event.fd}"
|
service.send event.message
|
||||||
if verbosity >= 2
|
debug "sending message..."
|
||||||
m = String.new event.message.payload
|
|
||||||
puts "#{CBLUE}message type #{event.message.utype}: #{m} #{CRESET}"
|
|
||||||
end
|
end
|
||||||
end
|
|
||||||
service.send event.message unless no_response
|
|
||||||
if verbosity >= 2 && ! no_response
|
|
||||||
puts "#{CBLUE}sending message...#{CRESET}"
|
|
||||||
end
|
|
||||||
|
|
||||||
rescue e
|
rescue e
|
||||||
puts "#{CRED}#{e.message}#{CRESET}"
|
important "#{e.message}"
|
||||||
service.remove_fd event.fd
|
service.remove_fd event.fd
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
if verbosity >= 1
|
important "Exception: message #{event}"
|
||||||
puts "#{CRED}Exception: message #{event} #{CRESET}"
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
main
|
||||||
|
13
tests/prints.cr
Normal file
13
tests/prints.cr
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
require "./colors"
|
||||||
|
|
||||||
|
def important(message : String)
|
||||||
|
puts "#{CRED}#{message}#{CRESET}" if CLI.verbosity > 0
|
||||||
|
end
|
||||||
|
|
||||||
|
def info(message : String)
|
||||||
|
puts "#{CGREEN}#{message}#{CRESET}" if CLI.verbosity > 1
|
||||||
|
end
|
||||||
|
|
||||||
|
def debug(message : String)
|
||||||
|
puts "#{CBLUE}#{message}#{CRESET}" if CLI.verbosity > 2
|
||||||
|
end
|
Reference in New Issue
Block a user