pubsub rewrite, draft
parent
9865fcefff
commit
e8db551536
|
@ -0,0 +1,182 @@
|
||||||
|
#include "../../core/ipc.h"
|
||||||
|
|
||||||
|
#include "../lib/message.h"
|
||||||
|
#include "../lib/channels.h"
|
||||||
|
|
||||||
|
#include "../lib/pubsub.h"
|
||||||
|
#include "../lib/pubsubd.h"
|
||||||
|
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
void usage (char **argv) {
|
||||||
|
printf ( "usage: %s [chan [pub]]\n", argv[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
void print_cmd (void) {
|
||||||
|
printf ("\033[32m>\033[00m ");
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
|
|
||||||
|
void chan_sub (struct ipc_service *srv, char *chan)
|
||||||
|
{
|
||||||
|
struct pubsub_msg msg;
|
||||||
|
memset (&msg, 0, sizeof (struct pubsub_msg));
|
||||||
|
|
||||||
|
// meta data on the message
|
||||||
|
msg.type = PUBSUB_MSG_TYPE_SUB;
|
||||||
|
msg.chanlen = strlen (chan) + 1;
|
||||||
|
msg.chan = malloc (msg.chanlen);
|
||||||
|
memset (msg.chan, 0, msg.chanlen);
|
||||||
|
strncpy (msg.chan, chan, msg.chanlen);
|
||||||
|
msg.chan[strlen (chan)] = '\0';
|
||||||
|
|
||||||
|
pubsub_message_send (srv, &msg);
|
||||||
|
printf ("subscribed to %s\n", chan);
|
||||||
|
|
||||||
|
pubsub_message_free (&msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
void main_loop (char **env, int index, int version
|
||||||
|
, char *cmd, char *chan)
|
||||||
|
{
|
||||||
|
printf ("connection to pubsubd: index %d version %d "
|
||||||
|
"cmd %s chan %s\n"
|
||||||
|
, index, version, cmd, chan );
|
||||||
|
|
||||||
|
struct ipc_service srv;
|
||||||
|
memset (&srv, 0, sizeof (struct ipc_service));
|
||||||
|
int ret = ipc_application_connection (env, &srv, PUBSUBD_SERVICE_NAME);
|
||||||
|
if (ret != 0) {
|
||||||
|
handle_err ("pubsub_connection", "application_connection != 0");
|
||||||
|
}
|
||||||
|
printf ("connected\n");
|
||||||
|
|
||||||
|
if (strncmp (cmd, "sub", 3) == 0) {
|
||||||
|
chan_sub (&srv, chan);
|
||||||
|
}
|
||||||
|
|
||||||
|
printf ("main_loop\n");
|
||||||
|
|
||||||
|
struct pubsub_msg msg;
|
||||||
|
memset (&msg, 0, sizeof (struct pubsub_msg));
|
||||||
|
|
||||||
|
// meta data on the message
|
||||||
|
msg.type = PUBSUB_MSG_TYPE_PUB;
|
||||||
|
msg.chanlen = strlen (chan) + 1;
|
||||||
|
msg.chan = malloc (msg.chanlen);
|
||||||
|
memset (msg.chan, 0, msg.chanlen);
|
||||||
|
strncpy ((char *) msg.chan, chan, msg.chanlen);
|
||||||
|
msg.chan[strlen (chan)] = '\0';
|
||||||
|
|
||||||
|
struct ipc_event event;
|
||||||
|
memset (&event, 0, sizeof (struct ipc_event));
|
||||||
|
|
||||||
|
struct ipc_services services;
|
||||||
|
memset (&services, 0, sizeof (struct ipc_services));
|
||||||
|
ipc_service_add (&services, &srv);
|
||||||
|
|
||||||
|
int should_continue = 1;
|
||||||
|
|
||||||
|
while (should_continue) {
|
||||||
|
print_cmd ();
|
||||||
|
ret = ipc_application_peek_event (&services, &event);
|
||||||
|
|
||||||
|
if (ret != 0) {
|
||||||
|
handle_error("ipc_application_peek_event != 0");
|
||||||
|
exit (EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (event.type) {
|
||||||
|
case IPC_EVENT_TYPE_STDIN:
|
||||||
|
{
|
||||||
|
struct ipc_message *m = event.m;
|
||||||
|
if ( m->length == 0 || strncmp (m->payload, "exit", 4) == 0) {
|
||||||
|
// TODO: disconnection
|
||||||
|
|
||||||
|
ipc_message_empty (m);
|
||||||
|
free (m);
|
||||||
|
|
||||||
|
should_continue = 0;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the curent payload, change it to be compatible with the application
|
||||||
|
// then send it
|
||||||
|
print_cmd ();
|
||||||
|
|
||||||
|
// TODO: remove \n
|
||||||
|
msg.datalen = m->length + 1;
|
||||||
|
msg.data = malloc (msg.datalen);
|
||||||
|
memset (msg.data, 0, msg.datalen);
|
||||||
|
strncpy (msg.data, m->payload, msg.datalen);
|
||||||
|
msg.data[msg.datalen] = '\0';
|
||||||
|
|
||||||
|
pubsub_message_send (&srv, &msg);
|
||||||
|
free (msg.data);
|
||||||
|
msg.data = NULL;
|
||||||
|
msg.datalen = 0;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case IPC_EVENT_TYPE_MESSAGE:
|
||||||
|
{
|
||||||
|
struct ipc_message *m = event.m;
|
||||||
|
printf ("msg recv: %.*s", m->length, m->payload);
|
||||||
|
// TODO: correctly print the message
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
case IPC_EVENT_TYPE_DISCONNECTION:
|
||||||
|
{
|
||||||
|
printf ("server disconnected: quitting...\n");
|
||||||
|
|
||||||
|
// just remove srv from services, it's already closed
|
||||||
|
ipc_services_free (&services);
|
||||||
|
|
||||||
|
exit (EXIT_SUCCESS);
|
||||||
|
};
|
||||||
|
case IPC_EVENT_TYPE_NOT_SET:
|
||||||
|
case IPC_EVENT_TYPE_CONNECTION:
|
||||||
|
case IPC_EVENT_TYPE_ERROR:
|
||||||
|
default :
|
||||||
|
fprintf (stderr, "should not happen, event type %d\n", event.type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// free everything
|
||||||
|
pubsub_message_free (&msg);
|
||||||
|
|
||||||
|
printf ("disconnection...\n");
|
||||||
|
ipc_services_free (&services);
|
||||||
|
if (ipc_application_close (&srv) < 0) {
|
||||||
|
handle_err("main", "application_close < 0");
|
||||||
|
exit (EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char **argv, char **env)
|
||||||
|
{
|
||||||
|
char *cmd = "sub";
|
||||||
|
char *chan = "chan1";
|
||||||
|
|
||||||
|
if (argc == 2 && strncmp("-h", argv[1], 2) == 0) {
|
||||||
|
usage (argv);
|
||||||
|
exit (0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (argc >= 2) {
|
||||||
|
chan = argv[1];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (argc >= 3) {
|
||||||
|
cmd = argv[2];
|
||||||
|
}
|
||||||
|
|
||||||
|
int index = 0;
|
||||||
|
// don't care about the version
|
||||||
|
int version = 0;
|
||||||
|
|
||||||
|
main_loop (env, index, version, cmd, chan);
|
||||||
|
|
||||||
|
return EXIT_SUCCESS;
|
||||||
|
}
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
// TODO: select on service + input instead of threads
|
// TODO: select on service + input instead of threads
|
||||||
|
|
||||||
#include "../../core/error.h"
|
#include "../../core/ipc.h"
|
||||||
#include "../lib/pubsub.h"
|
#include "../lib/pubsub.h"
|
||||||
#include "../lib/pubsubd.h"
|
#include "../lib/pubsubd.h"
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
@ -67,8 +67,7 @@ void chan_sub (struct ipc_service *srv, char *chan)
|
||||||
pubsub_message_free (&msg);
|
pubsub_message_free (&msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void main_loop (int argc, char **argv, char **env
|
void main_loop (char **env, int index, int version
|
||||||
, int index, int version
|
|
||||||
, char *cmd, char *chan)
|
, char *cmd, char *chan)
|
||||||
{
|
{
|
||||||
printf ("connection to pubsubd: index %d version %d "
|
printf ("connection to pubsubd: index %d version %d "
|
||||||
|
@ -77,7 +76,7 @@ void main_loop (int argc, char **argv, char **env
|
||||||
|
|
||||||
struct ipc_service srv;
|
struct ipc_service srv;
|
||||||
memset (&srv, 0, sizeof (struct ipc_service));
|
memset (&srv, 0, sizeof (struct ipc_service));
|
||||||
pubsub_connection (argc, argv, env, &srv);
|
pubsub_connection (env, &srv);
|
||||||
printf ("connected\n");
|
printf ("connected\n");
|
||||||
|
|
||||||
if (strncmp (cmd, "sub", 3) == 0) {
|
if (strncmp (cmd, "sub", 3) == 0) {
|
||||||
|
@ -167,7 +166,7 @@ int main(int argc, char **argv, char **env)
|
||||||
// don't care about the version
|
// don't care about the version
|
||||||
int version = 0;
|
int version = 0;
|
||||||
|
|
||||||
main_loop (argc, argv, env, index, version, cmd, chan);
|
main_loop (env, index, version, cmd, chan);
|
||||||
|
|
||||||
return EXIT_SUCCESS;
|
return EXIT_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,4 @@
|
||||||
#include "../../core/communication.h"
|
#include "../../core/ipc.h"
|
||||||
#include "../../core/client.h"
|
|
||||||
#include "../../core/error.h"
|
|
||||||
#include "../lib/pubsubd.h"
|
#include "../lib/pubsubd.h"
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
|
||||||
|
@ -29,6 +27,10 @@ void handle_signal (int signalnumber)
|
||||||
int
|
int
|
||||||
main(int argc, char **argv, char **env)
|
main(int argc, char **argv, char **env)
|
||||||
{
|
{
|
||||||
|
|
||||||
|
argc = argc;
|
||||||
|
argv = argv;
|
||||||
|
|
||||||
memset (&srv, 0, sizeof (struct ipc_service));
|
memset (&srv, 0, sizeof (struct ipc_service));
|
||||||
srv.index = 0;
|
srv.index = 0;
|
||||||
srv.version = 0;
|
srv.version = 0;
|
||||||
|
@ -40,7 +42,7 @@ main(int argc, char **argv, char **env)
|
||||||
memset (&chans, 0, sizeof (struct channels));
|
memset (&chans, 0, sizeof (struct channels));
|
||||||
pubsubd_channels_init (&chans);
|
pubsubd_channels_init (&chans);
|
||||||
|
|
||||||
if (ipc_server_init (argc, argv, env, &srv, PUBSUBD_SERVICE_NAME) < 0) {
|
if (ipc_server_init (env, &srv, PUBSUBD_SERVICE_NAME) < 0) {
|
||||||
handle_error("ipc_server_init < 0");
|
handle_error("ipc_server_init < 0");
|
||||||
return EXIT_FAILURE;
|
return EXIT_FAILURE;
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,6 @@
|
||||||
|
|
||||||
#include "pubsub.h"
|
#include "pubsub.h"
|
||||||
#include "pubsubd.h"
|
#include "pubsubd.h"
|
||||||
#include "../../core/error.h"
|
|
||||||
|
|
||||||
#define PUBSUB_SUBSCRIBER_ACTION_STR_PUB "pub"
|
#define PUBSUB_SUBSCRIBER_ACTION_STR_PUB "pub"
|
||||||
#define PUBSUB_SUBSCRIBER_ACTION_STR_SUB "sub"
|
#define PUBSUB_SUBSCRIBER_ACTION_STR_SUB "sub"
|
||||||
|
@ -33,11 +32,9 @@ void pubsub_quit (struct ipc_service *srv)
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int pubsub_connection (int argc, char **argv, char **env
|
int pubsub_connection (char **env, struct ipc_service *srv)
|
||||||
, struct ipc_service *srv)
|
|
||||||
{
|
{
|
||||||
int ret = ipc_application_connection (argc, argv, env
|
int ret = ipc_application_connection (env, srv, PUBSUBD_SERVICE_NAME);
|
||||||
, srv, PUBSUBD_SERVICE_NAME, NULL, 0);
|
|
||||||
|
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
handle_err ("pubsub_connection", "application_connection != 0");
|
handle_err ("pubsub_connection", "application_connection != 0");
|
||||||
|
@ -69,7 +66,7 @@ int pubsub_message_send (struct ipc_service *srv, const struct pubsub_msg * m)
|
||||||
}
|
}
|
||||||
|
|
||||||
ipc_application_write (srv, &m_data);
|
ipc_application_write (srv, &m_data);
|
||||||
ipc_message_free (&m_data);
|
ipc_message_empty (&m_data);
|
||||||
|
|
||||||
if (buf != NULL)
|
if (buf != NULL)
|
||||||
free(buf);
|
free(buf);
|
||||||
|
@ -95,7 +92,7 @@ int pubsub_message_recv (struct ipc_service *srv, struct pubsub_msg *m)
|
||||||
ipc_application_read (srv, &m_recv);
|
ipc_application_read (srv, &m_recv);
|
||||||
pubsub_message_unserialize (m, m_recv.payload, m_recv.length);
|
pubsub_message_unserialize (m, m_recv.payload, m_recv.length);
|
||||||
|
|
||||||
ipc_message_free (&m_recv);
|
ipc_message_empty (&m_recv);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,7 @@
|
||||||
#ifndef __PUBSUB_H__
|
#ifndef __PUBSUB_H__
|
||||||
#define __PUBSUB_H__
|
#define __PUBSUB_H__
|
||||||
|
|
||||||
#include "../../core/communication.h"
|
#include "../../core/ipc.h"
|
||||||
#include "../../core/client.h"
|
|
||||||
#include "../../core/queue.h"
|
|
||||||
|
|
||||||
#include "message.h"
|
#include "message.h"
|
||||||
|
|
||||||
|
@ -15,7 +13,7 @@ enum subscriber_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH};
|
||||||
#define PUBSUB_TYPE_DEBUG 4
|
#define PUBSUB_TYPE_DEBUG 4
|
||||||
#define PUBSUB_TYPE_INFO 5
|
#define PUBSUB_TYPE_INFO 5
|
||||||
|
|
||||||
int pubsub_connection (int argc, char **argv, char **env, struct ipc_service *srv);
|
int pubsub_connection (char **env, struct ipc_service *srv);
|
||||||
int pubsub_disconnect (struct ipc_service *srv);
|
int pubsub_disconnect (struct ipc_service *srv);
|
||||||
int pubsub_message_send (struct ipc_service *srv, const struct pubsub_msg * m);
|
int pubsub_message_send (struct ipc_service *srv, const struct pubsub_msg * m);
|
||||||
int pubsub_message_recv (struct ipc_service *srv, struct pubsub_msg *m);
|
int pubsub_message_recv (struct ipc_service *srv, struct pubsub_msg *m);
|
||||||
|
|
|
@ -11,31 +11,31 @@
|
||||||
#include <sys/un.h>
|
#include <sys/un.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
void pubsubd_send (const struct ipc_clients *ap, const struct pubsub_msg * m)
|
void pubsubd_send (const struct ipc_clients *clients, const struct pubsub_msg * pubsub_msg)
|
||||||
{
|
{
|
||||||
if (ap == NULL) {
|
if (clients == NULL) {
|
||||||
fprintf (stderr, "pubsubd_send: ap == NULL");
|
fprintf (stderr, "pubsubd_send: clients == NULL");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (m == NULL) {
|
if (pubsub_msg == NULL) {
|
||||||
fprintf (stderr, "pubsubd_send: m == NULL");
|
fprintf (stderr, "pubsubd_send: pubsub_msg == NULL");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *buf = NULL;
|
char *buf = NULL;
|
||||||
size_t msize = 0;
|
size_t msize = 0;
|
||||||
pubsub_message_serialize (m, &buf, &msize);
|
pubsub_message_serialize (pubsub_msg, &buf, &msize);
|
||||||
|
|
||||||
struct ipc_message m_data;
|
struct ipc_message m;
|
||||||
memset (&m_data, 0, sizeof (struct ipc_message));
|
memset (&m, 0, sizeof (struct ipc_message));
|
||||||
ipc_message_format_data (&m_data, buf, msize);
|
ipc_message_format_data (&m, buf, msize);
|
||||||
|
|
||||||
int i;
|
int i;
|
||||||
for (i = 0; i < ap->size ; i++) {
|
for (i = 0; i < clients->size ; i++) {
|
||||||
ipc_server_write (ap->clients[i], &m_data);
|
ipc_server_write (clients->clients[i], &m);
|
||||||
}
|
}
|
||||||
ipc_message_free (&m_data);
|
ipc_message_empty (&m);
|
||||||
|
|
||||||
if (buf != NULL) {
|
if (buf != NULL) {
|
||||||
free (buf);
|
free (buf);
|
||||||
|
@ -52,145 +52,115 @@ void pubsubd_send (const struct ipc_clients *ap, const struct pubsub_msg * m)
|
||||||
//
|
//
|
||||||
// pubsub_message_unserialize (m, m_data.payload, m_data.length);
|
// pubsub_message_unserialize (m, m_data.payload, m_data.length);
|
||||||
//
|
//
|
||||||
// ipc_message_free (&m_data);
|
// ipc_message_empty (&m_data);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
/**
|
|
||||||
* new connection, once accepted the client is added to the array_proc
|
|
||||||
* structure to be checked periodically for new messages
|
|
||||||
*/
|
|
||||||
void handle_new_connection (struct ipc_service *srv, struct ipc_clients *ap)
|
|
||||||
{
|
|
||||||
struct ipc_client *p = malloc(sizeof(struct ipc_client));
|
|
||||||
memset(p, 0, sizeof(struct ipc_client));
|
|
||||||
|
|
||||||
if (ipc_server_accept (srv, p) < 0) {
|
|
||||||
handle_error("server_accept < 0");
|
|
||||||
} else {
|
|
||||||
printf("new connection\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ipc_client_add (ap, p) < 0) {
|
|
||||||
handle_error("ipc_client_add < 0");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void handle_new_msg (struct channels *chans
|
|
||||||
, struct ipc_clients *ap, struct ipc_clients *proc_to_read)
|
|
||||||
{
|
|
||||||
struct ipc_message m;
|
|
||||||
memset (&m, 0, sizeof (struct ipc_message));
|
|
||||||
int i;
|
|
||||||
for (i = 0; i < proc_to_read->size; i++) {
|
|
||||||
// printf ("loop handle_new_msg\n");
|
|
||||||
if (ipc_server_read (proc_to_read->clients[i], &m) < 0) {
|
|
||||||
handle_error("server_read < 0");
|
|
||||||
}
|
|
||||||
|
|
||||||
mprint_hexa ("msg received: ", (unsigned char *) m.payload, m.length);
|
|
||||||
|
|
||||||
// close the client then delete it from the client array
|
|
||||||
if (m.type == MSG_TYPE_CLOSE) {
|
|
||||||
struct ipc_client *p = proc_to_read->clients[i];
|
|
||||||
|
|
||||||
printf ("client %d disconnecting\n", p->proc_fd);
|
|
||||||
|
|
||||||
// TODO: to test, unsubscribe when closing
|
|
||||||
pubsubd_channels_unsubscribe_everywhere (chans, p);
|
|
||||||
|
|
||||||
// close the connection to the client
|
|
||||||
if (ipc_server_close_client (p) < 0)
|
|
||||||
handle_error( "server_close_client < 0");
|
|
||||||
|
|
||||||
|
|
||||||
// remove the client from the clientes list
|
|
||||||
if (ipc_client_del (ap, p) < 0)
|
|
||||||
handle_error( "ipc_client_del < 0");
|
|
||||||
if (ipc_client_del (proc_to_read, p) < 0)
|
|
||||||
handle_err( "handle_new_msg", "ipc_client_del < 0");
|
|
||||||
|
|
||||||
ipc_message_free (&m);
|
|
||||||
|
|
||||||
// free client
|
|
||||||
free (p);
|
|
||||||
|
|
||||||
i--;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct pubsub_msg m_data;
|
|
||||||
memset (&m_data, 0, sizeof (struct pubsub_msg));
|
|
||||||
|
|
||||||
pubsub_message_unserialize (&m_data, m.payload, m.length);
|
|
||||||
|
|
||||||
if (m_data.type == PUBSUB_MSG_TYPE_SUB) {
|
|
||||||
printf ("client %d subscribing to %s\n"
|
|
||||||
, proc_to_read->clients[i]->proc_fd
|
|
||||||
, m_data.chan);
|
|
||||||
pubsubd_channels_subscribe (chans
|
|
||||||
, m_data.chan, proc_to_read->clients[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (m_data.type == PUBSUB_MSG_TYPE_UNSUB) {
|
|
||||||
printf ("client %d unsubscribing to %s\n"
|
|
||||||
, proc_to_read->clients[i]->proc_fd
|
|
||||||
, m_data.chan);
|
|
||||||
pubsubd_channels_unsubscribe (chans
|
|
||||||
, m_data.chan, proc_to_read->clients[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (m_data.type == PUBSUB_MSG_TYPE_PUB) {
|
|
||||||
printf ("client %d publishing to %s\n"
|
|
||||||
, proc_to_read->clients[i]->proc_fd
|
|
||||||
, m_data.chan);
|
|
||||||
struct channel *chan = pubsubd_channel_search (chans, m_data.chan);
|
|
||||||
if (chan == NULL) {
|
|
||||||
handle_err ("handle_new_msg", "publish on nonexistent channel");
|
|
||||||
ipc_message_free (&m);
|
|
||||||
return ;
|
|
||||||
}
|
|
||||||
pubsubd_send (chan->subs, &m_data);
|
|
||||||
}
|
|
||||||
|
|
||||||
pubsub_message_free (&m_data);
|
|
||||||
ipc_message_free (&m);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* main loop
|
|
||||||
*
|
|
||||||
* accept new application connections
|
|
||||||
* read a message and send it back
|
|
||||||
* close a connection if MSG_TYPE_CLOSE received
|
|
||||||
*/
|
|
||||||
|
|
||||||
void pubsubd_main_loop (struct ipc_service *srv, struct channels *chans)
|
void pubsubd_main_loop (struct ipc_service *srv, struct channels *chans)
|
||||||
{
|
{
|
||||||
int i, ret = 0;
|
int i, ret = 0;
|
||||||
|
|
||||||
struct ipc_clients ap;
|
struct ipc_clients clients;
|
||||||
memset(&ap, 0, sizeof(struct ipc_clients));
|
memset(&clients, 0, sizeof(struct ipc_clients));
|
||||||
|
|
||||||
struct ipc_clients proc_to_read;
|
struct ipc_clients proc_to_read;
|
||||||
memset(&proc_to_read, 0, sizeof(struct ipc_clients));
|
memset(&proc_to_read, 0, sizeof(struct ipc_clients));
|
||||||
|
|
||||||
while(1) {
|
struct ipc_event event;
|
||||||
ret = ipc_server_select (&ap, srv, &proc_to_read);
|
memset(&event, 0, sizeof (struct ipc_event));
|
||||||
|
event.type = IPC_EVENT_TYPE_NOT_SET;
|
||||||
|
|
||||||
if (ret == CONNECTION) {
|
int cpt = 0;
|
||||||
handle_new_connection (srv, &ap);
|
|
||||||
} else if (ret == APPLICATION) {
|
while(1) {
|
||||||
handle_new_msg (chans, &ap, &proc_to_read);
|
ret = ipc_service_poll_event (&clients, srv, &event);
|
||||||
} else { // both new connection and new msg from at least one client
|
if (ret != 0) {
|
||||||
handle_new_connection (srv, &ap);
|
handle_error("ipc_service_poll_event != 0");
|
||||||
handle_new_msg (chans, &ap, &proc_to_read);
|
// the application will shut down, and close the service
|
||||||
}
|
if (ipc_server_close (srv) < 0) {
|
||||||
ipc_clients_free (&proc_to_read);
|
handle_error("ipc_server_close < 0");
|
||||||
|
}
|
||||||
|
exit (EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (event.type) {
|
||||||
|
case IPC_EVENT_TYPE_CONNECTION:
|
||||||
|
{
|
||||||
|
cpt++;
|
||||||
|
struct ipc_client *cli = event.origin;
|
||||||
|
printf ("connection of client %d: %d clients connected\n", cli->proc_fd, cpt);
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
case IPC_EVENT_TYPE_DISCONNECTION:
|
||||||
|
{
|
||||||
|
cpt--;
|
||||||
|
struct ipc_client *cli = event.origin;
|
||||||
|
printf ("disconnection of client %d: %d clients remaining\n", cli->proc_fd, cpt);
|
||||||
|
|
||||||
|
// TODO: to test, unsubscribe when closing
|
||||||
|
pubsubd_channels_unsubscribe_everywhere (chans, cli);
|
||||||
|
|
||||||
|
// free the ipc_client structure
|
||||||
|
free (event.origin);
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
case IPC_EVENT_TYPE_MESSAGE:
|
||||||
|
{
|
||||||
|
struct ipc_message *m = event.m;
|
||||||
|
struct ipc_client *cli = event.origin;
|
||||||
|
if (m->length > 0) {
|
||||||
|
printf ("message received (type %d): %.*s\n", m->type, m->length, m->payload);
|
||||||
|
}
|
||||||
|
// TODO: handle a message
|
||||||
|
// handle_new_msg (chans, &clients, &proc_to_read);
|
||||||
|
|
||||||
|
struct pubsub_msg pubsub_msg;
|
||||||
|
memset (&pubsub_msg, 0, sizeof (struct pubsub_msg));
|
||||||
|
|
||||||
|
pubsub_message_unserialize (&pubsub_msg, m->payload, m->length);
|
||||||
|
|
||||||
|
if (pubsub_msg.type == PUBSUB_MSG_TYPE_SUB) {
|
||||||
|
printf ("client %d subscribing to %s\n"
|
||||||
|
, cli->proc_fd
|
||||||
|
, pubsub_msg.chan);
|
||||||
|
pubsubd_channels_subscribe (chans
|
||||||
|
, pubsub_msg.chan, cli);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pubsub_msg.type == PUBSUB_MSG_TYPE_UNSUB) {
|
||||||
|
printf ("client %d unsubscribing to %s\n", cli->proc_fd, pubsub_msg.chan);
|
||||||
|
pubsubd_channels_unsubscribe (chans, pubsub_msg.chan, cli);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pubsub_msg.type == PUBSUB_MSG_TYPE_PUB) {
|
||||||
|
printf ("client %d publishing to %s\n", cli->proc_fd, pubsub_msg.chan);
|
||||||
|
|
||||||
|
struct channel *chan = pubsubd_channel_search (chans, pubsub_msg.chan);
|
||||||
|
if (chan == NULL) {
|
||||||
|
handle_err ("handle_new_msg", "publish on nonexistent channel");
|
||||||
|
ipc_message_empty (m);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
pubsubd_send (chan->subs, &pubsub_msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
pubsub_message_free (&pubsub_msg);
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
case IPC_EVENT_TYPE_ERROR:
|
||||||
|
{
|
||||||
|
fprintf (stderr, "a problem happened with client %d\n"
|
||||||
|
, ((struct ipc_client*) event.origin)->proc_fd);
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
default :
|
||||||
|
{
|
||||||
|
fprintf (stderr, "there must be a problem, event not set\n");
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (i = 0; i < ap.size; i++) {
|
for (i = 0; i < clients.size; i++) {
|
||||||
if (ipc_server_close_client (ap.clients[i]) < 0) {
|
if (ipc_server_close_client (clients.clients[i]) < 0) {
|
||||||
handle_error( "server_close_client < 0");
|
handle_error( "server_close_client < 0");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Reference in New Issue