From be783e69b1c21e7d956534d1d3a13730ef74d67d Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Tue, 30 Oct 2018 13:18:53 +0100
Subject: [PATCH] pubsub working with libipc
---
core/message.c | 23 ++++++---
pubsub/app/pubsub-client.c | 13 +++--
pubsub/app/pubsubd.c | 19 ++-----
pubsub/lib/message.c | 103 ++-----------------------------------
pubsub/lib/message.h | 4 +-
5 files changed, 30 insertions(+), 132 deletions(-)
diff --git a/core/message.c b/core/message.c
index 25b50c7..613f078 100644
--- a/core/message.c
+++ b/core/message.c
@@ -30,10 +30,12 @@ int ipc_message_format_read (struct ipc_message *m, const char *buf, ssize_t msi
#endif
assert (m->length == msize - IPC_HEADER_SIZE || m->length == 0);
- if (m->payload != NULL)
- free (m->payload), m->payload = NULL;
+ if (m->payload != NULL) {
+ free (m->payload);
+ m->payload = NULL;
+ }
- if (m->payload == NULL && m->length > 0) {
+ if (m->length > 0) {
m->payload = malloc (m->length);
memcpy (m->payload, buf+IPC_HEADER_SIZE, m->length);
}
@@ -118,22 +120,25 @@ int ipc_message_write (int fd, const struct ipc_message *m)
ssize_t nbytes_sent = 0;
int ret = usock_send (fd, buf, msize, &nbytes_sent);
if (ret < 0) {
- if (buf != NULL)
+ if (buf != NULL) {
free (buf);
+ }
handle_err ("msg_write", "usock_send ret < 0");
return -1;
}
// what was sent != what should have been sent
if (nbytes_sent != msize) {
- if (buf != NULL)
+ if (buf != NULL) {
free (buf);
+ }
handle_err ("msg_write", "usock_send did not send enough data");
return -1;
}
- if (buf != NULL)
+ if (buf != NULL) {
free (buf);
+ }
return 0;
}
@@ -189,8 +194,10 @@ int ipc_message_empty (struct ipc_message *m)
if (m == NULL)
return -1;
- if (m->payload != NULL)
- free (m->payload), m->payload = NULL;
+ if (m->payload != NULL) {
+ free (m->payload);
+ m->payload = NULL;
+ }
m->length = 0;
diff --git a/pubsub/app/pubsub-client.c b/pubsub/app/pubsub-client.c
index ccba19f..7141eb3 100644
--- a/pubsub/app/pubsub-client.c
+++ b/pubsub/app/pubsub-client.c
@@ -4,6 +4,7 @@
#include "../lib/channels.h"
#include
+#include
#include
#include
@@ -84,8 +85,6 @@ void main_loop (char **env, int index, int version
{
struct ipc_message *m = event.m;
if ( m->length == 0 || strncmp (m->payload, "exit", 4) == 0) {
- // TODO: disconnection
-
ipc_message_empty (m);
free (m);
@@ -95,9 +94,9 @@ void main_loop (char **env, int index, int version
// get the curent payload, change it to be compatible with the application
// then send it
- print_cmd ();
-
- // TODO: remove \n
+ char *pointer_to_return = strchr (m->payload, '\n');
+ *pointer_to_return = '\0';
+ m->length--;
pubsub_message_set_chan (&msg, chan, strlen(chan));
pubsub_message_set_data (&msg, m->payload, m->length);
@@ -109,10 +108,10 @@ void main_loop (char **env, int index, int version
case IPC_EVENT_TYPE_MESSAGE:
{
struct ipc_message *m = event.m;
- print_hexa ("received msg hexa", m->payload, m->length);
+ // print_hexa ("received msg hexa", (unsigned char *) m->payload, m->length);
pubsub_message_from_message (&msg, m);
- printf ("\033[31m>\033[00m %.*s\n", (int) msg.datalen, msg.data);
+ printf ("\r\033[31m>\033[00m %.*s\n", (int) msg.datalen, msg.data);
};
break;
case IPC_EVENT_TYPE_DISCONNECTION:
diff --git a/pubsub/app/pubsubd.c b/pubsub/app/pubsubd.c
index ded68f6..f73aeac 100644
--- a/pubsub/app/pubsubd.c
+++ b/pubsub/app/pubsubd.c
@@ -24,23 +24,15 @@ void pubsubd_send (const struct ipc_clients *clients, const struct pubsub_msg *
return;
}
- char *buf = NULL;
- size_t msize = 0;
- pubsub_message_serialize (pubsub_msg, &buf, &msize);
-
struct ipc_message m;
- memset (&m, 0, sizeof (struct ipc_message));
- ipc_message_format_data (&m, buf, msize);
+ memset (&m, 0, sizeof (struct ipc_message));
+ pubsub_message_to_message (pubsub_msg, &m);
int i;
for (i = 0; i < clients->size ; i++) {
ipc_server_write (clients->clients[i], &m);
}
ipc_message_empty (&m);
-
- if (buf != NULL) {
- free (buf);
- }
}
void pubsubd_main_loop (struct ipc_service *srv, struct channels *chans)
@@ -84,7 +76,6 @@ void pubsubd_main_loop (struct ipc_service *srv, struct channels *chans)
struct ipc_client *cli = event.origin;
printf ("disconnection of client %d: %d clients remaining\n", cli->proc_fd, cpt);
- // TODO: to test, unsubscribe when closing
pubsubd_channels_unsubscribe_everywhere (chans, cli);
// free the ipc_client structure
@@ -93,9 +84,8 @@ void pubsubd_main_loop (struct ipc_service *srv, struct channels *chans)
break;
case IPC_EVENT_TYPE_MESSAGE:
{
- // TODO: handle a message
struct ipc_message *m = event.m;
- print_hexa ("received msg hexa", m->payload, m->length);
+ // print_hexa ("received msg hexa", (unsigned char *) m->payload, m->length);
struct ipc_client *cli = event.origin;
struct pubsub_msg pm;
@@ -155,7 +145,6 @@ void pubsubd_main_loop (struct ipc_service *srv, struct channels *chans)
pubsubd_channels_del_all (chans);
}
-
void handle_signal (int signalnumber)
{
// the application will shut down, and remove the service named pipe
@@ -175,6 +164,7 @@ main(int argc, char **argv, char **env)
argc = argc;
argv = argv;
+ // set the service
memset (&srv, 0, sizeof (struct ipc_service));
srv.index = 0;
srv.version = 0;
@@ -183,6 +173,7 @@ main(int argc, char **argv, char **env)
signal(SIGINT, handle_signal);
signal(SIGQUIT, handle_signal);
+ // set the channels
memset (&chans, 0, sizeof (struct channels));
pubsubd_channels_init (&chans);
diff --git a/pubsub/lib/message.c b/pubsub/lib/message.c
index c00aef8..266e3bf 100644
--- a/pubsub/lib/message.c
+++ b/pubsub/lib/message.c
@@ -28,60 +28,6 @@ void pubsub_message_set_chan (struct pubsub_msg *pm, char *chan, size_t len)
pm->chan[len] = '\0';
}
-void pubsub_message_serialize (const struct pubsub_msg *msg, char **data, size_t *len)
-{
- if (msg == NULL) {
- handle_err ("pubsub_message_serialize", "msg == NULL");
- return;
- }
-
- if (data == NULL) {
- handle_err ("pubsub_message_serialize", "data == NULL");
- return;
- }
-
- if (*data != NULL) {
- handle_err ("pubsub_message_serialize", "*data != NULL");
- return;
- }
-
- if (len == NULL) {
- handle_err ("pubsub_message_serialize", "len == NULL");
- return;
- }
-
- // buflen = pubsub msg type (1) + 2* size_t (16) + chan+data
- size_t buflen = 1 + 2 * sizeof (size_t) + msg->chanlen + msg->datalen;
-
- if (buflen > BUFSIZ) {
- handle_err ("pubsub_message_serialize", "chanlen + datalen too high");
- return;
- }
-
- char *buf = malloc (buflen);
- memset (buf, 0, buflen);
-
- size_t offset = 0;
-
- // msg type
- buf[offset++] = msg->type;
-
- // chan
- memcpy (buf + offset, &msg->chanlen, sizeof (size_t));
- offset += sizeof (size_t);
- memcpy (buf + offset, msg->chan, msg->chanlen);
- offset += msg->chanlen;
-
- // data
- memcpy (buf + offset, &msg->datalen, sizeof (size_t));
- offset += sizeof (size_t);
- memcpy (buf + offset, msg->data, msg->datalen);
- offset += msg->datalen;
-
- *data = buf;
- *len = buflen;
-}
-
void pubsub_message_from_message (struct pubsub_msg *pm, struct ipc_message *m)
{
size_t offset = 0;
@@ -99,7 +45,7 @@ void pubsub_message_from_message (struct pubsub_msg *pm, struct ipc_message *m)
offset += sizeof (size_t);
if (pm->chanlen > 0) {
pubsub_message_set_chan (pm, m->payload + offset, pm->chanlen);
- printf ("from ipc_message, chan: %s, chanlen = %lu\n", pm->chan, pm->chanlen);
+ offset += pm->chanlen;
}
// data
@@ -111,7 +57,7 @@ void pubsub_message_from_message (struct pubsub_msg *pm, struct ipc_message *m)
offset += sizeof (size_t);
if (pm->datalen > 0) {
pubsub_message_set_data (pm, m->payload + offset, pm->datalen);
- printf ("from ipc_message, data: %s, datalen = %lu\n", pm->data, pm->datalen);
+ offset += pm->datalen;
}
}
@@ -159,50 +105,6 @@ void pubsub_message_to_message (const struct pubsub_msg *msg, struct ipc_message
m->length = buflen;
}
-void pubsub_message_unserialize (struct pubsub_msg *msg, const char *buf, size_t mlen)
-{
- if (msg == NULL) {
- handle_err ("pubsub_message_unserialize", "msg == NULL");
- return;
- }
-
- pubsub_message_empty (msg);
-
- if (mlen > BUFSIZ) {
- handle_err ("pubsub_message_unserialize", "mlen > BUFSIZ");
- return;
- }
-
- size_t offset = 0;
-
- // msg type
- msg->type = buf[offset++];
-
- // chan
- memcpy (&msg->chanlen, buf + offset, sizeof (size_t));
- if (msg->chanlen > BUFSIZ) {
- handle_err ("pubsub_message_unserialize", "chanlen > BUFSIZ");
- return;
- }
- msg->chan = malloc (msg->chanlen);
- memset (msg->chan, 0, msg->chanlen);
- offset += sizeof (size_t);
- memcpy (msg->chan, buf + offset, msg->chanlen);
- offset += msg->chanlen;
-
- // data
- memcpy (&msg->datalen, buf + offset, sizeof (size_t));
- if (msg->datalen > BUFSIZ) {
- handle_err ("pubsub_message_unserialize", "datalen > BUFSIZ");
- return;
- }
- msg->data = malloc (msg->datalen);
- memset (msg->data, 0, msg->datalen);
- offset += sizeof (size_t);
- memcpy (msg->data, buf + offset, msg->datalen);
- offset += msg->datalen;
-}
-
void pubsub_message_empty (struct pubsub_msg *msg)
{
if (msg == NULL) {
@@ -214,6 +116,7 @@ void pubsub_message_empty (struct pubsub_msg *msg)
free (msg->chan);
msg->chan = NULL;
}
+
if (msg->data != NULL) {
free (msg->data);
msg->data = NULL;
diff --git a/pubsub/lib/message.h b/pubsub/lib/message.h
index 71f27ca..ae5ab36 100644
--- a/pubsub/lib/message.h
+++ b/pubsub/lib/message.h
@@ -20,7 +20,7 @@ enum pubsub_message_types {
};
struct pubsub_msg {
- unsigned char type; // message type : alert, notification, …
+ enum pubsub_message_types type; // message type : alert, notification, …
char *chan;
size_t chanlen;
char *data;
@@ -33,8 +33,6 @@ void pubsub_message_to_message (const struct pubsub_msg *msg, struct ipc_message
void pubsub_message_set_chan (struct pubsub_msg *pm, char *chan, size_t len);
void pubsub_message_set_data (struct pubsub_msg *pm, char *data, size_t len);
-void pubsub_message_serialize (const struct pubsub_msg *msg, char **data, size_t *len);
-void pubsub_message_unserialize (struct pubsub_msg *msg, const char *data, size_t len);
void pubsub_message_empty (struct pubsub_msg *msg);
void pubsub_message_print (const struct pubsub_msg *msg);