pubsubd: pubsubc, the client, still in progress but (kind of) works
This commit is contained in:
parent
2cd6b62bf0
commit
0314120304
@ -407,8 +407,10 @@ void pubsubd_msg_free (struct pubsub_msg *msg)
|
|||||||
int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale
|
int pubsubd_get_new_process (const char *spath, struct app_list_elm *ale
|
||||||
, struct channels *chans, struct channel **c)
|
, struct channels *chans, struct channel **c)
|
||||||
{
|
{
|
||||||
if (spath == NULL || ale == NULL || chans == NULL)
|
if (spath == NULL || ale == NULL || chans == NULL) {
|
||||||
|
fprintf (stderr, "pubsubd_get_new_process: spath or ale or chans == NULL\n");
|
||||||
return -1;
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
char *buf = NULL;
|
char *buf = NULL;
|
||||||
size_t msize = 0;
|
size_t msize = 0;
|
||||||
@ -529,8 +531,6 @@ void pubsubd_msg_send (const struct app_list_head *alh, const struct pubsub_msg
|
|||||||
size_t msize = 0;
|
size_t msize = 0;
|
||||||
pubsubd_msg_serialize (m, &buf, &msize);
|
pubsubd_msg_serialize (m, &buf, &msize);
|
||||||
|
|
||||||
printf ("\033[32mmsg to send : %.*s (%ld)\n", (int) msize, buf, msize);
|
|
||||||
|
|
||||||
LIST_FOREACH(ale, alh, entries) {
|
LIST_FOREACH(ale, alh, entries) {
|
||||||
srv_write (ale->p, buf, msize);
|
srv_write (ale->p, buf, msize);
|
||||||
}
|
}
|
||||||
|
122
pubsub/pubsubc.c
Normal file
122
pubsub/pubsubc.c
Normal file
@ -0,0 +1,122 @@
|
|||||||
|
#include "../lib/pubsubd.h"
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
|
void
|
||||||
|
ohshit(int rvalue, const char* str) {
|
||||||
|
fprintf (stderr, "\033[31merr: %s\033[00m\n", str);
|
||||||
|
exit (rvalue);
|
||||||
|
}
|
||||||
|
|
||||||
|
void usage (char **argv)
|
||||||
|
{
|
||||||
|
printf ( "usage: %s\n", argv[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
void main_loop (int argc, char **argv, char **env
|
||||||
|
, pid_t pid, int index, int version
|
||||||
|
, char *cmd, char *chan)
|
||||||
|
{
|
||||||
|
printf ("connection : pid %d index %d version %d "
|
||||||
|
"cmd %s chan %s\n"
|
||||||
|
, pid, index, version, cmd, chan );
|
||||||
|
|
||||||
|
struct service srv;
|
||||||
|
memset (&srv, 0, sizeof (struct service));
|
||||||
|
srv_init (argc, argv, env, &srv, PUBSUB_SERVICE_NAME, NULL);
|
||||||
|
printf ("Writing on %s.\n", srv.spath);
|
||||||
|
|
||||||
|
struct process p;
|
||||||
|
memset (&p, 0, sizeof (struct process));
|
||||||
|
|
||||||
|
printf ("app creation\n");
|
||||||
|
if (app_create (&p, pid, index, version)) // called by the application
|
||||||
|
ohshit (1, "app_create");
|
||||||
|
|
||||||
|
printf ("main_loop\n");
|
||||||
|
// send a message to warn the service we want to do something
|
||||||
|
// line : pid index version action chan
|
||||||
|
enum app_list_elm_action action = PUBSUB_BOTH;
|
||||||
|
|
||||||
|
if (strncmp (cmd, "pub", 3) == 0) {
|
||||||
|
action = PUBSUB_PUB;
|
||||||
|
}
|
||||||
|
else if (strncmp (cmd, "sub", 3) == 0) {
|
||||||
|
action = PUBSUB_SUB;
|
||||||
|
}
|
||||||
|
|
||||||
|
pubsub_connection (&srv, &p, action, chan);
|
||||||
|
|
||||||
|
struct pubsub_msg m;
|
||||||
|
memset (&m, 0, sizeof (struct pubsub_msg));
|
||||||
|
|
||||||
|
|
||||||
|
// meta data on the message
|
||||||
|
m.type = PUBSUB_TYPE_MESSAGE;
|
||||||
|
m.chan = malloc (strlen (chan) + 1);
|
||||||
|
memset (m.chan, 0, strlen (chan) + 1);
|
||||||
|
m.chan[strlen (chan)] = '\0';
|
||||||
|
m.chanlen = strlen (chan);
|
||||||
|
|
||||||
|
// msg loop
|
||||||
|
for (;;) {
|
||||||
|
struct pubsub_msg msg;
|
||||||
|
memset (&msg, 0, sizeof (struct pubsub_msg));
|
||||||
|
|
||||||
|
char buf[BUFSIZ];
|
||||||
|
memset (buf, 0, BUFSIZ);
|
||||||
|
printf ("msg to send [quit]: ");
|
||||||
|
fflush (stdout);
|
||||||
|
|
||||||
|
size_t mlen = read (0, buf, BUFSIZ);
|
||||||
|
|
||||||
|
printf ("data (%ld): %s\n", mlen, buf);
|
||||||
|
|
||||||
|
if (strncmp(buf, "quit\n", strlen ("quit\n")) == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
m.data = malloc (strlen (buf) + 1);
|
||||||
|
memset (m.data, 0, strlen (buf) + 1);
|
||||||
|
strncpy ((char *) m.data, buf, strlen (buf) + 1);
|
||||||
|
m.datalen = strlen (buf);
|
||||||
|
|
||||||
|
printf ("send message\n");
|
||||||
|
pubsub_msg_send (&p, &m);
|
||||||
|
free (m.data);
|
||||||
|
m.data = NULL;
|
||||||
|
m.datalen = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// free everything
|
||||||
|
pubsubd_msg_free (&m);
|
||||||
|
|
||||||
|
printf ("disconnection...\n");
|
||||||
|
// disconnect from the server
|
||||||
|
pubsub_disconnect (&p);
|
||||||
|
|
||||||
|
printf ("destroying app\n");
|
||||||
|
// the application will shut down, and remove the application named pipes
|
||||||
|
if (app_destroy (&p))
|
||||||
|
ohshit (1, "app_destroy");
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char **argv, char **env)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (argc != 1) {
|
||||||
|
usage (argv);
|
||||||
|
exit (1);
|
||||||
|
}
|
||||||
|
|
||||||
|
pid_t pid = getpid();
|
||||||
|
int index = 0;
|
||||||
|
// don't care about the version
|
||||||
|
int version = COMMUNICATION_VERSION;
|
||||||
|
char *cmd = "both";
|
||||||
|
char *chan = "chan1";
|
||||||
|
|
||||||
|
main_loop (argc, argv, env, pid, index, version, cmd, chan);
|
||||||
|
|
||||||
|
return EXIT_SUCCESS;
|
||||||
|
}
|
@ -55,12 +55,12 @@ void * pubsubd_worker_thread (void *params)
|
|||||||
else {
|
else {
|
||||||
struct channel *ch = pubsubd_channel_search (chans, chan->chan);
|
struct channel *ch = pubsubd_channel_search (chans, chan->chan);
|
||||||
if (ch == NULL) {
|
if (ch == NULL) {
|
||||||
printf ("CHAN NON TROUVE\n");
|
printf ("CHAN NOT FOUND\n");
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
printf ("Je dois dire :\n");
|
printf ("what should be sent: ");
|
||||||
pubsubd_msg_print (&m);
|
pubsubd_msg_print (&m);
|
||||||
printf ("CHAN ET PROCESS À QUI JE DOIS COMMUNIQUER\n");
|
printf ("send the message to:\t");
|
||||||
pubsubd_channel_print (ch);
|
pubsubd_channel_print (ch);
|
||||||
pubsubd_msg_send (ch->alh, &m);
|
pubsubd_msg_send (ch->alh, &m);
|
||||||
}
|
}
|
||||||
@ -137,10 +137,12 @@ main(int argc, char **argv, char **env)
|
|||||||
pubsubd_channels_init (&chans);
|
pubsubd_channels_init (&chans);
|
||||||
|
|
||||||
pthread_t *thr = NULL;
|
pthread_t *thr = NULL;
|
||||||
thr = malloc (sizeof (pthread_t) * NB_CLIENTS);
|
thr = malloc (sizeof (pthread_t));
|
||||||
memset (thr, 0, sizeof (pthread_t) * NB_CLIENTS);
|
memset (thr, 0, sizeof (pthread_t));
|
||||||
|
|
||||||
for (int i = 0; i < NB_CLIENTS; i++) {
|
int i = 0;
|
||||||
|
// for (i = 0; i < NB_CLIENTS; i++)
|
||||||
|
for (i = 0; ; i++) {
|
||||||
// for each new process
|
// for each new process
|
||||||
struct app_list_elm ale;
|
struct app_list_elm ale;
|
||||||
memset (&ale, 0, sizeof (struct app_list_elm));
|
memset (&ale, 0, sizeof (struct app_list_elm));
|
||||||
@ -151,10 +153,8 @@ main(int argc, char **argv, char **env)
|
|||||||
// end the application
|
// end the application
|
||||||
if (ale.action == PUBSUB_QUIT) {
|
if (ale.action == PUBSUB_QUIT) {
|
||||||
printf ("Quitting ...\n");
|
printf ("Quitting ...\n");
|
||||||
|
|
||||||
pubsubd_channels_del_all (&chans);
|
pubsubd_channels_del_all (&chans);
|
||||||
srv_close (&srv);
|
srv_close (&srv);
|
||||||
|
|
||||||
// TODO end the threads
|
// TODO end the threads
|
||||||
exit (0);
|
exit (0);
|
||||||
}
|
}
|
||||||
@ -168,16 +168,18 @@ main(int argc, char **argv, char **env)
|
|||||||
|
|
||||||
pthread_create (thr + i, NULL, pubsubd_worker_thread, wp);
|
pthread_create (thr + i, NULL, pubsubd_worker_thread, wp);
|
||||||
pthread_detach (thr[i]);
|
pthread_detach (thr[i]);
|
||||||
|
// realloc memory for further workers
|
||||||
|
thr = realloc (thr, sizeof (pthread_t) * (i+1));
|
||||||
|
|
||||||
pubsubd_app_list_elm_free (&ale);
|
pubsubd_app_list_elm_free (&ale);
|
||||||
}
|
}
|
||||||
|
|
||||||
sleep (10);
|
sleep (10);
|
||||||
// stop threads
|
// stop threads
|
||||||
for (int i = 0 ; i < NB_CLIENTS ; i++) {
|
for (int j = 0 ; j < i ; j++) {
|
||||||
pthread_cancel (thr[i]);
|
pthread_cancel (thr[j]);
|
||||||
void *ret = NULL;
|
void *ret = NULL;
|
||||||
pthread_join (thr[i], &ret);
|
pthread_join (thr[j], &ret);
|
||||||
if (ret != NULL) {
|
if (ret != NULL) {
|
||||||
free (ret);
|
free (ret);
|
||||||
}
|
}
|
||||||
@ -193,50 +195,3 @@ main(int argc, char **argv, char **env)
|
|||||||
|
|
||||||
return EXIT_SUCCESS;
|
return EXIT_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
void main_loop (const struct service *srv)
|
|
||||||
{
|
|
||||||
int ret;
|
|
||||||
struct process proc;
|
|
||||||
|
|
||||||
int cnt = 10;
|
|
||||||
|
|
||||||
while (cnt--) {
|
|
||||||
// -1 : error, 0 = no new process, 1 = new process
|
|
||||||
ret = srv_get_new_process (&proc, srv);
|
|
||||||
if (ret == -1) {
|
|
||||||
fprintf (stderr, "error service_get_new_process\n");
|
|
||||||
continue;
|
|
||||||
} else if (ret == 0) { // that should not happen
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// printf ("before print\n");
|
|
||||||
process_print (&proc);
|
|
||||||
// printf ("after print\n");
|
|
||||||
|
|
||||||
// about the message
|
|
||||||
size_t msize = BUFSIZ;
|
|
||||||
char buf[BUFSIZ];
|
|
||||||
bzero(buf, BUFSIZ);
|
|
||||||
|
|
||||||
// printf ("before read\n");
|
|
||||||
if ((ret = srv_read (&proc, &buf, &msize))) {
|
|
||||||
fprintf(stdout, "error service_read %d\n", ret);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// printf ("after read\n");
|
|
||||||
printf ("read, size %ld : %s\n", msize, buf);
|
|
||||||
|
|
||||||
// printf ("before proc write\n");
|
|
||||||
if ((ret = srv_write (&proc, &buf, msize))) {
|
|
||||||
fprintf(stdout, "error service_write %d\n", ret);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// printf ("after proc write\n");
|
|
||||||
printf ("\033[32mStill \033[31m%d\033[32m applications to serve\n",cnt);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
Reference in New Issue
Block a user