From 292cca45e0eb081205658e5c390a6a74762d3ab5 Mon Sep 17 00:00:00 2001 From: Karchnu Date: Tue, 30 Jun 2020 12:59:05 +0200 Subject: [PATCH] WIP: messages are sent and received OK --- src/communication.c | 70 ++++++++++++++++++++++++++++++++++++------- src/error.c | 4 ++- src/ipc.h | 6 +++- src/message.c | 11 +++++-- tests/func_02_pong.c | 6 ++-- tests/func_02_pongd.c | 5 +--- 6 files changed, 82 insertions(+), 20 deletions(-) diff --git a/src/communication.c b/src/communication.c index fcb4c17..caa29eb 100644 --- a/src/communication.c +++ b/src/communication.c @@ -13,8 +13,6 @@ // print structures #include "message.h" -struct ipc_error ipc_write_fd (int fd, const struct ipc_message *m); - struct ipc_error service_path (char *path, const char *sname) { @@ -89,6 +87,13 @@ struct ipc_error ipc_server_init (struct ipc_ctx *ctx, const char *sname) // ipc_add allocate memory then copy the data of srv and pollfd in ctx. TEST_IPC_RR (ipc_add (ctx, &srv, &pollfd), "cannot add the server in the context"); +#if 0 + printf ("server init, listen on %ld fd: ", ctx->size); + for (size_t i = 0; i < ctx->size; i++) + printf (" %d", ctx->pollfd[i].fd); + printf ("\n"); +#endif + IPC_RETURN_NO_ERROR; } @@ -217,8 +222,6 @@ struct ipc_error ipc_accept_add (struct ipc_event *event, struct ipc_ctx *ctx, u ctx->pollfd[ctx->size -1].events = POLLIN; // Tell to poll(2) to watch for incoming data from this fd. ctx->cinfos[ctx->size -1].type = IPC_CONNECTION_TYPE_IPC; - ctx->size++; - // Set the event structure. uint32_t client_index = ctx->size - 1; IPC_EVENT_SET (event, IPC_EVENT_TYPE_CONNECTION, client_index, *client_fd, NULL); @@ -234,6 +237,8 @@ struct ipc_error ipc_read (const struct ipc_ctx *ctx, uint32_t index, struct ipc char *buf = NULL; size_t msize = IPC_MAX_MESSAGE_SIZE; + printf ("reading on the fd %d\n", ctx->pollfd[index].fd); + // On error or closed recipient, the buffer already freed. TEST_IPC_RETURN_ON_ERROR (usock_recv (ctx->pollfd[index].fd, &buf, &msize)); TEST_IPC_RETURN_ON_ERROR_FREE (ipc_message_format_read (m, buf, msize), buf); @@ -256,7 +261,7 @@ struct ipc_error ipc_write_fd (int fd, const struct ipc_message *m) free (buf); } // what was sent != what should have been sent - T_R ((nbytes_sent != msize), IPC_ERROR_WRITE__NOT_ENOUGH_DATA); + T_R ((nbytes_sent != msize), IPC_ERROR_WRITE_FD__NOT_ENOUGH_DATA); IPC_RETURN_NO_ERROR; } @@ -264,6 +269,16 @@ struct ipc_error ipc_write_fd (int fd, const struct ipc_message *m) // Put the message in the list of messages to send. struct ipc_error ipc_write (struct ipc_ctx *ctx, const struct ipc_message *m) { + int found = 0; + for (size_t i = 0; i < ctx->size; i++) { + if (ctx->pollfd[i].fd == m->fd) { + ctx->pollfd[i].events |= POLLOUT; + found = 1; + } + } + + T_R ((found == 0), IPC_ERROR_WRITE__FD_NOT_FOUND); + return ipc_messages_add (&ctx->tx, m); } @@ -272,8 +287,11 @@ struct ipc_error ipc_write (struct ipc_ctx *ctx, const struct ipc_message *m) */ struct ipc_error ipc_add ( struct ipc_ctx *ctx, struct ipc_connection_info *p, struct pollfd *pollfd) { - T_R ((ctx == NULL), IPC_ERROR_ADD__NO_PARAM_CLIENTS); - T_R ((p == NULL), IPC_ERROR_ADD__NO_PARAM_CLIENT); + T_R ((ctx == NULL), IPC_ERROR_ADD__NO_PARAM_CLIENTS); + T_R ((p == NULL), IPC_ERROR_ADD__NO_PARAM_CLIENT); + T_R ((pollfd == NULL), IPC_ERROR_ADD__NO_PARAM_POLLFD); + + printf ("ipc add: add fd %d\n", pollfd[0].fd); // Memory reallocation. ipc_ctx_new_alloc (ctx); @@ -366,7 +384,6 @@ struct ipc_error ipc_del_fd (struct ipc_ctx *ctx, int fd) } -// TODO struct ipc_error handle_writing_message (struct ipc_event *event, struct ipc_ctx *ctx, uint32_t index); struct ipc_error handle_writing_message (struct ipc_event *event, struct ipc_ctx *ctx, uint32_t index) @@ -388,9 +405,16 @@ struct ipc_error handle_writing_message (struct ipc_event *event, struct ipc_ctx mfd = m->fd; printf ("Something to send on fd %d\n", mfd); if (txfd == mfd) { - printf ("The fd %d is available!\n", txfd); + if (m->payload == NULL) { + printf ("message payload is NULL\n"); + } + else { + printf ("The fd %d is available! Writing %d characters: [%d%d%d%d]\n" + , txfd, m->length + , m->payload[0], m->payload[1], m->payload[2], m->payload[3]); - printf ("TODO: write!\n"); + TEST_IPC_RR (ipc_write_fd (txfd, m), "cannot send a message to the client"); + } printf ("Removing the message\n"); ipc_messages_del (&ctx->tx, i); // remove the message indexed by i @@ -510,6 +534,8 @@ struct ipc_error handle_new_message (struct ipc_event *event, struct ipc_ctx *ct IPC_RETURN_NO_ERROR; } + // The message carries the fd it was received on. + m->fd = ctx->pollfd[index].fd; IPC_EVENT_SET (event, IPC_EVENT_TYPE_MESSAGE, index, ctx->pollfd[index].fd, m); IPC_RETURN_NO_ERROR; } @@ -524,6 +550,29 @@ struct ipc_error ipc_events_loop (struct ipc_ctx *ctx, struct ipc_event *event, int32_t n; +#if 1 + printf ("%ld fd for the poll(2) syscall:", ctx->size); + for (size_t i = 0; i < ctx->size; i++) { + printf (" %d(%d)", ctx->pollfd[i].fd, ctx->pollfd[i].events); + } + printf ("\n"); + + printf ("%ld messages still to write, for fd:", ctx->tx.size); + for (size_t i = 0; i < ctx->tx.size; i++) { + printf (" %d", ctx->tx.messages[i].fd); + } + printf ("\n"); +#endif + + for (size_t i = 0; i < ctx->tx.size; i++) { + for (size_t y = 0; y < ctx->size; y++) { + if (ctx->pollfd[y].fd == ctx->tx.messages[i].fd) { + ctx->pollfd[y].events |= POLLOUT; + break; + } + } + } + if ((n = poll(ctx->pollfd, ctx->size, *timer)) < 0) { printf("select error\n"); exit(1); // TODO FIXME @@ -548,6 +597,7 @@ struct ipc_error ipc_events_loop (struct ipc_ctx *ctx, struct ipc_event *event, // Something can be sent. if (ctx->pollfd[i].revents & POLLOUT) { + ctx->pollfd[i].events &= ~POLLOUT; return handle_writing_message (event, ctx, i); } diff --git a/src/error.c b/src/error.c index 6443dd4..b573c9b 100644 --- a/src/error.c +++ b/src/error.c @@ -29,7 +29,7 @@ static struct ipc_errors_verbose ipc_errors_verbose[] = { , {IPC_ERROR_PROVIDE_FD__SENDMSG , "ipc_provide_fd: sendmsg function"} , {IPC_ERROR_WRITE__NO_MESSAGE_PARAM, "ipc_write: no message param"} - , {IPC_ERROR_WRITE__NOT_ENOUGH_DATA , "ipc_write: no enough data sent"} + , {IPC_ERROR_WRITE_FD__NOT_ENOUGH_DATA , "ipc_write: no enough data sent"} , {IPC_ERROR_READ__NO_MESSAGE_PARAM , "ipc_read: no message param"} , {IPC_ERROR_HANDLE_MESSAGE__NOT_ENOUGH_MEMORY , "handle_message: not enough memory"} @@ -138,6 +138,8 @@ static struct ipc_errors_verbose ipc_errors_verbose[] = { , {IPC_ERROR_DEL_MESSAGE_TO_SEND__NO_PARAM_MESSAGES, "IPC_ERROR_DEL_MESSAGE_TO_SEND__NO_PARAM_MESSAGES"} , {IPC_ERROR_MESSAGE_DEL__INDEX_ERROR, "IPC_ERROR_MESSAGE_DEL__INDEX_ERROR"} , {IPC_ERROR_MESSAGE_DEL__EMPTY_LIST, "IPC_ERROR_MESSAGE_DEL__EMPTY_LIST"} + , {IPC_ERROR_ADD__NO_PARAM_POLLFD, "IPC_ERROR_ADD__NO_PARAM_POLLFD"} + , {IPC_ERROR_WRITE__FD_NOT_FOUND, "IPC_ERROR_WRITE__FD_NOT_FOUND"} }; const char *ipc_errors_get (enum ipc_error_code e) diff --git a/src/ipc.h b/src/ipc.h index ba5c055..1d91211 100644 --- a/src/ipc.h +++ b/src/ipc.h @@ -100,7 +100,7 @@ enum ipc_error_code { , IPC_ERROR_SERVER_INIT__NO_SERVER_NAME_PARAM = 9 , IPC_ERROR_SERVER_INIT__MALLOC = 10 , IPC_ERROR_WRITE__NO_MESSAGE_PARAM = 11 - , IPC_ERROR_WRITE__NOT_ENOUGH_DATA = 12 + , IPC_ERROR_WRITE_FD__NOT_ENOUGH_DATA = 12 , IPC_ERROR_READ__NO_MESSAGE_PARAM = 13 , IPC_ERROR_CONNECTION__NO_SERVICE_NAME = 15 , IPC_ERROR_CONNECTION__NO_ENVIRONMENT_PARAM = 16 @@ -190,6 +190,8 @@ enum ipc_error_code { , IPC_ERROR_DEL_MESSAGE_TO_SEND__NO_PARAM_MESSAGES = 100 , IPC_ERROR_MESSAGE_DEL__INDEX_ERROR = 101 , IPC_ERROR_MESSAGE_DEL__EMPTY_LIST = 102 + , IPC_ERROR_ADD__NO_PARAM_POLLFD = 103 + , IPC_ERROR_WRITE__FD_NOT_FOUND = 104 }; struct ipc_error { @@ -347,6 +349,8 @@ void ipc_messages_free (struct ipc_messages *); * non public functions **/ +struct ipc_error ipc_write_fd (int fd, const struct ipc_message *m); + /** * Used by ipc_server_init and ipc_connection */ diff --git a/src/message.c b/src/message.c index 5cc9d63..8614a6f 100644 --- a/src/message.c +++ b/src/message.c @@ -163,8 +163,15 @@ struct ipc_error ipc_messages_add (struct ipc_messages *messages, const struct T_R ((messages->messages == NULL), IPC_ERROR_ADD_MESSAGE_TO_SEND__EMPTY_LIST); - // NOT A DEEP COPY. - messages->messages[messages->size - 1] = *message; + // DEEP COPY. + messages->messages[messages->size -1] = *message; + messages->messages[messages->size -1].payload = malloc(message->length * sizeof (char)); + strncpy(messages->messages[messages->size -1].payload, message->payload, message->length); + + printf ("first message payload (%d): %s\n", message->length, message->payload); + printf ("second message payload (%d): %s\n" + , messages->messages[messages->size - 1].length + , messages->messages[messages->size - 1].payload); IPC_RETURN_NO_ERROR; } diff --git a/tests/func_02_pong.c b/tests/func_02_pong.c index 337a1a9..4859786 100644 --- a/tests/func_02_pong.c +++ b/tests/func_02_pong.c @@ -23,11 +23,13 @@ void non_interactive () int server_fd = ctx.pollfd[0].fd; - printf ("msg to send (%ld): %.*s\n", (ssize_t) strlen(MSG) +1, (int) strlen(MSG), MSG); + printf ("msg for fd %d to send (%ld): %.*s\n" + , server_fd + , (ssize_t) strlen(MSG) +1, (int) strlen(MSG), MSG); TEST_IPC_Q(ipc_message_format_data (&m, /* type */ 'a', MSG, (ssize_t) strlen(MSG) +1), EXIT_FAILURE); m.fd = server_fd; - TEST_IPC_Q(ipc_write (&ctx, &m), EXIT_FAILURE); + TEST_IPC_Q(ipc_write_fd (server_fd, &m), EXIT_FAILURE); TEST_IPC_Q(ipc_read (&ctx, 0 /* only one option */, &m), EXIT_FAILURE); printf ("msg recv: %s\n", m.payload); diff --git a/tests/func_02_pongd.c b/tests/func_02_pongd.c index bc5db55..539985b 100644 --- a/tests/func_02_pongd.c +++ b/tests/func_02_pongd.c @@ -20,10 +20,6 @@ struct ipc_ctx *ctx; void main_loop () { SECURE_DECLARATION(struct ipc_error, ret); - - ctx = malloc (sizeof (struct ipc_ctx)); - memset(ctx, 0, sizeof(struct ipc_ctx)); - SECURE_DECLARATION(struct ipc_event, event); int timer = 10000; @@ -66,6 +62,7 @@ void main_loop () printf ("message received (type %d): %.*s\n", m->type, m->length, m->payload); } + m->fd = event.origin; ret = ipc_write (ctx, m); if (ret.error_code != IPC_ERROR_NONE) { PRINTERR(ret,"server write");