Archived
3
0

presque tout debug

This commit is contained in:
Philippe PITTOLI 2016-06-07 17:44:18 +02:00
parent 85f94917c9
commit e9c9e551fa
4 changed files with 178 additions and 42 deletions

View File

@ -39,5 +39,6 @@ void srv_process_free (struct process * p)
void srv_process_print (struct process *p)
{
if (p != NULL)
printf ("process %d : index %d\n", p->pid, p->index);
}

View File

@ -5,14 +5,16 @@
void pubsubd_channels_init (struct channels *chans) { LIST_INIT(chans); }
void
struct channel *
pubsubd_channels_add (struct channels *chans, struct channel *c)
{
if(!chans || !c)
return;
return NULL;
struct channel *n = pubsubd_channel_copy (c);
LIST_INSERT_HEAD(chans, n, entries);
return n;
}
void
@ -20,8 +22,8 @@ pubsubd_channels_del (struct channels *chans, struct channel *c)
{
struct channel *todel = pubsubd_channel_get (chans, c);
if(todel != NULL) {
LIST_REMOVE(todel, entries);
pubsubd_channel_free (todel);
LIST_REMOVE(todel, entries);
free (todel);
todel = NULL;
}
@ -45,14 +47,38 @@ void pubsubd_channels_del_all (struct channels *chans)
struct channel * pubsubd_channel_copy (struct channel *c)
{
if (c == NULL)
return NULL;
struct channel *copy;
copy = malloc (sizeof(struct channel));
bzero (copy, sizeof (struct channel));
memcpy (copy, c, sizeof(struct channel));
if (c->chan != NULL) {
copy->chan = strndup (c->chan, c->chanlen);
copy->chanlen = c->chanlen;
}
return copy;
}
void pubsubd_channel_free (struct channel * c)
{
// TODO
if (c == NULL)
return;
if (c->chan != NULL) {
free (c->chan);
c->chan = NULL;
}
if (c->alh != NULL) {
pubsubd_subscriber_del_all (c->alh);
free (c->alh);
}
}
struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c)
@ -73,15 +99,42 @@ pubsubd_channel_eq (const struct channel *c1, const struct channel *c2)
// SUBSCRIBER
void pubsubd_subscriber_init (struct app_list_head *chans) { LIST_INIT(chans); }
void pubsubd_subscriber_init (struct app_list_head **chans) {
if (chans == NULL)
return;
void pubsubd_app_list_elm_print (const struct app_list_elm *ale)
if (*chans == NULL)
*chans = malloc (sizeof(struct channels));
LIST_INIT(*chans);
}
void pubsubd_channels_print (const struct channels *chans)
{
printf ( "app_list_elm\n\t");
srv_process_print (ale->p);
printf ("\033[36mmchannels\033[00m\n\n");
printf ( "\tchan : %s\n", ale->chan);
printf ( "\taction : %d\n", (int) ale->action);
if (chans == NULL)
return ;
struct channel *chan = NULL;
LIST_FOREACH(chan, chans, entries) {
pubsubd_channel_print (chan);
}
}
void pubsubd_channel_print (const struct channel *c)
{
if (c == NULL || c->chan == NULL)
return;
printf ( "\033[32mchan %s\033[00m\n\t", c->chan);
if (c->alh == NULL)
return;
struct app_list_elm *ale = NULL;
LIST_FOREACH(ale, c->alh, entries) {
srv_process_print (ale->p);
}
}
struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale)
@ -92,7 +145,8 @@ struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale)
struct app_list_elm * n;
n = malloc (sizeof (struct app_list_elm));
n->p = srv_process_copy(ale->p);
if (ale->p != NULL)
n->p = srv_process_copy (ale->p);
return n;
}
@ -131,13 +185,30 @@ pubsubd_subscriber_del (struct app_list_head *chans, struct app_list_elm *p)
{
struct app_list_elm *todel = pubsubd_subscriber_get (chans, p);
if(todel != NULL) {
LIST_REMOVE(todel, entries);
pubsubd_app_list_elm_free (todel);
LIST_REMOVE(todel, entries);
free (todel);
todel = NULL;
}
}
void pubsubd_subscriber_del_all (struct app_list_head *alh)
{
if (!alh)
return;
struct app_list_elm *ale;
while (!LIST_EMPTY(alh)) {
ale = LIST_FIRST(alh);
LIST_REMOVE(ale, entries);
pubsubd_app_list_elm_free (ale);
free (ale);
ale = NULL;
}
}
void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p)
{
if (ale == NULL)
@ -148,7 +219,7 @@ void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p)
void pubsubd_app_list_elm_free (struct app_list_elm *todel)
{
if (todel == NULL)
if (todel == NULL || todel->p == NULL)
return;
srv_process_free (todel->p);
}
@ -202,12 +273,16 @@ void pubsubd_msg_free (struct pubsub_msg *msg)
// COMMUNICATION
int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale)
int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
, struct channels *chans)
{
if (srv == NULL || ale == NULL || chans == NULL)
return -1;
if (ale->p != NULL) {
free (ale->p);
ale->p = NULL;
}
ale->p = malloc (sizeof (struct process));
char *buf;
@ -217,7 +292,7 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale)
// parse pubsubd init msg (sent in TMPDIR/<service>)
//
// line fmt : pid index version chan action
// action : pub | sub
// action : quit | pub | sub
size_t i;
char *str, *token, *saveptr;
@ -226,6 +301,9 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale)
int index;
int version;
char chan[BUFSIZ];
bzero (chan, BUFSIZ);
for (str = buf, i = 1; ; str = NULL, i++) {
token = strtok_r(str, " ", &saveptr);
if (token == NULL)
@ -235,26 +313,53 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale)
case 1 : pid = strtoul(token, NULL, 10); break;
case 2 : index = strtoul(token, NULL, 10); break;
case 3 : version = strtoul(token, NULL, 10); break;
case 4 : memcpy (ale->chan, token, strlen (token)); break;
case 4 : {
memcpy (chan, token, (strlen (token) < BUFSIZ) ?
strlen (token) : BUFSIZ);
break;
}
case 5 : {
if (strncmp("pub", token, 3) == 0) {
ale->action = 0;
if (strncmp("both", token, 4) == 0) {
ale->action = PUBSUB_BOTH;
}
else if (strncmp("pub", token, 3) == 0) {
ale->action = PUBSUB_PUB;
}
else if (strncmp("sub", token, 3) == 0) {
ale->action = 1;
}
else if (strncmp("both", token, 4) == 0) {
ale->action = 2;
ale->action = PUBSUB_SUB;
}
else { // everything else is about killing the service
ale->action = 3;
ale->action = PUBSUB_QUIT;
}
}
}
}
if (buf != NULL) {
free (buf);
buf = NULL;
}
chan[BUFSIZ -1] = '\0';
struct channel c;
bzero (&c, sizeof (struct channel));
c.chan = strndup (chan, BUFSIZ);
c.chanlen = strlen (chan);
struct channel *new_chan;
new_chan = pubsubd_channel_get (chans, &c);
if (new_chan == NULL) {
new_chan = pubsubd_channels_add (chans, &c);
pubsubd_subscriber_init (&new_chan->alh);
}
pubsubd_channel_free (&c);
srv_process_gen (ale->p, pid, index, version);
ale->chanlen = strlen (ale->chan);
// add the subscriber
if (ale->action == PUBSUB_SUB || ale->action == PUBSUB_BOTH)
pubsubd_subscriber_add (new_chan->alh, ale);
return 0;
}
@ -293,6 +398,10 @@ int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize)
memcpy (&cbuf[i], chan, chanlen); i += chanlen;
memcpy (&cbuf[i], &datalen, sizeof(size_t)); i += sizeof(size_t);
memcpy (&cbuf[i], data, datalen); i += datalen;
free (chan);
free (data);
return 0;
}

