WIP: messages are sent and received OK
parent
a6ca358847
commit
292cca45e0
|
@ -13,8 +13,6 @@
|
||||||
// print structures
|
// print structures
|
||||||
#include "message.h"
|
#include "message.h"
|
||||||
|
|
||||||
struct ipc_error ipc_write_fd (int fd, const struct ipc_message *m);
|
|
||||||
|
|
||||||
struct ipc_error
|
struct ipc_error
|
||||||
service_path (char *path, const char *sname)
|
service_path (char *path, const char *sname)
|
||||||
{
|
{
|
||||||
|
@ -89,6 +87,13 @@ struct ipc_error ipc_server_init (struct ipc_ctx *ctx, const char *sname)
|
||||||
// ipc_add allocate memory then copy the data of srv and pollfd in ctx.
|
// ipc_add allocate memory then copy the data of srv and pollfd in ctx.
|
||||||
TEST_IPC_RR (ipc_add (ctx, &srv, &pollfd), "cannot add the server in the context");
|
TEST_IPC_RR (ipc_add (ctx, &srv, &pollfd), "cannot add the server in the context");
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
printf ("server init, listen on %ld fd: ", ctx->size);
|
||||||
|
for (size_t i = 0; i < ctx->size; i++)
|
||||||
|
printf (" %d", ctx->pollfd[i].fd);
|
||||||
|
printf ("\n");
|
||||||
|
#endif
|
||||||
|
|
||||||
IPC_RETURN_NO_ERROR;
|
IPC_RETURN_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -217,8 +222,6 @@ struct ipc_error ipc_accept_add (struct ipc_event *event, struct ipc_ctx *ctx, u
|
||||||
ctx->pollfd[ctx->size -1].events = POLLIN; // Tell to poll(2) to watch for incoming data from this fd.
|
ctx->pollfd[ctx->size -1].events = POLLIN; // Tell to poll(2) to watch for incoming data from this fd.
|
||||||
ctx->cinfos[ctx->size -1].type = IPC_CONNECTION_TYPE_IPC;
|
ctx->cinfos[ctx->size -1].type = IPC_CONNECTION_TYPE_IPC;
|
||||||
|
|
||||||
ctx->size++;
|
|
||||||
|
|
||||||
// Set the event structure.
|
// Set the event structure.
|
||||||
uint32_t client_index = ctx->size - 1;
|
uint32_t client_index = ctx->size - 1;
|
||||||
IPC_EVENT_SET (event, IPC_EVENT_TYPE_CONNECTION, client_index, *client_fd, NULL);
|
IPC_EVENT_SET (event, IPC_EVENT_TYPE_CONNECTION, client_index, *client_fd, NULL);
|
||||||
|
@ -234,6 +237,8 @@ struct ipc_error ipc_read (const struct ipc_ctx *ctx, uint32_t index, struct ipc
|
||||||
char *buf = NULL;
|
char *buf = NULL;
|
||||||
size_t msize = IPC_MAX_MESSAGE_SIZE;
|
size_t msize = IPC_MAX_MESSAGE_SIZE;
|
||||||
|
|
||||||
|
printf ("reading on the fd %d\n", ctx->pollfd[index].fd);
|
||||||
|
|
||||||
// On error or closed recipient, the buffer already freed.
|
// On error or closed recipient, the buffer already freed.
|
||||||
TEST_IPC_RETURN_ON_ERROR (usock_recv (ctx->pollfd[index].fd, &buf, &msize));
|
TEST_IPC_RETURN_ON_ERROR (usock_recv (ctx->pollfd[index].fd, &buf, &msize));
|
||||||
TEST_IPC_RETURN_ON_ERROR_FREE (ipc_message_format_read (m, buf, msize), buf);
|
TEST_IPC_RETURN_ON_ERROR_FREE (ipc_message_format_read (m, buf, msize), buf);
|
||||||
|
@ -256,7 +261,7 @@ struct ipc_error ipc_write_fd (int fd, const struct ipc_message *m)
|
||||||
free (buf);
|
free (buf);
|
||||||
}
|
}
|
||||||
// what was sent != what should have been sent
|
// what was sent != what should have been sent
|
||||||
T_R ((nbytes_sent != msize), IPC_ERROR_WRITE__NOT_ENOUGH_DATA);
|
T_R ((nbytes_sent != msize), IPC_ERROR_WRITE_FD__NOT_ENOUGH_DATA);
|
||||||
|
|
||||||
IPC_RETURN_NO_ERROR;
|
IPC_RETURN_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -264,6 +269,16 @@ struct ipc_error ipc_write_fd (int fd, const struct ipc_message *m)
|
||||||
// Put the message in the list of messages to send.
|
// Put the message in the list of messages to send.
|
||||||
struct ipc_error ipc_write (struct ipc_ctx *ctx, const struct ipc_message *m)
|
struct ipc_error ipc_write (struct ipc_ctx *ctx, const struct ipc_message *m)
|
||||||
{
|
{
|
||||||
|
int found = 0;
|
||||||
|
for (size_t i = 0; i < ctx->size; i++) {
|
||||||
|
if (ctx->pollfd[i].fd == m->fd) {
|
||||||
|
ctx->pollfd[i].events |= POLLOUT;
|
||||||
|
found = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
T_R ((found == 0), IPC_ERROR_WRITE__FD_NOT_FOUND);
|
||||||
|
|
||||||
return ipc_messages_add (&ctx->tx, m);
|
return ipc_messages_add (&ctx->tx, m);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -272,8 +287,11 @@ struct ipc_error ipc_write (struct ipc_ctx *ctx, const struct ipc_message *m)
|
||||||
*/
|
*/
|
||||||
struct ipc_error ipc_add ( struct ipc_ctx *ctx, struct ipc_connection_info *p, struct pollfd *pollfd)
|
struct ipc_error ipc_add ( struct ipc_ctx *ctx, struct ipc_connection_info *p, struct pollfd *pollfd)
|
||||||
{
|
{
|
||||||
T_R ((ctx == NULL), IPC_ERROR_ADD__NO_PARAM_CLIENTS);
|
T_R ((ctx == NULL), IPC_ERROR_ADD__NO_PARAM_CLIENTS);
|
||||||
T_R ((p == NULL), IPC_ERROR_ADD__NO_PARAM_CLIENT);
|
T_R ((p == NULL), IPC_ERROR_ADD__NO_PARAM_CLIENT);
|
||||||
|
T_R ((pollfd == NULL), IPC_ERROR_ADD__NO_PARAM_POLLFD);
|
||||||
|
|
||||||
|
printf ("ipc add: add fd %d\n", pollfd[0].fd);
|
||||||
|
|
||||||
// Memory reallocation.
|
// Memory reallocation.
|
||||||
ipc_ctx_new_alloc (ctx);
|
ipc_ctx_new_alloc (ctx);
|
||||||
|
@ -366,7 +384,6 @@ struct ipc_error ipc_del_fd (struct ipc_ctx *ctx, int fd)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// TODO
|
|
||||||
struct ipc_error handle_writing_message (struct ipc_event *event, struct ipc_ctx *ctx, uint32_t index);
|
struct ipc_error handle_writing_message (struct ipc_event *event, struct ipc_ctx *ctx, uint32_t index);
|
||||||
|
|
||||||
struct ipc_error handle_writing_message (struct ipc_event *event, struct ipc_ctx *ctx, uint32_t index)
|
struct ipc_error handle_writing_message (struct ipc_event *event, struct ipc_ctx *ctx, uint32_t index)
|
||||||
|
@ -388,9 +405,16 @@ struct ipc_error handle_writing_message (struct ipc_event *event, struct ipc_ctx
|
||||||
mfd = m->fd;
|
mfd = m->fd;
|
||||||
printf ("Something to send on fd %d\n", mfd);
|
printf ("Something to send on fd %d\n", mfd);
|
||||||
if (txfd == mfd) {
|
if (txfd == mfd) {
|
||||||
printf ("The fd %d is available!\n", txfd);
|
if (m->payload == NULL) {
|
||||||
|
printf ("message payload is NULL\n");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
printf ("The fd %d is available! Writing %d characters: [%d%d%d%d]\n"
|
||||||
|
, txfd, m->length
|
||||||
|
, m->payload[0], m->payload[1], m->payload[2], m->payload[3]);
|
||||||
|
|
||||||
printf ("TODO: write!\n");
|
TEST_IPC_RR (ipc_write_fd (txfd, m), "cannot send a message to the client");
|
||||||
|
}
|
||||||
|
|
||||||
printf ("Removing the message\n");
|
printf ("Removing the message\n");
|
||||||
ipc_messages_del (&ctx->tx, i); // remove the message indexed by i
|
ipc_messages_del (&ctx->tx, i); // remove the message indexed by i
|
||||||
|
@ -510,6 +534,8 @@ struct ipc_error handle_new_message (struct ipc_event *event, struct ipc_ctx *ct
|
||||||
IPC_RETURN_NO_ERROR;
|
IPC_RETURN_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The message carries the fd it was received on.
|
||||||
|
m->fd = ctx->pollfd[index].fd;
|
||||||
IPC_EVENT_SET (event, IPC_EVENT_TYPE_MESSAGE, index, ctx->pollfd[index].fd, m);
|
IPC_EVENT_SET (event, IPC_EVENT_TYPE_MESSAGE, index, ctx->pollfd[index].fd, m);
|
||||||
IPC_RETURN_NO_ERROR;
|
IPC_RETURN_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -524,6 +550,29 @@ struct ipc_error ipc_events_loop (struct ipc_ctx *ctx, struct ipc_event *event,
|
||||||
|
|
||||||
int32_t n;
|
int32_t n;
|
||||||
|
|
||||||
|
#if 1
|
||||||
|
printf ("%ld fd for the poll(2) syscall:", ctx->size);
|
||||||
|
for (size_t i = 0; i < ctx->size; i++) {
|
||||||
|
printf (" %d(%d)", ctx->pollfd[i].fd, ctx->pollfd[i].events);
|
||||||
|
}
|
||||||
|
printf ("\n");
|
||||||
|
|
||||||
|
printf ("%ld messages still to write, for fd:", ctx->tx.size);
|
||||||
|
for (size_t i = 0; i < ctx->tx.size; i++) {
|
||||||
|
printf (" %d", ctx->tx.messages[i].fd);
|
||||||
|
}
|
||||||
|
printf ("\n");
|
||||||
|
#endif
|
||||||
|
|
||||||
|
for (size_t i = 0; i < ctx->tx.size; i++) {
|
||||||
|
for (size_t y = 0; y < ctx->size; y++) {
|
||||||
|
if (ctx->pollfd[y].fd == ctx->tx.messages[i].fd) {
|
||||||
|
ctx->pollfd[y].events |= POLLOUT;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if ((n = poll(ctx->pollfd, ctx->size, *timer)) < 0) {
|
if ((n = poll(ctx->pollfd, ctx->size, *timer)) < 0) {
|
||||||
printf("select error\n");
|
printf("select error\n");
|
||||||
exit(1); // TODO FIXME
|
exit(1); // TODO FIXME
|
||||||
|
@ -548,6 +597,7 @@ struct ipc_error ipc_events_loop (struct ipc_ctx *ctx, struct ipc_event *event,
|
||||||
|
|
||||||
// Something can be sent.
|
// Something can be sent.
|
||||||
if (ctx->pollfd[i].revents & POLLOUT) {
|
if (ctx->pollfd[i].revents & POLLOUT) {
|
||||||
|
ctx->pollfd[i].events &= ~POLLOUT;
|
||||||
return handle_writing_message (event, ctx, i);
|
return handle_writing_message (event, ctx, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@ static struct ipc_errors_verbose ipc_errors_verbose[] = {
|
||||||
, {IPC_ERROR_PROVIDE_FD__SENDMSG , "ipc_provide_fd: sendmsg function"}
|
, {IPC_ERROR_PROVIDE_FD__SENDMSG , "ipc_provide_fd: sendmsg function"}
|
||||||
|
|
||||||
, {IPC_ERROR_WRITE__NO_MESSAGE_PARAM, "ipc_write: no message param"}
|
, {IPC_ERROR_WRITE__NO_MESSAGE_PARAM, "ipc_write: no message param"}
|
||||||
, {IPC_ERROR_WRITE__NOT_ENOUGH_DATA , "ipc_write: no enough data sent"}
|
, {IPC_ERROR_WRITE_FD__NOT_ENOUGH_DATA , "ipc_write: no enough data sent"}
|
||||||
, {IPC_ERROR_READ__NO_MESSAGE_PARAM , "ipc_read: no message param"}
|
, {IPC_ERROR_READ__NO_MESSAGE_PARAM , "ipc_read: no message param"}
|
||||||
|
|
||||||
, {IPC_ERROR_HANDLE_MESSAGE__NOT_ENOUGH_MEMORY , "handle_message: not enough memory"}
|
, {IPC_ERROR_HANDLE_MESSAGE__NOT_ENOUGH_MEMORY , "handle_message: not enough memory"}
|
||||||
|
@ -138,6 +138,8 @@ static struct ipc_errors_verbose ipc_errors_verbose[] = {
|
||||||
, {IPC_ERROR_DEL_MESSAGE_TO_SEND__NO_PARAM_MESSAGES, "IPC_ERROR_DEL_MESSAGE_TO_SEND__NO_PARAM_MESSAGES"}
|
, {IPC_ERROR_DEL_MESSAGE_TO_SEND__NO_PARAM_MESSAGES, "IPC_ERROR_DEL_MESSAGE_TO_SEND__NO_PARAM_MESSAGES"}
|
||||||
, {IPC_ERROR_MESSAGE_DEL__INDEX_ERROR, "IPC_ERROR_MESSAGE_DEL__INDEX_ERROR"}
|
, {IPC_ERROR_MESSAGE_DEL__INDEX_ERROR, "IPC_ERROR_MESSAGE_DEL__INDEX_ERROR"}
|
||||||
, {IPC_ERROR_MESSAGE_DEL__EMPTY_LIST, "IPC_ERROR_MESSAGE_DEL__EMPTY_LIST"}
|
, {IPC_ERROR_MESSAGE_DEL__EMPTY_LIST, "IPC_ERROR_MESSAGE_DEL__EMPTY_LIST"}
|
||||||
|
, {IPC_ERROR_ADD__NO_PARAM_POLLFD, "IPC_ERROR_ADD__NO_PARAM_POLLFD"}
|
||||||
|
, {IPC_ERROR_WRITE__FD_NOT_FOUND, "IPC_ERROR_WRITE__FD_NOT_FOUND"}
|
||||||
};
|
};
|
||||||
|
|
||||||
const char *ipc_errors_get (enum ipc_error_code e)
|
const char *ipc_errors_get (enum ipc_error_code e)
|
||||||
|
|
|
@ -100,7 +100,7 @@ enum ipc_error_code {
|
||||||
, IPC_ERROR_SERVER_INIT__NO_SERVER_NAME_PARAM = 9
|
, IPC_ERROR_SERVER_INIT__NO_SERVER_NAME_PARAM = 9
|
||||||
, IPC_ERROR_SERVER_INIT__MALLOC = 10
|
, IPC_ERROR_SERVER_INIT__MALLOC = 10
|
||||||
, IPC_ERROR_WRITE__NO_MESSAGE_PARAM = 11
|
, IPC_ERROR_WRITE__NO_MESSAGE_PARAM = 11
|
||||||
, IPC_ERROR_WRITE__NOT_ENOUGH_DATA = 12
|
, IPC_ERROR_WRITE_FD__NOT_ENOUGH_DATA = 12
|
||||||
, IPC_ERROR_READ__NO_MESSAGE_PARAM = 13
|
, IPC_ERROR_READ__NO_MESSAGE_PARAM = 13
|
||||||
, IPC_ERROR_CONNECTION__NO_SERVICE_NAME = 15
|
, IPC_ERROR_CONNECTION__NO_SERVICE_NAME = 15
|
||||||
, IPC_ERROR_CONNECTION__NO_ENVIRONMENT_PARAM = 16
|
, IPC_ERROR_CONNECTION__NO_ENVIRONMENT_PARAM = 16
|
||||||
|
@ -190,6 +190,8 @@ enum ipc_error_code {
|
||||||
, IPC_ERROR_DEL_MESSAGE_TO_SEND__NO_PARAM_MESSAGES = 100
|
, IPC_ERROR_DEL_MESSAGE_TO_SEND__NO_PARAM_MESSAGES = 100
|
||||||
, IPC_ERROR_MESSAGE_DEL__INDEX_ERROR = 101
|
, IPC_ERROR_MESSAGE_DEL__INDEX_ERROR = 101
|
||||||
, IPC_ERROR_MESSAGE_DEL__EMPTY_LIST = 102
|
, IPC_ERROR_MESSAGE_DEL__EMPTY_LIST = 102
|
||||||
|
, IPC_ERROR_ADD__NO_PARAM_POLLFD = 103
|
||||||
|
, IPC_ERROR_WRITE__FD_NOT_FOUND = 104
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ipc_error {
|
struct ipc_error {
|
||||||
|
@ -347,6 +349,8 @@ void ipc_messages_free (struct ipc_messages *);
|
||||||
* non public functions
|
* non public functions
|
||||||
**/
|
**/
|
||||||
|
|
||||||
|
struct ipc_error ipc_write_fd (int fd, const struct ipc_message *m);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used by ipc_server_init and ipc_connection
|
* Used by ipc_server_init and ipc_connection
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -163,8 +163,15 @@ struct ipc_error ipc_messages_add (struct ipc_messages *messages, const struct
|
||||||
|
|
||||||
T_R ((messages->messages == NULL), IPC_ERROR_ADD_MESSAGE_TO_SEND__EMPTY_LIST);
|
T_R ((messages->messages == NULL), IPC_ERROR_ADD_MESSAGE_TO_SEND__EMPTY_LIST);
|
||||||
|
|
||||||
// NOT A DEEP COPY.
|
// DEEP COPY.
|
||||||
messages->messages[messages->size - 1] = *message;
|
messages->messages[messages->size -1] = *message;
|
||||||
|
messages->messages[messages->size -1].payload = malloc(message->length * sizeof (char));
|
||||||
|
strncpy(messages->messages[messages->size -1].payload, message->payload, message->length);
|
||||||
|
|
||||||
|
printf ("first message payload (%d): %s\n", message->length, message->payload);
|
||||||
|
printf ("second message payload (%d): %s\n"
|
||||||
|
, messages->messages[messages->size - 1].length
|
||||||
|
, messages->messages[messages->size - 1].payload);
|
||||||
IPC_RETURN_NO_ERROR;
|
IPC_RETURN_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,11 +23,13 @@ void non_interactive ()
|
||||||
|
|
||||||
int server_fd = ctx.pollfd[0].fd;
|
int server_fd = ctx.pollfd[0].fd;
|
||||||
|
|
||||||
printf ("msg to send (%ld): %.*s\n", (ssize_t) strlen(MSG) +1, (int) strlen(MSG), MSG);
|
printf ("msg for fd %d to send (%ld): %.*s\n"
|
||||||
|
, server_fd
|
||||||
|
, (ssize_t) strlen(MSG) +1, (int) strlen(MSG), MSG);
|
||||||
TEST_IPC_Q(ipc_message_format_data (&m, /* type */ 'a', MSG, (ssize_t) strlen(MSG) +1), EXIT_FAILURE);
|
TEST_IPC_Q(ipc_message_format_data (&m, /* type */ 'a', MSG, (ssize_t) strlen(MSG) +1), EXIT_FAILURE);
|
||||||
|
|
||||||
m.fd = server_fd;
|
m.fd = server_fd;
|
||||||
TEST_IPC_Q(ipc_write (&ctx, &m), EXIT_FAILURE);
|
TEST_IPC_Q(ipc_write_fd (server_fd, &m), EXIT_FAILURE);
|
||||||
TEST_IPC_Q(ipc_read (&ctx, 0 /* only one option */, &m), EXIT_FAILURE);
|
TEST_IPC_Q(ipc_read (&ctx, 0 /* only one option */, &m), EXIT_FAILURE);
|
||||||
|
|
||||||
printf ("msg recv: %s\n", m.payload);
|
printf ("msg recv: %s\n", m.payload);
|
||||||
|
|
|
@ -20,10 +20,6 @@ struct ipc_ctx *ctx;
|
||||||
void main_loop ()
|
void main_loop ()
|
||||||
{
|
{
|
||||||
SECURE_DECLARATION(struct ipc_error, ret);
|
SECURE_DECLARATION(struct ipc_error, ret);
|
||||||
|
|
||||||
ctx = malloc (sizeof (struct ipc_ctx));
|
|
||||||
memset(ctx, 0, sizeof(struct ipc_ctx));
|
|
||||||
|
|
||||||
SECURE_DECLARATION(struct ipc_event, event);
|
SECURE_DECLARATION(struct ipc_event, event);
|
||||||
|
|
||||||
int timer = 10000;
|
int timer = 10000;
|
||||||
|
@ -66,6 +62,7 @@ void main_loop ()
|
||||||
printf ("message received (type %d): %.*s\n", m->type, m->length, m->payload);
|
printf ("message received (type %d): %.*s\n", m->type, m->length, m->payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m->fd = event.origin;
|
||||||
ret = ipc_write (ctx, m);
|
ret = ipc_write (ctx, m);
|
||||||
if (ret.error_code != IPC_ERROR_NONE) {
|
if (ret.error_code != IPC_ERROR_NONE) {
|
||||||
PRINTERR(ret,"server write");
|
PRINTERR(ret,"server write");
|
||||||
|
|
Reference in New Issue