From 0314120304567f0210b49ae3d9ef71190ed3a0ae Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Sun, 11 Sep 2016 16:55:02 +0200
Subject: [PATCH] pubsubd: pubsubc, the client, still in progress but (kind of)
works
---
lib/pubsubd.c | 6 +--
pubsub/pubsubc.c | 122 +++++++++++++++++++++++++++++++++++++++++++++++
pubsub/pubsubd.c | 71 +++++----------------------
3 files changed, 138 insertions(+), 61 deletions(-)
create mode 100644 pubsub/pubsubc.c
diff --git a/lib/pubsubd.c b/lib/pubsubd.c
index 6315bb8..216fb55 100644
--- a/lib/pubsubd.c
+++ b/lib/pubsubd.c
@@ -407,8 +407,10 @@ void pubsubd_msg_free (struct pubsub_msg *msg)
int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale
, struct channels *chans, struct channel **c)
{
- if (spath == NULL || ale == NULL || chans == NULL)
+ if (spath == NULL || ale == NULL || chans == NULL) {
+ fprintf (stderr, "pubsubd_get_new_process: spath or ale or chans == NULL\n");
return -1;
+ }
char *buf = NULL;
size_t msize = 0;
@@ -529,8 +531,6 @@ void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg
size_t msize = 0;
pubsubd_msg_serialize (m, &buf, &msize);
- printf ("\033[32mmsg to send : %.*s (%ld)\n", (int) msize, buf, msize);
-
LIST_FOREACH(ale, alh, entries) {
srv_write (ale->p, buf, msize);
}
diff --git a/pubsub/pubsubc.c b/pubsub/pubsubc.c
new file mode 100644
index 0000000..75b3935
--- /dev/null
+++ b/pubsub/pubsubc.c
@@ -0,0 +1,122 @@
+#include "../lib/pubsubd.h"
+#include
+#include
+
+void
+ohshit(int rvalue, const char* str) {
+ fprintf (stderr, "\033[31merr: %s\033[00m\n", str);
+ exit (rvalue);
+}
+
+void usage (char **argv)
+{
+ printf ( "usage: %s\n", argv[0]);
+}
+
+void main_loop (int argc, char **argv, char **env
+ , pid_t pid, int index, int version
+ , char *cmd, char *chan)
+{
+ printf ("connection : pid %d index %d version %d "
+ "cmd %s chan %s\n"
+ , pid, index, version, cmd, chan );
+
+ struct service srv;
+ memset (&srv, 0, sizeof (struct service));
+ srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME, NULL);
+ printf ("Writing on %s.\n", srv.spath);
+
+ struct process p;
+ memset (&p, 0, sizeof (struct process));
+
+ printf ("app creation\n");
+ if (app_create (&p, pid, index, version)) // called by the application
+ ohshit (1, "app_create");
+
+ printf ("main_loop\n");
+ // send a message to warn the service we want to do something
+ // line : pid index version action chan
+ enum app_list_elm_action action = PUBSUB_BOTH;
+
+ if (strncmp (cmd, "pub", 3) == 0) {
+ action = PUBSUB_PUB;
+ }
+ else if (strncmp (cmd, "sub", 3) == 0) {
+ action = PUBSUB_SUB;
+ }
+
+ pubsub_connection (&srv, &p, action, chan);
+
+ struct pubsub_msg m;
+ memset (&m, 0, sizeof (struct pubsub_msg));
+
+
+ // meta data on the message
+ m.type = PUBSUB_TYPE_MESSAGE;
+ m.chan = malloc (strlen (chan) + 1);
+ memset (m.chan, 0, strlen (chan) + 1);
+ m.chan[strlen (chan)] = '\0';
+ m.chanlen = strlen (chan);
+
+ // msg loop
+ for (;;) {
+ struct pubsub_msg msg;
+ memset (&msg, 0, sizeof (struct pubsub_msg));
+
+ char buf[BUFSIZ];
+ memset (buf, 0, BUFSIZ);
+ printf ("msg to send [quit]: ");
+ fflush (stdout);
+
+ size_t mlen = read (0, buf, BUFSIZ);
+
+ printf ("data (%ld): %s\n", mlen, buf);
+
+ if (strncmp(buf, "quit\n", strlen ("quit\n")) == 0) {
+ break;
+ }
+
+ m.data = malloc (strlen (buf) + 1);
+ memset (m.data, 0, strlen (buf) + 1);
+ strncpy ((char *) m.data, buf, strlen (buf) + 1);
+ m.datalen = strlen (buf);
+
+ printf ("send message\n");
+ pubsub_msg_send (&p, &m);
+ free (m.data);
+ m.data = NULL;
+ m.datalen = 0;
+ }
+
+ // free everything
+ pubsubd_msg_free (&m);
+
+ printf ("disconnection...\n");
+ // disconnect from the server
+ pubsub_disconnect (&p);
+
+ printf ("destroying app\n");
+ // the application will shut down, and remove the application named pipes
+ if (app_destroy (&p))
+ ohshit (1, "app_destroy");
+}
+
+int main(int argc, char **argv, char **env)
+{
+
+ if (argc != 1) {
+ usage (argv);
+ exit (1);
+ }
+
+ pid_t pid = getpid();
+ int index = 0;
+ // don't care about the version
+ int version = COMMUNICATION_VERSION;
+ char *cmd = "both";
+ char *chan = "chan1";
+
+ main_loop (argc, argv, env, pid, index, version, cmd, chan);
+
+ return EXIT_SUCCESS;
+}
diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c
index 543da73..06c1735 100644
--- a/pubsub/pubsubd.c
+++ b/pubsub/pubsubd.c
@@ -55,12 +55,12 @@ void * pubsubd_worker_thread (void *params)
else {
struct channel *ch = pubsubd_channel_search (chans, chan->chan);
if (ch == NULL) {
- printf ("CHAN NON TROUVE\n");
+ printf ("CHAN NOT FOUND\n");
}
else {
- printf ("Je dois dire :\n");
+ printf ("what should be sent: ");
pubsubd_msg_print (&m);
- printf ("CHAN ET PROCESS À QUI JE DOIS COMMUNIQUER\n");
+ printf ("send the message to:\t");
pubsubd_channel_print (ch);
pubsubd_msg_send (ch->alh, &m);
}
@@ -137,10 +137,12 @@ main(int argc, char **argv, char **env)
pubsubd_channels_init (&chans);
pthread_t *thr = NULL;
- thr = malloc (sizeof (pthread_t) * NB_CLIENTS);
- memset (thr, 0, sizeof (pthread_t) * NB_CLIENTS);
+ thr = malloc (sizeof (pthread_t));
+ memset (thr, 0, sizeof (pthread_t));
- for (int i = 0; i < NB_CLIENTS; i++) {
+ int i = 0;
+ // for (i = 0; i < NB_CLIENTS; i++)
+ for (i = 0; ; i++) {
// for each new process
struct app_list_elm ale;
memset (&ale, 0, sizeof (struct app_list_elm));
@@ -151,10 +153,8 @@ main(int argc, char **argv, char **env)
// end the application
if (ale.action == PUBSUB_QUIT) {
printf ("Quitting ...\n");
-
pubsubd_channels_del_all (&chans);
srv_close (&srv);
-
// TODO end the threads
exit (0);
}
@@ -168,16 +168,18 @@ main(int argc, char **argv, char **env)
pthread_create (thr + i, NULL, pubsubd_worker_thread, wp);
pthread_detach (thr[i]);
+ // realloc memory for further workers
+ thr = realloc (thr, sizeof (pthread_t) * (i+1));
pubsubd_app_list_elm_free (&ale);
}
sleep (10);
// stop threads
- for (int i = 0 ; i < NB_CLIENTS ; i++) {
- pthread_cancel (thr[i]);
+ for (int j = 0 ; j < i ; j++) {
+ pthread_cancel (thr[j]);
void *ret = NULL;
- pthread_join (thr[i], &ret);
+ pthread_join (thr[j], &ret);
if (ret != NULL) {
free (ret);
}
@@ -193,50 +195,3 @@ main(int argc, char **argv, char **env)
return EXIT_SUCCESS;
}
-
-#if 0
-void main_loop (const struct service *srv)
-{
- int ret;
- struct process proc;
-
- int cnt = 10;
-
- while (cnt--) {
- // -1 : error, 0 = no new process, 1 = new process
- ret = srv_get_new_process (&proc, srv);
- if (ret == -1) {
- fprintf (stderr, "error service_get_new_process\n");
- continue;
- } else if (ret == 0) { // that should not happen
- continue;
- }
-
- // printf ("before print\n");
- process_print (&proc);
- // printf ("after print\n");
-
- // about the message
- size_t msize = BUFSIZ;
- char buf[BUFSIZ];
- bzero(buf, BUFSIZ);
-
- // printf ("before read\n");
- if ((ret = srv_read (&proc, &buf, &msize))) {
- fprintf(stdout, "error service_read %d\n", ret);
- continue;
- }
- // printf ("after read\n");
- printf ("read, size %ld : %s\n", msize, buf);
-
- // printf ("before proc write\n");
- if ((ret = srv_write (&proc, &buf, msize))) {
- fprintf(stdout, "error service_write %d\n", ret);
- continue;
- }
-
- // printf ("after proc write\n");
- printf ("\033[32mStill \033[31m%d\033[32m applications to serve\n",cnt);
- }
-}
-#endif