From 773703a23449c1b2d0241621e99721574c232b70 Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Mon, 13 Jun 2016 09:47:19 +0200
Subject: [PATCH] plein de petits trucs
---
lib/communication.c | 26 +++++++++---
lib/communication.h | 19 +++++----
lib/process.c | 4 +-
lib/pubsubd.c | 72 ++++++++++++++++++++++++++++----
lib/pubsubd.h | 13 +++++-
pingpong/pingpong.c | 2 +-
pubsub/pubsub-test-send-params.c | 38 ++++++++---------
pubsub/pubsub-test-send.c | 6 ++-
pubsub/pubsubd.c | 59 ++++++++++++++++----------
9 files changed, 167 insertions(+), 72 deletions(-)
diff --git a/lib/communication.c b/lib/communication.c
index 4cdbd56..6abd9f2 100644
--- a/lib/communication.c
+++ b/lib/communication.c
@@ -13,7 +13,7 @@ int file_open (FILE **f, const char *path, const char *mode)
}
*f = fopen (path, mode);
if (*f == NULL) {
- fprintf (stderr, "\033[31mnot opened\033[00m\n");
+ fprintf (stderr, "\033[31mnot opened %s\033[00m\n", path);
return ER_FILE_OPEN;
}
printf ("opened : %ld\n", (long) *f);
@@ -75,10 +75,10 @@ int file_write (FILE *f, const char *buf, size_t msize)
return 0;
}
-void srv_init (int argc, char **argv, char **env, struct service *srv, const char *sname)
+int srv_init (int argc, char **argv, char **env, struct service *srv, const char *sname, int (*cb)(int argc, char **argv, char **env, struct service *srv, const char *sname))
{
if (srv == NULL)
- return;
+ return ER_PARAMS;
// TODO
// use the argc, argv and env parameters
@@ -97,6 +97,14 @@ void srv_init (int argc, char **argv, char **env, struct service *srv, const cha
srv->version = COMMUNICATION_VERSION;
srv->index = 0; // TODO
+
+ if (cb != NULL) {
+ int ret = (*cb) (argc, argv, env, srv, sname);
+ if (ret != 0)
+ return ret;
+ }
+
+ return 0;
}
// SERVICE
@@ -352,12 +360,14 @@ int app_srv_connection (struct service *srv, const char *connectionstr, size_t m
return 0;
}
-int app_create (struct process *p, int index)
+int app_create (struct process *p, pid_t pid, int index, int version)
{
- pid_t pid = getpid();
+ if (version == 0) {
+ version = COMMUNICATION_VERSION;
+ }
// then creates the structure
- srv_process_gen (p, pid, index, COMMUNICATION_VERSION);
+ srv_process_gen (p, pid, index, version);
// creates the pipes
int ret;
@@ -492,6 +502,10 @@ int app_read (struct process *p, char ** buf, size_t * msize)
int app_write (struct process *p, char * buf, size_t msize)
{
+ if (buf == NULL) {
+ return ER_FILE_WRITE_PARAMS;
+ }
+
if (ER_FILE_OPEN == file_open (&p->out, p->path_out, "wb")) {
fprintf (stderr, "err opening the file %s\n", p->path_out);
return ER_FILE_OPEN;
diff --git a/lib/communication.h b/lib/communication.h
index 764dd9e..6dfbf1d 100644
--- a/lib/communication.h
+++ b/lib/communication.h
@@ -16,12 +16,14 @@
#define COMMUNICATION_VERSION 1
-#define ER_FILE_OPEN 1
-#define ER_FILE_CLOSE 2
-#define ER_FILE_READ 3
-#define ER_FILE_WRITE 4
+#define ER_FILE_OPEN 1
+#define ER_FILE_CLOSE 2
+#define ER_FILE_READ 3
+#define ER_FILE_WRITE 4
+#define ER_FILE_WRITE_PARAMS 5
#define ER_MEM_ALLOC 100
+#define ER_PARAMS 101
struct service {
unsigned int version;
@@ -30,7 +32,10 @@ struct service {
FILE *spipe;
};
-void srv_init (int argc, char **argv, char **env, struct service *srv, const char *sname);
+int srv_init (int argc, char **argv, char **env
+ , struct service *srv, const char *sname
+ , int (*cb)(int argc, char **argv, char **env
+ , struct service *srv, const char *sname));
int srv_get_listen_raw (const struct service *srv, char **buf, size_t *msize);
int srv_get_new_process (const struct service *srv, struct process *proc);
@@ -55,8 +60,8 @@ int srv_write (struct process *, char * buf, size_t);
// send the connection string to $TMP/
int app_srv_connection (struct service *, const char *, size_t);
-int app_create (struct process *, int index); // called by the application
-int app_destroy (struct process *); // called by the application
+int app_create (struct process *, pid_t pid, int index, int version);
+int app_destroy (struct process *);
int app_read_cb (struct process *p, char ** buf, size_t * msize
, int (*cb)(FILE *f, char ** buf, size_t * msize));
diff --git a/lib/process.c b/lib/process.c
index be4086b..1898f98 100644
--- a/lib/process.c
+++ b/lib/process.c
@@ -34,9 +34,7 @@ void srv_process_gen (struct process *p
void srv_process_free (struct process * p)
{
- if (! p)
- return;
- free (p);
+ // TODO nothing to do now
}
void srv_process_print (struct process *p)
diff --git a/lib/pubsubd.c b/lib/pubsubd.c
index 624decb..facc71f 100644
--- a/lib/pubsubd.c
+++ b/lib/pubsubd.c
@@ -175,10 +175,10 @@ pubsubd_subscriber_add (struct app_list_head *alh, const struct app_list_elm *al
}
struct app_list_elm *
-pubsubd_subscriber_get (const struct app_list_head *chans, const struct app_list_elm *p)
+pubsubd_subscriber_get (const struct app_list_head *alh, const struct app_list_elm *p)
{
struct app_list_elm *np = NULL, *res = NULL;
- LIST_FOREACH(np, chans, entries) {
+ LIST_FOREACH(np, alh, entries) {
if(pubsubd_subscriber_eq (np, p)) {
res = np;
}
@@ -186,16 +186,19 @@ pubsubd_subscriber_get (const struct app_list_head *chans, const struct app_list
return res;
}
-void
-pubsubd_subscriber_del (struct app_list_head *chans, struct app_list_elm *p)
+int
+pubsubd_subscriber_del (struct app_list_head *alh, struct app_list_elm *p)
{
- struct app_list_elm *todel = pubsubd_subscriber_get (chans, p);
+ struct app_list_elm *todel = pubsubd_subscriber_get (alh, p);
if(todel != NULL) {
pubsubd_app_list_elm_free (todel);
LIST_REMOVE(todel, entries);
free (todel);
todel = NULL;
+ return 0;
}
+
+ return 1;
}
void pubsubd_subscriber_del_all (struct app_list_head *alh)
@@ -238,7 +241,20 @@ void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *l
return;
// msg: "type(1) chanlen(8) chan datalen(8) data
- *len = 1 + sizeof(size_t) + msg->chanlen + sizeof(size_t) + msg->datalen;
+ if (msg->type == PUBSUB_TYPE_DISCONNECT) {
+ *len = 1;
+ if (*data != NULL) {
+ free (*data);
+ *data = NULL;
+ }
+ *data = malloc(*len);
+ data[0][0] = msg->type;
+ return;
+ }
+ else {
+ // type + size chan + chan + size data + data
+ *len = 1 + 2 * sizeof(size_t) + msg->chanlen + msg->datalen;
+ }
if (*data != NULL) {
free (*data);
@@ -279,6 +295,14 @@ void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t l
size_t i = 0;
msg->type = data[i]; i++;
+ if (msg->type == PUBSUB_TYPE_DISCONNECT) {
+ msg->chanlen = 0;
+ msg->chan = NULL;
+ msg->datalen = 0;
+ msg->data = NULL;
+ return ;
+ }
+
memcpy (&msg->chanlen, data + i, sizeof(size_t)); i += sizeof(size_t);
if (msg->chanlen > BUFSIZ) {
fprintf (stderr, "\033[31merr : msg->chanlen > BUFSIZ\033[00m\n");
@@ -579,7 +603,7 @@ void pubsub_connection (struct service *srv, struct process *p, enum app_list_el
memset (line, 0, BUFSIZ);
// line fmt : pid index version action chan
- // "quit" action is also possible (see pubsub_disconnect)
+ // "quit" action is also possible (see pubsubd_quit)
snprintf (line, BUFSIZ, "%d %d %d %s %s\n"
, p->pid, p->index, p->version
, straction
@@ -593,8 +617,40 @@ void pubsub_connection (struct service *srv, struct process *p, enum app_list_el
free (straction);
}
+void pubsub_disconnect (struct process *p)
+{
+ struct pubsub_msg m;
+ memset (&m, 0, sizeof (struct pubsub_msg));
+ m.type = PUBSUB_TYPE_DISCONNECT;
+
+ char *buf = NULL;
+ size_t msize = 0;
+ pubsubd_msg_serialize (&m, &buf, &msize);
+
+ int ret = app_write (p, buf, msize);
+ switch (ret) {
+ case ER_FILE_WRITE :
+ fprintf (stderr, "err: ER_FILE_WRITE\n");
+ break;
+ case ER_FILE_WRITE_PARAMS :
+ fprintf (stderr, "err: ER_FILE_WRITE_PARAMS\n");
+ break;
+ case ER_FILE_OPEN :
+ fprintf (stderr, "err: ER_FILE_OPEN\n");
+ break;
+ case ER_FILE_CLOSE :
+ fprintf (stderr, "err: ER_FILE_CLOSE\n");
+ break;
+ }
+
+ pubsubd_msg_free (&m);
+ if (buf != NULL) {
+ free (buf);
+ }
+}
+
// tell the service to stop
-void pubsub_disconnect (struct service *srv)
+void pubsubd_quit (struct service *srv)
{
// line fmt : 0 0 0 quit
char line[BUFSIZ];
diff --git a/lib/pubsubd.h b/lib/pubsubd.h
index bc44c51..30f5336 100644
--- a/lib/pubsubd.h
+++ b/lib/pubsubd.h
@@ -41,6 +41,7 @@ int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize);
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m);
void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m);
+void pubsub_disconnect (struct process *p);
void pubsub_msg_send (struct process *p, const struct pubsub_msg *msg);
void pubsub_msg_recv (struct process *p, struct pubsub_msg *msg);
@@ -72,6 +73,14 @@ struct channel * pubsubd_channels_add (struct channels *chans, struct channel *c
void pubsubd_channels_del (struct channels *chans, struct channel *c);
void pubsubd_channels_del_all (struct channels *chans);
+// remove an app_list_elm from the list (msg type DISCONNECT received)
+int pubsubd_channels_del_subscriber (struct channels *chans
+ , struct channel *c);
+
+struct channel *
+pubsubd_channels_search_from_app_list_elm (struct channels *chans
+ , struct app_list_elm *ale);
+
// APPLICATION
// head of the list
@@ -94,7 +103,7 @@ void pubsubd_subscriber_add (struct app_list_head *
, const struct app_list_elm *);
struct app_list_elm * pubsubd_subscriber_get (const struct app_list_head *
, const struct app_list_elm *);
-void pubsubd_subscriber_del (struct app_list_head *al, struct app_list_elm *p);
+int pubsubd_subscriber_del (struct app_list_head *al, struct app_list_elm *p);
void pubsubd_subscriber_del_all (struct app_list_head *alh);
struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale);
@@ -102,6 +111,6 @@ void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p);
void pubsubd_app_list_elm_free (struct app_list_elm *todel);
void pubsub_connection (struct service *srv, struct process *p, enum app_list_elm_action action, const char *channame);
-void pubsub_disconnect (struct service *srv);
+void pubsubd_quit (struct service *srv);
#endif
diff --git a/pingpong/pingpong.c b/pingpong/pingpong.c
index a40b8e5..5d3f4fb 100644
--- a/pingpong/pingpong.c
+++ b/pingpong/pingpong.c
@@ -66,7 +66,7 @@ void main_loop (const struct service *srv)
int main(int argc, char * argv[], char **env)
{
struct service srv;
- srv_init (argc, argv, env, &srv, PONGD_SERVICE_NAME);
+ srv_init (argc, argv, env, &srv, PONGD_SERVICE_NAME, NULL);
printf ("Listening on %s.\n", srv.spath);
// creates the service named pipe, that listens to client applications
diff --git a/pubsub/pubsub-test-send-params.c b/pubsub/pubsub-test-send-params.c
index a37e15b..88fd4c8 100644
--- a/pubsub/pubsub-test-send-params.c
+++ b/pubsub/pubsub-test-send-params.c
@@ -3,7 +3,6 @@
#include
#define MYMESSAGE "coucou"
-#define MYCHAN "chan1"
void
ohshit(int rvalue, const char* str) {
@@ -18,45 +17,44 @@ void usage (char **argv)
void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, int version, char *cmd, char *chan)
{
-
printf ("Simulate connnection : pid %d index %d version %d "
"cmd %s chan %s\n"
, pid, index, version, cmd, chan );
struct service srv;
- bzero (&srv, sizeof (struct service));
- srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME);
+ memset (&srv, 0, sizeof (struct service));
+ srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME, NULL);
printf ("Writing on %s.\n", srv.spath);
struct process p;
- bzero (&p, sizeof (struct process));
+ memset (&p, 0, sizeof (struct process));
- if (app_create (&p, index)) // called by the application
+ if (app_create (&p, pid, index, version)) // called by the application
ohshit (1, "app_create");
// send a message to warn the service we want to do something
// line : pid index version action chan
- pubsub_connection (&srv, &p, PUBSUB_PUB, MYCHAN);
+ pubsub_connection (&srv, &p, PUBSUB_PUB, chan);
struct pubsub_msg m;
- bzero (&m, sizeof (struct pubsub_msg));
+ memset (&m, 0, sizeof (struct pubsub_msg));
// first message, "coucou"
m.type = PUBSUB_TYPE_INFO;
- m.chan = malloc (strlen (MYCHAN));
- m.chanlen = strlen (MYCHAN);
- m.data = malloc (strlen (MYMESSAGE));
+ m.chan = malloc (strlen (chan) + 1);
+ m.chan[strlen (chan)] = '\0';
+ m.chanlen = strlen (chan);
+ m.data = malloc (strlen (MYMESSAGE) + 1);
+ m.datalen = strlen (MYMESSAGE);
m.datalen = strlen (MYMESSAGE);
pubsub_msg_send (&p, &m);
- // second message, to disconnect from the server
- m.type = PUBSUB_TYPE_DISCONNECT;
- pubsub_msg_send (&p, &m);
-
// free everything
-
pubsubd_msg_free (&m);
+ // disconnect from the server
+ pubsub_disconnect (&p);
+
// the application will shut down, and remove the application named pipes
if (app_destroy (&p))
ohshit (1, "app_destroy");
@@ -67,19 +65,19 @@ void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, in
void sim_disconnection (int argc, char **argv, char **env, pid_t pid, int index, int version)
{
struct service srv;
- bzero (&srv, sizeof (struct service));
- srv_init (&srv, PUBSUB_SERVICE_NAME);
+ memset (&srv, 0, sizeof (struct service));
+ srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME, NULL);
printf ("Disconnecting from %s.\n", srv.spath);
struct process p;
- bzero (&p, sizeof (struct process));
+ memset (&p, 0, sizeof (struct process));
// create the fake process
srv_process_gen (&p, pid, index, version);
// send a message to disconnect
// line : pid index version action chan
- pubsub_disconnect (&srv, &p, PUBSUB_PUB, MYCHAN);
+ pubsub_disconnect (&p);
srv_process_free (&p);
}
diff --git a/pubsub/pubsub-test-send.c b/pubsub/pubsub-test-send.c
index c1f40c8..976b76d 100644
--- a/pubsub/pubsub-test-send.c
+++ b/pubsub/pubsub-test-send.c
@@ -16,14 +16,16 @@ main(int argc, char **argv, char **env)
{
struct service srv;
memset (&srv, 0, sizeof (struct service));
- srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME);
+ srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME, NULL);
printf ("Writing on %s.\n", srv.spath);
struct process p;
memset (&p, 0, sizeof (struct process));
int index = 1;
- if (app_create (&p, index)) // called by the application
+ pid_t pid = getpid();
+
+ if (app_create (&p, pid, index, COMMUNICATION_VERSION))
ohshit (1, "app_create");
// send a message to warn the service we want to do something
diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c
index 9396c5a..f8c6ffd 100644
--- a/pubsub/pubsubd.c
+++ b/pubsub/pubsubd.c
@@ -19,35 +19,48 @@ void * pubsubd_worker_thread (void *params)
{
struct worker_params *wp = (struct worker_params *) params;
- // each chan has a list of subscribers
- // someone who only push a msg doesn't need to be registered
- if (wp->ale->action == PUBSUB_BOTH || wp->ale->action == PUBSUB_PUB) {
- // TODO add it to the application to follow
- // TODO publish a message
- printf ("publish or publish and subscribe to something\n");
+ while (1) {
+ // each chan has a list of subscribers
+ // someone who only push a msg doesn't need to be registered
+ if (wp->ale->action == PUBSUB_BOTH || wp->ale->action == PUBSUB_PUB) {
+ // publish a message
+ printf ("publish or publish and subscribe to chan %s\n"
+ , wp->chan->chan);
- struct pubsub_msg m;
- memset (&m, 0, sizeof (struct pubsub_msg));
- pubsubd_msg_recv (wp->ale->p, &m);
+ struct pubsub_msg m;
+ memset (&m, 0, sizeof (struct pubsub_msg));
- pubsubd_msg_print (&m);
+ sleep (5);
+ pubsubd_msg_recv (wp->ale->p, &m);
- if (m.type == PUBSUB_TYPE_DISCONNECT) {
- // TODO remove the application from the subscribers
+ pubsubd_msg_print (&m);
+
+ if (m.type == PUBSUB_TYPE_DISCONNECT) {
+ // TODO remove the application from the subscribers
+ if ( 0 != pubsubd_subscriber_del (wp->chan->alh, wp->ale)) {
+ fprintf (stderr, "err : subscriber not registered\n");
+ }
+ break;
+ }
+ else {
+ struct channel *chan = pubsubd_channel_get (wp->chans, wp->chan);
+ pubsubd_msg_send (chan->alh, &m);
+ }
+ }
+ else if (wp->ale->action == PUBSUB_SUB) {
+ // subscribe to a channel, no need to loop
+ // already subscribed
+ // printf ("subscribe to %s\n", wp->chan->chan);
+ // pubsubd_subscriber_add (wp->chan->alh, wp->ale);
+ break;
}
else {
- struct channel *chan = pubsubd_channel_get (wp->chans, wp->chan);
- pubsubd_msg_send (chan->alh, &m);
+ // unrecognized command, no need to loop
+ printf ("\033[31mdo not know what you want to do\033[00m\n");
+ printf ("\tale->p : %p\n", (void*) wp->ale->p);
+ break;
}
}
- else if (wp->ale->action == PUBSUB_SUB) {
- // TODO
- printf ("subscribe to something\n");
- }
- else {
- printf ("\033[31mdo not know what you want to do\033[00m\n");
- printf ("\tale->p : %p\n", (void*) wp->ale->p);
- }
pubsubd_app_list_elm_free (wp->ale);
@@ -59,7 +72,7 @@ main(int argc, char **argv, char **env)
{
struct service srv;
memset (&srv, 0, sizeof (struct service));
- srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME);
+ srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME, NULL);
printf ("Listening on %s.\n", srv.spath);
// creates the service named pipe, that listens to client applications