Archived
3
0

pubsubd: GROS CHANTIER

This commit is contained in:
Philippe PITTOLI 2016-09-09 01:36:44 +02:00
parent 368e54d4cf
commit 21e326a3d4
6 changed files with 93 additions and 61 deletions

View File

@ -6,7 +6,7 @@ int file_open (FILE **f, const char *path, const char *mode)
{ {
printf ("opening %s\n", path); printf ("opening %s\n", path);
if (*f != NULL) { if (*f != NULL) {
printf ("f != NULL : %p\n", (void*) *f); // printf ("f != NULL : %p\n", (void*) *f);
if (file_close (*f)) { if (file_close (*f)) {
return ER_FILE_CLOSE; return ER_FILE_CLOSE;
} }
@ -16,7 +16,7 @@ int file_open (FILE **f, const char *path, const char *mode)
fprintf (stderr, "\033[31mnot opened %s\033[00m\n", path); fprintf (stderr, "\033[31mnot opened %s\033[00m\n", path);
return ER_FILE_OPEN; return ER_FILE_OPEN;
} }
printf ("opened : %ld\n", (long) *f); // printf ("opened : %ld\n", (long) *f);
return 0; return 0;
} }
@ -24,11 +24,11 @@ int file_open (FILE **f, const char *path, const char *mode)
int file_close (FILE *f) int file_close (FILE *f)
{ {
if (f != 0) { if (f != 0) {
printf ("before fclosing\n"); // printf ("before fclosing\n");
if (fclose (f)) { if (fclose (f)) {
return ER_FILE_CLOSE; return ER_FILE_CLOSE;
} }
printf ("after fclosing\n"); // printf ("after fclosing\n");
} }
return 0; return 0;
} }
@ -48,16 +48,21 @@ int file_read (FILE *f, char **buf, size_t *msize) {
} }
} }
*msize = fread (*buf, *msize, 1, f); int ret = 0;
if (*msize == 0) {
ret = fread (*buf, *msize, 1, f);
if (ret < 0) {
fprintf (stderr, "err can't read a file\n"); fprintf (stderr, "err can't read a file\n");
if (ER_FILE_CLOSE == file_close (f)) { ret = file_close (f);
fprintf (stderr, "err closing the file\n"); if (ret != 0)
return ER_FILE_CLOSE; return ret;
}
return ER_FILE_READ; return ER_FILE_READ;
} }
ret = file_close (f);
if (ret != 0)
return ret;
return 0; return 0;
} }
@ -260,16 +265,13 @@ int srv_read_cb (struct process *p, char ** buf, size_t * msize
return ER_FILE_OPEN; return ER_FILE_OPEN;
} }
int ret = 0;
if (cb != NULL) { if (cb != NULL) {
int ret = (*cb) (p->out, buf, msize); ret = (*cb) (p->out, buf, msize);
if (ret != 0)
return ret;
} }
else { else {
int ret = file_read (p->out, buf, msize); ret = file_read (p->out, buf, msize);
if (ret != 0) {
return ret;
}
} }
// printf ("DEBUG read, size %ld : %s\n", *msize, *buf); // printf ("DEBUG read, size %ld : %s\n", *msize, *buf);
@ -279,7 +281,7 @@ int srv_read_cb (struct process *p, char ** buf, size_t * msize
} }
p->out = NULL; p->out = NULL;
return 0; return ret;
} }
int srv_read (struct process *p, char ** buf, size_t * msize) int srv_read (struct process *p, char ** buf, size_t * msize)
@ -289,10 +291,11 @@ int srv_read (struct process *p, char ** buf, size_t * msize)
return ER_FILE_OPEN; return ER_FILE_OPEN;
} }
int ret = file_read (p->out, buf, msize); int ret = 0;
ret = file_read (p->out, buf, msize);
if (ret != 0) { if (ret != 0) {
p->out = NULL; p->out = NULL;
return ret;
} }
// printf ("DEBUG read, size %ld : %s\n", *msize, buf); // printf ("DEBUG read, size %ld : %s\n", *msize, buf);
@ -300,11 +303,11 @@ int srv_read (struct process *p, char ** buf, size_t * msize)
if (ER_FILE_CLOSE == file_close (p->out)) { if (ER_FILE_CLOSE == file_close (p->out)) {
fprintf (stderr, "err closing the file %s\n", p->path_out); fprintf (stderr, "err closing the file %s\n", p->path_out);
p->out = NULL; p->out = NULL;
return ER_FILE_CLOSE; ret = ER_FILE_CLOSE;
} }
p->out = NULL; p->out = NULL;
return 0; return ret;
} }
int srv_write (struct process *p, char * buf, size_t msize) int srv_write (struct process *p, char * buf, size_t msize)
@ -455,20 +458,18 @@ int app_read_cb (struct process *p, char ** buf, size_t * msize
return 1; return 1;
} }
int ret = 0;
if (cb != NULL) { if (cb != NULL) {
int ret = (*cb) (p->in, buf, msize); ret = (*cb) (p->in, buf, msize);
if (ret != 0)
return ret;
} }
else { else {
int ret = file_read (p->in, buf, msize); ret = file_read (p->in, buf, msize);
if (ret != 0) { if (ret != 0) {
p->in = NULL; p->in = NULL;
return ret;
} }
} }
if (ER_FILE_CLOSE == file_close (p->in)) { if (ER_FILE_CLOSE == file_close (p->in)) {
fprintf (stderr, "err closing the file %s\n", p->path_in); fprintf (stderr, "err closing the file %s\n", p->path_in);
} }

