diff --git a/lib/communication.c b/lib/communication.c index 828cb6f..0faf567 100644 --- a/lib/communication.c +++ b/lib/communication.c @@ -5,6 +5,10 @@ int file_open (FILE **f, const char *path, const char *mode) { printf ("opening %s\n", path); + if (*f != NULL) { + printf ("f != NULL : %p\n", *f); + fclose (*f); + } *f = fopen (path, mode); if (*f == NULL) { fprintf (stderr, "\033[31mnot opened\033[00m\n"); @@ -124,13 +128,13 @@ int srv_get_new_process (const struct service *srv, struct process *p) printf("diff nsec: %ld\n", ts2.tv_nsec - ts.tv_nsec); - char *token, *saveptr; - char *str; - int i; + char *token = NULL, *saveptr = NULL; + char *str = NULL; + int i = 0; - pid_t pid; - int index; - int version; + pid_t pid = 0; + int index = 0; + int version = 0; for (str = buf, i = 1; ; str = NULL, i++) { token = strtok_r(str, " ", &saveptr); @@ -170,6 +174,7 @@ int srv_read_cb (struct process *p, char ** buf, size_t * msize if (file_close (p->out)) return 1; + p->out = NULL; return 0; } @@ -184,6 +189,7 @@ int srv_read (struct process *p, char ** buf, size_t * msize) if (file_close (p->out)) return 1; + p->out = NULL; return 0; } @@ -197,12 +203,32 @@ int srv_write (struct process *p, void * buf, size_t msize) if (file_close (p->in)) return 1; + p->in = NULL; return 0; } // APPLICATION +// send the connection string to $TMP/ +int app_srv_connection (struct service *srv, const char *connectionstr, size_t msize) +{ + if (srv == NULL) { + return 1; + } + + FILE * f = NULL; + if (file_open (&f, srv->spath, "wb")) + return 2; + + fwrite (connectionstr, msize, 1, f); // FIXME check errors + + if (file_close (f)) + return 3; + + return 0; +} + int app_create (struct process *p, int index) { pid_t pid = getpid(); @@ -303,6 +329,7 @@ int app_read_cb (struct process *p, char ** buf, size_t * msize if (file_close (p->in)) return 1; + p->in = NULL; return 0; } @@ -317,6 +344,7 @@ int app_read (struct process *p, void * buf, size_t * msize) if (file_close (p->in)) return 1; + p->in = NULL; return 0; } @@ -330,6 +358,7 @@ int app_write (struct process *p, void * buf, size_t msize) if (file_close (p->out)) return 1; + p->out = NULL; return 0; } diff --git a/lib/communication.h b/lib/communication.h index 5241b05..e9139de 100644 --- a/lib/communication.h +++ b/lib/communication.h @@ -45,6 +45,9 @@ int srv_write (struct process *, void * buf, size_t); // APPLICATION +// send the connection string to $TMP/ +int app_srv_connection (struct service *, const char *, size_t); + int app_create (struct process *, int index); // called by the application int app_destroy (struct process *); // called by the application diff --git a/lib/process.c b/lib/process.c index cc81fc1..0548fa3 100644 --- a/lib/process.c +++ b/lib/process.c @@ -28,6 +28,8 @@ void srv_process_gen (struct process *p bzero (p->path_out, PATH_MAX); snprintf(p->path_in , PATH_MAX, "%s/%d-%d-in" , TMPDIR, pid, index); snprintf(p->path_out, PATH_MAX, "%s/%d-%d-out", TMPDIR, pid, index); + p->in = NULL; + p->out = NULL; } void srv_process_free (struct process * p) diff --git a/lib/pubsubd.c b/lib/pubsubd.c index ae12610..7c7815f 100644 --- a/lib/pubsubd.c +++ b/lib/pubsubd.c @@ -34,7 +34,7 @@ void pubsubd_channels_del_all (struct channels *chans) if (!chans) return; - struct channel *c; + struct channel *c = NULL; while (!LIST_EMPTY(chans)) { c = LIST_FIRST(chans); @@ -50,7 +50,7 @@ struct channel * pubsubd_channel_copy (struct channel *c) if (c == NULL) return NULL; - struct channel *copy; + struct channel *copy = NULL; copy = malloc (sizeof(struct channel)); bzero (copy, sizeof (struct channel)); @@ -142,7 +142,7 @@ struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale) if (ale == NULL) return NULL; - struct app_list_elm * n; + struct app_list_elm * n = NULL; n = malloc (sizeof (struct app_list_elm)); if (ale->p != NULL) @@ -173,7 +173,7 @@ pubsubd_subscriber_add (struct app_list_head *alh, const struct app_list_elm *al struct app_list_elm * pubsubd_subscriber_get (const struct app_list_head *chans, const struct app_list_elm *p) { - struct app_list_elm *np, *res = NULL; + struct app_list_elm *np = NULL, *res = NULL; LIST_FOREACH(np, chans, entries) { if(pubsubd_subscriber_eq (np, p)) { res = np; @@ -199,7 +199,7 @@ void pubsubd_subscriber_del_all (struct app_list_head *alh) if (!alh) return; - struct app_list_elm *ale; + struct app_list_elm *ale = NULL; while (!LIST_EMPTY(alh)) { ale = LIST_FIRST(alh); @@ -317,8 +317,8 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale if (srv == NULL || ale == NULL || chans == NULL) return -1; - char *buf; - size_t msize; + char *buf = NULL; + size_t msize = 0; srv_get_listen_raw (srv, &buf, &msize); // parse pubsubd init msg (sent in TMPDIR/) @@ -326,12 +326,12 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale // line fmt : pid index version action chan // action : quit | pub | sub - size_t i; - char *str, *token, *saveptr; + size_t i = 0; + char *str = NULL, *token = NULL, *saveptr = NULL; - pid_t pid; - int index; - int version; + pid_t pid = 0; + int index = 0; + int version = 0; char chan[BUFSIZ]; bzero (chan, BUFSIZ); @@ -399,7 +399,7 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale c[0]->chan = strndup (chan, BUFSIZ); c[0]->chanlen = strlen (chan); - struct channel *new_chan; + struct channel *new_chan = NULL; new_chan = pubsubd_channel_get (chans, *c); if (new_chan == NULL) { new_chan = pubsubd_channels_add (chans, *c); @@ -424,27 +424,29 @@ int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize) printf ("\033[36m ON PASSE DANS pubsubd_msg_read_cb \033[00m \n"); // read - char type; + char type = ' '; fread (&type, 1, 1, f); - size_t chanlen; + size_t chanlen = 0; fread (&chanlen, sizeof (size_t), 1, f); if (chanlen > BUFSIZ) { return 1; } - char *chan = malloc (chanlen); + char *chan = NULL; + chan = malloc (chanlen); fread (chan, chanlen, 1, f); - size_t datalen; + size_t datalen = 0; fread (&datalen, sizeof (size_t), 1, f); if (datalen > BUFSIZ) { return 1; } - char *data = malloc (datalen); + char *data = NULL; + data = malloc (datalen); fread (data, datalen, 1, f); *msize = 1 + 2 * sizeof (size_t) + chanlen + datalen; @@ -476,8 +478,8 @@ void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg { struct app_list_elm * ale = NULL; - char *buf; - size_t msize; + char *buf = NULL; + size_t msize = 0; pubsubd_msg_serialize (m, &buf, &msize); LIST_FOREACH(ale, alh, entries) { @@ -504,8 +506,8 @@ void pubsubd_msg_print (const struct pubsub_msg *msg) void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m) { // read the message from the process - size_t mlen; - char *buf; + size_t mlen = 0; + char *buf = NULL; srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb); pubsubd_msg_unserialize (m, buf, mlen); @@ -515,11 +517,52 @@ void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m) } } +#define PUBSUB_SUBSCRIBER_ACTION_STR_PUB "pub" +#define PUBSUB_SUBSCRIBER_ACTION_STR_SUB "sub" +#define PUBSUB_SUBSCRIBER_ACTION_STR_BOTH "both" +#define PUBSUB_SUBSCRIBER_ACTION_STR_QUIT "quit" + +// enum app_list_elm_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH}; + +char * pubsub_action_to_str (enum app_list_elm_action action) +{ + switch (action) { + case PUBSUB_PUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_PUB); + case PUBSUB_SUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_SUB); + case PUBSUB_BOTH : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_BOTH); + case PUBSUB_QUIT : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_QUIT); + } + + return NULL; +} + +void pubsub_connection (struct service *srv, struct process *p, enum app_list_elm_action action, const char *channame) +{ + char * straction = NULL; + straction = pubsub_action_to_str (action); + + char line[BUFSIZ]; + bzero (line, BUFSIZ); + + // line fmt : pid index version action chan + // "quit" action is also possible (see pubsub_disconnect) + snprintf (line, BUFSIZ, "%d %d %d %s %s\n" + , p->pid, p->index, p->version + , straction + , channame); + line[BUFSIZ -1] = '\0'; // to be sure + + // send the connection line in the $TMP/ pipe + app_srv_connection (srv, line, strlen (line)); + + if (straction != NULL) + free (straction); +} + void pubsub_msg_send (const struct service *s, struct process *p, const struct pubsub_msg * m) { - - char *buf; - size_t msize; + char *buf = NULL; + size_t msize = 0; pubsubd_msg_serialize (m, &buf, &msize); app_write (p, buf, msize); @@ -532,8 +575,8 @@ void pubsub_msg_send (const struct service *s, struct process *p, const struct p void pubsub_msg_recv (const struct service *s, struct process *p, struct pubsub_msg * m) { // read the message from the process - size_t mlen; - char *buf; + size_t mlen = 0; + char *buf = NULL; app_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb); pubsubd_msg_unserialize (m, buf, mlen); diff --git a/pingpong/pingpong.c b/pingpong/pingpong.c index 276c216..ff58080 100644 --- a/pingpong/pingpong.c +++ b/pingpong/pingpong.c @@ -33,7 +33,7 @@ void main_loop (const struct service *srv) // about the message size_t msize = BUFSIZ; - char *buf; + char *buf = NULL; // printf ("before read\n"); if ((ret = srv_read (&proc, &buf, &msize))) { diff --git a/pubsub/pubsub-test-send.c b/pubsub/pubsub-test-send.c new file mode 100644 index 0000000..b5a6ea5 --- /dev/null +++ b/pubsub/pubsub-test-send.c @@ -0,0 +1,59 @@ +#include "../lib/pubsubd.h" +#include +#include + +#define MYMESSAGE "coucou" +#define MYCHAN "chan1" + +void +ohshit(int rvalue, const char* str) { + fprintf (stderr, "\033[31merr: %s\033[00m\n", str); + exit (rvalue); +} + + int +main(int argc, char* argv[]) +{ + struct service srv; + bzero (&srv, sizeof (struct service)); + srv_init (&srv, PUBSUB_SERVICE_NAME); + printf ("Writing on %s.\n", srv.spath); + + struct process p; + bzero (&p, sizeof (struct process)); + int index = 1; + + if (app_create (&p, index)) // called by the application + ohshit (1, "app_create"); + + // send a message to warn the service we want to do something + // line : pid index version action chan + pubsub_connection (&srv, &p, PUBSUB_PUB, MYCHAN); + + struct pubsub_msg m; + bzero (&m, sizeof (struct pubsub_msg)); + + // first message, "coucou" + m.type = PUBSUB_TYPE_INFO; + m.chan = malloc (strlen (MYCHAN)); + m.chanlen = strlen (MYCHAN); + m.data = malloc (strlen (MYMESSAGE)); + m.datalen = strlen (MYMESSAGE); + pubsub_msg_send (&srv, &p, &m); + + // second message, to disconnect from the server + m.type = PUBSUB_TYPE_DISCONNECT; + pubsub_msg_send (&srv, &p, &m); + + // free everything + + pubsubd_msg_free (&m); + + // the application will shut down, and remove the application named pipes + if (app_destroy (&p)) + ohshit (1, "app_destroy"); + + srv_process_free (&p); + + return EXIT_SUCCESS; +} diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c index d4fb904..f982f5e 100644 --- a/pubsub/pubsubd.c +++ b/pubsub/pubsubd.c @@ -75,7 +75,7 @@ main(int argc, char* argv[]) // for each new process struct app_list_elm ale; bzero (&ale, sizeof (struct app_list_elm)); - struct channel *chan; + struct channel *chan = NULL; pubsubd_get_new_process (&srv, &ale, &chans, &chan); pubsubd_channels_print (&chans); @@ -92,13 +92,13 @@ main(int argc, char* argv[]) } // TODO thread to handle multiple clients at a time - struct worker_params *wp; + struct worker_params *wp = NULL; wp = malloc (sizeof (struct worker_params)); wp->ale = pubsubd_app_list_elm_copy (&ale); wp->chans = &chans; wp->chan = chan; - pthread_t thr; + pthread_t thr = 0; pthread_create (&thr, NULL, pubsubd_worker_thread, wp); pthread_detach (thr);