Things.
parent
f1790cf314
commit
8439cc0f18
|
@ -333,84 +333,26 @@ struct ipc_error handle_writing_message (struct ipc_event *event, struct ipc_ctx
|
||||||
IPC_RETURN_NO_ERROR;
|
IPC_RETURN_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
// new message
|
|
||||||
struct ipc_error handle_new_message (struct ipc_event *event, struct ipc_ctx *ctx, int index)
|
struct ipc_error handle_new_message (struct ipc_event *event, struct ipc_ctx *ctx, int index)
|
||||||
{
|
{
|
||||||
|
SECURE_DECLARATION (struct ipc_error, ret);
|
||||||
|
|
||||||
// If the socket is associated to another one for ipcd:
|
{ /* First test: should the message be switched? */
|
||||||
// read and write automatically and provide a new IPC_EVENT_TYPE indicating the switch.
|
ret = fd_switching (event, ctx, index);
|
||||||
if (ctx->switchdb.size > 0) {
|
if (ret.error_code != IPC_ERROR_FD_SWITCHING__NO_FD_RECORD) {
|
||||||
int talkingfd = ctx->pollfd[index].fd;
|
return ret;
|
||||||
int correspondingfd = ipc_switching_get (&ctx->switchdb, talkingfd);
|
|
||||||
if (correspondingfd != -1) {
|
|
||||||
char *buf = NULL;
|
|
||||||
size_t msize = 0;
|
|
||||||
|
|
||||||
{ /** Some macros use "ret" as a variable name, so this is to be sure. */
|
|
||||||
struct ipc_error ret = usock_recv (talkingfd, &buf, &msize);
|
|
||||||
if (ret.error_code != IPC_ERROR_NONE && ret.error_code != IPC_ERROR_CLOSED_RECIPIENT) {
|
|
||||||
if (buf != NULL)
|
|
||||||
free (buf);
|
|
||||||
IPC_EVENT_SET (event, IPC_EVENT_TYPE_ERROR, index, talkingfd, NULL)
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** There is a message, send it to the corresponding fd **/
|
|
||||||
if (msize > 0) {
|
|
||||||
size_t nbytes_sent = 0;
|
|
||||||
TEST_IPC_RETURN_ON_ERROR_FREE (usock_send (correspondingfd, buf, msize, &nbytes_sent), buf);
|
|
||||||
|
|
||||||
if (nbytes_sent != msize) {
|
|
||||||
// LOG_ERROR ("wrote not enough data from %d to fd %d", talkingfd, correspondingfd);
|
|
||||||
IPC_EVENT_SET (event, IPC_EVENT_TYPE_ERROR, index, ctx->pollfd[index].fd, NULL);
|
|
||||||
IPC_RETURN_NO_ERROR; // FIXME: return something else, maybe?
|
|
||||||
}
|
|
||||||
// LOG_DEBUG ("received a message on fd %d => switch to fd %d", talkingfd, correspondingfd);
|
|
||||||
|
|
||||||
if (buf != NULL)
|
|
||||||
free (buf);
|
|
||||||
|
|
||||||
// Everything is OK: inform ipcd of a successful transfer.
|
|
||||||
IPC_EVENT_SET (event, IPC_EVENT_TYPE_SWITCH, index, ctx->pollfd[index].fd, NULL);
|
|
||||||
IPC_RETURN_NO_ERROR;
|
|
||||||
} else if (msize == 0) {
|
|
||||||
int delfd;
|
|
||||||
|
|
||||||
delfd = ipc_switching_del (&ctx->switchdb, talkingfd);
|
|
||||||
if (delfd >= 0) {
|
|
||||||
close (delfd);
|
|
||||||
ipc_del_fd (ctx, delfd);
|
|
||||||
}
|
|
||||||
|
|
||||||
close (talkingfd);
|
|
||||||
ipc_del_fd (ctx, talkingfd);
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
if (delfd >= 0) {
|
|
||||||
LOG_DEBUG ("disconnection of %d (and related fd %d)", talkingfd, delfd);
|
|
||||||
} else {
|
|
||||||
LOG_DEBUG ("disconnection of %d", talkingfd);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
IPC_EVENT_SET (event, IPC_EVENT_TYPE_DISCONNECTION, index, talkingfd, NULL);
|
|
||||||
IPC_RETURN_ERROR (IPC_ERROR_CLOSED_RECIPIENT);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// no treatment of the socket if external socket
|
// No treatment of the socket if external socket: the libipc user should handle it himself.
|
||||||
if (ctx->cinfos[index].type == IPC_CONNECTION_TYPE_EXTERNAL) {
|
if (ctx->cinfos[index].type == IPC_CONNECTION_TYPE_EXTERNAL) {
|
||||||
IPC_EVENT_SET (event, IPC_EVENT_TYPE_EXTRA_SOCKET, index, ctx->pollfd[index].fd, NULL);
|
IPC_EVENT_SET (event, IPC_EVENT_TYPE_EXTRA_SOCKET, index, ctx->pollfd[index].fd, NULL);
|
||||||
IPC_RETURN_NO_ERROR;
|
IPC_RETURN_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
// listen to what they have to say (disconnection or message)
|
// Listen to what they have to say (disconnection or message)
|
||||||
// then add a client to `event`, the ipc_event structure
|
// then add a client to `event`, the ipc_event structure.
|
||||||
SECURE_DECLARATION (struct ipc_error, ret);
|
|
||||||
struct ipc_message *m = NULL;
|
struct ipc_message *m = NULL;
|
||||||
|
|
||||||
SECURE_BUFFER_HEAP_ALLOCATION_R (m, sizeof (struct ipc_message),, IPC_ERROR_HANDLE_MESSAGE__NOT_ENOUGH_MEMORY);
|
SECURE_BUFFER_HEAP_ALLOCATION_R (m, sizeof (struct ipc_message),, IPC_ERROR_HANDLE_MESSAGE__NOT_ENOUGH_MEMORY);
|
||||||
|
|
||||||
// current talking client
|
// current talking client
|
||||||
|
@ -450,7 +392,7 @@ struct ipc_error handle_new_message (struct ipc_event *event, struct ipc_ctx *ct
|
||||||
}
|
}
|
||||||
|
|
||||||
/* timer is in ms */
|
/* timer is in ms */
|
||||||
struct ipc_error ipc_events_loop (struct ipc_ctx *ctx, struct ipc_event *event, int *timer)
|
struct ipc_error ipc_wait_event (struct ipc_ctx *ctx, struct ipc_event *event, int *timer)
|
||||||
{
|
{
|
||||||
T_R ((ctx == NULL), IPC_ERROR_WAIT_EVENT__NO_CLIENTS_PARAM);
|
T_R ((ctx == NULL), IPC_ERROR_WAIT_EVENT__NO_CLIENTS_PARAM);
|
||||||
T_R ((event == NULL), IPC_ERROR_WAIT_EVENT__NO_EVENT_PARAM);
|
T_R ((event == NULL), IPC_ERROR_WAIT_EVENT__NO_EVENT_PARAM);
|
||||||
|
@ -479,52 +421,3 @@ struct ipc_error ipc_events_loop (struct ipc_ctx *ctx, struct ipc_event *event,
|
||||||
if ((n = poll(ctx->pollfd, ctx->size, *timer)) < 0) {
|
if ((n = poll(ctx->pollfd, ctx->size, *timer)) < 0) {
|
||||||
IPC_RETURN_ERROR (IPC_ERROR_WAIT_EVENT__POLL);
|
IPC_RETURN_ERROR (IPC_ERROR_WAIT_EVENT__POLL);
|
||||||
}
|
}
|
||||||
|
|
||||||
gettimeofday(&tv_2, NULL);
|
|
||||||
|
|
||||||
int nb_sec_ms = (tv_2.tv_sec - tv_1.tv_sec) * 1000;
|
|
||||||
int nb_usec_ms = (tv_2.tv_usec - tv_1.tv_usec) / 1000;
|
|
||||||
int time_elapsed_ms = (nb_sec_ms + nb_usec_ms);
|
|
||||||
|
|
||||||
// Handle memory fuckery, 'cause low level programming is fun.
|
|
||||||
if (time_elapsed_ms >= *timer) {
|
|
||||||
*timer = 0;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
*timer -= time_elapsed_ms;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Timeout.
|
|
||||||
if (n == 0) {
|
|
||||||
IPC_EVENT_SET (event, IPC_EVENT_TYPE_TIMER, 0, 0, NULL);
|
|
||||||
IPC_RETURN_NO_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (size_t i = 0; i <= ctx->size; i++) {
|
|
||||||
// Something to read or connection.
|
|
||||||
if (ctx->pollfd[i].revents & POLLIN) {
|
|
||||||
// In case there is something to read for the server socket: new client.
|
|
||||||
if (ctx->cinfos[i].type == IPC_CONNECTION_TYPE_SERVER) {
|
|
||||||
return ipc_accept_add (event, ctx, i);
|
|
||||||
}
|
|
||||||
|
|
||||||
return handle_new_message (event, ctx, i);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Something can be sent.
|
|
||||||
if (ctx->pollfd[i].revents & POLLOUT) {
|
|
||||||
ctx->pollfd[i].events &= ~POLLOUT;
|
|
||||||
return handle_writing_message (event, ctx, i);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Disconnection.
|
|
||||||
if (ctx->pollfd[i].revents & POLLHUP) {
|
|
||||||
/** IPC_EVENT_SET: event, type, index, fd, message */
|
|
||||||
IPC_EVENT_SET (event, IPC_EVENT_TYPE_DISCONNECTION, i, ctx->pollfd[i].fd, NULL);
|
|
||||||
return ipc_close (ctx, i);
|
|
||||||
}
|
|
||||||
|
|
||||||
} /** for loop: end of the message handling */
|
|
||||||
|
|
||||||
IPC_RETURN_NO_ERROR;
|
|
||||||
}
|
|
||||||
|
|
|
@ -204,6 +204,10 @@ struct ipc_error {
|
||||||
struct ipc_switching {
|
struct ipc_switching {
|
||||||
int orig;
|
int orig;
|
||||||
int dest;
|
int dest;
|
||||||
|
int (*orig_in) (int origin_fd, struct message *m);
|
||||||
|
int (*orig_out) (int origin_fd, struct message *m);
|
||||||
|
int (*dest_in) (int origin_fd, struct message *m);
|
||||||
|
int (*dest_out) (int origin_fd, struct message *m);
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ipc_switchings {
|
struct ipc_switchings {
|
||||||
|
@ -300,7 +304,7 @@ enum ipc_connection_types {
|
||||||
* main public functions
|
* main public functions
|
||||||
**/
|
**/
|
||||||
|
|
||||||
struct ipc_error ipc_events_loop (struct ipc_ctx *, struct ipc_event *, int *timer);
|
struct ipc_error ipc_wait_event (struct ipc_ctx *, struct ipc_event *, int *timer);
|
||||||
|
|
||||||
struct ipc_error ipc_server_init (struct ipc_ctx *ctx, const char *sname);
|
struct ipc_error ipc_server_init (struct ipc_ctx *ctx, const char *sname);
|
||||||
struct ipc_error ipc_connection (struct ipc_ctx *ctx, const char *sname);
|
struct ipc_error ipc_connection (struct ipc_ctx *ctx, const char *sname);
|
||||||
|
@ -313,7 +317,7 @@ void ipc_ctx_free (struct ipc_ctx *ctx);
|
||||||
struct ipc_error ipc_read (const struct ipc_ctx *, uint32_t index, struct ipc_message *m);
|
struct ipc_error ipc_read (const struct ipc_ctx *, uint32_t index, struct ipc_message *m);
|
||||||
struct ipc_error ipc_write (struct ipc_ctx *, const struct ipc_message *m);
|
struct ipc_error ipc_write (struct ipc_ctx *, const struct ipc_message *m);
|
||||||
|
|
||||||
struct ipc_error ipc_wait_event (struct ipc_ctx *, struct ipc_event *, int *timer);
|
struct ipc_error fd_switching (struct ipc_event *event, struct ipc_ctx *ctx, int index);
|
||||||
|
|
||||||
// store and remove only pointers on allocated structures
|
// store and remove only pointers on allocated structures
|
||||||
struct ipc_error ipc_add (struct ipc_ctx *, struct ipc_connection_info *, struct pollfd *);
|
struct ipc_error ipc_add (struct ipc_ctx *, struct ipc_connection_info *, struct pollfd *);
|
||||||
|
|
|
@ -90,6 +90,7 @@ void ipc_switching_add (struct ipc_switchings *is, int orig, int dest)
|
||||||
is->collection = realloc (is->collection, sizeof (struct ipc_switching) * (is->size + 1));
|
is->collection = realloc (is->collection, sizeof (struct ipc_switching) * (is->size + 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** TODO: less brutal approach */
|
||||||
if (is->collection == NULL) {
|
if (is->collection == NULL) {
|
||||||
fprintf (stderr, __FILE__ " error realloc line %d", __LINE__);
|
fprintf (stderr, __FILE__ " error realloc line %d", __LINE__);
|
||||||
exit (EXIT_FAILURE);
|
exit (EXIT_FAILURE);
|
||||||
|
@ -99,6 +100,11 @@ void ipc_switching_add (struct ipc_switchings *is, int orig, int dest)
|
||||||
|
|
||||||
is->collection[is->size - 1].orig = orig;
|
is->collection[is->size - 1].orig = orig;
|
||||||
is->collection[is->size - 1].dest = dest;
|
is->collection[is->size - 1].dest = dest;
|
||||||
|
|
||||||
|
is->collection[is->size - 1].orig_in = NULL;
|
||||||
|
is->collection[is->size - 1].dest_in = NULL;
|
||||||
|
is->collection[is->size - 1].orig_out = NULL;
|
||||||
|
is->collection[is->size - 1].dest_out = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ipc_switching_del (struct ipc_switchings *is, int fd)
|
int ipc_switching_del (struct ipc_switchings *is, int fd)
|
||||||
|
@ -156,3 +162,73 @@ void ipc_switching_free (struct ipc_switchings *is)
|
||||||
}
|
}
|
||||||
is->size = 0;
|
is->size = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* switching_messages allows to send messages from a fd to another.
|
||||||
|
*/
|
||||||
|
struct ipc_error fd_switching (struct ipc_event *event, struct ipc_ctx *ctx, int index)
|
||||||
|
{
|
||||||
|
// If the socket is associated to another one for ipcd:
|
||||||
|
// read and write automatically and provide a new IPC_EVENT_TYPE indicating the switch.
|
||||||
|
T_R ((ctx->switchdb.size == 0), IPC_ERROR_FD_SWITCHING__NO_FD_RECORD);
|
||||||
|
|
||||||
|
int talkingfd = ctx->pollfd[index].fd;
|
||||||
|
int correspondingfd = ipc_switching_get (&ctx->switchdb, talkingfd);
|
||||||
|
|
||||||
|
T_R ((correspondingfd == -1), IPC_ERROR_FD_SWITCHING__NO_FD_RECORD);
|
||||||
|
|
||||||
|
char *buf = NULL;
|
||||||
|
size_t msize = 0;
|
||||||
|
|
||||||
|
{ /** Some macros use "ret" as a variable name, so this is to be sure. */
|
||||||
|
struct ipc_error ret = usock_recv (talkingfd, &buf, &msize);
|
||||||
|
if (ret.error_code != IPC_ERROR_NONE && ret.error_code != IPC_ERROR_CLOSED_RECIPIENT) {
|
||||||
|
if (buf != NULL)
|
||||||
|
free (buf);
|
||||||
|
IPC_EVENT_SET (event, IPC_EVENT_TYPE_ERROR, index, talkingfd, NULL)
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** There is a message, send it to the corresponding fd **/
|
||||||
|
if (msize > 0) {
|
||||||
|
size_t nbytes_sent = 0;
|
||||||
|
TEST_IPC_RETURN_ON_ERROR_FREE (usock_send (correspondingfd, buf, msize, &nbytes_sent), buf);
|
||||||
|
|
||||||
|
if (nbytes_sent != msize) {
|
||||||
|
// LOG_ERROR ("wrote not enough data from %d to fd %d", talkingfd, correspondingfd);
|
||||||
|
IPC_EVENT_SET (event, IPC_EVENT_TYPE_ERROR, index, ctx->pollfd[index].fd, NULL);
|
||||||
|
IPC_RETURN_NO_ERROR; // FIXME: return something else, maybe?
|
||||||
|
}
|
||||||
|
// LOG_DEBUG ("received a message on fd %d => switch to fd %d", talkingfd, correspondingfd);
|
||||||
|
|
||||||
|
if (buf != NULL)
|
||||||
|
free (buf);
|
||||||
|
|
||||||
|
// Everything is OK: inform ipcd of a successful transfer.
|
||||||
|
IPC_EVENT_SET (event, IPC_EVENT_TYPE_SWITCH, index, ctx->pollfd[index].fd, NULL);
|
||||||
|
IPC_RETURN_NO_ERROR;
|
||||||
|
} else if (msize == 0) {
|
||||||
|
int delfd;
|
||||||
|
|
||||||
|
delfd = ipc_switching_del (&ctx->switchdb, talkingfd);
|
||||||
|
if (delfd >= 0) {
|
||||||
|
close (delfd);
|
||||||
|
ipc_del_fd (ctx, delfd);
|
||||||
|
}
|
||||||
|
|
||||||
|
close (talkingfd);
|
||||||
|
ipc_del_fd (ctx, talkingfd);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
if (delfd >= 0) {
|
||||||
|
LOG_DEBUG ("disconnection of %d (and related fd %d)", talkingfd, delfd);
|
||||||
|
} else {
|
||||||
|
LOG_DEBUG ("disconnection of %d", talkingfd);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
IPC_EVENT_SET (event, IPC_EVENT_TYPE_DISCONNECTION, index, talkingfd, NULL);
|
||||||
|
IPC_RETURN_ERROR (IPC_ERROR_CLOSED_RECIPIENT);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Reference in New Issue