pubsub: cbor messages, still /t/i/s cbor message to do
This commit is contained in:
parent
fceb09a592
commit
52cdc695df
@ -18,11 +18,10 @@ int file_write (const char *path, const char *buf, size_t msize)
|
|||||||
perror ("file_write");
|
perror ("file_write");
|
||||||
return ER_FILE_OPEN;
|
return ER_FILE_OPEN;
|
||||||
}
|
}
|
||||||
// TODO debug
|
|
||||||
// printf("file_write: opened file %s\n", path);
|
|
||||||
|
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
int ret2 = 0;
|
int ret2 = 0;
|
||||||
|
printf ("%ld bytes to write\n", msize);
|
||||||
ret = write (fd, buf, msize);
|
ret = write (fd, buf, msize);
|
||||||
if (ret <= 0) {
|
if (ret <= 0) {
|
||||||
fprintf (stderr, "err: written %s\n", path);
|
fprintf (stderr, "err: written %s\n", path);
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
#include <cbor.h>
|
||||||
|
|
||||||
#include "process.h"
|
#include "process.h"
|
||||||
#include <unistd.h> // unlink
|
#include <unistd.h> // unlink
|
||||||
|
98
lib/pubsub.c
98
lib/pubsub.c
@ -3,8 +3,6 @@
|
|||||||
|
|
||||||
#include <string.h> // strndup
|
#include <string.h> // strndup
|
||||||
|
|
||||||
// MESSAGE, TODO CBOR
|
|
||||||
|
|
||||||
void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len)
|
void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len)
|
||||||
{
|
{
|
||||||
if (msg == NULL || data == NULL || len == NULL) {
|
if (msg == NULL || data == NULL || len == NULL) {
|
||||||
@ -12,40 +10,20 @@ void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *l
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// msg: "type(1) chanlen(8) chan datalen(8) data
|
/* Preallocate the map structure */
|
||||||
if (msg->type == PUBSUB_TYPE_DISCONNECT) {
|
cbor_item_t * root = cbor_new_definite_map(1);
|
||||||
*len = 1;
|
/* Add the content */
|
||||||
if (*data != NULL) {
|
cbor_map_add(root, (struct cbor_pair) {
|
||||||
free (*data);
|
.key = cbor_move(cbor_build_uint8((unsigned char) msg->type)),
|
||||||
*data = NULL;
|
.value = cbor_move(cbor_build_bytestring((unsigned char*) msg->data, msg->datalen))
|
||||||
}
|
});
|
||||||
*data = malloc(*len);
|
|
||||||
memset (*data, 0, *len);
|
|
||||||
data[0][0] = msg->type;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
// type + size chan + chan + size data + data
|
|
||||||
*len = 1 + 2 * sizeof(size_t) + msg->chanlen + msg->datalen;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (*data != NULL) {
|
size_t buffer_size;
|
||||||
free (*data);
|
*len = cbor_serialize_alloc (root, (unsigned char **) data, &buffer_size);
|
||||||
*data = NULL;
|
cbor_decref(&root);
|
||||||
}
|
|
||||||
*data = malloc(*len);
|
|
||||||
memset (*data, 0, *len);
|
|
||||||
|
|
||||||
size_t i = 0;
|
|
||||||
|
|
||||||
data[0][i] = msg->type; i++;
|
|
||||||
memcpy (&data[0][i], &msg->chanlen, sizeof(size_t)); i += sizeof(size_t);
|
|
||||||
memcpy (&data[0][i], msg->chan, msg->chanlen); i += msg->chanlen;
|
|
||||||
memcpy (&data[0][i], &msg->datalen, sizeof(size_t)); i += sizeof(size_t);
|
|
||||||
memcpy (&data[0][i], msg->data, msg->datalen); i += msg->datalen;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len)
|
void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *buf, size_t mlen)
|
||||||
{
|
{
|
||||||
if (msg == NULL) {
|
if (msg == NULL) {
|
||||||
fprintf (stderr
|
fprintf (stderr
|
||||||
@ -53,47 +31,36 @@ void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t l
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (data == NULL) {
|
if (buf == NULL) {
|
||||||
fprintf (stderr
|
fprintf (stderr
|
||||||
, "\033[31merr: pubsubd_msg_unserialize, data NULL\033[00m\n");
|
, "\033[31merr: pubsubd_msg_unserialize, buf NULL\033[00m\n");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (len > BUFSIZ) {
|
if (mlen > BUFSIZ) {
|
||||||
fprintf (stderr
|
fprintf (stderr
|
||||||
, "\033[31merr: pubsubd_msg_unserialize, len %ld\033[00m\n"
|
, "\033[31merr: pubsubd_msg_unserialize, mlen %ld\033[00m\n"
|
||||||
, len);
|
, mlen);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t i = 0;
|
// CBOR reading, from buf to pubsub_msg structure
|
||||||
msg->type = data[i]; i++;
|
struct cbor_load_result result;
|
||||||
|
cbor_item_t * item = cbor_load ((unsigned char *) buf, mlen, &result);
|
||||||
|
|
||||||
if (msg->type == PUBSUB_TYPE_DISCONNECT) {
|
struct cbor_pair * pair = cbor_map_handle (item);
|
||||||
msg->chanlen = 0;
|
cbor_mutable_data *data = cbor_bytestring_handle (pair->value);
|
||||||
msg->chan = NULL;
|
|
||||||
msg->datalen = 0;
|
|
||||||
msg->data = NULL;
|
|
||||||
return ;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy (&msg->chanlen, data + i, sizeof(size_t)); i += sizeof(size_t);
|
msg->type = cbor_get_uint8 (pair->key);
|
||||||
if (msg->chanlen > BUFSIZ) {
|
if (msg->type != PUBSUB_TYPE_DISCONNECT) {
|
||||||
fprintf (stderr, "\033[31merr : msg->chanlen > BUFSIZ\033[00m\n");
|
msg->datalen = cbor_bytestring_length (pair->value);
|
||||||
return;
|
|
||||||
}
|
|
||||||
msg->chan = malloc (msg->chanlen +1);
|
|
||||||
memset (msg->chan, 0, msg->chanlen +1);
|
|
||||||
memcpy (msg->chan, data + i, msg->chanlen); i += msg->chanlen;
|
|
||||||
|
|
||||||
memcpy (&msg->datalen, data + i, sizeof(size_t)); i += sizeof(size_t);
|
|
||||||
if (msg->datalen > BUFSIZ) {
|
|
||||||
fprintf (stderr, "\033[31merr : msg->datalen > BUFSIZ\033[00m\n");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
msg->data = malloc (msg->datalen +1);
|
msg->data = malloc (msg->datalen +1);
|
||||||
memset (msg->data, 0, msg->datalen +1);
|
memset (msg->data, 0, msg->datalen +1);
|
||||||
memcpy (msg->data, data + i, msg->datalen); i += msg->datalen;
|
memcpy (msg->data, data, msg->datalen);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Deallocate the result */
|
||||||
|
cbor_decref (&item);
|
||||||
}
|
}
|
||||||
|
|
||||||
void pubsubd_msg_free (struct pubsub_msg *msg)
|
void pubsubd_msg_free (struct pubsub_msg *msg)
|
||||||
@ -193,15 +160,13 @@ void pubsubd_quit (struct service *srv)
|
|||||||
|
|
||||||
void pubsub_msg_send (struct process *p, const struct pubsub_msg * m)
|
void pubsub_msg_send (struct process *p, const struct pubsub_msg * m)
|
||||||
{
|
{
|
||||||
char *buf = NULL;
|
|
||||||
size_t msize = 0;
|
size_t msize = 0;
|
||||||
|
char * buf = NULL;
|
||||||
pubsubd_msg_serialize (m, &buf, &msize);
|
pubsubd_msg_serialize (m, &buf, &msize);
|
||||||
|
|
||||||
app_write (p, buf, msize);
|
app_write (p, buf, msize);
|
||||||
|
|
||||||
if (buf != NULL) {
|
free(buf);
|
||||||
free (buf);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void pubsub_msg_recv (struct process *p, struct pubsub_msg *m)
|
void pubsub_msg_recv (struct process *p, struct pubsub_msg *m)
|
||||||
@ -218,4 +183,5 @@ void pubsub_msg_recv (struct process *p, struct pubsub_msg *m)
|
|||||||
if (buf != NULL) {
|
if (buf != NULL) {
|
||||||
free (buf);
|
free (buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -5,10 +5,11 @@
|
|||||||
#include "process.h"
|
#include "process.h"
|
||||||
#include "queue.h"
|
#include "queue.h"
|
||||||
|
|
||||||
#define PUBSUB_TYPE_DISCONNECT 1 << 0
|
#define PUBSUB_TYPE_DISCONNECT 0
|
||||||
#define PUBSUB_TYPE_INFO 1 << 1
|
#define PUBSUB_TYPE_MESSAGE 1
|
||||||
#define PUBSUB_TYPE_DEBUG 1 << 2
|
#define PUBSUB_TYPE_ERROR 2
|
||||||
#define PUBSUB_TYPE_MESSAGE 1 << 3
|
#define PUBSUB_TYPE_DEBUG 4
|
||||||
|
#define PUBSUB_TYPE_INFO 128
|
||||||
|
|
||||||
#define PUBSUB_SERVICE_NAME "pubsub"
|
#define PUBSUB_SERVICE_NAME "pubsub"
|
||||||
|
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
#include "../lib/pubsubd.h"
|
#include "../lib/pubsubd.h"
|
||||||
#include "cbor.h"
|
#include <cbor.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
|
50
misc/readmsg.c
Normal file
50
misc/readmsg.c
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
#include "../lib/pubsubd.h"
|
||||||
|
#include <cbor.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
|
#define PKT_CLOSE 0
|
||||||
|
#define PKT_MSG 1
|
||||||
|
#define PKT_ERROR 2
|
||||||
|
|
||||||
|
void usage (char **argv) {
|
||||||
|
printf ("usage: echo something | msg | %s\n", argv[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char * argv[])
|
||||||
|
{
|
||||||
|
if (argc == 2 && strcmp ("-h", argv[1]) == 0) {
|
||||||
|
usage (argv);
|
||||||
|
exit (1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// read the message from the process
|
||||||
|
size_t mlen = 0;
|
||||||
|
unsigned char buf[BUFSIZ];
|
||||||
|
mlen = read (0, buf, BUFSIZ);
|
||||||
|
|
||||||
|
/* Assuming `buffer` contains `info.st_size` bytes of input data */
|
||||||
|
struct cbor_load_result result;
|
||||||
|
cbor_item_t * item = cbor_load (buf, mlen, &result);
|
||||||
|
|
||||||
|
/* Pretty-print the result */
|
||||||
|
cbor_describe(item, stdout);
|
||||||
|
fflush(stdout);
|
||||||
|
|
||||||
|
struct cbor_pair * pair = cbor_map_handle (item);
|
||||||
|
cbor_mutable_data *data = cbor_bytestring_handle (pair->value);
|
||||||
|
|
||||||
|
size_t datalen = cbor_bytestring_length (pair->value);
|
||||||
|
char *bstr = malloc (datalen +1);
|
||||||
|
memset (bstr, 0, datalen +1);
|
||||||
|
memcpy (bstr, data, datalen);
|
||||||
|
|
||||||
|
printf ("msg data (%ld bytes): %s\n", datalen, bstr);
|
||||||
|
|
||||||
|
/* Deallocate the result */
|
||||||
|
cbor_decref (&item);
|
||||||
|
|
||||||
|
free (bstr);
|
||||||
|
return EXIT_SUCCESS;
|
||||||
|
}
|
@ -10,7 +10,7 @@ TESTS=$(addsuffix .test, $(EXEC))
|
|||||||
all: $(SOURCES) $(EXEC)
|
all: $(SOURCES) $(EXEC)
|
||||||
|
|
||||||
$(EXEC): $(OBJECTS) $(CFILES)
|
$(EXEC): $(OBJECTS) $(CFILES)
|
||||||
$(CC) $(CFLAGS) $(LDFLAGS) $(OBJECTS) $@.c -o $@.bin
|
$(CC) $(CFLAGS) $(LDFLAGS) $(OBJECTS) $@.c -lcbor -o $@.bin
|
||||||
|
|
||||||
.c.o:
|
.c.o:
|
||||||
$(CC) -c $(CFLAGS) $< -o $@
|
$(CC) -c $(CFLAGS) $< -o $@
|
||||||
|
Reference in New Issue
Block a user