From 85c5d97bad2a749bc7692f39830cdcecee0e627b Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Mon, 12 Sep 2016 19:30:28 +0200
Subject: [PATCH] pubsub: client and server seems ok (still a small leak)
---
lib/communication.c | 11 ++++++---
lib/pubsubd.c | 18 ++++++--------
pubsub/pubsubc.c | 60 ++++++++++++++++++++++++++++++++++++++++++---
3 files changed, 71 insertions(+), 18 deletions(-)
diff --git a/lib/communication.c b/lib/communication.c
index 8faa685..4f55737 100644
--- a/lib/communication.c
+++ b/lib/communication.c
@@ -10,14 +10,16 @@ int file_write (const char *path, const char *buf, size_t msize)
return -1;
}
- printf("file_write: path to open %s\n", path);
+ // TODO debug
+ // printf("file_write: path to open %s\n", path);
int fd = open (path, O_WRONLY);
if (fd <= 0) {
printf("file_write: fd < 0\n");
- perror ("file_write: ");
+ perror ("file_write");
return ER_FILE_OPEN;
}
- printf("file_write: opened file %s\n", path);
+ // TODO debug
+ // printf("file_write: opened file %s\n", path);
int ret = 0;
int ret2 = 0;
@@ -46,7 +48,8 @@ int file_read (const char *path, char **buf, size_t *msize)
if (fd <= 0) {
return ER_FILE_OPEN;
}
- printf("file_read: opened file %s\n", path);
+ // TODO debug
+ // printf("file_read: opened file %s\n", path);
if (*buf == NULL) {
*buf = malloc (BUFSIZ);
diff --git a/lib/pubsubd.c b/lib/pubsubd.c
index 216fb55..7b09aad 100644
--- a/lib/pubsubd.c
+++ b/lib/pubsubd.c
@@ -371,8 +371,8 @@ void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t l
fprintf (stderr, "\033[31merr : msg->chanlen > BUFSIZ\033[00m\n");
return;
}
- msg->chan = malloc (msg->chanlen);
- memset (msg->chan, 0, msg->chanlen);
+ msg->chan = malloc (msg->chanlen +1);
+ memset (msg->chan, 0, msg->chanlen +1);
memcpy (msg->chan, data + i, msg->chanlen); i += msg->chanlen;
memcpy (&msg->datalen, data + i, sizeof(size_t)); i += sizeof(size_t);
@@ -380,8 +380,8 @@ void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t l
fprintf (stderr, "\033[31merr : msg->datalen > BUFSIZ\033[00m\n");
return;
}
- msg->data = malloc (msg->datalen);
- memset (msg->data, 0, msg->datalen);
+ msg->data = malloc (msg->datalen +1);
+ memset (msg->data, 0, msg->datalen +1);
memcpy (msg->data, data + i, msg->datalen); i += msg->datalen;
}
@@ -552,11 +552,7 @@ void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
size_t mlen = 0;
char *buf = NULL;
while (buf == NULL || mlen == 0) {
-#if 0
- srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb);
-#else
srv_read (p, &buf, &mlen);
-#endif
}
pubsubd_msg_unserialize (m, buf, mlen);
@@ -651,12 +647,14 @@ void pubsub_msg_send (struct process *p, const struct pubsub_msg * m)
}
}
-void pubsub_msg_recv (struct process *p, struct pubsub_msg * m)
+void pubsub_msg_recv (struct process *p, struct pubsub_msg *m)
{
// read the message from the process
size_t mlen = 0;
char *buf = NULL;
- app_read (p, &buf, &mlen);
+ while (buf == NULL || mlen == 0) {
+ app_read (p, &buf, &mlen);
+ }
pubsubd_msg_unserialize (m, buf, mlen);
diff --git a/pubsub/pubsubc.c b/pubsub/pubsubc.c
index 0cfef11..a8d52ca 100644
--- a/pubsub/pubsubc.c
+++ b/pubsub/pubsubc.c
@@ -13,6 +13,42 @@ void usage (char **argv)
printf ( "usage: %s\n", argv[0]);
}
+void print_cmd (void) {
+ printf ("\033[32m>\033[00m ");
+ fflush (stdout);
+}
+
+void * listener (void *params)
+{
+ int s = 0;
+ s = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ if (s != 0)
+ printf ("pthread_setcancelstate: %d\n", s);
+
+ struct process *p = NULL;
+ p = (struct process *) params;
+ if (p == NULL) {
+ fprintf (stderr, "listener: no process\n");
+ return NULL;
+ }
+
+ // main loop
+ while (1) {
+ struct pubsub_msg m;
+ memset (&m, 0, sizeof (struct pubsub_msg));
+
+ pubsub_msg_recv (p, &m);
+ printf ("\n\033[31m>\033[00m %s\n", m.data);
+ print_cmd ();
+
+ // if (m.type == PUBSUB_TYPE_DISCONNECT) { }
+ pubsubd_msg_free (&m);
+ }
+
+ pthread_exit (NULL);
+}
+
+
void main_loop (int argc, char **argv, char **env
, pid_t pid, int index, int version
, char *cmd, char *chan)
@@ -33,6 +69,12 @@ void main_loop (int argc, char **argv, char **env
if (app_create (&p, pid, index, version)) // called by the application
ohshit (1, "app_create");
+ pthread_t thr;
+ memset (&thr, 0, sizeof (pthread_t));
+
+ pthread_create (&thr, NULL, listener, &p);
+ pthread_detach (thr);
+
printf ("main_loop\n");
// send a message to warn the service we want to do something
// line : pid index version action chan
@@ -63,14 +105,20 @@ void main_loop (int argc, char **argv, char **env
for (;;) {
char buf[BUFSIZ];
memset (buf, 0, BUFSIZ);
- printf ("msg to send (chan: %s) [quit]: ", msg.chan);
+ print_cmd ();
fflush (stdout);
size_t mlen = read (0, buf, BUFSIZ);
- printf ("data (%ld): %s\n", mlen, buf);
+ if (mlen > 1) {
+ mlen--;
+ }
+ buf[mlen] = '\0';
- if (strncmp(buf, "quit\n", strlen ("quit\n")) == 0) {
+ // TODO debug
+ // printf ("data (%ld): %s\n", mlen, buf);
+
+ if (strncmp(buf, "quit", strlen ("quit")) == 0) {
break;
}
@@ -79,7 +127,8 @@ void main_loop (int argc, char **argv, char **env
strncpy ((char *) msg.data, buf, strlen (buf) + 1);
msg.datalen = strlen (buf);
- printf ("send message\n");
+ // TODO debug
+ // printf ("send message\n");
pubsub_msg_send (&p, &msg);
free (msg.data);
msg.data = NULL;
@@ -89,6 +138,9 @@ void main_loop (int argc, char **argv, char **env
// free everything
pubsubd_msg_free (&msg);
+ pthread_cancel (thr);
+ pthread_join (thr, NULL);
+
printf ("disconnection...\n");
// disconnect from the server
pubsub_disconnect (&p);