diff --git a/lib/pubsub.c b/lib/pubsub.c new file mode 100644 index 0000000..c3a095e --- /dev/null +++ b/lib/pubsub.c @@ -0,0 +1,221 @@ +#include "pubsub.h" +#include + +#include // strndup + +// MESSAGE, TODO CBOR + +void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len) +{ + if (msg == NULL || data == NULL || len == NULL) { + fprintf (stderr, "pubsubd_msg_send: msg or data or len == NULL"); + return; + } + + // msg: "type(1) chanlen(8) chan datalen(8) data + if (msg->type == PUBSUB_TYPE_DISCONNECT) { + *len = 1; + if (*data != NULL) { + free (*data); + *data = NULL; + } + *data = malloc(*len); + memset (*data, 0, *len); + data[0][0] = msg->type; + return; + } + else { + // type + size chan + chan + size data + data + *len = 1 + 2 * sizeof(size_t) + msg->chanlen + msg->datalen; + } + + if (*data != NULL) { + free (*data); + *data = NULL; + } + *data = malloc(*len); + memset (*data, 0, *len); + + size_t i = 0; + + data[0][i] = msg->type; i++; + memcpy (&data[0][i], &msg->chanlen, sizeof(size_t)); i += sizeof(size_t); + memcpy (&data[0][i], msg->chan, msg->chanlen); i += msg->chanlen; + memcpy (&data[0][i], &msg->datalen, sizeof(size_t)); i += sizeof(size_t); + memcpy (&data[0][i], msg->data, msg->datalen); i += msg->datalen; +} + +void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len) +{ + if (msg == NULL) { + fprintf (stderr + , "\033[31merr: pubsubd_msg_unserialize, msg NULL\033[00m\n"); + return; + } + + if (data == NULL) { + fprintf (stderr + , "\033[31merr: pubsubd_msg_unserialize, data NULL\033[00m\n"); + return; + } + + if (len > BUFSIZ) { + fprintf (stderr + , "\033[31merr: pubsubd_msg_unserialize, len %ld\033[00m\n" + , len); + return; + } + + size_t i = 0; + msg->type = data[i]; i++; + + if (msg->type == PUBSUB_TYPE_DISCONNECT) { + msg->chanlen = 0; + msg->chan = NULL; + msg->datalen = 0; + msg->data = NULL; + return ; + } + + memcpy (&msg->chanlen, data + i, sizeof(size_t)); i += sizeof(size_t); + if (msg->chanlen > BUFSIZ) { + fprintf (stderr, "\033[31merr : msg->chanlen > BUFSIZ\033[00m\n"); + return; + } + msg->chan = malloc (msg->chanlen +1); + memset (msg->chan, 0, msg->chanlen +1); + memcpy (msg->chan, data + i, msg->chanlen); i += msg->chanlen; + + memcpy (&msg->datalen, data + i, sizeof(size_t)); i += sizeof(size_t); + if (msg->datalen > BUFSIZ) { + fprintf (stderr, "\033[31merr : msg->datalen > BUFSIZ\033[00m\n"); + return; + } + msg->data = malloc (msg->datalen +1); + memset (msg->data, 0, msg->datalen +1); + memcpy (msg->data, data + i, msg->datalen); i += msg->datalen; +} + +void pubsubd_msg_free (struct pubsub_msg *msg) +{ + if (msg == NULL) { + fprintf (stderr, "\033[31merr: pubsubd_msg_free, msg NULL\033[00m\n"); + return; + } + + if (msg->chan) { + free (msg->chan); + msg->chan = NULL; + } + if (msg->data) { + free (msg->data); + msg->data = NULL; + } +} + +void pubsubd_msg_print (const struct pubsub_msg *msg) +{ + printf ("msg: type=%d chan=%s, data=%s\n" + , msg->type, msg->chan, msg->data); +} + +#define PUBSUB_SUBSCRIBER_ACTION_STR_PUB "pub" +#define PUBSUB_SUBSCRIBER_ACTION_STR_SUB "sub" +#define PUBSUB_SUBSCRIBER_ACTION_STR_BOTH "both" +#define PUBSUB_SUBSCRIBER_ACTION_STR_QUIT "quit" + +// enum app_list_elm_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH}; + +char * pubsub_action_to_str (enum app_list_elm_action action) +{ + switch (action) { + case PUBSUB_PUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_PUB); + case PUBSUB_SUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_SUB); + case PUBSUB_BOTH : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_BOTH); + case PUBSUB_QUIT : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_QUIT); + } + + return NULL; +} + +void pubsub_connection (struct service *srv, struct process *p, enum app_list_elm_action action, const char *channame) +{ + char * straction = NULL; + straction = pubsub_action_to_str (action); + + char line[BUFSIZ]; + memset (line, 0, BUFSIZ); + + // line fmt : pid index version action chan + // "quit" action is also possible (see pubsubd_quit) + snprintf (line, BUFSIZ, "%d %d %d %s %s\n" + , p->pid, p->index, p->version + , straction + , channame); + line[BUFSIZ -1] = '\0'; // to be sure + + // send the connection line in the $TMP/ pipe + app_srv_connection (srv, line, strlen (line)); + + if (straction != NULL) + free (straction); +} + +void pubsub_disconnect (struct process *p) +{ + struct pubsub_msg m; + memset (&m, 0, sizeof (struct pubsub_msg)); + m.type = PUBSUB_TYPE_DISCONNECT; + + char *buf = NULL; + size_t msize = 0; + pubsubd_msg_serialize (&m, &buf, &msize); + + int ret = app_write (p, buf, msize); + if (ret != (int) msize) { + fprintf (stderr, "err: can't disconnect\n"); + } + + pubsubd_msg_free (&m); + if (buf != NULL) { + free (buf); + } +} + +// tell the service to stop +void pubsubd_quit (struct service *srv) +{ + // line fmt : 0 0 0 quit + char line[BUFSIZ]; + snprintf (line, BUFSIZ, "0 0 0 quit\n"); + app_srv_connection (srv, line, strlen (line)); +} + +void pubsub_msg_send (struct process *p, const struct pubsub_msg * m) +{ + char *buf = NULL; + size_t msize = 0; + pubsubd_msg_serialize (m, &buf, &msize); + + app_write (p, buf, msize); + + if (buf != NULL) { + free (buf); + } +} + +void pubsub_msg_recv (struct process *p, struct pubsub_msg *m) +{ + // read the message from the process + size_t mlen = 0; + char *buf = NULL; + while (buf == NULL || mlen == 0) { + app_read (p, &buf, &mlen); + } + + pubsubd_msg_unserialize (m, buf, mlen); + + if (buf != NULL) { + free (buf); + } +} diff --git a/lib/pubsub.h b/lib/pubsub.h new file mode 100644 index 0000000..65aa3ef --- /dev/null +++ b/lib/pubsub.h @@ -0,0 +1,39 @@ +#ifndef __PUBSUB_H__ +#define __PUBSUB_H__ + +#include "communication.h" +#include "process.h" +#include "queue.h" + +#define PUBSUB_TYPE_DISCONNECT 1 << 0 +#define PUBSUB_TYPE_INFO 1 << 1 +#define PUBSUB_TYPE_DEBUG 1 << 2 +#define PUBSUB_TYPE_MESSAGE 1 << 3 + +#define PUBSUB_SERVICE_NAME "pubsub" + +struct pubsub_msg; + +struct pubsub_msg { + unsigned char *chan; + size_t chanlen; + unsigned char *data; + size_t datalen; + unsigned char type; // message type : alert, notification, … +}; + +void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len); +void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len); +void pubsubd_msg_free (struct pubsub_msg *msg); +void pubsubd_msg_print (const struct pubsub_msg *msg); + +void pubsub_disconnect (struct process *p); +void pubsub_msg_send (struct process *p, const struct pubsub_msg *msg); +void pubsub_msg_recv (struct process *p, struct pubsub_msg *msg); + +enum app_list_elm_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH}; + +void pubsub_connection (struct service *srv, struct process *p, enum app_list_elm_action action, const char *channame); +void pubsubd_quit (struct service *srv); + +#endif diff --git a/lib/pubsubd.c b/lib/pubsubd.c index 013f3cc..c1dab43 100644 --- a/lib/pubsubd.c +++ b/lib/pubsubd.c @@ -1,7 +1,164 @@ #include "pubsubd.h" -#include -#include // strndup +// WORKERS: one thread per client + +void pubsubd_workers_init (struct workers *wrkrs) { LIST_INIT(wrkrs); } + +struct worker * +pubsubd_workers_add (struct workers *wrkrs, const struct worker *w) +{ + if (wrkrs == NULL || w == NULL) { + printf ("pubsubd_workers_add: wrkrs == NULL or w == NULL"); + return NULL; + } + + struct worker *n = malloc (sizeof (struct worker)); + memset (n, 0, sizeof (struct worker)); + memcpy (n, w, sizeof (struct worker)); + if (w->ale != NULL) + n->ale = pubsubd_app_list_elm_copy (w->ale); + + LIST_INSERT_HEAD(wrkrs, n, entries); + + return n; +} + +void pubsubd_worker_del (struct workers *wrkrs, struct worker *w) +{ + struct worker *todel = pubsubd_worker_get (wrkrs, w); + if(todel != NULL) { + LIST_REMOVE(todel, entries); + pubsubd_worker_free (todel); + free (todel); + todel = NULL; + } +} + +// kill the threads +void pubsubd_workers_stop (struct workers *wrkrs) +{ + if (!wrkrs) + return; + + struct worker *w = NULL; + struct worker *wtmp = NULL; + + LIST_FOREACH_SAFE(w, wrkrs, entries, wtmp) { + if (w->thr == NULL) + continue; + + pthread_cancel (*w->thr); + void *ret = NULL; + pthread_join (*w->thr, &ret); + if (ret != NULL) { + free (ret); + } + free (w->thr); + w->thr = NULL; + } +} + +void pubsubd_workers_del_all (struct workers *wrkrs) +{ + if (!wrkrs) + return; + + struct worker *w = NULL; + + while (!LIST_EMPTY(wrkrs)) { + printf ("KILL THE WORKERS : %p\n", w); + w = LIST_FIRST(wrkrs); + LIST_REMOVE(w, entries); + pubsubd_worker_free (w); + free (w); + w = NULL; + } +} + +void pubsubd_worker_free (struct worker * w) +{ + if (w == NULL) + return; + pubsubd_app_list_elm_free (w->ale); + free (w->ale); + w->ale = NULL; +} + +struct worker * pubsubd_worker_get (struct workers *wrkrs, struct worker *w) +{ + struct worker * np = NULL; + LIST_FOREACH(np, wrkrs, entries) { + if (pubsubd_worker_eq (np, w)) + return np; + } + return NULL; +} + +int pubsubd_worker_eq (const struct worker *w1, const struct worker *w2) +{ + return w1 == w2; // if it's the same pointer +} + +// a thread for each connected process +void * pubsubd_worker_thread (void *params) +{ + int s = 0; + + s = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + if (s != 0) + printf ("pthread_setcancelstate: %d\n", s); + + struct worker *w = (struct worker *) params; + if (w == NULL) { + fprintf (stderr, "error pubsubd_worker_thread : params NULL\n"); + return NULL; + } + + struct channels *chans = w->chans; + struct channel *chan = w->chan; + struct app_list_elm *ale = w->ale; + + // main loop + while (1) { + struct pubsub_msg m; + memset (&m, 0, sizeof (struct pubsub_msg)); + + pubsubd_msg_recv (ale->p, &m); + + if (m.type == PUBSUB_TYPE_DISCONNECT) { + printf ("process %d disconnecting...\n", ale->p->pid); + if ( 0 != pubsubd_subscriber_del (chan->alh, ale)) { + fprintf (stderr, "err : subscriber not registered\n"); + } + break; + } + else { + struct channel *ch = pubsubd_channel_search (chans, chan->chan); + if (ch == NULL) { + printf ("CHAN NOT FOUND\n"); + } + else { + printf ("what should be sent: "); + pubsubd_msg_print (&m); + printf ("send the message to:\t"); + pubsubd_channel_print (ch); + pubsubd_msg_send (ch->alh, &m); + } + } + pubsubd_msg_free (&m); + } + + pubsubd_app_list_elm_free (ale); + free (w->ale); + w->ale = NULL; + + free (w->thr); + w->thr = NULL; + + pubsubd_worker_del (w->my_workers, w); + + pthread_exit (NULL); +} // CHANNELS @@ -292,118 +449,6 @@ void pubsubd_app_list_elm_free (struct app_list_elm *todel) free (todel->p); } -// MESSAGE, TODO CBOR - -void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len) -{ - if (msg == NULL || data == NULL || len == NULL) { - fprintf (stderr, "pubsubd_msg_send: msg or data or len == NULL"); - return; - } - - // msg: "type(1) chanlen(8) chan datalen(8) data - if (msg->type == PUBSUB_TYPE_DISCONNECT) { - *len = 1; - if (*data != NULL) { - free (*data); - *data = NULL; - } - *data = malloc(*len); - memset (*data, 0, *len); - data[0][0] = msg->type; - return; - } - else { - // type + size chan + chan + size data + data - *len = 1 + 2 * sizeof(size_t) + msg->chanlen + msg->datalen; - } - - if (*data != NULL) { - free (*data); - *data = NULL; - } - *data = malloc(*len); - memset (*data, 0, *len); - - size_t i = 0; - - data[0][i] = msg->type; i++; - memcpy (&data[0][i], &msg->chanlen, sizeof(size_t)); i += sizeof(size_t); - memcpy (&data[0][i], msg->chan, msg->chanlen); i += msg->chanlen; - memcpy (&data[0][i], &msg->datalen, sizeof(size_t)); i += sizeof(size_t); - memcpy (&data[0][i], msg->data, msg->datalen); i += msg->datalen; -} - -void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len) -{ - if (msg == NULL) { - fprintf (stderr - , "\033[31merr: pubsubd_msg_unserialize, msg NULL\033[00m\n"); - return; - } - - if (data == NULL) { - fprintf (stderr - , "\033[31merr: pubsubd_msg_unserialize, data NULL\033[00m\n"); - return; - } - - if (len > BUFSIZ) { - fprintf (stderr - , "\033[31merr: pubsubd_msg_unserialize, len %ld\033[00m\n" - , len); - return; - } - - size_t i = 0; - msg->type = data[i]; i++; - - if (msg->type == PUBSUB_TYPE_DISCONNECT) { - msg->chanlen = 0; - msg->chan = NULL; - msg->datalen = 0; - msg->data = NULL; - return ; - } - - memcpy (&msg->chanlen, data + i, sizeof(size_t)); i += sizeof(size_t); - if (msg->chanlen > BUFSIZ) { - fprintf (stderr, "\033[31merr : msg->chanlen > BUFSIZ\033[00m\n"); - return; - } - msg->chan = malloc (msg->chanlen +1); - memset (msg->chan, 0, msg->chanlen +1); - memcpy (msg->chan, data + i, msg->chanlen); i += msg->chanlen; - - memcpy (&msg->datalen, data + i, sizeof(size_t)); i += sizeof(size_t); - if (msg->datalen > BUFSIZ) { - fprintf (stderr, "\033[31merr : msg->datalen > BUFSIZ\033[00m\n"); - return; - } - msg->data = malloc (msg->datalen +1); - memset (msg->data, 0, msg->datalen +1); - memcpy (msg->data, data + i, msg->datalen); i += msg->datalen; -} - -void pubsubd_msg_free (struct pubsub_msg *msg) -{ - if (msg == NULL) { - fprintf (stderr, "\033[31merr: pubsubd_msg_free, msg NULL\033[00m\n"); - return; - } - - if (msg->chan) { - free (msg->chan); - msg->chan = NULL; - } - if (msg->data) { - free (msg->data); - msg->data = NULL; - } -} - -// COMMUNICATION - int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale , struct channels *chans, struct channel **c) { @@ -540,12 +585,6 @@ void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg } } -void pubsubd_msg_print (const struct pubsub_msg *msg) -{ - printf ("msg: type=%d chan=%s, data=%s\n" - , msg->type, msg->chan, msg->data); -} - void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m) { // read the message from the process @@ -561,108 +600,3 @@ void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m) free (buf); } } - -#define PUBSUB_SUBSCRIBER_ACTION_STR_PUB "pub" -#define PUBSUB_SUBSCRIBER_ACTION_STR_SUB "sub" -#define PUBSUB_SUBSCRIBER_ACTION_STR_BOTH "both" -#define PUBSUB_SUBSCRIBER_ACTION_STR_QUIT "quit" - -// enum app_list_elm_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH}; - -char * pubsub_action_to_str (enum app_list_elm_action action) -{ - switch (action) { - case PUBSUB_PUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_PUB); - case PUBSUB_SUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_SUB); - case PUBSUB_BOTH : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_BOTH); - case PUBSUB_QUIT : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_QUIT); - } - - return NULL; -} - -void pubsub_connection (struct service *srv, struct process *p, enum app_list_elm_action action, const char *channame) -{ - char * straction = NULL; - straction = pubsub_action_to_str (action); - - char line[BUFSIZ]; - memset (line, 0, BUFSIZ); - - // line fmt : pid index version action chan - // "quit" action is also possible (see pubsubd_quit) - snprintf (line, BUFSIZ, "%d %d %d %s %s\n" - , p->pid, p->index, p->version - , straction - , channame); - line[BUFSIZ -1] = '\0'; // to be sure - - // send the connection line in the $TMP/ pipe - app_srv_connection (srv, line, strlen (line)); - - if (straction != NULL) - free (straction); -} - -void pubsub_disconnect (struct process *p) -{ - struct pubsub_msg m; - memset (&m, 0, sizeof (struct pubsub_msg)); - m.type = PUBSUB_TYPE_DISCONNECT; - - char *buf = NULL; - size_t msize = 0; - pubsubd_msg_serialize (&m, &buf, &msize); - - int ret = app_write (p, buf, msize); - if (ret != (int) msize) { - fprintf (stderr, "err: can't disconnect\n"); - } - - pubsubd_msg_free (&m); - if (buf != NULL) { - free (buf); - } -} - -// tell the service to stop -void pubsubd_quit (struct service *srv) -{ - // line fmt : 0 0 0 quit - char line[BUFSIZ]; - snprintf (line, BUFSIZ, "0 0 0 quit\n"); - app_srv_connection (srv, line, strlen (line)); -} - -void pubsub_msg_send (struct process *p, const struct pubsub_msg * m) -{ - char *buf = NULL; - size_t msize = 0; - pubsubd_msg_serialize (m, &buf, &msize); - - app_write (p, buf, msize); - - if (buf != NULL) { - free (buf); - } -} - -void pubsub_msg_recv (struct process *p, struct pubsub_msg *m) -{ - // read the message from the process - size_t mlen = 0; - char *buf = NULL; - while (buf == NULL || mlen == 0) { - app_read (p, &buf, &mlen); - } - - pubsubd_msg_unserialize (m, buf, mlen); - - if (buf != NULL) { - free (buf); - } -} - -// SERVICE - -void pubsubd_srv_init (); diff --git a/lib/pubsubd.h b/lib/pubsubd.h index 99e1826..fad9cd5 100644 --- a/lib/pubsubd.h +++ b/lib/pubsubd.h @@ -1,35 +1,13 @@ #ifndef __PUBSUBD_H__ #define __PUBSUBD_H__ -#include "communication.h" -#include "process.h" -#include "queue.h" - -#define PUBSUB_TYPE_DISCONNECT 1 << 0 -#define PUBSUB_TYPE_INFO 1 << 1 -#define PUBSUB_TYPE_DEBUG 1 << 2 -#define PUBSUB_TYPE_MESSAGE 1 << 3 - -#define PUBSUB_SERVICE_NAME "pubsub" +#include "pubsub.h" +#include struct channel; struct channels; struct app_list_head; struct app_list_elm; -struct pubsub_msg; - -struct pubsub_msg { - unsigned char *chan; - size_t chanlen; - unsigned char *data; - size_t datalen; - unsigned char type; // message type : alert, notification, … -}; - -void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len); -void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len); -void pubsubd_msg_free (struct pubsub_msg *msg); -void pubsubd_msg_print (const struct pubsub_msg *msg); // parse pubsubd init msg (sent in TMPDIR/) // @@ -37,14 +15,9 @@ void pubsubd_msg_print (const struct pubsub_msg *msg); // action : quit | pub | sub int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale , struct channels *chans, struct channel **c); -int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize); void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m); void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m); -void pubsub_disconnect (struct process *p); -void pubsub_msg_send (struct process *p, const struct pubsub_msg *msg); -void pubsub_msg_recv (struct process *p, struct pubsub_msg *msg); - // CHANNEL // head of the list @@ -88,8 +61,6 @@ pubsubd_channels_search_from_app_list_elm (struct channels *chans // head of the list LIST_HEAD(app_list_head, app_list_elm); -enum app_list_elm_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH}; - // element of the list struct app_list_elm { struct process *p; @@ -113,7 +84,33 @@ struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale) void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p); void pubsubd_app_list_elm_free (struct app_list_elm *todel); -void pubsub_connection (struct service *srv, struct process *p, enum app_list_elm_action action, const char *channame); void pubsubd_quit (struct service *srv); +// WORKERS: one thread per client + +// head of the list +LIST_HEAD(workers, worker); + +// element of the list +// worker : process to handle (threaded) +struct worker { + pthread_t *thr; + struct workers *my_workers; + struct channels *chans; + struct channel *chan; + struct app_list_elm *ale; + LIST_ENTRY(worker) entries; +}; + +void pubsubd_worker_free (struct worker * w); +struct worker * pubsubd_worker_get (struct workers *wrkrs, struct worker *w); +int pubsubd_worker_eq (const struct worker *w1, const struct worker *w2); +void pubsubd_workers_init (struct workers *wrkrs); +void * pubsubd_worker_thread (void *params); +struct worker * +pubsubd_workers_add (struct workers *wrkrs, const struct worker *w); +void pubsubd_workers_del_all (struct workers *wrkrs); +void pubsubd_workers_stop (struct workers *wrkrs); +void pubsubd_worker_del (struct workers *wrkrs, struct worker *w); + #endif diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c index d6d38ba..1616c95 100644 --- a/pubsub/pubsubd.c +++ b/pubsub/pubsubd.c @@ -1,8 +1,7 @@ #include "../lib/pubsubd.h" #include -#include -#define NB_CLIENTS 3 +struct workers *my_workers; void ohshit(int rvalue, const char* str) { @@ -10,183 +9,6 @@ ohshit(int rvalue, const char* str) { exit(rvalue); } -// head of the list -LIST_HEAD(workers, worker); - -struct workers *my_workers; - -// element of the list -// worker : process to handle (threaded) -struct worker { - pthread_t *thr; - struct channels *chans; - struct channel *chan; - struct app_list_elm *ale; - LIST_ENTRY(worker) entries; -}; - -void pubsubd_worker_free (struct worker * w); -struct worker * pubsubd_worker_get (struct workers *wrkrs, struct worker *w); -int pubsubd_worker_eq (const struct worker *w1, const struct worker *w2); - -void pubsubd_workers_init (struct workers *wrkrs) { LIST_INIT(wrkrs); } - -struct worker * -pubsubd_workers_add (struct workers *wrkrs, const struct worker *w) -{ - if (wrkrs == NULL || w == NULL) { - printf ("pubsubd_workers_add: wrkrs == NULL or w == NULL"); - return NULL; - } - - struct worker *n = malloc (sizeof (struct worker)); - memset (n, 0, sizeof (struct worker)); - memcpy (n, w, sizeof (struct worker)); - if (w->ale != NULL) - n->ale = pubsubd_app_list_elm_copy (w->ale); - - LIST_INSERT_HEAD(wrkrs, n, entries); - - return n; -} - -void pubsubd_worker_del (struct workers *wrkrs, struct worker *w) -{ - struct worker *todel = pubsubd_worker_get (wrkrs, w); - if(todel != NULL) { - LIST_REMOVE(todel, entries); - pubsubd_worker_free (todel); - free (todel); - todel = NULL; - } -} - -// kill the threads -void pubsubd_workers_stop (struct workers *wrkrs) -{ - if (!wrkrs) - return; - - struct worker *w = NULL; - struct worker *wtmp = NULL; - - LIST_FOREACH_SAFE(w, wrkrs, entries, wtmp) { - if (w->thr == NULL) - continue; - - pthread_cancel (*w->thr); - void *ret = NULL; - pthread_join (*w->thr, &ret); - if (ret != NULL) { - free (ret); - } - free (w->thr); - w->thr = NULL; - } -} - -void pubsubd_workers_del_all (struct workers *wrkrs) -{ - if (!wrkrs) - return; - - struct worker *w = NULL; - - while (!LIST_EMPTY(wrkrs)) { - printf ("KILL THE WORKERS : %p\n", w); - w = LIST_FIRST(wrkrs); - LIST_REMOVE(w, entries); - pubsubd_worker_free (w); - free (w); - w = NULL; - } -} - -void pubsubd_worker_free (struct worker * w) -{ - if (w == NULL) - return; - pubsubd_app_list_elm_free (w->ale); - free (w->ale); - w->ale = NULL; -} - -struct worker * pubsubd_worker_get (struct workers *wrkrs, struct worker *w) -{ - struct worker * np = NULL; - LIST_FOREACH(np, wrkrs, entries) { - if (pubsubd_worker_eq (np, w)) - return np; - } - return NULL; -} - -int pubsubd_worker_eq (const struct worker *w1, const struct worker *w2) -{ - return w1 == w2; // if it's the same pointer -} - -// a thread for each connected process -void * pubsubd_worker_thread (void *params) -{ - int s = 0; - - s = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - if (s != 0) - printf ("pthread_setcancelstate: %d\n", s); - - struct worker *w = (struct worker *) params; - if (w == NULL) { - fprintf (stderr, "error pubsubd_worker_thread : params NULL\n"); - return NULL; - } - - struct channels *chans = w->chans; - struct channel *chan = w->chan; - struct app_list_elm *ale = w->ale; - - // main loop - while (1) { - struct pubsub_msg m; - memset (&m, 0, sizeof (struct pubsub_msg)); - - pubsubd_msg_recv (ale->p, &m); - - if (m.type == PUBSUB_TYPE_DISCONNECT) { - printf ("process %d disconnecting...\n", ale->p->pid); - if ( 0 != pubsubd_subscriber_del (chan->alh, ale)) { - fprintf (stderr, "err : subscriber not registered\n"); - } - break; - } - else { - struct channel *ch = pubsubd_channel_search (chans, chan->chan); - if (ch == NULL) { - printf ("CHAN NOT FOUND\n"); - } - else { - printf ("what should be sent: "); - pubsubd_msg_print (&m); - printf ("send the message to:\t"); - pubsubd_channel_print (ch); - pubsubd_msg_send (ch->alh, &m); - } - } - pubsubd_msg_free (&m); - } - - pubsubd_app_list_elm_free (ale); - free (w->ale); - w->ale = NULL; - - free (w->thr); - w->thr = NULL; - - pubsubd_worker_del (my_workers, w); - - pthread_exit (NULL); -} - int main(int argc, char **argv, char **env) { @@ -208,9 +30,7 @@ main(int argc, char **argv, char **env) memset (my_workers, 0, sizeof (struct workers)); pubsubd_workers_init (my_workers); - int i = 0; - // for (i = 0; i < NB_CLIENTS; i++) - for (i = 0; ; i++) { + while (1) { // for each new process struct app_list_elm ale; memset (&ale, 0, sizeof (struct app_list_elm)); @@ -231,6 +51,7 @@ main(int argc, char **argv, char **env) memset (w->thr, 0, sizeof (pthread_t)); w->ale = pubsubd_app_list_elm_copy (&ale); w->chans = &chans; + w->my_workers = my_workers; w->chan = chan; struct worker *wtmp = pubsubd_workers_add (my_workers, w); pubsubd_worker_free (w);