pubsub working with libipc
parent
cbdbe79c5b
commit
be783e69b1
|
@ -30,10 +30,12 @@ int ipc_message_format_read (struct ipc_message *m, const char *buf, ssize_t msi
|
||||||
#endif
|
#endif
|
||||||
assert (m->length == msize - IPC_HEADER_SIZE || m->length == 0);
|
assert (m->length == msize - IPC_HEADER_SIZE || m->length == 0);
|
||||||
|
|
||||||
if (m->payload != NULL)
|
if (m->payload != NULL) {
|
||||||
free (m->payload), m->payload = NULL;
|
free (m->payload);
|
||||||
|
m->payload = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
if (m->payload == NULL && m->length > 0) {
|
if (m->length > 0) {
|
||||||
m->payload = malloc (m->length);
|
m->payload = malloc (m->length);
|
||||||
memcpy (m->payload, buf+IPC_HEADER_SIZE, m->length);
|
memcpy (m->payload, buf+IPC_HEADER_SIZE, m->length);
|
||||||
}
|
}
|
||||||
|
@ -118,22 +120,25 @@ int ipc_message_write (int fd, const struct ipc_message *m)
|
||||||
ssize_t nbytes_sent = 0;
|
ssize_t nbytes_sent = 0;
|
||||||
int ret = usock_send (fd, buf, msize, &nbytes_sent);
|
int ret = usock_send (fd, buf, msize, &nbytes_sent);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
if (buf != NULL)
|
if (buf != NULL) {
|
||||||
free (buf);
|
free (buf);
|
||||||
|
}
|
||||||
handle_err ("msg_write", "usock_send ret < 0");
|
handle_err ("msg_write", "usock_send ret < 0");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// what was sent != what should have been sent
|
// what was sent != what should have been sent
|
||||||
if (nbytes_sent != msize) {
|
if (nbytes_sent != msize) {
|
||||||
if (buf != NULL)
|
if (buf != NULL) {
|
||||||
free (buf);
|
free (buf);
|
||||||
|
}
|
||||||
handle_err ("msg_write", "usock_send did not send enough data");
|
handle_err ("msg_write", "usock_send did not send enough data");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (buf != NULL)
|
if (buf != NULL) {
|
||||||
free (buf);
|
free (buf);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -189,8 +194,10 @@ int ipc_message_empty (struct ipc_message *m)
|
||||||
if (m == NULL)
|
if (m == NULL)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
if (m->payload != NULL)
|
if (m->payload != NULL) {
|
||||||
free (m->payload), m->payload = NULL;
|
free (m->payload);
|
||||||
|
m->payload = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
m->length = 0;
|
m->length = 0;
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
#include "../lib/channels.h"
|
#include "../lib/channels.h"
|
||||||
|
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
|
@ -84,8 +85,6 @@ void main_loop (char **env, int index, int version
|
||||||
{
|
{
|
||||||
struct ipc_message *m = event.m;
|
struct ipc_message *m = event.m;
|
||||||
if ( m->length == 0 || strncmp (m->payload, "exit", 4) == 0) {
|
if ( m->length == 0 || strncmp (m->payload, "exit", 4) == 0) {
|
||||||
// TODO: disconnection
|
|
||||||
|
|
||||||
ipc_message_empty (m);
|
ipc_message_empty (m);
|
||||||
free (m);
|
free (m);
|
||||||
|
|
||||||
|
@ -95,9 +94,9 @@ void main_loop (char **env, int index, int version
|
||||||
|
|
||||||
// get the curent payload, change it to be compatible with the application
|
// get the curent payload, change it to be compatible with the application
|
||||||
// then send it
|
// then send it
|
||||||
print_cmd ();
|
char *pointer_to_return = strchr (m->payload, '\n');
|
||||||
|
*pointer_to_return = '\0';
|
||||||
// TODO: remove \n
|
m->length--;
|
||||||
pubsub_message_set_chan (&msg, chan, strlen(chan));
|
pubsub_message_set_chan (&msg, chan, strlen(chan));
|
||||||
pubsub_message_set_data (&msg, m->payload, m->length);
|
pubsub_message_set_data (&msg, m->payload, m->length);
|
||||||
|
|
||||||
|
@ -109,10 +108,10 @@ void main_loop (char **env, int index, int version
|
||||||
case IPC_EVENT_TYPE_MESSAGE:
|
case IPC_EVENT_TYPE_MESSAGE:
|
||||||
{
|
{
|
||||||
struct ipc_message *m = event.m;
|
struct ipc_message *m = event.m;
|
||||||
print_hexa ("received msg hexa", m->payload, m->length);
|
// print_hexa ("received msg hexa", (unsigned char *) m->payload, m->length);
|
||||||
|
|
||||||
pubsub_message_from_message (&msg, m);
|
pubsub_message_from_message (&msg, m);
|
||||||
printf ("\033[31m>\033[00m %.*s\n", (int) msg.datalen, msg.data);
|
printf ("\r\033[31m>\033[00m %.*s\n", (int) msg.datalen, msg.data);
|
||||||
};
|
};
|
||||||
break;
|
break;
|
||||||
case IPC_EVENT_TYPE_DISCONNECTION:
|
case IPC_EVENT_TYPE_DISCONNECTION:
|
||||||
|
|
|
@ -24,23 +24,15 @@ void pubsubd_send (const struct ipc_clients *clients, const struct pubsub_msg *
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *buf = NULL;
|
|
||||||
size_t msize = 0;
|
|
||||||
pubsub_message_serialize (pubsub_msg, &buf, &msize);
|
|
||||||
|
|
||||||
struct ipc_message m;
|
struct ipc_message m;
|
||||||
memset (&m, 0, sizeof (struct ipc_message));
|
memset (&m, 0, sizeof (struct ipc_message));
|
||||||
ipc_message_format_data (&m, buf, msize);
|
pubsub_message_to_message (pubsub_msg, &m);
|
||||||
|
|
||||||
int i;
|
int i;
|
||||||
for (i = 0; i < clients->size ; i++) {
|
for (i = 0; i < clients->size ; i++) {
|
||||||
ipc_server_write (clients->clients[i], &m);
|
ipc_server_write (clients->clients[i], &m);
|
||||||
}
|
}
|
||||||
ipc_message_empty (&m);
|
ipc_message_empty (&m);
|
||||||
|
|
||||||
if (buf != NULL) {
|
|
||||||
free (buf);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void pubsubd_main_loop (struct ipc_service *srv, struct channels *chans)
|
void pubsubd_main_loop (struct ipc_service *srv, struct channels *chans)
|
||||||
|
@ -84,7 +76,6 @@ void pubsubd_main_loop (struct ipc_service *srv, struct channels *chans)
|
||||||
struct ipc_client *cli = event.origin;
|
struct ipc_client *cli = event.origin;
|
||||||
printf ("disconnection of client %d: %d clients remaining\n", cli->proc_fd, cpt);
|
printf ("disconnection of client %d: %d clients remaining\n", cli->proc_fd, cpt);
|
||||||
|
|
||||||
// TODO: to test, unsubscribe when closing
|
|
||||||
pubsubd_channels_unsubscribe_everywhere (chans, cli);
|
pubsubd_channels_unsubscribe_everywhere (chans, cli);
|
||||||
|
|
||||||
// free the ipc_client structure
|
// free the ipc_client structure
|
||||||
|
@ -93,9 +84,8 @@ void pubsubd_main_loop (struct ipc_service *srv, struct channels *chans)
|
||||||
break;
|
break;
|
||||||
case IPC_EVENT_TYPE_MESSAGE:
|
case IPC_EVENT_TYPE_MESSAGE:
|
||||||
{
|
{
|
||||||
// TODO: handle a message
|
|
||||||
struct ipc_message *m = event.m;
|
struct ipc_message *m = event.m;
|
||||||
print_hexa ("received msg hexa", m->payload, m->length);
|
// print_hexa ("received msg hexa", (unsigned char *) m->payload, m->length);
|
||||||
struct ipc_client *cli = event.origin;
|
struct ipc_client *cli = event.origin;
|
||||||
|
|
||||||
struct pubsub_msg pm;
|
struct pubsub_msg pm;
|
||||||
|
@ -155,7 +145,6 @@ void pubsubd_main_loop (struct ipc_service *srv, struct channels *chans)
|
||||||
pubsubd_channels_del_all (chans);
|
pubsubd_channels_del_all (chans);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void handle_signal (int signalnumber)
|
void handle_signal (int signalnumber)
|
||||||
{
|
{
|
||||||
// the application will shut down, and remove the service named pipe
|
// the application will shut down, and remove the service named pipe
|
||||||
|
@ -175,6 +164,7 @@ main(int argc, char **argv, char **env)
|
||||||
argc = argc;
|
argc = argc;
|
||||||
argv = argv;
|
argv = argv;
|
||||||
|
|
||||||
|
// set the service
|
||||||
memset (&srv, 0, sizeof (struct ipc_service));
|
memset (&srv, 0, sizeof (struct ipc_service));
|
||||||
srv.index = 0;
|
srv.index = 0;
|
||||||
srv.version = 0;
|
srv.version = 0;
|
||||||
|
@ -183,6 +173,7 @@ main(int argc, char **argv, char **env)
|
||||||
signal(SIGINT, handle_signal);
|
signal(SIGINT, handle_signal);
|
||||||
signal(SIGQUIT, handle_signal);
|
signal(SIGQUIT, handle_signal);
|
||||||
|
|
||||||
|
// set the channels
|
||||||
memset (&chans, 0, sizeof (struct channels));
|
memset (&chans, 0, sizeof (struct channels));
|
||||||
pubsubd_channels_init (&chans);
|
pubsubd_channels_init (&chans);
|
||||||
|
|
||||||
|
|
|
@ -28,60 +28,6 @@ void pubsub_message_set_chan (struct pubsub_msg *pm, char *chan, size_t len)
|
||||||
pm->chan[len] = '\0';
|
pm->chan[len] = '\0';
|
||||||
}
|
}
|
||||||
|
|
||||||
void pubsub_message_serialize (const struct pubsub_msg *msg, char **data, size_t *len)
|
|
||||||
{
|
|
||||||
if (msg == NULL) {
|
|
||||||
handle_err ("pubsub_message_serialize", "msg == NULL");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (data == NULL) {
|
|
||||||
handle_err ("pubsub_message_serialize", "data == NULL");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (*data != NULL) {
|
|
||||||
handle_err ("pubsub_message_serialize", "*data != NULL");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (len == NULL) {
|
|
||||||
handle_err ("pubsub_message_serialize", "len == NULL");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// buflen = pubsub msg type (1) + 2* size_t (16) + chan+data
|
|
||||||
size_t buflen = 1 + 2 * sizeof (size_t) + msg->chanlen + msg->datalen;
|
|
||||||
|
|
||||||
if (buflen > BUFSIZ) {
|
|
||||||
handle_err ("pubsub_message_serialize", "chanlen + datalen too high");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
char *buf = malloc (buflen);
|
|
||||||
memset (buf, 0, buflen);
|
|
||||||
|
|
||||||
size_t offset = 0;
|
|
||||||
|
|
||||||
// msg type
|
|
||||||
buf[offset++] = msg->type;
|
|
||||||
|
|
||||||
// chan
|
|
||||||
memcpy (buf + offset, &msg->chanlen, sizeof (size_t));
|
|
||||||
offset += sizeof (size_t);
|
|
||||||
memcpy (buf + offset, msg->chan, msg->chanlen);
|
|
||||||
offset += msg->chanlen;
|
|
||||||
|
|
||||||
// data
|
|
||||||
memcpy (buf + offset, &msg->datalen, sizeof (size_t));
|
|
||||||
offset += sizeof (size_t);
|
|
||||||
memcpy (buf + offset, msg->data, msg->datalen);
|
|
||||||
offset += msg->datalen;
|
|
||||||
|
|
||||||
*data = buf;
|
|
||||||
*len = buflen;
|
|
||||||
}
|
|
||||||
|
|
||||||
void pubsub_message_from_message (struct pubsub_msg *pm, struct ipc_message *m)
|
void pubsub_message_from_message (struct pubsub_msg *pm, struct ipc_message *m)
|
||||||
{
|
{
|
||||||
size_t offset = 0;
|
size_t offset = 0;
|
||||||
|
@ -99,7 +45,7 @@ void pubsub_message_from_message (struct pubsub_msg *pm, struct ipc_message *m)
|
||||||
offset += sizeof (size_t);
|
offset += sizeof (size_t);
|
||||||
if (pm->chanlen > 0) {
|
if (pm->chanlen > 0) {
|
||||||
pubsub_message_set_chan (pm, m->payload + offset, pm->chanlen);
|
pubsub_message_set_chan (pm, m->payload + offset, pm->chanlen);
|
||||||
printf ("from ipc_message, chan: %s, chanlen = %lu\n", pm->chan, pm->chanlen);
|
offset += pm->chanlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
// data
|
// data
|
||||||
|
@ -111,7 +57,7 @@ void pubsub_message_from_message (struct pubsub_msg *pm, struct ipc_message *m)
|
||||||
offset += sizeof (size_t);
|
offset += sizeof (size_t);
|
||||||
if (pm->datalen > 0) {
|
if (pm->datalen > 0) {
|
||||||
pubsub_message_set_data (pm, m->payload + offset, pm->datalen);
|
pubsub_message_set_data (pm, m->payload + offset, pm->datalen);
|
||||||
printf ("from ipc_message, data: %s, datalen = %lu\n", pm->data, pm->datalen);
|
offset += pm->datalen;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -159,50 +105,6 @@ void pubsub_message_to_message (const struct pubsub_msg *msg, struct ipc_message
|
||||||
m->length = buflen;
|
m->length = buflen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void pubsub_message_unserialize (struct pubsub_msg *msg, const char *buf, size_t mlen)
|
|
||||||
{
|
|
||||||
if (msg == NULL) {
|
|
||||||
handle_err ("pubsub_message_unserialize", "msg == NULL");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
pubsub_message_empty (msg);
|
|
||||||
|
|
||||||
if (mlen > BUFSIZ) {
|
|
||||||
handle_err ("pubsub_message_unserialize", "mlen > BUFSIZ");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t offset = 0;
|
|
||||||
|
|
||||||
// msg type
|
|
||||||
msg->type = buf[offset++];
|
|
||||||
|
|
||||||
// chan
|
|
||||||
memcpy (&msg->chanlen, buf + offset, sizeof (size_t));
|
|
||||||
if (msg->chanlen > BUFSIZ) {
|
|
||||||
handle_err ("pubsub_message_unserialize", "chanlen > BUFSIZ");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
msg->chan = malloc (msg->chanlen);
|
|
||||||
memset (msg->chan, 0, msg->chanlen);
|
|
||||||
offset += sizeof (size_t);
|
|
||||||
memcpy (msg->chan, buf + offset, msg->chanlen);
|
|
||||||
offset += msg->chanlen;
|
|
||||||
|
|
||||||
// data
|
|
||||||
memcpy (&msg->datalen, buf + offset, sizeof (size_t));
|
|
||||||
if (msg->datalen > BUFSIZ) {
|
|
||||||
handle_err ("pubsub_message_unserialize", "datalen > BUFSIZ");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
msg->data = malloc (msg->datalen);
|
|
||||||
memset (msg->data, 0, msg->datalen);
|
|
||||||
offset += sizeof (size_t);
|
|
||||||
memcpy (msg->data, buf + offset, msg->datalen);
|
|
||||||
offset += msg->datalen;
|
|
||||||
}
|
|
||||||
|
|
||||||
void pubsub_message_empty (struct pubsub_msg *msg)
|
void pubsub_message_empty (struct pubsub_msg *msg)
|
||||||
{
|
{
|
||||||
if (msg == NULL) {
|
if (msg == NULL) {
|
||||||
|
@ -214,6 +116,7 @@ void pubsub_message_empty (struct pubsub_msg *msg)
|
||||||
free (msg->chan);
|
free (msg->chan);
|
||||||
msg->chan = NULL;
|
msg->chan = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msg->data != NULL) {
|
if (msg->data != NULL) {
|
||||||
free (msg->data);
|
free (msg->data);
|
||||||
msg->data = NULL;
|
msg->data = NULL;
|
||||||
|
|
|
@ -20,7 +20,7 @@ enum pubsub_message_types {
|
||||||
};
|
};
|
||||||
|
|
||||||
struct pubsub_msg {
|
struct pubsub_msg {
|
||||||
unsigned char type; // message type : alert, notification, …
|
enum pubsub_message_types type; // message type : alert, notification, …
|
||||||
char *chan;
|
char *chan;
|
||||||
size_t chanlen;
|
size_t chanlen;
|
||||||
char *data;
|
char *data;
|
||||||
|
@ -33,8 +33,6 @@ void pubsub_message_to_message (const struct pubsub_msg *msg, struct ipc_message
|
||||||
void pubsub_message_set_chan (struct pubsub_msg *pm, char *chan, size_t len);
|
void pubsub_message_set_chan (struct pubsub_msg *pm, char *chan, size_t len);
|
||||||
void pubsub_message_set_data (struct pubsub_msg *pm, char *data, size_t len);
|
void pubsub_message_set_data (struct pubsub_msg *pm, char *data, size_t len);
|
||||||
|
|
||||||
void pubsub_message_serialize (const struct pubsub_msg *msg, char **data, size_t *len);
|
|
||||||
void pubsub_message_unserialize (struct pubsub_msg *msg, const char *data, size_t len);
|
|
||||||
void pubsub_message_empty (struct pubsub_msg *msg);
|
void pubsub_message_empty (struct pubsub_msg *msg);
|
||||||
void pubsub_message_print (const struct pubsub_msg *msg);
|
void pubsub_message_print (const struct pubsub_msg *msg);
|
||||||
|
|
||||||
|
|
Reference in New Issue