pubsub: client and server seems ok (still a small leak)
This commit is contained in:
parent
8537381dac
commit
85c5d97bad
@ -10,14 +10,16 @@ int file_write (const char *path, const char *buf, size_t msize)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
printf("file_write: path to open %s\n", path);
|
// TODO debug
|
||||||
|
// printf("file_write: path to open %s\n", path);
|
||||||
int fd = open (path, O_WRONLY);
|
int fd = open (path, O_WRONLY);
|
||||||
if (fd <= 0) {
|
if (fd <= 0) {
|
||||||
printf("file_write: fd < 0\n");
|
printf("file_write: fd < 0\n");
|
||||||
perror ("file_write: ");
|
perror ("file_write");
|
||||||
return ER_FILE_OPEN;
|
return ER_FILE_OPEN;
|
||||||
}
|
}
|
||||||
printf("file_write: opened file %s\n", path);
|
// TODO debug
|
||||||
|
// printf("file_write: opened file %s\n", path);
|
||||||
|
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
int ret2 = 0;
|
int ret2 = 0;
|
||||||
@ -46,7 +48,8 @@ int file_read (const char *path, char **buf, size_t *msize)
|
|||||||
if (fd <= 0) {
|
if (fd <= 0) {
|
||||||
return ER_FILE_OPEN;
|
return ER_FILE_OPEN;
|
||||||
}
|
}
|
||||||
printf("file_read: opened file %s\n", path);
|
// TODO debug
|
||||||
|
// printf("file_read: opened file %s\n", path);
|
||||||
|
|
||||||
if (*buf == NULL) {
|
if (*buf == NULL) {
|
||||||
*buf = malloc (BUFSIZ);
|
*buf = malloc (BUFSIZ);
|
||||||
|
@ -371,8 +371,8 @@ void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t l
|
|||||||
fprintf (stderr, "\033[31merr : msg->chanlen > BUFSIZ\033[00m\n");
|
fprintf (stderr, "\033[31merr : msg->chanlen > BUFSIZ\033[00m\n");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
msg->chan = malloc (msg->chanlen);
|
msg->chan = malloc (msg->chanlen +1);
|
||||||
memset (msg->chan, 0, msg->chanlen);
|
memset (msg->chan, 0, msg->chanlen +1);
|
||||||
memcpy (msg->chan, data + i, msg->chanlen); i += msg->chanlen;
|
memcpy (msg->chan, data + i, msg->chanlen); i += msg->chanlen;
|
||||||
|
|
||||||
memcpy (&msg->datalen, data + i, sizeof(size_t)); i += sizeof(size_t);
|
memcpy (&msg->datalen, data + i, sizeof(size_t)); i += sizeof(size_t);
|
||||||
@ -380,8 +380,8 @@ void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t l
|
|||||||
fprintf (stderr, "\033[31merr : msg->datalen > BUFSIZ\033[00m\n");
|
fprintf (stderr, "\033[31merr : msg->datalen > BUFSIZ\033[00m\n");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
msg->data = malloc (msg->datalen);
|
msg->data = malloc (msg->datalen +1);
|
||||||
memset (msg->data, 0, msg->datalen);
|
memset (msg->data, 0, msg->datalen +1);
|
||||||
memcpy (msg->data, data + i, msg->datalen); i += msg->datalen;
|
memcpy (msg->data, data + i, msg->datalen); i += msg->datalen;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -552,11 +552,7 @@ void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
|
|||||||
size_t mlen = 0;
|
size_t mlen = 0;
|
||||||
char *buf = NULL;
|
char *buf = NULL;
|
||||||
while (buf == NULL || mlen == 0) {
|
while (buf == NULL || mlen == 0) {
|
||||||
#if 0
|
|
||||||
srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb);
|
|
||||||
#else
|
|
||||||
srv_read (p, &buf, &mlen);
|
srv_read (p, &buf, &mlen);
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pubsubd_msg_unserialize (m, buf, mlen);
|
pubsubd_msg_unserialize (m, buf, mlen);
|
||||||
@ -651,12 +647,14 @@ void pubsub_msg_send (struct process *p, const struct pubsub_msg * m)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void pubsub_msg_recv (struct process *p, struct pubsub_msg * m)
|
void pubsub_msg_recv (struct process *p, struct pubsub_msg *m)
|
||||||
{
|
{
|
||||||
// read the message from the process
|
// read the message from the process
|
||||||
size_t mlen = 0;
|
size_t mlen = 0;
|
||||||
char *buf = NULL;
|
char *buf = NULL;
|
||||||
|
while (buf == NULL || mlen == 0) {
|
||||||
app_read (p, &buf, &mlen);
|
app_read (p, &buf, &mlen);
|
||||||
|
}
|
||||||
|
|
||||||
pubsubd_msg_unserialize (m, buf, mlen);
|
pubsubd_msg_unserialize (m, buf, mlen);
|
||||||
|
|
||||||
|
@ -13,6 +13,42 @@ void usage (char **argv)
|
|||||||
printf ( "usage: %s\n", argv[0]);
|
printf ( "usage: %s\n", argv[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void print_cmd (void) {
|
||||||
|
printf ("\033[32m>\033[00m ");
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
|
|
||||||
|
void * listener (void *params)
|
||||||
|
{
|
||||||
|
int s = 0;
|
||||||
|
s = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
|
||||||
|
if (s != 0)
|
||||||
|
printf ("pthread_setcancelstate: %d\n", s);
|
||||||
|
|
||||||
|
struct process *p = NULL;
|
||||||
|
p = (struct process *) params;
|
||||||
|
if (p == NULL) {
|
||||||
|
fprintf (stderr, "listener: no process\n");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// main loop
|
||||||
|
while (1) {
|
||||||
|
struct pubsub_msg m;
|
||||||
|
memset (&m, 0, sizeof (struct pubsub_msg));
|
||||||
|
|
||||||
|
pubsub_msg_recv (p, &m);
|
||||||
|
printf ("\n\033[31m>\033[00m %s\n", m.data);
|
||||||
|
print_cmd ();
|
||||||
|
|
||||||
|
// if (m.type == PUBSUB_TYPE_DISCONNECT) { }
|
||||||
|
pubsubd_msg_free (&m);
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_exit (NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void main_loop (int argc, char **argv, char **env
|
void main_loop (int argc, char **argv, char **env
|
||||||
, pid_t pid, int index, int version
|
, pid_t pid, int index, int version
|
||||||
, char *cmd, char *chan)
|
, char *cmd, char *chan)
|
||||||
@ -33,6 +69,12 @@ void main_loop (int argc, char **argv, char **env
|
|||||||
if (app_create (&p, pid, index, version)) // called by the application
|
if (app_create (&p, pid, index, version)) // called by the application
|
||||||
ohshit (1, "app_create");
|
ohshit (1, "app_create");
|
||||||
|
|
||||||
|
pthread_t thr;
|
||||||
|
memset (&thr, 0, sizeof (pthread_t));
|
||||||
|
|
||||||
|
pthread_create (&thr, NULL, listener, &p);
|
||||||
|
pthread_detach (thr);
|
||||||
|
|
||||||
printf ("main_loop\n");
|
printf ("main_loop\n");
|
||||||
// send a message to warn the service we want to do something
|
// send a message to warn the service we want to do something
|
||||||
// line : pid index version action chan
|
// line : pid index version action chan
|
||||||
@ -63,14 +105,20 @@ void main_loop (int argc, char **argv, char **env
|
|||||||
for (;;) {
|
for (;;) {
|
||||||
char buf[BUFSIZ];
|
char buf[BUFSIZ];
|
||||||
memset (buf, 0, BUFSIZ);
|
memset (buf, 0, BUFSIZ);
|
||||||
printf ("msg to send (chan: %s) [quit]: ", msg.chan);
|
print_cmd ();
|
||||||
fflush (stdout);
|
fflush (stdout);
|
||||||
|
|
||||||
size_t mlen = read (0, buf, BUFSIZ);
|
size_t mlen = read (0, buf, BUFSIZ);
|
||||||
|
|
||||||
printf ("data (%ld): %s\n", mlen, buf);
|
if (mlen > 1) {
|
||||||
|
mlen--;
|
||||||
|
}
|
||||||
|
buf[mlen] = '\0';
|
||||||
|
|
||||||
if (strncmp(buf, "quit\n", strlen ("quit\n")) == 0) {
|
// TODO debug
|
||||||
|
// printf ("data (%ld): %s\n", mlen, buf);
|
||||||
|
|
||||||
|
if (strncmp(buf, "quit", strlen ("quit")) == 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -79,7 +127,8 @@ void main_loop (int argc, char **argv, char **env
|
|||||||
strncpy ((char *) msg.data, buf, strlen (buf) + 1);
|
strncpy ((char *) msg.data, buf, strlen (buf) + 1);
|
||||||
msg.datalen = strlen (buf);
|
msg.datalen = strlen (buf);
|
||||||
|
|
||||||
printf ("send message\n");
|
// TODO debug
|
||||||
|
// printf ("send message\n");
|
||||||
pubsub_msg_send (&p, &msg);
|
pubsub_msg_send (&p, &msg);
|
||||||
free (msg.data);
|
free (msg.data);
|
||||||
msg.data = NULL;
|
msg.data = NULL;
|
||||||
@ -89,6 +138,9 @@ void main_loop (int argc, char **argv, char **env
|
|||||||
// free everything
|
// free everything
|
||||||
pubsubd_msg_free (&msg);
|
pubsubd_msg_free (&msg);
|
||||||
|
|
||||||
|
pthread_cancel (thr);
|
||||||
|
pthread_join (thr, NULL);
|
||||||
|
|
||||||
printf ("disconnection...\n");
|
printf ("disconnection...\n");
|
||||||
// disconnect from the server
|
// disconnect from the server
|
||||||
pubsub_disconnect (&p);
|
pubsub_disconnect (&p);
|
||||||
|
Reference in New Issue
Block a user