From e8db551536b33b59aa5c1e765a8d4b360ce4462a Mon Sep 17 00:00:00 2001
From: Philippe PITTOLI
Date: Mon, 29 Oct 2018 23:36:06 +0100
Subject: [PATCH] pubsub rewrite, draft
---
pubsub/app/pubsub-client.c | 182 +++++++++++++++++++++++++++
pubsub/app/pubsubc.c | 9 +-
pubsub/app/pubsubd.c | 10 +-
pubsub/lib/pubsub.c | 11 +-
pubsub/lib/pubsub.h | 6 +-
pubsub/lib/pubsubd.c | 246 ++++++++++++++++---------------------
6 files changed, 306 insertions(+), 158 deletions(-)
create mode 100644 pubsub/app/pubsub-client.c
diff --git a/pubsub/app/pubsub-client.c b/pubsub/app/pubsub-client.c
new file mode 100644
index 0000000..d1175d9
--- /dev/null
+++ b/pubsub/app/pubsub-client.c
@@ -0,0 +1,182 @@
+#include "../../core/ipc.h"
+
+#include "../lib/message.h"
+#include "../lib/channels.h"
+
+#include "../lib/pubsub.h"
+#include "../lib/pubsubd.h"
+
+#include
+#include
+#include
+
+void usage (char **argv) {
+ printf ( "usage: %s [chan [pub]]\n", argv[0]);
+}
+
+void print_cmd (void) {
+ printf ("\033[32m>\033[00m ");
+ fflush (stdout);
+}
+
+void chan_sub (struct ipc_service *srv, char *chan)
+{
+ struct pubsub_msg msg;
+ memset (&msg, 0, sizeof (struct pubsub_msg));
+
+ // meta data on the message
+ msg.type = PUBSUB_MSG_TYPE_SUB;
+ msg.chanlen = strlen (chan) + 1;
+ msg.chan = malloc (msg.chanlen);
+ memset (msg.chan, 0, msg.chanlen);
+ strncpy (msg.chan, chan, msg.chanlen);
+ msg.chan[strlen (chan)] = '\0';
+
+ pubsub_message_send (srv, &msg);
+ printf ("subscribed to %s\n", chan);
+
+ pubsub_message_free (&msg);
+}
+
+void main_loop (char **env, int index, int version
+ , char *cmd, char *chan)
+{
+ printf ("connection to pubsubd: index %d version %d "
+ "cmd %s chan %s\n"
+ , index, version, cmd, chan );
+
+ struct ipc_service srv;
+ memset (&srv, 0, sizeof (struct ipc_service));
+ int ret = ipc_application_connection (env, &srv, PUBSUBD_SERVICE_NAME);
+ if (ret != 0) {
+ handle_err ("pubsub_connection", "application_connection != 0");
+ }
+ printf ("connected\n");
+
+ if (strncmp (cmd, "sub", 3) == 0) {
+ chan_sub (&srv, chan);
+ }
+
+ printf ("main_loop\n");
+
+ struct pubsub_msg msg;
+ memset (&msg, 0, sizeof (struct pubsub_msg));
+
+ // meta data on the message
+ msg.type = PUBSUB_MSG_TYPE_PUB;
+ msg.chanlen = strlen (chan) + 1;
+ msg.chan = malloc (msg.chanlen);
+ memset (msg.chan, 0, msg.chanlen);
+ strncpy ((char *) msg.chan, chan, msg.chanlen);
+ msg.chan[strlen (chan)] = '\0';
+
+ struct ipc_event event;
+ memset (&event, 0, sizeof (struct ipc_event));
+
+ struct ipc_services services;
+ memset (&services, 0, sizeof (struct ipc_services));
+ ipc_service_add (&services, &srv);
+
+ int should_continue = 1;
+
+ while (should_continue) {
+ print_cmd ();
+ ret = ipc_application_peek_event (&services, &event);
+
+ if (ret != 0) {
+ handle_error("ipc_application_peek_event != 0");
+ exit (EXIT_FAILURE);
+ }
+
+ switch (event.type) {
+ case IPC_EVENT_TYPE_STDIN:
+ {
+ struct ipc_message *m = event.m;
+ if ( m->length == 0 || strncmp (m->payload, "exit", 4) == 0) {
+ // TODO: disconnection
+
+ ipc_message_empty (m);
+ free (m);
+
+ should_continue = 0;
+ break;
+ }
+
+ // get the curent payload, change it to be compatible with the application
+ // then send it
+ print_cmd ();
+
+ // TODO: remove \n
+ msg.datalen = m->length + 1;
+ msg.data = malloc (msg.datalen);
+ memset (msg.data, 0, msg.datalen);
+ strncpy (msg.data, m->payload, msg.datalen);
+ msg.data[msg.datalen] = '\0';
+
+ pubsub_message_send (&srv, &msg);
+ free (msg.data);
+ msg.data = NULL;
+ msg.datalen = 0;
+ }
+ break;
+ case IPC_EVENT_TYPE_MESSAGE:
+ {
+ struct ipc_message *m = event.m;
+ printf ("msg recv: %.*s", m->length, m->payload);
+ // TODO: correctly print the message
+ };
+ break;
+ case IPC_EVENT_TYPE_DISCONNECTION:
+ {
+ printf ("server disconnected: quitting...\n");
+
+ // just remove srv from services, it's already closed
+ ipc_services_free (&services);
+
+ exit (EXIT_SUCCESS);
+ };
+ case IPC_EVENT_TYPE_NOT_SET:
+ case IPC_EVENT_TYPE_CONNECTION:
+ case IPC_EVENT_TYPE_ERROR:
+ default :
+ fprintf (stderr, "should not happen, event type %d\n", event.type);
+ }
+ }
+
+ // free everything
+ pubsub_message_free (&msg);
+
+ printf ("disconnection...\n");
+ ipc_services_free (&services);
+ if (ipc_application_close (&srv) < 0) {
+ handle_err("main", "application_close < 0");
+ exit (EXIT_FAILURE);
+ }
+}
+
+int main(int argc, char **argv, char **env)
+{
+ char *cmd = "sub";
+ char *chan = "chan1";
+
+ if (argc == 2 && strncmp("-h", argv[1], 2) == 0) {
+ usage (argv);
+ exit (0);
+ }
+
+ if (argc >= 2) {
+ chan = argv[1];
+ }
+
+ if (argc >= 3) {
+ cmd = argv[2];
+ }
+
+ int index = 0;
+ // don't care about the version
+ int version = 0;
+
+ main_loop (env, index, version, cmd, chan);
+
+ return EXIT_SUCCESS;
+}
diff --git a/pubsub/app/pubsubc.c b/pubsub/app/pubsubc.c
index 6291a86..5aded0b 100644
--- a/pubsub/app/pubsubc.c
+++ b/pubsub/app/pubsubc.c
@@ -2,7 +2,7 @@
// TODO: select on service + input instead of threads
-#include "../../core/error.h"
+#include "../../core/ipc.h"
#include "../lib/pubsub.h"
#include "../lib/pubsubd.h"
#include
@@ -67,8 +67,7 @@ void chan_sub (struct ipc_service *srv, char *chan)
pubsub_message_free (&msg);
}
-void main_loop (int argc, char **argv, char **env
- , int index, int version
+void main_loop (char **env, int index, int version
, char *cmd, char *chan)
{
printf ("connection to pubsubd: index %d version %d "
@@ -77,7 +76,7 @@ void main_loop (int argc, char **argv, char **env
struct ipc_service srv;
memset (&srv, 0, sizeof (struct ipc_service));
- pubsub_connection (argc, argv, env, &srv);
+ pubsub_connection (env, &srv);
printf ("connected\n");
if (strncmp (cmd, "sub", 3) == 0) {
@@ -167,7 +166,7 @@ int main(int argc, char **argv, char **env)
// don't care about the version
int version = 0;
- main_loop (argc, argv, env, index, version, cmd, chan);
+ main_loop (env, index, version, cmd, chan);
return EXIT_SUCCESS;
}
diff --git a/pubsub/app/pubsubd.c b/pubsub/app/pubsubd.c
index d588dc2..f41effc 100644
--- a/pubsub/app/pubsubd.c
+++ b/pubsub/app/pubsubd.c
@@ -1,6 +1,4 @@
-#include "../../core/communication.h"
-#include "../../core/client.h"
-#include "../../core/error.h"
+#include "../../core/ipc.h"
#include "../lib/pubsubd.h"
#include
@@ -29,6 +27,10 @@ void handle_signal (int signalnumber)
int
main(int argc, char **argv, char **env)
{
+
+ argc = argc;
+ argv = argv;
+
memset (&srv, 0, sizeof (struct ipc_service));
srv.index = 0;
srv.version = 0;
@@ -40,7 +42,7 @@ main(int argc, char **argv, char **env)
memset (&chans, 0, sizeof (struct channels));
pubsubd_channels_init (&chans);
- if (ipc_server_init (argc, argv, env, &srv, PUBSUBD_SERVICE_NAME) < 0) {
+ if (ipc_server_init (env, &srv, PUBSUBD_SERVICE_NAME) < 0) {
handle_error("ipc_server_init < 0");
return EXIT_FAILURE;
}
diff --git a/pubsub/lib/pubsub.c b/pubsub/lib/pubsub.c
index e975213..14e1378 100644
--- a/pubsub/lib/pubsub.c
+++ b/pubsub/lib/pubsub.c
@@ -3,7 +3,6 @@
#include "pubsub.h"
#include "pubsubd.h"
-#include "../../core/error.h"
#define PUBSUB_SUBSCRIBER_ACTION_STR_PUB "pub"
#define PUBSUB_SUBSCRIBER_ACTION_STR_SUB "sub"
@@ -33,11 +32,9 @@ void pubsub_quit (struct ipc_service *srv)
}
#endif
-int pubsub_connection (int argc, char **argv, char **env
- , struct ipc_service *srv)
+int pubsub_connection (char **env, struct ipc_service *srv)
{
- int ret = ipc_application_connection (argc, argv, env
- , srv, PUBSUBD_SERVICE_NAME, NULL, 0);
+ int ret = ipc_application_connection (env, srv, PUBSUBD_SERVICE_NAME);
if (ret != 0) {
handle_err ("pubsub_connection", "application_connection != 0");
@@ -69,7 +66,7 @@ int pubsub_message_send (struct ipc_service *srv, const struct pubsub_msg * m)
}
ipc_application_write (srv, &m_data);
- ipc_message_free (&m_data);
+ ipc_message_empty (&m_data);
if (buf != NULL)
free(buf);
@@ -95,7 +92,7 @@ int pubsub_message_recv (struct ipc_service *srv, struct pubsub_msg *m)
ipc_application_read (srv, &m_recv);
pubsub_message_unserialize (m, m_recv.payload, m_recv.length);
- ipc_message_free (&m_recv);
+ ipc_message_empty (&m_recv);
return 0;
}
diff --git a/pubsub/lib/pubsub.h b/pubsub/lib/pubsub.h
index 4f26f17..97d009a 100644
--- a/pubsub/lib/pubsub.h
+++ b/pubsub/lib/pubsub.h
@@ -1,9 +1,7 @@
#ifndef __PUBSUB_H__
#define __PUBSUB_H__
-#include "../../core/communication.h"
-#include "../../core/client.h"
-#include "../../core/queue.h"
+#include "../../core/ipc.h"
#include "message.h"
@@ -15,7 +13,7 @@ enum subscriber_action {PUBSUB_QUIT = 1, PUBSUB_PUB, PUBSUB_SUB, PUBSUB_BOTH};
#define PUBSUB_TYPE_DEBUG 4
#define PUBSUB_TYPE_INFO 5
-int pubsub_connection (int argc, char **argv, char **env, struct ipc_service *srv);
+int pubsub_connection (char **env, struct ipc_service *srv);
int pubsub_disconnect (struct ipc_service *srv);
int pubsub_message_send (struct ipc_service *srv, const struct pubsub_msg * m);
int pubsub_message_recv (struct ipc_service *srv, struct pubsub_msg *m);
diff --git a/pubsub/lib/pubsubd.c b/pubsub/lib/pubsubd.c
index 7122f61..ccfd64d 100644
--- a/pubsub/lib/pubsubd.c
+++ b/pubsub/lib/pubsubd.c
@@ -11,31 +11,31 @@
#include
#include
-void pubsubd_send (const struct ipc_clients *ap, const struct pubsub_msg * m)
+void pubsubd_send (const struct ipc_clients *clients, const struct pubsub_msg * pubsub_msg)
{
- if (ap == NULL) {
- fprintf (stderr, "pubsubd_send: ap == NULL");
+ if (clients == NULL) {
+ fprintf (stderr, "pubsubd_send: clients == NULL");
return;
}
- if (m == NULL) {
- fprintf (stderr, "pubsubd_send: m == NULL");
+ if (pubsub_msg == NULL) {
+ fprintf (stderr, "pubsubd_send: pubsub_msg == NULL");
return;
}
char *buf = NULL;
size_t msize = 0;
- pubsub_message_serialize (m, &buf, &msize);
+ pubsub_message_serialize (pubsub_msg, &buf, &msize);
- struct ipc_message m_data;
- memset (&m_data, 0, sizeof (struct ipc_message));
- ipc_message_format_data (&m_data, buf, msize);
+ struct ipc_message m;
+ memset (&m, 0, sizeof (struct ipc_message));
+ ipc_message_format_data (&m, buf, msize);
int i;
- for (i = 0; i < ap->size ; i++) {
- ipc_server_write (ap->clients[i], &m_data);
+ for (i = 0; i < clients->size ; i++) {
+ ipc_server_write (clients->clients[i], &m);
}
- ipc_message_free (&m_data);
+ ipc_message_empty (&m);
if (buf != NULL) {
free (buf);
@@ -52,145 +52,115 @@ void pubsubd_send (const struct ipc_clients *ap, const struct pubsub_msg * m)
//
// pubsub_message_unserialize (m, m_data.payload, m_data.length);
//
-// ipc_message_free (&m_data);
+// ipc_message_empty (&m_data);
// }
-/**
- * new connection, once accepted the client is added to the array_proc
- * structure to be checked periodically for new messages
- */
-void handle_new_connection (struct ipc_service *srv, struct ipc_clients *ap)
-{
- struct ipc_client *p = malloc(sizeof(struct ipc_client));
- memset(p, 0, sizeof(struct ipc_client));
-
- if (ipc_server_accept (srv, p) < 0) {
- handle_error("server_accept < 0");
- } else {
- printf("new connection\n");
- }
-
- if (ipc_client_add (ap, p) < 0) {
- handle_error("ipc_client_add < 0");
- }
-}
-
-void handle_new_msg (struct channels *chans
- , struct ipc_clients *ap, struct ipc_clients *proc_to_read)
-{
- struct ipc_message m;
- memset (&m, 0, sizeof (struct ipc_message));
- int i;
- for (i = 0; i < proc_to_read->size; i++) {
- // printf ("loop handle_new_msg\n");
- if (ipc_server_read (proc_to_read->clients[i], &m) < 0) {
- handle_error("server_read < 0");
- }
-
- mprint_hexa ("msg received: ", (unsigned char *) m.payload, m.length);
-
- // close the client then delete it from the client array
- if (m.type == MSG_TYPE_CLOSE) {
- struct ipc_client *p = proc_to_read->clients[i];
-
- printf ("client %d disconnecting\n", p->proc_fd);
-
- // TODO: to test, unsubscribe when closing
- pubsubd_channels_unsubscribe_everywhere (chans, p);
-
- // close the connection to the client
- if (ipc_server_close_client (p) < 0)
- handle_error( "server_close_client < 0");
-
-
- // remove the client from the clientes list
- if (ipc_client_del (ap, p) < 0)
- handle_error( "ipc_client_del < 0");
- if (ipc_client_del (proc_to_read, p) < 0)
- handle_err( "handle_new_msg", "ipc_client_del < 0");
-
- ipc_message_free (&m);
-
- // free client
- free (p);
-
- i--;
- continue;
- }
-
- struct pubsub_msg m_data;
- memset (&m_data, 0, sizeof (struct pubsub_msg));
-
- pubsub_message_unserialize (&m_data, m.payload, m.length);
-
- if (m_data.type == PUBSUB_MSG_TYPE_SUB) {
- printf ("client %d subscribing to %s\n"
- , proc_to_read->clients[i]->proc_fd
- , m_data.chan);
- pubsubd_channels_subscribe (chans
- , m_data.chan, proc_to_read->clients[i]);
- }
-
- if (m_data.type == PUBSUB_MSG_TYPE_UNSUB) {
- printf ("client %d unsubscribing to %s\n"
- , proc_to_read->clients[i]->proc_fd
- , m_data.chan);
- pubsubd_channels_unsubscribe (chans
- , m_data.chan, proc_to_read->clients[i]);
- }
-
- if (m_data.type == PUBSUB_MSG_TYPE_PUB) {
- printf ("client %d publishing to %s\n"
- , proc_to_read->clients[i]->proc_fd
- , m_data.chan);
- struct channel *chan = pubsubd_channel_search (chans, m_data.chan);
- if (chan == NULL) {
- handle_err ("handle_new_msg", "publish on nonexistent channel");
- ipc_message_free (&m);
- return ;
- }
- pubsubd_send (chan->subs, &m_data);
- }
-
- pubsub_message_free (&m_data);
- ipc_message_free (&m);
- }
-}
-
-/*
- * main loop
- *
- * accept new application connections
- * read a message and send it back
- * close a connection if MSG_TYPE_CLOSE received
- */
-
void pubsubd_main_loop (struct ipc_service *srv, struct channels *chans)
{
int i, ret = 0;
- struct ipc_clients ap;
- memset(&ap, 0, sizeof(struct ipc_clients));
+ struct ipc_clients clients;
+ memset(&clients, 0, sizeof(struct ipc_clients));
struct ipc_clients proc_to_read;
memset(&proc_to_read, 0, sizeof(struct ipc_clients));
- while(1) {
- ret = ipc_server_select (&ap, srv, &proc_to_read);
+ struct ipc_event event;
+ memset(&event, 0, sizeof (struct ipc_event));
+ event.type = IPC_EVENT_TYPE_NOT_SET;
- if (ret == CONNECTION) {
- handle_new_connection (srv, &ap);
- } else if (ret == APPLICATION) {
- handle_new_msg (chans, &ap, &proc_to_read);
- } else { // both new connection and new msg from at least one client
- handle_new_connection (srv, &ap);
- handle_new_msg (chans, &ap, &proc_to_read);
- }
- ipc_clients_free (&proc_to_read);
+ int cpt = 0;
+
+ while(1) {
+ ret = ipc_service_poll_event (&clients, srv, &event);
+ if (ret != 0) {
+ handle_error("ipc_service_poll_event != 0");
+ // the application will shut down, and close the service
+ if (ipc_server_close (srv) < 0) {
+ handle_error("ipc_server_close < 0");
+ }
+ exit (EXIT_FAILURE);
+ }
+
+ switch (event.type) {
+ case IPC_EVENT_TYPE_CONNECTION:
+ {
+ cpt++;
+ struct ipc_client *cli = event.origin;
+ printf ("connection of client %d: %d clients connected\n", cli->proc_fd, cpt);
+ };
+ break;
+ case IPC_EVENT_TYPE_DISCONNECTION:
+ {
+ cpt--;
+ struct ipc_client *cli = event.origin;
+ printf ("disconnection of client %d: %d clients remaining\n", cli->proc_fd, cpt);
+
+ // TODO: to test, unsubscribe when closing
+ pubsubd_channels_unsubscribe_everywhere (chans, cli);
+
+ // free the ipc_client structure
+ free (event.origin);
+ };
+ break;
+ case IPC_EVENT_TYPE_MESSAGE:
+ {
+ struct ipc_message *m = event.m;
+ struct ipc_client *cli = event.origin;
+ if (m->length > 0) {
+ printf ("message received (type %d): %.*s\n", m->type, m->length, m->payload);
+ }
+ // TODO: handle a message
+ // handle_new_msg (chans, &clients, &proc_to_read);
+
+ struct pubsub_msg pubsub_msg;
+ memset (&pubsub_msg, 0, sizeof (struct pubsub_msg));
+
+ pubsub_message_unserialize (&pubsub_msg, m->payload, m->length);
+
+ if (pubsub_msg.type == PUBSUB_MSG_TYPE_SUB) {
+ printf ("client %d subscribing to %s\n"
+ , cli->proc_fd
+ , pubsub_msg.chan);
+ pubsubd_channels_subscribe (chans
+ , pubsub_msg.chan, cli);
+ }
+
+ if (pubsub_msg.type == PUBSUB_MSG_TYPE_UNSUB) {
+ printf ("client %d unsubscribing to %s\n", cli->proc_fd, pubsub_msg.chan);
+ pubsubd_channels_unsubscribe (chans, pubsub_msg.chan, cli);
+ }
+
+ if (pubsub_msg.type == PUBSUB_MSG_TYPE_PUB) {
+ printf ("client %d publishing to %s\n", cli->proc_fd, pubsub_msg.chan);
+
+ struct channel *chan = pubsubd_channel_search (chans, pubsub_msg.chan);
+ if (chan == NULL) {
+ handle_err ("handle_new_msg", "publish on nonexistent channel");
+ ipc_message_empty (m);
+ continue;
+ }
+ pubsubd_send (chan->subs, &pubsub_msg);
+ }
+
+ pubsub_message_free (&pubsub_msg);
+ };
+ break;
+ case IPC_EVENT_TYPE_ERROR:
+ {
+ fprintf (stderr, "a problem happened with client %d\n"
+ , ((struct ipc_client*) event.origin)->proc_fd);
+ };
+ break;
+ default :
+ {
+ fprintf (stderr, "there must be a problem, event not set\n");
+ };
+ }
}
- for (i = 0; i < ap.size; i++) {
- if (ipc_server_close_client (ap.clients[i]) < 0) {
+ for (i = 0; i < clients.size; i++) {
+ if (ipc_server_close_client (clients.clients[i]) < 0) {
handle_error( "server_close_client < 0");
}
}