From 2cd6b62bf05c6c6ae3ce53c0c866e0c85be0639c Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Sun, 11 Sep 2016 14:37:41 +0200
Subject: [PATCH] pubsubd: memory leaks, but working
---
lib/communication.c | 27 +++++++++++--
lib/process.c | 4 +-
lib/pubsubd.c | 53 +++++++++++++++++--------
pubsub/pubsubd.c | 70 ++++++++++++++++++++++++++--------
pubsub/test-gen-new-process.sh | 10 +++++
5 files changed, 127 insertions(+), 37 deletions(-)
diff --git a/lib/communication.c b/lib/communication.c
index 271e0ca..8faa685 100644
--- a/lib/communication.c
+++ b/lib/communication.c
@@ -1,32 +1,52 @@
#include "communication.h"
#include
#include
+#include
int file_write (const char *path, const char *buf, size_t msize)
{
+ if (buf == NULL) {
+ fprintf (stderr, "file_write: buf == NULL\n");
+ return -1;
+ }
+
+ printf("file_write: path to open %s\n", path);
int fd = open (path, O_WRONLY);
if (fd <= 0) {
+ printf("file_write: fd < 0\n");
+ perror ("file_write: ");
return ER_FILE_OPEN;
}
+ printf("file_write: opened file %s\n", path);
int ret = 0;
+ int ret2 = 0;
ret = write (fd, buf, msize);
+ if (ret <= 0) {
+ fprintf (stderr, "err: written %s\n", path);
+ }
- close (fd);
+ ret2 = close (fd);
+ if (ret2 < 0) {
+ fprintf (stderr, "err: close [err: %d] %s\n", ret2, path);
+ perror ("closing");
+ }
return ret;
}
int file_read (const char *path, char **buf, size_t *msize)
{
- if (buf == NULL)
+ if (buf == NULL) {
+ fprintf (stderr, "file_read: buf == NULL\n");
return -1;
+ }
int fd = open (path, O_RDONLY);
if (fd <= 0) {
return ER_FILE_OPEN;
}
- printf("FILE_READ: opened file %s\n", path);
+ printf("file_read: opened file %s\n", path);
if (*buf == NULL) {
*buf = malloc (BUFSIZ);
@@ -49,7 +69,6 @@ int file_read (const char *path, char **buf, size_t *msize)
perror ("closing");
}
-
return ret;
}
diff --git a/lib/process.c b/lib/process.c
index 899decc..c5fb811 100644
--- a/lib/process.c
+++ b/lib/process.c
@@ -27,8 +27,8 @@ void srv_process_gen (struct process *p
memset (p->path_in, 0, PATH_MAX);
memset (p->path_out, 0, PATH_MAX);
- snprintf(p->path_in , PATH_MAX, "%s%d-%d-in" , TMPDIR, pid, index);
- snprintf(p->path_out, PATH_MAX, "%s%d-%d-out", TMPDIR, pid, index);
+ snprintf(p->path_in , PATH_MAX, "%s%d-%d-%d-in" , TMPDIR, pid, index, version);
+ snprintf(p->path_out, PATH_MAX, "%s%d-%d-%d-out", TMPDIR, pid, index, version);
}
diff --git a/lib/pubsubd.c b/lib/pubsubd.c
index 19e339e..6315bb8 100644
--- a/lib/pubsubd.c
+++ b/lib/pubsubd.c
@@ -115,11 +115,11 @@ struct channel * pubsubd_channel_search (struct channels *chans, char *chan)
struct channel * np = NULL;
LIST_FOREACH(np, chans, entries) {
// TODO debug
- printf ("pubsubd_channel_search: %s (%ld) vs %s (%ld)\n"
- , np->chan, np->chanlen, chan, strlen(chan));
+ // printf ("pubsubd_channel_search: %s (%ld) vs %s (%ld)\n"
+ // , np->chan, np->chanlen, chan, strlen(chan));
if (np->chanlen == strlen (chan)
&& strncmp (np->chan, chan, np->chanlen) == 0) {
- printf ("pubsubd_channel_search: FOUND\n");
+ // printf ("pubsubd_channel_search: FOUND\n");
return np;
}
}
@@ -156,6 +156,20 @@ void pubsubd_subscriber_init (struct app_list_head **chans) {
LIST_INIT(*chans);
}
+void pubsubd_channel_print (const struct channel *chan)
+{
+ if (chan->chan == NULL) {
+ printf ("pubsubd_channels_print: chan->chan == NULL\n");
+ }
+
+ printf ( "\033[32mchan %s\033[00m\n", chan->chan);
+
+ if (chan->alh == NULL)
+ printf ("pubsubd_channels_print: chan->alh == NULL\n");
+ else
+ pubsubd_subscriber_print (chan->alh);
+}
+
void pubsubd_channels_print (const struct channels *chans)
{
printf ("\033[36mmchannels\033[00m\n");
@@ -168,16 +182,7 @@ void pubsubd_channels_print (const struct channels *chans)
struct channel *chan = NULL;
LIST_FOREACH(chan, chans, entries) {
- if (chan->chan == NULL) {
- printf ("pubsubd_channels_print: chan->chan == NULL\n");
- }
-
- printf ( "\033[32mchan %s\033[00m\n", chan->chan);
-
- if (chan->alh == NULL)
- printf ("pubsubd_channels_print: chan->alh == NULL\n");
- else
- pubsubd_subscriber_print (chan->alh);
+ pubsubd_channel_print (chan);
}
}
@@ -291,8 +296,10 @@ void pubsubd_app_list_elm_free (struct app_list_elm *todel)
void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len)
{
- if (msg == NULL || data == NULL || len == NULL)
+ if (msg == NULL || data == NULL || len == NULL) {
+ fprintf (stderr, "pubsubd_msg_send: msg or data or len == NULL");
return;
+ }
// msg: "type(1) chanlen(8) chan datalen(8) data
if (msg->type == PUBSUB_TYPE_DISCONNECT) {
@@ -506,12 +513,24 @@ int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale
// alh from the channel, message to send
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m)
{
+ if (alh == NULL) {
+ fprintf (stderr, "pubsubd_msg_send: alh == NULL");
+ return;
+ }
+
+ if (m == NULL) {
+ fprintf (stderr, "pubsubd_msg_send: m == NULL");
+ return;
+ }
+
struct app_list_elm * ale = NULL;
char *buf = NULL;
size_t msize = 0;
pubsubd_msg_serialize (m, &buf, &msize);
+ printf ("\033[32mmsg to send : %.*s (%ld)\n", (int) msize, buf, msize);
+
LIST_FOREACH(ale, alh, entries) {
srv_write (ale->p, buf, msize);
}
@@ -532,11 +551,13 @@ void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
// read the message from the process
size_t mlen = 0;
char *buf = NULL;
+ while (buf == NULL || mlen == 0) {
#if 0
- srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb);
+ srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb);
#else
- srv_read (p, &buf, &mlen);
+ srv_read (p, &buf, &mlen);
#endif
+ }
pubsubd_msg_unserialize (m, buf, mlen);
diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c
index 1c1984f..543da73 100644
--- a/pubsub/pubsubd.c
+++ b/pubsub/pubsubd.c
@@ -2,6 +2,8 @@
#include
#include
+#define NB_CLIENTS 3
+
void
ohshit(int rvalue, const char* str) {
fprintf(stderr, "%s\n", str);
@@ -17,33 +19,55 @@ struct worker_params {
void * pubsubd_worker_thread (void *params)
{
+ int s = 0;
+
+ s = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ if (s != 0)
+ printf ("pthread_setcancelstate: %d\n", s);
+
struct worker_params *wp = (struct worker_params *) params;
if (wp == NULL) {
fprintf (stderr, "error pubsubd_worker_thread : params NULL\n");
return NULL;
}
+ struct channels *chans = wp->chans;
+ struct channel *chan = wp->chan;
+ struct app_list_elm *ale = wp->ale;
+
+ free (wp);
+
+ // main loop
while (1) {
struct pubsub_msg m;
memset (&m, 0, sizeof (struct pubsub_msg));
- sleep (5); // TODO DEBUG
- printf ("MESSAGE : ");
- pubsubd_msg_recv (wp->ale->p, &m);
-
- pubsubd_msg_print (&m);
+ sleep (2); // TODO DEBUG
+ pubsubd_msg_recv (ale->p, &m);
if (m.type == PUBSUB_TYPE_DISCONNECT) {
- if ( 0 != pubsubd_subscriber_del (wp->chan->alh, wp->ale)) {
+ printf ("process %d disconnecting...\n", ale->p->pid);
+ if ( 0 != pubsubd_subscriber_del (chan->alh, ale)) {
fprintf (stderr, "err : subscriber not registered\n");
}
break;
}
else {
- struct channel *chan = pubsubd_channel_get (wp->chans, wp->chan);
- pubsubd_msg_send (chan->alh, &m);
+ struct channel *ch = pubsubd_channel_search (chans, chan->chan);
+ if (ch == NULL) {
+ printf ("CHAN NON TROUVE\n");
+ }
+ else {
+ printf ("Je dois dire :\n");
+ pubsubd_msg_print (&m);
+ printf ("CHAN ET PROCESS À QUI JE DOIS COMMUNIQUER\n");
+ pubsubd_channel_print (ch);
+ pubsubd_msg_send (ch->alh, &m);
+ }
}
}
+#if 0
+#endif
#if 0
while (1) {
@@ -90,7 +114,7 @@ void * pubsubd_worker_thread (void *params)
}
#endif
- pubsubd_app_list_elm_free (wp->ale);
+ pubsubd_app_list_elm_free (ale);
pthread_exit (NULL);
}
@@ -112,7 +136,11 @@ main(int argc, char **argv, char **env)
memset (&chans, 0, sizeof (struct channels));
pubsubd_channels_init (&chans);
- for (;;) {
+ pthread_t *thr = NULL;
+ thr = malloc (sizeof (pthread_t) * NB_CLIENTS);
+ memset (thr, 0, sizeof (pthread_t) * NB_CLIENTS);
+
+ for (int i = 0; i < NB_CLIENTS; i++) {
// for each new process
struct app_list_elm ale;
memset (&ale, 0, sizeof (struct app_list_elm));
@@ -128,7 +156,6 @@ main(int argc, char **argv, char **env)
srv_close (&srv);
// TODO end the threads
-
exit (0);
}
@@ -139,14 +166,27 @@ main(int argc, char **argv, char **env)
wp->chans = &chans;
wp->chan = chan;
- pthread_t thr = 0;
-
- pthread_create (&thr, NULL, pubsubd_worker_thread, wp);
- pthread_detach (thr);
+ pthread_create (thr + i, NULL, pubsubd_worker_thread, wp);
+ pthread_detach (thr[i]);
pubsubd_app_list_elm_free (&ale);
}
+ sleep (10);
+ // stop threads
+ for (int i = 0 ; i < NB_CLIENTS ; i++) {
+ pthread_cancel (thr[i]);
+ void *ret = NULL;
+ pthread_join (thr[i], &ret);
+ if (ret != NULL) {
+ free (ret);
+ }
+ }
+ free (thr);
+
+ printf ("QUIT\n");
+ pubsubd_channels_del_all (&chans);
+
// the application will shut down, and remove the service named pipe
if (srv_close (&srv))
ohshit (1, "service_close error");
diff --git a/pubsub/test-gen-new-process.sh b/pubsub/test-gen-new-process.sh
index cc3fe87..fa5657a 100755
--- a/pubsub/test-gen-new-process.sh
+++ b/pubsub/test-gen-new-process.sh
@@ -5,6 +5,16 @@ SERVICE=gen
#SERVICE=pubsub
NB=10
+if [ $# -ge 1 ] ; then
+ SERVICE=$1
+ shift
+fi
+
+if [ $# -ge 1 ] ; then
+ NB=$1
+ shift
+fi
+
for i in $(seq 1 ${NB})
do
mkfifo ${REP}/${i}-1-1-in