From 52cdc695dfdbb1401c7ecd08886331a6c558d5ad Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Sat, 17 Sep 2016 22:57:32 +0200
Subject: [PATCH] pubsub: cbor messages, still /t/i/s cbor message to do
---
lib/communication.c | 5 +--
lib/communication.h | 1 +
lib/pubsub.c | 100 +++++++++++++++-----------------------------
lib/pubsub.h | 9 ++--
misc/msg.c | 2 +-
misc/readmsg.c | 50 ++++++++++++++++++++++
pubsub/Makefile | 2 +-
7 files changed, 93 insertions(+), 76 deletions(-)
create mode 100644 misc/readmsg.c
diff --git a/lib/communication.c b/lib/communication.c
index f9620c9..46d604e 100644
--- a/lib/communication.c
+++ b/lib/communication.c
@@ -18,11 +18,10 @@ int file_write (const char *path, const char *buf, size_t msize)
perror ("file_write");
return ER_FILE_OPEN;
}
- // TODO debug
- // printf("file_write: opened file %s\n", path);
-
+
int ret = 0;
int ret2 = 0;
+ printf ("%ld bytes to write\n", msize);
ret = write (fd, buf, msize);
if (ret <= 0) {
fprintf (stderr, "err: written %s\n", path);
diff --git a/lib/communication.h b/lib/communication.h
index f91dd3f..85e2f8d 100644
--- a/lib/communication.h
+++ b/lib/communication.h
@@ -4,6 +4,7 @@
#include
#include
#include
+#include
#include "process.h"
#include // unlink
diff --git a/lib/pubsub.c b/lib/pubsub.c
index c3a095e..42fc559 100644
--- a/lib/pubsub.c
+++ b/lib/pubsub.c
@@ -3,8 +3,6 @@
#include // strndup
-// MESSAGE, TODO CBOR
-
void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len)
{
if (msg == NULL || data == NULL || len == NULL) {
@@ -12,40 +10,20 @@ void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *l
return;
}
- // msg: "type(1) chanlen(8) chan datalen(8) data
- if (msg->type == PUBSUB_TYPE_DISCONNECT) {
- *len = 1;
- if (*data != NULL) {
- free (*data);
- *data = NULL;
- }
- *data = malloc(*len);
- memset (*data, 0, *len);
- data[0][0] = msg->type;
- return;
- }
- else {
- // type + size chan + chan + size data + data
- *len = 1 + 2 * sizeof(size_t) + msg->chanlen + msg->datalen;
- }
+ /* Preallocate the map structure */
+ cbor_item_t * root = cbor_new_definite_map(1);
+ /* Add the content */
+ cbor_map_add(root, (struct cbor_pair) {
+ .key = cbor_move(cbor_build_uint8((unsigned char) msg->type)),
+ .value = cbor_move(cbor_build_bytestring((unsigned char*) msg->data, msg->datalen))
+ });
- if (*data != NULL) {
- free (*data);
- *data = NULL;
- }
- *data = malloc(*len);
- memset (*data, 0, *len);
-
- size_t i = 0;
-
- data[0][i] = msg->type; i++;
- memcpy (&data[0][i], &msg->chanlen, sizeof(size_t)); i += sizeof(size_t);
- memcpy (&data[0][i], msg->chan, msg->chanlen); i += msg->chanlen;
- memcpy (&data[0][i], &msg->datalen, sizeof(size_t)); i += sizeof(size_t);
- memcpy (&data[0][i], msg->data, msg->datalen); i += msg->datalen;
+ size_t buffer_size;
+ *len = cbor_serialize_alloc (root, (unsigned char **) data, &buffer_size);
+ cbor_decref(&root);
}
-void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len)
+void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *buf, size_t mlen)
{
if (msg == NULL) {
fprintf (stderr
@@ -53,47 +31,36 @@ void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t l
return;
}
- if (data == NULL) {
+ if (buf == NULL) {
fprintf (stderr
- , "\033[31merr: pubsubd_msg_unserialize, data NULL\033[00m\n");
+ , "\033[31merr: pubsubd_msg_unserialize, buf NULL\033[00m\n");
return;
}
- if (len > BUFSIZ) {
+ if (mlen > BUFSIZ) {
fprintf (stderr
- , "\033[31merr: pubsubd_msg_unserialize, len %ld\033[00m\n"
- , len);
+ , "\033[31merr: pubsubd_msg_unserialize, mlen %ld\033[00m\n"
+ , mlen);
return;
}
- size_t i = 0;
- msg->type = data[i]; i++;
+ // CBOR reading, from buf to pubsub_msg structure
+ struct cbor_load_result result;
+ cbor_item_t * item = cbor_load ((unsigned char *) buf, mlen, &result);
- if (msg->type == PUBSUB_TYPE_DISCONNECT) {
- msg->chanlen = 0;
- msg->chan = NULL;
- msg->datalen = 0;
- msg->data = NULL;
- return ;
+ struct cbor_pair * pair = cbor_map_handle (item);
+ cbor_mutable_data *data = cbor_bytestring_handle (pair->value);
+
+ msg->type = cbor_get_uint8 (pair->key);
+ if (msg->type != PUBSUB_TYPE_DISCONNECT) {
+ msg->datalen = cbor_bytestring_length (pair->value);
+ msg->data = malloc (msg->datalen +1);
+ memset (msg->data, 0, msg->datalen +1);
+ memcpy (msg->data, data, msg->datalen);
}
- memcpy (&msg->chanlen, data + i, sizeof(size_t)); i += sizeof(size_t);
- if (msg->chanlen > BUFSIZ) {
- fprintf (stderr, "\033[31merr : msg->chanlen > BUFSIZ\033[00m\n");
- return;
- }
- msg->chan = malloc (msg->chanlen +1);
- memset (msg->chan, 0, msg->chanlen +1);
- memcpy (msg->chan, data + i, msg->chanlen); i += msg->chanlen;
-
- memcpy (&msg->datalen, data + i, sizeof(size_t)); i += sizeof(size_t);
- if (msg->datalen > BUFSIZ) {
- fprintf (stderr, "\033[31merr : msg->datalen > BUFSIZ\033[00m\n");
- return;
- }
- msg->data = malloc (msg->datalen +1);
- memset (msg->data, 0, msg->datalen +1);
- memcpy (msg->data, data + i, msg->datalen); i += msg->datalen;
+ /* Deallocate the result */
+ cbor_decref (&item);
}
void pubsubd_msg_free (struct pubsub_msg *msg)
@@ -193,15 +160,13 @@ void pubsubd_quit (struct service *srv)
void pubsub_msg_send (struct process *p, const struct pubsub_msg * m)
{
- char *buf = NULL;
size_t msize = 0;
+ char * buf = NULL;
pubsubd_msg_serialize (m, &buf, &msize);
app_write (p, buf, msize);
- if (buf != NULL) {
- free (buf);
- }
+ free(buf);
}
void pubsub_msg_recv (struct process *p, struct pubsub_msg *m)
@@ -218,4 +183,5 @@ void pubsub_msg_recv (struct process *p, struct pubsub_msg *m)
if (buf != NULL) {
free (buf);
}
+
}
diff --git a/lib/pubsub.h b/lib/pubsub.h
index 65aa3ef..34fc210 100644
--- a/lib/pubsub.h
+++ b/lib/pubsub.h
@@ -5,10 +5,11 @@
#include "process.h"
#include "queue.h"
-#define PUBSUB_TYPE_DISCONNECT 1 << 0
-#define PUBSUB_TYPE_INFO 1 << 1
-#define PUBSUB_TYPE_DEBUG 1 << 2
-#define PUBSUB_TYPE_MESSAGE 1 << 3
+#define PUBSUB_TYPE_DISCONNECT 0
+#define PUBSUB_TYPE_MESSAGE 1
+#define PUBSUB_TYPE_ERROR 2
+#define PUBSUB_TYPE_DEBUG 4
+#define PUBSUB_TYPE_INFO 128
#define PUBSUB_SERVICE_NAME "pubsub"
diff --git a/misc/msg.c b/misc/msg.c
index d978013..4452934 100644
--- a/misc/msg.c
+++ b/misc/msg.c
@@ -1,5 +1,5 @@
#include "../lib/pubsubd.h"
-#include "cbor.h"
+#include
#include
#include
diff --git a/misc/readmsg.c b/misc/readmsg.c
new file mode 100644
index 0000000..4b1cb4a
--- /dev/null
+++ b/misc/readmsg.c
@@ -0,0 +1,50 @@
+#include "../lib/pubsubd.h"
+#include
+#include
+#include
+#include
+
+#define PKT_CLOSE 0
+#define PKT_MSG 1
+#define PKT_ERROR 2
+
+void usage (char **argv) {
+ printf ("usage: echo something | msg | %s\n", argv[0]);
+}
+
+int main(int argc, char * argv[])
+{
+ if (argc == 2 && strcmp ("-h", argv[1]) == 0) {
+ usage (argv);
+ exit (1);
+ }
+
+ // read the message from the process
+ size_t mlen = 0;
+ unsigned char buf[BUFSIZ];
+ mlen = read (0, buf, BUFSIZ);
+
+ /* Assuming `buffer` contains `info.st_size` bytes of input data */
+ struct cbor_load_result result;
+ cbor_item_t * item = cbor_load (buf, mlen, &result);
+
+ /* Pretty-print the result */
+ cbor_describe(item, stdout);
+ fflush(stdout);
+
+ struct cbor_pair * pair = cbor_map_handle (item);
+ cbor_mutable_data *data = cbor_bytestring_handle (pair->value);
+
+ size_t datalen = cbor_bytestring_length (pair->value);
+ char *bstr = malloc (datalen +1);
+ memset (bstr, 0, datalen +1);
+ memcpy (bstr, data, datalen);
+
+ printf ("msg data (%ld bytes): %s\n", datalen, bstr);
+
+ /* Deallocate the result */
+ cbor_decref (&item);
+
+ free (bstr);
+ return EXIT_SUCCESS;
+}
diff --git a/pubsub/Makefile b/pubsub/Makefile
index c30898e..4da897d 100644
--- a/pubsub/Makefile
+++ b/pubsub/Makefile
@@ -10,7 +10,7 @@ TESTS=$(addsuffix .test, $(EXEC))
all: $(SOURCES) $(EXEC)
$(EXEC): $(OBJECTS) $(CFILES)
- $(CC) $(CFLAGS) $(LDFLAGS) $(OBJECTS) $@.c -o $@.bin
+ $(CC) $(CFLAGS) $(LDFLAGS) $(OBJECTS) $@.c -lcbor -o $@.bin
.c.o:
$(CC) -c $(CFLAGS) $< -o $@