From bb3ccae218eb0d44b7b26615560f3e56d48e1139 Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Wed, 8 Jun 2016 17:57:05 +0200
Subject: [PATCH] En chantier, encore et toujours
---
lib/communication.c | 25 ++++++-
lib/communication.h | 2 +
lib/pubsubd.c | 177 ++++++++++++++++++++++++++++++++++++--------
lib/pubsubd.h | 14 +++-
pubsub/Makefile | 2 +-
pubsub/pubsubd.c | 71 ++++++++++++++----
6 files changed, 240 insertions(+), 51 deletions(-)
diff --git a/lib/communication.c b/lib/communication.c
index 5f041ef..828cb6f 100644
--- a/lib/communication.c
+++ b/lib/communication.c
@@ -156,8 +156,11 @@ int srv_get_new_process (const struct service *srv, struct process *p)
int srv_read_cb (struct process *p, char ** buf, size_t * msize
, int (*cb)(FILE *f, char ** buf, size_t * msize))
{
- if (file_open (&p->out, p->path_out, "rb"))
+ if (file_open (&p->out, p->path_out, "rb")) {
+ fprintf (stderr, "\033[31merr: srv_read_cb, file_open\033[00m\n");
+ file_close (p->out);
return 1;
+ }
if (cb != NULL)
(*cb) (p->out, buf, msize);
@@ -284,6 +287,26 @@ int app_destroy (struct process *p)
return 0;
}
+int app_read_cb (struct process *p, char ** buf, size_t * msize
+ , int (*cb)(FILE *f, char ** buf, size_t * msize))
+{
+ if (file_open (&p->in, p->path_in, "rb")) {
+ fprintf (stderr, "\033[31merr: srv_read_cb, file_open\033[00m\n");
+ file_close (p->in);
+ return 1;
+ }
+
+ if (cb != NULL)
+ (*cb) (p->in, buf, msize);
+ else
+ *msize = fread (*buf, 1, *msize, p->in); // FIXME check errors
+
+ if (file_close (p->in))
+ return 1;
+
+ return 0;
+}
+
int app_read (struct process *p, void * buf, size_t * msize)
{
if (file_open (&p->in, p->path_in, "rb"))
diff --git a/lib/communication.h b/lib/communication.h
index 39163d0..5241b05 100644
--- a/lib/communication.h
+++ b/lib/communication.h
@@ -48,6 +48,8 @@ int srv_write (struct process *, void * buf, size_t);
int app_create (struct process *, int index); // called by the application
int app_destroy (struct process *); // called by the application
+int app_read_cb (struct process *p, char ** buf, size_t * msize
+ , int (*cb)(FILE *f, char ** buf, size_t * msize));
int app_read (struct process *, void * buf, size_t *);
int app_write (struct process *, void * buf, size_t);
diff --git a/lib/pubsubd.c b/lib/pubsubd.c
index 2554e9d..ae12610 100644
--- a/lib/pubsubd.c
+++ b/lib/pubsubd.c
@@ -148,6 +148,8 @@ struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale)
if (ale->p != NULL)
n->p = srv_process_copy (ale->p);
+ n->action = ale->action;
+
return n;
}
@@ -233,11 +235,16 @@ void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *l
// msg: "type(1) chanlen(8) chan datalen(8) data
*len = 1 + sizeof(size_t) + msg->chanlen + sizeof(size_t) + msg->datalen;
+
+ if (*data != NULL) {
+ free (*data);
+ *data = NULL;
+ }
*data = malloc(*len);
size_t i = 0;
- data[0][i] = msg->type; i++;
+ data[0][i] = msg->type; i++;
memcpy (&data[0][i], &msg->chanlen, sizeof(size_t)); i += sizeof(size_t);
memcpy (&data[0][i], msg->chan, msg->chanlen); i += msg->chanlen;
memcpy (&data[0][i], &msg->datalen, sizeof(size_t)); i += sizeof(size_t);
@@ -246,35 +253,66 @@ void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *l
void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len)
{
- if (msg == NULL || data == NULL)
+ if (msg == NULL) {
+ fprintf (stderr
+ , "\033[31merr: pubsubd_msg_unserialize, msg NULL\033[00m\n");
return;
+ }
+
+ if (data == NULL) {
+ fprintf (stderr
+ , "\033[31merr: pubsubd_msg_unserialize, data NULL\033[00m\n");
+ return;
+ }
+
+ if (len > BUFSIZ) {
+ fprintf (stderr
+ , "\033[31merr: pubsubd_msg_unserialize, len %ld\033[00m\n"
+ , len);
+ return;
+ }
size_t i = 0;
- msg->type = data[i]; i++;
+ msg->type = data[i]; i++;
+
memcpy (&msg->chanlen, data + i, sizeof(size_t)); i += sizeof(size_t);
+ if (msg->chanlen > BUFSIZ) {
+ fprintf (stderr, "\033[31merr : msg->chanlen > BUFSIZ\033[00m\n");
+ return;
+ }
msg->chan = malloc (msg->chanlen);
memcpy (msg->chan, data + i, msg->chanlen); i += msg->chanlen;
+
memcpy (&msg->datalen, data + i, sizeof(size_t)); i += sizeof(size_t);
+ if (msg->datalen > BUFSIZ) {
+ fprintf (stderr, "\033[31merr : msg->datalen > BUFSIZ\033[00m\n");
+ return;
+ }
msg->data = malloc (msg->datalen);
memcpy (msg->data, data + i, msg->datalen); i += msg->datalen;
}
void pubsubd_msg_free (struct pubsub_msg *msg)
{
+ if (msg == NULL) {
+ fprintf (stderr, "\033[31merr: pubsubd_msg_free, msg NULL\033[00m\n");
+ return;
+ }
+
if (msg->chan) {
free (msg->chan);
- msg->chan = 0;
+ msg->chan = NULL;
}
if (msg->data) {
free (msg->data);
- msg->data = 0;
+ msg->data = NULL;
}
}
// COMMUNICATION
int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
- , struct channels *chans)
+ , struct channels *chans, struct channel **c)
{
if (srv == NULL || ale == NULL || chans == NULL)
return -1;
@@ -336,31 +374,41 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
buf = NULL;
}
- chan[BUFSIZ -1] = '\0';
-
- struct channel c;
- bzero (&c, sizeof (struct channel));
- c.chan = strndup (chan, BUFSIZ);
- c.chanlen = strlen (chan);
-
- struct channel *new_chan;
- new_chan = pubsubd_channel_get (chans, &c);
- if (new_chan == NULL) {
- new_chan = pubsubd_channels_add (chans, &c);
- pubsubd_subscriber_init (&new_chan->alh);
+ if (ale->action == PUBSUB_QUIT) {
+ return 0;
}
- pubsubd_channel_free (&c);
if (ale->p != NULL) {
free (ale->p);
ale->p = NULL;
}
-
- if (ale->action != PUBSUB_QUIT) {
- ale->p = malloc (sizeof (struct process));
- srv_process_gen (ale->p, pid, index, version);
+
+ ale->p = malloc (sizeof (struct process));
+ srv_process_gen (ale->p, pid, index, version);
+
+ if (*c == NULL) {
+ *c = malloc (sizeof (struct channel));
}
+ if (c[0]->chan != NULL) {
+ free (c[0]->chan);
+ c[0]->chan = NULL;
+ }
+
+ chan[BUFSIZ -1] = '\0';
+ c[0]->chan = strndup (chan, BUFSIZ);
+ c[0]->chanlen = strlen (chan);
+
+ struct channel *new_chan;
+ new_chan = pubsubd_channel_get (chans, *c);
+ if (new_chan == NULL) {
+ new_chan = pubsubd_channels_add (chans, *c);
+ pubsubd_subscriber_init (&new_chan->alh);
+ }
+
+ pubsubd_channel_free (*c);
+ *c = new_chan;
+
// add the subscriber
if (ale->action == PUBSUB_SUB || ale->action == PUBSUB_BOTH)
pubsubd_subscriber_add (new_chan->alh, ale);
@@ -373,6 +421,8 @@ int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize)
{
// msg: "type(1) chanlen(8) chan datalen(8) data
+ printf ("\033[36m ON PASSE DANS pubsubd_msg_read_cb \033[00m \n");
+
// read
char type;
fread (&type, 1, 1, f);
@@ -380,17 +430,27 @@ int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize)
size_t chanlen;
fread (&chanlen, sizeof (size_t), 1, f);
+ if (chanlen > BUFSIZ) {
+ return 1;
+ }
+
char *chan = malloc (chanlen);
fread (chan, chanlen, 1, f);
size_t datalen;
fread (&datalen, sizeof (size_t), 1, f);
+ if (datalen > BUFSIZ) {
+ return 1;
+ }
+
char *data = malloc (datalen);
fread (data, datalen, 1, f);
- *msize = 1 + chanlen;
- *buf = malloc(*msize);
+ *msize = 1 + 2 * sizeof (size_t) + chanlen + datalen;
+ if (*buf == NULL) {
+ *buf = malloc(*msize);
+ }
// TODO CHECK THIS
size_t i = 0;
@@ -398,20 +458,49 @@ int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize)
char *cbuf = *buf;
cbuf[i] = type; i++;
- memcpy (&cbuf[i], &chanlen, sizeof(size_t)); i += sizeof(size_t);
- memcpy (&cbuf[i], chan, chanlen); i += chanlen;
- memcpy (&cbuf[i], &datalen, sizeof(size_t)); i += sizeof(size_t);
- memcpy (&cbuf[i], data, datalen); i += datalen;
+ memcpy (cbuf + i, &chanlen, sizeof(size_t)); i += sizeof(size_t);
+ memcpy (cbuf + i, chan, chanlen); i += chanlen;
+ memcpy (cbuf + i, &datalen, sizeof(size_t)); i += sizeof(size_t);
+ memcpy (cbuf + i, data, datalen); i += datalen;
free (chan);
free (data);
+ printf ("\033[36m ON SORT de pubsubd_msg_read_cb \033[00m \n");
+
return 0;
}
+// alh from the channel, message to send
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m)
{
+ struct app_list_elm * ale = NULL;
+
+ char *buf;
+ size_t msize;
+ pubsubd_msg_serialize (m, &buf, &msize);
+
+ LIST_FOREACH(ale, alh, entries) {
+ srv_write (ale->p, buf, msize);
+ }
+
+ if (buf != NULL) {
+ free (buf);
+ }
}
+
+void pubsubd_msg_print (const struct pubsub_msg *msg)
+{
+ if (msg == NULL) {
+ return;
+ }
+
+ printf ("\t\t\033[36mMessage\033[00m\n");
+ printf ("\t\ttype %d\n", msg->type);
+ printf ("\t\tchan %s\n", msg->chan);
+ printf ("\t\tdata %s\n", msg->data);
+}
+
void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
{
// read the message from the process
@@ -420,14 +509,38 @@ void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb);
pubsubd_msg_unserialize (m, buf, mlen);
- free (buf);
+
+ if (buf != NULL) {
+ free (buf);
+ }
}
-void pubsub_msg_send (const struct service *s, const struct pubsub_msg * m)
+void pubsub_msg_send (const struct service *s, struct process *p, const struct pubsub_msg * m)
{
+
+ char *buf;
+ size_t msize;
+ pubsubd_msg_serialize (m, &buf, &msize);
+
+ app_write (p, buf, msize);
+
+ if (buf != NULL) {
+ free (buf);
+ }
}
-void pubsub_msg_recv (const struct service *s, struct pubsub_msg * m)
+
+void pubsub_msg_recv (const struct service *s, struct process *p, struct pubsub_msg * m)
{
+ // read the message from the process
+ size_t mlen;
+ char *buf;
+ app_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb);
+
+ pubsubd_msg_unserialize (m, buf, mlen);
+
+ if (buf != NULL) {
+ free (buf);
+ }
}
// SERVICE
diff --git a/lib/pubsubd.h b/lib/pubsubd.h
index ff5dc1f..c27831b 100644
--- a/lib/pubsubd.h
+++ b/lib/pubsubd.h
@@ -6,6 +6,11 @@
#include "queue.h"
+#define PUBSUB_TYPE_DISCONNECT 1 << 0
+#define PUBSUB_TYPE_INFO 1 << 1
+#define PUBSUB_TYPE_DEBUG 1 << 2
+#define PUBSUB_TYPE_MESSAGE 1 << 3
+
#define PUBSUB_SERVICE_NAME "pubsub"
struct channel;
@@ -25,19 +30,20 @@ struct pubsub_msg {
void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len);
void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len);
void pubsubd_msg_free (struct pubsub_msg *msg);
+void pubsubd_msg_print (const struct pubsub_msg *msg);
// parse pubsubd init msg (sent in TMPDIR/)
//
// line fmt : pid index version action chan
// action : quit | pub | sub
int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
- , struct channels *chans);
+ , struct channels *chans, struct channel **c);
int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize);
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m);
void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m);
-void pubsub_msg_send (const struct service *, const struct pubsub_msg *msg);
-void pubsub_msg_recv (const struct service *, struct pubsub_msg *msg);
+void pubsub_msg_send (const struct service *, struct process *p, const struct pubsub_msg *msg);
+void pubsub_msg_recv (const struct service *, struct process *p, struct pubsub_msg *msg);
// CHANNEL
@@ -96,4 +102,6 @@ struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale)
void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p);
void pubsubd_app_list_elm_free (struct app_list_elm *todel);
+void pubsub_connection (struct service *srv, struct process *p, enum app_list_elm_action action, const char *channame);
+
#endif
diff --git a/pubsub/Makefile b/pubsub/Makefile
index 22d244e..6b95c76 100644
--- a/pubsub/Makefile
+++ b/pubsub/Makefile
@@ -1,6 +1,6 @@
CC=gcc
CFLAGS=-Wall -g
-LDFLAGS=
+LDFLAGS= -pthread
CFILES=$(wildcard *.c) # CFILES => recompiles everything on a C file change
EXEC=$(basename $(wildcard *.c))
SOURCES=$(wildcard ../lib/*.c)
diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c
index 329c51b..d4fb904 100644
--- a/pubsub/pubsubd.c
+++ b/pubsub/pubsubd.c
@@ -1,5 +1,6 @@
#include "../lib/pubsubd.h"
#include
+#include
void
ohshit(int rvalue, const char* str) {
@@ -7,6 +8,52 @@ ohshit(int rvalue, const char* str) {
exit(rvalue);
}
+// give this structure to the thread worker function
+struct worker_params {
+ struct channels *chans;
+ struct channel *chan;
+ struct app_list_elm *ale;
+};
+
+void * pubsubd_worker_thread (void *params)
+{
+ struct worker_params *wp = (struct worker_params *) params;
+
+ // each chan has a list of subscribers
+ // someone who only push a msg doesn't need to be registered
+ if (wp->ale->action == PUBSUB_BOTH || wp->ale->action == PUBSUB_PUB) {
+ // TODO add it to the application to follow
+ // TODO publish a message
+ printf ("publish or publish and subscribe to something\n");
+
+ struct pubsub_msg m;
+ bzero (&m, sizeof (struct pubsub_msg));
+ pubsubd_msg_recv (wp->ale->p, &m);
+
+ pubsubd_msg_print (&m);
+
+ if (m.type == PUBSUB_TYPE_DISCONNECT) {
+ // TODO remove the application from the subscribers
+ }
+ else {
+ struct channel *chan = pubsubd_channel_get (wp->chans, wp->chan);
+ pubsubd_msg_send (chan->alh, &m);
+ }
+ }
+ else if (wp->ale->action == PUBSUB_SUB) {
+ // TODO
+ printf ("subscribe to something\n");
+ }
+ else {
+ printf ("\033[31mdo not know what you want to do\033[00m\n");
+ printf ("\tale->p : %p\n", wp->ale->p);
+ }
+
+ pubsubd_app_list_elm_free (wp->ale);
+
+ pthread_exit (NULL);
+}
+
int
main(int argc, char* argv[])
{
@@ -28,8 +75,8 @@ main(int argc, char* argv[])
// for each new process
struct app_list_elm ale;
bzero (&ale, sizeof (struct app_list_elm));
-
- pubsubd_get_new_process (&srv, &ale, &chans);
+ struct channel *chan;
+ pubsubd_get_new_process (&srv, &ale, &chans, &chan);
pubsubd_channels_print (&chans);
// end the application
@@ -37,7 +84,6 @@ main(int argc, char* argv[])
printf ("Quitting ...\n");
pubsubd_channels_del_all (&chans);
- // pubsubd_app_list_elm_free (&ale);
srv_close (&srv);
// TODO end the threads
@@ -46,19 +92,16 @@ main(int argc, char* argv[])
}
// TODO thread to handle multiple clients at a time
+ struct worker_params *wp;
+ wp = malloc (sizeof (struct worker_params));
+ wp->ale = pubsubd_app_list_elm_copy (&ale);
+ wp->chans = &chans;
+ wp->chan = chan;
- // TODO register the subscriber
- // each chan has a list of subscribers
- // someone who only push a msg doesn't need to be registered
- if (ale.action == PUBSUB_SUB || ale.action == PUBSUB_BOTH) {
- // TODO
- }
- else if (ale.action == PUBSUB_PUB) {
- // TODO add it to the application to follow
- // TODO publish a message
+ pthread_t thr;
- // then
- }
+ pthread_create (&thr, NULL, pubsubd_worker_thread, wp);
+ pthread_detach (thr);
pubsubd_app_list_elm_free (&ale);
}