From cbdbe79c5b605d3d3cea12f50a2e5c4693612bbc Mon Sep 17 00:00:00 2001 From: Philippe PITTOLI Date: Tue, 30 Oct 2018 11:13:30 +0100 Subject: [PATCH] big changes in pubsub libs => WIP --- pubsub/app/pubsub-client.c | 40 ++++----- pubsub/app/pubsubc.c | 172 ------------------------------------- pubsub/app/pubsubd.c | 154 +++++++++++++++++++++++++++++++-- pubsub/lib/channels.h | 3 +- pubsub/lib/message.c | 153 ++++++++++++++++++++++++++++++--- pubsub/lib/message.h | 30 ++++++- pubsub/lib/pubsub.c | 98 --------------------- pubsub/lib/pubsub.h | 24 ------ pubsub/lib/pubsubd.c | 170 ------------------------------------ pubsub/lib/pubsubd.h | 15 ---- 10 files changed, 332 insertions(+), 527 deletions(-) delete mode 100644 pubsub/app/pubsubc.c delete mode 100644 pubsub/lib/pubsub.c delete mode 100644 pubsub/lib/pubsub.h delete mode 100644 pubsub/lib/pubsubd.c delete mode 100644 pubsub/lib/pubsubd.h diff --git a/pubsub/app/pubsub-client.c b/pubsub/app/pubsub-client.c index d1175d9..ccba19f 100644 --- a/pubsub/app/pubsub-client.c +++ b/pubsub/app/pubsub-client.c @@ -3,13 +3,12 @@ #include "../lib/message.h" #include "../lib/channels.h" -#include "../lib/pubsub.h" -#include "../lib/pubsubd.h" - #include #include #include +#define PUBSUBD_SERVICE_NAME "pubsubd" + void usage (char **argv) { printf ( "usage: %s [chan [pub]]\n", argv[0]); } @@ -26,16 +25,12 @@ void chan_sub (struct ipc_service *srv, char *chan) // meta data on the message msg.type = PUBSUB_MSG_TYPE_SUB; - msg.chanlen = strlen (chan) + 1; - msg.chan = malloc (msg.chanlen); - memset (msg.chan, 0, msg.chanlen); - strncpy (msg.chan, chan, msg.chanlen); - msg.chan[strlen (chan)] = '\0'; + pubsub_message_set_chan (&msg, chan, strlen(chan)); pubsub_message_send (srv, &msg); printf ("subscribed to %s\n", chan); - pubsub_message_free (&msg); + pubsub_message_empty (&msg); } void main_loop (char **env, int index, int version @@ -64,11 +59,7 @@ void main_loop (char **env, int index, int version // meta data on the message msg.type = PUBSUB_MSG_TYPE_PUB; - msg.chanlen = strlen (chan) + 1; - msg.chan = malloc (msg.chanlen); - memset (msg.chan, 0, msg.chanlen); - strncpy ((char *) msg.chan, chan, msg.chanlen); - msg.chan[strlen (chan)] = '\0'; + pubsub_message_set_chan (&msg, chan, strlen(chan)); struct ipc_event event; memset (&event, 0, sizeof (struct ipc_event)); @@ -107,23 +98,21 @@ void main_loop (char **env, int index, int version print_cmd (); // TODO: remove \n - msg.datalen = m->length + 1; - msg.data = malloc (msg.datalen); - memset (msg.data, 0, msg.datalen); - strncpy (msg.data, m->payload, msg.datalen); - msg.data[msg.datalen] = '\0'; + pubsub_message_set_chan (&msg, chan, strlen(chan)); + pubsub_message_set_data (&msg, m->payload, m->length); pubsub_message_send (&srv, &msg); - free (msg.data); - msg.data = NULL; - msg.datalen = 0; + + pubsub_message_empty (&msg); } break; case IPC_EVENT_TYPE_MESSAGE: { struct ipc_message *m = event.m; - printf ("msg recv: %.*s", m->length, m->payload); - // TODO: correctly print the message + print_hexa ("received msg hexa", m->payload, m->length); + + pubsub_message_from_message (&msg, m); + printf ("\033[31m>\033[00m %.*s\n", (int) msg.datalen, msg.data); }; break; case IPC_EVENT_TYPE_DISCONNECTION: @@ -135,6 +124,7 @@ void main_loop (char **env, int index, int version exit (EXIT_SUCCESS); }; + break; case IPC_EVENT_TYPE_NOT_SET: case IPC_EVENT_TYPE_CONNECTION: case IPC_EVENT_TYPE_ERROR: @@ -144,7 +134,7 @@ void main_loop (char **env, int index, int version } // free everything - pubsub_message_free (&msg); + pubsub_message_empty (&msg); printf ("disconnection...\n"); ipc_services_free (&services); diff --git a/pubsub/app/pubsubc.c b/pubsub/app/pubsubc.c deleted file mode 100644 index 5aded0b..0000000 --- a/pubsub/app/pubsubc.c +++ /dev/null @@ -1,172 +0,0 @@ -// int main(void) { return 0; } - -// TODO: select on service + input instead of threads - -#include "../../core/ipc.h" -#include "../lib/pubsub.h" -#include "../lib/pubsubd.h" -#include -#include -#include - -void usage (char **argv) { - printf ( "usage: %s [chan [pub]]\n", argv[0]); -} - -void print_cmd (void) { - printf ("\033[32m>\033[00m "); - fflush (stdout); -} - -void * listener (void *params) -{ - int s = 0; - s = pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL); - if (s != 0) { - handle_err ("listener", "pthread_setcancelstate != 0"); - } - - struct ipc_service *srv = NULL; - srv = (struct ipc_service *) params; - if (srv == NULL) { - handle_err ("listener", "no service passed"); - return NULL; - } - - // main loop - while (1) { - struct pubsub_msg m; - memset (&m, 0, sizeof (struct pubsub_msg)); - - pubsub_message_recv (srv, &m); - printf ("\r\033[31m>\033[00m %s\n", m.data); - print_cmd (); - - pubsub_message_free (&m); - } - - pthread_exit (NULL); -} - -void chan_sub (struct ipc_service *srv, char *chan) -{ - struct pubsub_msg msg; - memset (&msg, 0, sizeof (struct pubsub_msg)); - - // meta data on the message - msg.type = PUBSUB_MSG_TYPE_SUB; - msg.chanlen = strlen (chan) + 1; - msg.chan = malloc (msg.chanlen); - memset (msg.chan, 0, msg.chanlen); - strncpy (msg.chan, chan, msg.chanlen); - msg.chan[strlen (chan)] = '\0'; - - pubsub_message_send (srv, &msg); - printf ("subscribed to %s\n", chan); - - pubsub_message_free (&msg); -} - -void main_loop (char **env, int index, int version - , char *cmd, char *chan) -{ - printf ("connection to pubsubd: index %d version %d " - "cmd %s chan %s\n" - , index, version, cmd, chan ); - - struct ipc_service srv; - memset (&srv, 0, sizeof (struct ipc_service)); - pubsub_connection (env, &srv); - printf ("connected\n"); - - if (strncmp (cmd, "sub", 3) == 0) { - chan_sub (&srv, chan); - } - - pthread_t thr; - memset (&thr, 0, sizeof (pthread_t)); - - pthread_create (&thr, NULL, listener, &srv); - pthread_detach (thr); - - printf ("main_loop\n"); - - struct pubsub_msg msg; - memset (&msg, 0, sizeof (struct pubsub_msg)); - - // meta data on the message - msg.type = PUBSUB_MSG_TYPE_PUB; - msg.chanlen = strlen (chan) + 1; - msg.chan = malloc (msg.chanlen); - memset (msg.chan, 0, msg.chanlen); - strncpy ((char *) msg.chan, chan, msg.chanlen); - msg.chan[strlen (chan)] = '\0'; - - // msg loop - for (;;) { - char buf[BUFSIZ]; - memset (buf, 0, BUFSIZ); - print_cmd (); - fflush (stdout); - - size_t mlen = read (0, buf, BUFSIZ); - - // remove \n - if (mlen > 1) { - mlen--; - } - buf[mlen] = '\0'; - - if (strncmp(buf, "quit", strlen ("quit")) == 0) { - break; - } - - msg.datalen = strlen (buf) + 1; - msg.data = malloc (msg.datalen); - memset (msg.data, 0, msg.datalen); - strncpy ((char *) msg.data, buf, msg.datalen); - msg.data[strlen(buf)] = '\0'; - - pubsub_message_send (&srv, &msg); - free (msg.data); - msg.data = NULL; - msg.datalen = 0; - } - - // free everything - pubsub_message_free (&msg); - - pthread_cancel (thr); - pthread_join (thr, NULL); - - printf ("disconnection...\n"); - // disconnect from the server - pubsub_disconnect (&srv); -} - -int main(int argc, char **argv, char **env) -{ - char *cmd = "sub"; - char *chan = "chan1"; - - if (argc == 2 && strncmp("-h", argv[1], 2) == 0) { - usage (argv); - exit (0); - } - - if (argc >= 2) { - chan = argv[1]; - } - - if (argc >= 3) { - cmd = argv[2]; - } - - int index = 0; - // don't care about the version - int version = 0; - - main_loop (env, index, version, cmd, chan); - - return EXIT_SUCCESS; -} diff --git a/pubsub/app/pubsubd.c b/pubsub/app/pubsubd.c index f41effc..ded68f6 100644 --- a/pubsub/app/pubsubd.c +++ b/pubsub/app/pubsubd.c @@ -1,16 +1,161 @@ #include "../../core/ipc.h" -#include "../lib/pubsubd.h" -#include +#include "../lib/message.h" +#include "../lib/channels.h" +#include #include -#include -#include #include +#define PUBSUBD_SERVICE_NAME "pubsubd" + // to quit them properly if a signal occurs struct ipc_service srv; struct channels chans; +void pubsubd_send (const struct ipc_clients *clients, const struct pubsub_msg * pubsub_msg) +{ + if (clients == NULL) { + fprintf (stderr, "pubsubd_send: clients == NULL"); + return; + } + + if (pubsub_msg == NULL) { + fprintf (stderr, "pubsubd_send: pubsub_msg == NULL"); + return; + } + + char *buf = NULL; + size_t msize = 0; + pubsub_message_serialize (pubsub_msg, &buf, &msize); + + struct ipc_message m; + memset (&m, 0, sizeof (struct ipc_message)); + ipc_message_format_data (&m, buf, msize); + + int i; + for (i = 0; i < clients->size ; i++) { + ipc_server_write (clients->clients[i], &m); + } + ipc_message_empty (&m); + + if (buf != NULL) { + free (buf); + } +} + +void pubsubd_main_loop (struct ipc_service *srv, struct channels *chans) +{ + int i, ret = 0; + + struct ipc_clients clients; + memset(&clients, 0, sizeof(struct ipc_clients)); + + struct ipc_clients proc_to_read; + memset(&proc_to_read, 0, sizeof(struct ipc_clients)); + + struct ipc_event event; + memset(&event, 0, sizeof (struct ipc_event)); + event.type = IPC_EVENT_TYPE_NOT_SET; + + int cpt = 0; + + while(1) { + ret = ipc_service_poll_event (&clients, srv, &event); + if (ret != 0) { + handle_error("ipc_service_poll_event != 0"); + // the application will shut down, and close the service + if (ipc_server_close (srv) < 0) { + handle_error("ipc_server_close < 0"); + } + exit (EXIT_FAILURE); + } + + switch (event.type) { + case IPC_EVENT_TYPE_CONNECTION: + { + cpt++; + struct ipc_client *cli = event.origin; + printf ("connection of client %d: %d clients connected\n", cli->proc_fd, cpt); + }; + break; + case IPC_EVENT_TYPE_DISCONNECTION: + { + cpt--; + struct ipc_client *cli = event.origin; + printf ("disconnection of client %d: %d clients remaining\n", cli->proc_fd, cpt); + + // TODO: to test, unsubscribe when closing + pubsubd_channels_unsubscribe_everywhere (chans, cli); + + // free the ipc_client structure + free (event.origin); + }; + break; + case IPC_EVENT_TYPE_MESSAGE: + { + // TODO: handle a message + struct ipc_message *m = event.m; + print_hexa ("received msg hexa", m->payload, m->length); + struct ipc_client *cli = event.origin; + + struct pubsub_msg pm; + memset (&pm, 0, sizeof (struct pubsub_msg)); + + pubsub_message_from_message (&pm, m); + + if (pm.type == PUBSUB_MSG_TYPE_SUB) { + printf ("client %d subscribing to %s\n" + , cli->proc_fd + , pm.chan); + pubsubd_channels_subscribe (chans + , pm.chan, cli); + } + + if (pm.type == PUBSUB_MSG_TYPE_UNSUB) { + printf ("client %d unsubscribing to %s\n", cli->proc_fd, pm.chan); + pubsubd_channels_unsubscribe (chans, pm.chan, cli); + } + + if (pm.type == PUBSUB_MSG_TYPE_PUB) { + // printf ("client %d: publishing to %s: %s\n", cli->proc_fd, pm.chan, pm.data); + printf ("client %d: ", cli->proc_fd); + pubsub_message_print (&pm); + + struct channel *chan = pubsubd_channel_search (chans, pm.chan); + if (chan == NULL) { + handle_err ("handle_new_msg", "publish on nonexistent channel"); + ipc_message_empty (m); + continue; + } + pubsubd_send (chan->subs, &pm); + } + + pubsub_message_empty (&pm); + }; + break; + case IPC_EVENT_TYPE_ERROR: + { + fprintf (stderr, "a problem happened with client %d\n" + , ((struct ipc_client*) event.origin)->proc_fd); + }; + break; + default : + { + fprintf (stderr, "there must be a problem, event not set\n"); + }; + } + } + + for (i = 0; i < clients.size; i++) { + if (ipc_server_close_client (clients.clients[i]) < 0) { + handle_error( "server_close_client < 0"); + } + } + + pubsubd_channels_del_all (chans); +} + + void handle_signal (int signalnumber) { // the application will shut down, and remove the service named pipe @@ -27,7 +172,6 @@ void handle_signal (int signalnumber) int main(int argc, char **argv, char **env) { - argc = argc; argv = argv; diff --git a/pubsub/lib/channels.h b/pubsub/lib/channels.h index 4a74f7c..2baeb86 100644 --- a/pubsub/lib/channels.h +++ b/pubsub/lib/channels.h @@ -1,8 +1,7 @@ #ifndef __CHANNELS_H__ #define __CHANNELS_H__ -#include "../../core/queue.h" -#include "../../core/client.h" +#include "../../core/ipc.h" // head of the list LIST_HEAD(channels, channel); diff --git a/pubsub/lib/message.c b/pubsub/lib/message.c index 5e1c78a..c00aef8 100644 --- a/pubsub/lib/message.c +++ b/pubsub/lib/message.c @@ -1,9 +1,32 @@ #include #include -#include +#include // strndup, strncpy #include "message.h" -#include "../../core/error.h" + +void pubsub_message_set_data (struct pubsub_msg *pm, char *data, size_t len) +{ + pm->datalen = len; + if (pm->data != NULL) { + free (pm->data); + } + pm->data = malloc (len + 1); + memset (pm->data, 0, len); + strncpy (pm->data, data, len); + pm->data[len] = '\0'; +} + +void pubsub_message_set_chan (struct pubsub_msg *pm, char *chan, size_t len) +{ + pm->chanlen = len; + if (pm->chan != NULL) { + free (pm->chan); + } + pm->chan = malloc (len + 1); + memset (pm->chan, 0, len); + strncpy (pm->chan, chan, len); + pm->chan[len] = '\0'; +} void pubsub_message_serialize (const struct pubsub_msg *msg, char **data, size_t *len) { @@ -59,6 +82,83 @@ void pubsub_message_serialize (const struct pubsub_msg *msg, char **data, size_t *len = buflen; } +void pubsub_message_from_message (struct pubsub_msg *pm, struct ipc_message *m) +{ + size_t offset = 0; + + pubsub_message_empty (pm); // just in case + + pm->type = m->type; + + // chan + memcpy (&pm->chanlen, m->payload + offset, sizeof (size_t)); + if (pm->chanlen > BUFSIZ) { + handle_err ("pubsub_message_from_message", "chanlen > BUFSIZ"); + return; + } + offset += sizeof (size_t); + if (pm->chanlen > 0) { + pubsub_message_set_chan (pm, m->payload + offset, pm->chanlen); + printf ("from ipc_message, chan: %s, chanlen = %lu\n", pm->chan, pm->chanlen); + } + + // data + memcpy (&pm->datalen, m->payload + offset, sizeof (size_t)); + if (pm->datalen > BUFSIZ) { + handle_err ("pubsub_message_from_message", "chanlen > BUFSIZ"); + return; + } + offset += sizeof (size_t); + if (pm->datalen > 0) { + pubsub_message_set_data (pm, m->payload + offset, pm->datalen); + printf ("from ipc_message, data: %s, datalen = %lu\n", pm->data, pm->datalen); + } +} + +void pubsub_message_to_message (const struct pubsub_msg *msg, struct ipc_message *m) +{ + if (msg == NULL) { + handle_err ("pubsub_message_to_message", "msg == NULL"); + return; + } + + if (m == NULL) { + handle_err ("pubsub_message_to_message", "data == NULL"); + return; + } + + ipc_message_empty (m); // just in case + + size_t buflen = 2 * sizeof (size_t) + msg->chanlen + msg->datalen; + + if (buflen > BUFSIZ) { + handle_err ("pubsub_message_serialize", "chanlen + datalen too high"); + return; + } + + char *buf = malloc (buflen); + memset (buf, 0, buflen); + + size_t offset = 0; + + m->type = msg->type; + + // chan + memcpy (buf + offset, &msg->chanlen, sizeof (size_t)); + offset += sizeof (size_t); + memcpy (buf + offset, msg->chan, msg->chanlen); + offset += msg->chanlen; + + // data + memcpy (buf + offset, &msg->datalen, sizeof (size_t)); + offset += sizeof (size_t); + memcpy (buf + offset, msg->data, msg->datalen); + offset += msg->datalen; + + m->payload = buf; + m->length = buflen; +} + void pubsub_message_unserialize (struct pubsub_msg *msg, const char *buf, size_t mlen) { if (msg == NULL) { @@ -66,7 +166,7 @@ void pubsub_message_unserialize (struct pubsub_msg *msg, const char *buf, size_t return; } - pubsub_message_free (msg); + pubsub_message_empty (msg); if (mlen > BUFSIZ) { handle_err ("pubsub_message_unserialize", "mlen > BUFSIZ"); @@ -103,30 +203,59 @@ void pubsub_message_unserialize (struct pubsub_msg *msg, const char *buf, size_t offset += msg->datalen; } -void pubsub_message_free (struct pubsub_msg *msg) +void pubsub_message_empty (struct pubsub_msg *msg) { if (msg == NULL) { - handle_err ("pubsub_message_free", "msg == NULL"); + handle_err ("pubsub_message_empty", "msg == NULL"); return; } - if (msg->chan) { + if (msg->chan != NULL) { free (msg->chan); msg->chan = NULL; } - if (msg->data) { + if (msg->data != NULL) { free (msg->data); msg->data = NULL; } } -void pubsub_message_print (const struct pubsub_msg *msg) +void pubsub_message_print (const struct pubsub_msg *pm) { - if (msg == NULL) { - handle_err ("pubsub_message_print", "msg == NULL"); + if (pm == NULL) { + handle_err ("pubsub_message_print", "pm == NULL"); return; } - printf ("msg: type=%d chan=%s, data=%s\n" - , msg->type, msg->chan, msg->data); + if (pm->chanlen > 0 && pm->datalen > 0) { + printf ("msg: type: %u, chan: %s (%lu bytes), data: %s (%lu bytes)\n" + , pm->type, pm->chan, pm->chanlen, pm->data, pm->datalen); + } else if (pm->chanlen > 0) { + printf ("msg: type: %u, chan: %s (%lu bytes), and no data\n" + , pm->type, pm->chan, pm->chanlen); + } else if (pm->datalen > 0) { + printf ("msg: type: %u, no chan, data: %s (%lu bytes)\n" + , pm->type, pm->data, pm->datalen); + } +} + +int pubsub_message_send (struct ipc_service *srv, const struct pubsub_msg * pm) +{ + struct ipc_message m; + memset (&m, 0, sizeof (struct ipc_message)); + pubsub_message_to_message (pm, &m); + ipc_application_write (srv, &m); + ipc_message_empty (&m); + + return 0; +} + +char * pubsub_action_to_str (enum subscriber_action action) +{ + switch (action) { + case PUBSUB_PUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_PUB); + case PUBSUB_SUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_SUB); + default : return strdup ("undocumented action"); + } + return NULL; } diff --git a/pubsub/lib/message.h b/pubsub/lib/message.h index 8154fff..71f27ca 100644 --- a/pubsub/lib/message.h +++ b/pubsub/lib/message.h @@ -1,9 +1,23 @@ #ifndef __PUBSUB_MSG_H__ #define __PUBSUB_MSG_H__ -#define PUBSUB_MSG_TYPE_SUB 1 -#define PUBSUB_MSG_TYPE_UNSUB 2 -#define PUBSUB_MSG_TYPE_PUB 3 +#include "../../core/ipc.h" + +#define PUBSUB_SUBSCRIBER_ACTION_STR_PUB "pub" +#define PUBSUB_SUBSCRIBER_ACTION_STR_SUB "sub" + +enum subscriber_action {PUBSUB_PUB, PUBSUB_SUB}; + +#define PUBSUB_TYPE_MESSAGE 1 +#define PUBSUB_TYPE_ERROR 2 +#define PUBSUB_TYPE_DEBUG 4 +#define PUBSUB_TYPE_INFO 5 + +enum pubsub_message_types { + PUBSUB_MSG_TYPE_SUB + , PUBSUB_MSG_TYPE_UNSUB + , PUBSUB_MSG_TYPE_PUB +}; struct pubsub_msg { unsigned char type; // message type : alert, notification, … @@ -13,9 +27,17 @@ struct pubsub_msg { size_t datalen; }; +void pubsub_message_from_message (struct pubsub_msg *msg, struct ipc_message *m); +void pubsub_message_to_message (const struct pubsub_msg *msg, struct ipc_message *m); + +void pubsub_message_set_chan (struct pubsub_msg *pm, char *chan, size_t len); +void pubsub_message_set_data (struct pubsub_msg *pm, char *data, size_t len); + void pubsub_message_serialize (const struct pubsub_msg *msg, char **data, size_t *len); void pubsub_message_unserialize (struct pubsub_msg *msg, const char *data, size_t len); -void pubsub_message_free (struct pubsub_msg *msg); +void pubsub_message_empty (struct pubsub_msg *msg); void pubsub_message_print (const struct pubsub_msg *msg); +int pubsub_message_send (struct ipc_service *srv, const struct pubsub_msg * m); + #endif diff --git a/pubsub/lib/pubsub.c b/pubsub/lib/pubsub.c deleted file mode 100644 index 14e1378..0000000 --- a/pubsub/lib/pubsub.c +++ /dev/null @@ -1,98 +0,0 @@ -#include -#include // strndup - -#include "pubsub.h" -#include "pubsubd.h" - -#define PUBSUB_SUBSCRIBER_ACTION_STR_PUB "pub" -#define PUBSUB_SUBSCRIBER_ACTION_STR_SUB "sub" -#define PUBSUB_SUBSCRIBER_ACTION_STR_BOTH "both" -#define PUBSUB_SUBSCRIBER_ACTION_STR_QUIT "quit" - -char * pubsub_action_to_str (enum subscriber_action action) -{ - switch (action) { - case PUBSUB_PUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_PUB); - case PUBSUB_SUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_SUB); - case PUBSUB_BOTH : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_BOTH); - case PUBSUB_QUIT : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_QUIT); - } - return NULL; -} - - -#if 0 -// tell the service to stop -void pubsub_quit (struct ipc_service *srv) -{ - // line fmt : 0 0 0 quit - char line[BUFSIZ]; - snprintf (line, BUFSIZ, "0 0 0 quit\n"); - ipc_application_server_connection (srv, line, strlen (line)); -} -#endif - -int pubsub_connection (char **env, struct ipc_service *srv) -{ - int ret = ipc_application_connection (env, srv, PUBSUBD_SERVICE_NAME); - - if (ret != 0) { - handle_err ("pubsub_connection", "application_connection != 0"); - } - - return ret; -} - -int pubsub_disconnect (struct ipc_service *srv) -{ - return ipc_application_close (srv); -} - -int pubsub_message_send (struct ipc_service *srv, const struct pubsub_msg * m) -{ - size_t msize = 0; - char * buf = NULL; - pubsub_message_serialize (m, &buf, &msize); - - struct ipc_message m_data; - memset (&m_data, 0, sizeof (struct ipc_message)); - - // format the connection msg - if (ipc_message_format_data (&m_data, buf, msize) < 0) { - handle_err ("pubsub_message_send", "msg_format_data"); - if (buf != NULL) - free (buf); - return -1; - } - - ipc_application_write (srv, &m_data); - ipc_message_empty (&m_data); - - if (buf != NULL) - free(buf); - - return 0; -} - -int pubsub_message_recv (struct ipc_service *srv, struct pubsub_msg *m) -{ - if (srv == NULL) { - handle_err ("pubsub_message_recv", "srv == NULL"); - return -1; - } - - if (m == NULL) { - handle_err ("pubsub_message_recv", "m == NULL"); - return -1; - } - - struct ipc_message m_recv; - memset (&m_recv, 0, sizeof (struct ipc_message)); - - ipc_application_read (srv, &m_recv); - pubsub_message_unserialize (m, m_recv.payload, m_recv.length); - - ipc_message_empty (&m_recv); - - return 0; -} diff --git a/pubsub/lib/pubsub.h b/pubsub/lib/pubsub.h deleted file mode 100644 index 97d009a..0000000 --- a/pubsub/lib/pubsub.h +++ /dev/null @@ -1,24 +0,0 @@ -#ifndef __PUBSUB_H__ -#define __PUBSUB_H__ - -#include "../../core/ipc.h" - -#include "message.h" - -enum subscriber_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH}; - -#define PUBSUB_TYPE_DISCONNECT 0 -#define PUBSUB_TYPE_MESSAGE 1 -#define PUBSUB_TYPE_ERROR 2 -#define PUBSUB_TYPE_DEBUG 4 -#define PUBSUB_TYPE_INFO 5 - -int pubsub_connection (char **env, struct ipc_service *srv); -int pubsub_disconnect (struct ipc_service *srv); -int pubsub_message_send (struct ipc_service *srv, const struct pubsub_msg * m); -int pubsub_message_recv (struct ipc_service *srv, struct pubsub_msg *m); - -// TODO -void pubsub_quit (struct ipc_service *srv); - -#endif diff --git a/pubsub/lib/pubsubd.c b/pubsub/lib/pubsubd.c deleted file mode 100644 index ccfd64d..0000000 --- a/pubsub/lib/pubsubd.c +++ /dev/null @@ -1,170 +0,0 @@ -#include "../../core/communication.h" -#include "../../core/message.h" -#include "../../core/client.h" -#include "../../core/utils.h" -#include "../../core/error.h" - -#include "pubsubd.h" -#include "channels.h" - -#include -#include -#include - -void pubsubd_send (const struct ipc_clients *clients, const struct pubsub_msg * pubsub_msg) -{ - if (clients == NULL) { - fprintf (stderr, "pubsubd_send: clients == NULL"); - return; - } - - if (pubsub_msg == NULL) { - fprintf (stderr, "pubsubd_send: pubsub_msg == NULL"); - return; - } - - char *buf = NULL; - size_t msize = 0; - pubsub_message_serialize (pubsub_msg, &buf, &msize); - - struct ipc_message m; - memset (&m, 0, sizeof (struct ipc_message)); - ipc_message_format_data (&m, buf, msize); - - int i; - for (i = 0; i < clients->size ; i++) { - ipc_server_write (clients->clients[i], &m); - } - ipc_message_empty (&m); - - if (buf != NULL) { - free (buf); - } -} - -// void pubsubd_recv (struct ipc_client *p, struct pubsub_msg *m) -// { -// struct ipc_message m_data; -// memset (&m_data, 0, sizeof (struct ipc_message)); -// -// // read the message from the client -// ipc_server_read (p, &m_data); -// -// pubsub_message_unserialize (m, m_data.payload, m_data.length); -// -// ipc_message_empty (&m_data); -// } - -void pubsubd_main_loop (struct ipc_service *srv, struct channels *chans) -{ - int i, ret = 0; - - struct ipc_clients clients; - memset(&clients, 0, sizeof(struct ipc_clients)); - - struct ipc_clients proc_to_read; - memset(&proc_to_read, 0, sizeof(struct ipc_clients)); - - struct ipc_event event; - memset(&event, 0, sizeof (struct ipc_event)); - event.type = IPC_EVENT_TYPE_NOT_SET; - - int cpt = 0; - - while(1) { - ret = ipc_service_poll_event (&clients, srv, &event); - if (ret != 0) { - handle_error("ipc_service_poll_event != 0"); - // the application will shut down, and close the service - if (ipc_server_close (srv) < 0) { - handle_error("ipc_server_close < 0"); - } - exit (EXIT_FAILURE); - } - - switch (event.type) { - case IPC_EVENT_TYPE_CONNECTION: - { - cpt++; - struct ipc_client *cli = event.origin; - printf ("connection of client %d: %d clients connected\n", cli->proc_fd, cpt); - }; - break; - case IPC_EVENT_TYPE_DISCONNECTION: - { - cpt--; - struct ipc_client *cli = event.origin; - printf ("disconnection of client %d: %d clients remaining\n", cli->proc_fd, cpt); - - // TODO: to test, unsubscribe when closing - pubsubd_channels_unsubscribe_everywhere (chans, cli); - - // free the ipc_client structure - free (event.origin); - }; - break; - case IPC_EVENT_TYPE_MESSAGE: - { - struct ipc_message *m = event.m; - struct ipc_client *cli = event.origin; - if (m->length > 0) { - printf ("message received (type %d): %.*s\n", m->type, m->length, m->payload); - } - // TODO: handle a message - // handle_new_msg (chans, &clients, &proc_to_read); - - struct pubsub_msg pubsub_msg; - memset (&pubsub_msg, 0, sizeof (struct pubsub_msg)); - - pubsub_message_unserialize (&pubsub_msg, m->payload, m->length); - - if (pubsub_msg.type == PUBSUB_MSG_TYPE_SUB) { - printf ("client %d subscribing to %s\n" - , cli->proc_fd - , pubsub_msg.chan); - pubsubd_channels_subscribe (chans - , pubsub_msg.chan, cli); - } - - if (pubsub_msg.type == PUBSUB_MSG_TYPE_UNSUB) { - printf ("client %d unsubscribing to %s\n", cli->proc_fd, pubsub_msg.chan); - pubsubd_channels_unsubscribe (chans, pubsub_msg.chan, cli); - } - - if (pubsub_msg.type == PUBSUB_MSG_TYPE_PUB) { - printf ("client %d publishing to %s\n", cli->proc_fd, pubsub_msg.chan); - - struct channel *chan = pubsubd_channel_search (chans, pubsub_msg.chan); - if (chan == NULL) { - handle_err ("handle_new_msg", "publish on nonexistent channel"); - ipc_message_empty (m); - continue; - } - pubsubd_send (chan->subs, &pubsub_msg); - } - - pubsub_message_free (&pubsub_msg); - }; - break; - case IPC_EVENT_TYPE_ERROR: - { - fprintf (stderr, "a problem happened with client %d\n" - , ((struct ipc_client*) event.origin)->proc_fd); - }; - break; - default : - { - fprintf (stderr, "there must be a problem, event not set\n"); - }; - } - } - - for (i = 0; i < clients.size; i++) { - if (ipc_server_close_client (clients.clients[i]) < 0) { - handle_error( "server_close_client < 0"); - } - } - - pubsubd_channels_del_all (chans); -} - diff --git a/pubsub/lib/pubsubd.h b/pubsub/lib/pubsubd.h deleted file mode 100644 index b63e9f6..0000000 --- a/pubsub/lib/pubsubd.h +++ /dev/null @@ -1,15 +0,0 @@ -#ifndef __PUBSUBD_H__ -#define __PUBSUBD_H__ - -// #include "../../core/pubsub.h" -#include "../../core/client.h" -#include "../../core/message.h" -#include "message.h" -#include "channels.h" - -#define PUBSUBD_SERVICE_NAME "pubsubd" - -void pubsubd_main_loop (struct ipc_service *srv, struct channels * chans); -void pubsubd_message_send (const struct ipc_clients *ap, const struct pubsub_msg * m); - -#endif