From 539d22a72e3da9e08aa7d92c32831e0a5dd63d69 Mon Sep 17 00:00:00 2001 From: Philippe PITTOLI Date: Tue, 7 Jun 2016 11:45:21 +0200 Subject: [PATCH] COMPILE \o/ --- lib/communication.c | 41 ++++++++++++++-- lib/communication.h | 7 ++- lib/pubsubd.c | 111 +++++++++++++++++++++++++++++++++++++++++--- lib/pubsubd.h | 19 ++++++-- pingpong/pingpong.c | 7 ++- pubsub/pubsubd.c | 10 ++-- 6 files changed, 172 insertions(+), 23 deletions(-) diff --git a/lib/communication.c b/lib/communication.c index bd171f5..5f041ef 100644 --- a/lib/communication.c +++ b/lib/communication.c @@ -85,7 +85,22 @@ int srv_close (struct service *srv) return 0; } -int srv_get_new_process (struct process *p, const struct service *srv) +// only get a raw line from TMPDIR/ +int srv_get_listen_raw (const struct service *srv, char **buf, size_t *msize) +{ + *buf = malloc(BUFSIZ); + bzero (*buf, BUFSIZ); + + FILE * f = fopen (srv->spath, "r"); + fgets (*buf, BUFSIZ, f); + fclose (f); + + *msize = strlen (*buf); + + return 0; +} + +int srv_get_new_process (const struct service *srv, struct process *p) { if (srv->spath == NULL) { return -1; @@ -135,15 +150,33 @@ int srv_get_new_process (struct process *p, const struct service *srv) srv_process_gen (p, pid, index, version); - return 1; + return 0; } -int srv_read (struct process *p, void * buf, size_t * msize) +int srv_read_cb (struct process *p, char ** buf, size_t * msize + , int (*cb)(FILE *f, char ** buf, size_t * msize)) { if (file_open (&p->out, p->path_out, "rb")) return 1; - *msize = fread (buf, 1, *msize, p->out); // FIXME check errors + if (cb != NULL) + (*cb) (p->out, buf, msize); + else + *msize = fread (*buf, 1, *msize, p->out); // FIXME check errors + // printf ("DEBUG read, size %ld : %s\n", *msize, *buf); + + if (file_close (p->out)) + return 1; + + return 0; +} + +int srv_read (struct process *p, char ** buf, size_t * msize) +{ + if (file_open (&p->out, p->path_out, "rb")) + return 1; + + *msize = fread (*buf, 1, *msize, p->out); // FIXME check errors // printf ("DEBUG read, size %ld : %s\n", *msize, buf); if (file_close (p->out)) diff --git a/lib/communication.h b/lib/communication.h index 2234c96..39163d0 100644 --- a/lib/communication.h +++ b/lib/communication.h @@ -25,7 +25,8 @@ struct service { void srv_init (struct service *srv, const char *sname); -int srv_get_new_process (struct process *proc, const struct service *srv); +int srv_get_listen_raw (const struct service *srv, char **buf, size_t *msize); +int srv_get_new_process (const struct service *srv, struct process *proc); /* * returns @@ -37,7 +38,9 @@ int srv_get_new_process (struct process *proc, const struct service *srv); int srv_create (struct service *srv); int srv_close (struct service *srv); -int srv_read (struct process *, void * buf, size_t *); +int srv_read_cb (struct process *p, char ** buf, size_t * msize + , int (*cb)(FILE *f, char ** buf, size_t * msize)); +int srv_read (struct process *, char ** buf, size_t *); int srv_write (struct process *, void * buf, size_t); // APPLICATION diff --git a/lib/pubsubd.c b/lib/pubsubd.c index a1e3994..c2ef092 100644 --- a/lib/pubsubd.c +++ b/lib/pubsubd.c @@ -177,16 +177,115 @@ void pubsubd_msg_free (struct pubsub_msg *msg) // COMMUNICATION -void pubsubd_msg_send (struct service *s, struct pubsub_msg * m, struct process *p) +int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale) +{ + if (ale->p != NULL) { + free (ale->p); + } + + ale->p = malloc (sizeof (struct process)); + + char *buf; + size_t msize; + srv_get_listen_raw (srv, &buf, &msize); + + // parse pubsubd init msg (sent in TMPDIR/) + // + // line fmt : pid index version chan action + // action : pub | sub + + size_t i; + char *str, *token, *saveptr; + + pid_t pid; + int index; + int version; + + for (str = buf, i = 1; ; str = NULL, i++) { + token = strtok_r(str, " ", &saveptr); + if (token == NULL) + break; + + switch (i) { + case 1 : pid = strtoul(token, NULL, 10); break; + case 2 : index = strtoul(token, NULL, 10); break; + case 3 : version = strtoul(token, NULL, 10); break; + case 4 : memcpy (ale->chan, token, strlen (token)); break; + case 5 : { + if (strncmp("pub", token, 3) == 0) { + ale->action = 0; + } + else if (strncmp("sub", token, 3) == 0) { + ale->action = 1; + } + else { + ale->action = 2; // both + } + } + } + } + + srv_process_gen (ale->p, pid, index, version); + ale->chanlen = strlen (ale->chan); + + return 0; +} + +// TODO CBOR +int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize) +{ + // msg: "type(1) chanlen(8) chan datalen(8) data + + // read + char type; + fread (&type, 1, 1, f); + + size_t chanlen; + fread (&chanlen, sizeof (size_t), 1, f); + + char *chan = malloc (chanlen); + fread (chan, chanlen, 1, f); + + size_t datalen; + fread (&datalen, sizeof (size_t), 1, f); + + char *data = malloc (datalen); + fread (data, datalen, 1, f); + + *msize = 1 + chanlen; + *buf = malloc(*msize); + + // TODO CHECK THIS + size_t i = 0; + + char *cbuf = *buf; + + cbuf[i] = type; i++; + memcpy (&cbuf[i], &chanlen, sizeof(size_t)); i += sizeof(size_t); + memcpy (&cbuf[i], chan, chanlen); i += chanlen; + memcpy (&cbuf[i], &datalen, sizeof(size_t)); i += sizeof(size_t); + memcpy (&cbuf[i], data, datalen); i += datalen; + return 0; +} + +void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m) { } -void pubsubd_msg_recv (struct service *s, struct pubsub_msg * m, struct process *p) +void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m) +{ + // read the message from the process + size_t mlen; + char *buf; + srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb); + + pubsubd_msg_unserialize (m, buf, mlen); + free (buf); +} + +void pubsub_msg_send (const struct service *s, const struct pubsub_msg * m) { } -void pubsub_msg_send (struct service *s, struct pubsub_msg * m) -{ -} -void pubsub_msg_recv (struct service *s, struct pubsub_msg * m) +void pubsub_msg_recv (const struct service *s, struct pubsub_msg * m) { } diff --git a/lib/pubsubd.h b/lib/pubsubd.h index 0a6e149..fca4715 100644 --- a/lib/pubsubd.h +++ b/lib/pubsubd.h @@ -8,6 +8,12 @@ #define PUBSUB_SERVICE_NAME "pubsub" +struct channel; +struct channels; +struct app_list_head; +struct app_list_elm; +struct pubsub_msg; + struct pubsub_msg { unsigned char *chan; size_t chanlen; @@ -20,11 +26,13 @@ void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *l void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len); void pubsubd_msg_free (struct pubsub_msg *msg); -void pubsubd_msg_send (struct service *, struct pubsub_msg *msg, struct process *p); -void pubsubd_msg_recv (struct service *, struct pubsub_msg *msg, struct process *p); +int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale); +int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize); +void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m); +void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m); -void pubsub_msg_send (struct service *, struct pubsub_msg *msg); -void pubsub_msg_recv (struct service *, struct pubsub_msg *msg); +void pubsub_msg_send (const struct service *, const struct pubsub_msg *msg); +void pubsub_msg_recv (const struct service *, struct pubsub_msg *msg); // CHANNEL @@ -53,6 +61,9 @@ LIST_HEAD(app_list_head, app_list_elm); // element of the list struct app_list_elm { struct process *p; + char chan[BUFSIZ]; + size_t chanlen; + char action; // 0 : pub, 1 : sub, 2 : both LIST_ENTRY(app_list_elm) entries; }; diff --git a/pingpong/pingpong.c b/pingpong/pingpong.c index 98a6450..b01c377 100644 --- a/pingpong/pingpong.c +++ b/pingpong/pingpong.c @@ -19,7 +19,7 @@ void main_loop (const struct service *srv) while (cnt--) { // -1 : error, 0 = no new process, 1 = new process - ret = srv_get_new_process (&proc, srv); + ret = srv_get_new_process (srv, &proc); if (ret == -1) { fprintf (stderr, "error service_get_new_process\n"); continue; @@ -33,8 +33,7 @@ void main_loop (const struct service *srv) // about the message size_t msize = BUFSIZ; - char buf[BUFSIZ]; - bzero(buf, BUFSIZ); + char *buf; // printf ("before read\n"); if ((ret = srv_read (&proc, &buf, &msize))) { @@ -45,7 +44,7 @@ void main_loop (const struct service *srv) printf ("read, size %ld : %s\n", msize, buf); // printf ("before proc write\n"); - if ((ret = srv_write (&proc, &buf, msize))) { + if ((ret = srv_write (&proc, buf, msize))) { fprintf(stdout, "error service_write %d\n", ret); continue; } diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c index a8e38a5..9e08ab1 100644 --- a/pubsub/pubsubd.c +++ b/pubsub/pubsubd.c @@ -18,13 +18,17 @@ main(int argc, char* argv[]) if (srv_create (&srv)) ohshit(1, "service_create error"); + // init chans list struct channels chans; pubsubd_channels_init (&chans); for (;;) { - struct process proc; - srv_get_new_process (&proc, &srv); - process_print (&proc); + // for each new process + struct app_list_elm ale; + pubsubd_get_new_process (&srv, &ale); + process_print (ale.p); + + // TODO thread to handle multiple clients at a time } // the application will shut down, and remove the service named pipe