View File

@ -390,6 +390,8 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
char chan[BUFSIZ]; char chan[BUFSIZ];
memset (chan, 0, BUFSIZ); memset (chan, 0, BUFSIZ);
printf ("INIT: %s\n", buf);
for (str = buf, i = 1; ; str = NULL, i++) { for (str = buf, i = 1; ; str = NULL, i++) {
token = strtok_r(str, " ", &saveptr); token = strtok_r(str, " ", &saveptr);
if (token == NULL) if (token == NULL)
@ -417,7 +419,7 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
case 5 : { case 5 : {
if (ale->action != PUBSUB_QUIT) if (ale->action != PUBSUB_QUIT)
memcpy (chan, token, (strlen (token) < BUFSIZ) ? memcpy (chan, token, (strlen (token) < BUFSIZ) ?
strlen (token) : BUFSIZ); strlen (token) -1 : BUFSIZ);
break; break;
} }
} }
@ -465,6 +467,7 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
return 0; return 0;
} }
#if 0
// TODO CBOR // TODO CBOR
int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize) int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize)
{ {
@ -550,6 +553,8 @@ int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize)
return 0; return 0;
} }
#endif
// alh from the channel, message to send // alh from the channel, message to send
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m) void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m)
{ {
@ -570,14 +575,8 @@ void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg
void pubsubd_msg_print (const struct pubsub_msg *msg) void pubsubd_msg_print (const struct pubsub_msg *msg)
{ {
if (msg == NULL) { printf ("msg: type=%d chan=%s, data=%s\n"
return; , msg->type, msg->chan, msg->data);
}
printf ("\t\t\033[36mMessage\033[00m\n");
printf ("\t\ttype %d\n", msg->type);
printf ("\t\tchan %s\n", msg->chan);
printf ("\t\tdata %s\n", msg->data);
} }
void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m) void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
@ -585,7 +584,11 @@ void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
// read the message from the process // read the message from the process
size_t mlen = 0; size_t mlen = 0;
char *buf = NULL; char *buf = NULL;
#if 0
srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb); srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb);
#else
srv_read (p, &buf, &mlen);
#endif
pubsubd_msg_unserialize (m, buf, mlen); pubsubd_msg_unserialize (m, buf, mlen);
@ -695,7 +698,11 @@ void pubsub_msg_recv (struct process *p, struct pubsub_msg * m)
// read the message from the process // read the message from the process
size_t mlen = 0; size_t mlen = 0;
char *buf = NULL; char *buf = NULL;
#if 0
app_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb); app_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb);
#else
app_read_cb (p, &buf, &mlen, NULL);
#endif
pubsubd_msg_unserialize (m, buf, mlen); pubsubd_msg_unserialize (m, buf, mlen);

View File

@ -19,7 +19,7 @@ $(TESTS):
valgrind --show-leak-kinds=all --leak-check=full -v --track-origins=yes ./$(basename $@) valgrind --show-leak-kinds=all --leak-check=full -v --track-origins=yes ./$(basename $@)
clean: clean:
@-rm $(OBJECTS) @-rm $(OBJECTS) *.o
mrproper: clean mrproper: clean
@-rm $(EXEC) @-rm $(EXEC)

View File

