Obsolete
/
libipc-old
Archived
3
0
Fork 0

pubsubd++ (bufix, still some to do), pubsubc++

more_to_read
Philippe PITTOLI 2016-09-12 12:45:31 +02:00
parent 0314120304
commit 8537381dac
2 changed files with 34 additions and 29 deletions

View File

@ -47,25 +47,23 @@ void main_loop (int argc, char **argv, char **env
pubsub_connection (&srv, &p, action, chan); pubsub_connection (&srv, &p, action, chan);
struct pubsub_msg m;
memset (&m, 0, sizeof (struct pubsub_msg));
// meta data on the message
m.type = PUBSUB_TYPE_MESSAGE;
m.chan = malloc (strlen (chan) + 1);
memset (m.chan, 0, strlen (chan) + 1);
m.chan[strlen (chan)] = '\0';
m.chanlen = strlen (chan);
// msg loop
for (;;) {
struct pubsub_msg msg; struct pubsub_msg msg;
memset (&msg, 0, sizeof (struct pubsub_msg)); memset (&msg, 0, sizeof (struct pubsub_msg));
// meta data on the message
msg.type = PUBSUB_TYPE_MESSAGE;
msg.chan = malloc (strlen (chan) + 1);
memset (msg.chan, 0, strlen (chan) + 1);
strncpy ((char *) msg.chan, chan, strlen (chan));
msg.chan[strlen (chan)] = '\0';
msg.chanlen = strlen (chan);
// msg loop
for (;;) {
char buf[BUFSIZ]; char buf[BUFSIZ];
memset (buf, 0, BUFSIZ); memset (buf, 0, BUFSIZ);
printf ("msg to send [quit]: "); printf ("msg to send (chan: %s) [quit]: ", msg.chan);
fflush (stdout); fflush (stdout);
size_t mlen = read (0, buf, BUFSIZ); size_t mlen = read (0, buf, BUFSIZ);
@ -76,20 +74,20 @@ void main_loop (int argc, char **argv, char **env
break; break;
} }
m.data = malloc (strlen (buf) + 1); msg.data = malloc (strlen (buf) + 1);
memset (m.data, 0, strlen (buf) + 1); memset (msg.data, 0, strlen (buf) + 1);
strncpy ((char *) m.data, buf, strlen (buf) + 1); strncpy ((char *) msg.data, buf, strlen (buf) + 1);
m.datalen = strlen (buf); msg.datalen = strlen (buf);
printf ("send message\n"); printf ("send message\n");
pubsub_msg_send (&p, &m); pubsub_msg_send (&p, &msg);
free (m.data); free (msg.data);
m.data = NULL; msg.data = NULL;
m.datalen = 0; msg.datalen = 0;
} }
// free everything // free everything
pubsubd_msg_free (&m); pubsubd_msg_free (&msg);
printf ("disconnection...\n"); printf ("disconnection...\n");
// disconnect from the server // disconnect from the server

View File

@ -65,6 +65,7 @@ void * pubsubd_worker_thread (void *params)
pubsubd_msg_send (ch->alh, &m); pubsubd_msg_send (ch->alh, &m);
} }
} }
pubsubd_msg_free (&m);
} }
#if 0 #if 0
#endif #endif
@ -152,11 +153,8 @@ main(int argc, char **argv, char **env)
// end the application // end the application
if (ale.action == PUBSUB_QUIT) { if (ale.action == PUBSUB_QUIT) {
printf ("Quitting ...\n"); pubsubd_app_list_elm_free (&ale);
pubsubd_channels_del_all (&chans); break;
srv_close (&srv);
// TODO end the threads
exit (0);
} }
// TODO thread to handle multiple clients at a time // TODO thread to handle multiple clients at a time
@ -166,15 +164,25 @@ main(int argc, char **argv, char **env)
wp->chans = &chans; wp->chans = &chans;
wp->chan = chan; wp->chan = chan;
// realloc memory for further workers
pthread_t * tmpthr = realloc (thr, sizeof (pthread_t) * (i+1));
if (tmpthr == NULL) {
fprintf (stderr, "err: can't allocate more thread contexts\n");
pubsubd_app_list_elm_free (&ale);
break;
}
else {
thr = tmpthr;
}
memset (thr + i, 0, sizeof (pthread_t));
pthread_create (thr + i, NULL, pubsubd_worker_thread, wp); pthread_create (thr + i, NULL, pubsubd_worker_thread, wp);
pthread_detach (thr[i]); pthread_detach (thr[i]);
// realloc memory for further workers
thr = realloc (thr, sizeof (pthread_t) * (i+1));
pubsubd_app_list_elm_free (&ale); pubsubd_app_list_elm_free (&ale);
} }
sleep (10); printf ("Quitting ...\n");
// stop threads // stop threads
for (int j = 0 ; j < i ; j++) { for (int j = 0 ; j < i ; j++) {
pthread_cancel (thr[j]); pthread_cancel (thr[j]);
@ -186,7 +194,6 @@ main(int argc, char **argv, char **env)
} }
free (thr); free (thr);
printf ("QUIT\n");
pubsubd_channels_del_all (&chans); pubsubd_channels_del_all (&chans);
// the application will shut down, and remove the service named pipe // the application will shut down, and remove the service named pipe