View File

@ -26,7 +26,8 @@ void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *l
void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len);
void pubsubd_msg_free (struct pubsub_msg *msg);
int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale);
int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
, struct channels *chans);
int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize);
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m);
void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m);
@ -40,47 +41,55 @@ void pubsub_msg_recv (const struct service *, struct pubsub_msg *msg);
LIST_HEAD(channels, channel);
// element of the list
// channel : chan name + chan name length + a list of applications
struct channel {
char *chan;
size_t chanlen;
struct app_list_head *alh;
LIST_ENTRY(channel) entries;
};
void pubsubd_channels_init (struct channels *chans);
// simple channel
struct channel * pubsubd_channel_copy (struct channel *c);
struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c);
void pubsubd_channels_del (struct channels *chans, struct channel *c);
void pubsubd_channels_del_all (struct channels *chans);
void pubsubd_channel_free (struct channel *c);
int pubsubd_channel_eq (const struct channel *c1, const struct channel *c2);
void pubsubd_channel_print (const struct channel *c);
// list of channels
void pubsubd_channels_init (struct channels *chans);
void pubsubd_channels_print (const struct channels *chans);
struct channel * pubsubd_channels_add (struct channels *chans, struct channel *c);
void pubsubd_channels_del (struct channels *chans, struct channel *c);
void pubsubd_channels_del_all (struct channels *chans);
// APPLICATION
// head of the list
LIST_HEAD(app_list_head, app_list_elm);
enum app_list_elm_action { PUBSUB_QUIT, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH };
// element of the list
struct app_list_elm {
struct process *p;
char chan[BUFSIZ];
size_t chanlen;
char action; // 0 : pub, 1 : sub, 2 : both, kill the service : 3
enum app_list_elm_action action;
LIST_ENTRY(app_list_elm) entries;
};
int
pubsubd_subscriber_eq (const struct app_list_elm *, const struct app_list_elm *);
void pubsubd_subscriber_init (struct app_list_head **chans);
void pubsubd_subscriber_add (struct app_list_head *
, const struct app_list_elm *);
struct app_list_elm * pubsubd_subscriber_get (const struct app_list_head *
, const struct app_list_elm *);
void pubsubd_subscriber_del (struct app_list_head *al, struct app_list_elm *p);
void pubsubd_subscriber_del_all (struct app_list_head *alh);
struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale);
void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p);
void pubsubd_app_list_elm_free (struct app_list_elm *todel);
void pubsubd_app_list_elm_print (const struct app_list_elm *ale);
#endif

