diff --git a/lib/communication.c b/lib/communication.c index 271e0ca..8faa685 100644 --- a/lib/communication.c +++ b/lib/communication.c @@ -1,32 +1,52 @@ #include "communication.h" #include #include +#include int file_write (const char *path, const char *buf, size_t msize) { + if (buf == NULL) { + fprintf (stderr, "file_write: buf == NULL\n"); + return -1; + } + + printf("file_write: path to open %s\n", path); int fd = open (path, O_WRONLY); if (fd <= 0) { + printf("file_write: fd < 0\n"); + perror ("file_write: "); return ER_FILE_OPEN; } + printf("file_write: opened file %s\n", path); int ret = 0; + int ret2 = 0; ret = write (fd, buf, msize); + if (ret <= 0) { + fprintf (stderr, "err: written %s\n", path); + } - close (fd); + ret2 = close (fd); + if (ret2 < 0) { + fprintf (stderr, "err: close [err: %d] %s\n", ret2, path); + perror ("closing"); + } return ret; } int file_read (const char *path, char **buf, size_t *msize) { - if (buf == NULL) + if (buf == NULL) { + fprintf (stderr, "file_read: buf == NULL\n"); return -1; + } int fd = open (path, O_RDONLY); if (fd <= 0) { return ER_FILE_OPEN; } - printf("FILE_READ: opened file %s\n", path); + printf("file_read: opened file %s\n", path); if (*buf == NULL) { *buf = malloc (BUFSIZ); @@ -49,7 +69,6 @@ int file_read (const char *path, char **buf, size_t *msize) perror ("closing"); } - return ret; } diff --git a/lib/process.c b/lib/process.c index 899decc..c5fb811 100644 --- a/lib/process.c +++ b/lib/process.c @@ -27,8 +27,8 @@ void srv_process_gen (struct process *p memset (p->path_in, 0, PATH_MAX); memset (p->path_out, 0, PATH_MAX); - snprintf(p->path_in , PATH_MAX, "%s%d-%d-in" , TMPDIR, pid, index); - snprintf(p->path_out, PATH_MAX, "%s%d-%d-out", TMPDIR, pid, index); + snprintf(p->path_in , PATH_MAX, "%s%d-%d-%d-in" , TMPDIR, pid, index, version); + snprintf(p->path_out, PATH_MAX, "%s%d-%d-%d-out", TMPDIR, pid, index, version); } diff --git a/lib/pubsubd.c b/lib/pubsubd.c index 19e339e..6315bb8 100644 --- a/lib/pubsubd.c +++ b/lib/pubsubd.c @@ -115,11 +115,11 @@ struct channel * pubsubd_channel_search (struct channels *chans, char *chan) struct channel * np = NULL; LIST_FOREACH(np, chans, entries) { // TODO debug - printf ("pubsubd_channel_search: %s (%ld) vs %s (%ld)\n" - , np->chan, np->chanlen, chan, strlen(chan)); + // printf ("pubsubd_channel_search: %s (%ld) vs %s (%ld)\n" + // , np->chan, np->chanlen, chan, strlen(chan)); if (np->chanlen == strlen (chan) && strncmp (np->chan, chan, np->chanlen) == 0) { - printf ("pubsubd_channel_search: FOUND\n"); + // printf ("pubsubd_channel_search: FOUND\n"); return np; } } @@ -156,6 +156,20 @@ void pubsubd_subscriber_init (struct app_list_head **chans) { LIST_INIT(*chans); } +void pubsubd_channel_print (const struct channel *chan) +{ + if (chan->chan == NULL) { + printf ("pubsubd_channels_print: chan->chan == NULL\n"); + } + + printf ( "\033[32mchan %s\033[00m\n", chan->chan); + + if (chan->alh == NULL) + printf ("pubsubd_channels_print: chan->alh == NULL\n"); + else + pubsubd_subscriber_print (chan->alh); +} + void pubsubd_channels_print (const struct channels *chans) { printf ("\033[36mmchannels\033[00m\n"); @@ -168,16 +182,7 @@ void pubsubd_channels_print (const struct channels *chans) struct channel *chan = NULL; LIST_FOREACH(chan, chans, entries) { - if (chan->chan == NULL) { - printf ("pubsubd_channels_print: chan->chan == NULL\n"); - } - - printf ( "\033[32mchan %s\033[00m\n", chan->chan); - - if (chan->alh == NULL) - printf ("pubsubd_channels_print: chan->alh == NULL\n"); - else - pubsubd_subscriber_print (chan->alh); + pubsubd_channel_print (chan); } } @@ -291,8 +296,10 @@ void pubsubd_app_list_elm_free (struct app_list_elm *todel) void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len) { - if (msg == NULL || data == NULL || len == NULL) + if (msg == NULL || data == NULL || len == NULL) { + fprintf (stderr, "pubsubd_msg_send: msg or data or len == NULL"); return; + } // msg: "type(1) chanlen(8) chan datalen(8) data if (msg->type == PUBSUB_TYPE_DISCONNECT) { @@ -506,12 +513,24 @@ int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale // alh from the channel, message to send void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m) { + if (alh == NULL) { + fprintf (stderr, "pubsubd_msg_send: alh == NULL"); + return; + } + + if (m == NULL) { + fprintf (stderr, "pubsubd_msg_send: m == NULL"); + return; + } + struct app_list_elm * ale = NULL; char *buf = NULL; size_t msize = 0; pubsubd_msg_serialize (m, &buf, &msize); + printf ("\033[32mmsg to send : %.*s (%ld)\n", (int) msize, buf, msize); + LIST_FOREACH(ale, alh, entries) { srv_write (ale->p, buf, msize); } @@ -532,11 +551,13 @@ void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m) // read the message from the process size_t mlen = 0; char *buf = NULL; + while (buf == NULL || mlen == 0) { #if 0 - srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb); + srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb); #else - srv_read (p, &buf, &mlen); + srv_read (p, &buf, &mlen); #endif + } pubsubd_msg_unserialize (m, buf, mlen); diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c index 1c1984f..543da73 100644 --- a/pubsub/pubsubd.c +++ b/pubsub/pubsubd.c @@ -2,6 +2,8 @@ #include #include +#define NB_CLIENTS 3 + void ohshit(int rvalue, const char* str) { fprintf(stderr, "%s\n", str); @@ -17,33 +19,55 @@ struct worker_params { void * pubsubd_worker_thread (void *params) { + int s = 0; + + s = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + if (s != 0) + printf ("pthread_setcancelstate: %d\n", s); + struct worker_params *wp = (struct worker_params *) params; if (wp == NULL) { fprintf (stderr, "error pubsubd_worker_thread : params NULL\n"); return NULL; } + struct channels *chans = wp->chans; + struct channel *chan = wp->chan; + struct app_list_elm *ale = wp->ale; + + free (wp); + + // main loop while (1) { struct pubsub_msg m; memset (&m, 0, sizeof (struct pubsub_msg)); - sleep (5); // TODO DEBUG - printf ("MESSAGE : "); - pubsubd_msg_recv (wp->ale->p, &m); - - pubsubd_msg_print (&m); + sleep (2); // TODO DEBUG + pubsubd_msg_recv (ale->p, &m); if (m.type == PUBSUB_TYPE_DISCONNECT) { - if ( 0 != pubsubd_subscriber_del (wp->chan->alh, wp->ale)) { + printf ("process %d disconnecting...\n", ale->p->pid); + if ( 0 != pubsubd_subscriber_del (chan->alh, ale)) { fprintf (stderr, "err : subscriber not registered\n"); } break; } else { - struct channel *chan = pubsubd_channel_get (wp->chans, wp->chan); - pubsubd_msg_send (chan->alh, &m); + struct channel *ch = pubsubd_channel_search (chans, chan->chan); + if (ch == NULL) { + printf ("CHAN NON TROUVE\n"); + } + else { + printf ("Je dois dire :\n"); + pubsubd_msg_print (&m); + printf ("CHAN ET PROCESS À QUI JE DOIS COMMUNIQUER\n"); + pubsubd_channel_print (ch); + pubsubd_msg_send (ch->alh, &m); + } } } +#if 0 +#endif #if 0 while (1) { @@ -90,7 +114,7 @@ void * pubsubd_worker_thread (void *params) } #endif - pubsubd_app_list_elm_free (wp->ale); + pubsubd_app_list_elm_free (ale); pthread_exit (NULL); } @@ -112,7 +136,11 @@ main(int argc, char **argv, char **env) memset (&chans, 0, sizeof (struct channels)); pubsubd_channels_init (&chans); - for (;;) { + pthread_t *thr = NULL; + thr = malloc (sizeof (pthread_t) * NB_CLIENTS); + memset (thr, 0, sizeof (pthread_t) * NB_CLIENTS); + + for (int i = 0; i < NB_CLIENTS; i++) { // for each new process struct app_list_elm ale; memset (&ale, 0, sizeof (struct app_list_elm)); @@ -128,7 +156,6 @@ main(int argc, char **argv, char **env) srv_close (&srv); // TODO end the threads - exit (0); } @@ -139,14 +166,27 @@ main(int argc, char **argv, char **env) wp->chans = &chans; wp->chan = chan; - pthread_t thr = 0; - - pthread_create (&thr, NULL, pubsubd_worker_thread, wp); - pthread_detach (thr); + pthread_create (thr + i, NULL, pubsubd_worker_thread, wp); + pthread_detach (thr[i]); pubsubd_app_list_elm_free (&ale); } + sleep (10); + // stop threads + for (int i = 0 ; i < NB_CLIENTS ; i++) { + pthread_cancel (thr[i]); + void *ret = NULL; + pthread_join (thr[i], &ret); + if (ret != NULL) { + free (ret); + } + } + free (thr); + + printf ("QUIT\n"); + pubsubd_channels_del_all (&chans); + // the application will shut down, and remove the service named pipe if (srv_close (&srv)) ohshit (1, "service_close error"); diff --git a/pubsub/test-gen-new-process.sh b/pubsub/test-gen-new-process.sh index cc3fe87..fa5657a 100755 --- a/pubsub/test-gen-new-process.sh +++ b/pubsub/test-gen-new-process.sh @@ -5,6 +5,16 @@ SERVICE=gen #SERVICE=pubsub NB=10 +if [ $# -ge 1 ] ; then + SERVICE=$1 + shift +fi + +if [ $# -ge 1 ] ; then + NB=$1 + shift +fi + for i in $(seq 1 ${NB}) do mkfifo ${REP}/${i}-1-1-in