Obsolete
/
libipc-old
Archived
3
0
Fork 0

plein de petits trucs

more_to_read
Philippe PITTOLI 2016-06-13 09:47:19 +02:00
parent 07f846b40e
commit 773703a234
9 changed files with 167 additions and 72 deletions

View File

@ -13,7 +13,7 @@ int file_open (FILE **f, const char *path, const char *mode)
} }
*f = fopen (path, mode); *f = fopen (path, mode);
if (*f == NULL) { if (*f == NULL) {
fprintf (stderr, "\033[31mnot opened\033[00m\n"); fprintf (stderr, "\033[31mnot opened %s\033[00m\n", path);
return ER_FILE_OPEN; return ER_FILE_OPEN;
} }
printf ("opened : %ld\n", (long) *f); printf ("opened : %ld\n", (long) *f);
@ -75,10 +75,10 @@ int file_write (FILE *f, const char *buf, size_t msize)
return 0; return 0;
} }
void srv_init (int argc, char **argv, char **env, struct service *srv, const char *sname) int srv_init (int argc, char **argv, char **env, struct service *srv, const char *sname, int (*cb)(int argc, char **argv, char **env, struct service *srv, const char *sname))
{ {
if (srv == NULL) if (srv == NULL)
return; return ER_PARAMS;
// TODO // TODO
// use the argc, argv and env parameters // use the argc, argv and env parameters
@ -97,6 +97,14 @@ void srv_init (int argc, char **argv, char **env, struct service *srv, const cha
srv->version = COMMUNICATION_VERSION; srv->version = COMMUNICATION_VERSION;
srv->index = 0; // TODO srv->index = 0; // TODO
if (cb != NULL) {
int ret = (*cb) (argc, argv, env, srv, sname);
if (ret != 0)
return ret;
}
return 0;
} }
// SERVICE // SERVICE
@ -352,12 +360,14 @@ int app_srv_connection (struct service *srv, const char *connectionstr, size_t m
return 0; return 0;
} }
int app_create (struct process *p, int index) int app_create (struct process *p, pid_t pid, int index, int version)
{ {
pid_t pid = getpid(); if (version == 0) {
version = COMMUNICATION_VERSION;
}
// then creates the structure // then creates the structure
srv_process_gen (p, pid, index, COMMUNICATION_VERSION); srv_process_gen (p, pid, index, version);
// creates the pipes // creates the pipes
int ret; int ret;
@ -492,6 +502,10 @@ int app_read (struct process *p, char ** buf, size_t * msize)
int app_write (struct process *p, char * buf, size_t msize) int app_write (struct process *p, char * buf, size_t msize)
{ {
if (buf == NULL) {
return ER_FILE_WRITE_PARAMS;
}
if (ER_FILE_OPEN == file_open (&p->out, p->path_out, "wb")) { if (ER_FILE_OPEN == file_open (&p->out, p->path_out, "wb")) {
fprintf (stderr, "err opening the file %s\n", p->path_out); fprintf (stderr, "err opening the file %s\n", p->path_out);
return ER_FILE_OPEN; return ER_FILE_OPEN;

View File

@ -16,12 +16,14 @@
#define COMMUNICATION_VERSION 1 #define COMMUNICATION_VERSION 1
#define ER_FILE_OPEN 1 #define ER_FILE_OPEN 1
#define ER_FILE_CLOSE 2 #define ER_FILE_CLOSE 2
#define ER_FILE_READ 3 #define ER_FILE_READ 3
#define ER_FILE_WRITE 4 #define ER_FILE_WRITE 4
#define ER_FILE_WRITE_PARAMS 5
#define ER_MEM_ALLOC 100 #define ER_MEM_ALLOC 100
#define ER_PARAMS 101
struct service { struct service {
unsigned int version; unsigned int version;
@ -30,7 +32,10 @@ struct service {
FILE *spipe; FILE *spipe;
}; };
void srv_init (int argc, char **argv, char **env, struct service *srv, const char *sname); int srv_init (int argc, char **argv, char **env
, struct service *srv, const char *sname
, int (*cb)(int argc, char **argv, char **env
, struct service *srv, const char *sname));
int srv_get_listen_raw (const struct service *srv, char **buf, size_t *msize); int srv_get_listen_raw (const struct service *srv, char **buf, size_t *msize);
int srv_get_new_process (const struct service *srv, struct process *proc); int srv_get_new_process (const struct service *srv, struct process *proc);
@ -55,8 +60,8 @@ int srv_write (struct process *, char * buf, size_t);
// send the connection string to $TMP/<service> // send the connection string to $TMP/<service>
int app_srv_connection (struct service *, const char *, size_t); int app_srv_connection (struct service *, const char *, size_t);
int app_create (struct process *, int index); // called by the application int app_create (struct process *, pid_t pid, int index, int version);
int app_destroy (struct process *); // called by the application int app_destroy (struct process *);
int app_read_cb (struct process *p, char ** buf, size_t * msize int app_read_cb (struct process *p, char ** buf, size_t * msize
, int (*cb)(FILE *f, char ** buf, size_t * msize)); , int (*cb)(FILE *f, char ** buf, size_t * msize));

View File

@ -34,9 +34,7 @@ void srv_process_gen (struct process *p
void srv_process_free (struct process * p) void srv_process_free (struct process * p)
{ {
if (! p) // TODO nothing to do now
return;
free (p);
} }
void srv_process_print (struct process *p) void srv_process_print (struct process *p)

View File

@ -175,10 +175,10 @@ pubsubd_subscriber_add (struct app_list_head *alh, const struct app_list_elm *al
} }
struct app_list_elm * struct app_list_elm *
pubsubd_subscriber_get (const struct app_list_head *chans, const struct app_list_elm *p) pubsubd_subscriber_get (const struct app_list_head *alh, const struct app_list_elm *p)
{ {
struct app_list_elm *np = NULL, *res = NULL; struct app_list_elm *np = NULL, *res = NULL;
LIST_FOREACH(np, chans, entries) { LIST_FOREACH(np, alh, entries) {
if(pubsubd_subscriber_eq (np, p)) { if(pubsubd_subscriber_eq (np, p)) {
res = np; res = np;
} }
@ -186,16 +186,19 @@ pubsubd_subscriber_get (const struct app_list_head *chans, const struct app_list
return res; return res;
} }
void int
pubsubd_subscriber_del (struct app_list_head *chans, struct app_list_elm *p) pubsubd_subscriber_del (struct app_list_head *alh, struct app_list_elm *p)
{ {
struct app_list_elm *todel = pubsubd_subscriber_get (chans, p); struct app_list_elm *todel = pubsubd_subscriber_get (alh, p);
if(todel != NULL) { if(todel != NULL) {
pubsubd_app_list_elm_free (todel); pubsubd_app_list_elm_free (todel);
LIST_REMOVE(todel, entries); LIST_REMOVE(todel, entries);
free (todel); free (todel);
todel = NULL; todel = NULL;
return 0;
} }
return 1;
} }
void pubsubd_subscriber_del_all (struct app_list_head *alh) void pubsubd_subscriber_del_all (struct app_list_head *alh)
@ -238,7 +241,20 @@ void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *l
return; return;
// msg: "type(1) chanlen(8) chan datalen(8) data // msg: "type(1) chanlen(8) chan datalen(8) data
*len = 1 + sizeof(size_t) + msg->chanlen + sizeof(size_t) + msg->datalen; if (msg->type == PUBSUB_TYPE_DISCONNECT) {
*len = 1;
if (*data != NULL) {
free (*data);
*data = NULL;
}
*data = malloc(*len);
data[0][0] = msg->type;
return;
}
else {
// type + size chan + chan + size data + data
*len = 1 + 2 * sizeof(size_t) + msg->chanlen + msg->datalen;
}
if (*data != NULL) { if (*data != NULL) {
free (*data); free (*data);
@ -279,6 +295,14 @@ void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t l
size_t i = 0; size_t i = 0;
msg->type = data[i]; i++; msg->type = data[i]; i++;
if (msg->type == PUBSUB_TYPE_DISCONNECT) {
msg->chanlen = 0;
msg->chan = NULL;
msg->datalen = 0;
msg->data = NULL;
return ;
}
memcpy (&msg->chanlen, data + i, sizeof(size_t)); i += sizeof(size_t); memcpy (&msg->chanlen, data + i, sizeof(size_t)); i += sizeof(size_t);
if (msg->chanlen > BUFSIZ) { if (msg->chanlen > BUFSIZ) {
fprintf (stderr, "\033[31merr : msg->chanlen > BUFSIZ\033[00m\n"); fprintf (stderr, "\033[31merr : msg->chanlen > BUFSIZ\033[00m\n");
@ -579,7 +603,7 @@ void pubsub_connection (struct service *srv, struct process *p, enum app_list_el
memset (line, 0, BUFSIZ); memset (line, 0, BUFSIZ);
// line fmt : pid index version action chan // line fmt : pid index version action chan
// "quit" action is also possible (see pubsub_disconnect) // "quit" action is also possible (see pubsubd_quit)
snprintf (line, BUFSIZ, "%d %d %d %s %s\n" snprintf (line, BUFSIZ, "%d %d %d %s %s\n"
, p->pid, p->index, p->version , p->pid, p->index, p->version
, straction , straction
@ -593,8 +617,40 @@ void pubsub_connection (struct service *srv, struct process *p, enum app_list_el
free (straction); free (straction);
} }
void pubsub_disconnect (struct process *p)
{
struct pubsub_msg m;
memset (&m, 0, sizeof (struct pubsub_msg));
m.type = PUBSUB_TYPE_DISCONNECT;
char *buf = NULL;
size_t msize = 0;
pubsubd_msg_serialize (&m, &buf, &msize);
int ret = app_write (p, buf, msize);
switch (ret) {
case ER_FILE_WRITE :
fprintf (stderr, "err: ER_FILE_WRITE\n");
break;
case ER_FILE_WRITE_PARAMS :
fprintf (stderr, "err: ER_FILE_WRITE_PARAMS\n");
break;
case ER_FILE_OPEN :
fprintf (stderr, "err: ER_FILE_OPEN\n");
break;
case ER_FILE_CLOSE :
fprintf (stderr, "err: ER_FILE_CLOSE\n");
break;
}
pubsubd_msg_free (&m);
if (buf != NULL) {
free (buf);
}
}
// tell the service to stop // tell the service to stop
void pubsub_disconnect (struct service *srv) void pubsubd_quit (struct service *srv)
{ {
// line fmt : 0 0 0 quit // line fmt : 0 0 0 quit
char line[BUFSIZ]; char line[BUFSIZ];

View File

@ -41,6 +41,7 @@ int pubsubd_msg_read_cb (FILE *f, char ** buf, size_t * msize);
void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m); void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg *m);
void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m); void pubsubd_msg_recv (struct process *p, struct pubsub_msg *m);
void pubsub_disconnect (struct process *p);
void pubsub_msg_send (struct process *p, const struct pubsub_msg *msg); void pubsub_msg_send (struct process *p, const struct pubsub_msg *msg);
void pubsub_msg_recv (struct process *p, struct pubsub_msg *msg); void pubsub_msg_recv (struct process *p, struct pubsub_msg *msg);
@ -72,6 +73,14 @@ struct channel * pubsubd_channels_add (struct channels *chans, struct channel *c
void pubsubd_channels_del (struct channels *chans, struct channel *c); void pubsubd_channels_del (struct channels *chans, struct channel *c);
void pubsubd_channels_del_all (struct channels *chans); void pubsubd_channels_del_all (struct channels *chans);
// remove an app_list_elm from the list (msg type DISCONNECT received)
int pubsubd_channels_del_subscriber (struct channels *chans
, struct channel *c);
struct channel *
pubsubd_channels_search_from_app_list_elm (struct channels *chans
, struct app_list_elm *ale);
// APPLICATION // APPLICATION
// head of the list // head of the list
@ -94,7 +103,7 @@ void pubsubd_subscriber_add (struct app_list_head *
, const struct app_list_elm *); , const struct app_list_elm *);
struct app_list_elm * pubsubd_subscriber_get (const struct app_list_head * struct app_list_elm * pubsubd_subscriber_get (const struct app_list_head *
, const struct app_list_elm *); , const struct app_list_elm *);
void pubsubd_subscriber_del (struct app_list_head *al, struct app_list_elm *p); int pubsubd_subscriber_del (struct app_list_head *al, struct app_list_elm *p);
void pubsubd_subscriber_del_all (struct app_list_head *alh); void pubsubd_subscriber_del_all (struct app_list_head *alh);
struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale); struct app_list_elm * pubsubd_app_list_elm_copy (const struct app_list_elm *ale);
@ -102,6 +111,6 @@ void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p);
void pubsubd_app_list_elm_free (struct app_list_elm *todel); void pubsubd_app_list_elm_free (struct app_list_elm *todel);
void pubsub_connection (struct service *srv, struct process *p, enum app_list_elm_action action, const char *channame); void pubsub_connection (struct service *srv, struct process *p, enum app_list_elm_action action, const char *channame);
void pubsub_disconnect (struct service *srv); void pubsubd_quit (struct service *srv);
#endif #endif

View File

@ -66,7 +66,7 @@ void main_loop (const struct service *srv)
int main(int argc, char * argv[], char **env) int main(int argc, char * argv[], char **env)
{ {
struct service srv; struct service srv;
srv_init (argc, argv, env, &srv, PONGD_SERVICE_NAME); srv_init (argc, argv, env, &srv, PONGD_SERVICE_NAME, NULL);
printf ("Listening on %s.\n", srv.spath); printf ("Listening on %s.\n", srv.spath);
// creates the service named pipe, that listens to client applications // creates the service named pipe, that listens to client applications

View File

@ -3,7 +3,6 @@
#include <pthread.h> #include <pthread.h>
#define MYMESSAGE "coucou" #define MYMESSAGE "coucou"
#define MYCHAN "chan1"
void void
ohshit(int rvalue, const char* str) { ohshit(int rvalue, const char* str) {
@ -18,45 +17,44 @@ void usage (char **argv)
void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, int version, char *cmd, char *chan) void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, int version, char *cmd, char *chan)
{ {
printf ("Simulate connnection : pid %d index %d version %d " printf ("Simulate connnection : pid %d index %d version %d "
"cmd %s chan %s\n" "cmd %s chan %s\n"
, pid, index, version, cmd, chan ); , pid, index, version, cmd, chan );
struct service srv; struct service srv;
bzero (&srv, sizeof (struct service)); memset (&srv, 0, sizeof (struct service));
srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME); srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME, NULL);
printf ("Writing on %s.\n", srv.spath); printf ("Writing on %s.\n", srv.spath);
struct process p; struct process p;
bzero (&p, sizeof (struct process)); memset (&p, 0, sizeof (struct process));
if (app_create (&p, index)) // called by the application if (app_create (&p, pid, index, version)) // called by the application
ohshit (1, "app_create"); ohshit (1, "app_create");
// send a message to warn the service we want to do something // send a message to warn the service we want to do something
// line : pid index version action chan // line : pid index version action chan
pubsub_connection (&srv, &p, PUBSUB_PUB, MYCHAN); pubsub_connection (&srv, &p, PUBSUB_PUB, chan);
struct pubsub_msg m; struct pubsub_msg m;
bzero (&m, sizeof (struct pubsub_msg)); memset (&m, 0, sizeof (struct pubsub_msg));
// first message, "coucou" // first message, "coucou"
m.type = PUBSUB_TYPE_INFO; m.type = PUBSUB_TYPE_INFO;
m.chan = malloc (strlen (MYCHAN)); m.chan = malloc (strlen (chan) + 1);
m.chanlen = strlen (MYCHAN); m.chan[strlen (chan)] = '\0';
m.data = malloc (strlen (MYMESSAGE)); m.chanlen = strlen (chan);
m.data = malloc (strlen (MYMESSAGE) + 1);
m.datalen = strlen (MYMESSAGE);
m.datalen = strlen (MYMESSAGE); m.datalen = strlen (MYMESSAGE);
pubsub_msg_send (&p, &m); pubsub_msg_send (&p, &m);
// second message, to disconnect from the server
m.type = PUBSUB_TYPE_DISCONNECT;
pubsub_msg_send (&p, &m);
// free everything // free everything
pubsubd_msg_free (&m); pubsubd_msg_free (&m);
// disconnect from the server
pubsub_disconnect (&p);
// the application will shut down, and remove the application named pipes // the application will shut down, and remove the application named pipes
if (app_destroy (&p)) if (app_destroy (&p))
ohshit (1, "app_destroy"); ohshit (1, "app_destroy");
@ -67,19 +65,19 @@ void sim_connection (int argc, char **argv, char **env, pid_t pid, int index, in
void sim_disconnection (int argc, char **argv, char **env, pid_t pid, int index, int version) void sim_disconnection (int argc, char **argv, char **env, pid_t pid, int index, int version)
{ {
struct service srv; struct service srv;
bzero (&srv, sizeof (struct service)); memset (&srv, 0, sizeof (struct service));
srv_init (&srv, PUBSUB_SERVICE_NAME); srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME, NULL);
printf ("Disconnecting from %s.\n", srv.spath); printf ("Disconnecting from %s.\n", srv.spath);
struct process p; struct process p;
bzero (&p, sizeof (struct process)); memset (&p, 0, sizeof (struct process));
// create the fake process // create the fake process
srv_process_gen (&p, pid, index, version); srv_process_gen (&p, pid, index, version);
// send a message to disconnect // send a message to disconnect
// line : pid index version action chan // line : pid index version action chan
pubsub_disconnect (&srv, &p, PUBSUB_PUB, MYCHAN); pubsub_disconnect (&p);
srv_process_free (&p); srv_process_free (&p);
} }

View File

@ -16,14 +16,16 @@ main(int argc, char **argv, char **env)
{ {
struct service srv; struct service srv;
memset (&srv, 0, sizeof (struct service)); memset (&srv, 0, sizeof (struct service));
srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME); srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME, NULL);
printf ("Writing on %s.\n", srv.spath); printf ("Writing on %s.\n", srv.spath);
struct process p; struct process p;
memset (&p, 0, sizeof (struct process)); memset (&p, 0, sizeof (struct process));
int index = 1; int index = 1;
if (app_create (&p, index)) // called by the application pid_t pid = getpid();
if (app_create (&p, pid, index, COMMUNICATION_VERSION))
ohshit (1, "app_create"); ohshit (1, "app_create");
// send a message to warn the service we want to do something // send a message to warn the service we want to do something

View File

@ -19,35 +19,48 @@ void * pubsubd_worker_thread (void *params)
{ {
struct worker_params *wp = (struct worker_params *) params; struct worker_params *wp = (struct worker_params *) params;
// each chan has a list of subscribers while (1) {
// someone who only push a msg doesn't need to be registered // each chan has a list of subscribers
if (wp->ale->action == PUBSUB_BOTH || wp->ale->action == PUBSUB_PUB) { // someone who only push a msg doesn't need to be registered
// TODO add it to the application to follow if (wp->ale->action == PUBSUB_BOTH || wp->ale->action == PUBSUB_PUB) {
// TODO publish a message // publish a message
printf ("publish or publish and subscribe to something\n"); printf ("publish or publish and subscribe to chan %s\n"
, wp->chan->chan);
struct pubsub_msg m; struct pubsub_msg m;
memset (&m, 0, sizeof (struct pubsub_msg)); memset (&m, 0, sizeof (struct pubsub_msg));
pubsubd_msg_recv (wp->ale->p, &m);
pubsubd_msg_print (&m); sleep (5);
pubsubd_msg_recv (wp->ale->p, &m);
if (m.type == PUBSUB_TYPE_DISCONNECT) { pubsubd_msg_print (&m);
// TODO remove the application from the subscribers
if (m.type == PUBSUB_TYPE_DISCONNECT) {
// TODO remove the application from the subscribers
if ( 0 != pubsubd_subscriber_del (wp->chan->alh, wp->ale)) {
fprintf (stderr, "err : subscriber not registered\n");
}
break;
}
else {
struct channel *chan = pubsubd_channel_get (wp->chans, wp->chan);
pubsubd_msg_send (chan->alh, &m);
}
}
else if (wp->ale->action == PUBSUB_SUB) {
// subscribe to a channel, no need to loop
// already subscribed
// printf ("subscribe to %s\n", wp->chan->chan);
// pubsubd_subscriber_add (wp->chan->alh, wp->ale);
break;
} }
else { else {
struct channel *chan = pubsubd_channel_get (wp->chans, wp->chan); // unrecognized command, no need to loop
pubsubd_msg_send (chan->alh, &m); printf ("\033[31mdo not know what you want to do\033[00m\n");
printf ("\tale->p : %p\n", (void*) wp->ale->p);
break;
} }
} }
else if (wp->ale->action == PUBSUB_SUB) {
// TODO
printf ("subscribe to something\n");
}
else {
printf ("\033[31mdo not know what you want to do\033[00m\n");
printf ("\tale->p : %p\n", (void*) wp->ale->p);
}
pubsubd_app_list_elm_free (wp->ale); pubsubd_app_list_elm_free (wp->ale);
@ -59,7 +72,7 @@ main(int argc, char **argv, char **env)
{ {
struct service srv; struct service srv;
memset (&srv, 0, sizeof (struct service)); memset (&srv, 0, sizeof (struct service));
srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME); srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME, NULL);
printf ("Listening on %s.\n", srv.spath); printf ("Listening on %s.\n", srv.spath);
// creates the service named pipe, that listens to client applications // creates the service named pipe, that listens to client applications