plein de modifications à l'arrache

more_to_read
Philippe PITTOLI 2016-06-05 20:48:13 +02:00
parent 1c3800bbc3
commit 6c4ed731c9
10 changed files with 310 additions and 192 deletions

View File

@ -25,54 +25,50 @@ int file_close (FILE *f)
return 0;
}
// SERVICE
int srv_path (char *buf, const char *sname)
void srv_init (struct service *srv, const char *sname)
{
if (buf == NULL) {
return 1;
}
if (srv == NULL)
return;
// already done in mkfifo
if (strlen(TMPDIR) + strlen(sname) > PATH_MAX) {
return 2;
}
// gets the service path, such as /tmp/<service>
bzero (srv->spath, PATH_MAX);
strncat (srv->spath, TMPDIR, PATH_MAX);
strncat (srv->spath, sname, PATH_MAX);
bzero (buf, PATH_MAX);
strncat (buf, TMPDIR, PATH_MAX);
strncat (buf, sname, PATH_MAX);
return 0;
srv->version = COMMUNICATION_VERSION;
srv->index = 0; // TODO
}
int srv_create (const char *fifopath)
// SERVICE
int srv_create (struct service *srv)
{
int ret;
if ((ret = mkfifo (fifopath, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH))) {
if ((ret = mkfifo (srv->spath, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH))) {
switch (errno) {
case EACCES :
printf ("file %s : EACCES\n", fifopath);
printf ("file %s : EACCES\n", srv->spath);
return 1;
case EEXIST :
printf ("file %s : EEXIST\n", fifopath);
printf ("file %s : EEXIST\n", srv->spath);
break;
case ENAMETOOLONG :
printf ("file %s : ENAMETOOLONG\n", fifopath);
printf ("file %s : ENAMETOOLONG\n", srv->spath);
return 2;
case ENOENT :
printf ("file %s : ENOENT\n", fifopath);
printf ("file %s : ENOENT\n", srv->spath);
return 3;
case ENOSPC :
printf ("file %s : ENOSPC\n", fifopath);
printf ("file %s : ENOSPC\n", srv->spath);
return 4;
case ENOTDIR :
printf ("file %s : ENOTDIR\n", fifopath);
printf ("file %s : ENOTDIR\n", srv->spath);
return 5;
case EROFS :
printf ("file %s : EROFS\n", fifopath);
printf ("file %s : EROFS\n", srv->spath);
return 6;
default :
printf ("err file %s unknown\n", fifopath);
printf ("err file %s unknown\n", srv->spath);
return 7;
}
}
@ -80,18 +76,18 @@ int srv_create (const char *fifopath)
return 0;
}
int srv_close (const char *fifopath)
int srv_close (struct service *srv)
{
if (unlink (fifopath)) {
if (unlink (srv->spath)) {
return 1;
}
return 0;
}
int srv_get_new_process (struct process *p, const char * spath)
int srv_get_new_process (struct process *p, const struct service *srv)
{
if (spath == NULL) {
if (srv->spath == NULL) {
return -1;
}
@ -102,7 +98,7 @@ int srv_get_new_process (struct process *p, const char * spath)
struct timespec ts = { 0 };
struct timespec ts2 = { 0 };
FILE * f = fopen (spath, "r");
FILE * f = fopen (srv->spath, "r");
clock_gettime(CLOCK_REALTIME, &ts);
fgets (buf, BUFSIZ, f);
clock_gettime(CLOCK_REALTIME, &ts2);

View File

@ -19,11 +19,13 @@
struct service {
unsigned int version;
unsigned int index;
char spath[PATH_MAX];
FILE *spipe;
};
int srv_path (char *buf, const char *sname);
void srv_init (struct service *srv, const char *sname);
int srv_get_new_process (struct process *proc, const char * spath);
int srv_get_new_process (struct process *proc, const struct service *srv);
/*
* returns
@ -32,8 +34,8 @@ int srv_get_new_process (struct process *proc, const char * spath);
* 2 : service name too long
* 3 : unable to create fifo
*/
int srv_create (const char *sname);
int srv_close (const char *sname);
int srv_create (struct service *srv);
int srv_close (struct service *srv);
int srv_read (struct process *, void * buf, size_t *);
int srv_write (struct process *, void * buf, size_t);

Binary file not shown.

View File

@ -23,6 +23,7 @@ struct process {
};
struct process * srv_process_copy (const struct process *p);
void srv_process_free (struct process * p);
int srv_process_eq (const struct process *p1, const struct process *p2);

190
lib/pubsubd.c Normal file
View File

@ -0,0 +1,190 @@
#include "pubsubd.h"
#include <stdlib.h>
void
ohshit(int rvalue, const char* str) {
fprintf(stderr, "%s\n", str);
exit(rvalue);
}
// CHANNELS
void pubsubd_channels_init (struct channels *chans) { LIST_INIT(chans); }
void
pubsubd_channels_add (struct channels *chans, struct channel *c)
{
if(!chans || !c)
return;
struct channel *n = pubsubd_channel_copy (c);
LIST_INSERT_HEAD(chans, n, entries);
}
void
pubsubd_channels_del (struct channels *chans, struct channel *c)
{
struct channel *todel = pubsubd_channel_get (chans, c);
if(todel != NULL) {
LIST_REMOVE(todel, entries);
srv_process_free (todel);
free (todel);
todel = NULL;
}
}
struct channel * pubsubd_channel_copy (struct channel *c)
{
struct channel *copy;
copy = malloc (sizeof(struct channel));
memcpy (copy, c, sizeof(struct channel));
return copy;
}
struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c)
{
struct channel * np = NULL;
LIST_FOREACH(np, chans, entries) {
if (pubsubd_channels_eq (np, c))
return np;
}
return NULL;
}
int
pubsubd_channels_eq (const struct channel *c1, const struct channel *c2)
{
return (strncmp (c1->chan, c2->chan, c1->chanlen) == 0);
}
// SUBSCRIBER
void pubsubd_subscriber_init (struct app_list_head *chans) { LIST_INIT(chans); }
struct app_list_elm * pubsubd_app_list_elm_copy (struct app_list_elm *ale)
{
if (ale == NULL)
return NULL;
struct app_list_elm * n;
n = malloc (sizeof (struct app_list_elm));
n->p = srv_process_copy(ale->p);
return n;
}
void
pubsubd_subscriber_add (struct app_list_head *alh, struct app_list_elm *ale)
{
if(!alh || !ale)
return;
struct app_list_elm *n = pubsubd_app_list_elm_copy (ale);
LIST_INSERT_HEAD(alh, n, entries);
}
struct app_list_elm *
pubsubd_subscriber_get (const struct app_list_head *chans, const struct app_list_elm *p)
{
struct app_list_elm *np, *res = NULL;
LIST_FOREACH(np, chans, entries) {
if(srv_process_eq (np, p)) {
res = np;
}
}
return res;
}
void
pubsubd_subscriber_del (struct app_list_head *chans, struct app_list_elm *p)
{
struct app_list_elm *todel = pubsubd_subscriber_get (chans, p);
if(todel != NULL) {
LIST_REMOVE(todel, entries);
pubsubd_app_list_elm_free (todel);
free (todel);
todel = NULL;
}
}
void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p)
{
if (ale == NULL)
return;
ale->p = srv_process_copy (p);
}
void pubsubd_app_list_elm_free (struct app_list_elm *todel)
{
if (todel == NULL)
return NULL;
srv_process_free (todel->p);
}
// MESSAGE, TODO CBOR
void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len)
{
if (msg == NULL || data == NULL || len == NULL)
return;
// msg: "type(1) chanlen(8) chan datalen(8) data
*len = 1 + sizeof(size_t) + msg->chanlen + sizeof(size_t) + msg->datalen;
*data = malloc(*len);
size_t i = 0;
data[0][i] = msg->type; i++;
memcpy (data[0][i], msg->chanlen, sizeof(size_t)); i += sizeof(size_t);
memcpy (data[0][i], msg->chan, msg->chanlen); i += msg->chanlen;
memcpy (data[0][i], msg->datalen, sizeof(size_t)); i += sizeof(size_t);
memcpy (data[0][i], msg->data, msg->datalen); i += msg->datalen;
}
void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len)
{
if (msg == NULL || data == NULL)
return;
size_t i = 0;
msg->type = data[0][i]; i++;
memcpy (&msg->chanlen, data + i, sizeof(size_t)); i += sizeof(size_t);
msg->chan = malloc (msg->chanlen);
memcpy (msg->chan, data + i, msg->chanlen); i += msg->chanlen;
memcpy (&msg->datalen, data + i, sizeof(size_t)); i += sizeof(size_t);
msg->data = malloc (msg->datalen);
memcpy (msg->data, data + i, msg->datalen); i += msg->datalen;
}
void pubsubd_msg_free (struct pubsub_msg *msg)
{
if (msg->chan) {
free (msg->chan);
msg->chan = 0;
}
if (msg->data) {
free (msg->data);
msg->data = 0;
}
}
// COMMUNICATION
void pubsubd_msg_send (struct service *s, struct pubsub_msg * m, struct process *p)
{
}
void pubsubd_msg_recv (struct service *s, struct pubsub_msg * m, struct process *p)
{
}
void pubsub_msg_send (struct service *s, struct pubsub_msg * m)
{
}
void pubsub_msg_recv (struct service *s, struct pubsub_msg * m)
{
}
// SERVICE
void pubsubd_srv_init ();

