diff --git a/lib/communication.c b/lib/communication.c index 6abd9f2..12a9c93 100644 --- a/lib/communication.c +++ b/lib/communication.c @@ -6,7 +6,7 @@ int file_open (FILE **f, const char *path, const char *mode) { printf ("opening %s\n", path); if (*f != NULL) { - printf ("f != NULL : %p\n", (void*) *f); + // printf ("f != NULL : %p\n", (void*) *f); if (file_close (*f)) { return ER_FILE_CLOSE; } @@ -16,7 +16,7 @@ int file_open (FILE **f, const char *path, const char *mode) fprintf (stderr, "\033[31mnot opened %s\033[00m\n", path); return ER_FILE_OPEN; } - printf ("opened : %ld\n", (long) *f); + // printf ("opened : %ld\n", (long) *f); return 0; } @@ -24,11 +24,11 @@ int file_open (FILE **f, const char *path, const char *mode) int file_close (FILE *f) { if (f != 0) { - printf ("before fclosing\n"); + // printf ("before fclosing\n"); if (fclose (f)) { return ER_FILE_CLOSE; } - printf ("after fclosing\n"); + // printf ("after fclosing\n"); } return 0; } @@ -48,16 +48,21 @@ int file_read (FILE *f, char **buf, size_t *msize) { } } - *msize = fread (*buf, *msize, 1, f); - if (*msize == 0) { + int ret = 0; + + ret = fread (*buf, *msize, 1, f); + if (ret < 0) { fprintf (stderr, "err can't read a file\n"); - if (ER_FILE_CLOSE == file_close (f)) { - fprintf (stderr, "err closing the file\n"); - return ER_FILE_CLOSE; - } + ret = file_close (f); + if (ret != 0) + return ret; return ER_FILE_READ; } + ret = file_close (f); + if (ret != 0) + return ret; + return 0; } @@ -260,16 +265,13 @@ int srv_read_cb (struct process *p, char ** buf, size_t * msize return ER_FILE_OPEN; } + int ret = 0; + if (cb != NULL) { - int ret = (*cb) (p->out, buf, msize); - if (ret != 0) - return ret; + ret = (*cb) (p->out, buf, msize); } else { - int ret = file_read (p->out, buf, msize); - if (ret != 0) { - return ret; - } + ret = file_read (p->out, buf, msize); } // printf ("DEBUG read, size %ld : %s\n", *msize, *buf); @@ -279,7 +281,7 @@ int srv_read_cb (struct process *p, char ** buf, size_t * msize } p->out = NULL; - return 0; + return ret; } int srv_read (struct process *p, char ** buf, size_t * msize) @@ -289,10 +291,11 @@ int srv_read (struct process *p, char ** buf, size_t * msize) return ER_FILE_OPEN; } - int ret = file_read (p->out, buf, msize); + int ret = 0; + + ret = file_read (p->out, buf, msize); if (ret != 0) { p->out = NULL; - return ret; } // printf ("DEBUG read, size %ld : %s\n", *msize, buf); @@ -300,11 +303,11 @@ int srv_read (struct process *p, char ** buf, size_t * msize) if (ER_FILE_CLOSE == file_close (p->out)) { fprintf (stderr, "err closing the file %s\n", p->path_out); p->out = NULL; - return ER_FILE_CLOSE; + ret = ER_FILE_CLOSE; } p->out = NULL; - return 0; + return ret; } int srv_write (struct process *p, char * buf, size_t msize) @@ -455,20 +458,18 @@ int app_read_cb (struct process *p, char ** buf, size_t * msize return 1; } + int ret = 0; + if (cb != NULL) { - int ret = (*cb) (p->in, buf, msize); - if (ret != 0) - return ret; + ret = (*cb) (p->in, buf, msize); } else { - int ret = file_read (p->in, buf, msize); + ret = file_read (p->in, buf, msize); if (ret != 0) { p->in = NULL; - return ret; } } - if (ER_FILE_CLOSE == file_close (p->in)) { fprintf (stderr, "err closing the file %s\n", p->path_in); } diff --git a/lib/pubsubd.c b/lib/pubsubd.c index 5fce9c7..034fd38 100644 --- a/lib/pubsubd.c +++ b/lib/pubsubd.c @@ -390,6 +390,8 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale char chan[BUFSIZ]; memset (chan, 0, BUFSIZ); + printf ("INIT: %s\n", buf); + for (str = buf, i = 1; ; str = NULL, i++) { token = strtok_r(str, " ", &saveptr); if (token == NULL) @@ -417,7 +419,7 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale case 5 : { if (ale->action != PUBSUB_QUIT) memcpy (chan, token, (strlen (token) < BUFSIZ) ? - strlen (token) : BUFSIZ); + strlen (token) -1 : BUFSIZ); break; } } @@ -465,6 +467,7 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale return 0; } +#if 0 // TODO CBOR int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize) { @@ -550,6 +553,8 @@ int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize) return 0; } +#endif + // alh from the channel, message to send void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m) { @@ -570,14 +575,8 @@ void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg void pubsubd_msg_print (const struct pubsub_msg *msg) { - if (msg == NULL) { - return; - } - - printf ("\t\t\033[36mMessage\033[00m\n"); - printf ("\t\ttype %d\n", msg->type); - printf ("\t\tchan %s\n", msg->chan); - printf ("\t\tdata %s\n", msg->data); + printf ("msg: type=%d chan=%s, data=%s\n" + , msg->type, msg->chan, msg->data); } void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m) @@ -585,7 +584,11 @@ void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m) // read the message from the process size_t mlen = 0; char *buf = NULL; +#if 0 srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb); +#else + srv_read (p, &buf, &mlen); +#endif pubsubd_msg_unserialize (m, buf, mlen); @@ -695,7 +698,11 @@ void pubsub_msg_recv (struct process *p, struct pubsub_msg * m) // read the message from the process size_t mlen = 0; char *buf = NULL; +#if 0 app_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb); +#else + app_read_cb (p, &buf, &mlen, NULL); +#endif pubsubd_msg_unserialize (m, buf, mlen); diff --git a/pubsub/Makefile b/pubsub/Makefile index 332cf7c..10ac542 100644 --- a/pubsub/Makefile +++ b/pubsub/Makefile @@ -19,7 +19,7 @@ $(TESTS): valgrind --show-leak-kinds=all --leak-check=full -v --track-origins=yes ./$(basename $@) clean: - @-rm $(OBJECTS) + @-rm $(OBJECTS) *.o mrproper: clean @-rm $(EXEC) diff --git a/pubsub/msg-unserialize.c b/pubsub/msg-unserialize.c index 8596ebe..a4dad4f 100644 --- a/pubsub/msg-unserialize.c +++ b/pubsub/msg-unserialize.c @@ -13,13 +13,6 @@ void usage (char **argv) printf ( "usage: cat msg | %s\n", argv[0]); } -void msg_print (struct pubsub_msg *msg) { - printf ("msg: type=%d chan=%.*s, data=%.*s\n" - , msg->type - , msg->chanlen, msg->chan - , msg->datalen, msg->data); -} - int main(int argc, char **argv) { @@ -36,7 +29,7 @@ main(int argc, char **argv) struct pubsub_msg msg; pubsubd_msg_unserialize (&msg, data, len); - msg_print (&msg); + pubsubd_msg_print (&msg); pubsubd_msg_free (&msg); return EXIT_SUCCESS; diff --git a/pubsub/pubsub-test-send-params.c b/pubsub/pubsub-test-send-params.c index bfbb9fc..7fcfa0d 100644 --- a/pubsub/pubsub-test-send-params.c +++ b/pubsub/pubsub-test-send-params.c @@ -41,19 +41,26 @@ void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, in struct pubsub_msg m; memset (&m, 0, sizeof (struct pubsub_msg)); - // first message, "coucou" - m.type = PUBSUB_TYPE_INFO; - m.chan = malloc (strlen (chan) + 1); - memset (m.chan, 0, strlen (chan) + 1); - m.chan[strlen (chan)] = '\0'; - m.chanlen = strlen (chan); - m.data = malloc (strlen (MYMESSAGE) + 1); - memset (m.data, 0, strlen (MYMESSAGE) + 1); - strncpy ((char *) m.data, MYMESSAGE, strlen (MYMESSAGE) + 1); - m.datalen = strlen (MYMESSAGE); + if (strcmp (cmd, "pub") == 0) { + // first message, "coucou" + m.type = PUBSUB_TYPE_MESSAGE; + m.chan = malloc (strlen (chan) + 1); + memset (m.chan, 0, strlen (chan) + 1); + m.chan[strlen (chan)] = '\0'; + m.chanlen = strlen (chan); - printf ("send message\n"); - pubsub_msg_send (&p, &m); + m.data = malloc (strlen (MYMESSAGE) + 1); + memset (m.data, 0, strlen (MYMESSAGE) + 1); + strncpy ((char *) m.data, MYMESSAGE, strlen (MYMESSAGE) + 1); + m.datalen = strlen (MYMESSAGE); + + printf ("send message\n"); + pubsub_msg_send (&p, &m); + } + else { + pubsub_msg_recv (&p, &m); + pubsubd_msg_print (&m); + } // free everything pubsubd_msg_free (&m); diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c index cfff9e2..21e9bad 100644 --- a/pubsub/pubsubd.c +++ b/pubsub/pubsubd.c @@ -23,13 +23,36 @@ void * pubsubd_worker_thread (void *params) return NULL; } + while (1) { + struct pubsub_msg m; + memset (&m, 0, sizeof (struct pubsub_msg)); + + sleep (5); // TODO DEBUG + printf ("MESSAGE : "); + pubsubd_msg_recv (wp->ale->p, &m); + + pubsubd_msg_print (&m); + + if (m.type == PUBSUB_TYPE_DISCONNECT) { + if ( 0 != pubsubd_subscriber_del (wp->chan->alh, wp->ale)) { + fprintf (stderr, "err : subscriber not registered\n"); + } + break; + } + else { + struct channel *chan = pubsubd_channel_get (wp->chans, wp->chan); + pubsubd_msg_send (chan->alh, &m); + } + } + +#if 0 while (1) { // each chan has a list of subscribers // someone who only push a msg doesn't need to be registered - if (wp->ale->action == PUBSUB_BOTH || wp->ale->action == PUBSUB_PUB) { + // if (wp->ale->action == PUBSUB_BOTH || wp->ale->action == PUBSUB_PUB) { + if (wp->ale->action == PUBSUB_PUB) { // publish a message - printf ("publish or publish and subscribe to chan %s\n" - , wp->chan->chan); + printf ("publish to chan %s\n", wp->chan->chan); struct pubsub_msg m; memset (&m, 0, sizeof (struct pubsub_msg)); @@ -65,6 +88,7 @@ void * pubsubd_worker_thread (void *params) break; } } +#endif pubsubd_app_list_elm_free (wp->ale);