View File

@ -11,6 +11,7 @@ ohshit(int rvalue, const char* str) {
main(int argc, char* argv[])
{
struct service srv;
bzero (&srv, sizeof (struct service));
srv_init (&srv, PUBSUB_SERVICE_NAME);
printf ("Listening on %s.\n", srv.spath);
@ -20,29 +21,45 @@ main(int argc, char* argv[])
// init chans list
struct channels chans;
bzero (&chans, sizeof (struct channels));
pubsubd_channels_init (&chans);
for (;;) {
// for each new process
struct app_list_elm ale;
pubsubd_get_new_process (&srv, &ale);
pubsubd_app_list_elm_print (&ale);
bzero (&ale, sizeof (struct app_list_elm));
// stop the application ? (action 3)
if (ale.action == 3) {
pubsubd_channels_del_all (&chans);
pubsubd_get_new_process (&srv, &ale, &chans);
pubsubd_channels_print (&chans);
// end the application
if (ale.action == PUBSUB_QUIT) {
printf ("Quitting ...\n");
pubsubd_channels_del_all (&chans);
srv_close (&srv);
// TODO end the threads
exit (0);
}
// add the chan to the list
// TODO thread to handle multiple clients at a time
// TODO register the subscriber
// each chan has a list of subscribers
// someone who only push a msg doesn't need to be registered
if (ale.action == PUBSUB_SUB || ale.action == PUBSUB_BOTH) {
// TODO
}
else if (ale.action == PUBSUB_PUB) {
// TODO add it to the application to follow
// TODO publish a message
//
// then
}
// TODO thread to handle multiple clients at a time
pubsubd_app_list_elm_free (&ale);
}
// the application will shut down, and remove the service named pipe