En chantier, encore et toujours
This commit is contained in:
parent
723828c4b8
commit
bb3ccae218
@ -156,8 +156,11 @@ int srv_get_new_process (const struct service *srv, struct process *p)
|
|||||||
int srv_read_cb (struct process *p, char ** buf, size_t * msize
|
int srv_read_cb (struct process *p, char ** buf, size_t * msize
|
||||||
, int (*cb)(FILE *f, char ** buf, size_t * msize))
|
, int (*cb)(FILE *f, char ** buf, size_t * msize))
|
||||||
{
|
{
|
||||||
if (file_open (&p->out, p->path_out, "rb"))
|
if (file_open (&p->out, p->path_out, "rb")) {
|
||||||
|
fprintf (stderr, "\033[31merr: srv_read_cb, file_open\033[00m\n");
|
||||||
|
file_close (p->out);
|
||||||
return 1;
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
if (cb != NULL)
|
if (cb != NULL)
|
||||||
(*cb) (p->out, buf, msize);
|
(*cb) (p->out, buf, msize);
|
||||||
@ -284,6 +287,26 @@ int app_destroy (struct process *p)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int app_read_cb (struct process *p, char ** buf, size_t * msize
|
||||||
|
, int (*cb)(FILE *f, char ** buf, size_t * msize))
|
||||||
|
{
|
||||||
|
if (file_open (&p->in, p->path_in, "rb")) {
|
||||||
|
fprintf (stderr, "\033[31merr: srv_read_cb, file_open\033[00m\n");
|
||||||
|
file_close (p->in);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cb != NULL)
|
||||||
|
(*cb) (p->in, buf, msize);
|
||||||
|
else
|
||||||
|
*msize = fread (*buf, 1, *msize, p->in); // FIXME check errors
|
||||||
|
|
||||||
|
if (file_close (p->in))
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int app_read (struct process *p, void * buf, size_t * msize)
|
int app_read (struct process *p, void * buf, size_t * msize)
|
||||||
{
|
{
|
||||||
if (file_open (&p->in, p->path_in, "rb"))
|
if (file_open (&p->in, p->path_in, "rb"))
|
||||||
|
@ -48,6 +48,8 @@ int srv_write (struct process *, void * buf, size_t);
|
|||||||
int app_create (struct process *, int index); // called by the application
|
int app_create (struct process *, int index); // called by the application
|
||||||
int app_destroy (struct process *); // called by the application
|
int app_destroy (struct process *); // called by the application
|
||||||
|
|
||||||
|
int app_read_cb (struct process *p, char ** buf, size_t * msize
|
||||||
|
, int (*cb)(FILE *f, char ** buf, size_t * msize));
|
||||||
int app_read (struct process *, void * buf, size_t *);
|
int app_read (struct process *, void * buf, size_t *);
|
||||||
int app_write (struct process *, void * buf, size_t);
|
int app_write (struct process *, void * buf, size_t);
|
||||||
|
|
||||||
|
167
lib/pubsubd.c
167
lib/pubsubd.c
@ -148,6 +148,8 @@ struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale)
|
|||||||
if (ale->p != NULL)
|
if (ale->p != NULL)
|
||||||
n->p = srv_process_copy (ale->p);
|
n->p = srv_process_copy (ale->p);
|
||||||
|
|
||||||
|
n->action = ale->action;
|
||||||
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -233,6 +235,11 @@ void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *l
|
|||||||
|
|
||||||
// msg: "type(1) chanlen(8) chan datalen(8) data
|
// msg: "type(1) chanlen(8) chan datalen(8) data
|
||||||
*len = 1 + sizeof(size_t) + msg->chanlen + sizeof(size_t) + msg->datalen;
|
*len = 1 + sizeof(size_t) + msg->chanlen + sizeof(size_t) + msg->datalen;
|
||||||
|
|
||||||
|
if (*data != NULL) {
|
||||||
|
free (*data);
|
||||||
|
*data = NULL;
|
||||||
|
}
|
||||||
*data = malloc(*len);
|
*data = malloc(*len);
|
||||||
|
|
||||||
size_t i = 0;
|
size_t i = 0;
|
||||||
@ -246,35 +253,66 @@ void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *l
|
|||||||
|
|
||||||
void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len)
|
void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len)
|
||||||
{
|
{
|
||||||
if (msg == NULL || data == NULL)
|
if (msg == NULL) {
|
||||||
|
fprintf (stderr
|
||||||
|
, "\033[31merr: pubsubd_msg_unserialize, msg NULL\033[00m\n");
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data == NULL) {
|
||||||
|
fprintf (stderr
|
||||||
|
, "\033[31merr: pubsubd_msg_unserialize, data NULL\033[00m\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (len > BUFSIZ) {
|
||||||
|
fprintf (stderr
|
||||||
|
, "\033[31merr: pubsubd_msg_unserialize, len %ld\033[00m\n"
|
||||||
|
, len);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
size_t i = 0;
|
size_t i = 0;
|
||||||
msg->type = data[i]; i++;
|
msg->type = data[i]; i++;
|
||||||
|
|
||||||
memcpy (&msg->chanlen, data + i, sizeof(size_t)); i += sizeof(size_t);
|
memcpy (&msg->chanlen, data + i, sizeof(size_t)); i += sizeof(size_t);
|
||||||
|
if (msg->chanlen > BUFSIZ) {
|
||||||
|
fprintf (stderr, "\033[31merr : msg->chanlen > BUFSIZ\033[00m\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
msg->chan = malloc (msg->chanlen);
|
msg->chan = malloc (msg->chanlen);
|
||||||
memcpy (msg->chan, data + i, msg->chanlen); i += msg->chanlen;
|
memcpy (msg->chan, data + i, msg->chanlen); i += msg->chanlen;
|
||||||
|
|
||||||
memcpy (&msg->datalen, data + i, sizeof(size_t)); i += sizeof(size_t);
|
memcpy (&msg->datalen, data + i, sizeof(size_t)); i += sizeof(size_t);
|
||||||
|
if (msg->datalen > BUFSIZ) {
|
||||||
|
fprintf (stderr, "\033[31merr : msg->datalen > BUFSIZ\033[00m\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
msg->data = malloc (msg->datalen);
|
msg->data = malloc (msg->datalen);
|
||||||
memcpy (msg->data, data + i, msg->datalen); i += msg->datalen;
|
memcpy (msg->data, data + i, msg->datalen); i += msg->datalen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void pubsubd_msg_free (struct pubsub_msg *msg)
|
void pubsubd_msg_free (struct pubsub_msg *msg)
|
||||||
{
|
{
|
||||||
|
if (msg == NULL) {
|
||||||
|
fprintf (stderr, "\033[31merr: pubsubd_msg_free, msg NULL\033[00m\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (msg->chan) {
|
if (msg->chan) {
|
||||||
free (msg->chan);
|
free (msg->chan);
|
||||||
msg->chan = 0;
|
msg->chan = NULL;
|
||||||
}
|
}
|
||||||
if (msg->data) {
|
if (msg->data) {
|
||||||
free (msg->data);
|
free (msg->data);
|
||||||
msg->data = 0;
|
msg->data = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// COMMUNICATION
|
// COMMUNICATION
|
||||||
|
|
||||||
int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
|
int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
|
||||||
, struct channels *chans)
|
, struct channels *chans, struct channel **c)
|
||||||
{
|
{
|
||||||
if (srv == NULL || ale == NULL || chans == NULL)
|
if (srv == NULL || ale == NULL || chans == NULL)
|
||||||
return -1;
|
return -1;
|
||||||
@ -336,31 +374,41 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
|
|||||||
buf = NULL;
|
buf = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
chan[BUFSIZ -1] = '\0';
|
if (ale->action == PUBSUB_QUIT) {
|
||||||
|
return 0;
|
||||||
struct channel c;
|
|
||||||
bzero (&c, sizeof (struct channel));
|
|
||||||
c.chan = strndup (chan, BUFSIZ);
|
|
||||||
c.chanlen = strlen (chan);
|
|
||||||
|
|
||||||
struct channel *new_chan;
|
|
||||||
new_chan = pubsubd_channel_get (chans, &c);
|
|
||||||
if (new_chan == NULL) {
|
|
||||||
new_chan = pubsubd_channels_add (chans, &c);
|
|
||||||
pubsubd_subscriber_init (&new_chan->alh);
|
|
||||||
}
|
}
|
||||||
pubsubd_channel_free (&c);
|
|
||||||
|
|
||||||
if (ale->p != NULL) {
|
if (ale->p != NULL) {
|
||||||
free (ale->p);
|
free (ale->p);
|
||||||
ale->p = NULL;
|
ale->p = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ale->action != PUBSUB_QUIT) {
|
|
||||||
ale->p = malloc (sizeof (struct process));
|
ale->p = malloc (sizeof (struct process));
|
||||||
srv_process_gen (ale->p, pid, index, version);
|
srv_process_gen (ale->p, pid, index, version);
|
||||||
|
|
||||||
|
if (*c == NULL) {
|
||||||
|
*c = malloc (sizeof (struct channel));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (c[0]->chan != NULL) {
|
||||||
|
free (c[0]->chan);
|
||||||
|
c[0]->chan = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
chan[BUFSIZ -1] = '\0';
|
||||||
|
c[0]->chan = strndup (chan, BUFSIZ);
|
||||||
|
c[0]->chanlen = strlen (chan);
|
||||||
|
|
||||||
|
struct channel *new_chan;
|
||||||
|
new_chan = pubsubd_channel_get (chans, *c);
|
||||||
|
if (new_chan == NULL) {
|
||||||
|
new_chan = pubsubd_channels_add (chans, *c);
|
||||||
|
pubsubd_subscriber_init (&new_chan->alh);
|
||||||
|
}
|
||||||
|
|
||||||
|
pubsubd_channel_free (*c);
|
||||||
|
*c = new_chan;
|
||||||
|
|
||||||
// add the subscriber
|
// add the subscriber
|
||||||
if (ale->action == PUBSUB_SUB || ale->action == PUBSUB_BOTH)
|
if (ale->action == PUBSUB_SUB || ale->action == PUBSUB_BOTH)
|
||||||
pubsubd_subscriber_add (new_chan->alh, ale);
|
pubsubd_subscriber_add (new_chan->alh, ale);
|
||||||
@ -373,6 +421,8 @@ int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize)
|
|||||||
{
|
{
|
||||||
// msg: "type(1) chanlen(8) chan datalen(8) data
|
// msg: "type(1) chanlen(8) chan datalen(8) data
|
||||||
|
|
||||||
|
printf ("\033[36m ON PASSE DANS pubsubd_msg_read_cb \033[00m \n");
|
||||||
|
|
||||||
// read
|
// read
|
||||||
char type;
|
char type;
|
||||||
fread (&type, 1, 1, f);
|
fread (&type, 1, 1, f);
|
||||||
@ -380,17 +430,27 @@ int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize)
|
|||||||
size_t chanlen;
|
size_t chanlen;
|
||||||
fread (&chanlen, sizeof (size_t), 1, f);
|
fread (&chanlen, sizeof (size_t), 1, f);
|
||||||
|
|
||||||
|
if (chanlen > BUFSIZ) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
char *chan = malloc (chanlen);
|
char *chan = malloc (chanlen);
|
||||||
fread (chan, chanlen, 1, f);
|
fread (chan, chanlen, 1, f);
|
||||||
|
|
||||||
size_t datalen;
|
size_t datalen;
|
||||||
fread (&datalen, sizeof (size_t), 1, f);
|
fread (&datalen, sizeof (size_t), 1, f);
|
||||||
|
|
||||||
|
if (datalen > BUFSIZ) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
char *data = malloc (datalen);
|
char *data = malloc (datalen);
|
||||||
fread (data, datalen, 1, f);
|
fread (data, datalen, 1, f);
|
||||||
|
|
||||||
*msize = 1 + chanlen;
|
*msize = 1 + 2 * sizeof (size_t) + chanlen + datalen;
|
||||||
|
if (*buf == NULL) {
|
||||||
*buf = malloc(*msize);
|
*buf = malloc(*msize);
|
||||||
|
}
|
||||||
|
|
||||||
// TODO CHECK THIS
|
// TODO CHECK THIS
|
||||||
size_t i = 0;
|
size_t i = 0;
|
||||||
@ -398,20 +458,49 @@ int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize)
|
|||||||
char *cbuf = *buf;
|
char *cbuf = *buf;
|
||||||
|
|
||||||
cbuf[i] = type; i++;
|
cbuf[i] = type; i++;
|
||||||
memcpy (&cbuf[i], &chanlen, sizeof(size_t)); i += sizeof(size_t);
|
memcpy (cbuf + i, &chanlen, sizeof(size_t)); i += sizeof(size_t);
|
||||||
memcpy (&cbuf[i], chan, chanlen); i += chanlen;
|
memcpy (cbuf + i, chan, chanlen); i += chanlen;
|
||||||
memcpy (&cbuf[i], &datalen, sizeof(size_t)); i += sizeof(size_t);
|
memcpy (cbuf + i, &datalen, sizeof(size_t)); i += sizeof(size_t);
|
||||||
memcpy (&cbuf[i], data, datalen); i += datalen;
|
memcpy (cbuf + i, data, datalen); i += datalen;
|
||||||
|
|
||||||
free (chan);
|
free (chan);
|
||||||
free (data);
|
free (data);
|
||||||
|
|
||||||
|
printf ("\033[36m ON SORT de pubsubd_msg_read_cb \033[00m \n");
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// alh from the channel, message to send
|
||||||
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m)
|
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m)
|
||||||
{
|
{
|
||||||
|
struct app_list_elm * ale = NULL;
|
||||||
|
|
||||||
|
char *buf;
|
||||||
|
size_t msize;
|
||||||
|
pubsubd_msg_serialize (m, &buf, &msize);
|
||||||
|
|
||||||
|
LIST_FOREACH(ale, alh, entries) {
|
||||||
|
srv_write (ale->p, buf, msize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (buf != NULL) {
|
||||||
|
free (buf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void pubsubd_msg_print (const struct pubsub_msg *msg)
|
||||||
|
{
|
||||||
|
if (msg == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
printf ("\t\t\033[36mMessage\033[00m\n");
|
||||||
|
printf ("\t\ttype %d\n", msg->type);
|
||||||
|
printf ("\t\tchan %s\n", msg->chan);
|
||||||
|
printf ("\t\tdata %s\n", msg->data);
|
||||||
|
}
|
||||||
|
|
||||||
void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
|
void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
|
||||||
{
|
{
|
||||||
// read the message from the process
|
// read the message from the process
|
||||||
@ -420,14 +509,38 @@ void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
|
|||||||
srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb);
|
srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb);
|
||||||
|
|
||||||
pubsubd_msg_unserialize (m, buf, mlen);
|
pubsubd_msg_unserialize (m, buf, mlen);
|
||||||
|
|
||||||
|
if (buf != NULL) {
|
||||||
free (buf);
|
free (buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
void pubsub_msg_send (const struct service *s, const struct pubsub_msg * m)
|
|
||||||
{
|
|
||||||
}
|
}
|
||||||
void pubsub_msg_recv (const struct service *s, struct pubsub_msg * m)
|
|
||||||
|
void pubsub_msg_send (const struct service *s, struct process *p, const struct pubsub_msg * m)
|
||||||
{
|
{
|
||||||
|
|
||||||
|
char *buf;
|
||||||
|
size_t msize;
|
||||||
|
pubsubd_msg_serialize (m, &buf, &msize);
|
||||||
|
|
||||||
|
app_write (p, buf, msize);
|
||||||
|
|
||||||
|
if (buf != NULL) {
|
||||||
|
free (buf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void pubsub_msg_recv (const struct service *s, struct process *p, struct pubsub_msg * m)
|
||||||
|
{
|
||||||
|
// read the message from the process
|
||||||
|
size_t mlen;
|
||||||
|
char *buf;
|
||||||
|
app_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb);
|
||||||
|
|
||||||
|
pubsubd_msg_unserialize (m, buf, mlen);
|
||||||
|
|
||||||
|
if (buf != NULL) {
|
||||||
|
free (buf);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SERVICE
|
// SERVICE
|
||||||
|
@ -6,6 +6,11 @@
|
|||||||
|
|
||||||
#include "queue.h"
|
#include "queue.h"
|
||||||
|
|
||||||
|
#define PUBSUB_TYPE_DISCONNECT 1 << 0
|
||||||
|
#define PUBSUB_TYPE_INFO 1 << 1
|
||||||
|
#define PUBSUB_TYPE_DEBUG 1 << 2
|
||||||
|
#define PUBSUB_TYPE_MESSAGE 1 << 3
|
||||||
|
|
||||||
#define PUBSUB_SERVICE_NAME "pubsub"
|
#define PUBSUB_SERVICE_NAME "pubsub"
|
||||||
|
|
||||||
struct channel;
|
struct channel;
|
||||||
@ -25,19 +30,20 @@ struct pubsub_msg {
|
|||||||
void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len);
|
void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len);
|
||||||
void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len);
|
void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len);
|
||||||
void pubsubd_msg_free (struct pubsub_msg *msg);
|
void pubsubd_msg_free (struct pubsub_msg *msg);
|
||||||
|
void pubsubd_msg_print (const struct pubsub_msg *msg);
|
||||||
|
|
||||||
// parse pubsubd init msg (sent in TMPDIR/<service>)
|
// parse pubsubd init msg (sent in TMPDIR/<service>)
|
||||||
//
|
//
|
||||||
// line fmt : pid index version action chan
|
// line fmt : pid index version action chan
|
||||||
// action : quit | pub | sub
|
// action : quit | pub | sub
|
||||||
int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
|
int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
|
||||||
, struct channels *chans);
|
, struct channels *chans, struct channel **c);
|
||||||
int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize);
|
int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize);
|
||||||
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m);
|
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m);
|
||||||
void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m);
|
void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m);
|
||||||
|
|
||||||
void pubsub_msg_send (const struct service *, const struct pubsub_msg *msg);
|
void pubsub_msg_send (const struct service *, struct process *p, const struct pubsub_msg *msg);
|
||||||
void pubsub_msg_recv (const struct service *, struct pubsub_msg *msg);
|
void pubsub_msg_recv (const struct service *, struct process *p, struct pubsub_msg *msg);
|
||||||
|
|
||||||
// CHANNEL
|
// CHANNEL
|
||||||
|
|
||||||
@ -96,4 +102,6 @@ struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale)
|
|||||||
void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p);
|
void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p);
|
||||||
void pubsubd_app_list_elm_free (struct app_list_elm *todel);
|
void pubsubd_app_list_elm_free (struct app_list_elm *todel);
|
||||||
|
|
||||||
|
void pubsub_connection (struct service *srv, struct process *p, enum app_list_elm_action action, const char *channame);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
CC=gcc
|
CC=gcc
|
||||||
CFLAGS=-Wall -g
|
CFLAGS=-Wall -g
|
||||||
LDFLAGS=
|
LDFLAGS= -pthread
|
||||||
CFILES=$(wildcard *.c) # CFILES => recompiles everything on a C file change
|
CFILES=$(wildcard *.c) # CFILES => recompiles everything on a C file change
|
||||||
EXEC=$(basename $(wildcard *.c))
|
EXEC=$(basename $(wildcard *.c))
|
||||||
SOURCES=$(wildcard ../lib/*.c)
|
SOURCES=$(wildcard ../lib/*.c)
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
#include "../lib/pubsubd.h"
|
#include "../lib/pubsubd.h"
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
void
|
void
|
||||||
ohshit(int rvalue, const char* str) {
|
ohshit(int rvalue, const char* str) {
|
||||||
@ -7,6 +8,52 @@ ohshit(int rvalue, const char* str) {
|
|||||||
exit(rvalue);
|
exit(rvalue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// give this structure to the thread worker function
|
||||||
|
struct worker_params {
|
||||||
|
struct channels *chans;
|
||||||
|
struct channel *chan;
|
||||||
|
struct app_list_elm *ale;
|
||||||
|
};
|
||||||
|
|
||||||
|
void * pubsubd_worker_thread (void *params)
|
||||||
|
{
|
||||||
|
struct worker_params *wp = (struct worker_params *) params;
|
||||||
|
|
||||||
|
// each chan has a list of subscribers
|
||||||
|
// someone who only push a msg doesn't need to be registered
|
||||||
|
if (wp->ale->action == PUBSUB_BOTH || wp->ale->action == PUBSUB_PUB) {
|
||||||
|
// TODO add it to the application to follow
|
||||||
|
// TODO publish a message
|
||||||
|
printf ("publish or publish and subscribe to something\n");
|
||||||
|
|
||||||
|
struct pubsub_msg m;
|
||||||
|
bzero (&m, sizeof (struct pubsub_msg));
|
||||||
|
pubsubd_msg_recv (wp->ale->p, &m);
|
||||||
|
|
||||||
|
pubsubd_msg_print (&m);
|
||||||
|
|
||||||
|
if (m.type == PUBSUB_TYPE_DISCONNECT) {
|
||||||
|
// TODO remove the application from the subscribers
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
struct channel *chan = pubsubd_channel_get (wp->chans, wp->chan);
|
||||||
|
pubsubd_msg_send (chan->alh, &m);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (wp->ale->action == PUBSUB_SUB) {
|
||||||
|
// TODO
|
||||||
|
printf ("subscribe to something\n");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
printf ("\033[31mdo not know what you want to do\033[00m\n");
|
||||||
|
printf ("\tale->p : %p\n", wp->ale->p);
|
||||||
|
}
|
||||||
|
|
||||||
|
pubsubd_app_list_elm_free (wp->ale);
|
||||||
|
|
||||||
|
pthread_exit (NULL);
|
||||||
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
main(int argc, char* argv[])
|
main(int argc, char* argv[])
|
||||||
{
|
{
|
||||||
@ -28,8 +75,8 @@ main(int argc, char* argv[])
|
|||||||
// for each new process
|
// for each new process
|
||||||
struct app_list_elm ale;
|
struct app_list_elm ale;
|
||||||
bzero (&ale, sizeof (struct app_list_elm));
|
bzero (&ale, sizeof (struct app_list_elm));
|
||||||
|
struct channel *chan;
|
||||||
pubsubd_get_new_process (&srv, &ale, &chans);
|
pubsubd_get_new_process (&srv, &ale, &chans, &chan);
|
||||||
pubsubd_channels_print (&chans);
|
pubsubd_channels_print (&chans);
|
||||||
|
|
||||||
// end the application
|
// end the application
|
||||||
@ -37,7 +84,6 @@ main(int argc, char* argv[])
|
|||||||
printf ("Quitting ...\n");
|
printf ("Quitting ...\n");
|
||||||
|
|
||||||
pubsubd_channels_del_all (&chans);
|
pubsubd_channels_del_all (&chans);
|
||||||
// pubsubd_app_list_elm_free (&ale);
|
|
||||||
srv_close (&srv);
|
srv_close (&srv);
|
||||||
|
|
||||||
// TODO end the threads
|
// TODO end the threads
|
||||||
@ -46,19 +92,16 @@ main(int argc, char* argv[])
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO thread to handle multiple clients at a time
|
// TODO thread to handle multiple clients at a time
|
||||||
|
struct worker_params *wp;
|
||||||
|
wp = malloc (sizeof (struct worker_params));
|
||||||
|
wp->ale = pubsubd_app_list_elm_copy (&ale);
|
||||||
|
wp->chans = &chans;
|
||||||
|
wp->chan = chan;
|
||||||
|
|
||||||
// TODO register the subscriber
|
pthread_t thr;
|
||||||
// each chan has a list of subscribers
|
|
||||||
// someone who only push a msg doesn't need to be registered
|
|
||||||
if (ale.action == PUBSUB_SUB || ale.action == PUBSUB_BOTH) {
|
|
||||||
// TODO
|
|
||||||
}
|
|
||||||
else if (ale.action == PUBSUB_PUB) {
|
|
||||||
// TODO add it to the application to follow
|
|
||||||
// TODO publish a message
|
|
||||||
|
|
||||||
// then
|
pthread_create (&thr, NULL, pubsubd_worker_thread, wp);
|
||||||
}
|
pthread_detach (thr);
|
||||||
|
|
||||||
pubsubd_app_list_elm_free (&ale);
|
pubsubd_app_list_elm_free (&ale);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user