68
lib/pubsubd.h Normal file
View File

@ -0,0 +1,68 @@
#ifndef __PUBSUBD_H__
#define __PUBSUBD_H__
#include "communication.h"
#include "process.h"
#include "queue.h"
#define PUBSUB_SERVICE_NAME "pubsub"
struct pubsub_msg {
unsigned char *chan;
size_t chanlen;
unsigned char *data;
size_t datalen;
unsigned char type; // message type : alert, notification, …
};
void pubsubd_msg_serialize (const struct pubsub_msg *msg, char **data, size_t *len);
void pubsubd_msg_unserialize (struct pubsub_msg *msg, const char *data, size_t len);
void pubsubd_msg_free (struct pubsub_msg *msg);
void pubsubd_msg_send (struct service *, struct pubsub_msg *msg, struct process *p);
void pubsubd_msg_recv (struct service *, struct pubsub_msg *msg, struct process *p);
void pubsub_msg_send (struct service *, struct pubsub_msg *msg);
void pubsub_msg_recv (struct service *, struct pubsub_msg *msg);
// CHANNEL
// head of the list
LIST_HEAD(channels, channel);
// element of the list
struct channel {
unsigned char *chan;
size_t chanlen;
LIST_ENTRY(channel) entries;
};
struct channel * pubsubd_channel_copy (struct channel *c);
struct channel * pubsubd_channel_get (struct channels *chans, struct channel *c);
void pubsubd_channel_free (struct channel *c);
int pubsubd_channel_eq (const struct channel *c1, const struct channel *c2);
// APPLICATION
// head of the list
LIST_HEAD(app_list_head, app_list_elm);
// element of the list
struct app_list_elm {
struct process *p;
LIST_ENTRY(app_list_elm) entries;
};
void pubsubd_subscriber_add (const struct app_list_head *
, const struct app_list_elm *);
struct app_list_elm * pubsubd_subscriber_get (const struct app_list_head *
, const struct app_list_elm *);
void pubsubd_subscriber_del (struct app_list_head *al, struct app_list_elm *p);
struct app_list_elm * pubsubd_app_list_elm_copy (struct app_list_elm *ale);
void pubsubd_app_list_elm_create (struct app_list_elm *ale, struct process *p);
void pubsubd_app_list_elm_free (struct app_list_elm *todel);
#endif

