From a9766c39d523411ece9605714343681f76bd888b Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Wed, 14 Sep 2016 21:09:14 +0200
Subject: [PATCH] pubsub: app communication lib. and service lib. separated
---
lib/pubsub.c | 221 +++++++++++++++++++++++++++
lib/pubsub.h | 39 +++++
lib/pubsubd.c | 384 ++++++++++++++++++++---------------------------
lib/pubsubd.h | 61 ++++----
pubsub/pubsubd.c | 185 +----------------------
5 files changed, 451 insertions(+), 439 deletions(-)
create mode 100644 lib/pubsub.c
create mode 100644 lib/pubsub.h
diff --git a/lib/pubsub.c b/lib/pubsub.c
new file mode 100644
index 0000000..c3a095e
--- /dev/null
+++ b/lib/pubsub.c
@@ -0,0 +1,221 @@
+#include "pubsub.h"
+#include
+
+#include // strndup
+
+// MESSAGE, TODO CBOR
+
+void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len)
+{
+ if (msg == NULL || data == NULL || len == NULL) {
+ fprintf (stderr, "pubsubd_msg_send: msg or data or len == NULL");
+ return;
+ }
+
+ // msg: "type(1) chanlen(8) chan datalen(8) data
+ if (msg->type == PUBSUB_TYPE_DISCONNECT) {
+ *len = 1;
+ if (*data != NULL) {
+ free (*data);
+ *data = NULL;
+ }
+ *data = malloc(*len);
+ memset (*data, 0, *len);
+ data[0][0] = msg->type;
+ return;
+ }
+ else {
+ // type + size chan + chan + size data + data
+ *len = 1 + 2 * sizeof(size_t) + msg->chanlen + msg->datalen;
+ }
+
+ if (*data != NULL) {
+ free (*data);
+ *data = NULL;
+ }
+ *data = malloc(*len);
+ memset (*data, 0, *len);
+
+ size_t i = 0;
+
+ data[0][i] = msg->type; i++;
+ memcpy (&data[0][i], &msg->chanlen, sizeof(size_t)); i += sizeof(size_t);
+ memcpy (&data[0][i], msg->chan, msg->chanlen); i += msg->chanlen;
+ memcpy (&data[0][i], &msg->datalen, sizeof(size_t)); i += sizeof(size_t);
+ memcpy (&data[0][i], msg->data, msg->datalen); i += msg->datalen;
+}
+
+void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len)
+{
+ if (msg == NULL) {
+ fprintf (stderr
+ , "\033[31merr: pubsubd_msg_unserialize, msg NULL\033[00m\n");
+ return;
+ }
+
+ if (data == NULL) {
+ fprintf (stderr
+ , "\033[31merr: pubsubd_msg_unserialize, data NULL\033[00m\n");
+ return;
+ }
+
+ if (len > BUFSIZ) {
+ fprintf (stderr
+ , "\033[31merr: pubsubd_msg_unserialize, len %ld\033[00m\n"
+ , len);
+ return;
+ }
+
+ size_t i = 0;
+ msg->type = data[i]; i++;
+
+ if (msg->type == PUBSUB_TYPE_DISCONNECT) {
+ msg->chanlen = 0;
+ msg->chan = NULL;
+ msg->datalen = 0;
+ msg->data = NULL;
+ return ;
+ }
+
+ memcpy (&msg->chanlen, data + i, sizeof(size_t)); i += sizeof(size_t);
+ if (msg->chanlen > BUFSIZ) {
+ fprintf (stderr, "\033[31merr : msg->chanlen > BUFSIZ\033[00m\n");
+ return;
+ }
+ msg->chan = malloc (msg->chanlen +1);
+ memset (msg->chan, 0, msg->chanlen +1);
+ memcpy (msg->chan, data + i, msg->chanlen); i += msg->chanlen;
+
+ memcpy (&msg->datalen, data + i, sizeof(size_t)); i += sizeof(size_t);
+ if (msg->datalen > BUFSIZ) {
+ fprintf (stderr, "\033[31merr : msg->datalen > BUFSIZ\033[00m\n");
+ return;
+ }
+ msg->data = malloc (msg->datalen +1);
+ memset (msg->data, 0, msg->datalen +1);
+ memcpy (msg->data, data + i, msg->datalen); i += msg->datalen;
+}
+
+void pubsubd_msg_free (struct pubsub_msg *msg)
+{
+ if (msg == NULL) {
+ fprintf (stderr, "\033[31merr: pubsubd_msg_free, msg NULL\033[00m\n");
+ return;
+ }
+
+ if (msg->chan) {
+ free (msg->chan);
+ msg->chan = NULL;
+ }
+ if (msg->data) {
+ free (msg->data);
+ msg->data = NULL;
+ }
+}
+
+void pubsubd_msg_print (const struct pubsub_msg *msg)
+{
+ printf ("msg: type=%d chan=%s, data=%s\n"
+ , msg->type, msg->chan, msg->data);
+}
+
+#define PUBSUB_SUBSCRIBER_ACTION_STR_PUB "pub"
+#define PUBSUB_SUBSCRIBER_ACTION_STR_SUB "sub"
+#define PUBSUB_SUBSCRIBER_ACTION_STR_BOTH "both"
+#define PUBSUB_SUBSCRIBER_ACTION_STR_QUIT "quit"
+
+// enum app_list_elm_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH};
+
+char * pubsub_action_to_str (enum app_list_elm_action action)
+{
+ switch (action) {
+ case PUBSUB_PUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_PUB);
+ case PUBSUB_SUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_SUB);
+ case PUBSUB_BOTH : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_BOTH);
+ case PUBSUB_QUIT : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_QUIT);
+ }
+
+ return NULL;
+}
+
+void pubsub_connection (struct service *srv, struct process *p, enum app_list_elm_action action, const char *channame)
+{
+ char * straction = NULL;
+ straction = pubsub_action_to_str (action);
+
+ char line[BUFSIZ];
+ memset (line, 0, BUFSIZ);
+
+ // line fmt : pid index version action chan
+ // "quit" action is also possible (see pubsubd_quit)
+ snprintf (line, BUFSIZ, "%d %d %d %s %s\n"
+ , p->pid, p->index, p->version
+ , straction
+ , channame);
+ line[BUFSIZ -1] = '\0'; // to be sure
+
+ // send the connection line in the $TMP/ pipe
+ app_srv_connection (srv, line, strlen (line));
+
+ if (straction != NULL)
+ free (straction);
+}
+
+void pubsub_disconnect (struct process *p)
+{
+ struct pubsub_msg m;
+ memset (&m, 0, sizeof (struct pubsub_msg));
+ m.type = PUBSUB_TYPE_DISCONNECT;
+
+ char *buf = NULL;
+ size_t msize = 0;
+ pubsubd_msg_serialize (&m, &buf, &msize);
+
+ int ret = app_write (p, buf, msize);
+ if (ret != (int) msize) {
+ fprintf (stderr, "err: can't disconnect\n");
+ }
+
+ pubsubd_msg_free (&m);
+ if (buf != NULL) {
+ free (buf);
+ }
+}
+
+// tell the service to stop
+void pubsubd_quit (struct service *srv)
+{
+ // line fmt : 0 0 0 quit
+ char line[BUFSIZ];
+ snprintf (line, BUFSIZ, "0 0 0 quit\n");
+ app_srv_connection (srv, line, strlen (line));
+}
+
+void pubsub_msg_send (struct process *p, const struct pubsub_msg * m)
+{
+ char *buf = NULL;
+ size_t msize = 0;
+ pubsubd_msg_serialize (m, &buf, &msize);
+
+ app_write (p, buf, msize);
+
+ if (buf != NULL) {
+ free (buf);
+ }
+}
+
+void pubsub_msg_recv (struct process *p, struct pubsub_msg *m)
+{
+ // read the message from the process
+ size_t mlen = 0;
+ char *buf = NULL;
+ while (buf == NULL || mlen == 0) {
+ app_read (p, &buf, &mlen);
+ }
+
+ pubsubd_msg_unserialize (m, buf, mlen);
+
+ if (buf != NULL) {
+ free (buf);
+ }
+}
diff --git a/lib/pubsub.h b/lib/pubsub.h
new file mode 100644
index 0000000..65aa3ef
--- /dev/null
+++ b/lib/pubsub.h
@@ -0,0 +1,39 @@
+#ifndef __PUBSUB_H__
+#define __PUBSUB_H__
+
+#include "communication.h"
+#include "process.h"
+#include "queue.h"
+
+#define PUBSUB_TYPE_DISCONNECT 1 << 0
+#define PUBSUB_TYPE_INFO 1 << 1
+#define PUBSUB_TYPE_DEBUG 1 << 2
+#define PUBSUB_TYPE_MESSAGE 1 << 3
+
+#define PUBSUB_SERVICE_NAME "pubsub"
+
+struct pubsub_msg;
+
+struct pubsub_msg {
+ unsigned char *chan;
+ size_t chanlen;
+ unsigned char *data;
+ size_t datalen;
+ unsigned char type; // message type : alert, notification, …
+};
+
+void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len);
+void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len);
+void pubsubd_msg_free (struct pubsub_msg *msg);
+void pubsubd_msg_print (const struct pubsub_msg *msg);
+
+void pubsub_disconnect (struct process *p);
+void pubsub_msg_send (struct process *p, const struct pubsub_msg *msg);
+void pubsub_msg_recv (struct process *p, struct pubsub_msg *msg);
+
+enum app_list_elm_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH};
+
+void pubsub_connection (struct service *srv, struct process *p, enum app_list_elm_action action, const char *channame);
+void pubsubd_quit (struct service *srv);
+
+#endif
diff --git a/lib/pubsubd.c b/lib/pubsubd.c
index 013f3cc..c1dab43 100644
--- a/lib/pubsubd.c
+++ b/lib/pubsubd.c
@@ -1,7 +1,164 @@
#include "pubsubd.h"
-#include
-#include // strndup
+// WORKERS: one thread per client
+
+void pubsubd_workers_init (struct workers *wrkrs) { LIST_INIT(wrkrs); }
+
+struct worker *
+pubsubd_workers_add (struct workers *wrkrs, const struct worker *w)
+{
+ if (wrkrs == NULL || w == NULL) {
+ printf ("pubsubd_workers_add: wrkrs == NULL or w == NULL");
+ return NULL;
+ }
+
+ struct worker *n = malloc (sizeof (struct worker));
+ memset (n, 0, sizeof (struct worker));
+ memcpy (n, w, sizeof (struct worker));
+ if (w->ale != NULL)
+ n->ale = pubsubd_app_list_elm_copy (w->ale);
+
+ LIST_INSERT_HEAD(wrkrs, n, entries);
+
+ return n;
+}
+
+void pubsubd_worker_del (struct workers *wrkrs, struct worker *w)
+{
+ struct worker *todel = pubsubd_worker_get (wrkrs, w);
+ if(todel != NULL) {
+ LIST_REMOVE(todel, entries);
+ pubsubd_worker_free (todel);
+ free (todel);
+ todel = NULL;
+ }
+}
+
+// kill the threads
+void pubsubd_workers_stop (struct workers *wrkrs)
+{
+ if (!wrkrs)
+ return;
+
+ struct worker *w = NULL;
+ struct worker *wtmp = NULL;
+
+ LIST_FOREACH_SAFE(w, wrkrs, entries, wtmp) {
+ if (w->thr == NULL)
+ continue;
+
+ pthread_cancel (*w->thr);
+ void *ret = NULL;
+ pthread_join (*w->thr, &ret);
+ if (ret != NULL) {
+ free (ret);
+ }
+ free (w->thr);
+ w->thr = NULL;
+ }
+}
+
+void pubsubd_workers_del_all (struct workers *wrkrs)
+{
+ if (!wrkrs)
+ return;
+
+ struct worker *w = NULL;
+
+ while (!LIST_EMPTY(wrkrs)) {
+ printf ("KILL THE WORKERS : %p\n", w);
+ w = LIST_FIRST(wrkrs);
+ LIST_REMOVE(w, entries);
+ pubsubd_worker_free (w);
+ free (w);
+ w = NULL;
+ }
+}
+
+void pubsubd_worker_free (struct worker * w)
+{
+ if (w == NULL)
+ return;
+ pubsubd_app_list_elm_free (w->ale);
+ free (w->ale);
+ w->ale = NULL;
+}
+
+struct worker * pubsubd_worker_get (struct workers *wrkrs, struct worker *w)
+{
+ struct worker * np = NULL;
+ LIST_FOREACH(np, wrkrs, entries) {
+ if (pubsubd_worker_eq (np, w))
+ return np;
+ }
+ return NULL;
+}
+
+int pubsubd_worker_eq (const struct worker *w1, const struct worker *w2)
+{
+ return w1 == w2; // if it's the same pointer
+}
+
+// a thread for each connected process
+void * pubsubd_worker_thread (void *params)
+{
+ int s = 0;
+
+ s = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ if (s != 0)
+ printf ("pthread_setcancelstate: %d\n", s);
+
+ struct worker *w = (struct worker *) params;
+ if (w == NULL) {
+ fprintf (stderr, "error pubsubd_worker_thread : params NULL\n");
+ return NULL;
+ }
+
+ struct channels *chans = w->chans;
+ struct channel *chan = w->chan;
+ struct app_list_elm *ale = w->ale;
+
+ // main loop
+ while (1) {
+ struct pubsub_msg m;
+ memset (&m, 0, sizeof (struct pubsub_msg));
+
+ pubsubd_msg_recv (ale->p, &m);
+
+ if (m.type == PUBSUB_TYPE_DISCONNECT) {
+ printf ("process %d disconnecting...\n", ale->p->pid);
+ if ( 0 != pubsubd_subscriber_del (chan->alh, ale)) {
+ fprintf (stderr, "err : subscriber not registered\n");
+ }
+ break;
+ }
+ else {
+ struct channel *ch = pubsubd_channel_search (chans, chan->chan);
+ if (ch == NULL) {
+ printf ("CHAN NOT FOUND\n");
+ }
+ else {
+ printf ("what should be sent: ");
+ pubsubd_msg_print (&m);
+ printf ("send the message to:\t");
+ pubsubd_channel_print (ch);
+ pubsubd_msg_send (ch->alh, &m);
+ }
+ }
+ pubsubd_msg_free (&m);
+ }
+
+ pubsubd_app_list_elm_free (ale);
+ free (w->ale);
+ w->ale = NULL;
+
+ free (w->thr);
+ w->thr = NULL;
+
+ pubsubd_worker_del (w->my_workers, w);
+
+ pthread_exit (NULL);
+}
// CHANNELS
@@ -292,118 +449,6 @@ void pubsubd_app_list_elm_free (struct app_list_elm *todel)
free (todel->p);
}
-// MESSAGE, TODO CBOR
-
-void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len)
-{
- if (msg == NULL || data == NULL || len == NULL) {
- fprintf (stderr, "pubsubd_msg_send: msg or data or len == NULL");
- return;
- }
-
- // msg: "type(1) chanlen(8) chan datalen(8) data
- if (msg->type == PUBSUB_TYPE_DISCONNECT) {
- *len = 1;
- if (*data != NULL) {
- free (*data);
- *data = NULL;
- }
- *data = malloc(*len);
- memset (*data, 0, *len);
- data[0][0] = msg->type;
- return;
- }
- else {
- // type + size chan + chan + size data + data
- *len = 1 + 2 * sizeof(size_t) + msg->chanlen + msg->datalen;
- }
-
- if (*data != NULL) {
- free (*data);
- *data = NULL;
- }
- *data = malloc(*len);
- memset (*data, 0, *len);
-
- size_t i = 0;
-
- data[0][i] = msg->type; i++;
- memcpy (&data[0][i], &msg->chanlen, sizeof(size_t)); i += sizeof(size_t);
- memcpy (&data[0][i], msg->chan, msg->chanlen); i += msg->chanlen;
- memcpy (&data[0][i], &msg->datalen, sizeof(size_t)); i += sizeof(size_t);
- memcpy (&data[0][i], msg->data, msg->datalen); i += msg->datalen;
-}
-
-void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len)
-{
- if (msg == NULL) {
- fprintf (stderr
- , "\033[31merr: pubsubd_msg_unserialize, msg NULL\033[00m\n");
- return;
- }
-
- if (data == NULL) {
- fprintf (stderr
- , "\033[31merr: pubsubd_msg_unserialize, data NULL\033[00m\n");
- return;
- }
-
- if (len > BUFSIZ) {
- fprintf (stderr
- , "\033[31merr: pubsubd_msg_unserialize, len %ld\033[00m\n"
- , len);
- return;
- }
-
- size_t i = 0;
- msg->type = data[i]; i++;
-
- if (msg->type == PUBSUB_TYPE_DISCONNECT) {
- msg->chanlen = 0;
- msg->chan = NULL;
- msg->datalen = 0;
- msg->data = NULL;
- return ;
- }
-
- memcpy (&msg->chanlen, data + i, sizeof(size_t)); i += sizeof(size_t);
- if (msg->chanlen > BUFSIZ) {
- fprintf (stderr, "\033[31merr : msg->chanlen > BUFSIZ\033[00m\n");
- return;
- }
- msg->chan = malloc (msg->chanlen +1);
- memset (msg->chan, 0, msg->chanlen +1);
- memcpy (msg->chan, data + i, msg->chanlen); i += msg->chanlen;
-
- memcpy (&msg->datalen, data + i, sizeof(size_t)); i += sizeof(size_t);
- if (msg->datalen > BUFSIZ) {
- fprintf (stderr, "\033[31merr : msg->datalen > BUFSIZ\033[00m\n");
- return;
- }
- msg->data = malloc (msg->datalen +1);
- memset (msg->data, 0, msg->datalen +1);
- memcpy (msg->data, data + i, msg->datalen); i += msg->datalen;
-}
-
-void pubsubd_msg_free (struct pubsub_msg *msg)
-{
- if (msg == NULL) {
- fprintf (stderr, "\033[31merr: pubsubd_msg_free, msg NULL\033[00m\n");
- return;
- }
-
- if (msg->chan) {
- free (msg->chan);
- msg->chan = NULL;
- }
- if (msg->data) {
- free (msg->data);
- msg->data = NULL;
- }
-}
-
-// COMMUNICATION
-
int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale
, struct channels *chans, struct channel **c)
{
@@ -540,12 +585,6 @@ void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg
}
}
-void pubsubd_msg_print (const struct pubsub_msg *msg)
-{
- printf ("msg: type=%d chan=%s, data=%s\n"
- , msg->type, msg->chan, msg->data);
-}
-
void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
{
// read the message from the process
@@ -561,108 +600,3 @@ void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
free (buf);
}
}
-
-#define PUBSUB_SUBSCRIBER_ACTION_STR_PUB "pub"
-#define PUBSUB_SUBSCRIBER_ACTION_STR_SUB "sub"
-#define PUBSUB_SUBSCRIBER_ACTION_STR_BOTH "both"
-#define PUBSUB_SUBSCRIBER_ACTION_STR_QUIT "quit"
-
-// enum app_list_elm_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH};
-
-char * pubsub_action_to_str (enum app_list_elm_action action)
-{
- switch (action) {
- case PUBSUB_PUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_PUB);
- case PUBSUB_SUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_SUB);
- case PUBSUB_BOTH : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_BOTH);
- case PUBSUB_QUIT : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_QUIT);
- }
-
- return NULL;
-}
-
-void pubsub_connection (struct service *srv, struct process *p, enum app_list_elm_action action, const char *channame)
-{
- char * straction = NULL;
- straction = pubsub_action_to_str (action);
-
- char line[BUFSIZ];
- memset (line, 0, BUFSIZ);
-
- // line fmt : pid index version action chan
- // "quit" action is also possible (see pubsubd_quit)
- snprintf (line, BUFSIZ, "%d %d %d %s %s\n"
- , p->pid, p->index, p->version
- , straction
- , channame);
- line[BUFSIZ -1] = '\0'; // to be sure
-
- // send the connection line in the $TMP/ pipe
- app_srv_connection (srv, line, strlen (line));
-
- if (straction != NULL)
- free (straction);
-}
-
-void pubsub_disconnect (struct process *p)
-{
- struct pubsub_msg m;
- memset (&m, 0, sizeof (struct pubsub_msg));
- m.type = PUBSUB_TYPE_DISCONNECT;
-
- char *buf = NULL;
- size_t msize = 0;
- pubsubd_msg_serialize (&m, &buf, &msize);
-
- int ret = app_write (p, buf, msize);
- if (ret != (int) msize) {
- fprintf (stderr, "err: can't disconnect\n");
- }
-
- pubsubd_msg_free (&m);
- if (buf != NULL) {
- free (buf);
- }
-}
-
-// tell the service to stop
-void pubsubd_quit (struct service *srv)
-{
- // line fmt : 0 0 0 quit
- char line[BUFSIZ];
- snprintf (line, BUFSIZ, "0 0 0 quit\n");
- app_srv_connection (srv, line, strlen (line));
-}
-
-void pubsub_msg_send (struct process *p, const struct pubsub_msg * m)
-{
- char *buf = NULL;
- size_t msize = 0;
- pubsubd_msg_serialize (m, &buf, &msize);
-
- app_write (p, buf, msize);
-
- if (buf != NULL) {
- free (buf);
- }
-}
-
-void pubsub_msg_recv (struct process *p, struct pubsub_msg *m)
-{
- // read the message from the process
- size_t mlen = 0;
- char *buf = NULL;
- while (buf == NULL || mlen == 0) {
- app_read (p, &buf, &mlen);
- }
-
- pubsubd_msg_unserialize (m, buf, mlen);
-
- if (buf != NULL) {
- free (buf);
- }
-}
-
-// SERVICE
-
-void pubsubd_srv_init ();
diff --git a/lib/pubsubd.h b/lib/pubsubd.h
index 99e1826..fad9cd5 100644
--- a/lib/pubsubd.h
+++ b/lib/pubsubd.h
@@ -1,35 +1,13 @@
#ifndef __PUBSUBD_H__
#define __PUBSUBD_H__
-#include "communication.h"
-#include "process.h"
-#include "queue.h"
-
-#define PUBSUB_TYPE_DISCONNECT 1 << 0
-#define PUBSUB_TYPE_INFO 1 << 1
-#define PUBSUB_TYPE_DEBUG 1 << 2
-#define PUBSUB_TYPE_MESSAGE 1 << 3
-
-#define PUBSUB_SERVICE_NAME "pubsub"
+#include "pubsub.h"
+#include
struct channel;
struct channels;
struct app_list_head;
struct app_list_elm;
-struct pubsub_msg;
-
-struct pubsub_msg {
- unsigned char *chan;
- size_t chanlen;
- unsigned char *data;
- size_t datalen;
- unsigned char type; // message type : alert, notification, …
-};
-
-void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len);
-void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len);
-void pubsubd_msg_free (struct pubsub_msg *msg);
-void pubsubd_msg_print (const struct pubsub_msg *msg);
// parse pubsubd init msg (sent in TMPDIR/)
//
@@ -37,14 +15,9 @@ void pubsubd_msg_print (const struct pubsub_msg *msg);
// action : quit | pub | sub
int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale
, struct channels *chans, struct channel **c);
-int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize);
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m);
void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m);
-void pubsub_disconnect (struct process *p);
-void pubsub_msg_send (struct process *p, const struct pubsub_msg *msg);
-void pubsub_msg_recv (struct process *p, struct pubsub_msg *msg);
-
// CHANNEL
// head of the list
@@ -88,8 +61,6 @@ pubsubd_channels_search_from_app_list_elm (struct channels *chans
// head of the list
LIST_HEAD(app_list_head, app_list_elm);
-enum app_list_elm_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH};
-
// element of the list
struct app_list_elm {
struct process *p;
@@ -113,7 +84,33 @@ struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale)
void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p);
void pubsubd_app_list_elm_free (struct app_list_elm *todel);
-void pubsub_connection (struct service *srv, struct process *p, enum app_list_elm_action action, const char *channame);
void pubsubd_quit (struct service *srv);
+// WORKERS: one thread per client
+
+// head of the list
+LIST_HEAD(workers, worker);
+
+// element of the list
+// worker : process to handle (threaded)
+struct worker {
+ pthread_t *thr;
+ struct workers *my_workers;
+ struct channels *chans;
+ struct channel *chan;
+ struct app_list_elm *ale;
+ LIST_ENTRY(worker) entries;
+};
+
+void pubsubd_worker_free (struct worker * w);
+struct worker * pubsubd_worker_get (struct workers *wrkrs, struct worker *w);
+int pubsubd_worker_eq (const struct worker *w1, const struct worker *w2);
+void pubsubd_workers_init (struct workers *wrkrs);
+void * pubsubd_worker_thread (void *params);
+struct worker *
+pubsubd_workers_add (struct workers *wrkrs, const struct worker *w);
+void pubsubd_workers_del_all (struct workers *wrkrs);
+void pubsubd_workers_stop (struct workers *wrkrs);
+void pubsubd_worker_del (struct workers *wrkrs, struct worker *w);
+
#endif
diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c
index d6d38ba..1616c95 100644
--- a/pubsub/pubsubd.c
+++ b/pubsub/pubsubd.c
@@ -1,8 +1,7 @@
#include "../lib/pubsubd.h"
#include
-#include
-#define NB_CLIENTS 3
+struct workers *my_workers;
void
ohshit(int rvalue, const char* str) {
@@ -10,183 +9,6 @@ ohshit(int rvalue, const char* str) {
exit(rvalue);
}
-// head of the list
-LIST_HEAD(workers, worker);
-
-struct workers *my_workers;
-
-// element of the list
-// worker : process to handle (threaded)
-struct worker {
- pthread_t *thr;
- struct channels *chans;
- struct channel *chan;
- struct app_list_elm *ale;
- LIST_ENTRY(worker) entries;
-};
-
-void pubsubd_worker_free (struct worker * w);
-struct worker * pubsubd_worker_get (struct workers *wrkrs, struct worker *w);
-int pubsubd_worker_eq (const struct worker *w1, const struct worker *w2);
-
-void pubsubd_workers_init (struct workers *wrkrs) { LIST_INIT(wrkrs); }
-
-struct worker *
-pubsubd_workers_add (struct workers *wrkrs, const struct worker *w)
-{
- if (wrkrs == NULL || w == NULL) {
- printf ("pubsubd_workers_add: wrkrs == NULL or w == NULL");
- return NULL;
- }
-
- struct worker *n = malloc (sizeof (struct worker));
- memset (n, 0, sizeof (struct worker));
- memcpy (n, w, sizeof (struct worker));
- if (w->ale != NULL)
- n->ale = pubsubd_app_list_elm_copy (w->ale);
-
- LIST_INSERT_HEAD(wrkrs, n, entries);
-
- return n;
-}
-
-void pubsubd_worker_del (struct workers *wrkrs, struct worker *w)
-{
- struct worker *todel = pubsubd_worker_get (wrkrs, w);
- if(todel != NULL) {
- LIST_REMOVE(todel, entries);
- pubsubd_worker_free (todel);
- free (todel);
- todel = NULL;
- }
-}
-
-// kill the threads
-void pubsubd_workers_stop (struct workers *wrkrs)
-{
- if (!wrkrs)
- return;
-
- struct worker *w = NULL;
- struct worker *wtmp = NULL;
-
- LIST_FOREACH_SAFE(w, wrkrs, entries, wtmp) {
- if (w->thr == NULL)
- continue;
-
- pthread_cancel (*w->thr);
- void *ret = NULL;
- pthread_join (*w->thr, &ret);
- if (ret != NULL) {
- free (ret);
- }
- free (w->thr);
- w->thr = NULL;
- }
-}
-
-void pubsubd_workers_del_all (struct workers *wrkrs)
-{
- if (!wrkrs)
- return;
-
- struct worker *w = NULL;
-
- while (!LIST_EMPTY(wrkrs)) {
- printf ("KILL THE WORKERS : %p\n", w);
- w = LIST_FIRST(wrkrs);
- LIST_REMOVE(w, entries);
- pubsubd_worker_free (w);
- free (w);
- w = NULL;
- }
-}
-
-void pubsubd_worker_free (struct worker * w)
-{
- if (w == NULL)
- return;
- pubsubd_app_list_elm_free (w->ale);
- free (w->ale);
- w->ale = NULL;
-}
-
-struct worker * pubsubd_worker_get (struct workers *wrkrs, struct worker *w)
-{
- struct worker * np = NULL;
- LIST_FOREACH(np, wrkrs, entries) {
- if (pubsubd_worker_eq (np, w))
- return np;
- }
- return NULL;
-}
-
-int pubsubd_worker_eq (const struct worker *w1, const struct worker *w2)
-{
- return w1 == w2; // if it's the same pointer
-}
-
-// a thread for each connected process
-void * pubsubd_worker_thread (void *params)
-{
- int s = 0;
-
- s = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
- if (s != 0)
- printf ("pthread_setcancelstate: %d\n", s);
-
- struct worker *w = (struct worker *) params;
- if (w == NULL) {
- fprintf (stderr, "error pubsubd_worker_thread : params NULL\n");
- return NULL;
- }
-
- struct channels *chans = w->chans;
- struct channel *chan = w->chan;
- struct app_list_elm *ale = w->ale;
-
- // main loop
- while (1) {
- struct pubsub_msg m;
- memset (&m, 0, sizeof (struct pubsub_msg));
-
- pubsubd_msg_recv (ale->p, &m);
-
- if (m.type == PUBSUB_TYPE_DISCONNECT) {
- printf ("process %d disconnecting...\n", ale->p->pid);
- if ( 0 != pubsubd_subscriber_del (chan->alh, ale)) {
- fprintf (stderr, "err : subscriber not registered\n");
- }
- break;
- }
- else {
- struct channel *ch = pubsubd_channel_search (chans, chan->chan);
- if (ch == NULL) {
- printf ("CHAN NOT FOUND\n");
- }
- else {
- printf ("what should be sent: ");
- pubsubd_msg_print (&m);
- printf ("send the message to:\t");
- pubsubd_channel_print (ch);
- pubsubd_msg_send (ch->alh, &m);
- }
- }
- pubsubd_msg_free (&m);
- }
-
- pubsubd_app_list_elm_free (ale);
- free (w->ale);
- w->ale = NULL;
-
- free (w->thr);
- w->thr = NULL;
-
- pubsubd_worker_del (my_workers, w);
-
- pthread_exit (NULL);
-}
-
int
main(int argc, char **argv, char **env)
{
@@ -208,9 +30,7 @@ main(int argc, char **argv, char **env)
memset (my_workers, 0, sizeof (struct workers));
pubsubd_workers_init (my_workers);
- int i = 0;
- // for (i = 0; i < NB_CLIENTS; i++)
- for (i = 0; ; i++) {
+ while (1) {
// for each new process
struct app_list_elm ale;
memset (&ale, 0, sizeof (struct app_list_elm));
@@ -231,6 +51,7 @@ main(int argc, char **argv, char **env)
memset (w->thr, 0, sizeof (pthread_t));
w->ale = pubsubd_app_list_elm_copy (&ale);
w->chans = &chans;
+ w->my_workers = my_workers;
w->chan = chan;
struct worker *wtmp = pubsubd_workers_add (my_workers, w);
pubsubd_worker_free (w);