diff --git a/lib/communication.c b/lib/communication.c index 12a9c93..dd62f7d 100644 --- a/lib/communication.c +++ b/lib/communication.c @@ -2,82 +2,39 @@ #include #include -int file_open (FILE **f, const char *path, const char *mode) +int file_write (const char *path, const char *buf, size_t msize) { - printf ("opening %s\n", path); - if (*f != NULL) { - // printf ("f != NULL : %p\n", (void*) *f); - if (file_close (*f)) { - return ER_FILE_CLOSE; - } - } - *f = fopen (path, mode); - if (*f == NULL) { - fprintf (stderr, "\033[31mnot opened %s\033[00m\n", path); + int fd = open (path, O_WRONLY); + if (fd <= 0) { return ER_FILE_OPEN; } - // printf ("opened : %ld\n", (long) *f); + + int ret = 0; + ret = write (fd, buf, msize); - return 0; + close (fd); + + return ret; } -int file_close (FILE *f) +int file_read (const char *path, char **buf, size_t *msize) { - if (f != 0) { - // printf ("before fclosing\n"); - if (fclose (f)) { - return ER_FILE_CLOSE; - } - // printf ("after fclosing\n"); - } - return 0; -} - -int file_read (FILE *f, char **buf, size_t *msize) { - if (*msize == 0) { - *msize = BUFSIZ; // default value - } - - if (*buf == NULL) { - *buf = malloc (*msize); - if (*buf == NULL) { - fprintf (stderr, "err can't allocate enough memory (%ld)\n", *msize); - int ret = file_close (f); - if (ret != 0) - return ret; - } + int fd = open (path, O_RDONLY); + if (fd <= 0) { + return ER_FILE_OPEN; } int ret = 0; - - ret = fread (*buf, *msize, 1, f); + ret = read (fd, *buf, BUFSIZ); if (ret < 0) { - fprintf (stderr, "err can't read a file\n"); - ret = file_close (f); - if (ret != 0) - return ret; - return ER_FILE_READ; - } - - ret = file_close (f); - if (ret != 0) return ret; - - return 0; -} - -int file_write (FILE *f, const char *buf, size_t msize) -{ - if (0 == fwrite (buf, msize, 1, f)) { - fprintf (stderr, "err writing in the file\n"); - if (ER_FILE_CLOSE == file_close (f)) { - fprintf (stderr, "err closing the file\n"); - return ER_FILE_CLOSE; - } - return ER_FILE_WRITE; } - return 0; + *msize = ret; + + close (fd); + + return ret; } int srv_init (int argc, char **argv, char **env, struct service *srv, const char *sname, int (*cb)(int argc, char **argv, char **env, struct service *srv, const char *sname)) @@ -158,31 +115,10 @@ int srv_close (struct service *srv) return 0; } -// only get a raw line from TMPDIR/ +// TODO remove, replace by file_read int srv_get_listen_raw (const struct service *srv, char **buf, size_t *msize) { - *buf = malloc(BUFSIZ); - memset (*buf, 0, BUFSIZ); - - FILE * f = NULL; - if (file_open (&f, srv->spath, "rb")) { - return ER_FILE_OPEN; - } - - char *ret = NULL; - ret = fgets (*buf, BUFSIZ, f); - if (ret == NULL) { - return ER_FILE_READ; - } - buf[0][BUFSIZ -1] = '\0'; - - if (file_close (f)) { - return ER_FILE_CLOSE; - } - - *msize = strlen (*buf); - - return 0; + return file_read (srv->spath, buf, msize); } int srv_get_new_process (const struct service *srv, struct process *p) @@ -191,39 +127,14 @@ int srv_get_new_process (const struct service *srv, struct process *p) return -1; } - char buf[BUFSIZ]; - memset (buf, 0, BUFSIZ); - - // read the pipe, get a process to work on - struct timespec ts = { 0 }; - struct timespec ts2 = { 0 }; - - FILE * f = NULL; - if (file_open (&f, srv->spath, "rb")) { - return ER_FILE_OPEN; + char *buf = NULL; + size_t msize = 0; + int ret = file_read (srv->spath, &buf, &msize); + if (ret <= 0) { + fprintf (stderr, "err: listening on %s\n", srv->spath); + exit (1); } - clock_gettime(CLOCK_REALTIME, &ts); - - char *ret = NULL; - ret = fgets (buf, BUFSIZ, f); - if (ret == NULL) { - if (file_close (f)) { - return ER_FILE_CLOSE; - } - return ER_FILE_READ; - } - - clock_gettime(CLOCK_REALTIME, &ts2); - if (file_close (f)) { - return ER_FILE_CLOSE; - } - - printf("sec: %ld nsec: %ld\n", ts.tv_sec, ts.tv_nsec); - printf("sec: %ld nsec: %ld\n", ts2.tv_sec, ts2.tv_nsec); - - printf("diff nsec: %ld\n", ts2.tv_nsec - ts.tv_nsec); - char *token = NULL, *saveptr = NULL; char *str = NULL; int i = 0; @@ -253,85 +164,14 @@ int srv_get_new_process (const struct service *srv, struct process *p) return 0; } -int srv_read_cb (struct process *p, char ** buf, size_t * msize - , int (*cb)(FILE *f, char ** buf, size_t * msize)) -{ - if (file_open (&p->out, p->path_out, "rb")) { - fprintf (stderr, "\033[31merr: srv_read_cb, file_open\033[00m\n"); - if (ER_FILE_CLOSE == file_close (p->out)) { - fprintf (stderr, "err closing the file %s\n", p->path_out); - p->out = NULL; - } - return ER_FILE_OPEN; - } - - int ret = 0; - - if (cb != NULL) { - ret = (*cb) (p->out, buf, msize); - } - else { - ret = file_read (p->out, buf, msize); - } - // printf ("DEBUG read, size %ld : %s\n", *msize, *buf); - - if (ER_FILE_CLOSE == file_close (p->out)) { - fprintf (stderr, "err closing the file %s\n", p->path_out); - p->out = NULL; - } - p->out = NULL; - - return ret; -} - int srv_read (struct process *p, char ** buf, size_t * msize) { - if (ER_FILE_OPEN == file_open (&p->out, p->path_out, "rb")) { - fprintf (stderr, "err opening the file %s\n", p->path_out); - return ER_FILE_OPEN; - } - - int ret = 0; - - ret = file_read (p->out, buf, msize); - if (ret != 0) { - p->out = NULL; - } - - // printf ("DEBUG read, size %ld : %s\n", *msize, buf); - - if (ER_FILE_CLOSE == file_close (p->out)) { - fprintf (stderr, "err closing the file %s\n", p->path_out); - p->out = NULL; - ret = ER_FILE_CLOSE; - } - p->out = NULL; - - return ret; + return file_read (p->path_out, buf, msize); } int srv_write (struct process *p, char * buf, size_t msize) { - if (ER_FILE_OPEN == file_open (&p->in, p->path_in, "wb")) { - fprintf (stderr, "err opening the file %s\n", p->path_in); - return ER_FILE_OPEN; - } - - int ret = file_write (p->in, buf, msize); - if (ret != 0) { - fprintf (stderr, "err writing in the file %s\n", p->path_in); - p->in = NULL; - return ret; - } - - if (ER_FILE_CLOSE == file_close (p->in)) { - fprintf (stderr, "err closing the file %s\n", p->path_in); - p->in = NULL; - return ER_FILE_CLOSE; - } - p->in = NULL; - - return 0; + return file_write (p->path_in, buf, msize); } // APPLICATION @@ -340,27 +180,10 @@ int srv_write (struct process *p, char * buf, size_t msize) int app_srv_connection (struct service *srv, const char *connectionstr, size_t msize) { if (srv == NULL) { - return 1; + return -1; } - - FILE * f = NULL; - if (ER_FILE_OPEN == file_open (&f, srv->spath, "wb")) { - fprintf (stderr, "err opening the service file %s\n", srv->spath); - return ER_FILE_OPEN; - } - - int ret = file_write (f, connectionstr, msize); - if (ret != 0) { - fprintf (stderr, "err writing in the service file %s\n", srv->spath); - return ret; - } - - if (ER_FILE_CLOSE == file_close (f)) { - fprintf (stderr, "err closing the file\n"); - return ER_FILE_CLOSE; - } - - return 0; + + return file_write (srv->spath, connectionstr, msize); } int app_create (struct process *p, pid_t pid, int index, int version) @@ -449,82 +272,12 @@ int app_destroy (struct process *p) return 0; } -int app_read_cb (struct process *p, char ** buf, size_t * msize - , int (*cb)(FILE *f, char ** buf, size_t * msize)) -{ - if (file_open (&p->in, p->path_in, "rb")) { - fprintf (stderr, "\033[31merr: app_read_cb, file_open\033[00m\n"); - p->in = NULL; - return 1; - } - - int ret = 0; - - if (cb != NULL) { - ret = (*cb) (p->in, buf, msize); - } - else { - ret = file_read (p->in, buf, msize); - if (ret != 0) { - p->in = NULL; - } - } - - if (ER_FILE_CLOSE == file_close (p->in)) { - fprintf (stderr, "err closing the file %s\n", p->path_in); - } - p->in = NULL; - - return 0; -} - int app_read (struct process *p, char ** buf, size_t * msize) { - if (ER_FILE_OPEN == file_open (&p->in, p->path_in, "rb")) { - fprintf (stderr, "err opening the file %s\n", p->path_in); - return ER_FILE_OPEN; - } - - int ret = file_read (p->in, buf, msize); - if (ret != 0) { - p->in = NULL; - return ret; - } - - if (ER_FILE_CLOSE == file_close (p->in)) { - fprintf (stderr, "err closing the file %s\n", p->path_in); - p->in = NULL; - return ER_FILE_CLOSE; - } - p->in = NULL; - - return 0; + return file_read (p->path_in, buf, msize); } int app_write (struct process *p, char * buf, size_t msize) { - if (buf == NULL) { - return ER_FILE_WRITE_PARAMS; - } - - if (ER_FILE_OPEN == file_open (&p->out, p->path_out, "wb")) { - fprintf (stderr, "err opening the file %s\n", p->path_out); - return ER_FILE_OPEN; - } - - int ret = file_write (p->out, buf, msize); - if (ret != 0) { - fprintf (stderr, "err writing in the file %s\n", p->path_out); - p->out = NULL; - return ret; - } - - if (ER_FILE_CLOSE == file_close (p->out)) { - fprintf (stderr, "err closing the file %s\n", p->path_out); - p->out = NULL; - return ER_FILE_CLOSE; - } - p->out = NULL; - - return 0; + return file_write (p->path_out, buf, msize); } diff --git a/lib/communication.h b/lib/communication.h index 6dfbf1d..bc875f5 100644 --- a/lib/communication.h +++ b/lib/communication.h @@ -50,8 +50,6 @@ int srv_get_new_process (const struct service *srv, struct process *proc); int srv_create (struct service *srv); int srv_close (struct service *srv); -int srv_read_cb (struct process *p, char ** buf, size_t * msize - , int (*cb)(FILE *f, char ** buf, size_t * msize)); int srv_read (struct process *, char ** buf, size_t *); int srv_write (struct process *, char * buf, size_t); @@ -63,15 +61,11 @@ int app_srv_connection (struct service *, const char *, size_t); int app_create (struct process *, pid_t pid, int index, int version); int app_destroy (struct process *); -int app_read_cb (struct process *p, char ** buf, size_t * msize - , int (*cb)(FILE *f, char ** buf, size_t * msize)); int app_read (struct process *, char ** buf, size_t *); int app_write (struct process *, char * buf, size_t); // wrappers -int file_open (FILE **f, const char *path, const char *mode); -int file_close (FILE *f); -int file_read (FILE *f, char **buf, size_t *msize); -int file_write (FILE *f, const char *buf, size_t msize); +int file_read (const char *path, char **buf, size_t *msize); +int file_write (const char *path, const char *buf, size_t msize); #endif diff --git a/lib/process.c b/lib/process.c index 1898f98..1f5a7ed 100644 --- a/lib/process.c +++ b/lib/process.c @@ -28,17 +28,11 @@ void srv_process_gen (struct process *p memset (p->path_out, 0, PATH_MAX); snprintf(p->path_in , PATH_MAX, "%s/%d-%d-in" , TMPDIR, pid, index); snprintf(p->path_out, PATH_MAX, "%s/%d-%d-out", TMPDIR, pid, index); - p->in = NULL; - p->out = NULL; -} - -void srv_process_free (struct process * p) -{ - // TODO nothing to do now } void srv_process_print (struct process *p) { if (p != NULL) - printf ("process %d : index %d\n", p->pid, p->index); + printf ("process %d : index %d, version %d\n" + , p->pid, p->index, p->version); } diff --git a/lib/process.h b/lib/process.h index 11bf64d..d707c81 100644 --- a/lib/process.h +++ b/lib/process.h @@ -19,11 +19,9 @@ struct process { unsigned int index; char path_in [PATH_MAX]; char path_out [PATH_MAX]; - FILE *in, *out; }; struct process * srv_process_copy (const struct process *p); -void srv_process_free (struct process * p); int srv_process_eq (const struct process *p1, const struct process *p2); diff --git a/lib/pubsubd.c b/lib/pubsubd.c index 034fd38..1d83374 100644 --- a/lib/pubsubd.c +++ b/lib/pubsubd.c @@ -74,14 +74,14 @@ int pubsubd_channel_new (struct channel *c, const char * name) return 1; } - size_t nlen = (strlen (name) > BUFSIZ) ? BUFSIZ : strlen (name) + 1; + size_t nlen = (strlen (name) > BUFSIZ) ? BUFSIZ : strlen (name); printf ("NAME : %s, SIZE : %ld\n", name, nlen); if (c->chan == NULL) - c->chan = malloc (nlen); + c->chan = malloc (nlen +1); - memset (c->chan, 0, nlen); + memset (c->chan, 0, nlen +1); memcpy (c->chan, name, nlen); c->chanlen = nlen; return 0; @@ -89,7 +89,6 @@ int pubsubd_channel_new (struct channel *c, const char * name) void pubsubd_channel_free (struct channel * c) { - // TODO if (c == NULL) return; @@ -255,7 +254,7 @@ void pubsubd_app_list_elm_free (struct app_list_elm *todel) { if (todel == NULL || todel->p == NULL) return; - srv_process_free (todel->p); + free (todel->p); } // MESSAGE, TODO CBOR @@ -467,94 +466,6 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale return 0; } -#if 0 -// TODO CBOR -int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize) -{ - // msg: "type(1) chanlen(8) chan datalen(8) data - - printf ("\033[36m ON PASSE DANS pubsubd_msg_read_cb \033[00m \n"); - - // read - char type = ' '; - if (0 == fread (&type, 1, 1, f)) { - return ER_FILE_READ; - } - - size_t chanlen = 0; - if (0 == fread (&chanlen, sizeof (size_t), 1, f)) { - return ER_FILE_READ; - } - - if (chanlen > BUFSIZ) { - return ER_FILE_READ; - } - - char *chan = NULL; - chan = malloc (chanlen); - - if (chan == NULL) { - return ER_MEM_ALLOC; - } - - if (0 == fread (chan, chanlen, 1, f)) { - return ER_FILE_READ; - } - - size_t datalen = 0; - if (0 == fread (&datalen, sizeof (size_t), 1, f)) { - free (chan); - return ER_FILE_READ; - } - - if (datalen > BUFSIZ) { - return 1; - } - - char *data = NULL; - data = malloc (datalen); - if (data == NULL) { - free (chan); - return ER_MEM_ALLOC; - } - - if (0 == fread (data, datalen, 1, f)) { - free (chan); - free (data); - return ER_FILE_READ; - } - - *msize = 1 + 2 * sizeof (size_t) + chanlen + datalen; - if (*buf == NULL) { - *buf = malloc(*msize); - if (*buf == NULL) { - free (chan); - free (data); - return ER_MEM_ALLOC; - } - } - - // TODO CHECK THIS - size_t i = 0; - - char *cbuf = *buf; - - cbuf[i] = type; i++; - memcpy (cbuf + i, &chanlen, sizeof(size_t)); i += sizeof(size_t); - memcpy (cbuf + i, chan, chanlen); i += chanlen; - memcpy (cbuf + i, &datalen, sizeof(size_t)); i += sizeof(size_t); - memcpy (cbuf + i, data, datalen); i += datalen; - - free (chan); - free (data); - - printf ("\033[36m ON SORT de pubsubd_msg_read_cb \033[00m \n"); - - return 0; -} - -#endif - // alh from the channel, message to send void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m) { @@ -698,11 +609,7 @@ void pubsub_msg_recv (struct process *p, struct pubsub_msg * m) // read the message from the process size_t mlen = 0; char *buf = NULL; -#if 0 - app_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb); -#else - app_read_cb (p, &buf, &mlen, NULL); -#endif + app_read (p, &buf, &mlen); pubsubd_msg_unserialize (m, buf, mlen); diff --git a/pubsub/pubsub-test-send-params.c b/pubsub/pubsub-test-send-params.c index 7fcfa0d..b4c9d6a 100644 --- a/pubsub/pubsub-test-send-params.c +++ b/pubsub/pubsub-test-send-params.c @@ -73,8 +73,6 @@ void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, in // the application will shut down, and remove the application named pipes if (app_destroy (&p)) ohshit (1, "app_destroy"); - - srv_process_free (&p); } void sim_disconnection (int argc, char **argv, char **env, pid_t pid, int index, int version) @@ -93,8 +91,6 @@ void sim_disconnection (int argc, char **argv, char **env, pid_t pid, int index, // send a message to disconnect // line : pid index version action chan pubsub_disconnect (&p); - - srv_process_free (&p); } int diff --git a/pubsub/pubsub-test-send.c b/pubsub/pubsub-test-send.c index 976b76d..8aaeca9 100644 --- a/pubsub/pubsub-test-send.c +++ b/pubsub/pubsub-test-send.c @@ -55,7 +55,5 @@ main(int argc, char **argv, char **env) if (app_destroy (&p)) ohshit (1, "app_destroy"); - srv_process_free (&p); - return EXIT_SUCCESS; }