From e9c9e551fa27f8e621a68f8552c83d0241cdba19 Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Tue, 7 Jun 2016 17:44:18 +0200
Subject: [PATCH] presque tout debug
---
lib/process.c | 3 +-
lib/pubsubd.c | 157 +++++++++++++++++++++++++++++++++++++++--------
lib/pubsubd.h | 27 +++++---
pubsub/pubsubd.c | 33 +++++++---
4 files changed, 178 insertions(+), 42 deletions(-)
diff --git a/lib/process.c b/lib/process.c
index e840954..cc81fc1 100644
--- a/lib/process.c
+++ b/lib/process.c
@@ -39,5 +39,6 @@ void srv_process_free (struct process * p)
void srv_process_print (struct process *p)
{
- printf ("process %d : index %d\n", p->pid, p->index);
+ if (p != NULL)
+ printf ("process %d : index %d\n", p->pid, p->index);
}
diff --git a/lib/pubsubd.c b/lib/pubsubd.c
index 6e5dfe9..6021ed6 100644
--- a/lib/pubsubd.c
+++ b/lib/pubsubd.c
@@ -5,14 +5,16 @@
void pubsubd_channels_init (struct channels *chans) { LIST_INIT(chans); }
-void
+struct channel *
pubsubd_channels_add (struct channels *chans, struct channel *c)
{
if(!chans || !c)
- return;
+ return NULL;
struct channel *n = pubsubd_channel_copy (c);
LIST_INSERT_HEAD(chans, n, entries);
+
+ return n;
}
void
@@ -20,8 +22,8 @@ pubsubd_channels_del (struct channels *chans, struct channel *c)
{
struct channel *todel = pubsubd_channel_get (chans, c);
if(todel != NULL) {
- LIST_REMOVE(todel, entries);
pubsubd_channel_free (todel);
+ LIST_REMOVE(todel, entries);
free (todel);
todel = NULL;
}
@@ -45,14 +47,38 @@ void pubsubd_channels_del_all (struct channels *chans)
struct channel * pubsubd_channel_copy (struct channel *c)
{
+ if (c == NULL)
+ return NULL;
+
struct channel *copy;
copy = malloc (sizeof(struct channel));
+ bzero (copy, sizeof (struct channel));
+
memcpy (copy, c, sizeof(struct channel));
+
+ if (c->chan != NULL) {
+ copy->chan = strndup (c->chan, c->chanlen);
+ copy->chanlen = c->chanlen;
+ }
+
return copy;
}
void pubsubd_channel_free (struct channel * c)
{
+ // TODO
+ if (c == NULL)
+ return;
+
+ if (c->chan != NULL) {
+ free (c->chan);
+ c->chan = NULL;
+ }
+
+ if (c->alh != NULL) {
+ pubsubd_subscriber_del_all (c->alh);
+ free (c->alh);
+ }
}
struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c)
@@ -73,15 +99,42 @@ pubsubd_channel_eq (const struct channel *c1, const struct channel *c2)
// SUBSCRIBER
-void pubsubd_subscriber_init (struct app_list_head *chans) { LIST_INIT(chans); }
+void pubsubd_subscriber_init (struct app_list_head **chans) {
+ if (chans == NULL)
+ return;
-void pubsubd_app_list_elm_print (const struct app_list_elm *ale)
+ if (*chans == NULL)
+ *chans = malloc (sizeof(struct channels));
+ LIST_INIT(*chans);
+}
+
+void pubsubd_channels_print (const struct channels *chans)
{
- printf ( "app_list_elm\n\t");
- srv_process_print (ale->p);
+ printf ("\033[36mmchannels\033[00m\n\n");
- printf ( "\tchan : %s\n", ale->chan);
- printf ( "\taction : %d\n", (int) ale->action);
+ if (chans == NULL)
+ return ;
+
+ struct channel *chan = NULL;
+ LIST_FOREACH(chan, chans, entries) {
+ pubsubd_channel_print (chan);
+ }
+}
+
+void pubsubd_channel_print (const struct channel *c)
+{
+ if (c == NULL || c->chan == NULL)
+ return;
+
+ printf ( "\033[32mchan %s\033[00m\n\t", c->chan);
+
+ if (c->alh == NULL)
+ return;
+
+ struct app_list_elm *ale = NULL;
+ LIST_FOREACH(ale, c->alh, entries) {
+ srv_process_print (ale->p);
+ }
}
struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale)
@@ -92,7 +145,8 @@ struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale)
struct app_list_elm * n;
n = malloc (sizeof (struct app_list_elm));
- n->p = srv_process_copy(ale->p);
+ if (ale->p != NULL)
+ n->p = srv_process_copy (ale->p);
return n;
}
@@ -131,13 +185,30 @@ pubsubd_subscriber_del (struct app_list_head *chans, struct app_list_elm *p)
{
struct app_list_elm *todel = pubsubd_subscriber_get (chans, p);
if(todel != NULL) {
- LIST_REMOVE(todel, entries);
pubsubd_app_list_elm_free (todel);
+ LIST_REMOVE(todel, entries);
free (todel);
todel = NULL;
}
}
+void pubsubd_subscriber_del_all (struct app_list_head *alh)
+{
+ if (!alh)
+ return;
+
+ struct app_list_elm *ale;
+
+ while (!LIST_EMPTY(alh)) {
+ ale = LIST_FIRST(alh);
+ LIST_REMOVE(ale, entries);
+ pubsubd_app_list_elm_free (ale);
+ free (ale);
+ ale = NULL;
+ }
+}
+
+
void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p)
{
if (ale == NULL)
@@ -148,7 +219,7 @@ void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p)
void pubsubd_app_list_elm_free (struct app_list_elm *todel)
{
- if (todel == NULL)
+ if (todel == NULL || todel->p == NULL)
return;
srv_process_free (todel->p);
}
@@ -202,12 +273,16 @@ void pubsubd_msg_free (struct pubsub_msg *msg)
// COMMUNICATION
-int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale)
+int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
+ , struct channels *chans)
{
+ if (srv == NULL || ale == NULL || chans == NULL)
+ return -1;
+
if (ale->p != NULL) {
free (ale->p);
+ ale->p = NULL;
}
-
ale->p = malloc (sizeof (struct process));
char *buf;
@@ -217,7 +292,7 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale)
// parse pubsubd init msg (sent in TMPDIR/)
//
// line fmt : pid index version chan action
- // action : pub | sub
+ // action : quit | pub | sub
size_t i;
char *str, *token, *saveptr;
@@ -226,6 +301,9 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale)
int index;
int version;
+ char chan[BUFSIZ];
+ bzero (chan, BUFSIZ);
+
for (str = buf, i = 1; ; str = NULL, i++) {
token = strtok_r(str, " ", &saveptr);
if (token == NULL)
@@ -235,26 +313,53 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale)
case 1 : pid = strtoul(token, NULL, 10); break;
case 2 : index = strtoul(token, NULL, 10); break;
case 3 : version = strtoul(token, NULL, 10); break;
- case 4 : memcpy (ale->chan, token, strlen (token)); break;
+ case 4 : {
+ memcpy (chan, token, (strlen (token) < BUFSIZ) ?
+ strlen (token) : BUFSIZ);
+ break;
+ }
case 5 : {
- if (strncmp("pub", token, 3) == 0) {
- ale->action = 0;
+ if (strncmp("both", token, 4) == 0) {
+ ale->action = PUBSUB_BOTH;
}
+ else if (strncmp("pub", token, 3) == 0) {
+ ale->action = PUBSUB_PUB;
+ }
else if (strncmp("sub", token, 3) == 0) {
- ale->action = 1;
- }
- else if (strncmp("both", token, 4) == 0) {
- ale->action = 2;
+ ale->action = PUBSUB_SUB;
}
else { // everything else is about killing the service
- ale->action = 3;
+ ale->action = PUBSUB_QUIT;
}
}
}
}
+ if (buf != NULL) {
+ free (buf);
+ buf = NULL;
+ }
+
+ chan[BUFSIZ -1] = '\0';
+
+ struct channel c;
+ bzero (&c, sizeof (struct channel));
+ c.chan = strndup (chan, BUFSIZ);
+ c.chanlen = strlen (chan);
+
+ struct channel *new_chan;
+ new_chan = pubsubd_channel_get (chans, &c);
+ if (new_chan == NULL) {
+ new_chan = pubsubd_channels_add (chans, &c);
+ pubsubd_subscriber_init (&new_chan->alh);
+ }
+ pubsubd_channel_free (&c);
+
srv_process_gen (ale->p, pid, index, version);
- ale->chanlen = strlen (ale->chan);
+
+ // add the subscriber
+ if (ale->action == PUBSUB_SUB || ale->action == PUBSUB_BOTH)
+ pubsubd_subscriber_add (new_chan->alh, ale);
return 0;
}
@@ -293,6 +398,10 @@ int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize)
memcpy (&cbuf[i], chan, chanlen); i += chanlen;
memcpy (&cbuf[i], &datalen, sizeof(size_t)); i += sizeof(size_t);
memcpy (&cbuf[i], data, datalen); i += datalen;
+
+ free (chan);
+ free (data);
+
return 0;
}
diff --git a/lib/pubsubd.h b/lib/pubsubd.h
index ca62714..8c1e015 100644
--- a/lib/pubsubd.h
+++ b/lib/pubsubd.h
@@ -26,7 +26,8 @@ void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *l
void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len);
void pubsubd_msg_free (struct pubsub_msg *msg);
-int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale);
+int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
+ , struct channels *chans);
int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize);
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m);
void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m);
@@ -40,47 +41,55 @@ void pubsub_msg_recv (const struct service *, struct pubsub_msg *msg);
LIST_HEAD(channels, channel);
// element of the list
+// channel : chan name + chan name length + a list of applications
struct channel {
char *chan;
size_t chanlen;
+ struct app_list_head *alh;
LIST_ENTRY(channel) entries;
};
-void pubsubd_channels_init (struct channels *chans);
+// simple channel
struct channel * pubsubd_channel_copy (struct channel *c);
struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c);
-void pubsubd_channels_del (struct channels *chans, struct channel *c);
-void pubsubd_channels_del_all (struct channels *chans);
-
void pubsubd_channel_free (struct channel *c);
int pubsubd_channel_eq (const struct channel *c1, const struct channel *c2);
+void pubsubd_channel_print (const struct channel *c);
+
+// list of channels
+void pubsubd_channels_init (struct channels *chans);
+void pubsubd_channels_print (const struct channels *chans);
+struct channel * pubsubd_channels_add (struct channels *chans, struct channel *c);
+void pubsubd_channels_del (struct channels *chans, struct channel *c);
+void pubsubd_channels_del_all (struct channels *chans);
// APPLICATION
// head of the list
LIST_HEAD(app_list_head, app_list_elm);
+enum app_list_elm_action { PUBSUB_QUIT, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH };
+
// element of the list
struct app_list_elm {
struct process *p;
- char chan[BUFSIZ];
- size_t chanlen;
- char action; // 0 : pub, 1 : sub, 2 : both, kill the service : 3
+ enum app_list_elm_action action;
LIST_ENTRY(app_list_elm) entries;
};
int
pubsubd_subscriber_eq (const struct app_list_elm *, const struct app_list_elm *);
+void pubsubd_subscriber_init (struct app_list_head **chans);
void pubsubd_subscriber_add (struct app_list_head *
, const struct app_list_elm *);
struct app_list_elm * pubsubd_subscriber_get (const struct app_list_head *
, const struct app_list_elm *);
void pubsubd_subscriber_del (struct app_list_head *al, struct app_list_elm *p);
+void pubsubd_subscriber_del_all (struct app_list_head *alh);
struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale);
void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p);
void pubsubd_app_list_elm_free (struct app_list_elm *todel);
-void pubsubd_app_list_elm_print (const struct app_list_elm *ale);
#endif
diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c
index 0b235da..86f44d1 100644
--- a/pubsub/pubsubd.c
+++ b/pubsub/pubsubd.c
@@ -11,6 +11,7 @@ ohshit(int rvalue, const char* str) {
main(int argc, char* argv[])
{
struct service srv;
+ bzero (&srv, sizeof (struct service));
srv_init (&srv, PUBSUB_SERVICE_NAME);
printf ("Listening on %s.\n", srv.spath);
@@ -20,29 +21,45 @@ main(int argc, char* argv[])
// init chans list
struct channels chans;
+ bzero (&chans, sizeof (struct channels));
pubsubd_channels_init (&chans);
for (;;) {
// for each new process
struct app_list_elm ale;
- pubsubd_get_new_process (&srv, &ale);
- pubsubd_app_list_elm_print (&ale);
+ bzero (&ale, sizeof (struct app_list_elm));
- // stop the application ? (action 3)
- if (ale.action == 3) {
- pubsubd_channels_del_all (&chans);
+ pubsubd_get_new_process (&srv, &ale, &chans);
+ pubsubd_channels_print (&chans);
+
+ // end the application
+ if (ale.action == PUBSUB_QUIT) {
printf ("Quitting ...\n");
+
+ pubsubd_channels_del_all (&chans);
+ srv_close (&srv);
+
+ // TODO end the threads
+
exit (0);
}
- // add the chan to the list
+ // TODO thread to handle multiple clients at a time
+ // TODO register the subscriber
// each chan has a list of subscribers
// someone who only push a msg doesn't need to be registered
+ if (ale.action == PUBSUB_SUB || ale.action == PUBSUB_BOTH) {
+ // TODO
+ }
+ else if (ale.action == PUBSUB_PUB) {
+ // TODO add it to the application to follow
+ // TODO publish a message
- //
+ // then
+ }
- // TODO thread to handle multiple clients at a time
+ pubsubd_app_list_elm_free (&ale);
}
// the application will shut down, and remove the service named pipe