Archived
3
0
Conflicts:
	lib/communication.c
	lib/process.c
This commit is contained in:
lapupe 2016-09-10 16:59:53 +02:00
commit bde5bc0283
18 changed files with 603 additions and 423 deletions

View File

@ -2,36 +2,37 @@
#include <stdio.h> #include <stdio.h>
#include <time.h> #include <time.h>
int file_open (FILE **f, const char *path, const char *mode) int file_write (const char *path, const char *buf, size_t msize)
{ {
printf ("opening %s\n", path); int fd = open (path, O_WRONLY);
if (*f != NULL) { if (fd <= 0) {
printf ("f != NULL : %p\n", (void*) *f);
if (file_close (*f)) {
return ER_FILE_CLOSE;
}
}
*f = fopen (path, mode);
if (*f == NULL) {
fprintf (stderr, "\033[31mnot opened %s\033[00m\n", path);
return ER_FILE_OPEN; return ER_FILE_OPEN;
} }
printf ("opened : %ld\n", (long) *f);
int ret = 0;
ret = write (fd, buf, msize);
return 0; close (fd);
return ret;
} }
int file_close (FILE *f) /*int file_read (const char *path, char **buf, size_t *msize)
{ {
<<<<<<< HEAD
if (f != 0) { if (f != 0) {
if (fclose (f)) { if (fclose (f)) {
return ER_FILE_CLOSE; return ER_FILE_CLOSE;
} }
printf ("FILE_CLOSE: closed file\n"); printf ("FILE_CLOSE: closed file\n");
=======
int fd = open (path, O_RDONLY);
if (fd <= 0) {
return ER_FILE_OPEN;
>>>>>>> 99ca565bb6bf3729df3a1ec6ceeb2a58211751f5
} }
return 0;
}
<<<<<<< HEAD
int file_read (FILE *f, char **buf, size_t *msize) { int file_read (FILE *f, char **buf, size_t *msize) {
size_t n =0; size_t n =0;
@ -67,23 +68,53 @@ int file_read (FILE *f, char **buf, size_t *msize) {
return ER_FILE_CLOSE; return ER_FILE_CLOSE;
} }
return ER_FILE_READ; return ER_FILE_READ;
=======
if (*buf == NULL)
*buf = malloc (BUFSIZ);
int ret = 0;
int ret2 = 0;
ret = read (fd, *buf, BUFSIZ);
if (ret <= 0) {
fprintf (stderr, "err: read %s\n", path);
}
else {
*msize = ret;
>>>>>>> 99ca565bb6bf3729df3a1ec6ceeb2a58211751f5
} }
return 0; ret2 = close (fd);
} if (ret2 < 0) {
fprintf (stderr, "err: close [err: %d] %s\n", ret2, path);
perror ("closing");
}
int file_write (FILE *f, const char *buf, size_t msize) return ret;
}*/
int file_read (const char *path, char **buf, size_t *msize)
{ {
if (0 == fwrite (buf, msize, 1, f)) { int fd = open (path, O_RDONLY);
fprintf (stderr, "err writing in the file\n"); if (fd <= 0) {
if (ER_FILE_CLOSE == file_close (f)) { return ER_FILE_OPEN;
fprintf (stderr, "err closing the file\n");
return ER_FILE_CLOSE;
}
return ER_FILE_WRITE;
} }
if (*buf == NULL)
return 0; *buf = malloc (BUFSIZ);
int ret = 0;
int ret2 = 0;
ret = read (fd, *buf, BUFSIZ);
if (ret <= 0) {
fprintf (stderr, "err: read %s\n", path);
}
else {
*msize = ret;
}
ret2 = close (fd);
if (ret2 < 0) {
fprintf (stderr, "err: close [err: %d] %s\n", ret2, path);
perror ("closing");
}
return ret;
} }
int srv_init (int argc, char **argv, char **env, struct service *srv, const char *sname, int (*cb)(int argc, char **argv, char **env, struct service *srv, const char *sname)) int srv_init (int argc, char **argv, char **env, struct service *srv, const char *sname, int (*cb)(int argc, char **argv, char **env, struct service *srv, const char *sname))
@ -164,72 +195,20 @@ int srv_close (struct service *srv)
return 0; return 0;
} }
// only get a raw line from TMPDIR/<service>
int srv_get_listen_raw (const struct service *srv, char **buf, size_t *msize)
{
*buf = malloc(BUFSIZ);
memset (*buf, 0, BUFSIZ);
FILE * f = NULL;
if (file_open (&f, srv->spath, "rb")) {
return ER_FILE_OPEN;
}
char *ret = NULL;
ret = fgets (*buf, BUFSIZ, f);
if (ret == NULL) {
return ER_FILE_READ;
}
buf[0][BUFSIZ -1] = '\0';
if (file_close (f)) {
return ER_FILE_CLOSE;
}
*msize = strlen (*buf);
return 0;
}
int srv_get_new_process (const struct service *srv, struct process *p) int srv_get_new_process (const struct service *srv, struct process *p)
{ {
if (srv->spath == NULL) { if (srv->spath == NULL) {
return -1; return -1;
} }
char buf[BUFSIZ]; char *buf = NULL;
memset (buf, 0, BUFSIZ); size_t msize = 0;
int ret = file_read (srv->spath, &buf, &msize);
// read the pipe, get a process to work on if (ret <= 0) {
struct timespec ts = { 0 }; fprintf (stderr, "err: listening on %s\n", srv->spath);
struct timespec ts2 = { 0 }; exit (1);
FILE * f = NULL;
if (file_open (&f, srv->spath, "rb")) {
return ER_FILE_OPEN;
} }
clock_gettime(CLOCK_REALTIME, &ts);
char *ret = NULL;
ret = fgets (buf, BUFSIZ, f);
if (ret == NULL) {
if (file_close (f)) {
return ER_FILE_CLOSE;
}
return ER_FILE_READ;
}
clock_gettime(CLOCK_REALTIME, &ts2);
if (file_close (f)) {
return ER_FILE_CLOSE;
}
printf("sec: %ld nsec: %ld\n", ts.tv_sec, ts.tv_nsec);
printf("sec: %ld nsec: %ld\n", ts2.tv_sec, ts2.tv_nsec);
printf("diff nsec: %ld\n", ts2.tv_nsec - ts.tv_nsec);
char *token = NULL, *saveptr = NULL; char *token = NULL, *saveptr = NULL;
char *str = NULL; char *str = NULL;
int i = 0; int i = 0;
@ -254,93 +233,22 @@ int srv_get_new_process (const struct service *srv, struct process *p)
} }
} }
// printf("pid = %d, index = %d, version = %d \n",pid, index, version );
if (buf != NULL)
free (buf);
srv_process_gen (p, pid, index, version); srv_process_gen (p, pid, index, version);
return 1; return 1;
} }
int srv_read_cb (struct process *p, char ** buf, size_t * msize
, int (*cb)(FILE *f, char ** buf, size_t * msize))
{
if (file_open (&p->out, p->path_out, "rb")) {
fprintf (stderr, "\033[31merr: srv_read_cb, file_open\033[00m\n");
if (ER_FILE_CLOSE == file_close (p->out)) {
fprintf (stderr, "err closing the file %s\n", p->path_out);
p->out = NULL;
}
return ER_FILE_OPEN;
}
if (cb != NULL) {
int ret = (*cb) (p->out, buf, msize);
if (ret != 0)
return ret;
}
else {
int ret = file_read (p->out, buf, msize);
if (ret != 0) {
return ret;
}
}
// printf ("DEBUG read, size %ld : %s\n", *msize, *buf);
if (ER_FILE_CLOSE == file_close (p->out)) {
fprintf (stderr, "err closing the file %s\n", p->path_out);
p->out = NULL;
}
p->out = NULL;
return 0;
}
int srv_read (struct process *p, char ** buf, size_t * msize) int srv_read (struct process *p, char ** buf, size_t * msize)
{ {
if (ER_FILE_OPEN == file_open (&p->out, p->path_out, "rb")) { return file_read (p->path_out, buf, msize);
fprintf (stderr, "err opening the file %s\n", p->path_out);
return ER_FILE_OPEN;
}
int ret = file_read (p->out, buf, msize);
if (ret != 0) {
p->out = NULL;
return ret;
}
// printf ("DEBUG read, size %ld : %s\n", *msize, buf);
if (ER_FILE_CLOSE == file_close (p->out)) {
fprintf (stderr, "err closing the file %s\n", p->path_out);
p->out = NULL;
return ER_FILE_CLOSE;
}
p->out = NULL;
return 0;
} }
int srv_write (struct process *p, char * buf, size_t msize) int srv_write (struct process *p, char * buf, size_t msize)
{ {
if (ER_FILE_OPEN == file_open (&p->in, p->path_in, "wb")) { return file_write (p->path_in, buf, msize);
fprintf (stderr, "err opening the file %s\n", p->path_in);
return ER_FILE_OPEN;
}
int ret = file_write (p->in, buf, msize);
if (ret != 0) {
fprintf (stderr, "err writing in the file %s\n", p->path_in);
p->in = NULL;
return ret;
}
if (ER_FILE_CLOSE == file_close (p->in)) {
fprintf (stderr, "err closing the file %s\n", p->path_in);
p->in = NULL;
return ER_FILE_CLOSE;
}
p->in = NULL;
return 0;
} }
// APPLICATION // APPLICATION
@ -349,27 +257,10 @@ int srv_write (struct process *p, char * buf, size_t msize)
int app_srv_connection (struct service *srv, const char *connectionstr, size_t msize) int app_srv_connection (struct service *srv, const char *connectionstr, size_t msize)
{ {
if (srv == NULL) { if (srv == NULL) {
return 1; return -1;
} }
FILE * f = NULL; return file_write (srv->spath, connectionstr, msize);
if (ER_FILE_OPEN == file_open (&f, srv->spath, "wb")) {
fprintf (stderr, "err opening the service file %s\n", srv->spath);
return ER_FILE_OPEN;
}
int ret = file_write (f, connectionstr, msize);
if (ret != 0) {
fprintf (stderr, "err writing in the service file %s\n", srv->spath);
return ret;
}
if (ER_FILE_CLOSE == file_close (f)) {
fprintf (stderr, "err closing the file\n");
return ER_FILE_CLOSE;
}
return 0;
} }
int app_create (struct process *p, pid_t pid, int index, int version) int app_create (struct process *p, pid_t pid, int index, int version)
@ -458,84 +349,12 @@ int app_destroy (struct process *p)
return 0; return 0;
} }
int app_read_cb (struct process *p, char ** buf, size_t * msize
, int (*cb)(FILE *f, char ** buf, size_t * msize))
{
if (file_open (&p->in, p->path_in, "rb")) {
fprintf (stderr, "\033[31merr: app_read_cb, file_open\033[00m\n");
p->in = NULL;
return 1;
}
if (cb != NULL) {
int ret = (*cb) (p->in, buf, msize);
if (ret != 0)
return ret;
}
else {
int ret = file_read (p->in, buf, msize);
if (ret != 0) {
p->in = NULL;
return ret;
}
}
if (ER_FILE_CLOSE == file_close (p->in)) {
fprintf (stderr, "err closing the file %s\n", p->path_in);
}
p->in = NULL;
return 0;
}
int app_read (struct process *p, char ** buf, size_t * msize) int app_read (struct process *p, char ** buf, size_t * msize)
{ {
if (ER_FILE_OPEN == file_open (&p->in, p->path_in, "rb")) { return file_read (p->path_in, buf, msize);
fprintf (stderr, "err opening the file %s\n", p->path_in);
return ER_FILE_OPEN;
}
int ret = file_read (p->in, buf, msize);
if (ret != 0) {
p->in = NULL;
return ret;
}
if (ER_FILE_CLOSE == file_close (p->in)) {
fprintf (stderr, "err closing the file %s\n", p->path_in);
p->in = NULL;
return ER_FILE_CLOSE;
}
p->in = NULL;
return 0;
} }
int app_write (struct process *p, char * buf, size_t msize) int app_write (struct process *p, char * buf, size_t msize)
{ {
if (buf == NULL) { return file_write (p->path_out, buf, msize);
return ER_FILE_WRITE_PARAMS;
}
if (ER_FILE_OPEN == file_open (&p->out, p->path_out, "wb")) {
fprintf (stderr, "err opening the file %s\n", p->path_out);
return ER_FILE_OPEN;
}
int ret = file_write (p->out, buf, msize);
if (ret != 0) {
fprintf (stderr, "err writing in the file %s\n", p->path_out);
p->out = NULL;
return ret;
}
if (ER_FILE_CLOSE == file_close (p->out)) {
fprintf (stderr, "err closing the file %s\n", p->path_out);
p->out = NULL;
return ER_FILE_CLOSE;
}
p->out = NULL;
return 0;
} }

