diff --git a/examples/pongd.c b/examples/pongd.c index fa80447..f7cfed8 100644 --- a/examples/pongd.c +++ b/examples/pongd.c @@ -32,7 +32,7 @@ void main_loop () event.type = IPC_EVENT_TYPE_NOT_SET; while (1) { - // ipc_service_poll_event provides one event at a time + // ipc_wait_event provides one event at a time // warning: event->m is free'ed if not NULL TEST_IPC_WAIT_EVENT_Q (ipc_wait_event (clients, srv, &event, &timer), EXIT_FAILURE); diff --git a/src/communication.c b/src/communication.c index bd5eeb1..2cc16e0 100644 --- a/src/communication.c +++ b/src/communication.c @@ -4,6 +4,8 @@ #include #include // error numbers +#include + #include #include @@ -29,20 +31,6 @@ struct ipc_error service_path (char *path, const char *sname, int32_t index, int IPC_RETURN_NO_ERROR; } -static int32_t get_max_fd (struct ipc_connection_infos *cinfos) -{ - size_t i; - int32_t max = 0; - - for (i = 0; i < cinfos->size; i++) { - if (cinfos->cinfos[i]->fd > max) { - max = cinfos->cinfos[i]->fd; - } - } - - return max; -} - struct ipc_error ipc_server_init (char **env, struct ipc_connection_info *srv, const char *sname) { T_R ((env == NULL), IPC_ERROR_SERVER_INIT__NO_ENVIRONMENT_PARAM); @@ -77,7 +65,7 @@ struct ipc_error ipc_server_init (char **env, struct ipc_connection_info *srv, c memcpy (srv->spath, buf, s); srv->spath[s] = '\0'; // to be sure - TEST_IPC_RETURN_ON_ERROR (usock_init (&srv->fd, srv->spath)); + TEST_IPC_RETURN_ON_ERROR (usock_init (&srv->pollfd.fd, srv->spath)); IPC_RETURN_NO_ERROR; } @@ -92,7 +80,7 @@ struct ipc_error ipc_contact_networkd (struct ipc_connection_info *srv, const ch char *networkvar = getenv ("IPC_NETWORK"); if (networkvar == NULL) { - srv->fd = 0; + srv->pollfd.fd = 0; IPC_RETURN_NO_ERROR; } // TODO: is there another, more interesting way to do this? @@ -108,7 +96,7 @@ struct ipc_error ipc_contact_networkd (struct ipc_connection_info *srv, const ch if (strncmp (networkvar, sname, strlen (sname)) != 0 && strstr (networkvar, columnthensname) == NULL) { // printf ("sname %s not found\n", sname); - srv->fd = 0; + srv->pollfd.fd = 0; IPC_RETURN_NO_ERROR; } // printf ("(;)sname %s found\n", sname); @@ -133,7 +121,7 @@ struct ipc_error ipc_contact_networkd (struct ipc_connection_info *srv, const ch TEST_IPC_RR (ipc_write_fd (networkdfd, &msg), "cannot send a message to networkd"); - struct ipc_error ret = ipc_receive_fd (networkdfd, &srv->fd); + struct ipc_error ret = ipc_receive_fd (networkdfd, &srv->pollfd.fd); if (ret.error_code == IPC_ERROR_NONE) { usock_close (networkdfd); } @@ -150,11 +138,11 @@ struct ipc_error ipc_connection (char **env, struct ipc_connection_info *srv, co TEST_IPC_P (ipc_contact_networkd (srv, sname), "error during networkd connection"); // if networkd did not initiate the connection - if (srv->fd <= 0) { + if (srv->pollfd.fd <= 0) { // gets the service path SECURE_BUFFER_DECLARATION (char, buf, PATH_MAX); TEST_IPC_RR (service_path (buf, sname, srv->index, srv->version), "cannot get server path"); - TEST_IPC_RETURN_ON_ERROR (usock_connect (&srv->fd, buf)); + TEST_IPC_RETURN_ON_ERROR (usock_connect (&srv->pollfd.fd, buf)); } IPC_RETURN_NO_ERROR; @@ -162,7 +150,7 @@ struct ipc_error ipc_connection (char **env, struct ipc_connection_info *srv, co struct ipc_error ipc_server_close (struct ipc_connection_info *srv) { - usock_close (srv->fd); + usock_close (srv->pollfd.fd); struct ipc_error ret = usock_remove (srv->spath); if (srv->spath != NULL) { free (srv->spath); @@ -173,15 +161,18 @@ struct ipc_error ipc_server_close (struct ipc_connection_info *srv) struct ipc_error ipc_close (struct ipc_connection_info *p) { - return usock_close (p->fd); + return usock_close (p->pollfd.fd); } -struct ipc_error ipc_accept (struct ipc_connection_info *srv, struct ipc_connection_info *p) +struct ipc_error ipc_accept ( + struct ipc_connection_info *srv + , struct ipc_connection_info *p) { T_R ((srv == NULL), IPC_ERROR_ACCEPT__NO_SERVICE_PARAM); T_R ((p == NULL), IPC_ERROR_ACCEPT__NO_CLIENT_PARAM); - TEST_IPC_RR (usock_accept (srv->fd, &p->fd), "cannot accept IPC connection"); + TEST_IPC_RR (usock_accept (srv->pollfd.fd, &p->pollfd.fd), "cannot accept IPC connection"); + p->pollfd.events = POLLIN; // Tell to poll(2) to watch for incoming data from this fd. p->type = IPC_CONNECTION_TYPE_IPC; IPC_RETURN_NO_ERROR; @@ -196,7 +187,7 @@ struct ipc_error ipc_read (const struct ipc_connection_info *p, struct ipc_messa size_t msize = IPC_MAX_MESSAGE_SIZE; // on error or closed recipient, the buffer already freed - TEST_IPC_RETURN_ON_ERROR (usock_recv (p->fd, &buf, &msize)); + TEST_IPC_RETURN_ON_ERROR (usock_recv (p->pollfd.fd, &buf, &msize)); TEST_IPC_RETURN_ON_ERROR_FREE (ipc_message_format_read (m, buf, msize), buf); free (buf); @@ -226,45 +217,39 @@ struct ipc_error ipc_write_fd (int fd, const struct ipc_message *m) struct ipc_error ipc_write (const struct ipc_connection_info *p, const struct ipc_message *m) { - return ipc_write_fd (p->fd, m); + return ipc_write_fd (p->pollfd.fd, m); } -struct ipc_error handle_new_connection (struct ipc_connection_info *cinfo, struct ipc_connection_infos *cinfos - , struct ipc_connection_info **new_client) +// New connection from a client. +struct ipc_error handle_connection (struct ipc_event *event + , struct ipc_connection_infos *cinfos + , struct ipc_connection_info *cinfo) { T_R ((cinfo == NULL), IPC_ERROR_HANDLE_NEW_CONNECTION__NO_CINFO_PARAM); T_R ((cinfos == NULL), IPC_ERROR_HANDLE_NEW_CONNECTION__NO_CINFOS_PARAM); - SECURE_BUFFER_HEAP_ALLOCATION_R (*new_client, sizeof (struct ipc_connection_info),, - IPC_ERROR_HANDLE_NEW_CONNECTION__MALLOC); - - TEST_IPC_RR (ipc_accept (cinfo, *new_client), "cannot accept the client during handle_new_connection"); - TEST_IPC_RR (ipc_add (cinfos, *new_client), "cannot add the new accepted client"); - - IPC_RETURN_NO_ERROR; -} - -// new connection from a client -struct ipc_error handle_connection (struct ipc_event *event, struct ipc_connection_infos *cinfos - , struct ipc_connection_info *cinfo) -{ - // connection struct ipc_connection_info *new_client = NULL; - TEST_IPC_RR (handle_new_connection (cinfo, cinfos, &new_client), "cannot add new client"); + SECURE_BUFFER_HEAP_ALLOCATION_R (new_client, sizeof (struct ipc_connection_info),, + IPC_ERROR_HANDLE_NEW_CONNECTION__MALLOC); + + TEST_IPC_RR (ipc_accept (cinfo, new_client), "cannot accept the client during handle_new_connection"); + TEST_IPC_RR (ipc_add (cinfos, new_client), "cannot add the new accepted client"); IPC_EVENT_SET (event, IPC_EVENT_TYPE_CONNECTION, NULL, new_client); IPC_RETURN_NO_ERROR; } // new message -struct ipc_error handle_message (struct ipc_event *event, struct ipc_connection_infos *cinfos +struct ipc_error handle_message ( + struct ipc_event *event + , struct ipc_connection_infos *cinfos , struct ipc_connection_info *pc, struct ipc_switchings *switchdb) { // if the socket is associated to another one for networkd // read and write automatically and provide a new IPC_EVENT_TYPE indicating the switch if (switchdb != NULL) { - int talkingfd = pc->fd; + int talkingfd = pc->pollfd.fd; int correspondingfd = ipc_switching_get (switchdb, talkingfd); if (correspondingfd != -1) { char *buf = NULL; @@ -365,33 +350,142 @@ struct ipc_error handle_message (struct ipc_event *event, struct ipc_connection_ IPC_RETURN_NO_ERROR; } -struct ipc_error ipc_wait_event_networkd (struct ipc_connection_infos *cinfos - , struct ipc_connection_info *cinfo // NULL for clients - , struct ipc_event *event, struct ipc_switchings *switchdb +struct ipc_error ipc_events_loop ( + struct ipc_connection_infos *cinfos + , struct ipc_connection_info *cinfo // NULL for clients + , struct ipc_event *event + , struct ipc_switchings *switchdb + , struct ipc_messages *messages_to_send , double *timer) { T_R ((cinfos == NULL), IPC_ERROR_WAIT_EVENT__NO_CLIENTS_PARAM); - T_R ((event == NULL), IPC_ERROR_WAIT_EVENT__NO_EVENT_PARAM); + T_R ((event == NULL), IPC_ERROR_WAIT_EVENT__NO_EVENT_PARAM); IPC_EVENT_CLEAN (event); - size_t i, j; - /* master file descriptor list */ - fd_set master; - fd_set readf; + // struct ipc_connection_info { + // uint32_t version; + // uint32_t index; + // struct pollfd pollfd; + // char type; // server, client, arbitrary fd + // char *spath; // max size: PATH_MAX + // }; - /* clear the master and temp sets */ - FD_ZERO (&master); - FD_ZERO (&readf); + // struct ipc_connection_infos { + // struct ipc_connection_info **cinfos; + // size_t size; + // }; + + // struct ipc_event { + // enum ipc_event_type type; + // struct ipc_connection_info *origin; + // void *m; // message pointer + // }; + + // enum ipc_event_type { + // IPC_EVENT_TYPE_NOT_SET = 0 + // , IPC_EVENT_TYPE_ERROR = 1 + // , IPC_EVENT_TYPE_EXTRA_SOCKET = 2 // Message coming from a non-IPC fd. + // , IPC_EVENT_TYPE_SWITCH = 3 // Message coming from a switched fd. + // , IPC_EVENT_TYPE_CONNECTION = 4 + // , IPC_EVENT_TYPE_DISCONNECTION = 5 + // , IPC_EVENT_TYPE_MESSAGE = 6 + // , IPC_EVENT_TYPE_LOOKUP = 7 + // , IPC_EVENT_TYPE_TIMER = 8 + // }; + + // struct ipc_messages { + // struct ipc_message **messages; + // size_t size; + // }; + + // struct ipc_message { + // char type; + // char user_type; + // uint32_t length; + // char *payload; + // int fd; // File descriptor concerned about this message. + // }; + + // struct pollfd { + // int fd; /* file descriptor */ + // short events; /* requested events */ + // short revents; /* returned events */ + // }; + // [struct pollfd] + +#if 0 + int i, n, listenfd, server_fd, nread; + char buf[MAXLINE]; + uid_t uid; + struct pollfd *fds_to_poll; + int timeout = (int) timer * 1000; // timer = seconds, poll's timeout = milliseconds + + // Generate the array of pollfd structure once, then use it each time. + if ( (fds_to_poll = malloc(sizeof(struct pollfd))) == NULL) + err_sys("malloc error"); + + client_add(listenfd, 0); /* we use [0] for listenfd */ + fds_to_poll[0].fd = listenfd; + fds_to_poll[0].events = POLLIN; + + if ((n = poll(fds_to_poll, cinfos->size, INFTIM)) < 0) + { + log_sys("select error"); + } + + for (i = 0; i <= cinfos->size; i++) { + if (fds_to_poll[i].revents & POLLIN) + { + // In case there is something to read for the server socket: new client. + if (cinfo != NULL && i == cinfo->pollfd) { + // In case there is a new client connecting. + /* accept new client request */ + if ( (server_fd = serv_accept(listenfd, &uid)) < 0) + { + printf ("serv_accept error: %d\n", server_fd); + } + + // i : Client number + return handle_connection (event, cinfos, cinfo); + } + + for (j = 0; j < cinfos->size; j++) { + if (i == (size_t) cinfos->cinfos[j]->pollfd.fd) { + return handle_message (event, cinfos, cinfos->cinfos[j], switchdb); + } + } + } + + if ( (server_fd = client[i].fd) < 0) + continue; + if (fds_to_poll[i].revents & POLLHUP) + goto hungup; + else if (fds_to_poll[i].revents & POLLIN) { + /* read argument buffer from client */ + if ( (nread = read(server_fd, buf, MAXLINE)) < 0) + log_sys("read error on fd %d", server_fd); + else if (nread == 0) { +hungup: + log_msg("closed: uid %d, fd %d", + client[i].uid, server_fd); + client_del(server_fd); /* client has closed conn */ + fds_to_poll[i].fd = -1; + close(server_fd); + } else /* process client's rquest */ + request(buf, nread, server_fd, client[i].uid); + } + } /** for loop: end of the message handling */ +#else +#endif /* maximum file descriptor number */ /* keep track of the biggest file descriptor */ - int32_t fdmax = get_max_fd (cinfos); /* listening socket descriptor */ int32_t listener; if (cinfo != NULL) { - listener = cinfo->fd; + listener = cinfo->pollfd.fd; /* add the listener to the master set */ FD_SET (listener, &master); @@ -402,7 +496,7 @@ struct ipc_error ipc_wait_event_networkd (struct ipc_connection_infos *cinfos } for (i = 0; i < cinfos->size; i++) { - FD_SET (cinfos->cinfos[i]->fd, &master); + FD_SET (cinfos->cinfos[i]->pollfd.fd, &master); } readf = master; @@ -432,7 +526,7 @@ struct ipc_error ipc_wait_event_networkd (struct ipc_connection_infos *cinfos return handle_connection (event, cinfos, cinfo); } else { for (j = 0; j < cinfos->size; j++) { - if (i == (size_t) cinfos->cinfos[j]->fd) { + if (i == (size_t) cinfos->cinfos[j]->pollfd.fd) { return handle_message (event, cinfos, cinfos->cinfos[j], switchdb); } } @@ -443,22 +537,110 @@ struct ipc_error ipc_wait_event_networkd (struct ipc_connection_infos *cinfos IPC_RETURN_NO_ERROR; } -struct ipc_error ipc_wait_event (struct ipc_connection_infos *cinfos +struct ipc_error ipc_wait_event_networkd ( + struct ipc_connection_infos *cinfos + , struct ipc_connection_info *cinfo // NULL for clients + , struct ipc_event *event + , struct ipc_switchings *switchdb + , double *timer) +{ + T_R ((cinfos == NULL), IPC_ERROR_WAIT_EVENT__NO_CLIENTS_PARAM); + T_R ((event == NULL), IPC_ERROR_WAIT_EVENT__NO_EVENT_PARAM); + + IPC_EVENT_CLEAN (event); + + size_t i, j; + /* master file descriptor list */ + fd_set master; + fd_set readf; + + /* clear the master and temp sets */ + FD_ZERO (&master); + FD_ZERO (&readf); + + /* maximum file descriptor number */ + /* keep track of the biggest file descriptor */ + int32_t fdmax = get_max_fd (cinfos); + + /* listening socket descriptor */ + int32_t listener; + if (cinfo != NULL) { + listener = cinfo->pollfd.fd; + + /* add the listener to the master set */ + FD_SET (listener, &master); + + /* if listener is max fd */ + if (fdmax < listener) + fdmax = listener; + } + + for (i = 0; i < cinfos->size; i++) { + FD_SET (cinfos->cinfos[i]->pollfd.fd, &master); + } + + readf = master; + + struct timeval *ptimeout = NULL; + SECURE_DECLARATION (struct timeval, timeout); + + if (timer != NULL && *timer > 0.0) { + timeout.tv_sec = (long) *timer; + timeout.tv_usec = (long) ((long)((*timer) * 1000000) % 1000000); + ptimeout = &timeout; + } + + T_PERROR_RIPC ((select (fdmax + 1, &readf, NULL, NULL, ptimeout) == -1), "select", IPC_ERROR_WAIT_EVENT__SELECT); + + if (ptimeout != NULL) { + *timer = (double) timeout.tv_sec + (timeout.tv_usec / 1000000.0); + if (*timer == 0) { + IPC_EVENT_SET (event, IPC_EVENT_TYPE_TIMER, NULL, NULL); + IPC_RETURN_NO_ERROR; + } + } + + for (i = 0; i <= (size_t) fdmax; i++) { + if (FD_ISSET (i, &readf)) { + if (cinfo != NULL && i == (size_t) listener) { + return handle_connection (event, cinfos, cinfo); + } else { + for (j = 0; j < cinfos->size; j++) { + if (i == (size_t) cinfos->cinfos[j]->pollfd.fd) { + return handle_message (event, cinfos, cinfos->cinfos[j], switchdb); + } + } + } + } + } + + IPC_RETURN_NO_ERROR; +} + +struct ipc_error ipc_wait_event ( + struct ipc_connection_infos *cinfos , struct ipc_connection_info *cinfo // NULL for clients , struct ipc_event *event, double *timer) { return ipc_wait_event_networkd (cinfos, cinfo, event, NULL, timer); } -// store and remove only pointers on allocated structures -struct ipc_error ipc_add (struct ipc_connection_infos *cinfos, struct ipc_connection_info *p) +/** + * PERFORMANCE POINT: + * Realloc is performed at each new user. There is plenty of room for improvement, + * for example by managing allocations of thousands of structures at once. + * WARNING: Store and remove only pointers on allocated structures. + */ +struct ipc_error ipc_add ( + struct ipc_connection_infos *cinfos + , struct ipc_connection_info *p) { T_R ((cinfos == NULL), IPC_ERROR_ADD__NO_PARAM_CLIENTS); T_R ((p == NULL), IPC_ERROR_ADD__NO_PARAM_CLIENT); cinfos->size++; + // In case this is the first allocation. if (cinfos->size == 1 && cinfos->cinfos == NULL) { - // first allocation SECURE_BUFFER_HEAP_ALLOCATION_R (cinfos->cinfos, sizeof (struct ipc_connection_info),, IPC_ERROR_ADD__MALLOC); } else { @@ -471,16 +653,20 @@ struct ipc_error ipc_add (struct ipc_connection_infos *cinfos, struct ipc_connec IPC_RETURN_NO_ERROR; } -struct ipc_error ipc_del (struct ipc_connection_infos *cinfos, struct ipc_connection_info *p) +struct ipc_error ipc_del ( + struct ipc_connection_infos *cinfos + , struct ipc_connection_info *p) { - T_R ((cinfos == NULL), IPC_ERROR_DEL__NO_CLIENTS_PARAM); - T_R ((p == NULL), IPC_ERROR_DEL__NO_CLIENT_PARAM); + T_R ((cinfos == NULL), IPC_ERROR_DEL__NO_CLIENTS_PARAM); + T_R ((p == NULL), IPC_ERROR_DEL__NO_CLIENT_PARAM); T_R ((cinfos->cinfos == NULL), IPC_ERROR_DEL__EMPTY_LIST); size_t i; for (i = 0; i < cinfos->size; i++) { + // WARNING: The test is performed on the pointers of both structures, + // this is efficient but it doesn't work if the p structure is a copy. if (cinfos->cinfos[i] == p) { - // TODO: possible memory leak if the ipc_connection_info is not free'ed + // TODO: possible memory leak if the ipc_connection_info is not deeply free'ed. cinfos->cinfos[i] = cinfos->cinfos[cinfos->size - 1]; cinfos->size--; if (cinfos->size == 0) { @@ -535,7 +721,11 @@ struct ipc_error ipc_connection_gen (struct ipc_connection_info *cinfo cinfo->type = type; cinfo->version = version; cinfo->index = index; - cinfo->fd = fd; + // cinfo->fd = fd; + + cinfo.pollfd.fd = fd; + cinfo.pollfd.events = POLLIN; + // cinfo.pollfd.revents == returned events IPC_RETURN_NO_ERROR; } @@ -563,8 +753,8 @@ struct ipc_error ipc_del_fd (struct ipc_connection_infos *cinfos, int fd) size_t i; for (i = 0; i < cinfos->size; i++) { - if (cinfos->cinfos[i]->fd == fd) { - cinfos->cinfos[i]->fd = -1; + if (cinfos->cinfos[i]->pollfd.fd == fd) { + cinfos->cinfos[i]->pollfd.fd = -1; free (cinfos->cinfos[i]); cinfos->size--; if (cinfos->size == 0) { diff --git a/src/ipc.h b/src/ipc.h index a28ac20..109fd6a 100644 --- a/src/ipc.h +++ b/src/ipc.h @@ -189,7 +189,7 @@ const char *ipc_errors_get (enum ipc_error_code e); struct ipc_connection_info { uint32_t version; uint32_t index; - int32_t fd; + struct pollfd pollfd; char type; // server, client, arbitrary fd char *spath; // max size: PATH_MAX }; @@ -199,11 +199,17 @@ struct ipc_connection_infos { size_t size; }; +struct ipc_messages { + struct ipc_message **messages; + size_t size; +}; + struct ipc_message { char type; char user_type; uint32_t length; char *payload; + int fd; // File descriptor concerned about this message. }; struct ipc_event { @@ -286,6 +292,9 @@ struct ipc_error ipc_message_format_data (struct ipc_message *m, char utype, con struct ipc_error ipc_message_format_server_close (struct ipc_message *m); struct ipc_error ipc_message_empty (struct ipc_message *m); +struct ipc_error ipc_messages_add (struct ipc_messages *, struct ipc_message *); +void ipc_messages_free (struct ipc_messages *); + // Switch cases macros // print on error #define ERROR_CASE(e,f,m) case e : { fprintf (stderr, "function %s: %s", f, m); } break; diff --git a/src/message.c b/src/message.c index 07d7ea3..625ac7b 100644 --- a/src/message.c +++ b/src/message.c @@ -145,3 +145,37 @@ struct ipc_error ipc_message_empty (struct ipc_message *m) IPC_RETURN_NO_ERROR; } + +// store and remove only pointers on allocated structures +struct ipc_error ipc_messages_add (struct ipc_messages *messages, struct ipc_message *message) +{ + T_R (( messages == NULL), IPC_ERROR_ADD_MESSAGE_TO_SEND__NO_PARAM_MESSAGES); + T_R (( message == NULL), IPC_ERROR_ADD_MESSAGE_TO_SEND__NO_PARAM_MESSAGE); + + messages->size++; + if (messages->size == 1 && messages->messages == NULL) { + // first allocation + SECURE_BUFFER_HEAP_ALLOCATION_R (messages->messages, sizeof (struct ipc_message),, + IPC_ERROR_ADD_MESSAGE_TO_SEND__MALLOC); + } else { + messages->messages = realloc (messages->messages, sizeof (struct ipc_message) * messages->size); + } + + T_R ((messages->messages == NULL), IPC_ERROR_ADD_MESSAGE_TO_SEND__EMPTY_LIST); + + messages->messages[messages->size - 1] = p; + IPC_RETURN_NO_ERROR; +} + +void ipc_messages_free (struct ipc_messages *messages) +{ + if (messages != NULL) + { + if (messages->messages != NULL) + { + free(messages->messages); + messages->messages = 0; + } + free(messages); + } +}