Archived
3
0

ipc.h, event, server loop()

This commit is contained in:
Philippe PITTOLI 2018-10-28 17:09:35 +01:00
parent cf59401ef7
commit 64f1977eb2
8 changed files with 304 additions and 156 deletions

View File

@ -1,6 +1,7 @@
#include "communication.h"
#include "utils.h"
#include "error.h"
#include "event.h"
#include <assert.h>
#include <stdio.h>
@ -126,9 +127,8 @@ int ipc_application_write (struct ipc_service *srv, const struct ipc_message *m)
/*calculer le max filedescriptor*/
static int getMaxFd(struct ipc_clients *clients)
static int get_max_fd_from_ipc_clients_ (struct ipc_clients *clients)
{
int i;
int max = 0;
@ -141,6 +141,20 @@ static int getMaxFd(struct ipc_clients *clients)
return max;
}
static int get_max_fd_from_ipc_services_ (struct ipc_services *services)
{
int i;
int max = 0;
for (i = 0; i < services->size; i++ ) {
if (services->services[i]->service_fd > max) {
max = services->services[i]->service_fd;
}
}
return max;
}
/*
* ipc_server_select prend en parametre
* * un tableau de client qu'on écoute
@ -182,7 +196,7 @@ int ipc_server_select (struct ipc_clients *clients, struct ipc_service *srv
}
/* keep track of the biggest file descriptor */
fdmax = getMaxFd(clients) > srv->service_fd ? getMaxFd(clients) : srv->service_fd;
fdmax = get_max_fd_from_ipc_clients_ (clients) > srv->service_fd ? get_max_fd_from_ipc_clients_ (clients) : srv->service_fd;
// printf ("loop ipc_server_select main_loop\n");
readf = master;
@ -211,21 +225,6 @@ int ipc_server_select (struct ipc_clients *clients, struct ipc_service *srv
return 0;
}
/*calculer le max filedescriptor*/
static int getMaxFdServices(struct ipc_services *services)
{
int i;
int max = 0;
for (i = 0; i < services->size; i++ ) {
if (services->services[i]->service_fd > max) {
max = services->services[i]->service_fd;
}
}
return max;
}
/*
* ipc_application_select prend en parametre
* * un tableau de server qu'on écoute
@ -261,7 +260,7 @@ int ipc_application_select (struct ipc_services *services, struct ipc_services *
}
/* keep track of the biggest file descriptor */
fdmax = getMaxFdServices(services);
fdmax = get_max_fd_from_ipc_services_ (services);
// printf ("loop ipc_server_select main_loop\n");
readf = master;
@ -285,3 +284,131 @@ int ipc_application_select (struct ipc_services *services, struct ipc_services *
return 0;
}
int handle_new_connection (struct ipc_service *srv
, struct ipc_clients *clients
, struct ipc_client **new_client)
{
*new_client = malloc(sizeof(struct ipc_client));
memset(*new_client, 0, sizeof(struct ipc_client));
if (ipc_server_accept (srv, *new_client) < 0) {
handle_error("server_accept < 0");
return 1;
} else {
printf("new connection\n");
}
if (ipc_client_add (clients, *new_client) < 0) {
handle_error("ipc_client_add < 0");
return 1;
}
return 0;
}
int ipc_service_loop (struct ipc_clients *clients, struct ipc_service *srv
, struct ipc_event *event)
{
assert (clients != NULL);
IPC_EVENT_CLEAN(event);
int i, j;
/* master file descriptor list */
fd_set master;
fd_set readf;
/* maximum file descriptor number */
int fdmax;
/* listening socket descriptor */
int listener = srv->service_fd;
/* clear the master and temp sets */
FD_ZERO(&master);
FD_ZERO(&readf);
/* add the listener to the master set */
FD_SET(listener, &master);
for (i=0; i < clients->size; i++) {
FD_SET(clients->clients[i]->proc_fd, &master);
}
/* keep track of the biggest file descriptor */
fdmax = get_max_fd_from_ipc_clients_ (clients) > srv->service_fd ? get_max_fd_from_ipc_clients_ (clients) : srv->service_fd;
// printf ("loop ipc_server_select main_loop\n");
readf = master;
if(select(fdmax+1, &readf, NULL, NULL, NULL) == -1) {
perror("select");
return -1;
}
/*run through the existing connections looking for data to be read*/
for (i = 0; i <= fdmax; i++) {
// printf ("loop ipc_server_select inner loop\n");
if (FD_ISSET(i, &readf)) {
if (i == listener) {
// connection
struct ipc_client *new_client = NULL;
handle_new_connection (srv, clients, &new_client);
IPC_EVENT_SET (event, IPC_EVENT_TYPE_CONNECTION, NULL, new_client);
return 0;
} else {
for(j = 0; j < clients->size; j++) {
if(i == clients->clients[j]->proc_fd ) {
// listen to what they have to say (disconnection or message)
// then add a client to `event`, the ipc_event structure
int ret = 0;
struct ipc_message *m = NULL;
m = malloc (sizeof(struct ipc_message));
if (m == NULL) {
return IPC_ERROR_NOT_ENOUGH_MEMORY;
}
memset (m, 0, sizeof (struct ipc_message));
// current talking client
struct ipc_client *pc = clients->clients[j];
ret = ipc_server_read (pc, m);
if (ret < 0) {
handle_err ("ipc_service_loop", "ipc_server_read < 0");
ipc_message_empty (m);
free (m);
IPC_EVENT_SET(event, IPC_EVENT_TYPE_ERROR, NULL, pc);
return IPC_ERROR_READ;
}
// disconnection: close the client then delete it from clients
if (ret == 1) {
if (ipc_server_close_client (pc) < 0) {
handle_err( "ipc_service_loop", "ipc_server_close_client < 0");
}
if (ipc_client_del (clients, pc) < 0) {
handle_err( "ipc_service_loop", "ipc_client_del < 0");
}
ipc_message_empty (m);
free (m);
IPC_EVENT_SET(event, IPC_EVENT_TYPE_DISCONNECTION, NULL, pc);
// warning: do not forget to free the ipc_client structure
return 0;
}
// we received a new message from a client
IPC_EVENT_SET (event, IPC_EVENT_TYPE_MESSAGE, m, pc);
return 0;
}
}
}
}
}
return 0;
}
int ipc_application_loop (struct ipc_services *services)
{
return 0;
}