View File

@ -37,7 +37,6 @@ int srv_init (int argc, char **argv, char **env
, int (*cb)(int argc, char **argv, char **env , int (*cb)(int argc, char **argv, char **env
, struct service *srv, const char *sname)); , struct service *srv, const char *sname));
int srv_get_listen_raw (const struct service *srv, char **buf, size_t *msize);
int srv_get_new_process (const struct service *srv, struct process *proc); int srv_get_new_process (const struct service *srv, struct process *proc);
/* /*
@ -50,8 +49,6 @@ int srv_get_new_process (const struct service *srv, struct process *proc);
int srv_create (struct service *srv); int srv_create (struct service *srv);
int srv_close (struct service *srv); int srv_close (struct service *srv);
int srv_read_cb (struct process *p, char ** buf, size_t * msize
, int (*cb)(FILE *f, char ** buf, size_t * msize));
int srv_read (struct process *, char ** buf, size_t *); int srv_read (struct process *, char ** buf, size_t *);
int srv_write (struct process *, char * buf, size_t); int srv_write (struct process *, char * buf, size_t);
@ -63,15 +60,11 @@ int app_srv_connection (struct service *, const char *, size_t);
int app_create (struct process *, pid_t pid, int index, int version); int app_create (struct process *, pid_t pid, int index, int version);
int app_destroy (struct process *); int app_destroy (struct process *);
int app_read_cb (struct process *p, char ** buf, size_t * msize
, int (*cb)(FILE *f, char ** buf, size_t * msize));
int app_read (struct process *, char ** buf, size_t *); int app_read (struct process *, char ** buf, size_t *);
int app_write (struct process *, char * buf, size_t); int app_write (struct process *, char * buf, size_t);
// wrappers // wrappers
int file_open (FILE **f, const char *path, const char *mode); int file_read (const char *path, char **buf, size_t *msize);
int file_close (FILE *f); int file_write (const char *path, const char *buf, size_t msize);
int file_read (FILE *f, char **buf, size_t *msize);
int file_write (FILE *f, const char *buf, size_t msize);
#endif #endif

View File

@ -39,10 +39,15 @@ void srv_process_gen (struct process *p
void srv_process_free (struct process * p) void srv_process_free (struct process * p)
{ {
// TODO nothing to do now // TODO nothing to do now
snprintf(p->path_in , PATH_MAX, "%s/%d-%d-in" , TMPDIR, pid, index);
snprintf(p->path_out, PATH_MAX, "%s/%d-%d-out", TMPDIR, pid, index);
} }
void srv_process_print (struct process *p) void srv_process_print (struct process *p)
{ {
if (p != NULL) if (p != NULL)
printf ("process %d : index %d\n", p->pid, p->index); printf ("process %d : index %d, version %d\n"
, p->pid, p->index, p->version);
} }

View File

@ -19,11 +19,9 @@ struct process {
unsigned int index; unsigned int index;
char path_in [PATH_MAX]; char path_in [PATH_MAX];
char path_out [PATH_MAX]; char path_out [PATH_MAX];
FILE *in, *out;
}; };
struct process * srv_process_copy (const struct process *p); struct process * srv_process_copy (const struct process *p);
void srv_process_free (struct process * p);
int srv_process_eq (const struct process *p1, const struct process *p2); int srv_process_eq (const struct process *p1, const struct process *p2);

View File

@ -59,18 +59,36 @@ struct channel * pubsubd_channel_copy (struct channel *c)
memcpy (copy, c, sizeof(struct channel)); memcpy (copy, c, sizeof(struct channel));
if (c->chan != NULL) { if (c->chan != NULL) {
// copy->chan = strndup (c->chan, c->chanlen); copy->chan = malloc (c->chanlen);
copy->chan = malloc (BUFSIZ); memset (copy->chan, 0, c->chanlen);
memcpy (copy->chan, c->chan, BUFSIZ); memcpy (copy->chan, c->chan, c->chanlen);
copy->chanlen = c->chanlen; copy->chanlen = c->chanlen;
} }
return copy; return copy;
} }
int pubsubd_channel_new (struct channel *c, const char * name)
{
if (c == NULL) {
return 1;
}
size_t nlen = (strlen (name) > BUFSIZ) ? BUFSIZ : strlen (name);
printf ("NAME : %s, SIZE : %ld\n", name, nlen);
if (c->chan == NULL)
c->chan = malloc (nlen +1);
memset (c->chan, 0, nlen +1);
memcpy (c->chan, name, nlen);
c->chanlen = nlen;
return 0;
}
void pubsubd_channel_free (struct channel * c) void pubsubd_channel_free (struct channel * c)
{ {
// TODO
if (c == NULL) if (c == NULL)
return; return;
@ -98,7 +116,8 @@ struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c)
int int
pubsubd_channel_eq (const struct channel *c1, const struct channel *c2) pubsubd_channel_eq (const struct channel *c1, const struct channel *c2)
{ {
return (strncmp (c1->chan, c2->chan, c1->chanlen) == 0); return c1->chanlen == c2->chanlen &&
strncmp (c1->chan, c2->chan, c1->chanlen) == 0;
} }
// SUBSCRIBER // SUBSCRIBER
@ -114,7 +133,7 @@ void pubsubd_subscriber_init (struct app_list_head **chans) {
void pubsubd_channels_print (const struct channels *chans) void pubsubd_channels_print (const struct channels *chans)
{ {
printf ("\033[36mmchannels\033[00m\n\n"); printf ("\033[36mmchannels\033[00m\n");
if (chans == NULL) if (chans == NULL)
return ; return ;
@ -130,13 +149,19 @@ void pubsubd_channel_print (const struct channel *c)
if (c == NULL || c->chan == NULL) if (c == NULL || c->chan == NULL)
return; return;
printf ( "\033[32mchan %s\033[00m\n\t", c->chan); if (c->chan == NULL) {
printf ( "\033[32mchan name not available\033[00m\n");
}
else {
printf ( "\033[32mchan %s\033[00m\n", c->chan);
}
if (c->alh == NULL) if (c->alh == NULL)
return; return;
struct app_list_elm *ale = NULL; struct app_list_elm *ale = NULL;
LIST_FOREACH(ale, c->alh, entries) { LIST_FOREACH(ale, c->alh, entries) {
printf ("\t");
srv_process_print (ale->p); srv_process_print (ale->p);
} }
} }
@ -230,7 +255,7 @@ void pubsubd_app_list_elm_free (struct app_list_elm *todel)
{ {
if (todel == NULL || todel->p == NULL) if (todel == NULL || todel->p == NULL)
return; return;
srv_process_free (todel->p); free (todel->p);
} }
// MESSAGE, TODO CBOR // MESSAGE, TODO CBOR
@ -339,16 +364,15 @@ void pubsubd_msg_free (struct pubsub_msg *msg)
// COMMUNICATION // COMMUNICATION
int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale
, struct channels *chans, struct channel **c) , struct channels *chans, struct channel **c)
{ {
if (srv == NULL || ale == NULL || chans == NULL) if (spath == NULL || ale == NULL || chans == NULL)
return -1; return -1;
char *buf = NULL; char *buf = NULL;
size_t msize = 0; size_t msize = 0;
srv_get_listen_raw (srv, &buf, &msize); file_read (spath, &buf, &msize);
// parse pubsubd init msg (sent in TMPDIR/<service>) // parse pubsubd init msg (sent in TMPDIR/<service>)
// //
// line fmt : pid index version action chan // line fmt : pid index version action chan
@ -361,9 +385,12 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
int index = 0; int index = 0;
int version = 0; int version = 0;
// chan name
char chan[BUFSIZ]; char chan[BUFSIZ];
memset (chan, 0, BUFSIZ); memset (chan, 0, BUFSIZ);
printf ("INIT: %s\n", buf);
for (str = buf, i = 1; ; str = NULL, i++) { for (str = buf, i = 1; ; str = NULL, i++) {
token = strtok_r(str, " ", &saveptr); token = strtok_r(str, " ", &saveptr);
if (token == NULL) if (token == NULL)
@ -389,9 +416,11 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
break; break;
} }
case 5 : { case 5 : {
// for the last element of the line
// drop the following \n
if (ale->action != PUBSUB_QUIT) if (ale->action != PUBSUB_QUIT)
memcpy (chan, token, (strlen (token) < BUFSIZ) ? memcpy (chan, token, (strlen (token) < BUFSIZ) ?
strlen (token) : BUFSIZ); strlen (token) -1 : BUFSIZ);
break; break;
} }
} }
@ -418,16 +447,9 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
*c = malloc (sizeof (struct channel)); *c = malloc (sizeof (struct channel));
} }
if (c[0]->chan != NULL) {
free (c[0]->chan);
c[0]->chan = NULL;
}
chan[BUFSIZ -1] = '\0'; chan[BUFSIZ -1] = '\0';
// c[0]->chan = strndup (chan, BUFSIZ); pubsubd_channel_new (*c, chan);
c[0]->chan = malloc (BUFSIZ);
memcpy(c[0]->chan, chan, BUFSIZ);
c[0]->chanlen = strlen (chan);
struct channel *new_chan = NULL; struct channel *new_chan = NULL;
new_chan = pubsubd_channel_get (chans, *c); new_chan = pubsubd_channel_get (chans, *c);
@ -446,91 +468,6 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
return 0; return 0;
} }
// TODO CBOR
int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize)
{
// msg: "type(1) chanlen(8) chan datalen(8) data
printf ("\033[36m ON PASSE DANS pubsubd_msg_read_cb \033[00m \n");
// read
char type = ' ';
if (0 == fread (&type, 1, 1, f)) {
return ER_FILE_READ;
}
size_t chanlen = 0;
if (0 == fread (&chanlen, sizeof (size_t), 1, f)) {
return ER_FILE_READ;
}
if (chanlen > BUFSIZ) {
return ER_FILE_READ;
}
char *chan = NULL;
chan = malloc (chanlen);
if (chan == NULL) {
return ER_MEM_ALLOC;
}
if (0 == fread (chan, chanlen, 1, f)) {
return ER_FILE_READ;
}
size_t datalen = 0;
if (0 == fread (&datalen, sizeof (size_t), 1, f)) {
free (chan);
return ER_FILE_READ;
}
if (datalen > BUFSIZ) {
return 1;
}
char *data = NULL;
data = malloc (datalen);
if (data == NULL) {
free (chan);
return ER_MEM_ALLOC;
}
if (0 == fread (data, datalen, 1, f)) {
free (chan);
free (data);
return ER_FILE_READ;
}
*msize = 1 + 2 * sizeof (size_t) + chanlen + datalen;
if (*buf == NULL) {
*buf = malloc(*msize);
if (*buf == NULL) {
free (chan);
free (data);
return ER_MEM_ALLOC;
}
}
// TODO CHECK THIS
size_t i = 0;
char *cbuf = *buf;
cbuf[i] = type; i++;
memcpy (cbuf + i, &chanlen, sizeof(size_t)); i += sizeof(size_t);
memcpy (cbuf + i, chan, chanlen); i += chanlen;
memcpy (cbuf + i, &datalen, sizeof(size_t)); i += sizeof(size_t);
memcpy (cbuf + i, data, datalen); i += datalen;
free (chan);
free (data);
printf ("\033[36m ON SORT de pubsubd_msg_read_cb \033[00m \n");
return 0;
}
// alh from the channel, message to send // alh from the channel, message to send
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m) void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m)
{ {
@ -551,14 +488,8 @@ void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg
void pubsubd_msg_print (const struct pubsub_msg *msg) void pubsubd_msg_print (const struct pubsub_msg *msg)
{ {
if (msg == NULL) { printf ("msg: type=%d chan=%s, data=%s\n"
return; , msg->type, msg->chan, msg->data);
}
printf ("\t\t\033[36mMessage\033[00m\n");
printf ("\t\ttype %d\n", msg->type);
printf ("\t\tchan %s\n", msg->chan);
printf ("\t\tdata %s\n", msg->data);
} }
void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m) void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
@ -566,7 +497,11 @@ void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m)
// read the message from the process // read the message from the process
size_t mlen = 0; size_t mlen = 0;
char *buf = NULL; char *buf = NULL;
#if 0
srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb); srv_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb);
#else
srv_read (p, &buf, &mlen);
#endif
pubsubd_msg_unserialize (m, buf, mlen); pubsubd_msg_unserialize (m, buf, mlen);
@ -628,19 +563,8 @@ void pubsub_disconnect (struct process *p)
pubsubd_msg_serialize (&m, &buf, &msize); pubsubd_msg_serialize (&m, &buf, &msize);
int ret = app_write (p, buf, msize); int ret = app_write (p, buf, msize);
switch (ret) { if (ret != (int) msize) {
case ER_FILE_WRITE : fprintf (stderr, "err: can't disconnect\n");
fprintf (stderr, "err: ER_FILE_WRITE\n");
break;
case ER_FILE_WRITE_PARAMS :
fprintf (stderr, "err: ER_FILE_WRITE_PARAMS\n");
break;
case ER_FILE_OPEN :
fprintf (stderr, "err: ER_FILE_OPEN\n");
break;
case ER_FILE_CLOSE :
fprintf (stderr, "err: ER_FILE_CLOSE\n");
break;
} }
pubsubd_msg_free (&m); pubsubd_msg_free (&m);
@ -676,7 +600,7 @@ void pubsub_msg_recv (struct process *p, struct pubsub_msg * m)
// read the message from the process // read the message from the process
size_t mlen = 0; size_t mlen = 0;
char *buf = NULL; char *buf = NULL;
app_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb); app_read (p, &buf, &mlen);
pubsubd_msg_unserialize (m, buf, mlen); pubsubd_msg_unserialize (m, buf, mlen);

View File

@ -35,7 +35,7 @@ void pubsubd_msg_print (const struct pubsub_msg *msg);
// //
// line fmt : pid index version action chan // line fmt : pid index version action chan
// action : quit | pub | sub // action : quit | pub | sub
int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale
, struct channels *chans, struct channel **c); , struct channels *chans, struct channel **c);
int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize); int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize);
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m); void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m);
@ -60,6 +60,7 @@ struct channel {
}; };
// simple channel // simple channel
int pubsubd_channel_new (struct channel *c, const char *name);
struct channel * pubsubd_channel_copy (struct channel *c); struct channel * pubsubd_channel_copy (struct channel *c);
struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c); struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c);
void pubsubd_channel_free (struct channel *c); void pubsubd_channel_free (struct channel *c);

View File

@ -1,5 +1,5 @@
CC=gcc CC=gcc
CFLAGS=-Wall -g CFLAGS=-Wall -g -Wextra
LDFLAGS= -pthread LDFLAGS= -pthread
CFILES=$(wildcard *.c) # CFILES => recompiles everything on a C file change CFILES=$(wildcard *.c) # CFILES => recompiles everything on a C file change
EXEC=$(basename $(wildcard *.c)) EXEC=$(basename $(wildcard *.c))
@ -15,8 +15,11 @@ $(EXEC): $(OBJECTS) $(CFILES)
.c.o: .c.o:
$(CC) -c $(CFLAGS) $< -o $@ $(CC) -c $(CFLAGS) $< -o $@
$(TESTS):
valgrind --show-leak-kinds=all --leak-check=full -v --track-origins=yes ./$(basename $@)
clean: clean:
-rm $(OBJECTS) @-rm $(OBJECTS) *.o
mrproper: clean mrproper: clean
rm $(EXEC) @-rm $(EXEC)

48
pubsub/msg-serialize.c Normal file
View File

@ -0,0 +1,48 @@
#include "../lib/pubsubd.h"
#include <stdlib.h>
#include <string.h>
#define MESSAGE "coucou"
#define CHAN "chan1"
void
ohshit(int rvalue, const char* str) {
fprintf (stderr, "\033[31merr: %s\033[00m\n", str);
exit (rvalue);
}
void usage (char **argv)
{
printf ( "usage: %s\n", argv[0]);
}
int
main(int argc, char **argv)
{
if (argc != 1) {
usage (argv);
exit (1);
}
struct pubsub_msg msg;
memset (&msg, 0, sizeof (struct pubsub_msg));
msg.type = PUBSUB_TYPE_MESSAGE;
msg.chan = malloc (strlen (CHAN) + 1);
strncpy ((char *)msg.chan, CHAN, strlen (CHAN) + 1);
msg.chanlen = strlen (CHAN) + 1;
msg.data = malloc (strlen (MESSAGE) + 1);
strncpy ((char *)msg.data, MESSAGE, strlen (CHAN) + 1);
msg.datalen = strlen (MESSAGE) + 1;
char *data = NULL;
size_t len = 0;
pubsubd_msg_serialize (&msg, &data, &len);
pubsubd_msg_free (&msg);
if ((int) len != write (1, data, len)) {
ohshit (1, "unable to write the data");
}
return EXIT_SUCCESS;
}

36
pubsub/msg-unserialize.c Normal file
View File

@ -0,0 +1,36 @@
#include "../lib/pubsubd.h"
#include <stdlib.h>
#include <string.h>
void
ohshit(int rvalue, const char* str) {
fprintf (stderr, "\033[31merr: %s\033[00m\n", str);
exit (rvalue);
}
void usage (char **argv)
{
printf ( "usage: cat msg | %s\n", argv[0]);
}
int
main(int argc, char **argv)
{
if (argc != 1) {
usage (argv);
exit (1);
}
char data[BUFSIZ];
memset (data, 0, BUFSIZ);
size_t len = read (0, data, BUFSIZ);
printf ("msg len %ld\n", len);
struct pubsub_msg msg;
pubsubd_msg_unserialize (&msg, data, len);
pubsubd_msg_print (&msg);
pubsubd_msg_free (&msg);
return EXIT_SUCCESS;
}

View File

@ -17,7 +17,7 @@ void usage (char **argv)
void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, int version, char *cmd, char *chan) void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, int version, char *cmd, char *chan)
{ {
printf ("Simulate connnection : pid %d index %d version %d " printf ("Simulate connection : pid %d index %d version %d "
"cmd %s chan %s\n" "cmd %s chan %s\n"
, pid, index, version, cmd, chan ); , pid, index, version, cmd, chan );
@ -29,9 +29,11 @@ void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, in
struct process p; struct process p;
memset (&p, 0, sizeof (struct process)); memset (&p, 0, sizeof (struct process));
printf ("app creation\n");
if (app_create (&p, pid, index, version)) // called by the application if (app_create (&p, pid, index, version)) // called by the application
ohshit (1, "app_create"); ohshit (1, "app_create");
printf ("connection\n");
// send a message to warn the service we want to do something // send a message to warn the service we want to do something
// line : pid index version action chan // line : pid index version action chan
pubsub_connection (&srv, &p, PUBSUB_PUB, chan); pubsub_connection (&srv, &p, PUBSUB_PUB, chan);
@ -39,27 +41,38 @@ void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, in
struct pubsub_msg m; struct pubsub_msg m;
memset (&m, 0, sizeof (struct pubsub_msg)); memset (&m, 0, sizeof (struct pubsub_msg));
// first message, "coucou" if (strcmp (cmd, "pub") == 0) {
m.type = PUBSUB_TYPE_INFO; // first message, "coucou"
m.chan = malloc (strlen (chan) + 1); m.type = PUBSUB_TYPE_MESSAGE;
m.chan[strlen (chan)] = '\0'; m.chan = malloc (strlen (chan) + 1);
m.chanlen = strlen (chan); memset (m.chan, 0, strlen (chan) + 1);
m.data = malloc (strlen (MYMESSAGE) + 1); m.chan[strlen (chan)] = '\0';
m.datalen = strlen (MYMESSAGE); m.chanlen = strlen (chan);
m.datalen = strlen (MYMESSAGE);
pubsub_msg_send (&p, &m); m.data = malloc (strlen (MYMESSAGE) + 1);
memset (m.data, 0, strlen (MYMESSAGE) + 1);
strncpy ((char *) m.data, MYMESSAGE, strlen (MYMESSAGE) + 1);
m.datalen = strlen (MYMESSAGE);
printf ("send message\n");
pubsub_msg_send (&p, &m);
}
else {
pubsub_msg_recv (&p, &m);
pubsubd_msg_print (&m);
}
// free everything // free everything
pubsubd_msg_free (&m); pubsubd_msg_free (&m);
printf ("disconnection\n");
// disconnect from the server // disconnect from the server
pubsub_disconnect (&p); pubsub_disconnect (&p);
printf ("destroying app\n");
// the application will shut down, and remove the application named pipes // the application will shut down, and remove the application named pipes
if (app_destroy (&p)) if (app_destroy (&p))
ohshit (1, "app_destroy"); ohshit (1, "app_destroy");
srv_process_free (&p);
} }
void sim_disconnection (int argc, char **argv, char **env, pid_t pid, int index, int version) void sim_disconnection (int argc, char **argv, char **env, pid_t pid, int index, int version)
@ -78,8 +91,6 @@ void sim_disconnection (int argc, char **argv, char **env, pid_t pid, int index,
// send a message to disconnect // send a message to disconnect
// line : pid index version action chan // line : pid index version action chan
pubsub_disconnect (&p); pubsub_disconnect (&p);
srv_process_free (&p);
} }
int int

View File

@ -55,7 +55,5 @@ main(int argc, char **argv, char **env)
if (app_destroy (&p)) if (app_destroy (&p))
ohshit (1, "app_destroy"); ohshit (1, "app_destroy");
srv_process_free (&p);
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }

View File

@ -18,19 +18,46 @@ struct worker_params {
void * pubsubd_worker_thread (void *params) void * pubsubd_worker_thread (void *params)
{ {
struct worker_params *wp = (struct worker_params *) params; struct worker_params *wp = (struct worker_params *) params;
if (wp == NULL) {
fprintf (stderr, "error pubsubd_worker_thread : params NULL\n");
return NULL;
}
while (1) {
struct pubsub_msg m;
memset (&m, 0, sizeof (struct pubsub_msg));
sleep (5); // TODO DEBUG
printf ("MESSAGE : ");
pubsubd_msg_recv (wp->ale->p, &m);
pubsubd_msg_print (&m);
if (m.type == PUBSUB_TYPE_DISCONNECT) {
if ( 0 != pubsubd_subscriber_del (wp->chan->alh, wp->ale)) {
fprintf (stderr, "err : subscriber not registered\n");
}
break;
}
else {
struct channel *chan = pubsubd_channel_get (wp->chans, wp->chan);
pubsubd_msg_send (chan->alh, &m);
}
}
#if 0
while (1) { while (1) {
// each chan has a list of subscribers // each chan has a list of subscribers
// someone who only push a msg doesn't need to be registered // someone who only push a msg doesn't need to be registered
if (wp->ale->action == PUBSUB_BOTH || wp->ale->action == PUBSUB_PUB) { // if (wp->ale->action == PUBSUB_BOTH || wp->ale->action == PUBSUB_PUB) {
if (wp->ale->action == PUBSUB_PUB) {
// publish a message // publish a message
printf ("publish or publish and subscribe to chan %s\n" printf ("publish to chan %s\n", wp->chan->chan);
, wp->chan->chan);
struct pubsub_msg m; struct pubsub_msg m;
memset (&m, 0, sizeof (struct pubsub_msg)); memset (&m, 0, sizeof (struct pubsub_msg));
sleep (5); sleep (5); // TODO DEBUG
pubsubd_msg_recv (wp->ale->p, &m); pubsubd_msg_recv (wp->ale->p, &m);
pubsubd_msg_print (&m); pubsubd_msg_print (&m);
@ -61,13 +88,14 @@ void * pubsubd_worker_thread (void *params)
break; break;
} }
} }
#endif
pubsubd_app_list_elm_free (wp->ale); pubsubd_app_list_elm_free (wp->ale);
pthread_exit (NULL); pthread_exit (NULL);
} }
int int
main(int argc, char **argv, char **env) main(int argc, char **argv, char **env)
{ {
struct service srv; struct service srv;
@ -89,7 +117,7 @@ main(int argc, char **argv, char **env)
struct app_list_elm ale; struct app_list_elm ale;
memset (&ale, 0, sizeof (struct app_list_elm)); memset (&ale, 0, sizeof (struct app_list_elm));
struct channel *chan = NULL; struct channel *chan = NULL;
pubsubd_get_new_process (&srv, &ale, &chans, &chan); pubsubd_get_new_process (srv.spath, &ale, &chans, &chan);
pubsubd_channels_print (&chans); pubsubd_channels_print (&chans);
// end the application // end the application

106
pubsub/test-chan-lists.c Normal file
View File

@ -0,0 +1,106 @@
#include "../lib/pubsubd.h"
#include <stdlib.h>
#define TEST_NAME "test-chan-lists"
void
ohshit(int rvalue, const char* str) {
fprintf(stderr, "%s\n", str);
exit(rvalue);
}
int
main(int argc, char **argv, char **env)
{
struct service srv;
memset (&srv, 0, sizeof (struct service));
srv_init (argc, argv, env, &srv, TEST_NAME, NULL);
printf ("Listening on %s.\n", srv.spath);
// creates the service named pipe, that listens to client applications
if (srv_create (&srv))
ohshit(1, "service_create error");
// init chans list
struct channels chans;
memset (&chans, 0, sizeof (struct channels));
pubsubd_channels_init (&chans);
// for each new process
// struct app_list_elm ale1;
// memset (&ale1, 0, sizeof (struct app_list_elm));
// warning : this is a local structure, not exactly the same in the prog.
struct channel chan;
memset (&chan, 0, sizeof (struct channel));
pubsubd_channel_new (&chan, "coucou");
// to emulate
// pubsubd_get_new_process (&srv, &ale1, &chans, &chan);
// FIRST CHAN TO BE ADDED
// search for the chan in channels, add it if not found
struct channel *new_chan = NULL;
new_chan = pubsubd_channel_get (&chans, &chan);
if (new_chan == NULL) {
new_chan = pubsubd_channels_add (&chans, &chan);
pubsubd_subscriber_init (&new_chan->alh);
}
else {
ohshit (2, "error : new chan, can't be found in channels yet");
}
pubsubd_channel_free (&chan);
printf ("print the channels, 1 chan\n");
printf ("--\n");
pubsubd_channels_print (&chans);
printf ("--\n");
// SAME CHAN, SHOULD NOT BE ADDED
pubsubd_channel_new (&chan, "coucou");
// search for the chan in channels, add it if not found
new_chan = pubsubd_channel_get (&chans, &chan);
if (new_chan == NULL) {
ohshit (3, "error : same chan, shouldn't be added in channels");
}
else {
printf ("already in the 'channels' structure\n");
}
printf ("print the channels, 1 chan\n");
printf ("--\n");
pubsubd_channels_print (&chans);
printf ("--\n");
// NEW CHAN, SHOULD BE ADDED
pubsubd_channel_new (&chan, "salut");
// search for the chan in channels, add it if not found
new_chan = pubsubd_channel_get (&chans, &chan);
if (new_chan == NULL) {
new_chan = pubsubd_channels_add (&chans, &chan);
pubsubd_subscriber_init (&new_chan->alh);
}
else {
ohshit (4, "error : new chan, should be added in channels");
}
pubsubd_channel_free (&chan);
printf ("print the channels, 2 chans\n");
printf ("--\n");
pubsubd_channels_print (&chans);
printf ("--\n");
// end the application
pubsubd_channels_del_all (&chans);
printf ("\nshould be empty now\n");
printf ("--\n");
pubsubd_channels_print (&chans);
printf ("--\n");
// the application will shut down, and remove the service named pipe
if (srv_close (&srv))
ohshit (1, "service_close error");
return EXIT_SUCCESS;
}

View File

@ -0,0 +1,51 @@
#include "../lib/pubsubd.h"
#include <stdlib.h>
#include <string.h>
void
ohshit(int rvalue, const char* str) {
fprintf(stderr, "%s\n", str);
exit(rvalue);
}
void usage (char **argv) {
fprintf (stderr, "usage: %s path\n", argv[0]);
exit (1);
}
int
main(int argc, char **argv)
{
if (argc != 2) {
usage (argv);
}
char *spath = argv[1];
printf ("Listening on %s.\n", spath);
struct channels chans;
memset (&chans, 0, sizeof (struct channels));
for (int nb = 10, i = 0 ; nb > 0; i++, nb--) {
struct app_list_elm ale;
memset (&ale, 0, sizeof (struct app_list_elm));
struct channel chan;
memset (&chan, 0, sizeof (struct channel));
struct channel *c = &chan;
pubsubd_get_new_process (spath, &ale, &chans, &c);
printf ("print the channels, %d chan\n", i);
printf ("--\n");
pubsubd_channels_print (&chans);
printf ("--\n");
printf ("still %d remaining processes\n", nb);
}
pubsubd_channels_del_all (&chans);
return EXIT_SUCCESS;
}

7
pubsub/test-gen-new-process.sh Executable file
View File

@ -0,0 +1,7 @@
#!/bin/bash
for i in $(seq 1 10)
do
echo "${i} 1 1 pub chan${i}" > /tmp/ipc/gen
sleep 0.1
done

57
pubsub/test-pipe-read.c Normal file
View File

@ -0,0 +1,57 @@
#include "../lib/pubsubd.h"
#include <stdlib.h>
#define TEST_NAME "test-chan-lists"
void
ohshit(int rvalue, const char* str) {
fprintf(stderr, "%s\n", str);
exit(rvalue);
}
void usage (char **argv) {
fprintf (stderr, "usage: %s path times\n", argv[0]);
fprintf (stderr, "ex: %s /tmp/pipe 5 => you will read 5 times\n", argv[0]);
exit (1);
}
int
main(int argc, char **argv)
{
if (argc != 3)
usage (argv);
char *path = argv[1];
int nb = atoi (argv[2]);
printf ("Listening on %s %d times.\n", path, nb);
char *buf = NULL;
size_t msize = 0;
int ret = 0;
while (nb--) {
ret = file_read (path, &buf, &msize);
if (ret <= 0) {
fprintf (stderr, "no msg");
if (ret == ER_FILE_OPEN) {
fprintf (stderr, " ER_FILE_OPEN");
}
fprintf (stderr, "\n");
nb++;
continue;
}
if (msize > 0) {
printf ("msg size %ld\t", msize);
struct pubsub_msg m;
memset (&m, 0, sizeof (struct pubsub_msg));
pubsubd_msg_unserialize (&m, buf, msize);
pubsubd_msg_print (&m);
pubsubd_msg_free (&m);
}
}
return EXIT_SUCCESS;
}

57
pubsub/test-pipe-write.c Normal file
View File

@ -0,0 +1,57 @@
#include "../lib/pubsubd.h"
#include <stdlib.h>
#define CHAN "chan1"
#define MESSAGE "coucou"
void
ohshit(int rvalue, const char* str) {
fprintf(stderr, "%s\n", str);
exit(rvalue);
}
void usage (char **argv) {
fprintf (stderr, "usage: %s path times\n", argv[0]);
fprintf (stderr, "ex: %s /tmp/pipe 5 => you will write 5 times\n", argv[0]);
exit (1);
}
int
main(int argc, char **argv)
{
if (argc != 3)
usage (argv);
char *path = argv[1];
int nb = atoi (argv[2]);
printf ("Writing on %s %d times.\n", path, nb);
char *buf = NULL;
size_t msize = 0;
int ret = 0;
while (nb--) {
struct pubsub_msg msg;
memset (&msg, 0, sizeof (struct pubsub_msg));
msg.type = PUBSUB_TYPE_MESSAGE;
msg.chan = malloc (strlen (CHAN) + 1);
strncpy ((char *)msg.chan, CHAN, strlen (CHAN) + 1);
msg.chanlen = strlen (CHAN) + 1;
msg.data = malloc (strlen (MESSAGE) + 1);
strncpy ((char *)msg.data, MESSAGE, strlen (CHAN) + 1);
msg.datalen = strlen (MESSAGE) + 1;
pubsubd_msg_serialize (&msg, &buf, &msize);
pubsubd_msg_print (&msg);
pubsubd_msg_free (&msg);
ret = file_write (path, buf, msize);
if (ret != (int) msize) {
fprintf (stderr, "msg not written\n");
}
}
return EXIT_SUCCESS;
}

38
pubsub/test-test-send-params.sh Executable file
View File

@ -0,0 +1,38 @@
#!/bin/bash
# start pubsub-test-send alone, with some parameters
# then test it with this script
REP=/tmp/ipc/
PID=10
INDEX=1
VERSION=1
ACTION="pub"
CHAN="chan1"
if [ $# != 0 ] ; then
PID=$1
shift
fi
if [ $# != 0 ] ; then
INDEX=$1
shift
fi
if [ $# != 0 ] ; then
ACTION=$1
shift
fi
if [ $# != 0 ] ; then
CHAN=$1
shift
fi
echo "there should be a line in $REP/pubsub"
cat $REP/pubsub
echo ""
echo "there should be something to read in $REP/${PID}-${INDEX}-out"
cat $REP/${PID}-${INDEX}-out | xxd