From d754958f9acf037daeeefaf91cc51aa66f38cec7 Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Fri, 9 Sep 2016 12:06:25 +0200
Subject: [PATCH] pubsubd: simplifications
---
lib/communication.c | 315 ++++---------------------------
lib/communication.h | 10 +-
lib/process.c | 10 +-
lib/process.h | 2 -
lib/pubsubd.c | 103 +---------
pubsub/pubsub-test-send-params.c | 4 -
pubsub/pubsub-test-send.c | 2 -
7 files changed, 43 insertions(+), 403 deletions(-)
diff --git a/lib/communication.c b/lib/communication.c
index 12a9c93..dd62f7d 100644
--- a/lib/communication.c
+++ b/lib/communication.c
@@ -2,82 +2,39 @@
#include
#include
-int file_open (FILE **f, const char *path, const char *mode)
+int file_write (const char *path, const char *buf, size_t msize)
{
- printf ("opening %s\n", path);
- if (*f != NULL) {
- // printf ("f != NULL : %p\n", (void*) *f);
- if (file_close (*f)) {
- return ER_FILE_CLOSE;
- }
- }
- *f = fopen (path, mode);
- if (*f == NULL) {
- fprintf (stderr, "\033[31mnot opened %s\033[00m\n", path);
+ int fd = open (path, O_WRONLY);
+ if (fd <= 0) {
return ER_FILE_OPEN;
}
- // printf ("opened : %ld\n", (long) *f);
+
+ int ret = 0;
+ ret = write (fd, buf, msize);
- return 0;
+ close (fd);
+
+ return ret;
}
-int file_close (FILE *f)
+int file_read (const char *path, char **buf, size_t *msize)
{
- if (f != 0) {
- // printf ("before fclosing\n");
- if (fclose (f)) {
- return ER_FILE_CLOSE;
- }
- // printf ("after fclosing\n");
- }
- return 0;
-}
-
-int file_read (FILE *f, char **buf, size_t *msize) {
- if (*msize == 0) {
- *msize = BUFSIZ; // default value
- }
-
- if (*buf == NULL) {
- *buf = malloc (*msize);
- if (*buf == NULL) {
- fprintf (stderr, "err can't allocate enough memory (%ld)\n", *msize);
- int ret = file_close (f);
- if (ret != 0)
- return ret;
- }
+ int fd = open (path, O_RDONLY);
+ if (fd <= 0) {
+ return ER_FILE_OPEN;
}
int ret = 0;
-
- ret = fread (*buf, *msize, 1, f);
+ ret = read (fd, *buf, BUFSIZ);
if (ret < 0) {
- fprintf (stderr, "err can't read a file\n");
- ret = file_close (f);
- if (ret != 0)
- return ret;
- return ER_FILE_READ;
- }
-
- ret = file_close (f);
- if (ret != 0)
return ret;
-
- return 0;
-}
-
-int file_write (FILE *f, const char *buf, size_t msize)
-{
- if (0 == fwrite (buf, msize, 1, f)) {
- fprintf (stderr, "err writing in the file\n");
- if (ER_FILE_CLOSE == file_close (f)) {
- fprintf (stderr, "err closing the file\n");
- return ER_FILE_CLOSE;
- }
- return ER_FILE_WRITE;
}
- return 0;
+ *msize = ret;
+
+ close (fd);
+
+ return ret;
}
int srv_init (int argc, char **argv, char **env, struct service *srv, const char *sname, int (*cb)(int argc, char **argv, char **env, struct service *srv, const char *sname))
@@ -158,31 +115,10 @@ int srv_close (struct service *srv)
return 0;
}
-// only get a raw line from TMPDIR/
+// TODO remove, replace by file_read
int srv_get_listen_raw (const struct service *srv, char **buf, size_t *msize)
{
- *buf = malloc(BUFSIZ);
- memset (*buf, 0, BUFSIZ);
-
- FILE * f = NULL;
- if (file_open (&f, srv->spath, "rb")) {
- return ER_FILE_OPEN;
- }
-
- char *ret = NULL;
- ret = fgets (*buf, BUFSIZ, f);
- if (ret == NULL) {
- return ER_FILE_READ;
- }
- buf[0][BUFSIZ -1] = '\0';
-
- if (file_close (f)) {
- return ER_FILE_CLOSE;
- }
-
- *msize = strlen (*buf);
-
- return 0;
+ return file_read (srv->spath, buf, msize);
}
int srv_get_new_process (const struct service *srv, struct process *p)
@@ -191,39 +127,14 @@ int srv_get_new_process (const struct service *srv, struct process *p)
return -1;
}
- char buf[BUFSIZ];
- memset (buf, 0, BUFSIZ);
-
- // read the pipe, get a process to work on
- struct timespec ts = { 0 };
- struct timespec ts2 = { 0 };
-
- FILE * f = NULL;
- if (file_open (&f, srv->spath, "rb")) {
- return ER_FILE_OPEN;
+ char *buf = NULL;
+ size_t msize = 0;
+ int ret = file_read (srv->spath, &buf, &msize);
+ if (ret <= 0) {
+ fprintf (stderr, "err: listening on %s\n", srv->spath);
+ exit (1);
}
- clock_gettime(CLOCK_REALTIME, &ts);
-
- char *ret = NULL;
- ret = fgets (buf, BUFSIZ, f);
- if (ret == NULL) {
- if (file_close (f)) {
- return ER_FILE_CLOSE;
- }
- return ER_FILE_READ;
- }
-
- clock_gettime(CLOCK_REALTIME, &ts2);
- if (file_close (f)) {
- return ER_FILE_CLOSE;
- }
-
- printf("sec: %ld nsec: %ld\n", ts.tv_sec, ts.tv_nsec);
- printf("sec: %ld nsec: %ld\n", ts2.tv_sec, ts2.tv_nsec);
-
- printf("diff nsec: %ld\n", ts2.tv_nsec - ts.tv_nsec);
-
char *token = NULL, *saveptr = NULL;
char *str = NULL;
int i = 0;
@@ -253,85 +164,14 @@ int srv_get_new_process (const struct service *srv, struct process *p)
return 0;
}
-int srv_read_cb (struct process *p, char ** buf, size_t * msize
- , int (*cb)(FILE *f, char ** buf, size_t * msize))
-{
- if (file_open (&p->out, p->path_out, "rb")) {
- fprintf (stderr, "\033[31merr: srv_read_cb, file_open\033[00m\n");
- if (ER_FILE_CLOSE == file_close (p->out)) {
- fprintf (stderr, "err closing the file %s\n", p->path_out);
- p->out = NULL;
- }
- return ER_FILE_OPEN;
- }
-
- int ret = 0;
-
- if (cb != NULL) {
- ret = (*cb) (p->out, buf, msize);
- }
- else {
- ret = file_read (p->out, buf, msize);
- }
- // printf ("DEBUG read, size %ld : %s\n", *msize, *buf);
-
- if (ER_FILE_CLOSE == file_close (p->out)) {
- fprintf (stderr, "err closing the file %s\n", p->path_out);
- p->out = NULL;
- }
- p->out = NULL;
-
- return ret;
-}
-
int srv_read (struct process *p, char ** buf, size_t * msize)
{
- if (ER_FILE_OPEN == file_open (&p->out, p->path_out, "rb")) {
- fprintf (stderr, "err opening the file %s\n", p->path_out);
- return ER_FILE_OPEN;
- }
-
- int ret = 0;
-
- ret = file_read (p->out, buf, msize);
- if (ret != 0) {
- p->out = NULL;
- }
-
- // printf ("DEBUG read, size %ld : %s\n", *msize, buf);
-
- if (ER_FILE_CLOSE == file_close (p->out)) {
- fprintf (stderr, "err closing the file %s\n", p->path_out);
- p->out = NULL;
- ret = ER_FILE_CLOSE;
- }
- p->out = NULL;
-
- return ret;
+ return file_read (p->path_out, buf, msize);
}
int srv_write (struct process *p, char * buf, size_t msize)
{
- if (ER_FILE_OPEN == file_open (&p->in, p->path_in, "wb")) {
- fprintf (stderr, "err opening the file %s\n", p->path_in);
- return ER_FILE_OPEN;
- }
-
- int ret = file_write (p->in, buf, msize);
- if (ret != 0) {
- fprintf (stderr, "err writing in the file %s\n", p->path_in);
- p->in = NULL;
- return ret;
- }
-
- if (ER_FILE_CLOSE == file_close (p->in)) {
- fprintf (stderr, "err closing the file %s\n", p->path_in);
- p->in = NULL;
- return ER_FILE_CLOSE;
- }
- p->in = NULL;
-
- return 0;
+ return file_write (p->path_in, buf, msize);
}
// APPLICATION
@@ -340,27 +180,10 @@ int srv_write (struct process *p, char * buf, size_t msize)
int app_srv_connection (struct service *srv, const char *connectionstr, size_t msize)
{
if (srv == NULL) {
- return 1;
+ return -1;
}
-
- FILE * f = NULL;
- if (ER_FILE_OPEN == file_open (&f, srv->spath, "wb")) {
- fprintf (stderr, "err opening the service file %s\n", srv->spath);
- return ER_FILE_OPEN;
- }
-
- int ret = file_write (f, connectionstr, msize);
- if (ret != 0) {
- fprintf (stderr, "err writing in the service file %s\n", srv->spath);
- return ret;
- }
-
- if (ER_FILE_CLOSE == file_close (f)) {
- fprintf (stderr, "err closing the file\n");
- return ER_FILE_CLOSE;
- }
-
- return 0;
+
+ return file_write (srv->spath, connectionstr, msize);
}
int app_create (struct process *p, pid_t pid, int index, int version)
@@ -449,82 +272,12 @@ int app_destroy (struct process *p)
return 0;
}
-int app_read_cb (struct process *p, char ** buf, size_t * msize
- , int (*cb)(FILE *f, char ** buf, size_t * msize))
-{
- if (file_open (&p->in, p->path_in, "rb")) {
- fprintf (stderr, "\033[31merr: app_read_cb, file_open\033[00m\n");
- p->in = NULL;
- return 1;
- }
-
- int ret = 0;
-
- if (cb != NULL) {
- ret = (*cb) (p->in, buf, msize);
- }
- else {
- ret = file_read (p->in, buf, msize);
- if (ret != 0) {
- p->in = NULL;
- }
- }
-
- if (ER_FILE_CLOSE == file_close (p->in)) {
- fprintf (stderr, "err closing the file %s\n", p->path_in);
- }
- p->in = NULL;
-
- return 0;
-}
-
int app_read (struct process *p, char ** buf, size_t * msize)
{
- if (ER_FILE_OPEN == file_open (&p->in, p->path_in, "rb")) {
- fprintf (stderr, "err opening the file %s\n", p->path_in);
- return ER_FILE_OPEN;
- }
-
- int ret = file_read (p->in, buf, msize);
- if (ret != 0) {
- p->in = NULL;
- return ret;
- }
-
- if (ER_FILE_CLOSE == file_close (p->in)) {
- fprintf (stderr, "err closing the file %s\n", p->path_in);
- p->in = NULL;
- return ER_FILE_CLOSE;
- }
- p->in = NULL;
-
- return 0;
+ return file_read (p->path_in, buf, msize);
}
int app_write (struct process *p, char * buf, size_t msize)
{
- if (buf == NULL) {
- return ER_FILE_WRITE_PARAMS;
- }
-
- if (ER_FILE_OPEN == file_open (&p->out, p->path_out, "wb")) {
- fprintf (stderr, "err opening the file %s\n", p->path_out);
- return ER_FILE_OPEN;
- }
-
- int ret = file_write (p->out, buf, msize);
- if (ret != 0) {
- fprintf (stderr, "err writing in the file %s\n", p->path_out);
- p->out = NULL;
- return ret;
- }
-
- if (ER_FILE_CLOSE == file_close (p->out)) {
- fprintf (stderr, "err closing the file %s\n", p->path_out);
- p->out = NULL;
- return ER_FILE_CLOSE;
- }
- p->out = NULL;
-
- return 0;
+ return file_write (p->path_out, buf, msize);
}
diff --git a/lib/communication.h b/lib/communication.h
index 6dfbf1d..bc875f5 100644
--- a/lib/communication.h
+++ b/lib/communication.h
@@ -50,8 +50,6 @@ int srv_get_new_process (const struct service *srv, struct process *proc);
int srv_create (struct service *srv);
int srv_close (struct service *srv);
-int srv_read_cb (struct process *p, char ** buf, size_t * msize
- , int (*cb)(FILE *f, char ** buf, size_t * msize));
int srv_read (struct process *, char ** buf, size_t *);
int srv_write (struct process *, char * buf, size_t);
@@ -63,15 +61,11 @@ int app_srv_connection (struct service *, const char *, size_t);
int app_create (struct process *, pid_t pid, int index, int version);
int app_destroy (struct process *);
-int app_read_cb (struct process *p, char ** buf, size_t * msize
- , int (*cb)(FILE *f, char ** buf, size_t * msize));
int app_read (struct process *, char ** buf, size_t *);
int app_write (struct process *, char * buf, size_t);
// wrappers
-int file_open (FILE **f, const char *path, const char *mode);
-int file_close (FILE *f);
-int file_read (FILE *f, char **buf, size_t *msize);
-int file_write (FILE *f, const char *buf, size_t msize);
+int file_read (const char *path, char **buf, size_t *msize);
+int file_write (const char *path, const char *buf, size_t msize);
#endif
diff --git a/lib/process.c b/lib/process.c
index 1898f98..1f5a7ed 100644
--- a/lib/process.c
+++ b/lib/process.c
@@ -28,17 +28,11 @@ void srv_process_gen (struct process *p
memset (p->path_out, 0, PATH_MAX);
snprintf(p->path_in , PATH_MAX, "%s/%d-%d-in" , TMPDIR, pid, index);
snprintf(p->path_out, PATH_MAX, "%s/%d-%d-out", TMPDIR, pid, index);
- p->in = NULL;
- p->out = NULL;
-}
-
-void srv_process_free (struct process * p)
-{
- // TODO nothing to do now
}
void srv_process_print (struct process *p)
{
if (p != NULL)
- printf ("process %d : index %d\n", p->pid, p->index);
+ printf ("process %d : index %d, version %d\n"
+ , p->pid, p->index, p->version);
}
diff --git a/lib/process.h b/lib/process.h
index 11bf64d..d707c81 100644
--- a/lib/process.h
+++ b/lib/process.h
@@ -19,11 +19,9 @@ struct process {
unsigned int index;
char path_in [PATH_MAX];
char path_out [PATH_MAX];
- FILE *in, *out;
};
struct process * srv_process_copy (const struct process *p);
-void srv_process_free (struct process * p);
int srv_process_eq (const struct process *p1, const struct process *p2);
diff --git a/lib/pubsubd.c b/lib/pubsubd.c
index 034fd38..1d83374 100644
--- a/lib/pubsubd.c
+++ b/lib/pubsubd.c
@@ -74,14 +74,14 @@ int pubsubd_channel_new (struct channel *c, const char * name)
return 1;
}
- size_t nlen = (strlen (name) > BUFSIZ) ? BUFSIZ : strlen (name) + 1;
+ size_t nlen = (strlen (name) > BUFSIZ) ? BUFSIZ : strlen (name);
printf ("NAME : %s, SIZE : %ld\n", name, nlen);
if (c->chan == NULL)
- c->chan = malloc (nlen);
+ c->chan = malloc (nlen +1);
- memset (c->chan, 0, nlen);
+ memset (c->chan, 0, nlen +1);
memcpy (c->chan, name, nlen);
c->chanlen = nlen;
return 0;
@@ -89,7 +89,6 @@ int pubsubd_channel_new (struct channel *c, const char * name)
void pubsubd_channel_free (struct channel * c)
{
- // TODO
if (c == NULL)
return;
@@ -255,7 +254,7 @@ void pubsubd_app_list_elm_free (struct app_list_elm *todel)
{
if (todel == NULL || todel->p == NULL)
return;
- srv_process_free (todel->p);
+ free (todel->p);
}
// MESSAGE, TODO CBOR
@@ -467,94 +466,6 @@ int pubsubd_get_new_process (struct service *srv, struct app_list_elm *ale
return 0;
}
-#if 0
-// TODO CBOR
-int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize)
-{
- // msg: "type(1) chanlen(8) chan datalen(8) data
-
- printf ("\033[36m ON PASSE DANS pubsubd_msg_read_cb \033[00m \n");
-
- // read
- char type = ' ';
- if (0 == fread (&type, 1, 1, f)) {
- return ER_FILE_READ;
- }
-
- size_t chanlen = 0;
- if (0 == fread (&chanlen, sizeof (size_t), 1, f)) {
- return ER_FILE_READ;
- }
-
- if (chanlen > BUFSIZ) {
- return ER_FILE_READ;
- }
-
- char *chan = NULL;
- chan = malloc (chanlen);
-
- if (chan == NULL) {
- return ER_MEM_ALLOC;
- }
-
- if (0 == fread (chan, chanlen, 1, f)) {
- return ER_FILE_READ;
- }
-
- size_t datalen = 0;
- if (0 == fread (&datalen, sizeof (size_t), 1, f)) {
- free (chan);
- return ER_FILE_READ;
- }
-
- if (datalen > BUFSIZ) {
- return 1;
- }
-
- char *data = NULL;
- data = malloc (datalen);
- if (data == NULL) {
- free (chan);
- return ER_MEM_ALLOC;
- }
-
- if (0 == fread (data, datalen, 1, f)) {
- free (chan);
- free (data);
- return ER_FILE_READ;
- }
-
- *msize = 1 + 2 * sizeof (size_t) + chanlen + datalen;
- if (*buf == NULL) {
- *buf = malloc(*msize);
- if (*buf == NULL) {
- free (chan);
- free (data);
- return ER_MEM_ALLOC;
- }
- }
-
- // TODO CHECK THIS
- size_t i = 0;
-
- char *cbuf = *buf;
-
- cbuf[i] = type; i++;
- memcpy (cbuf + i, &chanlen, sizeof(size_t)); i += sizeof(size_t);
- memcpy (cbuf + i, chan, chanlen); i += chanlen;
- memcpy (cbuf + i, &datalen, sizeof(size_t)); i += sizeof(size_t);
- memcpy (cbuf + i, data, datalen); i += datalen;
-
- free (chan);
- free (data);
-
- printf ("\033[36m ON SORT de pubsubd_msg_read_cb \033[00m \n");
-
- return 0;
-}
-
-#endif
-
// alh from the channel, message to send
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg * m)
{
@@ -698,11 +609,7 @@ void pubsub_msg_recv (struct process *p, struct pubsub_msg * m)
// read the message from the process
size_t mlen = 0;
char *buf = NULL;
-#if 0
- app_read_cb (p, &buf, &mlen, pubsubd_msg_read_cb);
-#else
- app_read_cb (p, &buf, &mlen, NULL);
-#endif
+ app_read (p, &buf, &mlen);
pubsubd_msg_unserialize (m, buf, mlen);
diff --git a/pubsub/pubsub-test-send-params.c b/pubsub/pubsub-test-send-params.c
index 7fcfa0d..b4c9d6a 100644
--- a/pubsub/pubsub-test-send-params.c
+++ b/pubsub/pubsub-test-send-params.c
@@ -73,8 +73,6 @@ void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, in
// the application will shut down, and remove the application named pipes
if (app_destroy (&p))
ohshit (1, "app_destroy");
-
- srv_process_free (&p);
}
void sim_disconnection (int argc, char **argv, char **env, pid_t pid, int index, int version)
@@ -93,8 +91,6 @@ void sim_disconnection (int argc, char **argv, char **env, pid_t pid, int index,
// send a message to disconnect
// line : pid index version action chan
pubsub_disconnect (&p);
-
- srv_process_free (&p);
}
int
diff --git a/pubsub/pubsub-test-send.c b/pubsub/pubsub-test-send.c
index 976b76d..8aaeca9 100644
--- a/pubsub/pubsub-test-send.c
+++ b/pubsub/pubsub-test-send.c
@@ -55,7 +55,5 @@ main(int argc, char **argv, char **env)
if (app_destroy (&p))
ohshit (1, "app_destroy");
- srv_process_free (&p);
-
return EXIT_SUCCESS;
}