@ -13,13 +13,6 @@ void usage (char **argv)
printf ( "usage: cat msg | %s\n", argv[0]); printf ( "usage: cat msg | %s\n", argv[0]);
} }
void msg_print (struct pubsub_msg *msg) {
printf ("msg: type=%d chan=%.*s, data=%.*s\n"
, msg->type
, msg->chanlen, msg->chan
, msg->datalen, msg->data);
}
int int
main(int argc, char **argv) main(int argc, char **argv)
{ {
@ -36,7 +29,7 @@ main(int argc, char **argv)
struct pubsub_msg msg; struct pubsub_msg msg;
pubsubd_msg_unserialize (&msg, data, len); pubsubd_msg_unserialize (&msg, data, len);
msg_print (&msg); pubsubd_msg_print (&msg);
pubsubd_msg_free (&msg); pubsubd_msg_free (&msg);
return EXIT_SUCCESS; return EXIT_SUCCESS;

View File

@ -41,12 +41,14 @@ void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, in
struct pubsub_msg m; struct pubsub_msg m;
memset (&m, 0, sizeof (struct pubsub_msg)); memset (&m, 0, sizeof (struct pubsub_msg));
if (strcmp (cmd, "pub") == 0) {
// first message, "coucou" // first message, "coucou"
m.type = PUBSUB_TYPE_INFO; m.type = PUBSUB_TYPE_MESSAGE;
m.chan = malloc (strlen (chan) + 1); m.chan = malloc (strlen (chan) + 1);
memset (m.chan, 0, strlen (chan) + 1); memset (m.chan, 0, strlen (chan) + 1);
m.chan[strlen (chan)] = '\0'; m.chan[strlen (chan)] = '\0';
m.chanlen = strlen (chan); m.chanlen = strlen (chan);
m.data = malloc (strlen (MYMESSAGE) + 1); m.data = malloc (strlen (MYMESSAGE) + 1);
memset (m.data, 0, strlen (MYMESSAGE) + 1); memset (m.data, 0, strlen (MYMESSAGE) + 1);
strncpy ((char *) m.data, MYMESSAGE, strlen (MYMESSAGE) + 1); strncpy ((char *) m.data, MYMESSAGE, strlen (MYMESSAGE) + 1);
@ -54,6 +56,11 @@ void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, in
printf ("send message\n"); printf ("send message\n");
pubsub_msg_send (&p, &m); pubsub_msg_send (&p, &m);
}
else {
pubsub_msg_recv (&p, &m);
pubsubd_msg_print (&m);
}
// free everything // free everything
pubsubd_msg_free (&m); pubsubd_msg_free (&m);

View File

@ -23,13 +23,36 @@ void * pubsubd_worker_thread (void *params)
return NULL; return NULL;
} }
while (1) {
struct pubsub_msg m;
memset (&m, 0, sizeof (struct pubsub_msg));
sleep (5); // TODO DEBUG
printf ("MESSAGE : ");
pubsubd_msg_recv (wp->ale->p, &m);
pubsubd_msg_print (&m);
if (m.type == PUBSUB_TYPE_DISCONNECT) {
if ( 0 != pubsubd_subscriber_del (wp->chan->alh, wp->ale)) {
fprintf (stderr, "err : subscriber not registered\n");
}
break;
}
else {
struct channel *chan = pubsubd_channel_get (wp->chans, wp->chan);
pubsubd_msg_send (chan->alh, &m);
}
}
#if 0
while (1) { while (1) {
// each chan has a list of subscribers // each chan has a list of subscribers
// someone who only push a msg doesn't need to be registered // someone who only push a msg doesn't need to be registered
if (wp->ale->action == PUBSUB_BOTH || wp->ale->action == PUBSUB_PUB) { // if (wp->ale->action == PUBSUB_BOTH || wp->ale->action == PUBSUB_PUB) {
if (wp->ale->action == PUBSUB_PUB) {
// publish a message // publish a message
printf ("publish or publish and subscribe to chan %s\n" printf ("publish to chan %s\n", wp->chan->chan);
, wp->chan->chan);
struct pubsub_msg m; struct pubsub_msg m;
memset (&m, 0, sizeof (struct pubsub_msg)); memset (&m, 0, sizeof (struct pubsub_msg));
@ -65,6 +88,7 @@ void * pubsubd_worker_thread (void *params)
break; break;
} }
} }
#endif
pubsubd_app_list_elm_free (wp->ale); pubsubd_app_list_elm_free (wp->ale);