diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c index b49c6cc..d6d38ba 100644 --- a/pubsub/pubsubd.c +++ b/pubsub/pubsubd.c @@ -18,6 +18,7 @@ struct workers *my_workers; // element of the list // worker : process to handle (threaded) struct worker { + pthread_t *thr; struct channels *chans; struct channel *chan; struct app_list_elm *ale; @@ -60,6 +61,30 @@ void pubsubd_worker_del (struct workers *wrkrs, struct worker *w) } } +// kill the threads +void pubsubd_workers_stop (struct workers *wrkrs) +{ + if (!wrkrs) + return; + + struct worker *w = NULL; + struct worker *wtmp = NULL; + + LIST_FOREACH_SAFE(w, wrkrs, entries, wtmp) { + if (w->thr == NULL) + continue; + + pthread_cancel (*w->thr); + void *ret = NULL; + pthread_join (*w->thr, &ret); + if (ret != NULL) { + free (ret); + } + free (w->thr); + w->thr = NULL; + } +} + void pubsubd_workers_del_all (struct workers *wrkrs) { if (!wrkrs) @@ -154,6 +179,9 @@ void * pubsubd_worker_thread (void *params) free (w->ale); w->ale = NULL; + free (w->thr); + w->thr = NULL; + pubsubd_worker_del (my_workers, w); pthread_exit (NULL); @@ -176,10 +204,6 @@ main(int argc, char **argv, char **env) memset (&chans, 0, sizeof (struct channels)); pubsubd_channels_init (&chans); - pthread_t *thr = NULL; - thr = malloc (sizeof (pthread_t)); - memset (thr, 0, sizeof (pthread_t)); - my_workers = malloc (sizeof (struct workers)); memset (my_workers, 0, sizeof (struct workers)); pubsubd_workers_init (my_workers); @@ -203,6 +227,8 @@ main(int argc, char **argv, char **env) // thread to handle multiple clients at a time struct worker *w = NULL; w = malloc (sizeof (struct worker)); + w->thr = malloc (sizeof (pthread_t)); + memset (w->thr, 0, sizeof (pthread_t)); w->ale = pubsubd_app_list_elm_copy (&ale); w->chans = &chans; w->chan = chan; @@ -211,36 +237,15 @@ main(int argc, char **argv, char **env) free (w); w = wtmp; - // realloc memory for further workers - pthread_t * tmpthr = realloc (thr, sizeof (pthread_t) * (i+1)); - if (tmpthr == NULL) { - fprintf (stderr, "err: can't allocate more thread contexts\n"); - pubsubd_app_list_elm_free (&ale); - break; - } - else { - thr = tmpthr; - } - memset (thr + i, 0, sizeof (pthread_t)); - - pthread_create (thr + i, NULL, pubsubd_worker_thread, w); - pthread_detach (thr[i]); + pthread_create (w->thr, NULL, pubsubd_worker_thread, w); + pthread_detach (*w->thr); pubsubd_app_list_elm_free (&ale); } printf ("Quitting ...\n"); - // stop threads - for (int j = 0 ; j < i ; j++) { - pthread_cancel (thr[j]); - void *ret = NULL; - pthread_join (thr[j], &ret); - if (ret != NULL) { - free (ret); - } - } - free (thr); + pubsubd_workers_stop (my_workers); pubsubd_channels_del_all (&chans); pubsubd_workers_del_all (my_workers);