From 189b704d1c06fd0c86eb28eabb777fbb9fb093a9 Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Thu, 19 Jan 2017 22:07:52 +0100
Subject: [PATCH] pubsub seems fine
---
core-test/app/communication-server.c | 2 +-
core-test/app/usock-server.c | 1 +
core/communication.c | 40 +-
core/process.c | 11 +-
core/process.h | 17 +-
core/pubsub.c | 191 -----
core/pubsub.h | 44 --
core/usocket.c | 4 +-
pong/app/pongd.c | 10 +-
pubsub/app/Makefile | 6 +-
pubsub/app/msg-serialize.c | 48 --
pubsub/app/msg-unserialize.c | 36 -
pubsub/app/pubsubc.c | 159 ++--
pubsub/app/pubsubd.c | 96 +--
pubsub/app/test-chan-lists.c | 99 ---
pubsub/app/test-gen-new-process.c | 53 --
pubsub/app/test-gen-new-process.sh | 25 -
pubsub/app/test-pipe-read.c | 57 --
pubsub/app/test-pipe-write.c | 57 --
pubsub/app/test-subscribers.c | 81 --
pubsub/app/test-test-send-params.sh | 38 -
pubsub/lib/channels.c | 195 +++++
pubsub/lib/channels.h | 46 ++
pubsub/lib/msg.c | 132 ++++
pubsub/lib/msg.h | 21 +
pubsub/lib/pubsub.c | 101 +++
pubsub/lib/pubsub.h | 26 +
pubsub/lib/pubsubd.c | 744 ++++--------------
pubsub/lib/pubsubd.h | 117 +--
pubsub/lib/workers.c | 168 ++++
pubsub/lib/workers.h | 32 +
pubsub/tests/Makefile | 25 +
pubsub/tests/channels.c | 152 ++++
pubsub/tests/msg.c | 56 ++
.../{app => tests}/pubsub-test-send-params.c | 4 +
pubsub/{app => tests}/pubsub-test-send.c | 4 +
36 files changed, 1289 insertions(+), 1609 deletions(-)
delete mode 100644 core/pubsub.c
delete mode 100644 core/pubsub.h
delete mode 100644 pubsub/app/msg-serialize.c
delete mode 100644 pubsub/app/msg-unserialize.c
delete mode 100644 pubsub/app/test-chan-lists.c
delete mode 100644 pubsub/app/test-gen-new-process.c
delete mode 100755 pubsub/app/test-gen-new-process.sh
delete mode 100644 pubsub/app/test-pipe-read.c
delete mode 100644 pubsub/app/test-pipe-write.c
delete mode 100644 pubsub/app/test-subscribers.c
delete mode 100755 pubsub/app/test-test-send-params.sh
create mode 100644 pubsub/lib/channels.c
create mode 100644 pubsub/lib/channels.h
create mode 100644 pubsub/lib/msg.c
create mode 100644 pubsub/lib/msg.h
create mode 100644 pubsub/lib/pubsub.c
create mode 100644 pubsub/lib/pubsub.h
create mode 100644 pubsub/lib/workers.c
create mode 100644 pubsub/lib/workers.h
create mode 100644 pubsub/tests/Makefile
create mode 100644 pubsub/tests/channels.c
create mode 100644 pubsub/tests/msg.c
rename pubsub/{app => tests}/pubsub-test-send-params.c (98%)
rename pubsub/{app => tests}/pubsub-test-send.c (97%)
diff --git a/core-test/app/communication-server.c b/core-test/app/communication-server.c
index 28659c9..d29d40a 100644
--- a/core-test/app/communication-server.c
+++ b/core-test/app/communication-server.c
@@ -52,7 +52,7 @@ int main (int argc, char *argv[], char *env[])
handle_err("main", "srv_read < 0");
return EXIT_FAILURE;
}
- if (m.type == MSG_TYPE_DIS) {
+ if (m.type == MSG_TYPE_CLOSE) {
printf ("the client quits\n");
}
else {
diff --git a/core-test/app/usock-server.c b/core-test/app/usock-server.c
index cf8b17a..2d4328b 100644
--- a/core-test/app/usock-server.c
+++ b/core-test/app/usock-server.c
@@ -57,6 +57,7 @@ int main (int argc, char * argv[])
return EXIT_FAILURE;
}
+
if (usock_close (pfd) < 0) {
handle_err("main", "usock_close pfd < 0");
return EXIT_FAILURE;
diff --git a/core/communication.c b/core/communication.c
index a097455..b306ba1 100644
--- a/core/communication.c
+++ b/core/communication.c
@@ -146,14 +146,6 @@ int app_close (struct service *srv)
handle_err ("app_close", "msg_write < 0");
}
- // if (msg_read (srv->service_fd, &m) < 0) {
- // handle_err ("app_close", "msg_read < 0");
- // }
-
- // if (m.type != MSG_TYPE_ACK) {
- // handle_err ("app_close", "msg received != ACK");
- // }
-
return usock_close (srv->service_fd);
}
@@ -168,6 +160,22 @@ int app_write (struct service *srv, const struct msg *m)
}
+/*calculer le max filedescriptor*/
+int getMaxFd(struct array_proc *ap)
+{
+
+ int i;
+ int max = 0;
+
+ for (i = 0; i < ap->size; i++ ) {
+ if (ap->tab_proc[i]->proc_fd > max) {
+ max = ap->tab_proc[i]->proc_fd;
+ }
+ }
+
+ return max;
+}
+
/*
* srv_select prend en parametre
* * un tableau de process qu'on écoute
@@ -247,19 +255,3 @@ int srv_select (struct array_proc *ap, struct service *srv
return APPLICATION;
return CONNECTION;
}
-
-/*calculer le max filedescriptor*/
-int getMaxFd(struct array_proc *ap)
-{
-
- int i;
- int max = 0;
-
- for (i = 0; i < ap->size; i++ ) {
- if (ap->tab_proc[i]->proc_fd > max) {
- max = ap->tab_proc[i]->proc_fd;
- }
- }
-
- return max;
-}
diff --git a/core/process.c b/core/process.c
index 4e2f7df..c8b2e54 100644
--- a/core/process.c
+++ b/core/process.c
@@ -3,10 +3,8 @@
#include
#include
-// TODO
-// tout revoir ici
+#include
-#if 0
struct process * srv_process_copy (const struct process *p)
{
if (p == NULL)
@@ -20,7 +18,8 @@ struct process * srv_process_copy (const struct process *p)
int srv_process_eq (const struct process *p1, const struct process *p2)
{
- return (p1->version == p2->version && p1->index == p2->index);
+ return (p1->version == p2->version && p1->index == p2->index
+ && p1->proc_fd == p2->proc_fd);
}
void srv_process_gen (struct process *p
@@ -30,9 +29,6 @@ void srv_process_gen (struct process *p
p->index = index;
}
-
-#endif
-
int add_proc (struct array_proc *aproc, struct process *p)
{
assert(aproc != NULL);
@@ -52,7 +48,6 @@ int add_proc (struct array_proc *aproc, struct process *p)
int del_proc (struct array_proc *aproc, struct process *p)
{
assert(aproc != NULL);
- assert(aproc->tab_proc != NULL);
assert(p != NULL);
if (aproc->tab_proc == NULL) {
diff --git a/core/process.h b/core/process.h
index cf65f45..5493941 100644
--- a/core/process.h
+++ b/core/process.h
@@ -13,31 +13,16 @@ struct array_proc {
};
int add_proc (struct array_proc *, struct process *);
-
int del_proc (struct array_proc *aproc, struct process *p);
void array_proc_print (struct array_proc *);
void array_proc_free (struct array_proc *);
-#if 0
-
-#include
-#include
-#include
-#include
-
-#include
-
-
struct process * srv_process_copy (const struct process *p);
-
int srv_process_eq (const struct process *p1, const struct process *p2);
-
// create the service process structure
void srv_process_gen (struct process *p
, unsigned int index, unsigned int version);
-void srv_process_print (struct process *);
-
-#endif
+void process_print (struct process *);
#endif
diff --git a/core/pubsub.c b/core/pubsub.c
deleted file mode 100644
index 14c052f..0000000
--- a/core/pubsub.c
+++ /dev/null
@@ -1,191 +0,0 @@
-#include "pubsub.h"
-#include
-
-#if 0
-
-#include // strndup
-
-void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len)
-{
- if (msg == NULL || data == NULL || len == NULL) {
- fprintf (stderr, "pubsubd_msg_send: msg or data or len == NULL");
- return;
- }
-
- /* Preallocate the map structure */
- cbor_item_t * root = cbor_new_definite_map(1);
- /* Add the content */
- cbor_map_add(root, (struct cbor_pair) {
- .key = cbor_move(cbor_build_uint8((unsigned char) msg->type)),
- .value = cbor_move(cbor_build_bytestring((unsigned char*) msg->data, msg->datalen))
- });
-
- size_t buffer_size;
- *len = cbor_serialize_alloc (root, (unsigned char **) data, &buffer_size);
- cbor_decref(&root);
-}
-
-void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *buf, size_t mlen)
-{
- if (msg == NULL) {
- fprintf (stderr
- , "\033[31merr: pubsubd_msg_unserialize, msg NULL\033[00m\n");
- return;
- }
-
- if (buf == NULL) {
- fprintf (stderr
- , "\033[31merr: pubsubd_msg_unserialize, buf NULL\033[00m\n");
- return;
- }
-
- if (mlen > BUFSIZ) {
- fprintf (stderr
- , "\033[31merr: pubsubd_msg_unserialize, mlen %ld\033[00m\n"
- , mlen);
- return;
- }
-
- // CBOR reading, from buf to pubsub_msg structure
- struct cbor_load_result result;
- cbor_item_t * item = cbor_load ((unsigned char *) buf, mlen, &result);
-
- struct cbor_pair * pair = cbor_map_handle (item);
- cbor_mutable_data data = cbor_bytestring_handle (pair->value);
-
- msg->type = cbor_get_uint8 (pair->key);
- if (msg->type != PUBSUB_TYPE_DISCONNECT) {
- msg->datalen = cbor_bytestring_length (pair->value);
- msg->data = malloc (msg->datalen +1);
- memset (msg->data, 0, msg->datalen +1);
- memcpy (msg->data, data, msg->datalen);
- }
-
- /* Deallocate the result */
- cbor_decref (&item);
-}
-
-void pubsubd_msg_free (struct pubsub_msg *msg)
-{
- if (msg == NULL) {
- fprintf (stderr, "\033[31merr: pubsubd_msg_free, msg NULL\033[00m\n");
- return;
- }
-
- if (msg->chan) {
- free (msg->chan);
- msg->chan = NULL;
- }
- if (msg->data) {
- free (msg->data);
- msg->data = NULL;
- }
-}
-
-void pubsubd_msg_print (const struct pubsub_msg *msg)
-{
- printf ("msg: type=%d chan=%s, data=%s\n"
- , msg->type, msg->chan, msg->data);
-}
-
-#define PUBSUB_SUBSCRIBER_ACTION_STR_PUB "pub"
-#define PUBSUB_SUBSCRIBER_ACTION_STR_SUB "sub"
-#define PUBSUB_SUBSCRIBER_ACTION_STR_BOTH "both"
-#define PUBSUB_SUBSCRIBER_ACTION_STR_QUIT "quit"
-
-// enum app_list_elm_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH};
-
-char * pubsub_action_to_str (enum app_list_elm_action action)
-{
- switch (action) {
- case PUBSUB_PUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_PUB);
- case PUBSUB_SUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_SUB);
- case PUBSUB_BOTH : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_BOTH);
- case PUBSUB_QUIT : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_QUIT);
- }
-
- return NULL;
-}
-
-void pubsub_connection (struct service *srv, struct process *p, enum app_list_elm_action action, const char *channame)
-{
- char * straction = NULL;
- straction = pubsub_action_to_str (action);
-
- char line[BUFSIZ];
- memset (line, 0, BUFSIZ);
-
- // line fmt : index version action chan
- // "quit" action is also possible (see pubsubd_quit)
- snprintf (line, BUFSIZ, "%d %d %s %s\n"
- , p->index
- , p->version
- , straction
- , channame);
- line[BUFSIZ -1] = '\0'; // to be sure
-
- // send the connection line in the $TMP/ pipe
- app_srv_connection (srv, line, strlen (line));
-
- if (straction != NULL)
- free (straction);
-}
-
-void pubsub_disconnect (struct process *p)
-{
- struct pubsub_msg m;
- memset (&m, 0, sizeof (struct pubsub_msg));
- m.type = PUBSUB_TYPE_DISCONNECT;
-
- char *buf = NULL;
- size_t msize = 0;
- pubsubd_msg_serialize (&m, &buf, &msize);
-
- int ret = app_write (p, buf, msize);
- if (ret != (int) msize) {
- fprintf (stderr, "err: can't disconnect\n");
- }
-
- pubsubd_msg_free (&m);
- if (buf != NULL) {
- free (buf);
- }
-}
-
-// tell the service to stop
-void pubsubd_quit (struct service *srv)
-{
- // line fmt : 0 0 0 quit
- char line[BUFSIZ];
- snprintf (line, BUFSIZ, "0 0 0 quit\n");
- app_srv_connection (srv, line, strlen (line));
-}
-
-void pubsub_msg_send (struct process *p, const struct pubsub_msg * m)
-{
- size_t msize = 0;
- char * buf = NULL;
- pubsubd_msg_serialize (m, &buf, &msize);
-
- app_write (p, buf, msize);
-
- free(buf);
-}
-
-void pubsub_msg_recv (struct process *p, struct pubsub_msg *m)
-{
- // read the message from the process
- size_t mlen = 0;
- char *buf = NULL;
- while (buf == NULL || mlen == 0) {
- app_read (p, &buf, &mlen);
- }
-
- pubsubd_msg_unserialize (m, buf, mlen);
-
- if (buf != NULL) {
- free (buf);
- }
-
-}
-#endif
diff --git a/core/pubsub.h b/core/pubsub.h
deleted file mode 100644
index 54c6d49..0000000
--- a/core/pubsub.h
+++ /dev/null
@@ -1,44 +0,0 @@
-#ifndef __PUBSUB_H__
-#define __PUBSUB_H__
-
-#include "communication.h"
-#include "process.h"
-#include "queue.h"
-
-#if 0
-
-#define PUBSUB_TYPE_DISCONNECT 0
-#define PUBSUB_TYPE_MESSAGE 1
-#define PUBSUB_TYPE_ERROR 2
-#define PUBSUB_TYPE_DEBUG 4
-#define PUBSUB_TYPE_INFO 128
-
-#define PUBSUB_SERVICE_NAME "pubsub"
-
-struct pubsub_msg;
-
-struct pubsub_msg {
- unsigned char *chan;
- size_t chanlen;
- unsigned char *data;
- size_t datalen;
- unsigned char type; // message type : alert, notification, …
-};
-
-void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len);
-void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len);
-void pubsubd_msg_free (struct pubsub_msg *msg);
-void pubsubd_msg_print (const struct pubsub_msg *msg);
-
-void pubsub_disconnect (struct process *p);
-void pubsub_msg_send (struct process *p, const struct pubsub_msg *msg);
-void pubsub_msg_recv (struct process *p, struct pubsub_msg *msg);
-
-enum app_list_elm_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH};
-
-void pubsub_connection (struct service *srv, struct process *p, enum app_list_elm_action action, const char *channame);
-void pubsubd_quit (struct service *srv);
-
-#endif
-
-#endif
diff --git a/core/usocket.c b/core/usocket.c
index 908d79e..c5deb2c 100644
--- a/core/usocket.c
+++ b/core/usocket.c
@@ -13,12 +13,12 @@ int usock_send (const int fd, const char *buf, const int msize)
// print_hexa ("msg send", (unsigned char *)buf, msize);
// fflush(stdout);
- int ret = 0;
+ ssize_t ret = 0;
//printf ("%ld bytes to write\n", msize);
ret = send (fd, buf, msize, 0);
if (ret <= 0)
handle_err ("usock_send", "send ret <= 0");
- return ret;
+ return (int) ret;
}
int usock_recv (const int fd, char **buf, size_t *msize)
diff --git a/pong/app/pongd.c b/pong/app/pongd.c
index de9adcf..44715f2 100644
--- a/pong/app/pongd.c
+++ b/pong/app/pongd.c
@@ -41,14 +41,14 @@ void handle_new_msg (struct array_proc *ap, struct array_proc *proc_to_read)
}
// close the process then delete it from the process array
- if (m.type == MSG_TYPE_DIS) {
+ if (m.type == MSG_TYPE_CLOSE) {
cpt--;
printf ("disconnection => %d client(s) remaining\n", cpt);
if (srv_close_proc (proc_to_read->tab_proc[i]) < 0)
- handle_error( "srv_close_proc < 0");
+ handle_err( "handle_new_msg", "srv_close_proc < 0");
if (del_proc (ap, proc_to_read->tab_proc[i]) < 0)
- handle_error( "del_proc < 0");
+ handle_err( "handle_new_msg", "del_proc < 0");
if (del_proc (proc_to_read, proc_to_read->tab_proc[i]) < 0)
handle_err( "handle_new_msg", "del_proc < 0");
i--;
@@ -57,7 +57,7 @@ void handle_new_msg (struct array_proc *ap, struct array_proc *proc_to_read)
printf ("new message : %s", m.val);
if (srv_write (proc_to_read->tab_proc[i], &m) < 0) {
- handle_error("srv_write < 0");
+ handle_err( "handle_new_msg", "srv_write < 0");
}
}
}
@@ -67,7 +67,7 @@ void handle_new_msg (struct array_proc *ap, struct array_proc *proc_to_read)
*
* accept new application connections
* read a message and send it back
- * close a connection if MSG_TYPE_DIS received
+ * close a connection if MSG_TYPE_CLOSE received
*/
void main_loop (struct service *srv)
diff --git a/pubsub/app/Makefile b/pubsub/app/Makefile
index f6276d4..8ea2a9e 100644
--- a/pubsub/app/Makefile
+++ b/pubsub/app/Makefile
@@ -3,20 +3,20 @@ CFLAGS=-Wall -g -Wextra
LDFLAGS= -pthread
CFILES=$(wildcard *.c) # CFILES => recompiles everything on a C file change
EXEC=$(basename $(wildcard *.c))
-SOURCES=$(wildcard ../lib/*.c ../../lib/*.c)
+SOURCES=$(wildcard ../lib/*.c ../../core/*.c)
OBJECTS=$(SOURCES:.c=.o)
TESTS=$(addsuffix .test, $(EXEC))
all: $(SOURCES) $(EXEC)
$(EXEC): $(OBJECTS) $(CFILES)
- $(CC) $(CFLAGS) $(LDFLAGS) $(OBJECTS) $@.c -lcbor -o $@.bin
+ $(CC) $(CFLAGS) $(LDFLAGS) $(OBJECTS) $@.c -o $@.bin
.c.o:
$(CC) -c $(CFLAGS) $< -o $@
$(TESTS):
- valgrind --show-leak-kinds=all --leak-check=full -v --track-origins=yes ./$(basename $@)
+ valgrind --show-leak-kinds=all --leak-check=full -v --track-origins=yes ./$(basename $@).bin
clean:
@-rm $(OBJECTS)
diff --git a/pubsub/app/msg-serialize.c b/pubsub/app/msg-serialize.c
deleted file mode 100644
index bcaa165..0000000
--- a/pubsub/app/msg-serialize.c
+++ /dev/null
@@ -1,48 +0,0 @@
-#include "../lib/pubsubd.h"
-#include
-#include
-
-#define MESSAGE "coucou"
-#define CHAN "chan1"
-
-void
-ohshit(int rvalue, const char* str) {
- fprintf (stderr, "\033[31merr: %s\033[00m\n", str);
- exit (rvalue);
-}
-
-void usage (char **argv)
-{
- printf ( "usage: %s\n", argv[0]);
-}
-
-int
-main(int argc, char **argv)
-{
- if (argc != 1) {
- usage (argv);
- exit (1);
- }
-
- struct pubsub_msg msg;
- memset (&msg, 0, sizeof (struct pubsub_msg));
- msg.type = PUBSUB_TYPE_MESSAGE;
- msg.chan = malloc (strlen (CHAN) + 1);
- strncpy ((char *)msg.chan, CHAN, strlen (CHAN) + 1);
- msg.chanlen = strlen (CHAN) + 1;
-
- msg.data = malloc (strlen (MESSAGE) + 1);
- strncpy ((char *)msg.data, MESSAGE, strlen (CHAN) + 1);
- msg.datalen = strlen (MESSAGE) + 1;
-
- char *data = NULL;
- size_t len = 0;
- pubsubd_msg_serialize (&msg, &data, &len);
- pubsubd_msg_free (&msg);
-
- if ((int) len != write (1, data, len)) {
- ohshit (1, "unable to write the data");
- }
-
- return EXIT_SUCCESS;
-}
diff --git a/pubsub/app/msg-unserialize.c b/pubsub/app/msg-unserialize.c
deleted file mode 100644
index a4dad4f..0000000
--- a/pubsub/app/msg-unserialize.c
+++ /dev/null
@@ -1,36 +0,0 @@
-#include "../lib/pubsubd.h"
-#include
-#include
-
-void
-ohshit(int rvalue, const char* str) {
- fprintf (stderr, "\033[31merr: %s\033[00m\n", str);
- exit (rvalue);
-}
-
-void usage (char **argv)
-{
- printf ( "usage: cat msg | %s\n", argv[0]);
-}
-
-int
-main(int argc, char **argv)
-{
-
- if (argc != 1) {
- usage (argv);
- exit (1);
- }
-
- char data[BUFSIZ];
- memset (data, 0, BUFSIZ);
- size_t len = read (0, data, BUFSIZ);
- printf ("msg len %ld\n", len);
-
- struct pubsub_msg msg;
- pubsubd_msg_unserialize (&msg, data, len);
- pubsubd_msg_print (&msg);
- pubsubd_msg_free (&msg);
-
- return EXIT_SUCCESS;
-}
diff --git a/pubsub/app/pubsubc.c b/pubsub/app/pubsubc.c
index a8d52ca..f6a6953 100644
--- a/pubsub/app/pubsubc.c
+++ b/pubsub/app/pubsubc.c
@@ -1,16 +1,16 @@
+// int main(void) { return 0; }
+
+// TODO: select on service + input instead of threads
+
+#include "../../core/error.h"
+#include "../lib/pubsub.h"
#include "../lib/pubsubd.h"
#include
#include
+#include
-void
-ohshit(int rvalue, const char* str) {
- fprintf (stderr, "\033[31merr: %s\033[00m\n", str);
- exit (rvalue);
-}
-
-void usage (char **argv)
-{
- printf ( "usage: %s\n", argv[0]);
+void usage (char **argv) {
+ printf ( "usage: %s [chan [pub]]\n", argv[0]);
}
void print_cmd (void) {
@@ -21,14 +21,15 @@ void print_cmd (void) {
void * listener (void *params)
{
int s = 0;
- s = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
- if (s != 0)
- printf ("pthread_setcancelstate: %d\n", s);
+ s = pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL);
+ if (s != 0) {
+ handle_err ("listener", "pthread_setcancelstate != 0");
+ }
- struct process *p = NULL;
- p = (struct process *) params;
- if (p == NULL) {
- fprintf (stderr, "listener: no process\n");
+ struct service *srv = NULL;
+ srv = (struct service *) params;
+ if (srv == NULL) {
+ handle_err ("listener", "no service passed");
return NULL;
}
@@ -37,69 +38,70 @@ void * listener (void *params)
struct pubsub_msg m;
memset (&m, 0, sizeof (struct pubsub_msg));
- pubsub_msg_recv (p, &m);
- printf ("\n\033[31m>\033[00m %s\n", m.data);
+ pubsub_msg_recv (srv, &m);
+ printf ("\r\033[31m>\033[00m %s\n", m.data);
print_cmd ();
- // if (m.type == PUBSUB_TYPE_DISCONNECT) { }
- pubsubd_msg_free (&m);
+ pubsub_msg_free (&m);
}
pthread_exit (NULL);
}
+void chan_sub (struct service *srv, char *chan)
+{
+ struct pubsub_msg msg;
+ memset (&msg, 0, sizeof (struct pubsub_msg));
+
+ // meta data on the message
+ msg.type = PUBSUB_MSG_TYPE_SUB;
+ msg.chanlen = strlen (chan) + 1;
+ msg.chan = malloc (msg.chanlen);
+ memset (msg.chan, 0, msg.chanlen);
+ strncpy (msg.chan, chan, msg.chanlen);
+ msg.chan[strlen (chan)] = '\0';
+
+ pubsub_msg_send (srv, &msg);
+ printf ("subscribed to %s\n", chan);
+
+ pubsub_msg_free (&msg);
+}
void main_loop (int argc, char **argv, char **env
- , pid_t pid, int index, int version
+ , int index, int version
, char *cmd, char *chan)
{
- printf ("connection : pid %d index %d version %d "
+ printf ("connection to pubsubd: index %d version %d "
"cmd %s chan %s\n"
- , pid, index, version, cmd, chan );
+ , index, version, cmd, chan );
struct service srv;
memset (&srv, 0, sizeof (struct service));
- srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME, NULL);
- printf ("Writing on %s.\n", srv.spath);
+ pubsub_connection (argc, argv, env, &srv);
+ printf ("connected\n");
- struct process p;
- memset (&p, 0, sizeof (struct process));
-
- printf ("app creation\n");
- if (app_create (&p, pid, index, version)) // called by the application
- ohshit (1, "app_create");
+ if (strncmp (cmd, "sub", 3) == 0) {
+ chan_sub (&srv, chan);
+ }
pthread_t thr;
memset (&thr, 0, sizeof (pthread_t));
- pthread_create (&thr, NULL, listener, &p);
+ pthread_create (&thr, NULL, listener, &srv);
pthread_detach (thr);
printf ("main_loop\n");
- // send a message to warn the service we want to do something
- // line : pid index version action chan
- enum app_list_elm_action action = PUBSUB_BOTH;
-
- if (strncmp (cmd, "pub", 3) == 0) {
- action = PUBSUB_PUB;
- }
- else if (strncmp (cmd, "sub", 3) == 0) {
- action = PUBSUB_SUB;
- }
-
- pubsub_connection (&srv, &p, action, chan);
struct pubsub_msg msg;
memset (&msg, 0, sizeof (struct pubsub_msg));
-
// meta data on the message
- msg.type = PUBSUB_TYPE_MESSAGE;
- msg.chan = malloc (strlen (chan) + 1);
- memset (msg.chan, 0, strlen (chan) + 1);
- strncpy ((char *) msg.chan, chan, strlen (chan));
+ msg.type = PUBSUB_MSG_TYPE_PUB;
+ msg.chanlen = strlen (chan) + 1;
+ msg.chan = malloc (msg.chanlen);
+ memset (msg.chan, 0, msg.chanlen);
+ strncpy ((char *) msg.chan, chan, msg.chanlen);
msg.chan[strlen (chan)] = '\0';
- msg.chanlen = strlen (chan);
// msg loop
for (;;) {
@@ -110,63 +112,62 @@ void main_loop (int argc, char **argv, char **env
size_t mlen = read (0, buf, BUFSIZ);
+ // remove \n
if (mlen > 1) {
mlen--;
}
buf[mlen] = '\0';
- // TODO debug
- // printf ("data (%ld): %s\n", mlen, buf);
-
if (strncmp(buf, "quit", strlen ("quit")) == 0) {
break;
}
- msg.data = malloc (strlen (buf) + 1);
- memset (msg.data, 0, strlen (buf) + 1);
- strncpy ((char *) msg.data, buf, strlen (buf) + 1);
- msg.datalen = strlen (buf);
+ msg.datalen = strlen (buf) + 1;
+ msg.data = malloc (msg.datalen);
+ memset (msg.data, 0, msg.datalen);
+ strncpy ((char *) msg.data, buf, msg.datalen);
+ msg.data[strlen(buf)] = '\0';
- // TODO debug
- // printf ("send message\n");
- pubsub_msg_send (&p, &msg);
+ pubsub_msg_send (&srv, &msg);
free (msg.data);
msg.data = NULL;
msg.datalen = 0;
}
// free everything
- pubsubd_msg_free (&msg);
+ pubsub_msg_free (&msg);
pthread_cancel (thr);
pthread_join (thr, NULL);
printf ("disconnection...\n");
// disconnect from the server
- pubsub_disconnect (&p);
-
- printf ("destroying app\n");
- // the application will shut down, and remove the application named pipes
- if (app_destroy (&p))
- ohshit (1, "app_destroy");
+ pubsub_disconnect (&srv);
}
int main(int argc, char **argv, char **env)
{
-
- if (argc != 1) {
- usage (argv);
- exit (1);
- }
-
- pid_t pid = getpid();
- int index = 0;
- // don't care about the version
- int version = COMMUNICATION_VERSION;
- char *cmd = "both";
+ char *cmd = "sub";
char *chan = "chan1";
- main_loop (argc, argv, env, pid, index, version, cmd, chan);
+ if (argc == 2 && strncmp("-h", argv[1], 2) == 0) {
+ usage (argv);
+ exit (0);
+ }
+
+ if (argc >= 2) {
+ chan = argv[1];
+ }
+
+ if (argc >= 3) {
+ cmd = argv[2];
+ }
+
+ int index = 0;
+ // don't care about the version
+ int version = 0;
+
+ main_loop (argc, argv, env, index, version, cmd, chan);
return EXIT_SUCCESS;
}
diff --git a/pubsub/app/pubsubd.c b/pubsub/app/pubsubd.c
index 1d484b0..fff1552 100644
--- a/pubsub/app/pubsubd.c
+++ b/pubsub/app/pubsubd.c
@@ -1,82 +1,60 @@
+#include "../../core/communication.h"
+#include "../../core/process.h"
+#include "../../core/error.h"
#include "../lib/pubsubd.h"
#include
-struct workers *my_workers;
+#include
+#include
+#include
+#include
-void
-ohshit(int rvalue, const char* str) {
- fprintf(stderr, "%s\n", str);
- exit(rvalue);
+// to quit them properly if a signal occurs
+struct service srv;
+struct channels chans;
+
+void handle_signal (int signalnumber)
+{
+ // the application will shut down, and remove the service named pipe
+ if (srv_close (&srv) < 0) {
+ handle_error("srv_close < 0");
+ }
+
+ pubsubd_channels_del_all (&chans);
+
+ fprintf (stderr, "received a signal %d\n", signalnumber);
+ exit (EXIT_SUCCESS);
}
int
main(int argc, char **argv, char **env)
{
- struct service srv;
memset (&srv, 0, sizeof (struct service));
- srv->index = 0;
- srv->version = 0;
- srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME, NULL);
- printf ("Listening on %s.\n", srv.spath);
+ srv.index = 0;
+ srv.version = 0;
- // creates the service named pipe, that listens to client applications
- if (srv_create (&srv))
- ohshit(1, "service_create error");
+ signal(SIGHUP, handle_signal);
+ signal(SIGINT, handle_signal);
+ signal(SIGQUIT, handle_signal);
- // init chans list
- struct channels chans;
memset (&chans, 0, sizeof (struct channels));
pubsubd_channels_init (&chans);
- my_workers = malloc (sizeof (struct workers));
- memset (my_workers, 0, sizeof (struct workers));
- pubsubd_workers_init (my_workers);
-
- while (1) {
- // for each new process
- struct app_list_elm ale;
- memset (&ale, 0, sizeof (struct app_list_elm));
- struct channel *chan = NULL;
- pubsubd_get_new_process (srv.spath, &ale, &chans, &chan);
- pubsubd_channels_print (&chans);
-
- // end the application
- if (ale.action == PUBSUB_QUIT) {
- pubsubd_app_list_elm_free (&ale);
- break;
- }
-
- // thread to handle multiple clients at a time
- struct worker *w = NULL;
- w = malloc (sizeof (struct worker));
- w->thr = malloc (sizeof (pthread_t));
- memset (w->thr, 0, sizeof (pthread_t));
- w->ale = pubsubd_app_list_elm_copy (&ale);
- w->chans = &chans;
- w->my_workers = my_workers;
- w->chan = chan;
- struct worker *wtmp = pubsubd_workers_add (my_workers, w);
- pubsubd_worker_free (w);
- free (w);
- w = wtmp;
-
- pthread_create (w->thr, NULL, pubsubd_worker_thread, w);
- pthread_detach (*w->thr);
-
- pubsubd_app_list_elm_free (&ale);
+ if (srv_init (argc, argv, env, &srv, PUBSUBD_SERVICE_NAME) < 0) {
+ handle_error("srv_init < 0");
+ return EXIT_FAILURE;
}
+ printf ("Listening on %s.\n", srv.spath);
- printf ("Quitting ...\n");
+ printf("MAIN: server created\n" );
- pubsubd_workers_stop (my_workers);
- pubsubd_channels_del_all (&chans);
- pubsubd_workers_del_all (my_workers);
-
- free (my_workers);
+ // the service will loop until the end of time, a specific message, a signal
+ pubsubd_main_loop (&srv, &chans);
// the application will shut down, and remove the service named pipe
- if (srv_close (&srv))
- ohshit (1, "srv_close error");
+ if (srv_close (&srv) < 0) {
+ handle_error("srv_close < 0");
+ }
return EXIT_SUCCESS;
}
diff --git a/pubsub/app/test-chan-lists.c b/pubsub/app/test-chan-lists.c
deleted file mode 100644
index f69fef5..0000000
--- a/pubsub/app/test-chan-lists.c
+++ /dev/null
@@ -1,99 +0,0 @@
-#include "../lib/pubsubd.h"
-#include
-
-#define TEST_NAME "test-chan-lists"
-
-#define CHAN1 "coucou"
-#define CHAN2 "salut"
-
-void
-ohshit(int rvalue, const char* str) {
- fprintf(stderr, "%s\n", str);
- exit(rvalue);
-}
-
-int
-main(int argc, char **argv, char **env)
-{
- struct service srv;
- memset (&srv, 0, sizeof (struct service));
- srv->index = 0;
- srv->version = 0;
- srv_init (argc, argv, env, &srv, TEST_NAME, NULL);
- printf ("Listening on %s.\n", srv.spath);
-
- // creates the service named pipe, that listens to client applications
- if (srv_create (&srv))
- ohshit(1, "service_create error");
-
- // init chans list
- struct channels chans;
- memset (&chans, 0, sizeof (struct channels));
- pubsubd_channels_init (&chans);
-
- // for each new process
- // struct app_list_elm ale1;
- // memset (&ale1, 0, sizeof (struct app_list_elm));
-
- // FIRST CHAN TO BE ADDED
- // search for the chan in channels, add it if not found
- struct channel *new_chan = NULL;
- new_chan = pubsubd_channel_search (&chans, CHAN1);
- if (new_chan == NULL) {
- new_chan = pubsubd_channels_add (&chans, CHAN1);
- pubsubd_subscriber_init (&new_chan->alh);
- }
- else {
- ohshit (2, "error : new chan, can't be found in channels yet");
- }
-
- printf ("print the channels, 1 chan\n");
- printf ("--\n");
- pubsubd_channels_print (&chans);
- printf ("--\n");
-
- // SAME CHAN, SHOULD NOT BE ADDED
- // search for the chan in channels, add it if not found
- new_chan = pubsubd_channel_search (&chans, CHAN1);
- if (new_chan == NULL) {
- ohshit (3, "error : same chan, shouldn't be added in channels");
- }
- else {
- printf ("already in the 'channels' structure\n");
- }
-
- printf ("print the channels, 1 chan\n");
- printf ("--\n");
- pubsubd_channels_print (&chans);
- printf ("--\n");
-
- // NEW CHAN, SHOULD BE ADDED
- // search for the chan in channels, add it if not found
- new_chan = pubsubd_channel_search (&chans, CHAN2);
- if (new_chan == NULL) {
- new_chan = pubsubd_channels_add (&chans, CHAN2);
- pubsubd_subscriber_init (&new_chan->alh);
- }
- else {
- ohshit (4, "error : new chan, should be added in channels");
- }
-
- printf ("print the channels, 2 chans\n");
- printf ("--\n");
- pubsubd_channels_print (&chans);
- printf ("--\n");
-
- // end the application
- pubsubd_channels_del_all (&chans);
-
- printf ("\nshould be empty now\n");
- printf ("--\n");
- pubsubd_channels_print (&chans);
- printf ("--\n");
-
- // the application will shut down, and remove the service named pipe
- if (srv_close (&srv))
- ohshit (1, "service_close error");
-
- return EXIT_SUCCESS;
-}
diff --git a/pubsub/app/test-gen-new-process.c b/pubsub/app/test-gen-new-process.c
deleted file mode 100644
index afb70df..0000000
--- a/pubsub/app/test-gen-new-process.c
+++ /dev/null
@@ -1,53 +0,0 @@
-#include "../lib/pubsubd.h"
-#include
-#include
-
-void
-ohshit(int rvalue, const char* str) {
- fprintf(stderr, "%s\n", str);
- exit(rvalue);
-}
-
-void usage (char **argv) {
- fprintf (stderr, "usage: %s path\n", argv[0]);
- exit (1);
-}
-
-int
-main(int argc, char **argv)
-{
- if (argc != 2) {
- usage (argv);
- }
-
- char *spath = argv[1];
-
- printf ("Listening on %s.\n", spath);
-
- struct channels chans;
- memset (&chans, 0, sizeof (struct channels));
-
- for (int nb = 2, i = 0 ; nb > 0; i++, nb--) {
- struct app_list_elm ale;
- memset (&ale, 0, sizeof (struct app_list_elm));
-
- struct channel chan;
- memset (&chan, 0, sizeof (struct channel));
- struct channel *c = &chan;
-
- pubsubd_get_new_process (spath, &ale, &chans, &c);
-
- printf ("print the channels\n");
- printf ("--\n");
- pubsubd_channels_print (&chans);
- printf ("--\n");
- printf ("still %d remaining processes\n", nb);
-
- pubsubd_app_list_elm_free (&ale);
- }
-
- pubsubd_channels_del_all (&chans);
-
- return EXIT_SUCCESS;
-}
-
diff --git a/pubsub/app/test-gen-new-process.sh b/pubsub/app/test-gen-new-process.sh
deleted file mode 100755
index fa5657a..0000000
--- a/pubsub/app/test-gen-new-process.sh
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/bin/bash
-
-REP=/tmp/ipc
-SERVICE=gen
-#SERVICE=pubsub
-NB=10
-
-if [ $# -ge 1 ] ; then
- SERVICE=$1
- shift
-fi
-
-if [ $# -ge 1 ] ; then
- NB=$1
- shift
-fi
-
-for i in $(seq 1 ${NB})
-do
- mkfifo ${REP}/${i}-1-1-in
- mkfifo ${REP}/${i}-1-1-out
-
- echo "${i} 1 1 both chan1" > ${REP}/${SERVICE}
- sleep 0.1
-done
diff --git a/pubsub/app/test-pipe-read.c b/pubsub/app/test-pipe-read.c
deleted file mode 100644
index a9c330e..0000000
--- a/pubsub/app/test-pipe-read.c
+++ /dev/null
@@ -1,57 +0,0 @@
-#include "../lib/pubsubd.h"
-#include
-
-#define TEST_NAME "test-chan-lists"
-
-void
-ohshit(int rvalue, const char* str) {
- fprintf(stderr, "%s\n", str);
- exit(rvalue);
-}
-
-void usage (char **argv) {
- fprintf (stderr, "usage: %s path times\n", argv[0]);
- fprintf (stderr, "ex: %s /tmp/pipe 5 => you will read 5 times\n", argv[0]);
- exit (1);
-}
-
-int
-main(int argc, char **argv)
-{
-
- if (argc != 3)
- usage (argv);
-
- char *path = argv[1];
- int nb = atoi (argv[2]);
-
- printf ("Listening on %s %d times.\n", path, nb);
-
- char *buf = NULL;
- size_t msize = 0;
-
- int ret = 0;
-
- while (nb--) {
- ret = file_read (path, &buf, &msize);
- if (ret <= 0) {
- fprintf (stderr, "no msg");
- if (ret == ER_FILE_OPEN) {
- fprintf (stderr, " ER_FILE_OPEN");
- }
- fprintf (stderr, "\n");
- nb++;
- continue;
- }
- if (msize > 0) {
- printf ("msg size %ld\t", msize);
- struct pubsub_msg m;
- memset (&m, 0, sizeof (struct pubsub_msg));
- pubsubd_msg_unserialize (&m, buf, msize);
- pubsubd_msg_print (&m);
- pubsubd_msg_free (&m);
- }
- }
-
- return EXIT_SUCCESS;
-}
diff --git a/pubsub/app/test-pipe-write.c b/pubsub/app/test-pipe-write.c
deleted file mode 100644
index 2be8e03..0000000
--- a/pubsub/app/test-pipe-write.c
+++ /dev/null
@@ -1,57 +0,0 @@
-#include "../lib/pubsubd.h"
-#include
-
-#define CHAN "chan1"
-#define MESSAGE "coucou"
-
-void
-ohshit(int rvalue, const char* str) {
- fprintf(stderr, "%s\n", str);
- exit(rvalue);
-}
-
-void usage (char **argv) {
- fprintf (stderr, "usage: %s path times\n", argv[0]);
- fprintf (stderr, "ex: %s /tmp/pipe 5 => you will write 5 times\n", argv[0]);
- exit (1);
-}
-
-int
-main(int argc, char **argv)
-{
-
- if (argc != 3)
- usage (argv);
-
- char *path = argv[1];
- int nb = atoi (argv[2]);
-
- printf ("Writing on %s %d times.\n", path, nb);
-
- char *buf = NULL;
- size_t msize = 0;
-
- int ret = 0;
-
- while (nb--) {
- struct pubsub_msg msg;
- memset (&msg, 0, sizeof (struct pubsub_msg));
- msg.type = PUBSUB_TYPE_MESSAGE;
- msg.chan = malloc (strlen (CHAN) + 1);
- strncpy ((char *)msg.chan, CHAN, strlen (CHAN) + 1);
- msg.chanlen = strlen (CHAN) + 1;
- msg.data = malloc (strlen (MESSAGE) + 1);
- strncpy ((char *)msg.data, MESSAGE, strlen (CHAN) + 1);
- msg.datalen = strlen (MESSAGE) + 1;
-
- pubsubd_msg_serialize (&msg, &buf, &msize);
- pubsubd_msg_print (&msg);
- pubsubd_msg_free (&msg);
- ret = file_write (path, buf, msize);
- if (ret != (int) msize) {
- fprintf (stderr, "msg not written\n");
- }
- }
-
- return EXIT_SUCCESS;
-}
diff --git a/pubsub/app/test-subscribers.c b/pubsub/app/test-subscribers.c
deleted file mode 100644
index ff4205b..0000000
--- a/pubsub/app/test-subscribers.c
+++ /dev/null
@@ -1,81 +0,0 @@
-#include "../lib/pubsubd.h"
-#include
-#include
-
-void
-ohshit(int rvalue, const char* str) {
- fprintf(stderr, "%s\n", str);
- exit(rvalue);
-}
-
-void usage (char **argv) {
- fprintf (stderr, "usage: %s path\n", argv[0]);
- exit (1);
-}
-
-// // element of the list
-// // channel : chan name + chan name length + a list of applications
-// struct channel {
-// char *chan;
-// size_t chanlen;
-// struct app_list_head *alh;
-// LIST_ENTRY(channel) entries;
-// };
-
-// struct process {
-// pid_t pid;
-// unsigned int version;
-// unsigned int index;
-// char path_in [PATH_MAX];
-// char path_out [PATH_MAX];
-// };
-
-void fill_process (struct process *p)
-{
- p->pid = 10;
- p->version = 1;
- p->index = 1;
-}
-
-// enum app_list_elm_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH};
-// struct app_list_elm {
-// struct process *p;
-// enum app_list_elm_action action;
-// LIST_ENTRY(app_list_elm) entries;
-// };
-
-void fill_app_list_elm (struct app_list_elm *ale)
-{
- ale->p = malloc (sizeof (struct process));
- fill_process (ale->p);
- ale->action = PUBSUB_PUB;
-}
-
-int main(void)
-{
- struct app_list_head alh;
- memset (&alh, 0, sizeof (struct app_list_head));
-
- struct app_list_elm ale;
- memset (&ale, 0, sizeof (struct app_list_elm));
-
- fill_app_list_elm (&ale);
-
- struct app_list_head *chans = &alh;
- pubsubd_subscriber_init (&chans);
- printf ("1 chan, 0 process\n");
- pubsubd_subscriber_print (chans);
-
- pubsubd_subscriber_add (&alh, &ale);
- printf ("1 chan, 1 process\n");
- pubsubd_subscriber_print (chans);
-
- pubsubd_subscriber_del_all (&alh);
- printf ("0 chan, 0 process\n");
- pubsubd_subscriber_print (chans);
-
- free (ale.p);
-
- return EXIT_SUCCESS;
-}
-
diff --git a/pubsub/app/test-test-send-params.sh b/pubsub/app/test-test-send-params.sh
deleted file mode 100755
index dc0a980..0000000
--- a/pubsub/app/test-test-send-params.sh
+++ /dev/null
@@ -1,38 +0,0 @@
-#!/bin/bash
-
-# start pubsub-test-send alone, with some parameters
-# then test it with this script
-
-REP=/tmp/ipc/
-PID=10
-INDEX=1
-VERSION=1
-ACTION="pub"
-CHAN="chan1"
-
-if [ $# != 0 ] ; then
- PID=$1
- shift
-fi
-
-if [ $# != 0 ] ; then
- INDEX=$1
- shift
-fi
-
-if [ $# != 0 ] ; then
- ACTION=$1
- shift
-fi
-
-if [ $# != 0 ] ; then
- CHAN=$1
- shift
-fi
-
-echo "there should be a line in $REP/pubsub"
-cat $REP/pubsub
-
-echo ""
-echo "there should be something to read in $REP/${PID}-${INDEX}-out"
-cat $REP/${PID}-${INDEX}-out | xxd
diff --git a/pubsub/lib/channels.c b/pubsub/lib/channels.c
new file mode 100644
index 0000000..64e0aa3
--- /dev/null
+++ b/pubsub/lib/channels.c
@@ -0,0 +1,195 @@
+#include
+#include
+#include
+
+#include "../../core/error.h"
+#include "../../core/process.h"
+
+#include "channels.h"
+
+void pubsubd_channel_print (const struct channel *chan)
+{
+ if (chan->chan == NULL) {
+ handle_err ("pubsubd_channel_print", "chan->chan == NULL");
+ }
+
+ printf ( "\033[32mchan %s\033[00m\n", chan->chan);
+
+ if (chan->subs == NULL) {
+ handle_err ("pubsubd_channel_print", "chan->subs == NULL");
+ }
+ else {
+ array_proc_print (chan->subs);
+ }
+}
+
+void pubsubd_channels_print (const struct channels *chans)
+{
+ printf ("\033[36mmchannels\033[00m\n");
+
+ if (chans == NULL) {
+ handle_err ("pubsubd_channels_print", "chans == NULL");
+ return ;
+ }
+
+ struct channel *chan = NULL;
+ LIST_FOREACH(chan, chans, entries) {
+ pubsubd_channel_print (chan);
+ }
+}
+
+void pubsubd_channels_init (struct channels *chans) { LIST_INIT(chans); }
+
+struct channel * pubsubd_channels_add (struct channels *chans, const char *chan)
+{
+ if(chans == NULL || chan == NULL) {
+ handle_err ("pubsubd_channels_add", "chans == NULL or chan == NULL");
+ return NULL;
+ }
+
+ struct channel *n = malloc (sizeof (struct channel));
+ memset (n, 0, sizeof (struct channel));
+ pubsubd_channel_new (n, chan);
+
+ LIST_INSERT_HEAD(chans, n, entries);
+
+ return n;
+}
+
+void pubsubd_channels_del (struct channels *chans, struct channel *c)
+{
+ struct channel *todel = pubsubd_channel_get (chans, c);
+ if(todel != NULL) {
+ pubsubd_channel_free (todel);
+ LIST_REMOVE(todel, entries);
+ free (todel);
+ todel = NULL;
+ }
+}
+
+void pubsubd_channels_del_all (struct channels *chans)
+{
+ if (!chans)
+ return;
+
+ struct channel *c = NULL;
+
+ while (!LIST_EMPTY(chans)) {
+ c = LIST_FIRST(chans);
+ LIST_REMOVE(c, entries);
+ pubsubd_channel_free (c);
+ free (c);
+ c = NULL;
+ }
+}
+
+int pubsubd_channel_new (struct channel *c, const char * name)
+{
+ if (c == NULL) {
+ return 1;
+ }
+
+ size_t nlen = (strlen (name) > BUFSIZ) ? BUFSIZ : strlen (name);
+
+ if (c->chan == NULL)
+ c->chan = malloc (nlen +1);
+
+ memset (c->chan, 0, nlen +1);
+ memcpy (c->chan, name, nlen);
+ c->chanlen = nlen;
+
+ c->subs = malloc (sizeof (struct array_proc));
+ memset (c->subs, 0, sizeof (struct array_proc));
+
+ return 0;
+}
+
+void pubsubd_channel_free (struct channel * c)
+{
+ if (c == NULL)
+ return;
+
+ if (c->chan != NULL) {
+ free (c->chan);
+ c->chan = NULL;
+ }
+
+ if (c->subs != NULL) {
+ array_proc_free (c->subs);
+ free (c->subs);
+ }
+}
+
+struct channel * pubsubd_channel_search (struct channels *chans, char *chan)
+{
+ struct channel * np = NULL;
+ LIST_FOREACH(np, chans, entries) {
+ // TODO debug
+ // printf ("pubsubd_channel_search: %s (%ld) vs %s (%ld)\n"
+ // , np->chan, np->chanlen, chan, strlen(chan));
+ if (np->chanlen == strlen (chan)
+ && strncmp (np->chan, chan, np->chanlen) == 0) {
+ // printf ("pubsubd_channel_search: FOUND\n");
+ return np;
+ }
+ }
+ return NULL;
+}
+
+struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c)
+{
+ struct channel * np = NULL;
+ LIST_FOREACH(np, chans, entries) {
+ if (pubsubd_channel_eq (np, c))
+ return np;
+ }
+ return NULL;
+}
+
+int pubsubd_channel_eq (const struct channel *c1, const struct channel *c2)
+{
+ return c1->chanlen == c2->chanlen &&
+ strncmp (c1->chan, c2->chan, c1->chanlen) == 0;
+}
+
+void pubsubd_channel_subscribe (const struct channel *c, struct process *p)
+{
+ add_proc (c->subs, p);
+}
+
+void pubsubd_channel_unsubscribe (const struct channel *c, struct process *p)
+{
+ del_proc (c->subs, p);
+}
+
+void pubsubd_channels_subscribe (struct channels *chans
+ , char *chname, struct process *p)
+{
+ struct channel *chan = pubsubd_channel_search (chans, chname);
+ if (chan == NULL) {
+ printf ("chan %s non existent : creation\n", chname);
+ chan = pubsubd_channels_add (chans, chname);
+ }
+
+ pubsubd_channel_subscribe (chan, p);
+}
+
+void pubsubd_channels_unsubscribe (struct channels *chans
+ , char *chname, struct process *p)
+{
+ struct channel *chan = pubsubd_channel_search (chans, chname);
+ if (chan == NULL) {
+ return;
+ }
+
+ pubsubd_channel_unsubscribe (chan, p);
+}
+
+void pubsubd_channels_unsubscribe_everywhere (struct channels *chans
+ , struct process *p)
+{
+ struct channel * chan = NULL;
+ LIST_FOREACH(chan, chans, entries) {
+ pubsubd_channel_unsubscribe (chan, p);
+ }
+}
diff --git a/pubsub/lib/channels.h b/pubsub/lib/channels.h
new file mode 100644
index 0000000..606a6e2
--- /dev/null
+++ b/pubsub/lib/channels.h
@@ -0,0 +1,46 @@
+#ifndef __CHANNELS_H__
+#define __CHANNELS_H__
+
+#include "../../core/queue.h"
+#include "../../core/process.h"
+
+// head of the list
+LIST_HEAD(channels, channel);
+
+// element of the list
+// channel : chan name + chan name length + a list of applications
+struct channel {
+ char *chan;
+ size_t chanlen;
+ struct array_proc *subs;
+ LIST_ENTRY(channel) entries;
+};
+
+// simple channel
+int pubsubd_channel_new (struct channel *c, const char *name);
+struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c);
+void pubsubd_channel_free (struct channel *c);
+int pubsubd_channel_eq (const struct channel *c1, const struct channel *c2);
+void pubsubd_channel_print (const struct channel *c);
+
+// list of channels
+void pubsubd_channels_init (struct channels *chans);
+void pubsubd_channels_print (const struct channels *chans);
+struct channel * pubsubd_channels_add (struct channels *chans, const char *chan);
+void pubsubd_channels_del (struct channels *chans, struct channel *c);
+void pubsubd_channels_del_all (struct channels *chans);
+struct channel * pubsubd_channel_search (struct channels *chans, char *chan);
+
+// add and remove subscribers
+void pubsubd_channel_subscribe (const struct channel *c, struct process *p);
+void pubsubd_channel_unsubscribe (const struct channel *c, struct process *p);
+
+void pubsubd_channels_subscribe (struct channels *chans
+ , char *chname, struct process *p);
+void pubsubd_channels_unsubscribe (struct channels *chans
+ , char *chname, struct process *p);
+
+void pubsubd_channels_unsubscribe_everywhere (struct channels *chans
+ , struct process *p);
+
+#endif
diff --git a/pubsub/lib/msg.c b/pubsub/lib/msg.c
new file mode 100644
index 0000000..e2c5aea
--- /dev/null
+++ b/pubsub/lib/msg.c
@@ -0,0 +1,132 @@
+#include
+#include
+#include
+
+#include "msg.h"
+#include "../../core/error.h"
+
+void pubsub_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len)
+{
+ if (msg == NULL) {
+ handle_err ("pubsub_msg_serialize", "msg == NULL");
+ return;
+ }
+
+ if (data == NULL) {
+ handle_err ("pubsub_msg_serialize", "data == NULL");
+ return;
+ }
+
+ if (*data != NULL) {
+ handle_err ("pubsub_msg_serialize", "*data != NULL");
+ return;
+ }
+
+ if (len == NULL) {
+ handle_err ("pubsub_msg_serialize", "len == NULL");
+ return;
+ }
+
+ // buflen = pubsub msg type (1) + 2* size_t (16) + chan+data
+ size_t buflen = 1 + 2 * sizeof (size_t) + msg->chanlen + msg->datalen;
+
+ if (buflen > BUFSIZ) {
+ handle_err ("pubsub_msg_serialize", "chanlen + datalen too high");
+ return;
+ }
+
+ char *buf = malloc (buflen);
+ memset (buf, 0, buflen);
+
+ size_t offset = 0;
+
+ // msg type
+ buf[offset++] = msg->type;
+
+ // chan
+ memcpy (buf + offset, &msg->chanlen, sizeof (size_t));
+ offset += sizeof (size_t);
+ memcpy (buf + offset, msg->chan, msg->chanlen);
+ offset += msg->chanlen;
+
+ // data
+ memcpy (buf + offset, &msg->datalen, sizeof (size_t));
+ offset += sizeof (size_t);
+ memcpy (buf + offset, msg->data, msg->datalen);
+ offset += msg->datalen;
+
+ *data = buf;
+ *len = buflen;
+}
+
+void pubsub_msg_unserialize (struct pubsub_msg *msg, const char *buf, size_t mlen)
+{
+ if (msg == NULL) {
+ handle_err ("pubsub_msg_unserialize", "msg == NULL");
+ return;
+ }
+
+ pubsub_msg_free (msg);
+
+ if (mlen > BUFSIZ) {
+ handle_err ("pubsub_msg_unserialize", "mlen > BUFSIZ");
+ return;
+ }
+
+ size_t offset = 0;
+
+ // msg type
+ msg->type = buf[offset++];
+
+ // chan
+ memcpy (&msg->chanlen, buf + offset, sizeof (size_t));
+ if (msg->chanlen > BUFSIZ) {
+ handle_err ("pubsub_msg_unserialize", "chanlen > BUFSIZ");
+ return;
+ }
+ msg->chan = malloc (msg->chanlen);
+ memset (msg->chan, 0, msg->chanlen);
+ offset += sizeof (size_t);
+ memcpy (msg->chan, buf + offset, msg->chanlen);
+ offset += msg->chanlen;
+
+ // data
+ memcpy (&msg->datalen, buf + offset, sizeof (size_t));
+ if (msg->datalen > BUFSIZ) {
+ handle_err ("pubsub_msg_unserialize", "datalen > BUFSIZ");
+ return;
+ }
+ msg->data = malloc (msg->datalen);
+ memset (msg->data, 0, msg->datalen);
+ offset += sizeof (size_t);
+ memcpy (msg->data, buf + offset, msg->datalen);
+ offset += msg->datalen;
+}
+
+void pubsub_msg_free (struct pubsub_msg *msg)
+{
+ if (msg == NULL) {
+ handle_err ("pubsub_msg_free", "msg == NULL");
+ return;
+ }
+
+ if (msg->chan) {
+ free (msg->chan);
+ msg->chan = NULL;
+ }
+ if (msg->data) {
+ free (msg->data);
+ msg->data = NULL;
+ }
+}
+
+void pubsub_msg_print (const struct pubsub_msg *msg)
+{
+ if (msg == NULL) {
+ handle_err ("pubsub_msg_print", "msg == NULL");
+ return;
+ }
+
+ printf ("msg: type=%d chan=%s, data=%s\n"
+ , msg->type, msg->chan, msg->data);
+}
diff --git a/pubsub/lib/msg.h b/pubsub/lib/msg.h
new file mode 100644
index 0000000..0d51c9c
--- /dev/null
+++ b/pubsub/lib/msg.h
@@ -0,0 +1,21 @@
+#ifndef __PUBSUB_MSG_H__
+#define __PUBSUB_MSG_H__
+
+#define PUBSUB_MSG_TYPE_SUB 1
+#define PUBSUB_MSG_TYPE_UNSUB 2
+#define PUBSUB_MSG_TYPE_PUB 3
+
+struct pubsub_msg {
+ unsigned char type; // message type : alert, notification, …
+ char *chan;
+ size_t chanlen;
+ char *data;
+ size_t datalen;
+};
+
+void pubsub_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len);
+void pubsub_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len);
+void pubsub_msg_free (struct pubsub_msg *msg);
+void pubsub_msg_print (const struct pubsub_msg *msg);
+
+#endif
diff --git a/pubsub/lib/pubsub.c b/pubsub/lib/pubsub.c
new file mode 100644
index 0000000..5858e17
--- /dev/null
+++ b/pubsub/lib/pubsub.c
@@ -0,0 +1,101 @@
+#include
+#include // strndup
+
+#include "pubsub.h"
+#include "pubsubd.h"
+#include "../../core/error.h"
+
+#define PUBSUB_SUBSCRIBER_ACTION_STR_PUB "pub"
+#define PUBSUB_SUBSCRIBER_ACTION_STR_SUB "sub"
+#define PUBSUB_SUBSCRIBER_ACTION_STR_BOTH "both"
+#define PUBSUB_SUBSCRIBER_ACTION_STR_QUIT "quit"
+
+char * pubsub_action_to_str (enum subscriber_action action)
+{
+ switch (action) {
+ case PUBSUB_PUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_PUB);
+ case PUBSUB_SUB : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_SUB);
+ case PUBSUB_BOTH : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_BOTH);
+ case PUBSUB_QUIT : return strdup (PUBSUB_SUBSCRIBER_ACTION_STR_QUIT);
+ }
+ return NULL;
+}
+
+
+#if 0
+// tell the service to stop
+void pubsub_quit (struct service *srv)
+{
+ // line fmt : 0 0 0 quit
+ char line[BUFSIZ];
+ snprintf (line, BUFSIZ, "0 0 0 quit\n");
+ app_srv_connection (srv, line, strlen (line));
+}
+#endif
+
+int pubsub_connection (int argc, char **argv, char **env
+ , struct service *srv)
+{
+ int ret = app_connection (argc, argv, env
+ , srv, PUBSUBD_SERVICE_NAME, NULL, 0);
+
+ if (ret != 0) {
+ handle_err ("pubsub_connection", "app_connection != 0");
+ }
+
+ return ret;
+}
+
+int pubsub_disconnect (struct service *srv)
+{
+ return app_close (srv);
+}
+
+int pubsub_msg_send (struct service *srv, const struct pubsub_msg * m)
+{
+ size_t msize = 0;
+ char * buf = NULL;
+ pubsub_msg_serialize (m, &buf, &msize);
+
+ struct msg m_data;
+ memset (&m_data, 0, sizeof (struct msg));
+
+ // format the connection msg
+ if (msg_format_data (&m_data, buf, msize) < 0) {
+ handle_err ("pubsub_msg_send", "msg_format_data");
+ if (buf != NULL)
+ free (buf);
+ return -1;
+ }
+
+ app_write (srv, &m_data);
+ msg_free (&m_data);
+
+ if (buf != NULL)
+ free(buf);
+
+ return 0;
+}
+
+int pubsub_msg_recv (struct service *srv, struct pubsub_msg *m)
+{
+ if (srv == NULL) {
+ handle_err ("pubsub_msg_recv", "srv == NULL");
+ return -1;
+ }
+
+ if (m == NULL) {
+ handle_err ("pubsub_msg_recv", "m == NULL");
+ return -1;
+ }
+
+ struct msg m_recv;
+ memset (&m_recv, 0, sizeof (struct msg));
+
+ app_read (srv, &m_recv);
+ pubsub_msg_unserialize (m, m_recv.val, m_recv.valsize);
+
+ msg_free (&m_recv);
+
+ return 0;
+}
diff --git a/pubsub/lib/pubsub.h b/pubsub/lib/pubsub.h
new file mode 100644
index 0000000..0bb0bfb
--- /dev/null
+++ b/pubsub/lib/pubsub.h
@@ -0,0 +1,26 @@
+#ifndef __PUBSUB_H__
+#define __PUBSUB_H__
+
+#include "../../core/communication.h"
+#include "../../core/process.h"
+#include "../../core/queue.h"
+
+#include "msg.h"
+
+enum subscriber_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH};
+
+#define PUBSUB_TYPE_DISCONNECT 0
+#define PUBSUB_TYPE_MESSAGE 1
+#define PUBSUB_TYPE_ERROR 2
+#define PUBSUB_TYPE_DEBUG 4
+#define PUBSUB_TYPE_INFO 5
+
+int pubsub_connection (int argc, char **argv, char **env, struct service *srv);
+int pubsub_disconnect (struct service *srv);
+int pubsub_msg_send (struct service *srv, const struct pubsub_msg * m);
+int pubsub_msg_recv (struct service *srv, struct pubsub_msg *m);
+
+// TODO
+void pubsub_quit (struct service *srv);
+
+#endif
diff --git a/pubsub/lib/pubsubd.c b/pubsub/lib/pubsubd.c
index 55ac72f..4d21856 100644
--- a/pubsub/lib/pubsubd.c
+++ b/pubsub/lib/pubsubd.c
@@ -1,600 +1,196 @@
+#include "../../core/communication.h"
+#include "../../core/msg.h"
+#include "../../core/process.h"
+#include "../../core/utils.h"
+#include "../../core/error.h"
+
#include "pubsubd.h"
+#include "channels.h"
-// WORKERS: one thread per client
+#include
+#include
+#include
-void pubsubd_workers_init (struct workers *wrkrs) { LIST_INIT(wrkrs); }
-
-struct worker *
-pubsubd_workers_add (struct workers *wrkrs, const struct worker *w)
+void pubsubd_send (const struct array_proc *ap, const struct pubsub_msg * m)
{
- if (wrkrs == NULL || w == NULL) {
- printf ("pubsubd_workers_add: wrkrs == NULL or w == NULL");
- return NULL;
- }
-
- struct worker *n = malloc (sizeof (struct worker));
- memset (n, 0, sizeof (struct worker));
- memcpy (n, w, sizeof (struct worker));
- if (w->ale != NULL)
- n->ale = pubsubd_app_list_elm_copy (w->ale);
-
- LIST_INSERT_HEAD(wrkrs, n, entries);
-
- return n;
-}
-
-void pubsubd_worker_del (struct workers *wrkrs, struct worker *w)
-{
- struct worker *todel = pubsubd_worker_get (wrkrs, w);
- if(todel != NULL) {
- LIST_REMOVE(todel, entries);
- pubsubd_worker_free (todel);
- free (todel);
- todel = NULL;
- }
-}
-
-// kill the threads
-void pubsubd_workers_stop (struct workers *wrkrs)
-{
- if (!wrkrs)
- return;
-
- struct worker *w = NULL;
- struct worker *wtmp = NULL;
-
- LIST_FOREACH_SAFE(w, wrkrs, entries, wtmp) {
- if (w->thr == NULL)
- continue;
-
- pthread_cancel (*w->thr);
- void *ret = NULL;
- pthread_join (*w->thr, &ret);
- if (ret != NULL) {
- free (ret);
- }
- free (w->thr);
- w->thr = NULL;
- }
-}
-
-void pubsubd_workers_del_all (struct workers *wrkrs)
-{
- if (!wrkrs)
- return;
-
- struct worker *w = NULL;
-
- while (!LIST_EMPTY(wrkrs)) {
- printf ("KILL THE WORKERS : %p\n", w);
- w = LIST_FIRST(wrkrs);
- LIST_REMOVE(w, entries);
- pubsubd_worker_free (w);
- free (w);
- w = NULL;
- }
-}
-
-void pubsubd_worker_free (struct worker * w)
-{
- if (w == NULL)
- return;
- pubsubd_app_list_elm_free (w->ale);
- free (w->ale);
- w->ale = NULL;
-}
-
-struct worker * pubsubd_worker_get (struct workers *wrkrs, struct worker *w)
-{
- struct worker * np = NULL;
- LIST_FOREACH(np, wrkrs, entries) {
- if (pubsubd_worker_eq (np, w))
- return np;
- }
- return NULL;
-}
-
-int pubsubd_worker_eq (const struct worker *w1, const struct worker *w2)
-{
- return w1 == w2; // if it's the same pointer
-}
-
-// a thread for each connected process
-void * pubsubd_worker_thread (void *params)
-{
- int s = 0;
-
- s = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
- if (s != 0)
- printf ("pthread_setcancelstate: %d\n", s);
-
- struct worker *w = (struct worker *) params;
- if (w == NULL) {
- fprintf (stderr, "error pubsubd_worker_thread : params NULL\n");
- return NULL;
- }
-
- struct channels *chans = w->chans;
- struct channel *chan = w->chan;
- struct app_list_elm *ale = w->ale;
-
- // main loop
- while (1) {
- struct pubsub_msg m;
- memset (&m, 0, sizeof (struct pubsub_msg));
-
- pubsubd_msg_recv (ale->p, &m);
-
- if (m.type == PUBSUB_TYPE_DISCONNECT) {
- // printf ("process %d disconnecting...\n", ale->p->pid);
- if ( 0 != pubsubd_subscriber_del (chan->alh, ale)) {
- fprintf (stderr, "err : subscriber not registered\n");
- }
- break;
- }
- else {
- struct channel *ch = pubsubd_channel_search (chans, chan->chan);
- if (ch == NULL) {
- printf ("CHAN NOT FOUND\n");
- }
- else {
- printf ("what should be sent: ");
- pubsubd_msg_print (&m);
- printf ("send the message to:\t");
- pubsubd_channel_print (ch);
- pubsubd_msg_send (ch->alh, &m);
- }
- }
- pubsubd_msg_free (&m);
- }
-
- pubsubd_app_list_elm_free (ale);
- free (w->ale);
- w->ale = NULL;
-
- free (w->thr);
- w->thr = NULL;
-
- pubsubd_worker_del (w->my_workers, w);
-
- pthread_exit (NULL);
-}
-
-// CHANNELS
-
-void pubsubd_channels_init (struct channels *chans) { LIST_INIT(chans); }
-
-struct channel *
-pubsubd_channels_add (struct channels *chans, const char *chan)
-{
- if(chans == NULL || chan == NULL) {
- printf ("pubsubd_channels_add: chans == NULL or chan == NULL");
- return NULL;
- }
-
- struct channel *n = malloc (sizeof (struct channel));
- memset (n, 0, sizeof (struct channel));
- pubsubd_channel_new (n, chan);
-
- LIST_INSERT_HEAD(chans, n, entries);
-
- return n;
-}
-
-void
-pubsubd_channels_del (struct channels *chans, struct channel *c)
-{
- struct channel *todel = pubsubd_channel_get (chans, c);
- if(todel != NULL) {
- pubsubd_channel_free (todel);
- LIST_REMOVE(todel, entries);
- free (todel);
- todel = NULL;
- }
-}
-
-void pubsubd_channels_del_all (struct channels *chans)
-{
- if (!chans)
- return;
-
- struct channel *c = NULL;
-
- while (!LIST_EMPTY(chans)) {
- c = LIST_FIRST(chans);
- LIST_REMOVE(c, entries);
- pubsubd_channel_free (c);
- free (c);
- c = NULL;
- }
-}
-
-struct channel * pubsubd_channel_copy (struct channel *c)
-{
- if (c == NULL)
- return NULL;
-
- struct channel *copy = NULL;
- copy = malloc (sizeof(struct channel));
- memset (copy, 0, sizeof (struct channel));
-
- memcpy (copy, c, sizeof(struct channel));
-
- if (c->chan != NULL) {
- copy->chan = malloc (c->chanlen +1);
- memset (copy->chan, 0, c->chanlen +1);
- memcpy (copy->chan, c->chan, c->chanlen);
- copy->chanlen = c->chanlen;
- }
- else {
- printf ("pubsubd_channel_copy: c->chan == NULL\n");
- }
-
- return copy;
-}
-
-int pubsubd_channel_new (struct channel *c, const char * name)
-{
- if (c == NULL) {
- return 1;
- }
-
- size_t nlen = (strlen (name) > BUFSIZ) ? BUFSIZ : strlen (name);
-
- if (c->chan == NULL)
- c->chan = malloc (nlen +1);
-
- memset (c->chan, 0, nlen +1);
- memcpy (c->chan, name, nlen);
- c->chanlen = nlen;
-
- return 0;
-}
-
-void pubsubd_channel_free (struct channel * c)
-{
- if (c == NULL)
- return;
-
- if (c->chan != NULL) {
- free (c->chan);
- c->chan = NULL;
- }
-
- if (c->alh != NULL) {
- pubsubd_subscriber_del_all (c->alh);
- free (c->alh);
- }
-}
-
-struct channel * pubsubd_channel_search (struct channels *chans, char *chan)
-{
- struct channel * np = NULL;
- LIST_FOREACH(np, chans, entries) {
- // TODO debug
- // printf ("pubsubd_channel_search: %s (%ld) vs %s (%ld)\n"
- // , np->chan, np->chanlen, chan, strlen(chan));
- if (np->chanlen == strlen (chan)
- && strncmp (np->chan, chan, np->chanlen) == 0) {
- // printf ("pubsubd_channel_search: FOUND\n");
- return np;
- }
- }
- return NULL;
-}
-
-struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c)
-{
- struct channel * np = NULL;
- LIST_FOREACH(np, chans, entries) {
- if (pubsubd_channel_eq (np, c))
- return np;
- }
- return NULL;
-}
-
-int
-pubsubd_channel_eq (const struct channel *c1, const struct channel *c2)
-{
- return c1->chanlen == c2->chanlen &&
- strncmp (c1->chan, c2->chan, c1->chanlen) == 0;
-}
-
-// SUBSCRIBER
-
-void pubsubd_subscriber_init (struct app_list_head **chans) {
- if (chans == NULL)
- return;
-
- if (*chans == NULL) {
- *chans = malloc (sizeof(struct channels));
- memset (*chans, 0, sizeof(struct channels));
- }
- LIST_INIT(*chans);
-}
-
-void pubsubd_channel_print (const struct channel *chan)
-{
- if (chan->chan == NULL) {
- printf ("pubsubd_channels_print: chan->chan == NULL\n");
- }
-
- printf ( "\033[32mchan %s\033[00m\n", chan->chan);
-
- if (chan->alh == NULL)
- printf ("pubsubd_channels_print: chan->alh == NULL\n");
- else
- pubsubd_subscriber_print (chan->alh);
-}
-
-void pubsubd_channels_print (const struct channels *chans)
-{
- printf ("\033[36mmchannels\033[00m\n");
-
- if (chans == NULL) {
- // TODO debug
- printf ("pubsubd_channels_print: chans == NULL\n");
- return ;
- }
-
- struct channel *chan = NULL;
- LIST_FOREACH(chan, chans, entries) {
- pubsubd_channel_print (chan);
- }
-}
-
-struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale)
-{
- if (ale == NULL)
- return NULL;
-
- struct app_list_elm * n = NULL;
- n = malloc (sizeof (struct app_list_elm));
- memset (n, 0, sizeof (struct app_list_elm));
-
- if (ale->p != NULL)
- n->p = srv_process_copy (ale->p);
-
- n->action = ale->action;
-
- return n;
-}
-
-int
-pubsubd_subscriber_eq (const struct app_list_elm *ale1, const struct app_list_elm *ale2)
-{
- return srv_process_eq (ale1->p, ale2->p);
-}
-
-
-void
-pubsubd_subscriber_add (struct app_list_head *alh, const struct app_list_elm *ale)
-{
- if(alh == NULL || ale == NULL) {
- fprintf (stderr, "err alh or ale is NULL\n");
- return;
- }
-
- struct app_list_elm *n = pubsubd_app_list_elm_copy (ale);
- LIST_INSERT_HEAD(alh, n, entries);
-}
-
-struct app_list_elm *
-pubsubd_subscriber_get (const struct app_list_head *alh, const struct app_list_elm *p)
-{
- struct app_list_elm *np = NULL, *res = NULL;
- LIST_FOREACH(np, alh, entries) {
- if(pubsubd_subscriber_eq (np, p)) {
- res = np;
- }
- }
- return res;
-}
-
-void pubsubd_subscriber_print (struct app_list_head *alh)
-{
- struct app_list_elm *np = NULL;
- LIST_FOREACH(np, alh, entries) {
- printf ("\t");
- srv_process_print (np->p);
- }
-}
-
-int
-pubsubd_subscriber_del (struct app_list_head *alh, struct app_list_elm *p)
-{
- struct app_list_elm *todel = pubsubd_subscriber_get (alh, p);
- if(todel != NULL) {
- pubsubd_app_list_elm_free (todel);
- LIST_REMOVE(todel, entries);
- free (todel);
- todel = NULL;
- return 0;
- }
-
- return 1;
-}
-
-void pubsubd_subscriber_del_all (struct app_list_head *alh)
-{
- if (!alh)
- return;
-
- struct app_list_elm *ale = NULL;
-
- while (!LIST_EMPTY(alh)) {
- ale = LIST_FIRST(alh);
- LIST_REMOVE(ale, entries);
- pubsubd_app_list_elm_free (ale);
- free (ale);
- ale = NULL;
- }
-}
-
-void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p)
-{
- if (ale == NULL)
- return;
-
- if (ale->p != NULL)
- free (ale->p);
-
- ale->p = srv_process_copy (p);
-}
-
-void pubsubd_app_list_elm_free (struct app_list_elm *todel)
-{
- if (todel == NULL || todel->p == NULL)
- return;
- free (todel->p);
-}
-
-int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale
- , struct channels *chans, struct channel **c)
-{
- if (spath == NULL || ale == NULL || chans == NULL) {
- fprintf (stderr, "pubsubd_get_new_process: spath or ale or chans == NULL\n");
- return -1;
- }
-
- char *buf = NULL;
- size_t msize = 0;
- file_read (spath, &buf, &msize);
- // parse pubsubd init msg (sent in TMPDIR/)
- //
- // line fmt : index version action chan
- // action : quit | pub | sub
-
- size_t i = 0;
- char *str = NULL, *token = NULL, *saveptr = NULL;
-
- int index = 0;
- int version = 0;
-
- // chan name
- char chan[BUFSIZ];
- memset (chan, 0, BUFSIZ);
-
- if (buf == NULL) {
- return -2;
- }
-
- printf ("INIT: %s\n", buf);
-
- for (str = buf, i = 1; ; str = NULL, i++) {
- token = strtok_r(str, " ", &saveptr);
- if (token == NULL)
- break;
-
- switch (i) {
- case 1 : index = strtoul(token, NULL, 10); break;
- case 2 : version = strtoul(token, NULL, 10); break;
- case 3 : {
- if (strncmp("both", token, 4) == 0) {
- ale->action = PUBSUB_BOTH;
- }
- else if (strncmp("pub", token, 3) == 0) {
- ale->action = PUBSUB_PUB;
- }
- else if (strncmp("sub", token, 3) == 0) {
- ale->action = PUBSUB_SUB;
- }
- else { // everything else is about killing the service
- ale->action = PUBSUB_QUIT;
- }
- break;
- }
- case 4 : {
- // for the last element of the line
- // drop the following \n
- if (ale->action != PUBSUB_QUIT) {
- memcpy (chan, token, (strlen (token) < BUFSIZ) ?
- strlen (token) -1 : BUFSIZ);
- }
- break;
- }
- }
- }
-
- if (buf != NULL) {
- free (buf);
- buf = NULL;
- }
-
- if (ale->action == PUBSUB_QUIT) {
- return 0;
- }
-
- if (ale->p != NULL) {
- free (ale->p);
- ale->p = NULL;
- }
-
- ale->p = malloc (sizeof (struct process));
- memset (ale->p, 0, sizeof (struct process));
- srv_process_gen (ale->p, index, version);
-
- chan[BUFSIZ -1] = '\0';
-
- // not found = new
- struct channel *new_chan = NULL;
- new_chan = pubsubd_channel_search (chans, chan);
- if (new_chan == NULL) {
- new_chan = pubsubd_channels_add (chans, chan);
- pubsubd_subscriber_init (&new_chan->alh);
- }
-
- *c = new_chan;
-
- // add the subscriber
- if (ale->action == PUBSUB_SUB || ale->action == PUBSUB_BOTH) {
- printf ("new process in chan %s\n", chan);
- pubsubd_subscriber_add ((*c)->alh, ale);
- }
-
- return 0;
-}
-
-// alh from the channel, message to send
-void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m)
-{
- if (alh == NULL) {
- fprintf (stderr, "pubsubd_msg_send: alh == NULL");
+ if (ap == NULL) {
+ fprintf (stderr, "pubsubd_send: ap == NULL");
return;
}
if (m == NULL) {
- fprintf (stderr, "pubsubd_msg_send: m == NULL");
+ fprintf (stderr, "pubsubd_send: m == NULL");
return;
}
- struct app_list_elm * ale = NULL;
-
char *buf = NULL;
size_t msize = 0;
- pubsubd_msg_serialize (m, &buf, &msize);
+ pubsub_msg_serialize (m, &buf, &msize);
- LIST_FOREACH(ale, alh, entries) {
- srv_write (ale->p->proc_fd, buf, msize);
+ struct msg m_data;
+ memset (&m_data, 0, sizeof (struct msg));
+ msg_format_data (&m_data, buf, msize);
+
+ int i;
+ for (i = 0; i < ap->size ; i++) {
+ srv_write (ap->tab_proc[i], &m_data);
}
+ msg_free (&m_data);
if (buf != NULL) {
free (buf);
}
}
-void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
+// void pubsubd_recv (struct process *p, struct pubsub_msg *m)
+// {
+// struct msg m_data;
+// memset (&m_data, 0, sizeof (struct msg));
+//
+// // read the message from the process
+// srv_read (p, &m_data);
+//
+// pubsub_msg_unserialize (m, m_data.val, m_data.valsize);
+//
+// msg_free (&m_data);
+// }
+
+void handle_new_connection (struct service *srv, struct array_proc *ap)
{
- // read the message from the process
- size_t mlen = 0;
- char *buf = NULL;
- while (buf == NULL || mlen == 0) {
- srv_read (p->proc_fd, &buf, &mlen);
+ struct process *p = malloc(sizeof(struct process));
+ memset(p, 0, sizeof(struct process));
+
+ if (srv_accept (srv, p) < 0) {
+ handle_error("srv_accept < 0");
+ } else {
+ printf("new connection\n");
}
- pubsubd_msg_unserialize (m, buf, mlen);
-
- if (buf != NULL) {
- free (buf);
+ if (add_proc (ap, p) < 0) {
+ handle_error("add_proc < 0");
}
}
+
+void handle_new_msg (struct channels *chans
+ , struct array_proc *ap, struct array_proc *proc_to_read)
+{
+ struct msg m;
+ memset (&m, 0, sizeof (struct msg));
+ int i;
+ for (i = 0; i < proc_to_read->size; i++) {
+ // printf ("loop handle_new_msg\n");
+ if (srv_read (proc_to_read->tab_proc[i], &m) < 0) {
+ handle_error("srv_read < 0");
+ }
+
+ mprint_hexa ("msg received: ", (unsigned char *) m.val, m.valsize);
+
+ // close the process then delete it from the process array
+ if (m.type == MSG_TYPE_CLOSE) {
+ struct process *p = proc_to_read->tab_proc[i];
+
+ printf ("proc %d disconnecting\n", p->proc_fd);
+
+ // TODO: to test, unsubscribe when closing
+ pubsubd_channels_unsubscribe_everywhere (chans, p);
+
+ // close the connection to the process
+ if (srv_close_proc (p) < 0)
+ handle_error( "srv_close_proc < 0");
+
+
+ // remove the process from the processes list
+ if (del_proc (ap, p) < 0)
+ handle_error( "del_proc < 0");
+ if (del_proc (proc_to_read, p) < 0)
+ handle_err( "handle_new_msg", "del_proc < 0");
+
+ msg_free (&m);
+
+ // free process
+ free (p);
+
+ i--;
+ continue;
+ }
+
+ struct pubsub_msg m_data;
+ memset (&m_data, 0, sizeof (struct pubsub_msg));
+
+ pubsub_msg_unserialize (&m_data, m.val, m.valsize);
+
+ if (m_data.type == PUBSUB_MSG_TYPE_SUB) {
+ printf ("proc %d subscribing to %s\n"
+ , proc_to_read->tab_proc[i]->proc_fd
+ , m_data.chan);
+ pubsubd_channels_subscribe (chans
+ , m_data.chan, proc_to_read->tab_proc[i]);
+ }
+
+ if (m_data.type == PUBSUB_MSG_TYPE_UNSUB) {
+ printf ("proc %d unsubscribing to %s\n"
+ , proc_to_read->tab_proc[i]->proc_fd
+ , m_data.chan);
+ pubsubd_channels_unsubscribe (chans
+ , m_data.chan, proc_to_read->tab_proc[i]);
+ }
+
+ if (m_data.type == PUBSUB_MSG_TYPE_PUB) {
+ printf ("proc %d publishing to %s\n"
+ , proc_to_read->tab_proc[i]->proc_fd
+ , m_data.chan);
+ struct channel *chan = pubsubd_channel_search (chans, m_data.chan);
+ if (chan == NULL) {
+ handle_err ("handle_new_msg", "publish on nonexistent channel");
+ msg_free (&m);
+ return ;
+ }
+ pubsubd_send (chan->subs, &m_data);
+ }
+
+ pubsub_msg_free (&m_data);
+ msg_free (&m);
+ }
+}
+
+/*
+ * main loop
+ *
+ * accept new application connections
+ * read a message and send it back
+ * close a connection if MSG_TYPE_CLOSE received
+ */
+
+void pubsubd_main_loop (struct service *srv, struct channels *chans)
+{
+ int i, ret = 0;
+
+ struct array_proc ap;
+ memset(&ap, 0, sizeof(struct array_proc));
+
+ struct array_proc proc_to_read;
+ memset(&proc_to_read, 0, sizeof(struct array_proc));
+
+ while(1) {
+ ret = srv_select (&ap, srv, &proc_to_read);
+
+ if (ret == CONNECTION) {
+ handle_new_connection (srv, &ap);
+ } else if (ret == APPLICATION) {
+ handle_new_msg (chans, &ap, &proc_to_read);
+ } else { // both new connection and new msg from at least one client
+ handle_new_connection (srv, &ap);
+ handle_new_msg (chans, &ap, &proc_to_read);
+ }
+ array_proc_free (&proc_to_read);
+ }
+
+ for (i = 0; i < ap.size; i++) {
+ if (srv_close_proc (ap.tab_proc[i]) < 0) {
+ handle_error( "srv_close_proc < 0");
+ }
+ }
+
+ pubsubd_channels_del_all (chans);
+}
+
diff --git a/pubsub/lib/pubsubd.h b/pubsub/lib/pubsubd.h
index 89de934..0c548d2 100644
--- a/pubsub/lib/pubsubd.h
+++ b/pubsub/lib/pubsubd.h
@@ -1,116 +1,15 @@
#ifndef __PUBSUBD_H__
#define __PUBSUBD_H__
-#include "../../core/pubsub.h"
-#include
+// #include "../../core/pubsub.h"
+#include "../../core/process.h"
+#include "../../core/msg.h"
+#include "msg.h"
+#include "channels.h"
-struct channel;
-struct channels;
-struct app_list_head;
-struct app_list_elm;
+#define PUBSUBD_SERVICE_NAME "pubsubd"
-// parse pubsubd init msg (sent in TMPDIR/)
-//
-// TODO TLV line fmt : index version action chan
-// action : quit | pub | sub
-int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale
- , struct channels *chans, struct channel **c);
-void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m);
-void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m);
-
-// CHANNEL
-
-// head of the list
-LIST_HEAD(channels, channel);
-
-// element of the list
-// channel : chan name + chan name length + a list of applications
-struct channel {
- char *chan;
- size_t chanlen;
- struct app_list_head *alh;
- LIST_ENTRY(channel) entries;
-};
-
-// simple channel
-int pubsubd_channel_new (struct channel *c, const char *name);
-struct channel * pubsubd_channel_copy (struct channel *c);
-struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c);
-void pubsubd_channel_free (struct channel *c);
-int pubsubd_channel_eq (const struct channel *c1, const struct channel *c2);
-void pubsubd_channel_print (const struct channel *c);
-
-// list of channels
-void pubsubd_channels_init (struct channels *chans);
-void pubsubd_channels_print (const struct channels *chans);
-struct channel * pubsubd_channels_add (struct channels *chans, const char *chan);
-void pubsubd_channels_del (struct channels *chans, struct channel *c);
-void pubsubd_channels_del_all (struct channels *chans);
-struct channel * pubsubd_channel_search (struct channels *chans, char *chan);
-
-// remove an app_list_elm from the list (msg type DISCONNECT received)
-int pubsubd_channels_del_subscriber (struct channels *chans
- , struct channel *c);
-
-struct channel *
-pubsubd_channels_search_from_app_list_elm (struct channels *chans
- , struct app_list_elm *ale);
-
-// APPLICATION
-
-// head of the list
-LIST_HEAD(app_list_head, app_list_elm);
-
-// element of the list
-struct app_list_elm {
- struct process *p;
- enum app_list_elm_action action;
- LIST_ENTRY(app_list_elm) entries;
-};
-
-int
-pubsubd_subscriber_eq (const struct app_list_elm *, const struct app_list_elm *);
-
-void pubsubd_subscriber_init (struct app_list_head **chans);
-void pubsubd_subscriber_print (struct app_list_head *alh);
-void pubsubd_subscriber_add (struct app_list_head *
- , const struct app_list_elm *);
-struct app_list_elm * pubsubd_subscriber_get (const struct app_list_head *
- , const struct app_list_elm *);
-int pubsubd_subscriber_del (struct app_list_head *al, struct app_list_elm *p);
-void pubsubd_subscriber_del_all (struct app_list_head *alh);
-
-struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale);
-void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p);
-void pubsubd_app_list_elm_free (struct app_list_elm *todel);
-
-void pubsubd_quit (struct service *srv);
-
-// WORKERS: one thread per client
-
-// head of the list
-LIST_HEAD(workers, worker);
-
-// element of the list
-// worker : process to handle (threaded)
-struct worker {
- pthread_t *thr;
- struct workers *my_workers;
- struct channels *chans;
- struct channel *chan;
- struct app_list_elm *ale;
- LIST_ENTRY(worker) entries;
-};
-
-void pubsubd_worker_free (struct worker * w);
-struct worker * pubsubd_worker_get (struct workers *wrkrs, struct worker *w);
-int pubsubd_worker_eq (const struct worker *w1, const struct worker *w2);
-void pubsubd_workers_init (struct workers *wrkrs);
-void * pubsubd_worker_thread (void *params);
-struct worker *
-pubsubd_workers_add (struct workers *wrkrs, const struct worker *w);
-void pubsubd_workers_del_all (struct workers *wrkrs);
-void pubsubd_workers_stop (struct workers *wrkrs);
-void pubsubd_worker_del (struct workers *wrkrs, struct worker *w);
+void pubsubd_main_loop (struct service *srv, struct channels * chans);
+void pubsubd_msg_send (const struct array_proc *ap, const struct pubsub_msg * m);
#endif
diff --git a/pubsub/lib/workers.c b/pubsub/lib/workers.c
new file mode 100644
index 0000000..4dc72b3
--- /dev/null
+++ b/pubsub/lib/workers.c
@@ -0,0 +1,168 @@
+#if 0
+#include
+#include
+#include
+
+#include "../../core/error.h"
+
+#include
+
+// WORKERS: one thread per client
+
+void pubsubd_workers_init (struct workers *wrkrs) { LIST_INIT(wrkrs); }
+
+struct worker * pubsubd_workers_add (struct workers *wrkrs, const struct worker *w)
+{
+ if (wrkrs == NULL || w == NULL) {
+ printf ("pubsubd_workers_add: wrkrs == NULL or w == NULL");
+ return NULL;
+ }
+
+ struct worker *n = malloc (sizeof (struct worker));
+ memset (n, 0, sizeof (struct worker));
+ memcpy (n, w, sizeof (struct worker));
+ if (w->ale != NULL)
+ n->ale = pubsubd_subscriber_copy (w->ale);
+
+ LIST_INSERT_HEAD(wrkrs, n, entries);
+
+ return n;
+}
+
+void pubsubd_worker_del (struct workers *wrkrs, struct worker *w)
+{
+ struct worker *todel = pubsubd_worker_get (wrkrs, w);
+ if(todel != NULL) {
+ LIST_REMOVE(todel, entries);
+ pubsubd_worker_free (todel);
+ free (todel);
+ todel = NULL;
+ }
+}
+
+// kill the threads
+void pubsubd_workers_stop (struct workers *wrkrs)
+{
+ if (!wrkrs)
+ return;
+
+ struct worker *w = NULL;
+ struct worker *wtmp = NULL;
+
+ LIST_FOREACH_SAFE(w, wrkrs, entries, wtmp) {
+ if (w->thr == NULL)
+ continue;
+
+ pthread_cancel (*w->thr);
+ void *ret = NULL;
+ pthread_join (*w->thr, &ret);
+ if (ret != NULL) {
+ free (ret);
+ }
+ free (w->thr);
+ w->thr = NULL;
+ }
+}
+
+void pubsubd_workers_del_all (struct workers *wrkrs)
+{
+ if (!wrkrs)
+ return;
+
+ struct worker *w = NULL;
+
+ while (!LIST_EMPTY(wrkrs)) {
+ printf ("KILL THE WORKERS : %p\n", w);
+ w = LIST_FIRST(wrkrs);
+ LIST_REMOVE(w, entries);
+ pubsubd_worker_free (w);
+ free (w);
+ w = NULL;
+ }
+}
+
+void pubsubd_worker_free (struct worker * w)
+{
+ if (w == NULL)
+ return;
+ pubsubd_subscriber_free (w->ale);
+ free (w->ale);
+ w->ale = NULL;
+}
+
+struct worker * pubsubd_worker_get (struct workers *wrkrs, struct worker *w)
+{
+ struct worker * np = NULL;
+ LIST_FOREACH(np, wrkrs, entries) {
+ if (pubsubd_worker_eq (np, w))
+ return np;
+ }
+ return NULL;
+}
+
+int pubsubd_worker_eq (const struct worker *w1, const struct worker *w2)
+{
+ return w1 == w2; // if it's the same pointer
+}
+
+// a thread for each connected process
+void * pubsubd_worker_thread (void *params)
+{
+ int s = 0;
+
+ s = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ if (s != 0)
+ printf ("pthread_setcancelstate: %d\n", s);
+
+ struct worker *w = (struct worker *) params;
+ if (w == NULL) {
+ fprintf (stderr, "error pubsubd_worker_thread : params NULL\n");
+ return NULL;
+ }
+
+ struct channels *chans = w->chans;
+ struct channel *chan = w->chan;
+ struct subscriber *ale = w->ale;
+
+ // main loop
+ while (1) {
+ struct pubsub_msg m;
+ memset (&m, 0, sizeof (struct pubsub_msg));
+
+ pubsub_msg_recv (ale->p, &m);
+
+ if (m.type == PUBSUB_TYPE_DISCONNECT) {
+ // printf ("process %d disconnecting...\n", ale->p->pid);
+ if ( 0 != pubsubd_subscriber_del (chan->alh, ale)) {
+ fprintf (stderr, "err : subscriber not registered\n");
+ }
+ break;
+ }
+ else {
+ struct channel *ch = pubsubd_channel_search (chans, chan->chan);
+ if (ch == NULL) {
+ printf ("CHAN NOT FOUND\n");
+ }
+ else {
+ printf ("what should be sent: ");
+ pubsub_msg_print (&m);
+ printf ("send the message to:\t");
+ pubsubd_channel_print (ch);
+ pubsub_msg_send (ch->alh, &m);
+ }
+ }
+ pubsub_msg_free (&m);
+ }
+
+ pubsubd_subscriber_free (ale);
+ free (w->ale);
+ w->ale = NULL;
+
+ free (w->thr);
+ w->thr = NULL;
+
+ pubsubd_worker_del (w->my_workers, w);
+
+ pthread_exit (NULL);
+}
+#endif
diff --git a/pubsub/lib/workers.h b/pubsub/lib/workers.h
new file mode 100644
index 0000000..51ed1ca
--- /dev/null
+++ b/pubsub/lib/workers.h
@@ -0,0 +1,32 @@
+#if 0
+#ifndef __WORKERS_H__
+#define __WORKERS_H__
+
+// WORKERS: one thread per client
+
+// head of the list
+LIST_HEAD(workers, worker);
+
+// element of the list
+// worker : process to handle (threaded)
+struct worker {
+ pthread_t *thr;
+ struct workers *my_workers;
+ struct channels *chans;
+ struct channel *chan;
+ struct subscriber *ale;
+ LIST_ENTRY(worker) entries;
+};
+
+void pubsubd_worker_free (struct worker * w);
+struct worker * pubsubd_worker_get (struct workers *wrkrs, struct worker *w);
+int pubsubd_worker_eq (const struct worker *w1, const struct worker *w2);
+void pubsubd_workers_init (struct workers *wrkrs);
+void * pubsubd_worker_thread (void *params);
+struct worker * pubsubd_workers_add (struct workers *wrkrs, const struct worker *w);
+void pubsubd_workers_del_all (struct workers *wrkrs);
+void pubsubd_workers_stop (struct workers *wrkrs);
+void pubsubd_worker_del (struct workers *wrkrs, struct worker *w);
+
+#endif
+#endif
diff --git a/pubsub/tests/Makefile b/pubsub/tests/Makefile
new file mode 100644
index 0000000..8ea2a9e
--- /dev/null
+++ b/pubsub/tests/Makefile
@@ -0,0 +1,25 @@
+CC=gcc
+CFLAGS=-Wall -g -Wextra
+LDFLAGS= -pthread
+CFILES=$(wildcard *.c) # CFILES => recompiles everything on a C file change
+EXEC=$(basename $(wildcard *.c))
+SOURCES=$(wildcard ../lib/*.c ../../core/*.c)
+OBJECTS=$(SOURCES:.c=.o)
+TESTS=$(addsuffix .test, $(EXEC))
+
+all: $(SOURCES) $(EXEC)
+
+$(EXEC): $(OBJECTS) $(CFILES)
+ $(CC) $(CFLAGS) $(LDFLAGS) $(OBJECTS) $@.c -o $@.bin
+
+.c.o:
+ $(CC) -c $(CFLAGS) $< -o $@
+
+$(TESTS):
+ valgrind --show-leak-kinds=all --leak-check=full -v --track-origins=yes ./$(basename $@).bin
+
+clean:
+ @-rm $(OBJECTS)
+
+mrproper: clean
+ @-rm *.bin
diff --git a/pubsub/tests/channels.c b/pubsub/tests/channels.c
new file mode 100644
index 0000000..81de4be
--- /dev/null
+++ b/pubsub/tests/channels.c
@@ -0,0 +1,152 @@
+#include
+#include
+#include
+
+#include "../lib/channels.h"
+#include "../../core/error.h"
+
+void fake_process (struct process *p
+ , unsigned int index, unsigned int version, int fake_fd)
+{
+ p->version = version;
+ p->index = index;
+ p->proc_fd = fake_fd;
+}
+
+void phase1 ()
+{
+ struct channel chan1;
+ memset (&chan1, 0, sizeof (struct channel));
+ pubsubd_channel_new (&chan1, "chan1");
+
+ struct channel chan2;
+ memset (&chan2, 0, sizeof (struct channel));
+ pubsubd_channel_new (&chan2, "chan2");
+
+ printf ("chan1:");
+ pubsubd_channel_print (&chan1);
+
+ printf ("chan2:");
+ pubsubd_channel_print (&chan2);
+
+ pubsubd_channel_free (&chan1);
+ pubsubd_channel_free (&chan2);
+}
+
+void phase2 ()
+{
+ struct channel chan1;
+ memset (&chan1, 0, sizeof (struct channel));
+ pubsubd_channel_new (&chan1, "chan1");
+
+ struct channel chan2;
+ memset (&chan2, 0, sizeof (struct channel));
+ pubsubd_channel_new (&chan2, "chan1");
+
+ printf ("chan1:");
+ pubsubd_channel_print (&chan1);
+
+ printf ("chan2:");
+ pubsubd_channel_print (&chan2);
+
+ if (pubsubd_channel_eq (&chan1, &chan2)) {
+ printf ("chan1 == chan2\n");
+ }
+ else {
+ handle_err ("phase2", "pubsubd_channel_eq (&chan1, &chan2) == 0");
+ }
+
+ pubsubd_channel_free (&chan1);
+ pubsubd_channel_free (&chan2);
+}
+
+void phase3 ()
+{
+ struct channels chans;
+ memset (&chans, 0, sizeof (struct channels));
+
+ pubsubd_channels_init (&chans);
+ struct channel * chan1 = pubsubd_channels_add (&chans, "chan1");
+ struct channel * chan2 = pubsubd_channels_add (&chans, "chan2");
+ pubsubd_channels_print (&chans);
+ pubsubd_channels_del (&chans, chan1);
+ pubsubd_channels_print (&chans);
+ pubsubd_channels_del (&chans, chan2);
+ pubsubd_channels_print (&chans);
+}
+
+void phase4 ()
+{
+ struct channels chans;
+ memset (&chans, 0, sizeof (struct channels));
+
+ pubsubd_channels_init (&chans);
+ struct channel * chan1 = pubsubd_channels_add (&chans, "chan1");
+ struct channel * chan2 = pubsubd_channels_add (&chans, "chan2");
+
+ struct process proc1;
+ fake_process (&proc1, 0, 0, 1);
+
+ struct process proc2;
+ fake_process (&proc2, 0, 0, 2);
+
+ printf ("chan1: proc1, chan2: proc2\n");
+ pubsubd_channel_subscribe (chan1, &proc1);
+ pubsubd_channel_subscribe (chan2, &proc2);
+
+ pubsubd_channels_print (&chans);
+ pubsubd_channels_del_all (&chans);
+
+ printf ("channels removed\n");
+ pubsubd_channels_print (&chans);
+}
+
+void phase5 ()
+{
+ struct channels chans;
+ memset (&chans, 0, sizeof (struct channels));
+
+ pubsubd_channels_init (&chans);
+ pubsubd_channels_add (&chans, "chan1");
+ pubsubd_channels_add (&chans, "chan2");
+
+ struct process proc1;
+ fake_process (&proc1, 0, 0, 1);
+
+ struct process proc2;
+ fake_process (&proc2, 0, 0, 2);
+
+ printf ("chan1 & 2 => proc1 and 2 added\n");
+ pubsubd_channels_subscribe (&chans, "chan1", &proc1);
+ pubsubd_channels_subscribe (&chans, "chan1", &proc2);
+
+ pubsubd_channels_subscribe (&chans, "chan2", &proc1);
+ pubsubd_channels_subscribe (&chans, "chan2", &proc2);
+
+ pubsubd_channels_print (&chans);
+
+ printf ("chan1 => proc1 removed\n");
+ pubsubd_channels_unsubscribe (&chans, "chan1", &proc1);
+
+ pubsubd_channels_print (&chans);
+ pubsubd_channels_del_all (&chans);
+
+ printf ("channels removed\n");
+ pubsubd_channels_print (&chans);
+}
+
+int main(int argc, char * argv[])
+{
+ argc = argc;
+ argv = argv;
+
+ // phase1(); // new + print + free
+ // phase2(); // new + print + eq + free
+
+ // channels
+ // phase3(); // channels init + add + print + del
+ // phase4(); // channels del_all + channel subscribe
+ phase5(); // channels del_all + channels subscribe + unsubscribe
+
+ return EXIT_SUCCESS;
+}
diff --git a/pubsub/tests/msg.c b/pubsub/tests/msg.c
new file mode 100644
index 0000000..eb54449
--- /dev/null
+++ b/pubsub/tests/msg.c
@@ -0,0 +1,56 @@
+#include
+#include
+#include
+
+#include "../lib/msg.h"
+#include "../../core/error.h"
+#include "../../core/utils.h"
+
+#define CHAN "chan1"
+#define DATA "hello chan1"
+
+int main(int argc, char * argv[])
+{
+ argc = argc;
+ argv = argv;
+
+ struct pubsub_msg msg;
+ memset (&msg, 0, sizeof (struct pubsub_msg));
+
+ msg.type = 8;
+
+ msg.chanlen = strlen (CHAN) + 1;
+ msg.chan = malloc (msg.chanlen);
+ memset (msg.chan, 0, msg.chanlen);
+ memcpy (msg.chan, CHAN, msg.chanlen);
+
+ msg.datalen = strlen (DATA) + 1;
+ msg.data = malloc (msg.datalen);
+ memset (msg.data, 0, msg.datalen);
+ memcpy (msg.data, DATA, msg.datalen);
+
+ printf ("msg 1: ");
+ pubsub_msg_print (&msg);
+
+ char *buffer = NULL;
+ size_t len = 0;
+
+ pubsub_msg_serialize (&msg, &buffer, &len);
+ mprint_hexa ("buffer msg 1", (unsigned char *) buffer, len);
+
+ struct pubsub_msg msg2;
+ memset (&msg2, 0, sizeof (struct pubsub_msg));
+
+ pubsub_msg_unserialize (&msg2, buffer, len);
+
+ printf ("msg 2: ");
+ pubsub_msg_print (&msg2);
+
+ pubsub_msg_free (&msg);
+ pubsub_msg_free (&msg2);
+
+ if (buffer != NULL)
+ free (buffer);
+
+ return EXIT_SUCCESS;
+}
diff --git a/pubsub/app/pubsub-test-send-params.c b/pubsub/tests/pubsub-test-send-params.c
similarity index 98%
rename from pubsub/app/pubsub-test-send-params.c
rename to pubsub/tests/pubsub-test-send-params.c
index b4c9d6a..a839d4b 100644
--- a/pubsub/app/pubsub-test-send-params.c
+++ b/pubsub/tests/pubsub-test-send-params.c
@@ -1,3 +1,6 @@
+int main() { return 0; }
+
+#if 0
#include "../lib/pubsubd.h"
#include
#include
@@ -125,3 +128,4 @@ main(int argc, char **argv, char **env)
return EXIT_SUCCESS;
}
+#endif
diff --git a/pubsub/app/pubsub-test-send.c b/pubsub/tests/pubsub-test-send.c
similarity index 97%
rename from pubsub/app/pubsub-test-send.c
rename to pubsub/tests/pubsub-test-send.c
index 8aaeca9..fa3dcc4 100644
--- a/pubsub/app/pubsub-test-send.c
+++ b/pubsub/tests/pubsub-test-send.c
@@ -1,3 +1,6 @@
+int main() { return 0; }
+
+#if 0
#include "../lib/pubsubd.h"
#include
#include
@@ -57,3 +60,4 @@ main(int argc, char **argv, char **env)
return EXIT_SUCCESS;
}
+#endif