From 14fd9b84b952f388ab4b9b70abb694c08db449d9 Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Wed, 7 Sep 2016 23:26:28 +0200
Subject: [PATCH 01/12] pubsubd: test on channels - init, add, get, del all
---
lib/pubsubd.c | 11 +++-
pubsub/Makefile | 9 ++-
pubsub/test-chan-lists.c | 120 +++++++++++++++++++++++++++++++++++++++
3 files changed, 135 insertions(+), 5 deletions(-)
create mode 100644 pubsub/test-chan-lists.c
diff --git a/lib/pubsubd.c b/lib/pubsubd.c
index facc71f..ed7e222 100644
--- a/lib/pubsubd.c
+++ b/lib/pubsubd.c
@@ -114,7 +114,7 @@ void pubsubd_subscriber_init (struct app_list_head **chans) {
void pubsubd_channels_print (const struct channels *chans)
{
- printf ("\033[36mmchannels\033[00m\n\n");
+ printf ("\033[36mmchannels\033[00m\n");
if (chans == NULL)
return ;
@@ -130,13 +130,19 @@ void pubsubd_channel_print (const struct channel *c)
if (c == NULL || c->chan == NULL)
return;
- printf ( "\033[32mchan %s\033[00m\n\t", c->chan);
+ if (c->chan == NULL) {
+ printf ( "\033[32mchan name not available\033[00m\n");
+ }
+ else {
+ printf ( "\033[32mchan %s\033[00m\n", c->chan);
+ }
if (c->alh == NULL)
return;
struct app_list_elm *ale = NULL;
LIST_FOREACH(ale, c->alh, entries) {
+ printf ("\t");
srv_process_print (ale->p);
}
}
@@ -361,6 +367,7 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
int index = 0;
int version = 0;
+ // chan name
char chan[BUFSIZ];
memset (chan, 0, BUFSIZ);
diff --git a/pubsub/Makefile b/pubsub/Makefile
index 6b95c76..332cf7c 100644
--- a/pubsub/Makefile
+++ b/pubsub/Makefile
@@ -1,5 +1,5 @@
CC=gcc
-CFLAGS=-Wall -g
+CFLAGS=-Wall -g -Wextra
LDFLAGS= -pthread
CFILES=$(wildcard *.c) # CFILES => recompiles everything on a C file change
EXEC=$(basename $(wildcard *.c))
@@ -15,8 +15,11 @@ $(EXEC): $(OBJECTS) $(CFILES)
.c.o:
$(CC) -c $(CFLAGS) $< -o $@
+$(TESTS):
+ valgrind --show-leak-kinds=all --leak-check=full -v --track-origins=yes ./$(basename $@)
+
clean:
- -rm $(OBJECTS)
+ @-rm $(OBJECTS)
mrproper: clean
- rm $(EXEC)
+ @-rm $(EXEC)
diff --git a/pubsub/test-chan-lists.c b/pubsub/test-chan-lists.c
new file mode 100644
index 0000000..fbcada6
--- /dev/null
+++ b/pubsub/test-chan-lists.c
@@ -0,0 +1,120 @@
+#include "../lib/pubsubd.h"
+#include
+
+#define TEST_NAME "test-chan-lists"
+
+int
+create_chan (struct channel *c, const char * name) {
+ if (c == NULL) {
+ return 1;
+ }
+
+ if (c->chan == NULL)
+ c->chan = malloc (BUFSIZ);
+ memset (c->chan, 0, BUFSIZ);
+ memcpy (c->chan, name, strlen (name));
+ c->chanlen = strlen (name);
+ return 0;
+}
+
+void
+ohshit(int rvalue, const char* str) {
+ fprintf(stderr, "%s\n", str);
+ exit(rvalue);
+}
+
+int
+main(int argc, char **argv, char **env)
+{
+ struct service srv;
+ memset (&srv, 0, sizeof (struct service));
+ srv_init (argc, argv, env, &srv, TEST_NAME, NULL);
+ printf ("Listening on %s.\n", srv.spath);
+
+ // creates the service named pipe, that listens to client applications
+ if (srv_create (&srv))
+ ohshit(1, "service_create error");
+
+ // init chans list
+ struct channels chans;
+ memset (&chans, 0, sizeof (struct channels));
+ pubsubd_channels_init (&chans);
+
+ // for each new process
+ // struct app_list_elm ale1;
+ // memset (&ale1, 0, sizeof (struct app_list_elm));
+
+ // warning : this is a local structure, not exactly the same in the prog.
+ struct channel chan;
+ memset (&chan, 0, sizeof (struct channel));
+ create_chan (&chan, "coucou");
+
+ // to emulate
+ // pubsubd_get_new_process (&srv, &ale1, &chans, &chan);
+
+ // FIRST CHAN TO BE ADDED
+ // search for the chan in channels, add it if not found
+ struct channel *new_chan = NULL;
+ new_chan = pubsubd_channel_get (&chans, &chan);
+ if (new_chan == NULL) {
+ new_chan = pubsubd_channels_add (&chans, &chan);
+ pubsubd_subscriber_init (&new_chan->alh);
+ }
+ else {
+ ohshit (2, "error : new chan, can't be found in channels yet");
+ }
+ pubsubd_channel_free (&chan);
+
+ printf ("print the channels, 1 chan\n");
+ printf ("--\n");
+ pubsubd_channels_print (&chans);
+ printf ("--\n");
+
+ // SAME CHAN, SHOULD NOT BE ADDED
+ create_chan (&chan, "coucou");
+ // search for the chan in channels, add it if not found
+ new_chan = pubsubd_channel_get (&chans, &chan);
+ if (new_chan == NULL) {
+ ohshit (3, "error : same chan, shouldn't be added in channels");
+ }
+ else {
+ printf ("already in the 'channels' structure\n");
+ }
+
+ printf ("print the channels, 1 chan\n");
+ printf ("--\n");
+ pubsubd_channels_print (&chans);
+ printf ("--\n");
+
+ // NEW CHAN, SHOULD BE ADDED
+ create_chan (&chan, "salut");
+ // search for the chan in channels, add it if not found
+ new_chan = pubsubd_channel_get (&chans, &chan);
+ if (new_chan == NULL) {
+ new_chan = pubsubd_channels_add (&chans, &chan);
+ pubsubd_subscriber_init (&new_chan->alh);
+ }
+ else {
+ ohshit (4, "error : new chan, should be added in channels");
+ }
+ pubsubd_channel_free (&chan);
+
+ printf ("print the channels, 2 chans\n");
+ printf ("--\n");
+ pubsubd_channels_print (&chans);
+ printf ("--\n");
+
+ // end the application
+ pubsubd_channels_del_all (&chans);
+
+ printf ("\nshould be empty now\n");
+ printf ("--\n");
+ pubsubd_channels_print (&chans);
+ printf ("--\n");
+
+ // the application will shut down, and remove the service named pipe
+ if (srv_close (&srv))
+ ohshit (1, "service_close error");
+
+ return EXIT_SUCCESS;
+}
From e9169a633d399c074c84c94d2f44a01eb1537b5a Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Wed, 7 Sep 2016 23:46:00 +0200
Subject: [PATCH 02/12] pubsubd: channel copy improved
---
lib/pubsubd.c | 36 ++++++++++++++++++++++++------------
lib/pubsubd.h | 1 +
pubsub/test-chan-lists.c | 20 +++-----------------
3 files changed, 28 insertions(+), 29 deletions(-)
diff --git a/lib/pubsubd.c b/lib/pubsubd.c
index ed7e222..5fce9c7 100644
--- a/lib/pubsubd.c
+++ b/lib/pubsubd.c
@@ -59,15 +59,34 @@ struct channel * pubsubd_channel_copy (struct channel *c)
memcpy (copy, c, sizeof(struct channel));
if (c->chan != NULL) {
- // copy->chan = strndup (c->chan, c->chanlen);
- copy->chan = malloc (BUFSIZ);
- memcpy (copy->chan, c->chan, BUFSIZ);
+ copy->chan = malloc (c->chanlen);
+ memset (copy->chan, 0, c->chanlen);
+ memcpy (copy->chan, c->chan, c->chanlen);
copy->chanlen = c->chanlen;
}
return copy;
}
+int pubsubd_channel_new (struct channel *c, const char * name)
+{
+ if (c == NULL) {
+ return 1;
+ }
+
+ size_t nlen = (strlen (name) > BUFSIZ) ? BUFSIZ : strlen (name) + 1;
+
+ printf ("NAME : %s, SIZE : %ld\n", name, nlen);
+
+ if (c->chan == NULL)
+ c->chan = malloc (nlen);
+
+ memset (c->chan, 0, nlen);
+ memcpy (c->chan, name, nlen);
+ c->chanlen = nlen;
+ return 0;
+}
+
void pubsubd_channel_free (struct channel * c)
{
// TODO
@@ -425,16 +444,9 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
*c = malloc (sizeof (struct channel));
}
- if (c[0]->chan != NULL) {
- free (c[0]->chan);
- c[0]->chan = NULL;
- }
-
chan[BUFSIZ -1] = '\0';
- // c[0]->chan = strndup (chan, BUFSIZ);
- c[0]->chan = malloc (BUFSIZ);
- memcpy(c[0]->chan, chan, BUFSIZ);
- c[0]->chanlen = strlen (chan);
+ pubsubd_channel_new (*c, chan);
+
struct channel *new_chan = NULL;
new_chan = pubsubd_channel_get (chans, *c);
diff --git a/lib/pubsubd.h b/lib/pubsubd.h
index 30f5336..c261ac7 100644
--- a/lib/pubsubd.h
+++ b/lib/pubsubd.h
@@ -60,6 +60,7 @@ struct channel {
};
// simple channel
+int pubsubd_channel_new (struct channel *c, const char *name);
struct channel * pubsubd_channel_copy (struct channel *c);
struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c);
void pubsubd_channel_free (struct channel *c);
diff --git a/pubsub/test-chan-lists.c b/pubsub/test-chan-lists.c
index fbcada6..826b64b 100644
--- a/pubsub/test-chan-lists.c
+++ b/pubsub/test-chan-lists.c
@@ -3,20 +3,6 @@
#define TEST_NAME "test-chan-lists"
-int
-create_chan (struct channel *c, const char * name) {
- if (c == NULL) {
- return 1;
- }
-
- if (c->chan == NULL)
- c->chan = malloc (BUFSIZ);
- memset (c->chan, 0, BUFSIZ);
- memcpy (c->chan, name, strlen (name));
- c->chanlen = strlen (name);
- return 0;
-}
-
void
ohshit(int rvalue, const char* str) {
fprintf(stderr, "%s\n", str);
@@ -47,7 +33,7 @@ main(int argc, char **argv, char **env)
// warning : this is a local structure, not exactly the same in the prog.
struct channel chan;
memset (&chan, 0, sizeof (struct channel));
- create_chan (&chan, "coucou");
+ pubsubd_channel_new (&chan, "coucou");
// to emulate
// pubsubd_get_new_process (&srv, &ale1, &chans, &chan);
@@ -71,7 +57,7 @@ main(int argc, char **argv, char **env)
printf ("--\n");
// SAME CHAN, SHOULD NOT BE ADDED
- create_chan (&chan, "coucou");
+ pubsubd_channel_new (&chan, "coucou");
// search for the chan in channels, add it if not found
new_chan = pubsubd_channel_get (&chans, &chan);
if (new_chan == NULL) {
@@ -87,7 +73,7 @@ main(int argc, char **argv, char **env)
printf ("--\n");
// NEW CHAN, SHOULD BE ADDED
- create_chan (&chan, "salut");
+ pubsubd_channel_new (&chan, "salut");
// search for the chan in channels, add it if not found
new_chan = pubsubd_channel_get (&chans, &chan);
if (new_chan == NULL) {
From 4065b1ab0a849fb0d96f209a4b7814ebf71c2d17 Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Thu, 8 Sep 2016 13:23:07 +0200
Subject: [PATCH 03/12] pubsubd: test
---
pubsub/pubsubd.c | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c
index f8c6ffd..cfff9e2 100644
--- a/pubsub/pubsubd.c
+++ b/pubsub/pubsubd.c
@@ -18,6 +18,10 @@ struct worker_params {
void * pubsubd_worker_thread (void *params)
{
struct worker_params *wp = (struct worker_params *) params;
+ if (wp == NULL) {
+ fprintf (stderr, "error pubsubd_worker_thread : params NULL\n");
+ return NULL;
+ }
while (1) {
// each chan has a list of subscribers
@@ -30,7 +34,7 @@ void * pubsubd_worker_thread (void *params)
struct pubsub_msg m;
memset (&m, 0, sizeof (struct pubsub_msg));
- sleep (5);
+ sleep (5); // TODO DEBUG
pubsubd_msg_recv (wp->ale->p, &m);
pubsubd_msg_print (&m);
@@ -67,7 +71,7 @@ void * pubsubd_worker_thread (void *params)
pthread_exit (NULL);
}
- int
+int
main(int argc, char **argv, char **env)
{
struct service srv;
From 0d50185b3aa3576cbe2e42869a9f6ddc3c0f2285 Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Thu, 8 Sep 2016 23:40:48 +0200
Subject: [PATCH 04/12] pubsubd: debug test program
---
pubsub/pubsub-test-send-params.c | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git a/pubsub/pubsub-test-send-params.c b/pubsub/pubsub-test-send-params.c
index 88fd4c8..bfbb9fc 100644
--- a/pubsub/pubsub-test-send-params.c
+++ b/pubsub/pubsub-test-send-params.c
@@ -17,7 +17,7 @@ void usage (char **argv)
void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, int version, char *cmd, char *chan)
{
- printf ("Simulate connnection : pid %d index %d version %d "
+ printf ("Simulate connection : pid %d index %d version %d "
"cmd %s chan %s\n"
, pid, index, version, cmd, chan );
@@ -29,9 +29,11 @@ void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, in
struct process p;
memset (&p, 0, sizeof (struct process));
+ printf ("app creation\n");
if (app_create (&p, pid, index, version)) // called by the application
ohshit (1, "app_create");
+ printf ("connection\n");
// send a message to warn the service we want to do something
// line : pid index version action chan
pubsub_connection (&srv, &p, PUBSUB_PUB, chan);
@@ -42,19 +44,25 @@ void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, in
// first message, "coucou"
m.type = PUBSUB_TYPE_INFO;
m.chan = malloc (strlen (chan) + 1);
+ memset (m.chan, 0, strlen (chan) + 1);
m.chan[strlen (chan)] = '\0';
m.chanlen = strlen (chan);
m.data = malloc (strlen (MYMESSAGE) + 1);
+ memset (m.data, 0, strlen (MYMESSAGE) + 1);
+ strncpy ((char *) m.data, MYMESSAGE, strlen (MYMESSAGE) + 1);
m.datalen = strlen (MYMESSAGE);
- m.datalen = strlen (MYMESSAGE);
+
+ printf ("send message\n");
pubsub_msg_send (&p, &m);
// free everything
pubsubd_msg_free (&m);
+ printf ("disconnection\n");
// disconnect from the server
pubsub_disconnect (&p);
+ printf ("destroying app\n");
// the application will shut down, and remove the application named pipes
if (app_destroy (&p))
ohshit (1, "app_destroy");
From 368e54d4cff07efbb7f500f494b9f5f317e4e20e Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Thu, 8 Sep 2016 23:41:15 +0200
Subject: [PATCH 05/12] pubsubd: test programs
---
pubsub/msg-serialize.c | 48 +++++++++++++++++++++++++++++++++
pubsub/msg-unserialize.c | 43 +++++++++++++++++++++++++++++
pubsub/test-test-send-params.sh | 38 ++++++++++++++++++++++++++
3 files changed, 129 insertions(+)
create mode 100644 pubsub/msg-serialize.c
create mode 100644 pubsub/msg-unserialize.c
create mode 100755 pubsub/test-test-send-params.sh
diff --git a/pubsub/msg-serialize.c b/pubsub/msg-serialize.c
new file mode 100644
index 0000000..3138f57
--- /dev/null
+++ b/pubsub/msg-serialize.c
@@ -0,0 +1,48 @@
+#include "../lib/pubsubd.h"
+#include
+#include
+
+#define MESSAGE "coucou"
+#define CHAN "chan1"
+
+void
+ohshit(int rvalue, const char* str) {
+ fprintf (stderr, "\033[31merr: %s\033[00m\n", str);
+ exit (rvalue);
+}
+
+void usage (char **argv)
+{
+ printf ( "usage: %s\n", argv[0]);
+}
+
+int
+main(int argc, char **argv)
+{
+ if (argc != 1) {
+ usage (argv);
+ exit (1);
+ }
+
+ struct pubsub_msg msg;
+ memset (&msg, 0, sizeof (struct pubsub_msg));
+ msg.type = PUBSUB_TYPE_MESSAGE;
+ msg.chan = malloc (strlen (CHAN) + 1);
+ strncpy ((char *)msg.chan, CHAN, strlen (CHAN) + 1);
+ msg.chanlen = strlen (CHAN) + 1;
+
+ msg.data = malloc (strlen (MESSAGE) + 1);
+ strncpy ((char *)msg.data, MESSAGE, strlen (CHAN) + 1);
+ msg.datalen = strlen (MESSAGE) + 1;
+
+ char *data = NULL;
+ size_t len = 0;
+ pubsubd_msg_serialize (&msg, &data, &len);
+ pubsubd_msg_free (&msg);
+
+ if (len != write (1, data, len)) {
+ ohshit (1, "unable to write the data");
+ }
+
+ return EXIT_SUCCESS;
+}
diff --git a/pubsub/msg-unserialize.c b/pubsub/msg-unserialize.c
new file mode 100644
index 0000000..8596ebe
--- /dev/null
+++ b/pubsub/msg-unserialize.c
@@ -0,0 +1,43 @@
+#include "../lib/pubsubd.h"
+#include
+#include
+
+void
+ohshit(int rvalue, const char* str) {
+ fprintf (stderr, "\033[31merr: %s\033[00m\n", str);
+ exit (rvalue);
+}
+
+void usage (char **argv)
+{
+ printf ( "usage: cat msg | %s\n", argv[0]);
+}
+
+void msg_print (struct pubsub_msg *msg) {
+ printf ("msg: type=%d chan=%.*s, data=%.*s\n"
+ , msg->type
+ , msg->chanlen, msg->chan
+ , msg->datalen, msg->data);
+}
+
+int
+main(int argc, char **argv)
+{
+
+ if (argc != 1) {
+ usage (argv);
+ exit (1);
+ }
+
+ char data[BUFSIZ];
+ memset (data, 0, BUFSIZ);
+ size_t len = read (0, data, BUFSIZ);
+ printf ("msg len %ld\n", len);
+
+ struct pubsub_msg msg;
+ pubsubd_msg_unserialize (&msg, data, len);
+ msg_print (&msg);
+ pubsubd_msg_free (&msg);
+
+ return EXIT_SUCCESS;
+}
diff --git a/pubsub/test-test-send-params.sh b/pubsub/test-test-send-params.sh
new file mode 100755
index 0000000..dc0a980
--- /dev/null
+++ b/pubsub/test-test-send-params.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+
+# start pubsub-test-send alone, with some parameters
+# then test it with this script
+
+REP=/tmp/ipc/
+PID=10
+INDEX=1
+VERSION=1
+ACTION="pub"
+CHAN="chan1"
+
+if [ $# != 0 ] ; then
+ PID=$1
+ shift
+fi
+
+if [ $# != 0 ] ; then
+ INDEX=$1
+ shift
+fi
+
+if [ $# != 0 ] ; then
+ ACTION=$1
+ shift
+fi
+
+if [ $# != 0 ] ; then
+ CHAN=$1
+ shift
+fi
+
+echo "there should be a line in $REP/pubsub"
+cat $REP/pubsub
+
+echo ""
+echo "there should be something to read in $REP/${PID}-${INDEX}-out"
+cat $REP/${PID}-${INDEX}-out | xxd
From 21e326a3d419430c2f03992f7d4a70ba344d65bd Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Fri, 9 Sep 2016 01:36:44 +0200
Subject: [PATCH 06/12] pubsubd: GROS CHANTIER
---
lib/communication.c | 57 ++++++++++++++++----------------
lib/pubsubd.c | 25 +++++++++-----
pubsub/Makefile | 2 +-
pubsub/msg-unserialize.c | 9 +----
pubsub/pubsub-test-send-params.c | 31 ++++++++++-------
pubsub/pubsubd.c | 30 +++++++++++++++--
6 files changed, 93 insertions(+), 61 deletions(-)
diff --git a/lib/communication.c b/lib/communication.c
index 6abd9f2..12a9c93 100644
--- a/lib/communication.c
+++ b/lib/communication.c
@@ -6,7 +6,7 @@ int file_open (FILE **f, const char *path, const char *mode)
{
printf ("opening %s\n", path);
if (*f != NULL) {
- printf ("f != NULL : %p\n", (void*) *f);
+ // printf ("f != NULL : %p\n", (void*) *f);
if (file_close (*f)) {
return ER_FILE_CLOSE;
}
@@ -16,7 +16,7 @@ int file_open (FILE **f, const char *path, const char *mode)
fprintf (stderr, "\033[31mnot opened %s\033[00m\n", path);
return ER_FILE_OPEN;
}
- printf ("opened : %ld\n", (long) *f);
+ // printf ("opened : %ld\n", (long) *f);
return 0;
}
@@ -24,11 +24,11 @@ int file_open (FILE **f, const char *path, const char *mode)
int file_close (FILE *f)
{
if (f != 0) {
- printf ("before fclosing\n");
+ // printf ("before fclosing\n");
if (fclose (f)) {
return ER_FILE_CLOSE;
}
- printf ("after fclosing\n");
+ // printf ("after fclosing\n");
}
return 0;
}
@@ -48,16 +48,21 @@ int file_read (FILE *f, char **buf, size_t *msize) {
}
}
- *msize = fread (*buf, *msize, 1, f);
- if (*msize == 0) {
+ int ret = 0;
+
+ ret = fread (*buf, *msize, 1, f);
+ if (ret < 0) {
fprintf (stderr, "err can't read a file\n");
- if (ER_FILE_CLOSE == file_close (f)) {
- fprintf (stderr, "err closing the file\n");
- return ER_FILE_CLOSE;
- }
+ ret = file_close (f);
+ if (ret != 0)
+ return ret;
return ER_FILE_READ;
}
+ ret = file_close (f);
+ if (ret != 0)
+ return ret;
+
return 0;
}
@@ -260,16 +265,13 @@ int srv_read_cb (struct process *p, char ** buf, size_t * msize
return ER_FILE_OPEN;
}
+ int ret = 0;
+
if (cb != NULL) {
- int ret = (*cb) (p->out, buf, msize);
- if (ret != 0)
- return ret;
+ ret = (*cb) (p->out, buf, msize);
}
else {
- int ret = file_read (p->out, buf, msize);
- if (ret != 0) {
- return ret;
- }
+ ret = file_read (p->out, buf, msize);
}
// printf ("DEBUG read, size %ld : %s\n", *msize, *buf);
@@ -279,7 +281,7 @@ int srv_read_cb (struct process *p, char ** buf, size_t * msize
}
p->out = NULL;
- return 0;
+ return ret;
}
int srv_read (struct process *p, char ** buf, size_t * msize)
@@ -289,10 +291,11 @@ int srv_read (struct process *p, char ** buf, size_t * msize)
return ER_FILE_OPEN;
}
- int ret = file_read (p->out, buf, msize);
+ int ret = 0;
+
+ ret = file_read (p->out, buf, msize);
if (ret != 0) {
p->out = NULL;
- return ret;
}
// printf ("DEBUG read, size %ld : %s\n", *msize, buf);
@@ -300,11 +303,11 @@ int srv_read (struct process *p, char ** buf, size_t * msize)
if (ER_FILE_CLOSE == file_close (p->out)) {
fprintf (stderr, "err closing the file %s\n", p->path_out);
p->out = NULL;
- return ER_FILE_CLOSE;
+ ret = ER_FILE_CLOSE;
}
p->out = NULL;
- return 0;
+ return ret;
}
int srv_write (struct process *p, char * buf, size_t msize)
@@ -455,20 +458,18 @@ int app_read_cb (struct process *p, char ** buf, size_t * msize
return 1;
}
+ int ret = 0;
+
if (cb != NULL) {
- int ret = (*cb) (p->in, buf, msize);
- if (ret != 0)
- return ret;
+ ret = (*cb) (p->in, buf, msize);
}
else {
- int ret = file_read (p->in, buf, msize);
+ ret = file_read (p->in, buf, msize);
if (ret != 0) {
p->in = NULL;
- return ret;
}
}
-
if (ER_FILE_CLOSE == file_close (p->in)) {
fprintf (stderr, "err closing the file %s\n", p->path_in);
}
diff --git a/lib/pubsubd.c b/lib/pubsubd.c
index 5fce9c7..034fd38 100644
--- a/lib/pubsubd.c
+++ b/lib/pubsubd.c
@@ -390,6 +390,8 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
char chan[BUFSIZ];
memset (chan, 0, BUFSIZ);
+ printf ("INIT: %s\n", buf);
+
for (str = buf, i = 1; ; str = NULL, i++) {
token = strtok_r(str, " ", &saveptr);
if (token == NULL)
@@ -417,7 +419,7 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
case 5 : {
if (ale->action != PUBSUB_QUIT)
memcpy (chan, token, (strlen (token) < BUFSIZ) ?
- strlen (token) : BUFSIZ);
+ strlen (token) -1 : BUFSIZ);
break;
}
}
@@ -465,6 +467,7 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
return 0;
}
+#if 0
// TODO CBOR
int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize)
{
@@ -550,6 +553,8 @@ int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize)
return 0;
}
+#endif
+
// alh from the channel, message to send
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m)
{
@@ -570,14 +575,8 @@ void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg
void pubsubd_msg_print (const struct pubsub_msg *msg)
{
- if (msg == NULL) {
- return;
- }
-
- printf ("\t\t\033[36mMessage\033[00m\n");
- printf ("\t\ttype %d\n", msg->type);
- printf ("\t\tchan %s\n", msg->chan);
- printf ("\t\tdata %s\n", msg->data);
+ printf ("msg: type=%d chan=%s, data=%s\n"
+ , msg->type, msg->chan, msg->data);
}
void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
@@ -585,7 +584,11 @@ void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
// read the message from the process
size_t mlen = 0;
char *buf = NULL;
+#if 0
srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb);
+#else
+ srv_read (p, &buf, &mlen);
+#endif
pubsubd_msg_unserialize (m, buf, mlen);
@@ -695,7 +698,11 @@ void pubsub_msg_recv (struct process *p, struct pubsub_msg * m)
// read the message from the process
size_t mlen = 0;
char *buf = NULL;
+#if 0
app_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb);
+#else
+ app_read_cb (p, &buf, &mlen, NULL);
+#endif
pubsubd_msg_unserialize (m, buf, mlen);
diff --git a/pubsub/Makefile b/pubsub/Makefile
index 332cf7c..10ac542 100644
--- a/pubsub/Makefile
+++ b/pubsub/Makefile
@@ -19,7 +19,7 @@ $(TESTS):
valgrind --show-leak-kinds=all --leak-check=full -v --track-origins=yes ./$(basename $@)
clean:
- @-rm $(OBJECTS)
+ @-rm $(OBJECTS) *.o
mrproper: clean
@-rm $(EXEC)
diff --git a/pubsub/msg-unserialize.c b/pubsub/msg-unserialize.c
index 8596ebe..a4dad4f 100644
--- a/pubsub/msg-unserialize.c
+++ b/pubsub/msg-unserialize.c
@@ -13,13 +13,6 @@ void usage (char **argv)
printf ( "usage: cat msg | %s\n", argv[0]);
}
-void msg_print (struct pubsub_msg *msg) {
- printf ("msg: type=%d chan=%.*s, data=%.*s\n"
- , msg->type
- , msg->chanlen, msg->chan
- , msg->datalen, msg->data);
-}
-
int
main(int argc, char **argv)
{
@@ -36,7 +29,7 @@ main(int argc, char **argv)
struct pubsub_msg msg;
pubsubd_msg_unserialize (&msg, data, len);
- msg_print (&msg);
+ pubsubd_msg_print (&msg);
pubsubd_msg_free (&msg);
return EXIT_SUCCESS;
diff --git a/pubsub/pubsub-test-send-params.c b/pubsub/pubsub-test-send-params.c
index bfbb9fc..7fcfa0d 100644
--- a/pubsub/pubsub-test-send-params.c
+++ b/pubsub/pubsub-test-send-params.c
@@ -41,19 +41,26 @@ void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, in
struct pubsub_msg m;
memset (&m, 0, sizeof (struct pubsub_msg));
- // first message, "coucou"
- m.type = PUBSUB_TYPE_INFO;
- m.chan = malloc (strlen (chan) + 1);
- memset (m.chan, 0, strlen (chan) + 1);
- m.chan[strlen (chan)] = '\0';
- m.chanlen = strlen (chan);
- m.data = malloc (strlen (MYMESSAGE) + 1);
- memset (m.data, 0, strlen (MYMESSAGE) + 1);
- strncpy ((char *) m.data, MYMESSAGE, strlen (MYMESSAGE) + 1);
- m.datalen = strlen (MYMESSAGE);
+ if (strcmp (cmd, "pub") == 0) {
+ // first message, "coucou"
+ m.type = PUBSUB_TYPE_MESSAGE;
+ m.chan = malloc (strlen (chan) + 1);
+ memset (m.chan, 0, strlen (chan) + 1);
+ m.chan[strlen (chan)] = '\0';
+ m.chanlen = strlen (chan);
- printf ("send message\n");
- pubsub_msg_send (&p, &m);
+ m.data = malloc (strlen (MYMESSAGE) + 1);
+ memset (m.data, 0, strlen (MYMESSAGE) + 1);
+ strncpy ((char *) m.data, MYMESSAGE, strlen (MYMESSAGE) + 1);
+ m.datalen = strlen (MYMESSAGE);
+
+ printf ("send message\n");
+ pubsub_msg_send (&p, &m);
+ }
+ else {
+ pubsub_msg_recv (&p, &m);
+ pubsubd_msg_print (&m);
+ }
// free everything
pubsubd_msg_free (&m);
diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c
index cfff9e2..21e9bad 100644
--- a/pubsub/pubsubd.c
+++ b/pubsub/pubsubd.c
@@ -23,13 +23,36 @@ void * pubsubd_worker_thread (void *params)
return NULL;
}
+ while (1) {
+ struct pubsub_msg m;
+ memset (&m, 0, sizeof (struct pubsub_msg));
+
+ sleep (5); // TODO DEBUG
+ printf ("MESSAGE : ");
+ pubsubd_msg_recv (wp->ale->p, &m);
+
+ pubsubd_msg_print (&m);
+
+ if (m.type == PUBSUB_TYPE_DISCONNECT) {
+ if ( 0 != pubsubd_subscriber_del (wp->chan->alh, wp->ale)) {
+ fprintf (stderr, "err : subscriber not registered\n");
+ }
+ break;
+ }
+ else {
+ struct channel *chan = pubsubd_channel_get (wp->chans, wp->chan);
+ pubsubd_msg_send (chan->alh, &m);
+ }
+ }
+
+#if 0
while (1) {
// each chan has a list of subscribers
// someone who only push a msg doesn't need to be registered
- if (wp->ale->action == PUBSUB_BOTH || wp->ale->action == PUBSUB_PUB) {
+ // if (wp->ale->action == PUBSUB_BOTH || wp->ale->action == PUBSUB_PUB) {
+ if (wp->ale->action == PUBSUB_PUB) {
// publish a message
- printf ("publish or publish and subscribe to chan %s\n"
- , wp->chan->chan);
+ printf ("publish to chan %s\n", wp->chan->chan);
struct pubsub_msg m;
memset (&m, 0, sizeof (struct pubsub_msg));
@@ -65,6 +88,7 @@ void * pubsubd_worker_thread (void *params)
break;
}
}
+#endif
pubsubd_app_list_elm_free (wp->ale);
From d754958f9acf037daeeefaf91cc51aa66f38cec7 Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Fri, 9 Sep 2016 12:06:25 +0200
Subject: [PATCH 07/12] pubsubd: simplifications
---
lib/communication.c | 315 ++++---------------------------
lib/communication.h | 10 +-
lib/process.c | 10 +-
lib/process.h | 2 -
lib/pubsubd.c | 103 +---------
pubsub/pubsub-test-send-params.c | 4 -
pubsub/pubsub-test-send.c | 2 -
7 files changed, 43 insertions(+), 403 deletions(-)
diff --git a/lib/communication.c b/lib/communication.c
index 12a9c93..dd62f7d 100644
--- a/lib/communication.c
+++ b/lib/communication.c
@@ -2,82 +2,39 @@
#include
#include
-int file_open (FILE **f, const char *path, const char *mode)
+int file_write (const char *path, const char *buf, size_t msize)
{
- printf ("opening %s\n", path);
- if (*f != NULL) {
- // printf ("f != NULL : %p\n", (void*) *f);
- if (file_close (*f)) {
- return ER_FILE_CLOSE;
- }
- }
- *f = fopen (path, mode);
- if (*f == NULL) {
- fprintf (stderr, "\033[31mnot opened %s\033[00m\n", path);
+ int fd = open (path, O_WRONLY);
+ if (fd <= 0) {
return ER_FILE_OPEN;
}
- // printf ("opened : %ld\n", (long) *f);
+
+ int ret = 0;
+ ret = write (fd, buf, msize);
- return 0;
+ close (fd);
+
+ return ret;
}
-int file_close (FILE *f)
+int file_read (const char *path, char **buf, size_t *msize)
{
- if (f != 0) {
- // printf ("before fclosing\n");
- if (fclose (f)) {
- return ER_FILE_CLOSE;
- }
- // printf ("after fclosing\n");
- }
- return 0;
-}
-
-int file_read (FILE *f, char **buf, size_t *msize) {
- if (*msize == 0) {
- *msize = BUFSIZ; // default value
- }
-
- if (*buf == NULL) {
- *buf = malloc (*msize);
- if (*buf == NULL) {
- fprintf (stderr, "err can't allocate enough memory (%ld)\n", *msize);
- int ret = file_close (f);
- if (ret != 0)
- return ret;
- }
+ int fd = open (path, O_RDONLY);
+ if (fd <= 0) {
+ return ER_FILE_OPEN;
}
int ret = 0;
-
- ret = fread (*buf, *msize, 1, f);
+ ret = read (fd, *buf, BUFSIZ);
if (ret < 0) {
- fprintf (stderr, "err can't read a file\n");
- ret = file_close (f);
- if (ret != 0)
- return ret;
- return ER_FILE_READ;
- }
-
- ret = file_close (f);
- if (ret != 0)
return ret;
-
- return 0;
-}
-
-int file_write (FILE *f, const char *buf, size_t msize)
-{
- if (0 == fwrite (buf, msize, 1, f)) {
- fprintf (stderr, "err writing in the file\n");
- if (ER_FILE_CLOSE == file_close (f)) {
- fprintf (stderr, "err closing the file\n");
- return ER_FILE_CLOSE;
- }
- return ER_FILE_WRITE;
}
- return 0;
+ *msize = ret;
+
+ close (fd);
+
+ return ret;
}
int srv_init (int argc, char **argv, char **env, struct service *srv, const char *sname, int (*cb)(int argc, char **argv, char **env, struct service *srv, const char *sname))
@@ -158,31 +115,10 @@ int srv_close (struct service *srv)
return 0;
}
-// only get a raw line from TMPDIR/
+// TODO remove, replace by file_read
int srv_get_listen_raw (const struct service *srv, char **buf, size_t *msize)
{
- *buf = malloc(BUFSIZ);
- memset (*buf, 0, BUFSIZ);
-
- FILE * f = NULL;
- if (file_open (&f, srv->spath, "rb")) {
- return ER_FILE_OPEN;
- }
-
- char *ret = NULL;
- ret = fgets (*buf, BUFSIZ, f);
- if (ret == NULL) {
- return ER_FILE_READ;
- }
- buf[0][BUFSIZ -1] = '\0';
-
- if (file_close (f)) {
- return ER_FILE_CLOSE;
- }
-
- *msize = strlen (*buf);
-
- return 0;
+ return file_read (srv->spath, buf, msize);
}
int srv_get_new_process (const struct service *srv, struct process *p)
@@ -191,39 +127,14 @@ int srv_get_new_process (const struct service *srv, struct process *p)
return -1;
}
- char buf[BUFSIZ];
- memset (buf, 0, BUFSIZ);
-
- // read the pipe, get a process to work on
- struct timespec ts = { 0 };
- struct timespec ts2 = { 0 };
-
- FILE * f = NULL;
- if (file_open (&f, srv->spath, "rb")) {
- return ER_FILE_OPEN;
+ char *buf = NULL;
+ size_t msize = 0;
+ int ret = file_read (srv->spath, &buf, &msize);
+ if (ret <= 0) {
+ fprintf (stderr, "err: listening on %s\n", srv->spath);
+ exit (1);
}
- clock_gettime(CLOCK_REALTIME, &ts);
-
- char *ret = NULL;
- ret = fgets (buf, BUFSIZ, f);
- if (ret == NULL) {
- if (file_close (f)) {
- return ER_FILE_CLOSE;
- }
- return ER_FILE_READ;
- }
-
- clock_gettime(CLOCK_REALTIME, &ts2);
- if (file_close (f)) {
- return ER_FILE_CLOSE;
- }
-
- printf("sec: %ld nsec: %ld\n", ts.tv_sec, ts.tv_nsec);
- printf("sec: %ld nsec: %ld\n", ts2.tv_sec, ts2.tv_nsec);
-
- printf("diff nsec: %ld\n", ts2.tv_nsec - ts.tv_nsec);
-
char *token = NULL, *saveptr = NULL;
char *str = NULL;
int i = 0;
@@ -253,85 +164,14 @@ int srv_get_new_process (const struct service *srv, struct process *p)
return 0;
}
-int srv_read_cb (struct process *p, char ** buf, size_t * msize
- , int (*cb)(FILE *f, char ** buf, size_t * msize))
-{
- if (file_open (&p->out, p->path_out, "rb")) {
- fprintf (stderr, "\033[31merr: srv_read_cb, file_open\033[00m\n");
- if (ER_FILE_CLOSE == file_close (p->out)) {
- fprintf (stderr, "err closing the file %s\n", p->path_out);
- p->out = NULL;
- }
- return ER_FILE_OPEN;
- }
-
- int ret = 0;
-
- if (cb != NULL) {
- ret = (*cb) (p->out, buf, msize);
- }
- else {
- ret = file_read (p->out, buf, msize);
- }
- // printf ("DEBUG read, size %ld : %s\n", *msize, *buf);
-
- if (ER_FILE_CLOSE == file_close (p->out)) {
- fprintf (stderr, "err closing the file %s\n", p->path_out);
- p->out = NULL;
- }
- p->out = NULL;
-
- return ret;
-}
-
int srv_read (struct process *p, char ** buf, size_t * msize)
{
- if (ER_FILE_OPEN == file_open (&p->out, p->path_out, "rb")) {
- fprintf (stderr, "err opening the file %s\n", p->path_out);
- return ER_FILE_OPEN;
- }
-
- int ret = 0;
-
- ret = file_read (p->out, buf, msize);
- if (ret != 0) {
- p->out = NULL;
- }
-
- // printf ("DEBUG read, size %ld : %s\n", *msize, buf);
-
- if (ER_FILE_CLOSE == file_close (p->out)) {
- fprintf (stderr, "err closing the file %s\n", p->path_out);
- p->out = NULL;
- ret = ER_FILE_CLOSE;
- }
- p->out = NULL;
-
- return ret;
+ return file_read (p->path_out, buf, msize);
}
int srv_write (struct process *p, char * buf, size_t msize)
{
- if (ER_FILE_OPEN == file_open (&p->in, p->path_in, "wb")) {
- fprintf (stderr, "err opening the file %s\n", p->path_in);
- return ER_FILE_OPEN;
- }
-
- int ret = file_write (p->in, buf, msize);
- if (ret != 0) {
- fprintf (stderr, "err writing in the file %s\n", p->path_in);
- p->in = NULL;
- return ret;
- }
-
- if (ER_FILE_CLOSE == file_close (p->in)) {
- fprintf (stderr, "err closing the file %s\n", p->path_in);
- p->in = NULL;
- return ER_FILE_CLOSE;
- }
- p->in = NULL;
-
- return 0;
+ return file_write (p->path_in, buf, msize);
}
// APPLICATION
@@ -340,27 +180,10 @@ int srv_write (struct process *p, char * buf, size_t msize)
int app_srv_connection (struct service *srv, const char *connectionstr, size_t msize)
{
if (srv == NULL) {
- return 1;
+ return -1;
}
-
- FILE * f = NULL;
- if (ER_FILE_OPEN == file_open (&f, srv->spath, "wb")) {
- fprintf (stderr, "err opening the service file %s\n", srv->spath);
- return ER_FILE_OPEN;
- }
-
- int ret = file_write (f, connectionstr, msize);
- if (ret != 0) {
- fprintf (stderr, "err writing in the service file %s\n", srv->spath);
- return ret;
- }
-
- if (ER_FILE_CLOSE == file_close (f)) {
- fprintf (stderr, "err closing the file\n");
- return ER_FILE_CLOSE;
- }
-
- return 0;
+
+ return file_write (srv->spath, connectionstr, msize);
}
int app_create (struct process *p, pid_t pid, int index, int version)
@@ -449,82 +272,12 @@ int app_destroy (struct process *p)
return 0;
}
-int app_read_cb (struct process *p, char ** buf, size_t * msize
- , int (*cb)(FILE *f, char ** buf, size_t * msize))
-{
- if (file_open (&p->in, p->path_in, "rb")) {
- fprintf (stderr, "\033[31merr: app_read_cb, file_open\033[00m\n");
- p->in = NULL;
- return 1;
- }
-
- int ret = 0;
-
- if (cb != NULL) {
- ret = (*cb) (p->in, buf, msize);
- }
- else {
- ret = file_read (p->in, buf, msize);
- if (ret != 0) {
- p->in = NULL;
- }
- }
-
- if (ER_FILE_CLOSE == file_close (p->in)) {
- fprintf (stderr, "err closing the file %s\n", p->path_in);
- }
- p->in = NULL;
-
- return 0;
-}
-
int app_read (struct process *p, char ** buf, size_t * msize)
{
- if (ER_FILE_OPEN == file_open (&p->in, p->path_in, "rb")) {
- fprintf (stderr, "err opening the file %s\n", p->path_in);
- return ER_FILE_OPEN;
- }
-
- int ret = file_read (p->in, buf, msize);
- if (ret != 0) {
- p->in = NULL;
- return ret;
- }
-
- if (ER_FILE_CLOSE == file_close (p->in)) {
- fprintf (stderr, "err closing the file %s\n", p->path_in);
- p->in = NULL;
- return ER_FILE_CLOSE;
- }
- p->in = NULL;
-
- return 0;
+ return file_read (p->path_in, buf, msize);
}
int app_write (struct process *p, char * buf, size_t msize)
{
- if (buf == NULL) {
- return ER_FILE_WRITE_PARAMS;
- }
-
- if (ER_FILE_OPEN == file_open (&p->out, p->path_out, "wb")) {
- fprintf (stderr, "err opening the file %s\n", p->path_out);
- return ER_FILE_OPEN;
- }
-
- int ret = file_write (p->out, buf, msize);
- if (ret != 0) {
- fprintf (stderr, "err writing in the file %s\n", p->path_out);
- p->out = NULL;
- return ret;
- }
-
- if (ER_FILE_CLOSE == file_close (p->out)) {
- fprintf (stderr, "err closing the file %s\n", p->path_out);
- p->out = NULL;
- return ER_FILE_CLOSE;
- }
- p->out = NULL;
-
- return 0;
+ return file_write (p->path_out, buf, msize);
}
diff --git a/lib/communication.h b/lib/communication.h
index 6dfbf1d..bc875f5 100644
--- a/lib/communication.h
+++ b/lib/communication.h
@@ -50,8 +50,6 @@ int srv_get_new_process (const struct service *srv, struct process *proc);
int srv_create (struct service *srv);
int srv_close (struct service *srv);
-int srv_read_cb (struct process *p, char ** buf, size_t * msize
- , int (*cb)(FILE *f, char ** buf, size_t * msize));
int srv_read (struct process *, char ** buf, size_t *);
int srv_write (struct process *, char * buf, size_t);
@@ -63,15 +61,11 @@ int app_srv_connection (struct service *, const char *, size_t);
int app_create (struct process *, pid_t pid, int index, int version);
int app_destroy (struct process *);
-int app_read_cb (struct process *p, char ** buf, size_t * msize
- , int (*cb)(FILE *f, char ** buf, size_t * msize));
int app_read (struct process *, char ** buf, size_t *);
int app_write (struct process *, char * buf, size_t);
// wrappers
-int file_open (FILE **f, const char *path, const char *mode);
-int file_close (FILE *f);
-int file_read (FILE *f, char **buf, size_t *msize);
-int file_write (FILE *f, const char *buf, size_t msize);
+int file_read (const char *path, char **buf, size_t *msize);
+int file_write (const char *path, const char *buf, size_t msize);
#endif
diff --git a/lib/process.c b/lib/process.c
index 1898f98..1f5a7ed 100644
--- a/lib/process.c
+++ b/lib/process.c
@@ -28,17 +28,11 @@ void srv_process_gen (struct process *p
memset (p->path_out, 0, PATH_MAX);
snprintf(p->path_in , PATH_MAX, "%s/%d-%d-in" , TMPDIR, pid, index);
snprintf(p->path_out, PATH_MAX, "%s/%d-%d-out", TMPDIR, pid, index);
- p->in = NULL;
- p->out = NULL;
-}
-
-void srv_process_free (struct process * p)
-{
- // TODO nothing to do now
}
void srv_process_print (struct process *p)
{
if (p != NULL)
- printf ("process %d : index %d\n", p->pid, p->index);
+ printf ("process %d : index %d, version %d\n"
+ , p->pid, p->index, p->version);
}
diff --git a/lib/process.h b/lib/process.h
index 11bf64d..d707c81 100644
--- a/lib/process.h
+++ b/lib/process.h
@@ -19,11 +19,9 @@ struct process {
unsigned int index;
char path_in [PATH_MAX];
char path_out [PATH_MAX];
- FILE *in, *out;
};
struct process * srv_process_copy (const struct process *p);
-void srv_process_free (struct process * p);
int srv_process_eq (const struct process *p1, const struct process *p2);
diff --git a/lib/pubsubd.c b/lib/pubsubd.c
index 034fd38..1d83374 100644
--- a/lib/pubsubd.c
+++ b/lib/pubsubd.c
@@ -74,14 +74,14 @@ int pubsubd_channel_new (struct channel *c, const char * name)
return 1;
}
- size_t nlen = (strlen (name) > BUFSIZ) ? BUFSIZ : strlen (name) + 1;
+ size_t nlen = (strlen (name) > BUFSIZ) ? BUFSIZ : strlen (name);
printf ("NAME : %s, SIZE : %ld\n", name, nlen);
if (c->chan == NULL)
- c->chan = malloc (nlen);
+ c->chan = malloc (nlen +1);
- memset (c->chan, 0, nlen);
+ memset (c->chan, 0, nlen +1);
memcpy (c->chan, name, nlen);
c->chanlen = nlen;
return 0;
@@ -89,7 +89,6 @@ int pubsubd_channel_new (struct channel *c, const char * name)
void pubsubd_channel_free (struct channel * c)
{
- // TODO
if (c == NULL)
return;
@@ -255,7 +254,7 @@ void pubsubd_app_list_elm_free (struct app_list_elm *todel)
{
if (todel == NULL || todel->p == NULL)
return;
- srv_process_free (todel->p);
+ free (todel->p);
}
// MESSAGE, TODO CBOR
@@ -467,94 +466,6 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
return 0;
}
-#if 0
-// TODO CBOR
-int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize)
-{
- // msg: "type(1) chanlen(8) chan datalen(8) data
-
- printf ("\033[36m ON PASSE DANS pubsubd_msg_read_cb \033[00m \n");
-
- // read
- char type = ' ';
- if (0 == fread (&type, 1, 1, f)) {
- return ER_FILE_READ;
- }
-
- size_t chanlen = 0;
- if (0 == fread (&chanlen, sizeof (size_t), 1, f)) {
- return ER_FILE_READ;
- }
-
- if (chanlen > BUFSIZ) {
- return ER_FILE_READ;
- }
-
- char *chan = NULL;
- chan = malloc (chanlen);
-
- if (chan == NULL) {
- return ER_MEM_ALLOC;
- }
-
- if (0 == fread (chan, chanlen, 1, f)) {
- return ER_FILE_READ;
- }
-
- size_t datalen = 0;
- if (0 == fread (&datalen, sizeof (size_t), 1, f)) {
- free (chan);
- return ER_FILE_READ;
- }
-
- if (datalen > BUFSIZ) {
- return 1;
- }
-
- char *data = NULL;
- data = malloc (datalen);
- if (data == NULL) {
- free (chan);
- return ER_MEM_ALLOC;
- }
-
- if (0 == fread (data, datalen, 1, f)) {
- free (chan);
- free (data);
- return ER_FILE_READ;
- }
-
- *msize = 1 + 2 * sizeof (size_t) + chanlen + datalen;
- if (*buf == NULL) {
- *buf = malloc(*msize);
- if (*buf == NULL) {
- free (chan);
- free (data);
- return ER_MEM_ALLOC;
- }
- }
-
- // TODO CHECK THIS
- size_t i = 0;
-
- char *cbuf = *buf;
-
- cbuf[i] = type; i++;
- memcpy (cbuf + i, &chanlen, sizeof(size_t)); i += sizeof(size_t);
- memcpy (cbuf + i, chan, chanlen); i += chanlen;
- memcpy (cbuf + i, &datalen, sizeof(size_t)); i += sizeof(size_t);
- memcpy (cbuf + i, data, datalen); i += datalen;
-
- free (chan);
- free (data);
-
- printf ("\033[36m ON SORT de pubsubd_msg_read_cb \033[00m \n");
-
- return 0;
-}
-
-#endif
-
// alh from the channel, message to send
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m)
{
@@ -698,11 +609,7 @@ void pubsub_msg_recv (struct process *p, struct pubsub_msg * m)
// read the message from the process
size_t mlen = 0;
char *buf = NULL;
-#if 0
- app_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb);
-#else
- app_read_cb (p, &buf, &mlen, NULL);
-#endif
+ app_read (p, &buf, &mlen);
pubsubd_msg_unserialize (m, buf, mlen);
diff --git a/pubsub/pubsub-test-send-params.c b/pubsub/pubsub-test-send-params.c
index 7fcfa0d..b4c9d6a 100644
--- a/pubsub/pubsub-test-send-params.c
+++ b/pubsub/pubsub-test-send-params.c
@@ -73,8 +73,6 @@ void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, in
// the application will shut down, and remove the application named pipes
if (app_destroy (&p))
ohshit (1, "app_destroy");
-
- srv_process_free (&p);
}
void sim_disconnection (int argc, char **argv, char **env, pid_t pid, int index, int version)
@@ -93,8 +91,6 @@ void sim_disconnection (int argc, char **argv, char **env, pid_t pid, int index,
// send a message to disconnect
// line : pid index version action chan
pubsub_disconnect (&p);
-
- srv_process_free (&p);
}
int
diff --git a/pubsub/pubsub-test-send.c b/pubsub/pubsub-test-send.c
index 976b76d..8aaeca9 100644
--- a/pubsub/pubsub-test-send.c
+++ b/pubsub/pubsub-test-send.c
@@ -55,7 +55,5 @@ main(int argc, char **argv, char **env)
if (app_destroy (&p))
ohshit (1, "app_destroy");
- srv_process_free (&p);
-
return EXIT_SUCCESS;
}
From 0524f70bab98f09157d4be9f7a85d9a1a070f0fe Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Fri, 9 Sep 2016 12:33:47 +0200
Subject: [PATCH 08/12] pubsubd: corrections + tests
---
lib/pubsubd.c | 15 ++-----------
pubsub/msg-serialize.c | 2 +-
pubsub/test-pipe-read.c | 49 +++++++++++++++++++++++++++++++++++++++++
3 files changed, 52 insertions(+), 14 deletions(-)
create mode 100644 pubsub/test-pipe-read.c
diff --git a/lib/pubsubd.c b/lib/pubsubd.c
index 1d83374..7b31021 100644
--- a/lib/pubsubd.c
+++ b/lib/pubsubd.c
@@ -561,19 +561,8 @@ void pubsub_disconnect (struct process *p)
pubsubd_msg_serialize (&m, &buf, &msize);
int ret = app_write (p, buf, msize);
- switch (ret) {
- case ER_FILE_WRITE :
- fprintf (stderr, "err: ER_FILE_WRITE\n");
- break;
- case ER_FILE_WRITE_PARAMS :
- fprintf (stderr, "err: ER_FILE_WRITE_PARAMS\n");
- break;
- case ER_FILE_OPEN :
- fprintf (stderr, "err: ER_FILE_OPEN\n");
- break;
- case ER_FILE_CLOSE :
- fprintf (stderr, "err: ER_FILE_CLOSE\n");
- break;
+ if (ret != (int) msize) {
+ fprintf (stderr, "err: can't disconnect\n");
}
pubsubd_msg_free (&m);
diff --git a/pubsub/msg-serialize.c b/pubsub/msg-serialize.c
index 3138f57..bcaa165 100644
--- a/pubsub/msg-serialize.c
+++ b/pubsub/msg-serialize.c
@@ -40,7 +40,7 @@ main(int argc, char **argv)
pubsubd_msg_serialize (&msg, &data, &len);
pubsubd_msg_free (&msg);
- if (len != write (1, data, len)) {
+ if ((int) len != write (1, data, len)) {
ohshit (1, "unable to write the data");
}
diff --git a/pubsub/test-pipe-read.c b/pubsub/test-pipe-read.c
new file mode 100644
index 0000000..a2cb78f
--- /dev/null
+++ b/pubsub/test-pipe-read.c
@@ -0,0 +1,49 @@
+#include "../lib/pubsubd.h"
+#include
+
+#define TEST_NAME "test-chan-lists"
+
+void
+ohshit(int rvalue, const char* str) {
+ fprintf(stderr, "%s\n", str);
+ exit(rvalue);
+}
+
+void usage (char **argv) {
+ fprintf (stderr, "usage: %s path times\n", argv[0]);
+ fprintf (stderr, "ex: %s /tmp/pipe 5 => you will read 5 times\n", argv[0]);
+ exit (1);
+}
+
+int
+main(int argc, char **argv)
+{
+
+ if (argc != 3)
+ usage (argv);
+
+ char *path = argv[1];
+ int nb = atoi (argv[2]);
+
+ printf ("Listening on %s %d times.\n", path, nb);
+
+ char *buf = NULL;
+ size_t msize = 0;
+
+ int ret = 0;
+
+ while (nb--) {
+ ret = file_read (path, &buf, &msize);
+ if (ret == 0) {
+ printf ("no msg\n");
+ nb++;
+ continue;
+ }
+ struct pubsub_msg m;
+ pubsubd_msg_unserialize (&m, buf, msize);
+ pubsubd_msg_print (&m);
+ sleep (1);
+ }
+
+ return EXIT_SUCCESS;
+}
From 4a32978c536462d0f96b319735ad6af4b92b8290 Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Fri, 9 Sep 2016 13:29:36 +0200
Subject: [PATCH 09/12] pubsubd: test pipes
---
pubsub/test-pipe-read.c | 1 +
pubsub/test-pipe-write.c | 58 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 59 insertions(+)
create mode 100644 pubsub/test-pipe-write.c
diff --git a/pubsub/test-pipe-read.c b/pubsub/test-pipe-read.c
index a2cb78f..7ab462c 100644
--- a/pubsub/test-pipe-read.c
+++ b/pubsub/test-pipe-read.c
@@ -42,6 +42,7 @@ main(int argc, char **argv)
struct pubsub_msg m;
pubsubd_msg_unserialize (&m, buf, msize);
pubsubd_msg_print (&m);
+ pubsubd_msg_free (&m);
sleep (1);
}
diff --git a/pubsub/test-pipe-write.c b/pubsub/test-pipe-write.c
new file mode 100644
index 0000000..3520928
--- /dev/null
+++ b/pubsub/test-pipe-write.c
@@ -0,0 +1,58 @@
+#include "../lib/pubsubd.h"
+#include
+
+#define CHAN "chan1"
+#define MESSAGE "coucou"
+
+void
+ohshit(int rvalue, const char* str) {
+ fprintf(stderr, "%s\n", str);
+ exit(rvalue);
+}
+
+void usage (char **argv) {
+ fprintf (stderr, "usage: %s path times\n", argv[0]);
+ fprintf (stderr, "ex: %s /tmp/pipe 5 => you will write 5 times\n", argv[0]);
+ exit (1);
+}
+
+int
+main(int argc, char **argv)
+{
+
+ if (argc != 3)
+ usage (argv);
+
+ char *path = argv[1];
+ int nb = atoi (argv[2]);
+
+ printf ("Writing on %s %d times.\n", path, nb);
+
+ char *buf = NULL;
+ size_t msize = 0;
+
+ int ret = 0;
+
+ while (nb--) {
+ struct pubsub_msg msg;
+ memset (&msg, 0, sizeof (struct pubsub_msg));
+ msg.type = PUBSUB_TYPE_MESSAGE;
+ msg.chan = malloc (strlen (CHAN) + 1);
+ strncpy ((char *)msg.chan, CHAN, strlen (CHAN) + 1);
+ msg.chanlen = strlen (CHAN) + 1;
+ msg.data = malloc (strlen (MESSAGE) + 1);
+ strncpy ((char *)msg.data, MESSAGE, strlen (CHAN) + 1);
+ msg.datalen = strlen (MESSAGE) + 1;
+
+ pubsubd_msg_serialize (&msg, &buf, &msize);
+ pubsubd_msg_print (&msg);
+ pubsubd_msg_free (&msg);
+ ret = file_write (path, buf, msize);
+ if (ret != msize) {
+ fprintf (stderr, "msg not written\n");
+ }
+ sleep (1);
+ }
+
+ return EXIT_SUCCESS;
+}
From cdf975db2956e405c287855d6b093ab505855731 Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Fri, 9 Sep 2016 14:44:15 +0200
Subject: [PATCH 10/12] pubsub: tests sur les pipes (read, write)
---
lib/communication.c | 19 ++++++++++++++-----
pubsub/test-pipe-read.c | 21 ++++++++++++++-------
pubsub/test-pipe-write.c | 3 +--
3 files changed, 29 insertions(+), 14 deletions(-)
diff --git a/lib/communication.c b/lib/communication.c
index dd62f7d..0f6d387 100644
--- a/lib/communication.c
+++ b/lib/communication.c
@@ -24,15 +24,24 @@ int file_read (const char *path, char **buf, size_t *msize)
return ER_FILE_OPEN;
}
+ if (*buf == NULL)
+ *buf = malloc (BUFSIZ);
+
int ret = 0;
+ int ret2 = 0;
ret = read (fd, *buf, BUFSIZ);
- if (ret < 0) {
- return ret;
+ if (ret <= 0) {
+ fprintf (stderr, "err: read %s\n", path);
+ }
+ else {
+ *msize = ret;
}
- *msize = ret;
-
- close (fd);
+ ret2 = close (fd);
+ if (ret2 < 0) {
+ fprintf (stderr, "err: close [err: %d] %s\n", ret2, path);
+ perror ("closing");
+ }
return ret;
}
diff --git a/pubsub/test-pipe-read.c b/pubsub/test-pipe-read.c
index 7ab462c..a9c330e 100644
--- a/pubsub/test-pipe-read.c
+++ b/pubsub/test-pipe-read.c
@@ -34,16 +34,23 @@ main(int argc, char **argv)
while (nb--) {
ret = file_read (path, &buf, &msize);
- if (ret == 0) {
- printf ("no msg\n");
+ if (ret <= 0) {
+ fprintf (stderr, "no msg");
+ if (ret == ER_FILE_OPEN) {
+ fprintf (stderr, " ER_FILE_OPEN");
+ }
+ fprintf (stderr, "\n");
nb++;
continue;
}
- struct pubsub_msg m;
- pubsubd_msg_unserialize (&m, buf, msize);
- pubsubd_msg_print (&m);
- pubsubd_msg_free (&m);
- sleep (1);
+ if (msize > 0) {
+ printf ("msg size %ld\t", msize);
+ struct pubsub_msg m;
+ memset (&m, 0, sizeof (struct pubsub_msg));
+ pubsubd_msg_unserialize (&m, buf, msize);
+ pubsubd_msg_print (&m);
+ pubsubd_msg_free (&m);
+ }
}
return EXIT_SUCCESS;
diff --git a/pubsub/test-pipe-write.c b/pubsub/test-pipe-write.c
index 3520928..2be8e03 100644
--- a/pubsub/test-pipe-write.c
+++ b/pubsub/test-pipe-write.c
@@ -48,10 +48,9 @@ main(int argc, char **argv)
pubsubd_msg_print (&msg);
pubsubd_msg_free (&msg);
ret = file_write (path, buf, msize);
- if (ret != msize) {
+ if (ret != (int) msize) {
fprintf (stderr, "msg not written\n");
}
- sleep (1);
}
return EXIT_SUCCESS;
From bd2e3e0d15c75e768e3faf2e9631cf59f5579b7a Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Fri, 9 Sep 2016 21:52:56 +0200
Subject: [PATCH 11/12] pubsubd: debug
---
lib/communication.c | 6 ----
lib/communication.h | 1 -
lib/pubsubd.c | 9 +++---
lib/pubsubd.h | 2 +-
pubsub/pubsubd.c | 2 +-
pubsub/test-gen-new-process.c | 51 ++++++++++++++++++++++++++++++++++
pubsub/test-gen-new-process.sh | 7 +++++
7 files changed, 65 insertions(+), 13 deletions(-)
create mode 100644 pubsub/test-gen-new-process.c
create mode 100755 pubsub/test-gen-new-process.sh
diff --git a/lib/communication.c b/lib/communication.c
index 0f6d387..68030ce 100644
--- a/lib/communication.c
+++ b/lib/communication.c
@@ -124,12 +124,6 @@ int srv_close (struct service *srv)
return 0;
}
-// TODO remove, replace by file_read
-int srv_get_listen_raw (const struct service *srv, char **buf, size_t *msize)
-{
- return file_read (srv->spath, buf, msize);
-}
-
int srv_get_new_process (const struct service *srv, struct process *p)
{
if (srv->spath == NULL) {
diff --git a/lib/communication.h b/lib/communication.h
index bc875f5..f91dd3f 100644
--- a/lib/communication.h
+++ b/lib/communication.h
@@ -37,7 +37,6 @@ int srv_init (int argc, char **argv, char **env
, int (*cb)(int argc, char **argv, char **env
, struct service *srv, const char *sname));
-int srv_get_listen_raw (const struct service *srv, char **buf, size_t *msize);
int srv_get_new_process (const struct service *srv, struct process *proc);
/*
diff --git a/lib/pubsubd.c b/lib/pubsubd.c
index 7b31021..e07c7db 100644
--- a/lib/pubsubd.c
+++ b/lib/pubsubd.c
@@ -363,16 +363,15 @@ void pubsubd_msg_free (struct pubsub_msg *msg)
// COMMUNICATION
-int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
+int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale
, struct channels *chans, struct channel **c)
{
- if (srv == NULL || ale == NULL || chans == NULL)
+ if (spath == NULL || ale == NULL || chans == NULL)
return -1;
char *buf = NULL;
size_t msize = 0;
- srv_get_listen_raw (srv, &buf, &msize);
-
+ file_read (spath, &buf, &msize);
// parse pubsubd init msg (sent in TMPDIR/)
//
// line fmt : pid index version action chan
@@ -416,6 +415,8 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
break;
}
case 5 : {
+ // for the last element of the line
+ // drop the following \n
if (ale->action != PUBSUB_QUIT)
memcpy (chan, token, (strlen (token) < BUFSIZ) ?
strlen (token) -1 : BUFSIZ);
diff --git a/lib/pubsubd.h b/lib/pubsubd.h
index c261ac7..2dd3e63 100644
--- a/lib/pubsubd.h
+++ b/lib/pubsubd.h
@@ -35,7 +35,7 @@ void pubsubd_msg_print (const struct pubsub_msg *msg);
//
// line fmt : pid index version action chan
// action : quit | pub | sub
-int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
+int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale
, struct channels *chans, struct channel **c);
int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize);
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m);
diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c
index 21e9bad..1c1984f 100644
--- a/pubsub/pubsubd.c
+++ b/pubsub/pubsubd.c
@@ -117,7 +117,7 @@ main(int argc, char **argv, char **env)
struct app_list_elm ale;
memset (&ale, 0, sizeof (struct app_list_elm));
struct channel *chan = NULL;
- pubsubd_get_new_process (&srv, &ale, &chans, &chan);
+ pubsubd_get_new_process (srv.spath, &ale, &chans, &chan);
pubsubd_channels_print (&chans);
// end the application
diff --git a/pubsub/test-gen-new-process.c b/pubsub/test-gen-new-process.c
new file mode 100644
index 0000000..6999ac1
--- /dev/null
+++ b/pubsub/test-gen-new-process.c
@@ -0,0 +1,51 @@
+#include "../lib/pubsubd.h"
+#include
+#include
+
+void
+ohshit(int rvalue, const char* str) {
+ fprintf(stderr, "%s\n", str);
+ exit(rvalue);
+}
+
+void usage (char **argv) {
+ fprintf (stderr, "usage: %s path\n", argv[0]);
+ exit (1);
+}
+
+int
+main(int argc, char **argv)
+{
+ if (argc != 2) {
+ usage (argv);
+ }
+
+ char *spath = argv[1];
+
+ printf ("Listening on %s.\n", spath);
+
+ struct channels chans;
+ memset (&chans, 0, sizeof (struct channels));
+
+ for (int nb = 10, i = 0 ; nb > 0; i++, nb--) {
+ struct app_list_elm ale;
+ memset (&ale, 0, sizeof (struct app_list_elm));
+
+ struct channel chan;
+ memset (&chan, 0, sizeof (struct channel));
+ struct channel *c = &chan;
+
+ pubsubd_get_new_process (spath, &ale, &chans, &c);
+
+ printf ("print the channels, %d chan\n", i);
+ printf ("--\n");
+ pubsubd_channels_print (&chans);
+ printf ("--\n");
+ printf ("still %d remaining processes\n", nb);
+ }
+
+ pubsubd_channels_del_all (&chans);
+
+ return EXIT_SUCCESS;
+}
+
diff --git a/pubsub/test-gen-new-process.sh b/pubsub/test-gen-new-process.sh
new file mode 100755
index 0000000..e6aad3a
--- /dev/null
+++ b/pubsub/test-gen-new-process.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+
+for i in $(seq 1 10)
+do
+ echo "${i} 1 1 pub chan${i}" > /tmp/ipc/gen
+ sleep 1
+done
From 99ca565bb6bf3729df3a1ec6ceeb2a58211751f5 Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Sat, 10 Sep 2016 02:54:53 +0200
Subject: [PATCH 12/12] pubsub: eq test channels -- fixed
---
lib/communication.c | 2 ++
lib/pubsubd.c | 3 ++-
pubsub/test-gen-new-process.sh | 2 +-
3 files changed, 5 insertions(+), 2 deletions(-)
diff --git a/lib/communication.c b/lib/communication.c
index 68030ce..31f41a9 100644
--- a/lib/communication.c
+++ b/lib/communication.c
@@ -162,6 +162,8 @@ int srv_get_new_process (const struct service *srv, struct process *p)
}
}
+ if (buf != NULL)
+ free (buf);
srv_process_gen (p, pid, index, version);
return 0;
diff --git a/lib/pubsubd.c b/lib/pubsubd.c
index e07c7db..d1619dd 100644
--- a/lib/pubsubd.c
+++ b/lib/pubsubd.c
@@ -116,7 +116,8 @@ struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c)
int
pubsubd_channel_eq (const struct channel *c1, const struct channel *c2)
{
- return (strncmp (c1->chan, c2->chan, c1->chanlen) == 0);
+ return c1->chanlen == c2->chanlen &&
+ strncmp (c1->chan, c2->chan, c1->chanlen) == 0;
}
// SUBSCRIBER
diff --git a/pubsub/test-gen-new-process.sh b/pubsub/test-gen-new-process.sh
index e6aad3a..67d6699 100755
--- a/pubsub/test-gen-new-process.sh
+++ b/pubsub/test-gen-new-process.sh
@@ -3,5 +3,5 @@
for i in $(seq 1 10)
do
echo "${i} 1 1 pub chan${i}" > /tmp/ipc/gen
- sleep 1
+ sleep 0.1
done