From 8537381dacfded8bf81f833a4433d6944c7e9014 Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Mon, 12 Sep 2016 12:45:31 +0200
Subject: [PATCH] pubsubd++ (bufix, still some to do), pubsubc++
---
pubsub/pubsubc.c | 38 ++++++++++++++++++--------------------
pubsub/pubsubd.c | 25 ++++++++++++++++---------
2 files changed, 34 insertions(+), 29 deletions(-)
diff --git a/pubsub/pubsubc.c b/pubsub/pubsubc.c
index 75b3935..0cfef11 100644
--- a/pubsub/pubsubc.c
+++ b/pubsub/pubsubc.c
@@ -47,25 +47,23 @@ void main_loop (int argc, char **argv, char **env
pubsub_connection (&srv, &p, action, chan);
- struct pubsub_msg m;
- memset (&m, 0, sizeof (struct pubsub_msg));
+ struct pubsub_msg msg;
+ memset (&msg, 0, sizeof (struct pubsub_msg));
// meta data on the message
- m.type = PUBSUB_TYPE_MESSAGE;
- m.chan = malloc (strlen (chan) + 1);
- memset (m.chan, 0, strlen (chan) + 1);
- m.chan[strlen (chan)] = '\0';
- m.chanlen = strlen (chan);
+ msg.type = PUBSUB_TYPE_MESSAGE;
+ msg.chan = malloc (strlen (chan) + 1);
+ memset (msg.chan, 0, strlen (chan) + 1);
+ strncpy ((char *) msg.chan, chan, strlen (chan));
+ msg.chan[strlen (chan)] = '\0';
+ msg.chanlen = strlen (chan);
// msg loop
for (;;) {
- struct pubsub_msg msg;
- memset (&msg, 0, sizeof (struct pubsub_msg));
-
char buf[BUFSIZ];
memset (buf, 0, BUFSIZ);
- printf ("msg to send [quit]: ");
+ printf ("msg to send (chan: %s) [quit]: ", msg.chan);
fflush (stdout);
size_t mlen = read (0, buf, BUFSIZ);
@@ -76,20 +74,20 @@ void main_loop (int argc, char **argv, char **env
break;
}
- m.data = malloc (strlen (buf) + 1);
- memset (m.data, 0, strlen (buf) + 1);
- strncpy ((char *) m.data, buf, strlen (buf) + 1);
- m.datalen = strlen (buf);
+ msg.data = malloc (strlen (buf) + 1);
+ memset (msg.data, 0, strlen (buf) + 1);
+ strncpy ((char *) msg.data, buf, strlen (buf) + 1);
+ msg.datalen = strlen (buf);
printf ("send message\n");
- pubsub_msg_send (&p, &m);
- free (m.data);
- m.data = NULL;
- m.datalen = 0;
+ pubsub_msg_send (&p, &msg);
+ free (msg.data);
+ msg.data = NULL;
+ msg.datalen = 0;
}
// free everything
- pubsubd_msg_free (&m);
+ pubsubd_msg_free (&msg);
printf ("disconnection...\n");
// disconnect from the server
diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c
index 06c1735..0ab5a37 100644
--- a/pubsub/pubsubd.c
+++ b/pubsub/pubsubd.c
@@ -65,6 +65,7 @@ void * pubsubd_worker_thread (void *params)
pubsubd_msg_send (ch->alh, &m);
}
}
+ pubsubd_msg_free (&m);
}
#if 0
#endif
@@ -152,11 +153,8 @@ main(int argc, char **argv, char **env)
// end the application
if (ale.action == PUBSUB_QUIT) {
- printf ("Quitting ...\n");
- pubsubd_channels_del_all (&chans);
- srv_close (&srv);
- // TODO end the threads
- exit (0);
+ pubsubd_app_list_elm_free (&ale);
+ break;
}
// TODO thread to handle multiple clients at a time
@@ -166,15 +164,25 @@ main(int argc, char **argv, char **env)
wp->chans = &chans;
wp->chan = chan;
+ // realloc memory for further workers
+ pthread_t * tmpthr = realloc (thr, sizeof (pthread_t) * (i+1));
+ if (tmpthr == NULL) {
+ fprintf (stderr, "err: can't allocate more thread contexts\n");
+ pubsubd_app_list_elm_free (&ale);
+ break;
+ }
+ else {
+ thr = tmpthr;
+ }
+ memset (thr + i, 0, sizeof (pthread_t));
+
pthread_create (thr + i, NULL, pubsubd_worker_thread, wp);
pthread_detach (thr[i]);
- // realloc memory for further workers
- thr = realloc (thr, sizeof (pthread_t) * (i+1));
pubsubd_app_list_elm_free (&ale);
}
- sleep (10);
+ printf ("Quitting ...\n");
// stop threads
for (int j = 0 ; j < i ; j++) {
pthread_cancel (thr[j]);
@@ -186,7 +194,6 @@ main(int argc, char **argv, char **env)
}
free (thr);
- printf ("QUIT\n");
pubsubd_channels_del_all (&chans);
// the application will shut down, and remove the service named pipe