From fceb09a592d91e640d0b06fac2cbb661cc1eebe6 Mon Sep 17 00:00:00 2001 From: Philippe PITTOLI Date: Sat, 17 Sep 2016 14:49:10 +0200 Subject: [PATCH 1/2] pingpong.sh: update to add version in the pipe names --- pingpong/pingpong.sh | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pingpong/pingpong.sh b/pingpong/pingpong.sh index 7d7eb22..96406f1 100755 --- a/pingpong/pingpong.sh +++ b/pingpong/pingpong.sh @@ -21,21 +21,21 @@ fi for pid in `seq 1 ${NB}` do # we make the application pipes - mkfifo ${REP}${pid}-1-in 2>/dev/null - mkfifo ${REP}${pid}-1-out 2>/dev/null + mkfifo ${REP}${pid}-1-1-in 2>/dev/null + mkfifo ${REP}${pid}-1-1-out 2>/dev/null # pid index version - echo "${pid} 1 5" > ${REP}${SERVICE} + echo "${pid} 1 1" > ${REP}${SERVICE} # the purpose is to send something in the pipe - cat /dev/urandom | base64 | head -n 1 > ${REP}${pid}-1-out + cat /dev/urandom | base64 | head -n 1 > ${REP}${pid}-1-1-out # echo "hello world" > ${REP}${pid}-1-out # the the service will answer with our message echo "pid : ${pid}" - cat ${REP}/${pid}-1-in + cat ${REP}/${pid}-1-1-in done - echo "clean rep" - rm ${REP}/*-in - rm ${REP}/*-out \ No newline at end of file +echo "clean rep" +rm ${REP}/*-in +rm ${REP}/*-out From 52cdc695dfdbb1401c7ecd08886331a6c558d5ad Mon Sep 17 00:00:00 2001 From: Philippe PITTOLI Date: Sat, 17 Sep 2016 22:57:32 +0200 Subject: [PATCH 2/2] pubsub: cbor messages, still /t/i/s cbor message to do --- lib/communication.c | 5 +-- lib/communication.h | 1 + lib/pubsub.c | 100 +++++++++++++++----------------------------- lib/pubsub.h | 9 ++-- misc/msg.c | 2 +- misc/readmsg.c | 50 ++++++++++++++++++++++ pubsub/Makefile | 2 +- 7 files changed, 93 insertions(+), 76 deletions(-) create mode 100644 misc/readmsg.c diff --git a/lib/communication.c b/lib/communication.c index f9620c9..46d604e 100644 --- a/lib/communication.c +++ b/lib/communication.c @@ -18,11 +18,10 @@ int file_write (const char *path, const char *buf, size_t msize) perror ("file_write"); return ER_FILE_OPEN; } - // TODO debug - // printf("file_write: opened file %s\n", path); - + int ret = 0; int ret2 = 0; + printf ("%ld bytes to write\n", msize); ret = write (fd, buf, msize); if (ret <= 0) { fprintf (stderr, "err: written %s\n", path); diff --git a/lib/communication.h b/lib/communication.h index f91dd3f..85e2f8d 100644 --- a/lib/communication.h +++ b/lib/communication.h @@ -4,6 +4,7 @@ #include #include #include +#include #include "process.h" #include // unlink diff --git a/lib/pubsub.c b/lib/pubsub.c index c3a095e..42fc559 100644 --- a/lib/pubsub.c +++ b/lib/pubsub.c @@ -3,8 +3,6 @@ #include // strndup -// MESSAGE, TODO CBOR - void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len) { if (msg == NULL || data == NULL || len == NULL) { @@ -12,40 +10,20 @@ void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *l return; } - // msg: "type(1) chanlen(8) chan datalen(8) data - if (msg->type == PUBSUB_TYPE_DISCONNECT) { - *len = 1; - if (*data != NULL) { - free (*data); - *data = NULL; - } - *data = malloc(*len); - memset (*data, 0, *len); - data[0][0] = msg->type; - return; - } - else { - // type + size chan + chan + size data + data - *len = 1 + 2 * sizeof(size_t) + msg->chanlen + msg->datalen; - } + /* Preallocate the map structure */ + cbor_item_t * root = cbor_new_definite_map(1); + /* Add the content */ + cbor_map_add(root, (struct cbor_pair) { + .key = cbor_move(cbor_build_uint8((unsigned char) msg->type)), + .value = cbor_move(cbor_build_bytestring((unsigned char*) msg->data, msg->datalen)) + }); - if (*data != NULL) { - free (*data); - *data = NULL; - } - *data = malloc(*len); - memset (*data, 0, *len); - - size_t i = 0; - - data[0][i] = msg->type; i++; - memcpy (&data[0][i], &msg->chanlen, sizeof(size_t)); i += sizeof(size_t); - memcpy (&data[0][i], msg->chan, msg->chanlen); i += msg->chanlen; - memcpy (&data[0][i], &msg->datalen, sizeof(size_t)); i += sizeof(size_t); - memcpy (&data[0][i], msg->data, msg->datalen); i += msg->datalen; + size_t buffer_size; + *len = cbor_serialize_alloc (root, (unsigned char **) data, &buffer_size); + cbor_decref(&root); } -void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len) +void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *buf, size_t mlen) { if (msg == NULL) { fprintf (stderr @@ -53,47 +31,36 @@ void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t l return; } - if (data == NULL) { + if (buf == NULL) { fprintf (stderr - , "\033[31merr: pubsubd_msg_unserialize, data NULL\033[00m\n"); + , "\033[31merr: pubsubd_msg_unserialize, buf NULL\033[00m\n"); return; } - if (len > BUFSIZ) { + if (mlen > BUFSIZ) { fprintf (stderr - , "\033[31merr: pubsubd_msg_unserialize, len %ld\033[00m\n" - , len); + , "\033[31merr: pubsubd_msg_unserialize, mlen %ld\033[00m\n" + , mlen); return; } - size_t i = 0; - msg->type = data[i]; i++; + // CBOR reading, from buf to pubsub_msg structure + struct cbor_load_result result; + cbor_item_t * item = cbor_load ((unsigned char *) buf, mlen, &result); - if (msg->type == PUBSUB_TYPE_DISCONNECT) { - msg->chanlen = 0; - msg->chan = NULL; - msg->datalen = 0; - msg->data = NULL; - return ; + struct cbor_pair * pair = cbor_map_handle (item); + cbor_mutable_data *data = cbor_bytestring_handle (pair->value); + + msg->type = cbor_get_uint8 (pair->key); + if (msg->type != PUBSUB_TYPE_DISCONNECT) { + msg->datalen = cbor_bytestring_length (pair->value); + msg->data = malloc (msg->datalen +1); + memset (msg->data, 0, msg->datalen +1); + memcpy (msg->data, data, msg->datalen); } - memcpy (&msg->chanlen, data + i, sizeof(size_t)); i += sizeof(size_t); - if (msg->chanlen > BUFSIZ) { - fprintf (stderr, "\033[31merr : msg->chanlen > BUFSIZ\033[00m\n"); - return; - } - msg->chan = malloc (msg->chanlen +1); - memset (msg->chan, 0, msg->chanlen +1); - memcpy (msg->chan, data + i, msg->chanlen); i += msg->chanlen; - - memcpy (&msg->datalen, data + i, sizeof(size_t)); i += sizeof(size_t); - if (msg->datalen > BUFSIZ) { - fprintf (stderr, "\033[31merr : msg->datalen > BUFSIZ\033[00m\n"); - return; - } - msg->data = malloc (msg->datalen +1); - memset (msg->data, 0, msg->datalen +1); - memcpy (msg->data, data + i, msg->datalen); i += msg->datalen; + /* Deallocate the result */ + cbor_decref (&item); } void pubsubd_msg_free (struct pubsub_msg *msg) @@ -193,15 +160,13 @@ void pubsubd_quit (struct service *srv) void pubsub_msg_send (struct process *p, const struct pubsub_msg * m) { - char *buf = NULL; size_t msize = 0; + char * buf = NULL; pubsubd_msg_serialize (m, &buf, &msize); app_write (p, buf, msize); - if (buf != NULL) { - free (buf); - } + free(buf); } void pubsub_msg_recv (struct process *p, struct pubsub_msg *m) @@ -218,4 +183,5 @@ void pubsub_msg_recv (struct process *p, struct pubsub_msg *m) if (buf != NULL) { free (buf); } + } diff --git a/lib/pubsub.h b/lib/pubsub.h index 65aa3ef..34fc210 100644 --- a/lib/pubsub.h +++ b/lib/pubsub.h @@ -5,10 +5,11 @@ #include "process.h" #include "queue.h" -#define PUBSUB_TYPE_DISCONNECT 1 << 0 -#define PUBSUB_TYPE_INFO 1 << 1 -#define PUBSUB_TYPE_DEBUG 1 << 2 -#define PUBSUB_TYPE_MESSAGE 1 << 3 +#define PUBSUB_TYPE_DISCONNECT 0 +#define PUBSUB_TYPE_MESSAGE 1 +#define PUBSUB_TYPE_ERROR 2 +#define PUBSUB_TYPE_DEBUG 4 +#define PUBSUB_TYPE_INFO 128 #define PUBSUB_SERVICE_NAME "pubsub" diff --git a/misc/msg.c b/misc/msg.c index d978013..4452934 100644 --- a/misc/msg.c +++ b/misc/msg.c @@ -1,5 +1,5 @@ #include "../lib/pubsubd.h" -#include "cbor.h" +#include #include #include diff --git a/misc/readmsg.c b/misc/readmsg.c new file mode 100644 index 0000000..4b1cb4a --- /dev/null +++ b/misc/readmsg.c @@ -0,0 +1,50 @@ +#include "../lib/pubsubd.h" +#include +#include +#include +#include + +#define PKT_CLOSE 0 +#define PKT_MSG 1 +#define PKT_ERROR 2 + +void usage (char **argv) { + printf ("usage: echo something | msg | %s\n", argv[0]); +} + +int main(int argc, char * argv[]) +{ + if (argc == 2 && strcmp ("-h", argv[1]) == 0) { + usage (argv); + exit (1); + } + + // read the message from the process + size_t mlen = 0; + unsigned char buf[BUFSIZ]; + mlen = read (0, buf, BUFSIZ); + + /* Assuming `buffer` contains `info.st_size` bytes of input data */ + struct cbor_load_result result; + cbor_item_t * item = cbor_load (buf, mlen, &result); + + /* Pretty-print the result */ + cbor_describe(item, stdout); + fflush(stdout); + + struct cbor_pair * pair = cbor_map_handle (item); + cbor_mutable_data *data = cbor_bytestring_handle (pair->value); + + size_t datalen = cbor_bytestring_length (pair->value); + char *bstr = malloc (datalen +1); + memset (bstr, 0, datalen +1); + memcpy (bstr, data, datalen); + + printf ("msg data (%ld bytes): %s\n", datalen, bstr); + + /* Deallocate the result */ + cbor_decref (&item); + + free (bstr); + return EXIT_SUCCESS; +} diff --git a/pubsub/Makefile b/pubsub/Makefile index c30898e..4da897d 100644 --- a/pubsub/Makefile +++ b/pubsub/Makefile @@ -10,7 +10,7 @@ TESTS=$(addsuffix .test, $(EXEC)) all: $(SOURCES) $(EXEC) $(EXEC): $(OBJECTS) $(CFILES) - $(CC) $(CFLAGS) $(LDFLAGS) $(OBJECTS) $@.c -o $@.bin + $(CC) $(CFLAGS) $(LDFLAGS) $(OBJECTS) $@.c -lcbor -o $@.bin .c.o: $(CC) -c $(CFLAGS) $< -o $@