View File

@ -8,6 +8,7 @@
#include "message.h"
#include "client.h"
#include "event.h"
#define COMMUNICATION_VERSION 1
@ -32,6 +33,9 @@ int ipc_server_write (const struct ipc_client *, const struct ipc_message *m);
int ipc_server_select (struct ipc_clients * clients, struct ipc_service *srv
, struct ipc_clients *active_clients, int *new_connection);
int ipc_service_loop (struct ipc_clients *clients, struct ipc_service *srv
, struct ipc_event *event);
// APPLICATION
// Initialize connection with unix socket

View File

@ -3,8 +3,11 @@
#include "logger.h"
#define IPC_ERROR_NOT_ENOUGH_MEMORY 100
#define IPC_ERROR_WRONG_PARAMETERS 101
enum ipc_errors {
IPC_ERROR_NOT_ENOUGH_MEMORY
, IPC_ERROR_WRONG_PARAMETERS
, IPC_ERROR_READ
};
// #define IPC_WITH_ERRORS 3

35
core/event.h Normal file
View File

@ -0,0 +1,35 @@
#ifndef __IPC_EVENT__
#define __IPC_EVENT__
#include "message.h"
enum ipc_event_type {
IPC_EVENT_TYPE_NOT_SET
, IPC_EVENT_TYPE_ERROR
, IPC_EVENT_TYPE_CONNECTION
, IPC_EVENT_TYPE_DISCONNECTION
, IPC_EVENT_TYPE_MESSAGE
};
struct ipc_event {
enum ipc_event_type type;
void* origin; // currently used as an client or service pointer
void* m; // message pointer
};
#define IPC_EVENT_SET(pevent,type_,message_,origin_) {\
pevent->type = type_; \
pevent->m = message_; \
pevent->origin = origin_; \
};
#define IPC_EVENT_CLEAN(pevent) {\
pevent->type = IPC_EVENT_TYPE_NOT_SET;\
if (pevent->m != NULL) {\
ipc_message_empty (pevent->m);\
free(pevent->m);\
pevent->m = NULL;\
}\
};
#endif

15
core/ipc.h Normal file
View File

