diff --git a/pubsub/app/pubsub-client.c b/pubsub/app/pubsub-client.c new file mode 100644 index 0000000..d1175d9 --- /dev/null +++ b/pubsub/app/pubsub-client.c @@ -0,0 +1,182 @@ +#include "../../core/ipc.h" + +#include "../lib/message.h" +#include "../lib/channels.h" + +#include "../lib/pubsub.h" +#include "../lib/pubsubd.h" + +#include +#include +#include + +void usage (char **argv) { + printf ( "usage: %s [chan [pub]]\n", argv[0]); +} + +void print_cmd (void) { + printf ("\033[32m>\033[00m "); + fflush (stdout); +} + +void chan_sub (struct ipc_service *srv, char *chan) +{ + struct pubsub_msg msg; + memset (&msg, 0, sizeof (struct pubsub_msg)); + + // meta data on the message + msg.type = PUBSUB_MSG_TYPE_SUB; + msg.chanlen = strlen (chan) + 1; + msg.chan = malloc (msg.chanlen); + memset (msg.chan, 0, msg.chanlen); + strncpy (msg.chan, chan, msg.chanlen); + msg.chan[strlen (chan)] = '\0'; + + pubsub_message_send (srv, &msg); + printf ("subscribed to %s\n", chan); + + pubsub_message_free (&msg); +} + +void main_loop (char **env, int index, int version + , char *cmd, char *chan) +{ + printf ("connection to pubsubd: index %d version %d " + "cmd %s chan %s\n" + , index, version, cmd, chan ); + + struct ipc_service srv; + memset (&srv, 0, sizeof (struct ipc_service)); + int ret = ipc_application_connection (env, &srv, PUBSUBD_SERVICE_NAME); + if (ret != 0) { + handle_err ("pubsub_connection", "application_connection != 0"); + } + printf ("connected\n"); + + if (strncmp (cmd, "sub", 3) == 0) { + chan_sub (&srv, chan); + } + + printf ("main_loop\n"); + + struct pubsub_msg msg; + memset (&msg, 0, sizeof (struct pubsub_msg)); + + // meta data on the message + msg.type = PUBSUB_MSG_TYPE_PUB; + msg.chanlen = strlen (chan) + 1; + msg.chan = malloc (msg.chanlen); + memset (msg.chan, 0, msg.chanlen); + strncpy ((char *) msg.chan, chan, msg.chanlen); + msg.chan[strlen (chan)] = '\0'; + + struct ipc_event event; + memset (&event, 0, sizeof (struct ipc_event)); + + struct ipc_services services; + memset (&services, 0, sizeof (struct ipc_services)); + ipc_service_add (&services, &srv); + + int should_continue = 1; + + while (should_continue) { + print_cmd (); + ret = ipc_application_peek_event (&services, &event); + + if (ret != 0) { + handle_error("ipc_application_peek_event != 0"); + exit (EXIT_FAILURE); + } + + switch (event.type) { + case IPC_EVENT_TYPE_STDIN: + { + struct ipc_message *m = event.m; + if ( m->length == 0 || strncmp (m->payload, "exit", 4) == 0) { + // TODO: disconnection + + ipc_message_empty (m); + free (m); + + should_continue = 0; + break; + } + + // get the curent payload, change it to be compatible with the application + // then send it + print_cmd (); + + // TODO: remove \n + msg.datalen = m->length + 1; + msg.data = malloc (msg.datalen); + memset (msg.data, 0, msg.datalen); + strncpy (msg.data, m->payload, msg.datalen); + msg.data[msg.datalen] = '\0'; + + pubsub_message_send (&srv, &msg); + free (msg.data); + msg.data = NULL; + msg.datalen = 0; + } + break; + case IPC_EVENT_TYPE_MESSAGE: + { + struct ipc_message *m = event.m; + printf ("msg recv: %.*s", m->length, m->payload); + // TODO: correctly print the message + }; + break; + case IPC_EVENT_TYPE_DISCONNECTION: + { + printf ("server disconnected: quitting...\n"); + + // just remove srv from services, it's already closed + ipc_services_free (&services); + + exit (EXIT_SUCCESS); + }; + case IPC_EVENT_TYPE_NOT_SET: + case IPC_EVENT_TYPE_CONNECTION: + case IPC_EVENT_TYPE_ERROR: + default : + fprintf (stderr, "should not happen, event type %d\n", event.type); + } + } + + // free everything + pubsub_message_free (&msg); + + printf ("disconnection...\n"); + ipc_services_free (&services); + if (ipc_application_close (&srv) < 0) { + handle_err("main", "application_close < 0"); + exit (EXIT_FAILURE); + } +} + +int main(int argc, char **argv, char **env) +{ + char *cmd = "sub"; + char *chan = "chan1"; + + if (argc == 2 && strncmp("-h", argv[1], 2) == 0) { + usage (argv); + exit (0); + } + + if (argc >= 2) { + chan = argv[1]; + } + + if (argc >= 3) { + cmd = argv[2]; + } + + int index = 0; + // don't care about the version + int version = 0; + + main_loop (env, index, version, cmd, chan); + + return EXIT_SUCCESS; +} diff --git a/pubsub/app/pubsubc.c b/pubsub/app/pubsubc.c index 6291a86..5aded0b 100644 --- a/pubsub/app/pubsubc.c +++ b/pubsub/app/pubsubc.c @@ -2,7 +2,7 @@ // TODO: select on service + input instead of threads -#include "../../core/error.h" +#include "../../core/ipc.h" #include "../lib/pubsub.h" #include "../lib/pubsubd.h" #include @@ -67,8 +67,7 @@ void chan_sub (struct ipc_service *srv, char *chan) pubsub_message_free (&msg); } -void main_loop (int argc, char **argv, char **env - , int index, int version +void main_loop (char **env, int index, int version , char *cmd, char *chan) { printf ("connection to pubsubd: index %d version %d " @@ -77,7 +76,7 @@ void main_loop (int argc, char **argv, char **env struct ipc_service srv; memset (&srv, 0, sizeof (struct ipc_service)); - pubsub_connection (argc, argv, env, &srv); + pubsub_connection (env, &srv); printf ("connected\n"); if (strncmp (cmd, "sub", 3) == 0) { @@ -167,7 +166,7 @@ int main(int argc, char **argv, char **env) // don't care about the version int version = 0; - main_loop (argc, argv, env, index, version, cmd, chan); + main_loop (env, index, version, cmd, chan); return EXIT_SUCCESS; } diff --git a/pubsub/app/pubsubd.c b/pubsub/app/pubsubd.c index d588dc2..f41effc 100644 --- a/pubsub/app/pubsubd.c +++ b/pubsub/app/pubsubd.c @@ -1,6 +1,4 @@ -#include "../../core/communication.h" -#include "../../core/client.h" -#include "../../core/error.h" +#include "../../core/ipc.h" #include "../lib/pubsubd.h" #include @@ -29,6 +27,10 @@ void handle_signal (int signalnumber) int main(int argc, char **argv, char **env) { + + argc = argc; + argv = argv; + memset (&srv, 0, sizeof (struct ipc_service)); srv.index = 0; srv.version = 0; @@ -40,7 +42,7 @@ main(int argc, char **argv, char **env) memset (&chans, 0, sizeof (struct channels)); pubsubd_channels_init (&chans); - if (ipc_server_init (argc, argv, env, &srv, PUBSUBD_SERVICE_NAME) < 0) { + if (ipc_server_init (env, &srv, PUBSUBD_SERVICE_NAME) < 0) { handle_error("ipc_server_init < 0"); return EXIT_FAILURE; } diff --git a/pubsub/lib/pubsub.c b/pubsub/lib/pubsub.c index e975213..14e1378 100644 --- a/pubsub/lib/pubsub.c +++ b/pubsub/lib/pubsub.c @@ -3,7 +3,6 @@ #include "pubsub.h" #include "pubsubd.h" -#include "../../core/error.h" #define PUBSUB_SUBSCRIBER_ACTION_STR_PUB "pub" #define PUBSUB_SUBSCRIBER_ACTION_STR_SUB "sub" @@ -33,11 +32,9 @@ void pubsub_quit (struct ipc_service *srv) } #endif -int pubsub_connection (int argc, char **argv, char **env - , struct ipc_service *srv) +int pubsub_connection (char **env, struct ipc_service *srv) { - int ret = ipc_application_connection (argc, argv, env - , srv, PUBSUBD_SERVICE_NAME, NULL, 0); + int ret = ipc_application_connection (env, srv, PUBSUBD_SERVICE_NAME); if (ret != 0) { handle_err ("pubsub_connection", "application_connection != 0"); @@ -69,7 +66,7 @@ int pubsub_message_send (struct ipc_service *srv, const struct pubsub_msg * m) } ipc_application_write (srv, &m_data); - ipc_message_free (&m_data); + ipc_message_empty (&m_data); if (buf != NULL) free(buf); @@ -95,7 +92,7 @@ int pubsub_message_recv (struct ipc_service *srv, struct pubsub_msg *m) ipc_application_read (srv, &m_recv); pubsub_message_unserialize (m, m_recv.payload, m_recv.length); - ipc_message_free (&m_recv); + ipc_message_empty (&m_recv); return 0; } diff --git a/pubsub/lib/pubsub.h b/pubsub/lib/pubsub.h index 4f26f17..97d009a 100644 --- a/pubsub/lib/pubsub.h +++ b/pubsub/lib/pubsub.h @@ -1,9 +1,7 @@ #ifndef __PUBSUB_H__ #define __PUBSUB_H__ -#include "../../core/communication.h" -#include "../../core/client.h" -#include "../../core/queue.h" +#include "../../core/ipc.h" #include "message.h" @@ -15,7 +13,7 @@ enum subscriber_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH}; #define PUBSUB_TYPE_DEBUG 4 #define PUBSUB_TYPE_INFO 5 -int pubsub_connection (int argc, char **argv, char **env, struct ipc_service *srv); +int pubsub_connection (char **env, struct ipc_service *srv); int pubsub_disconnect (struct ipc_service *srv); int pubsub_message_send (struct ipc_service *srv, const struct pubsub_msg * m); int pubsub_message_recv (struct ipc_service *srv, struct pubsub_msg *m); diff --git a/pubsub/lib/pubsubd.c b/pubsub/lib/pubsubd.c index 7122f61..ccfd64d 100644 --- a/pubsub/lib/pubsubd.c +++ b/pubsub/lib/pubsubd.c @@ -11,31 +11,31 @@ #include #include -void pubsubd_send (const struct ipc_clients *ap, const struct pubsub_msg * m) +void pubsubd_send (const struct ipc_clients *clients, const struct pubsub_msg * pubsub_msg) { - if (ap == NULL) { - fprintf (stderr, "pubsubd_send: ap == NULL"); + if (clients == NULL) { + fprintf (stderr, "pubsubd_send: clients == NULL"); return; } - if (m == NULL) { - fprintf (stderr, "pubsubd_send: m == NULL"); + if (pubsub_msg == NULL) { + fprintf (stderr, "pubsubd_send: pubsub_msg == NULL"); return; } char *buf = NULL; size_t msize = 0; - pubsub_message_serialize (m, &buf, &msize); + pubsub_message_serialize (pubsub_msg, &buf, &msize); - struct ipc_message m_data; - memset (&m_data, 0, sizeof (struct ipc_message)); - ipc_message_format_data (&m_data, buf, msize); + struct ipc_message m; + memset (&m, 0, sizeof (struct ipc_message)); + ipc_message_format_data (&m, buf, msize); int i; - for (i = 0; i < ap->size ; i++) { - ipc_server_write (ap->clients[i], &m_data); + for (i = 0; i < clients->size ; i++) { + ipc_server_write (clients->clients[i], &m); } - ipc_message_free (&m_data); + ipc_message_empty (&m); if (buf != NULL) { free (buf); @@ -52,145 +52,115 @@ void pubsubd_send (const struct ipc_clients *ap, const struct pubsub_msg * m) // // pubsub_message_unserialize (m, m_data.payload, m_data.length); // -// ipc_message_free (&m_data); +// ipc_message_empty (&m_data); // } -/** - * new connection, once accepted the client is added to the array_proc - * structure to be checked periodically for new messages - */ -void handle_new_connection (struct ipc_service *srv, struct ipc_clients *ap) -{ - struct ipc_client *p = malloc(sizeof(struct ipc_client)); - memset(p, 0, sizeof(struct ipc_client)); - - if (ipc_server_accept (srv, p) < 0) { - handle_error("server_accept < 0"); - } else { - printf("new connection\n"); - } - - if (ipc_client_add (ap, p) < 0) { - handle_error("ipc_client_add < 0"); - } -} - -void handle_new_msg (struct channels *chans - , struct ipc_clients *ap, struct ipc_clients *proc_to_read) -{ - struct ipc_message m; - memset (&m, 0, sizeof (struct ipc_message)); - int i; - for (i = 0; i < proc_to_read->size; i++) { - // printf ("loop handle_new_msg\n"); - if (ipc_server_read (proc_to_read->clients[i], &m) < 0) { - handle_error("server_read < 0"); - } - - mprint_hexa ("msg received: ", (unsigned char *) m.payload, m.length); - - // close the client then delete it from the client array - if (m.type == MSG_TYPE_CLOSE) { - struct ipc_client *p = proc_to_read->clients[i]; - - printf ("client %d disconnecting\n", p->proc_fd); - - // TODO: to test, unsubscribe when closing - pubsubd_channels_unsubscribe_everywhere (chans, p); - - // close the connection to the client - if (ipc_server_close_client (p) < 0) - handle_error( "server_close_client < 0"); - - - // remove the client from the clientes list - if (ipc_client_del (ap, p) < 0) - handle_error( "ipc_client_del < 0"); - if (ipc_client_del (proc_to_read, p) < 0) - handle_err( "handle_new_msg", "ipc_client_del < 0"); - - ipc_message_free (&m); - - // free client - free (p); - - i--; - continue; - } - - struct pubsub_msg m_data; - memset (&m_data, 0, sizeof (struct pubsub_msg)); - - pubsub_message_unserialize (&m_data, m.payload, m.length); - - if (m_data.type == PUBSUB_MSG_TYPE_SUB) { - printf ("client %d subscribing to %s\n" - , proc_to_read->clients[i]->proc_fd - , m_data.chan); - pubsubd_channels_subscribe (chans - , m_data.chan, proc_to_read->clients[i]); - } - - if (m_data.type == PUBSUB_MSG_TYPE_UNSUB) { - printf ("client %d unsubscribing to %s\n" - , proc_to_read->clients[i]->proc_fd - , m_data.chan); - pubsubd_channels_unsubscribe (chans - , m_data.chan, proc_to_read->clients[i]); - } - - if (m_data.type == PUBSUB_MSG_TYPE_PUB) { - printf ("client %d publishing to %s\n" - , proc_to_read->clients[i]->proc_fd - , m_data.chan); - struct channel *chan = pubsubd_channel_search (chans, m_data.chan); - if (chan == NULL) { - handle_err ("handle_new_msg", "publish on nonexistent channel"); - ipc_message_free (&m); - return ; - } - pubsubd_send (chan->subs, &m_data); - } - - pubsub_message_free (&m_data); - ipc_message_free (&m); - } -} - -/* - * main loop - * - * accept new application connections - * read a message and send it back - * close a connection if MSG_TYPE_CLOSE received - */ - void pubsubd_main_loop (struct ipc_service *srv, struct channels *chans) { int i, ret = 0; - struct ipc_clients ap; - memset(&ap, 0, sizeof(struct ipc_clients)); + struct ipc_clients clients; + memset(&clients, 0, sizeof(struct ipc_clients)); struct ipc_clients proc_to_read; memset(&proc_to_read, 0, sizeof(struct ipc_clients)); - while(1) { - ret = ipc_server_select (&ap, srv, &proc_to_read); + struct ipc_event event; + memset(&event, 0, sizeof (struct ipc_event)); + event.type = IPC_EVENT_TYPE_NOT_SET; - if (ret == CONNECTION) { - handle_new_connection (srv, &ap); - } else if (ret == APPLICATION) { - handle_new_msg (chans, &ap, &proc_to_read); - } else { // both new connection and new msg from at least one client - handle_new_connection (srv, &ap); - handle_new_msg (chans, &ap, &proc_to_read); - } - ipc_clients_free (&proc_to_read); + int cpt = 0; + + while(1) { + ret = ipc_service_poll_event (&clients, srv, &event); + if (ret != 0) { + handle_error("ipc_service_poll_event != 0"); + // the application will shut down, and close the service + if (ipc_server_close (srv) < 0) { + handle_error("ipc_server_close < 0"); + } + exit (EXIT_FAILURE); + } + + switch (event.type) { + case IPC_EVENT_TYPE_CONNECTION: + { + cpt++; + struct ipc_client *cli = event.origin; + printf ("connection of client %d: %d clients connected\n", cli->proc_fd, cpt); + }; + break; + case IPC_EVENT_TYPE_DISCONNECTION: + { + cpt--; + struct ipc_client *cli = event.origin; + printf ("disconnection of client %d: %d clients remaining\n", cli->proc_fd, cpt); + + // TODO: to test, unsubscribe when closing + pubsubd_channels_unsubscribe_everywhere (chans, cli); + + // free the ipc_client structure + free (event.origin); + }; + break; + case IPC_EVENT_TYPE_MESSAGE: + { + struct ipc_message *m = event.m; + struct ipc_client *cli = event.origin; + if (m->length > 0) { + printf ("message received (type %d): %.*s\n", m->type, m->length, m->payload); + } + // TODO: handle a message + // handle_new_msg (chans, &clients, &proc_to_read); + + struct pubsub_msg pubsub_msg; + memset (&pubsub_msg, 0, sizeof (struct pubsub_msg)); + + pubsub_message_unserialize (&pubsub_msg, m->payload, m->length); + + if (pubsub_msg.type == PUBSUB_MSG_TYPE_SUB) { + printf ("client %d subscribing to %s\n" + , cli->proc_fd + , pubsub_msg.chan); + pubsubd_channels_subscribe (chans + , pubsub_msg.chan, cli); + } + + if (pubsub_msg.type == PUBSUB_MSG_TYPE_UNSUB) { + printf ("client %d unsubscribing to %s\n", cli->proc_fd, pubsub_msg.chan); + pubsubd_channels_unsubscribe (chans, pubsub_msg.chan, cli); + } + + if (pubsub_msg.type == PUBSUB_MSG_TYPE_PUB) { + printf ("client %d publishing to %s\n", cli->proc_fd, pubsub_msg.chan); + + struct channel *chan = pubsubd_channel_search (chans, pubsub_msg.chan); + if (chan == NULL) { + handle_err ("handle_new_msg", "publish on nonexistent channel"); + ipc_message_empty (m); + continue; + } + pubsubd_send (chan->subs, &pubsub_msg); + } + + pubsub_message_free (&pubsub_msg); + }; + break; + case IPC_EVENT_TYPE_ERROR: + { + fprintf (stderr, "a problem happened with client %d\n" + , ((struct ipc_client*) event.origin)->proc_fd); + }; + break; + default : + { + fprintf (stderr, "there must be a problem, event not set\n"); + }; + } } - for (i = 0; i < ap.size; i++) { - if (ipc_server_close_client (ap.clients[i]) < 0) { + for (i = 0; i < clients.size; i++) { + if (ipc_server_close_client (clients.clients[i]) < 0) { handle_error( "server_close_client < 0"); } }