diff --git a/lib/communication.c b/lib/communication.c index a25de14..e2b60e5 100644 --- a/lib/communication.c +++ b/lib/communication.c @@ -2,36 +2,37 @@ #include #include -int file_open (FILE **f, const char *path, const char *mode) +int file_write (const char *path, const char *buf, size_t msize) { - printf ("opening %s\n", path); - if (*f != NULL) { - printf ("f != NULL : %p\n", (void*) *f); - if (file_close (*f)) { - return ER_FILE_CLOSE; - } - } - *f = fopen (path, mode); - if (*f == NULL) { - fprintf (stderr, "\033[31mnot opened %s\033[00m\n", path); + int fd = open (path, O_WRONLY); + if (fd <= 0) { return ER_FILE_OPEN; } - printf ("opened : %ld\n", (long) *f); + + int ret = 0; + ret = write (fd, buf, msize); - return 0; + close (fd); + + return ret; } -int file_close (FILE *f) +/*int file_read (const char *path, char **buf, size_t *msize) { +<<<<<<< HEAD if (f != 0) { if (fclose (f)) { return ER_FILE_CLOSE; } printf ("FILE_CLOSE: closed file\n"); +======= + int fd = open (path, O_RDONLY); + if (fd <= 0) { + return ER_FILE_OPEN; +>>>>>>> 99ca565bb6bf3729df3a1ec6ceeb2a58211751f5 } - return 0; -} +<<<<<<< HEAD int file_read (FILE *f, char **buf, size_t *msize) { size_t n =0; @@ -67,23 +68,53 @@ int file_read (FILE *f, char **buf, size_t *msize) { return ER_FILE_CLOSE; } return ER_FILE_READ; +======= + if (*buf == NULL) + *buf = malloc (BUFSIZ); + + int ret = 0; + int ret2 = 0; + ret = read (fd, *buf, BUFSIZ); + if (ret <= 0) { + fprintf (stderr, "err: read %s\n", path); + } + else { + *msize = ret; +>>>>>>> 99ca565bb6bf3729df3a1ec6ceeb2a58211751f5 } - return 0; -} + ret2 = close (fd); + if (ret2 < 0) { + fprintf (stderr, "err: close [err: %d] %s\n", ret2, path); + perror ("closing"); + } -int file_write (FILE *f, const char *buf, size_t msize) + return ret; +}*/ + +int file_read (const char *path, char **buf, size_t *msize) { - if (0 == fwrite (buf, msize, 1, f)) { - fprintf (stderr, "err writing in the file\n"); - if (ER_FILE_CLOSE == file_close (f)) { - fprintf (stderr, "err closing the file\n"); - return ER_FILE_CLOSE; - } - return ER_FILE_WRITE; + int fd = open (path, O_RDONLY); + if (fd <= 0) { + return ER_FILE_OPEN; } - - return 0; + if (*buf == NULL) + *buf = malloc (BUFSIZ); + int ret = 0; + int ret2 = 0; + ret = read (fd, *buf, BUFSIZ); + if (ret <= 0) { + fprintf (stderr, "err: read %s\n", path); + } + else { + *msize = ret; + } + ret2 = close (fd); + if (ret2 < 0) { + fprintf (stderr, "err: close [err: %d] %s\n", ret2, path); + perror ("closing"); + } + return ret; } int srv_init (int argc, char **argv, char **env, struct service *srv, const char *sname, int (*cb)(int argc, char **argv, char **env, struct service *srv, const char *sname)) @@ -164,72 +195,20 @@ int srv_close (struct service *srv) return 0; } -// only get a raw line from TMPDIR/ -int srv_get_listen_raw (const struct service *srv, char **buf, size_t *msize) -{ - *buf = malloc(BUFSIZ); - memset (*buf, 0, BUFSIZ); - - FILE * f = NULL; - if (file_open (&f, srv->spath, "rb")) { - return ER_FILE_OPEN; - } - - char *ret = NULL; - ret = fgets (*buf, BUFSIZ, f); - if (ret == NULL) { - return ER_FILE_READ; - } - buf[0][BUFSIZ -1] = '\0'; - - if (file_close (f)) { - return ER_FILE_CLOSE; - } - - *msize = strlen (*buf); - - return 0; -} - int srv_get_new_process (const struct service *srv, struct process *p) { if (srv->spath == NULL) { return -1; } - char buf[BUFSIZ]; - memset (buf, 0, BUFSIZ); - - // read the pipe, get a process to work on - struct timespec ts = { 0 }; - struct timespec ts2 = { 0 }; - - FILE * f = NULL; - if (file_open (&f, srv->spath, "rb")) { - return ER_FILE_OPEN; + char *buf = NULL; + size_t msize = 0; + int ret = file_read (srv->spath, &buf, &msize); + if (ret <= 0) { + fprintf (stderr, "err: listening on %s\n", srv->spath); + exit (1); } - clock_gettime(CLOCK_REALTIME, &ts); - - char *ret = NULL; - ret = fgets (buf, BUFSIZ, f); - if (ret == NULL) { - if (file_close (f)) { - return ER_FILE_CLOSE; - } - return ER_FILE_READ; - } - - clock_gettime(CLOCK_REALTIME, &ts2); - if (file_close (f)) { - return ER_FILE_CLOSE; - } - - printf("sec: %ld nsec: %ld\n", ts.tv_sec, ts.tv_nsec); - printf("sec: %ld nsec: %ld\n", ts2.tv_sec, ts2.tv_nsec); - - printf("diff nsec: %ld\n", ts2.tv_nsec - ts.tv_nsec); - char *token = NULL, *saveptr = NULL; char *str = NULL; int i = 0; @@ -254,93 +233,22 @@ int srv_get_new_process (const struct service *srv, struct process *p) } } - // printf("pid = %d, index = %d, version = %d \n",pid, index, version ); + + if (buf != NULL) + free (buf); srv_process_gen (p, pid, index, version); return 1; } -int srv_read_cb (struct process *p, char ** buf, size_t * msize - , int (*cb)(FILE *f, char ** buf, size_t * msize)) -{ - if (file_open (&p->out, p->path_out, "rb")) { - fprintf (stderr, "\033[31merr: srv_read_cb, file_open\033[00m\n"); - if (ER_FILE_CLOSE == file_close (p->out)) { - fprintf (stderr, "err closing the file %s\n", p->path_out); - p->out = NULL; - } - return ER_FILE_OPEN; - } - - if (cb != NULL) { - int ret = (*cb) (p->out, buf, msize); - if (ret != 0) - return ret; - } - else { - int ret = file_read (p->out, buf, msize); - if (ret != 0) { - return ret; - } - } - // printf ("DEBUG read, size %ld : %s\n", *msize, *buf); - - if (ER_FILE_CLOSE == file_close (p->out)) { - fprintf (stderr, "err closing the file %s\n", p->path_out); - p->out = NULL; - } - p->out = NULL; - - return 0; -} - int srv_read (struct process *p, char ** buf, size_t * msize) { - if (ER_FILE_OPEN == file_open (&p->out, p->path_out, "rb")) { - fprintf (stderr, "err opening the file %s\n", p->path_out); - return ER_FILE_OPEN; - } - - int ret = file_read (p->out, buf, msize); - if (ret != 0) { - p->out = NULL; - return ret; - } - - // printf ("DEBUG read, size %ld : %s\n", *msize, buf); - - if (ER_FILE_CLOSE == file_close (p->out)) { - fprintf (stderr, "err closing the file %s\n", p->path_out); - p->out = NULL; - return ER_FILE_CLOSE; - } - p->out = NULL; - - return 0; + return file_read (p->path_out, buf, msize); } int srv_write (struct process *p, char * buf, size_t msize) { - if (ER_FILE_OPEN == file_open (&p->in, p->path_in, "wb")) { - fprintf (stderr, "err opening the file %s\n", p->path_in); - return ER_FILE_OPEN; - } - - int ret = file_write (p->in, buf, msize); - if (ret != 0) { - fprintf (stderr, "err writing in the file %s\n", p->path_in); - p->in = NULL; - return ret; - } - - if (ER_FILE_CLOSE == file_close (p->in)) { - fprintf (stderr, "err closing the file %s\n", p->path_in); - p->in = NULL; - return ER_FILE_CLOSE; - } - p->in = NULL; - - return 0; + return file_write (p->path_in, buf, msize); } // APPLICATION @@ -349,27 +257,10 @@ int srv_write (struct process *p, char * buf, size_t msize) int app_srv_connection (struct service *srv, const char *connectionstr, size_t msize) { if (srv == NULL) { - return 1; + return -1; } - - FILE * f = NULL; - if (ER_FILE_OPEN == file_open (&f, srv->spath, "wb")) { - fprintf (stderr, "err opening the service file %s\n", srv->spath); - return ER_FILE_OPEN; - } - - int ret = file_write (f, connectionstr, msize); - if (ret != 0) { - fprintf (stderr, "err writing in the service file %s\n", srv->spath); - return ret; - } - - if (ER_FILE_CLOSE == file_close (f)) { - fprintf (stderr, "err closing the file\n"); - return ER_FILE_CLOSE; - } - - return 0; + + return file_write (srv->spath, connectionstr, msize); } int app_create (struct process *p, pid_t pid, int index, int version) @@ -458,84 +349,12 @@ int app_destroy (struct process *p) return 0; } -int app_read_cb (struct process *p, char ** buf, size_t * msize - , int (*cb)(FILE *f, char ** buf, size_t * msize)) -{ - if (file_open (&p->in, p->path_in, "rb")) { - fprintf (stderr, "\033[31merr: app_read_cb, file_open\033[00m\n"); - p->in = NULL; - return 1; - } - - if (cb != NULL) { - int ret = (*cb) (p->in, buf, msize); - if (ret != 0) - return ret; - } - else { - int ret = file_read (p->in, buf, msize); - if (ret != 0) { - p->in = NULL; - return ret; - } - } - - - if (ER_FILE_CLOSE == file_close (p->in)) { - fprintf (stderr, "err closing the file %s\n", p->path_in); - } - p->in = NULL; - - return 0; -} - int app_read (struct process *p, char ** buf, size_t * msize) { - if (ER_FILE_OPEN == file_open (&p->in, p->path_in, "rb")) { - fprintf (stderr, "err opening the file %s\n", p->path_in); - return ER_FILE_OPEN; - } - - int ret = file_read (p->in, buf, msize); - if (ret != 0) { - p->in = NULL; - return ret; - } - - if (ER_FILE_CLOSE == file_close (p->in)) { - fprintf (stderr, "err closing the file %s\n", p->path_in); - p->in = NULL; - return ER_FILE_CLOSE; - } - p->in = NULL; - - return 0; + return file_read (p->path_in, buf, msize); } int app_write (struct process *p, char * buf, size_t msize) { - if (buf == NULL) { - return ER_FILE_WRITE_PARAMS; - } - - if (ER_FILE_OPEN == file_open (&p->out, p->path_out, "wb")) { - fprintf (stderr, "err opening the file %s\n", p->path_out); - return ER_FILE_OPEN; - } - - int ret = file_write (p->out, buf, msize); - if (ret != 0) { - fprintf (stderr, "err writing in the file %s\n", p->path_out); - p->out = NULL; - return ret; - } - - if (ER_FILE_CLOSE == file_close (p->out)) { - fprintf (stderr, "err closing the file %s\n", p->path_out); - p->out = NULL; - return ER_FILE_CLOSE; - } - p->out = NULL; - - return 0; + return file_write (p->path_out, buf, msize); } diff --git a/lib/communication.h b/lib/communication.h index 6dfbf1d..f91dd3f 100644 --- a/lib/communication.h +++ b/lib/communication.h @@ -37,7 +37,6 @@ int srv_init (int argc, char **argv, char **env , int (*cb)(int argc, char **argv, char **env , struct service *srv, const char *sname)); -int srv_get_listen_raw (const struct service *srv, char **buf, size_t *msize); int srv_get_new_process (const struct service *srv, struct process *proc); /* @@ -50,8 +49,6 @@ int srv_get_new_process (const struct service *srv, struct process *proc); int srv_create (struct service *srv); int srv_close (struct service *srv); -int srv_read_cb (struct process *p, char ** buf, size_t * msize - , int (*cb)(FILE *f, char ** buf, size_t * msize)); int srv_read (struct process *, char ** buf, size_t *); int srv_write (struct process *, char * buf, size_t); @@ -63,15 +60,11 @@ int app_srv_connection (struct service *, const char *, size_t); int app_create (struct process *, pid_t pid, int index, int version); int app_destroy (struct process *); -int app_read_cb (struct process *p, char ** buf, size_t * msize - , int (*cb)(FILE *f, char ** buf, size_t * msize)); int app_read (struct process *, char ** buf, size_t *); int app_write (struct process *, char * buf, size_t); // wrappers -int file_open (FILE **f, const char *path, const char *mode); -int file_close (FILE *f); -int file_read (FILE *f, char **buf, size_t *msize); -int file_write (FILE *f, const char *buf, size_t msize); +int file_read (const char *path, char **buf, size_t *msize); +int file_write (const char *path, const char *buf, size_t msize); #endif diff --git a/lib/process.c b/lib/process.c index acf75f6..426cd73 100644 --- a/lib/process.c +++ b/lib/process.c @@ -39,10 +39,15 @@ void srv_process_gen (struct process *p void srv_process_free (struct process * p) { // TODO nothing to do now + + snprintf(p->path_in , PATH_MAX, "%s/%d-%d-in" , TMPDIR, pid, index); + snprintf(p->path_out, PATH_MAX, "%s/%d-%d-out", TMPDIR, pid, index); + } void srv_process_print (struct process *p) { if (p != NULL) - printf ("process %d : index %d\n", p->pid, p->index); + printf ("process %d : index %d, version %d\n" + , p->pid, p->index, p->version); } diff --git a/lib/process.h b/lib/process.h index 11bf64d..d707c81 100644 --- a/lib/process.h +++ b/lib/process.h @@ -19,11 +19,9 @@ struct process { unsigned int index; char path_in [PATH_MAX]; char path_out [PATH_MAX]; - FILE *in, *out; }; struct process * srv_process_copy (const struct process *p); -void srv_process_free (struct process * p); int srv_process_eq (const struct process *p1, const struct process *p2); diff --git a/lib/pubsubd.c b/lib/pubsubd.c index facc71f..d1619dd 100644 --- a/lib/pubsubd.c +++ b/lib/pubsubd.c @@ -59,18 +59,36 @@ struct channel * pubsubd_channel_copy (struct channel *c) memcpy (copy, c, sizeof(struct channel)); if (c->chan != NULL) { - // copy->chan = strndup (c->chan, c->chanlen); - copy->chan = malloc (BUFSIZ); - memcpy (copy->chan, c->chan, BUFSIZ); + copy->chan = malloc (c->chanlen); + memset (copy->chan, 0, c->chanlen); + memcpy (copy->chan, c->chan, c->chanlen); copy->chanlen = c->chanlen; } return copy; } +int pubsubd_channel_new (struct channel *c, const char * name) +{ + if (c == NULL) { + return 1; + } + + size_t nlen = (strlen (name) > BUFSIZ) ? BUFSIZ : strlen (name); + + printf ("NAME : %s, SIZE : %ld\n", name, nlen); + + if (c->chan == NULL) + c->chan = malloc (nlen +1); + + memset (c->chan, 0, nlen +1); + memcpy (c->chan, name, nlen); + c->chanlen = nlen; + return 0; +} + void pubsubd_channel_free (struct channel * c) { - // TODO if (c == NULL) return; @@ -98,7 +116,8 @@ struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c) int pubsubd_channel_eq (const struct channel *c1, const struct channel *c2) { - return (strncmp (c1->chan, c2->chan, c1->chanlen) == 0); + return c1->chanlen == c2->chanlen && + strncmp (c1->chan, c2->chan, c1->chanlen) == 0; } // SUBSCRIBER @@ -114,7 +133,7 @@ void pubsubd_subscriber_init (struct app_list_head **chans) { void pubsubd_channels_print (const struct channels *chans) { - printf ("\033[36mmchannels\033[00m\n\n"); + printf ("\033[36mmchannels\033[00m\n"); if (chans == NULL) return ; @@ -130,13 +149,19 @@ void pubsubd_channel_print (const struct channel *c) if (c == NULL || c->chan == NULL) return; - printf ( "\033[32mchan %s\033[00m\n\t", c->chan); + if (c->chan == NULL) { + printf ( "\033[32mchan name not available\033[00m\n"); + } + else { + printf ( "\033[32mchan %s\033[00m\n", c->chan); + } if (c->alh == NULL) return; struct app_list_elm *ale = NULL; LIST_FOREACH(ale, c->alh, entries) { + printf ("\t"); srv_process_print (ale->p); } } @@ -230,7 +255,7 @@ void pubsubd_app_list_elm_free (struct app_list_elm *todel) { if (todel == NULL || todel->p == NULL) return; - srv_process_free (todel->p); + free (todel->p); } // MESSAGE, TODO CBOR @@ -339,16 +364,15 @@ void pubsubd_msg_free (struct pubsub_msg *msg) // COMMUNICATION -int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale +int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale , struct channels *chans, struct channel **c) { - if (srv == NULL || ale == NULL || chans == NULL) + if (spath == NULL || ale == NULL || chans == NULL) return -1; char *buf = NULL; size_t msize = 0; - srv_get_listen_raw (srv, &buf, &msize); - + file_read (spath, &buf, &msize); // parse pubsubd init msg (sent in TMPDIR/) // // line fmt : pid index version action chan @@ -361,9 +385,12 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale int index = 0; int version = 0; + // chan name char chan[BUFSIZ]; memset (chan, 0, BUFSIZ); + printf ("INIT: %s\n", buf); + for (str = buf, i = 1; ; str = NULL, i++) { token = strtok_r(str, " ", &saveptr); if (token == NULL) @@ -389,9 +416,11 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale break; } case 5 : { + // for the last element of the line + // drop the following \n if (ale->action != PUBSUB_QUIT) memcpy (chan, token, (strlen (token) < BUFSIZ) ? - strlen (token) : BUFSIZ); + strlen (token) -1 : BUFSIZ); break; } } @@ -418,16 +447,9 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale *c = malloc (sizeof (struct channel)); } - if (c[0]->chan != NULL) { - free (c[0]->chan); - c[0]->chan = NULL; - } - chan[BUFSIZ -1] = '\0'; - // c[0]->chan = strndup (chan, BUFSIZ); - c[0]->chan = malloc (BUFSIZ); - memcpy(c[0]->chan, chan, BUFSIZ); - c[0]->chanlen = strlen (chan); + pubsubd_channel_new (*c, chan); + struct channel *new_chan = NULL; new_chan = pubsubd_channel_get (chans, *c); @@ -446,91 +468,6 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale return 0; } -// TODO CBOR -int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize) -{ - // msg: "type(1) chanlen(8) chan datalen(8) data - - printf ("\033[36m ON PASSE DANS pubsubd_msg_read_cb \033[00m \n"); - - // read - char type = ' '; - if (0 == fread (&type, 1, 1, f)) { - return ER_FILE_READ; - } - - size_t chanlen = 0; - if (0 == fread (&chanlen, sizeof (size_t), 1, f)) { - return ER_FILE_READ; - } - - if (chanlen > BUFSIZ) { - return ER_FILE_READ; - } - - char *chan = NULL; - chan = malloc (chanlen); - - if (chan == NULL) { - return ER_MEM_ALLOC; - } - - if (0 == fread (chan, chanlen, 1, f)) { - return ER_FILE_READ; - } - - size_t datalen = 0; - if (0 == fread (&datalen, sizeof (size_t), 1, f)) { - free (chan); - return ER_FILE_READ; - } - - if (datalen > BUFSIZ) { - return 1; - } - - char *data = NULL; - data = malloc (datalen); - if (data == NULL) { - free (chan); - return ER_MEM_ALLOC; - } - - if (0 == fread (data, datalen, 1, f)) { - free (chan); - free (data); - return ER_FILE_READ; - } - - *msize = 1 + 2 * sizeof (size_t) + chanlen + datalen; - if (*buf == NULL) { - *buf = malloc(*msize); - if (*buf == NULL) { - free (chan); - free (data); - return ER_MEM_ALLOC; - } - } - - // TODO CHECK THIS - size_t i = 0; - - char *cbuf = *buf; - - cbuf[i] = type; i++; - memcpy (cbuf + i, &chanlen, sizeof(size_t)); i += sizeof(size_t); - memcpy (cbuf + i, chan, chanlen); i += chanlen; - memcpy (cbuf + i, &datalen, sizeof(size_t)); i += sizeof(size_t); - memcpy (cbuf + i, data, datalen); i += datalen; - - free (chan); - free (data); - - printf ("\033[36m ON SORT de pubsubd_msg_read_cb \033[00m \n"); - - return 0; -} - // alh from the channel, message to send void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m) { @@ -551,14 +488,8 @@ void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg void pubsubd_msg_print (const struct pubsub_msg *msg) { - if (msg == NULL) { - return; - } - - printf ("\t\t\033[36mMessage\033[00m\n"); - printf ("\t\ttype %d\n", msg->type); - printf ("\t\tchan %s\n", msg->chan); - printf ("\t\tdata %s\n", msg->data); + printf ("msg: type=%d chan=%s, data=%s\n" + , msg->type, msg->chan, msg->data); } void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m) @@ -566,7 +497,11 @@ void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m) // read the message from the process size_t mlen = 0; char *buf = NULL; +#if 0 srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb); +#else + srv_read (p, &buf, &mlen); +#endif pubsubd_msg_unserialize (m, buf, mlen); @@ -628,19 +563,8 @@ void pubsub_disconnect (struct process *p) pubsubd_msg_serialize (&m, &buf, &msize); int ret = app_write (p, buf, msize); - switch (ret) { - case ER_FILE_WRITE : - fprintf (stderr, "err: ER_FILE_WRITE\n"); - break; - case ER_FILE_WRITE_PARAMS : - fprintf (stderr, "err: ER_FILE_WRITE_PARAMS\n"); - break; - case ER_FILE_OPEN : - fprintf (stderr, "err: ER_FILE_OPEN\n"); - break; - case ER_FILE_CLOSE : - fprintf (stderr, "err: ER_FILE_CLOSE\n"); - break; + if (ret != (int) msize) { + fprintf (stderr, "err: can't disconnect\n"); } pubsubd_msg_free (&m); @@ -676,7 +600,7 @@ void pubsub_msg_recv (struct process *p, struct pubsub_msg * m) // read the message from the process size_t mlen = 0; char *buf = NULL; - app_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb); + app_read (p, &buf, &mlen); pubsubd_msg_unserialize (m, buf, mlen); diff --git a/lib/pubsubd.h b/lib/pubsubd.h index 30f5336..2dd3e63 100644 --- a/lib/pubsubd.h +++ b/lib/pubsubd.h @@ -35,7 +35,7 @@ void pubsubd_msg_print (const struct pubsub_msg *msg); // // line fmt : pid index version action chan // action : quit | pub | sub -int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale +int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale , struct channels *chans, struct channel **c); int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize); void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m); @@ -60,6 +60,7 @@ struct channel { }; // simple channel +int pubsubd_channel_new (struct channel *c, const char *name); struct channel * pubsubd_channel_copy (struct channel *c); struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c); void pubsubd_channel_free (struct channel *c); diff --git a/pubsub/Makefile b/pubsub/Makefile index 6b95c76..10ac542 100644 --- a/pubsub/Makefile +++ b/pubsub/Makefile @@ -1,5 +1,5 @@ CC=gcc -CFLAGS=-Wall -g +CFLAGS=-Wall -g -Wextra LDFLAGS= -pthread CFILES=$(wildcard *.c) # CFILES => recompiles everything on a C file change EXEC=$(basename $(wildcard *.c)) @@ -15,8 +15,11 @@ $(EXEC): $(OBJECTS) $(CFILES) .c.o: $(CC) -c $(CFLAGS) $< -o $@ +$(TESTS): + valgrind --show-leak-kinds=all --leak-check=full -v --track-origins=yes ./$(basename $@) + clean: - -rm $(OBJECTS) + @-rm $(OBJECTS) *.o mrproper: clean - rm $(EXEC) + @-rm $(EXEC) diff --git a/pubsub/msg-serialize.c b/pubsub/msg-serialize.c new file mode 100644 index 0000000..bcaa165 --- /dev/null +++ b/pubsub/msg-serialize.c @@ -0,0 +1,48 @@ +#include "../lib/pubsubd.h" +#include +#include + +#define MESSAGE "coucou" +#define CHAN "chan1" + +void +ohshit(int rvalue, const char* str) { + fprintf (stderr, "\033[31merr: %s\033[00m\n", str); + exit (rvalue); +} + +void usage (char **argv) +{ + printf ( "usage: %s\n", argv[0]); +} + +int +main(int argc, char **argv) +{ + if (argc != 1) { + usage (argv); + exit (1); + } + + struct pubsub_msg msg; + memset (&msg, 0, sizeof (struct pubsub_msg)); + msg.type = PUBSUB_TYPE_MESSAGE; + msg.chan = malloc (strlen (CHAN) + 1); + strncpy ((char *)msg.chan, CHAN, strlen (CHAN) + 1); + msg.chanlen = strlen (CHAN) + 1; + + msg.data = malloc (strlen (MESSAGE) + 1); + strncpy ((char *)msg.data, MESSAGE, strlen (CHAN) + 1); + msg.datalen = strlen (MESSAGE) + 1; + + char *data = NULL; + size_t len = 0; + pubsubd_msg_serialize (&msg, &data, &len); + pubsubd_msg_free (&msg); + + if ((int) len != write (1, data, len)) { + ohshit (1, "unable to write the data"); + } + + return EXIT_SUCCESS; +} diff --git a/pubsub/msg-unserialize.c b/pubsub/msg-unserialize.c new file mode 100644 index 0000000..a4dad4f --- /dev/null +++ b/pubsub/msg-unserialize.c @@ -0,0 +1,36 @@ +#include "../lib/pubsubd.h" +#include +#include + +void +ohshit(int rvalue, const char* str) { + fprintf (stderr, "\033[31merr: %s\033[00m\n", str); + exit (rvalue); +} + +void usage (char **argv) +{ + printf ( "usage: cat msg | %s\n", argv[0]); +} + +int +main(int argc, char **argv) +{ + + if (argc != 1) { + usage (argv); + exit (1); + } + + char data[BUFSIZ]; + memset (data, 0, BUFSIZ); + size_t len = read (0, data, BUFSIZ); + printf ("msg len %ld\n", len); + + struct pubsub_msg msg; + pubsubd_msg_unserialize (&msg, data, len); + pubsubd_msg_print (&msg); + pubsubd_msg_free (&msg); + + return EXIT_SUCCESS; +} diff --git a/pubsub/pubsub-test-send-params.c b/pubsub/pubsub-test-send-params.c index 88fd4c8..b4c9d6a 100644 --- a/pubsub/pubsub-test-send-params.c +++ b/pubsub/pubsub-test-send-params.c @@ -17,7 +17,7 @@ void usage (char **argv) void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, int version, char *cmd, char *chan) { - printf ("Simulate connnection : pid %d index %d version %d " + printf ("Simulate connection : pid %d index %d version %d " "cmd %s chan %s\n" , pid, index, version, cmd, chan ); @@ -29,9 +29,11 @@ void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, in struct process p; memset (&p, 0, sizeof (struct process)); + printf ("app creation\n"); if (app_create (&p, pid, index, version)) // called by the application ohshit (1, "app_create"); + printf ("connection\n"); // send a message to warn the service we want to do something // line : pid index version action chan pubsub_connection (&srv, &p, PUBSUB_PUB, chan); @@ -39,27 +41,38 @@ void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, in struct pubsub_msg m; memset (&m, 0, sizeof (struct pubsub_msg)); - // first message, "coucou" - m.type = PUBSUB_TYPE_INFO; - m.chan = malloc (strlen (chan) + 1); - m.chan[strlen (chan)] = '\0'; - m.chanlen = strlen (chan); - m.data = malloc (strlen (MYMESSAGE) + 1); - m.datalen = strlen (MYMESSAGE); - m.datalen = strlen (MYMESSAGE); - pubsub_msg_send (&p, &m); + if (strcmp (cmd, "pub") == 0) { + // first message, "coucou" + m.type = PUBSUB_TYPE_MESSAGE; + m.chan = malloc (strlen (chan) + 1); + memset (m.chan, 0, strlen (chan) + 1); + m.chan[strlen (chan)] = '\0'; + m.chanlen = strlen (chan); + + m.data = malloc (strlen (MYMESSAGE) + 1); + memset (m.data, 0, strlen (MYMESSAGE) + 1); + strncpy ((char *) m.data, MYMESSAGE, strlen (MYMESSAGE) + 1); + m.datalen = strlen (MYMESSAGE); + + printf ("send message\n"); + pubsub_msg_send (&p, &m); + } + else { + pubsub_msg_recv (&p, &m); + pubsubd_msg_print (&m); + } // free everything pubsubd_msg_free (&m); + printf ("disconnection\n"); // disconnect from the server pubsub_disconnect (&p); + printf ("destroying app\n"); // the application will shut down, and remove the application named pipes if (app_destroy (&p)) ohshit (1, "app_destroy"); - - srv_process_free (&p); } void sim_disconnection (int argc, char **argv, char **env, pid_t pid, int index, int version) @@ -78,8 +91,6 @@ void sim_disconnection (int argc, char **argv, char **env, pid_t pid, int index, // send a message to disconnect // line : pid index version action chan pubsub_disconnect (&p); - - srv_process_free (&p); } int diff --git a/pubsub/pubsub-test-send.c b/pubsub/pubsub-test-send.c index 976b76d..8aaeca9 100644 --- a/pubsub/pubsub-test-send.c +++ b/pubsub/pubsub-test-send.c @@ -55,7 +55,5 @@ main(int argc, char **argv, char **env) if (app_destroy (&p)) ohshit (1, "app_destroy"); - srv_process_free (&p); - return EXIT_SUCCESS; } diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c index f8c6ffd..1c1984f 100644 --- a/pubsub/pubsubd.c +++ b/pubsub/pubsubd.c @@ -18,19 +18,46 @@ struct worker_params { void * pubsubd_worker_thread (void *params) { struct worker_params *wp = (struct worker_params *) params; + if (wp == NULL) { + fprintf (stderr, "error pubsubd_worker_thread : params NULL\n"); + return NULL; + } + while (1) { + struct pubsub_msg m; + memset (&m, 0, sizeof (struct pubsub_msg)); + + sleep (5); // TODO DEBUG + printf ("MESSAGE : "); + pubsubd_msg_recv (wp->ale->p, &m); + + pubsubd_msg_print (&m); + + if (m.type == PUBSUB_TYPE_DISCONNECT) { + if ( 0 != pubsubd_subscriber_del (wp->chan->alh, wp->ale)) { + fprintf (stderr, "err : subscriber not registered\n"); + } + break; + } + else { + struct channel *chan = pubsubd_channel_get (wp->chans, wp->chan); + pubsubd_msg_send (chan->alh, &m); + } + } + +#if 0 while (1) { // each chan has a list of subscribers // someone who only push a msg doesn't need to be registered - if (wp->ale->action == PUBSUB_BOTH || wp->ale->action == PUBSUB_PUB) { + // if (wp->ale->action == PUBSUB_BOTH || wp->ale->action == PUBSUB_PUB) { + if (wp->ale->action == PUBSUB_PUB) { // publish a message - printf ("publish or publish and subscribe to chan %s\n" - , wp->chan->chan); + printf ("publish to chan %s\n", wp->chan->chan); struct pubsub_msg m; memset (&m, 0, sizeof (struct pubsub_msg)); - sleep (5); + sleep (5); // TODO DEBUG pubsubd_msg_recv (wp->ale->p, &m); pubsubd_msg_print (&m); @@ -61,13 +88,14 @@ void * pubsubd_worker_thread (void *params) break; } } +#endif pubsubd_app_list_elm_free (wp->ale); pthread_exit (NULL); } - int +int main(int argc, char **argv, char **env) { struct service srv; @@ -89,7 +117,7 @@ main(int argc, char **argv, char **env) struct app_list_elm ale; memset (&ale, 0, sizeof (struct app_list_elm)); struct channel *chan = NULL; - pubsubd_get_new_process (&srv, &ale, &chans, &chan); + pubsubd_get_new_process (srv.spath, &ale, &chans, &chan); pubsubd_channels_print (&chans); // end the application diff --git a/pubsub/test-chan-lists.c b/pubsub/test-chan-lists.c new file mode 100644 index 0000000..826b64b --- /dev/null +++ b/pubsub/test-chan-lists.c @@ -0,0 +1,106 @@ +#include "../lib/pubsubd.h" +#include + +#define TEST_NAME "test-chan-lists" + +void +ohshit(int rvalue, const char* str) { + fprintf(stderr, "%s\n", str); + exit(rvalue); +} + +int +main(int argc, char **argv, char **env) +{ + struct service srv; + memset (&srv, 0, sizeof (struct service)); + srv_init (argc, argv, env, &srv, TEST_NAME, NULL); + printf ("Listening on %s.\n", srv.spath); + + // creates the service named pipe, that listens to client applications + if (srv_create (&srv)) + ohshit(1, "service_create error"); + + // init chans list + struct channels chans; + memset (&chans, 0, sizeof (struct channels)); + pubsubd_channels_init (&chans); + + // for each new process + // struct app_list_elm ale1; + // memset (&ale1, 0, sizeof (struct app_list_elm)); + + // warning : this is a local structure, not exactly the same in the prog. + struct channel chan; + memset (&chan, 0, sizeof (struct channel)); + pubsubd_channel_new (&chan, "coucou"); + + // to emulate + // pubsubd_get_new_process (&srv, &ale1, &chans, &chan); + + // FIRST CHAN TO BE ADDED + // search for the chan in channels, add it if not found + struct channel *new_chan = NULL; + new_chan = pubsubd_channel_get (&chans, &chan); + if (new_chan == NULL) { + new_chan = pubsubd_channels_add (&chans, &chan); + pubsubd_subscriber_init (&new_chan->alh); + } + else { + ohshit (2, "error : new chan, can't be found in channels yet"); + } + pubsubd_channel_free (&chan); + + printf ("print the channels, 1 chan\n"); + printf ("--\n"); + pubsubd_channels_print (&chans); + printf ("--\n"); + + // SAME CHAN, SHOULD NOT BE ADDED + pubsubd_channel_new (&chan, "coucou"); + // search for the chan in channels, add it if not found + new_chan = pubsubd_channel_get (&chans, &chan); + if (new_chan == NULL) { + ohshit (3, "error : same chan, shouldn't be added in channels"); + } + else { + printf ("already in the 'channels' structure\n"); + } + + printf ("print the channels, 1 chan\n"); + printf ("--\n"); + pubsubd_channels_print (&chans); + printf ("--\n"); + + // NEW CHAN, SHOULD BE ADDED + pubsubd_channel_new (&chan, "salut"); + // search for the chan in channels, add it if not found + new_chan = pubsubd_channel_get (&chans, &chan); + if (new_chan == NULL) { + new_chan = pubsubd_channels_add (&chans, &chan); + pubsubd_subscriber_init (&new_chan->alh); + } + else { + ohshit (4, "error : new chan, should be added in channels"); + } + pubsubd_channel_free (&chan); + + printf ("print the channels, 2 chans\n"); + printf ("--\n"); + pubsubd_channels_print (&chans); + printf ("--\n"); + + // end the application + pubsubd_channels_del_all (&chans); + + printf ("\nshould be empty now\n"); + printf ("--\n"); + pubsubd_channels_print (&chans); + printf ("--\n"); + + // the application will shut down, and remove the service named pipe + if (srv_close (&srv)) + ohshit (1, "service_close error"); + + return EXIT_SUCCESS; +} diff --git a/pubsub/test-gen-new-process.c b/pubsub/test-gen-new-process.c new file mode 100644 index 0000000..6999ac1 --- /dev/null +++ b/pubsub/test-gen-new-process.c @@ -0,0 +1,51 @@ +#include "../lib/pubsubd.h" +#include +#include + +void +ohshit(int rvalue, const char* str) { + fprintf(stderr, "%s\n", str); + exit(rvalue); +} + +void usage (char **argv) { + fprintf (stderr, "usage: %s path\n", argv[0]); + exit (1); +} + +int +main(int argc, char **argv) +{ + if (argc != 2) { + usage (argv); + } + + char *spath = argv[1]; + + printf ("Listening on %s.\n", spath); + + struct channels chans; + memset (&chans, 0, sizeof (struct channels)); + + for (int nb = 10, i = 0 ; nb > 0; i++, nb--) { + struct app_list_elm ale; + memset (&ale, 0, sizeof (struct app_list_elm)); + + struct channel chan; + memset (&chan, 0, sizeof (struct channel)); + struct channel *c = &chan; + + pubsubd_get_new_process (spath, &ale, &chans, &c); + + printf ("print the channels, %d chan\n", i); + printf ("--\n"); + pubsubd_channels_print (&chans); + printf ("--\n"); + printf ("still %d remaining processes\n", nb); + } + + pubsubd_channels_del_all (&chans); + + return EXIT_SUCCESS; +} + diff --git a/pubsub/test-gen-new-process.sh b/pubsub/test-gen-new-process.sh new file mode 100755 index 0000000..67d6699 --- /dev/null +++ b/pubsub/test-gen-new-process.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +for i in $(seq 1 10) +do + echo "${i} 1 1 pub chan${i}" > /tmp/ipc/gen + sleep 0.1 +done diff --git a/pubsub/test-pipe-read.c b/pubsub/test-pipe-read.c new file mode 100644 index 0000000..a9c330e --- /dev/null +++ b/pubsub/test-pipe-read.c @@ -0,0 +1,57 @@ +#include "../lib/pubsubd.h" +#include + +#define TEST_NAME "test-chan-lists" + +void +ohshit(int rvalue, const char* str) { + fprintf(stderr, "%s\n", str); + exit(rvalue); +} + +void usage (char **argv) { + fprintf (stderr, "usage: %s path times\n", argv[0]); + fprintf (stderr, "ex: %s /tmp/pipe 5 => you will read 5 times\n", argv[0]); + exit (1); +} + +int +main(int argc, char **argv) +{ + + if (argc != 3) + usage (argv); + + char *path = argv[1]; + int nb = atoi (argv[2]); + + printf ("Listening on %s %d times.\n", path, nb); + + char *buf = NULL; + size_t msize = 0; + + int ret = 0; + + while (nb--) { + ret = file_read (path, &buf, &msize); + if (ret <= 0) { + fprintf (stderr, "no msg"); + if (ret == ER_FILE_OPEN) { + fprintf (stderr, " ER_FILE_OPEN"); + } + fprintf (stderr, "\n"); + nb++; + continue; + } + if (msize > 0) { + printf ("msg size %ld\t", msize); + struct pubsub_msg m; + memset (&m, 0, sizeof (struct pubsub_msg)); + pubsubd_msg_unserialize (&m, buf, msize); + pubsubd_msg_print (&m); + pubsubd_msg_free (&m); + } + } + + return EXIT_SUCCESS; +} diff --git a/pubsub/test-pipe-write.c b/pubsub/test-pipe-write.c new file mode 100644 index 0000000..2be8e03 --- /dev/null +++ b/pubsub/test-pipe-write.c @@ -0,0 +1,57 @@ +#include "../lib/pubsubd.h" +#include + +#define CHAN "chan1" +#define MESSAGE "coucou" + +void +ohshit(int rvalue, const char* str) { + fprintf(stderr, "%s\n", str); + exit(rvalue); +} + +void usage (char **argv) { + fprintf (stderr, "usage: %s path times\n", argv[0]); + fprintf (stderr, "ex: %s /tmp/pipe 5 => you will write 5 times\n", argv[0]); + exit (1); +} + +int +main(int argc, char **argv) +{ + + if (argc != 3) + usage (argv); + + char *path = argv[1]; + int nb = atoi (argv[2]); + + printf ("Writing on %s %d times.\n", path, nb); + + char *buf = NULL; + size_t msize = 0; + + int ret = 0; + + while (nb--) { + struct pubsub_msg msg; + memset (&msg, 0, sizeof (struct pubsub_msg)); + msg.type = PUBSUB_TYPE_MESSAGE; + msg.chan = malloc (strlen (CHAN) + 1); + strncpy ((char *)msg.chan, CHAN, strlen (CHAN) + 1); + msg.chanlen = strlen (CHAN) + 1; + msg.data = malloc (strlen (MESSAGE) + 1); + strncpy ((char *)msg.data, MESSAGE, strlen (CHAN) + 1); + msg.datalen = strlen (MESSAGE) + 1; + + pubsubd_msg_serialize (&msg, &buf, &msize); + pubsubd_msg_print (&msg); + pubsubd_msg_free (&msg); + ret = file_write (path, buf, msize); + if (ret != (int) msize) { + fprintf (stderr, "msg not written\n"); + } + } + + return EXIT_SUCCESS; +} diff --git a/pubsub/test-test-send-params.sh b/pubsub/test-test-send-params.sh new file mode 100755 index 0000000..dc0a980 --- /dev/null +++ b/pubsub/test-test-send-params.sh @@ -0,0 +1,38 @@ +#!/bin/bash + +# start pubsub-test-send alone, with some parameters +# then test it with this script + +REP=/tmp/ipc/ +PID=10 +INDEX=1 +VERSION=1 +ACTION="pub" +CHAN="chan1" + +if [ $# != 0 ] ; then + PID=$1 + shift +fi + +if [ $# != 0 ] ; then + INDEX=$1 + shift +fi + +if [ $# != 0 ] ; then + ACTION=$1 + shift +fi + +if [ $# != 0 ] ; then + CHAN=$1 + shift +fi + +echo "there should be a line in $REP/pubsub" +cat $REP/pubsub + +echo "" +echo "there should be something to read in $REP/${PID}-${INDEX}-out" +cat $REP/${PID}-${INDEX}-out | xxd