@ -0,0 +1,15 @@
#ifndef __IPC_H__
#define __IPC_H__
#include "client.h"
#include "communication.h"
#include "error.h"
#include "event.h"
#include "ipc.h"
#include "logger.h"
#include "message.h"
#include "queue.h"
#include "usocket.h"
#include "utils.h"
#endif

View File

@ -57,7 +57,6 @@ int usock_recv (const int fd, char **buf, ssize_t *len)
memcpy (&msize, *buf + 1, sizeof msize);
}
}
printf("msize = %u\n", msize);
assert (msize < IPC_MAX_MESSAGE_SIZE);
msize_read += ret - IPC_HEADER_SIZE;

View File

@ -45,7 +45,6 @@ void non_interactive (char *env[])
printf ("msg recv: %s\n", m.payload);
ipc_message_empty (&m);
sleep(10);
if (ipc_application_close (&srv) < 0) {
handle_err("main", "application_close < 0");
exit (EXIT_FAILURE);

View File

@ -1,6 +1,5 @@
#include "../../core/communication.h"
#include "../../core/client.h"
#include "../../core/error.h"
#include "../../core/ipc.h"
#include <signal.h>
#include <sys/socket.h>
#include <sys/un.h>
@ -11,147 +10,110 @@
int cpt = 0;
struct ipc_service *srv = 0;
struct ipc_clients *clients;
void handle_new_connection (struct ipc_clients *clients)
{
struct ipc_client *p = malloc(sizeof(struct ipc_client));
memset(p, 0, sizeof(struct ipc_client));
if (ipc_server_accept (srv, p) < 0) {
handle_error("server_accept < 0");
} else {
printf("new connection\n");
}
if (ipc_client_add (clients, p) < 0) {
handle_error("ipc_client_add < 0");
}
cpt++;
printf ("%d client(s)\n", cpt);
}
void handle_new_msg (struct ipc_clients *clients, struct ipc_clients *clients_talking)
{
int ret = 0;
struct ipc_message m;
memset (&m, 0, sizeof (struct ipc_message));
int i = 0;
for (i = 0; i < clients_talking->size; i++) {
// printf ("loop handle_new_msg\n");
// current talking client
struct ipc_client *pc = clients_talking->clients[i];
ret = ipc_server_read (pc, &m);
if (ret < 0) {
handle_error("server_read < 0");
}
// close the client then delete it from clients
if (ret == 1) {
cpt--;
printf ("disconnection => %d client(s) remaining\n", cpt);
if (ipc_server_close_client (pc) < 0) {
handle_err( "handle_new_msg", "server_close_client < 0");
}
if (ipc_client_del (clients, pc) < 0) {
handle_err( "handle_new_msg", "ipc_client_del < 0");
}
if (ipc_client_del (clients_talking, pc) < 0) {
handle_err( "handle_new_msg", "ipc_client_del < 0");
}
i--;
// free the ipc_client structure
free (pc);
continue;
}
if (m.type == MSG_TYPE_SERVER_CLOSE) {
// free remaining clients
for (int y = 0; y < clients->size ; y++) {
struct ipc_client *cli = clients->clients[y];
// TODO: replace with specific ipc_client_empty function
if (cli != NULL)
free (cli);
clients->clients[y] = NULL;
}
ipc_clients_free (clients);
ipc_clients_free (clients_talking);
if (ipc_server_close (srv) < 0) {
handle_error("server_close < 0");
}
ipc_message_empty (&m);
free (srv);
exit (0);
}
if (m.length > 0) {
printf ("new message : %.*s\n", m.length, m.payload);
}
if (ipc_server_write (pc, &m) < 0) {
handle_err( "handle_new_msg", "server_write < 0");
}
// empty the message structure
ipc_message_empty (&m);
memset (&m, 0, sizeof m);
}
}
/*
* main loop
*
* accept new application connections
* read a message and send it back
* close a connection if MSG_TYPE_CLOSE received
*/
void main_loop ()
{
int ret = 0;
struct ipc_clients clients;
memset(&clients, 0, sizeof(struct ipc_clients));
clients = malloc (sizeof (struct ipc_clients));
memset(clients, 0, sizeof(struct ipc_clients));
struct ipc_clients clients_talking;
memset(&clients_talking, 0, sizeof(struct ipc_clients));
struct ipc_event event;
memset(&event, 0, sizeof (struct ipc_event));
event.type = IPC_EVENT_TYPE_NOT_SET;
while(1) {
int new_connection = 0;
ret = ipc_server_select (&clients, srv, &clients_talking, &new_connection);
if (ret < 0) {
handle_error("ipc_server_select < 0");
// ipc_service_loop provides one event at a time
// warning: event->m is free'ed if not NULL
ret = ipc_service_loop (clients, srv, &event);
if (ret != 0) {
handle_error("ipc_service_loop != 0");
// the application will shut down, and close the service
if (ipc_server_close (srv) < 0) {
handle_error("ipc_server_close < 0");
}
exit (EXIT_FAILURE);
}
if (new_connection) {
handle_new_connection (&clients);
switch (event.type) {
case IPC_EVENT_TYPE_CONNECTION:
{
cpt++;
printf ("connection: %d clients connected\n", cpt);
printf ("new client has the fd %d\n", ((struct ipc_client*) event.origin)->proc_fd);
};
break;
case IPC_EVENT_TYPE_DISCONNECTION:
{
cpt--;
printf ("disconnection: %d clients remaining\n", cpt);
// free the ipc_client structure
free (event.origin);
};
break;
case IPC_EVENT_TYPE_MESSAGE:
{
struct ipc_message *m = event.m;
if (m->length > 0) {
printf ("message received (type %d): %.*s\n", m->type, m->length, m->payload);
}
if (ipc_server_write (event.origin, m) < 0) {
handle_err( "handle_new_msg", "server_write < 0");
}
};
break;
case IPC_EVENT_TYPE_ERROR:
{
fprintf (stderr, "a problem happened with client %d\n"
, ((struct ipc_client*) event.origin)->proc_fd);
};
break;
default :
{
fprintf (stderr, "there must be a problem, event not set\n");
};
}
}
if (clients_talking.size > 0) {
handle_new_msg (&clients, &clients_talking);
}
ipc_clients_free (&clients_talking);
}
// should never go there
exit (1);
}
void exit_program(int signal)
{
printf("Quitting, signal: %d\n", signal);
// free remaining clients
for (int i = 0; i < clients->size ; i++) {
struct ipc_client *cli = clients->clients[i];
// TODO: replace with specific ipc_client_empty function
if (cli != NULL) {
// ipc_client_empty (cli);
free (cli);
}
clients->clients[i] = NULL;
}
ipc_clients_free (clients);
free (clients);
// the application will shut down, and close the service
if (ipc_server_close (srv) < 0) {
handle_error("ipc_server_close < 0");
}
free (srv);
exit(EXIT_SUCCESS);
}
/*
* service ping-pong
*
* 1. creates the unix socket /run/ipc/<service>.sock, then listens
* 2. listen for new clients
* 3. then accept a new client, and send back everything it sends
* 4. close any client that closes its socket
*
* and finally, stop the program once a client sends a SERVER CLOSE command
* service ping-pong: send back everything sent by the clients
* stop the program on SIGTERM, SIGALRM, SIGUSR{1,2}, SIGHUP signals
*/
int main(int argc, char * argv[], char **env)
@ -159,6 +121,8 @@ int main(int argc, char * argv[], char **env)
argc = argc; // warnings
argv = argv; // warnings
printf ("pid = %d\n", getpid ());
srv = malloc (sizeof (struct ipc_service));
if (srv == NULL) {
exit (1);
@ -170,20 +134,22 @@ int main(int argc, char * argv[], char **env)
// unlink("/tmp/ipc/pongd-0-0");
if (ipc_server_init (env, srv, PONGD_SERVICE_NAME) < 0) {
handle_error("server_init < 0");
handle_error("ipc_server_init < 0");
return EXIT_FAILURE;
}
printf ("Listening on %s.\n", srv->spath);
printf("MAIN: server created\n" );
// the service will loop until the end of time, a specific message, a signal
signal (SIGHUP, exit_program);
signal (SIGALRM, exit_program);
signal (SIGUSR1, exit_program);
signal (SIGUSR2, exit_program);
signal (SIGTERM, exit_program);
// the service will loop until the end of time, or a signal
main_loop ();
// the application will shut down, and remove the service named pipe
if (ipc_server_close (srv) < 0) {
handle_error("server_close < 0");
}
return EXIT_SUCCESS;
// main_loop should not return
return EXIT_FAILURE;
}