diff --git a/lib/process.c b/lib/process.c index e840954..cc81fc1 100644 --- a/lib/process.c +++ b/lib/process.c @@ -39,5 +39,6 @@ void srv_process_free (struct process * p) void srv_process_print (struct process *p) { - printf ("process %d : index %d\n", p->pid, p->index); + if (p != NULL) + printf ("process %d : index %d\n", p->pid, p->index); } diff --git a/lib/pubsubd.c b/lib/pubsubd.c index 6e5dfe9..6021ed6 100644 --- a/lib/pubsubd.c +++ b/lib/pubsubd.c @@ -5,14 +5,16 @@ void pubsubd_channels_init (struct channels *chans) { LIST_INIT(chans); } -void +struct channel * pubsubd_channels_add (struct channels *chans, struct channel *c) { if(!chans || !c) - return; + return NULL; struct channel *n = pubsubd_channel_copy (c); LIST_INSERT_HEAD(chans, n, entries); + + return n; } void @@ -20,8 +22,8 @@ pubsubd_channels_del (struct channels *chans, struct channel *c) { struct channel *todel = pubsubd_channel_get (chans, c); if(todel != NULL) { - LIST_REMOVE(todel, entries); pubsubd_channel_free (todel); + LIST_REMOVE(todel, entries); free (todel); todel = NULL; } @@ -45,14 +47,38 @@ void pubsubd_channels_del_all (struct channels *chans) struct channel * pubsubd_channel_copy (struct channel *c) { + if (c == NULL) + return NULL; + struct channel *copy; copy = malloc (sizeof(struct channel)); + bzero (copy, sizeof (struct channel)); + memcpy (copy, c, sizeof(struct channel)); + + if (c->chan != NULL) { + copy->chan = strndup (c->chan, c->chanlen); + copy->chanlen = c->chanlen; + } + return copy; } void pubsubd_channel_free (struct channel * c) { + // TODO + if (c == NULL) + return; + + if (c->chan != NULL) { + free (c->chan); + c->chan = NULL; + } + + if (c->alh != NULL) { + pubsubd_subscriber_del_all (c->alh); + free (c->alh); + } } struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c) @@ -73,15 +99,42 @@ pubsubd_channel_eq (const struct channel *c1, const struct channel *c2) // SUBSCRIBER -void pubsubd_subscriber_init (struct app_list_head *chans) { LIST_INIT(chans); } +void pubsubd_subscriber_init (struct app_list_head **chans) { + if (chans == NULL) + return; -void pubsubd_app_list_elm_print (const struct app_list_elm *ale) + if (*chans == NULL) + *chans = malloc (sizeof(struct channels)); + LIST_INIT(*chans); +} + +void pubsubd_channels_print (const struct channels *chans) { - printf ( "app_list_elm\n\t"); - srv_process_print (ale->p); + printf ("\033[36mmchannels\033[00m\n\n"); - printf ( "\tchan : %s\n", ale->chan); - printf ( "\taction : %d\n", (int) ale->action); + if (chans == NULL) + return ; + + struct channel *chan = NULL; + LIST_FOREACH(chan, chans, entries) { + pubsubd_channel_print (chan); + } +} + +void pubsubd_channel_print (const struct channel *c) +{ + if (c == NULL || c->chan == NULL) + return; + + printf ( "\033[32mchan %s\033[00m\n\t", c->chan); + + if (c->alh == NULL) + return; + + struct app_list_elm *ale = NULL; + LIST_FOREACH(ale, c->alh, entries) { + srv_process_print (ale->p); + } } struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale) @@ -92,7 +145,8 @@ struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale) struct app_list_elm * n; n = malloc (sizeof (struct app_list_elm)); - n->p = srv_process_copy(ale->p); + if (ale->p != NULL) + n->p = srv_process_copy (ale->p); return n; } @@ -131,13 +185,30 @@ pubsubd_subscriber_del (struct app_list_head *chans, struct app_list_elm *p) { struct app_list_elm *todel = pubsubd_subscriber_get (chans, p); if(todel != NULL) { - LIST_REMOVE(todel, entries); pubsubd_app_list_elm_free (todel); + LIST_REMOVE(todel, entries); free (todel); todel = NULL; } } +void pubsubd_subscriber_del_all (struct app_list_head *alh) +{ + if (!alh) + return; + + struct app_list_elm *ale; + + while (!LIST_EMPTY(alh)) { + ale = LIST_FIRST(alh); + LIST_REMOVE(ale, entries); + pubsubd_app_list_elm_free (ale); + free (ale); + ale = NULL; + } +} + + void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p) { if (ale == NULL) @@ -148,7 +219,7 @@ void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p) void pubsubd_app_list_elm_free (struct app_list_elm *todel) { - if (todel == NULL) + if (todel == NULL || todel->p == NULL) return; srv_process_free (todel->p); } @@ -202,12 +273,16 @@ void pubsubd_msg_free (struct pubsub_msg *msg) // COMMUNICATION -int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale) +int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale + , struct channels *chans) { + if (srv == NULL || ale == NULL || chans == NULL) + return -1; + if (ale->p != NULL) { free (ale->p); + ale->p = NULL; } - ale->p = malloc (sizeof (struct process)); char *buf; @@ -217,7 +292,7 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale) // parse pubsubd init msg (sent in TMPDIR/) // // line fmt : pid index version chan action - // action : pub | sub + // action : quit | pub | sub size_t i; char *str, *token, *saveptr; @@ -226,6 +301,9 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale) int index; int version; + char chan[BUFSIZ]; + bzero (chan, BUFSIZ); + for (str = buf, i = 1; ; str = NULL, i++) { token = strtok_r(str, " ", &saveptr); if (token == NULL) @@ -235,26 +313,53 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale) case 1 : pid = strtoul(token, NULL, 10); break; case 2 : index = strtoul(token, NULL, 10); break; case 3 : version = strtoul(token, NULL, 10); break; - case 4 : memcpy (ale->chan, token, strlen (token)); break; + case 4 : { + memcpy (chan, token, (strlen (token) < BUFSIZ) ? + strlen (token) : BUFSIZ); + break; + } case 5 : { - if (strncmp("pub", token, 3) == 0) { - ale->action = 0; + if (strncmp("both", token, 4) == 0) { + ale->action = PUBSUB_BOTH; } + else if (strncmp("pub", token, 3) == 0) { + ale->action = PUBSUB_PUB; + } else if (strncmp("sub", token, 3) == 0) { - ale->action = 1; - } - else if (strncmp("both", token, 4) == 0) { - ale->action = 2; + ale->action = PUBSUB_SUB; } else { // everything else is about killing the service - ale->action = 3; + ale->action = PUBSUB_QUIT; } } } } + if (buf != NULL) { + free (buf); + buf = NULL; + } + + chan[BUFSIZ -1] = '\0'; + + struct channel c; + bzero (&c, sizeof (struct channel)); + c.chan = strndup (chan, BUFSIZ); + c.chanlen = strlen (chan); + + struct channel *new_chan; + new_chan = pubsubd_channel_get (chans, &c); + if (new_chan == NULL) { + new_chan = pubsubd_channels_add (chans, &c); + pubsubd_subscriber_init (&new_chan->alh); + } + pubsubd_channel_free (&c); + srv_process_gen (ale->p, pid, index, version); - ale->chanlen = strlen (ale->chan); + + // add the subscriber + if (ale->action == PUBSUB_SUB || ale->action == PUBSUB_BOTH) + pubsubd_subscriber_add (new_chan->alh, ale); return 0; } @@ -293,6 +398,10 @@ int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize) memcpy (&cbuf[i], chan, chanlen); i += chanlen; memcpy (&cbuf[i], &datalen, sizeof(size_t)); i += sizeof(size_t); memcpy (&cbuf[i], data, datalen); i += datalen; + + free (chan); + free (data); + return 0; } diff --git a/lib/pubsubd.h b/lib/pubsubd.h index ca62714..8c1e015 100644 --- a/lib/pubsubd.h +++ b/lib/pubsubd.h @@ -26,7 +26,8 @@ void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *l void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len); void pubsubd_msg_free (struct pubsub_msg *msg); -int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale); +int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale + , struct channels *chans); int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize); void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m); void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m); @@ -40,47 +41,55 @@ void pubsub_msg_recv (const struct service *, struct pubsub_msg *msg); LIST_HEAD(channels, channel); // element of the list +// channel : chan name + chan name length + a list of applications struct channel { char *chan; size_t chanlen; + struct app_list_head *alh; LIST_ENTRY(channel) entries; }; -void pubsubd_channels_init (struct channels *chans); +// simple channel struct channel * pubsubd_channel_copy (struct channel *c); struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c); -void pubsubd_channels_del (struct channels *chans, struct channel *c); -void pubsubd_channels_del_all (struct channels *chans); - void pubsubd_channel_free (struct channel *c); int pubsubd_channel_eq (const struct channel *c1, const struct channel *c2); +void pubsubd_channel_print (const struct channel *c); + +// list of channels +void pubsubd_channels_init (struct channels *chans); +void pubsubd_channels_print (const struct channels *chans); +struct channel * pubsubd_channels_add (struct channels *chans, struct channel *c); +void pubsubd_channels_del (struct channels *chans, struct channel *c); +void pubsubd_channels_del_all (struct channels *chans); // APPLICATION // head of the list LIST_HEAD(app_list_head, app_list_elm); +enum app_list_elm_action { PUBSUB_QUIT, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH }; + // element of the list struct app_list_elm { struct process *p; - char chan[BUFSIZ]; - size_t chanlen; - char action; // 0 : pub, 1 : sub, 2 : both, kill the service : 3 + enum app_list_elm_action action; LIST_ENTRY(app_list_elm) entries; }; int pubsubd_subscriber_eq (const struct app_list_elm *, const struct app_list_elm *); +void pubsubd_subscriber_init (struct app_list_head **chans); void pubsubd_subscriber_add (struct app_list_head * , const struct app_list_elm *); struct app_list_elm * pubsubd_subscriber_get (const struct app_list_head * , const struct app_list_elm *); void pubsubd_subscriber_del (struct app_list_head *al, struct app_list_elm *p); +void pubsubd_subscriber_del_all (struct app_list_head *alh); struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale); void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p); void pubsubd_app_list_elm_free (struct app_list_elm *todel); -void pubsubd_app_list_elm_print (const struct app_list_elm *ale); #endif diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c index 0b235da..86f44d1 100644 --- a/pubsub/pubsubd.c +++ b/pubsub/pubsubd.c @@ -11,6 +11,7 @@ ohshit(int rvalue, const char* str) { main(int argc, char* argv[]) { struct service srv; + bzero (&srv, sizeof (struct service)); srv_init (&srv, PUBSUB_SERVICE_NAME); printf ("Listening on %s.\n", srv.spath); @@ -20,29 +21,45 @@ main(int argc, char* argv[]) // init chans list struct channels chans; + bzero (&chans, sizeof (struct channels)); pubsubd_channels_init (&chans); for (;;) { // for each new process struct app_list_elm ale; - pubsubd_get_new_process (&srv, &ale); - pubsubd_app_list_elm_print (&ale); + bzero (&ale, sizeof (struct app_list_elm)); - // stop the application ? (action 3) - if (ale.action == 3) { - pubsubd_channels_del_all (&chans); + pubsubd_get_new_process (&srv, &ale, &chans); + pubsubd_channels_print (&chans); + + // end the application + if (ale.action == PUBSUB_QUIT) { printf ("Quitting ...\n"); + + pubsubd_channels_del_all (&chans); + srv_close (&srv); + + // TODO end the threads + exit (0); } - // add the chan to the list + // TODO thread to handle multiple clients at a time + // TODO register the subscriber // each chan has a list of subscribers // someone who only push a msg doesn't need to be registered + if (ale.action == PUBSUB_SUB || ale.action == PUBSUB_BOTH) { + // TODO + } + else if (ale.action == PUBSUB_PUB) { + // TODO add it to the application to follow + // TODO publish a message - // + // then + } - // TODO thread to handle multiple clients at a time + pubsubd_app_list_elm_free (&ale); } // the application will shut down, and remove the service named pipe