Archived
3
0

pingpong adapted for unix socket

This commit is contained in:
lapupe 2016-10-27 16:27:04 +02:00
parent 2e41ec11c8
commit f4ba61f02d
2 changed files with 214 additions and 56 deletions

View File

@ -1,7 +1,39 @@
#include "../lib/communication.h" #include "../lib/communication.h"
#include <pthread.h> #include <pthread.h>
#include <sys/socket.h>
#include <sys/un.h>
#define PONGD_SERVICE_NAME "pongd" #define PONGD_SERVICE_NAME "pongd"
#define LISTEN_BACKLOG 50
#define handle_error(msg) \
do { perror(msg); exit(EXIT_FAILURE); } while (0)
/*init a unit socket : bind, listen
*and return a socket
*/
int set_listen_socket(const char *path) {
int sfd;
struct sockaddr_un my_addr;
sfd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sfd == -1)
return -1;
memset(&my_addr, 0, sizeof(struct sockaddr_un));
/* Clear structure */
my_addr.sun_family = AF_UNIX;
strncpy(my_addr.sun_path, path, sizeof(my_addr.sun_path) - 1);
unlink(my_addr.sun_path);
if (bind(sfd, (struct sockaddr *) &my_addr, sizeof(struct sockaddr_un)) == -1)
return -1;
if (listen(sfd, LISTEN_BACKLOG) == -1)
return -1;
return sfd;
}
/* control the file descriptor*/ /* control the file descriptor*/
@ -9,37 +41,64 @@ void * pongd_thread(void * pdata) {
struct process *proc = (struct process*) pdata; struct process *proc = (struct process*) pdata;
// about the message // about the message
size_t msize = 0; char *buf = malloc(BUFSIZ);
char *buf = NULL; if (buf == NULL)
int ret = 0; {
handle_error("malloc");
}
memset(buf, 0, BUFSIZ);
int nbytes;
//init unix socket
int sfd, cfd;
struct sockaddr_un peer_addr;
socklen_t peer_addr_size;
printf("%s\n", proc->path_proc);
sfd = set_listen_socket(proc->path_proc);
if (sfd == -1){
handle_error("set_listen_socket");
}
peer_addr_size = sizeof(struct sockaddr_un);
printf("ici\n");
cfd = accept(sfd, (struct sockaddr *) &peer_addr, &peer_addr_size);
if (cfd == -1)
handle_error("accept");
proc->proc_fd = cfd;
while (1) { while (1) {
// printf ("before read\n"); if ((nbytes = srv_read (proc, &buf)) == -1) {
while (ret <= 0 || msize == 0) { fprintf(stdout, "MAIN_LOOP: error service_read %d\n", nbytes);
if ((ret = srv_read (proc, &buf, &msize)) == -1) {
fprintf(stdout, "MAIN_LOOP: error service_read %d\n", ret);
}
} }
// printf ("after read\n"); // printf ("after read\n");
printf ("read, size %ld : %s\n", msize, buf); printf ("read, size %d : %s\n", nbytes, buf);
if (nbytes == 0){
if (strncmp ("exit", buf, 4) != 0) {
//printf ("before proc write\n");
if ((ret = srv_write (proc, buf, msize)) == -1) {
fprintf(stdout, "MAIN_LOOP: error service_write %d\n", ret);
}
}else {
printf("------thread shutdown------------\n"); printf("------thread shutdown------------\n");
close(cfd);
close(sfd);
free(buf);
break;
}
if (strncmp ("exit", buf, 4) != 0) {
//printf ("before proc write\n");
if ((nbytes = srv_write (proc, buf, nbytes)) == -1) {
fprintf(stdout, "MAIN_LOOP: error service_write %d\n", nbytes);
}
}
else {
printf("------thread shutdown------------\n");
close(cfd);
close(sfd);
free(buf); free(buf);
break; break;
} }
ret = 0;
msize = 0;
} }
return NULL; return NULL;
} }
/* /*
* main loop * main loop
* *
@ -48,20 +107,107 @@ void * pongd_thread(void * pdata) {
* then closes the pipes * then closes the pipes
*/ */
void main_loop (const struct service *srv) void main_loop (struct service *srv)
{ {
int ret; int ret;
struct process tab_proc[10]; struct process tab_proc[10];
//thread //thread
pthread_t tab_thread[10]; pthread_t tab_thread[10];
int i;
int cnt = 0; int cnt = 0;
while (cnt < 10) { //init socket unix for server
int sfd;
struct sockaddr_un peer_addr;
socklen_t peer_addr_size;
sfd = set_listen_socket(srv->spath);
if (sfd == -1){
handle_error("set_listen_socket");
}
/* master file descriptor list */
fd_set master;
/* temp file descriptor list for select() */
fd_set read_fds;
/* maximum file descriptor number */
int fdmax;
/* listening socket descriptor */
int listener = sfd;
/* newly accept()ed socket descriptor */
int newfd;
/* buffer for client data */
char *buf = malloc(BUFSIZ);
if (buf == NULL)
{
handle_error("malloc");
}
memset(buf, 0, BUFSIZ);
int nbytes;
int i;
/* clear the master and temp sets */
FD_ZERO(&master);
FD_ZERO(&read_fds);
/* add the listener to the master set */
FD_SET(listener, &master);
//FD_SET(sfd, &master);
/* keep track of the biggest file descriptor */
fdmax = sfd; /* so far, it's this one*/
for(;;) {
/* copy it */
read_fds = master;
if(select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1)
{
perror("Server-select() error lol!");
exit(1);
}
printf("Server-select...OK\n");
/*run through the existing connections looking for data to be read*/
for(i = 0; i <= fdmax; i++) {
if(FD_ISSET(i, &read_fds)) {
/* we got one... */
if(i == listener) {
/* handle new connections */
peer_addr_size = sizeof(struct sockaddr_un);
newfd = accept(sfd, (struct sockaddr *) &peer_addr, &peer_addr_size);
if (newfd == -1) {
handle_error("accept");
}
else
{
printf("Server-accept() is OK...\n");
FD_SET(newfd, &master); /* add to master set */
if(newfd > fdmax)
{ /* keep track of the maximum */
fdmax = newfd;
}
}
} else {
nbytes = file_read (i, &buf);
if ( nbytes == -1) {
handle_error("file_read");
} else if( nbytes == 0) {
printf ("msg received (%d) : %s\n", nbytes, buf);
/* close it... */
close(i);
/* remove from master set */
FD_CLR(i, &master);
}else {
//buf[BUFSIZ - 1] = '\0';
printf ("msg received (%d) : %s\n", nbytes, buf);
if (strncmp ("exit", buf, 4) == 0) {
break;
}
// -1 : error, 0 = no new process, 1 = new process // -1 : error, 0 = no new process, 1 = new process
ret = srv_get_new_process (srv, &tab_proc[cnt]); ret = srv_get_new_process (buf, &tab_proc[cnt]);
if (ret == -1) { if (ret == -1) {
fprintf (stderr, "MAIN_LOOP: error service_get_new_process\n"); fprintf (stderr, "MAIN_LOOP: error service_get_new_process\n");
@ -70,24 +216,33 @@ void main_loop (const struct service *srv)
srv_process_print (&tab_proc[cnt]); srv_process_print (&tab_proc[cnt]);
printf ("\n-------New thread created---------\n");
int ret = pthread_create( &tab_thread[cnt], NULL, &pongd_thread, (void *) &tab_proc[cnt]); int ret = pthread_create( &tab_thread[cnt], NULL, &pongd_thread, (void *) &tab_proc[cnt]);
if (ret) { if (ret) {
perror("pthread_create()"); perror("pthread_create()");
exit(errno); exit(errno);
} else { } else {
printf("Creation of listen thread \n"); printf ("\n-------New thread created---------\n");
} }
// printf ("after proc write\n");
printf ("%d applications to serve\n",cnt); printf ("%d applications to serve\n",cnt);
cnt++; cnt++;
} }
}
}
}
if (strncmp ("exit", buf, 4) == 0) {
break;
}
}
printf("ici\n");
for (i = 0; i < cnt; i++) { for (i = 0; i < cnt; i++) {
pthread_join(tab_thread[i], NULL); pthread_join(tab_thread[i], NULL);
} }
free(buf);
close(sfd);
} }
/* /*

View File

@ -3,7 +3,6 @@
REP=/tmp/ipc/ REP=/tmp/ipc/
SERVICE="pongd" SERVICE="pongd"
NB=10 NB=10
# CLEAN UP ! # CLEAN UP !
if [ $# -ne 0 ] && [ "$1" = clean ] if [ $# -ne 0 ] && [ "$1" = clean ]
then then
@ -23,22 +22,26 @@ fi
for pid in `seq 1 ${NB}` for pid in `seq 1 ${NB}`
do do
# we make the application pipes # we make the application pipes
mkfifo ${REP}${pid}-1-1-in 2>/dev/null # mkfifo ${REP}${pid}-1-1-in 2>/dev/null
mkfifo ${REP}${pid}-1-1-out 2>/dev/null # mkfifo ${REP}${pid}-1-1-out 2>/dev/null
# pid index version # pid index version
echo "${pid} 1 1" > ${REP}${SERVICE} echo "${pid} 1 1" | nc -U ${REP}${SERVICE}
sleep 1
# the purpose is to send something in the pipe # the purpose is to send something in the pipe
cat /dev/urandom | base64 | head -n 1 > ${REP}${pid}-1-1-out #cat /dev/urandom | base64 | head -n 1 > ${REP}${pid}-1-1-out
# echo "hello world" > ${REP}${pid}-1-out echo "exit" | nc -U ${REP}${pid}-1-1
#echo "exit" | nc -U ${REP}${pid}-1-1
# the the service will answer with our message # the service will answer with our message
echo "pid : ${pid}" echo "pid : ${pid}"
cat ${REP}/${pid}-1-1-in #cat ${REP}/${pid}-1-1-in
done done
echo "exit" | nc -U ${REP}${SERVICE}
echo "clean rep" echo "clean rep"
rm ${REP}/*-in #rm ${REP}/*
rm ${REP}/*-out #rm ${REP}/*-out