View File

@ -1,5 +1,7 @@
#include "../lib/communication.h"
#define PONGD_SERVICE_NAME "pongd"
/*
* main loop
*
@ -8,7 +10,7 @@
* then closes the pipes
*/
void main_loop (const char *spath)
void main_loop (const struct service *srv)
{
int ret;
struct process proc;
@ -17,7 +19,7 @@ void main_loop (const char *spath)
while (cnt--) {
// -1 : error, 0 = no new process, 1 = new process
ret = srv_get_new_process (&proc, spath);
ret = srv_get_new_process (&proc, srv);
if (ret == -1) {
fprintf (stderr, "error service_get_new_process\n");
continue;
@ -64,22 +66,22 @@ void main_loop (const char *spath)
int main(int argc, char * argv[])
{
// gets the service path, such as /tmp/<service>
char spath[PATH_MAX];
srv_path (spath, "pingpong");
struct service srv;
srv_init (&srv, PONGD_SERVICE_NAME);
printf ("Listening on %s.\n", srv.spath);
// creates the service named pipe, that listens to client applications
int ret;
if ((ret = srv_create (spath))) {
if ((ret = srv_create (&srv))) {
fprintf(stdout, "error service_create %d\n", ret);
exit (1);
}
// the service will loop until the end of time, a specific message, a signal
main_loop (spath);
main_loop (&srv);
// the application will shut down, and remove the service named pipe
if ((ret = srv_close (spath))) {
if ((ret = srv_close (&srv))) {
fprintf(stdout, "error service_close %d\n", ret);
exit (1);
}

View File

@ -1,7 +1,7 @@
#!/bin/dash
REP=/tmp/ipc/
SERVICE="pingpong"
SERVICE="pongd"
NB=3
# CLEAN UP !

View File

@ -1,116 +1,21 @@
#include "pubsubd.h"
#include "../lib/pubsubd.h"
#include <stdlib.h>
const char* service_name = "pubsub";
void
ohshit(int rvalue, const char* str) {
fprintf(stderr, "%s\n", str);
exit(rvalue);
}
// CHANNELS
void pubsubd_channels_init (struct channels *chans) { LIST_INIT(chans); }
void
pubsubd_channels_add (struct channels *chans, struct channel *c)
{
if(!chans || !c)
return;
struct channel *n = pubsubd_channel_copy (c);
LIST_INSERT_HEAD(al, n, entries);
}
void
pubsubd_channels_del (struct app_list *al, struct channel *c)
{
struct channel *todel = pubsubd_channel_get (al, c);
if(todel != NULL) {
LIST_REMOVE(todel, entries);
srv_process_free (mfree, todel);
mfree (todel);
todel = NULL;
}
}
struct channel * pubsubd_channel_copy (struct channel *c)
{
struct channel *copy;
copy = malloc (sizeof(struct channel));
memcpy (copy, c, sizeof(struct channel));
return copy;
}
int
pubsubd_channels_eq (const struct channel *c1, const struct channel *c2)
{
return (strncmp (c1->chan, c2->chan, c1->chanlen) == 0);
}
// SUBSCRIBER
void pubsubd_subscriber_init (struct app_list *al) { LIST_INIT(al); }
void
pubsubd_subscriber_add (struct app_list *al, struct process *p)
{
if(!al || !p)
return;
struct process *n = srv_process_copy (p);
LIST_INSERT_HEAD(al, n, entries);
}
struct process *
pubsubd_subscriber_get (const struct app_list *al
, const struct process *p)
{
struct process *np, *res = NULL;
LIST_FOREACH(np, al, entries) {
if(srv_process_eq (np, p)) {
res = np;
}
}
return res;
}
void
pubsubd_subscriber_del (struct app_list *al, struct process *p)
{
struct process *todel = pubsubd_subscriber_get (al, p);
if(todel != NULL) {
LIST_REMOVE(todel, entries);
srv_process_free (mfree, todel);
mfree (todel);
todel = NULL;
}
}
void pubsubd_msg_send (struct service *s, struct message * m, struct process *p)
{
}
void pubsubd_msg_recv (struct service *s, struct message * m, struct process *p)
{
}
void pubsub_msg_send (struct service *s, struct message * m)
{
}
void pubsub_msg_recv (struct service *s, struct message * m)
{
}
int
main(int argc, char* argv[])
{
// gets the service path, such as /tmp/<service>
char s_path[PATH_MAX];
service_path (s_path, service_name);
printf ("Listening on %s.\n", s_path);
struct service srv;
srv_init (&srv, PUBSUB_SERVICE_NAME);
printf ("Listening on %s.\n", srv->spath);
// creates the service named pipe, that listens to client applications
if (service_create (s_path))
if (service_create (&srv))
ohshit(1, "service_create error");
struct channels chans;
@ -120,7 +25,7 @@ main(int argc, char* argv[])
struct process proc;
int proc_count, i;
service_get_new_process (&proc, s_path);
service_get_new_process (&proc, &srv);
printf("> %i proc\n", proc_count);
@ -157,7 +62,7 @@ main(int argc, char* argv[])
* then closes the pipes
*/
void main_loop (const char *spath)
void main_loop (const struct service *srv)
{
int ret;
struct process proc;
@ -166,7 +71,7 @@ void main_loop (const char *spath)
while (cnt--) {
// -1 : error, 0 = no new process, 1 = new process
ret = srv_get_new_process (&proc, spath);
ret = srv_get_new_process (&proc, srv);
if (ret == -1) {
fprintf (stderr, "error service_get_new_process\n");
continue;

View File

@ -1,46 +0,0 @@
#ifndef __PUBSUBD_H__
#define __PUBSUBD_H__
#include "../lib/communication.h"
#include "../lib/process.h"
#include "../lib/queue.h"
struct message {
unsigned char *chan;
size_t chanlen;
unsigned char *data;
size_t datalen;
unsigned char type; // message type : alert, notification, …
};
struct channel {
unsigned char *chan;
size_t chanlen;
};
struct channel * pubsubd_channel_copy (struct channel *c);
int pubsubd_channel_eq (const struct channel *c1, const struct channel *c2);
struct channels {
struct channel *chan;
LIST_ENTRY(channels) entries;
};
struct app_list {
struct process *p;
LIST_ENTRY(app_list) entries;
};
void pubsubd_msg_send (struct service *, struct message *msg, struct process *p);
void pubsubd_msg_recv (struct service *, struct message *msg, struct process *p);
struct process * pubsubd_subscriber_get (const struct app_list *
, const struct process *);
void pubsubd_subscriber_del (struct app_list *al, struct process *p);
void pubsub_msg_send (struct service *, struct message *msg);
void pubsub_msg_recv (struct service *, struct message *msg);
#endif