From 6df133c2c1c7cb1ba885f8faf203b45ffa57a161 Mon Sep 17 00:00:00 2001 From: Philippe PITTOLI Date: Sat, 4 Jun 2016 20:33:44 +0200 Subject: [PATCH] GROS CHANTIER --- .gitignore | 3 + lib/communication.c | 17 +++ lib/communication.h | 5 + lib/communication.o | Bin 0 -> 16952 bytes lib/queue.h | 171 +++++++++++++++++++++++++++++ pubsub/Makefile | 22 ++++ pubsub/list.c | 48 --------- pubsub/list.h | 23 ---- pubsub/pubsubd.c | 254 ++++++++++++++++++++++++++++++-------------- pubsub/pubsubd.h | 41 +++++++ 10 files changed, 436 insertions(+), 148 deletions(-) create mode 100644 .gitignore create mode 100644 lib/communication.o create mode 100644 lib/queue.h create mode 100644 pubsub/Makefile delete mode 100644 pubsub/list.c delete mode 100644 pubsub/list.h create mode 100644 pubsub/pubsubd.h diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8020363 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.a +*.so +*.swp diff --git a/lib/communication.c b/lib/communication.c index d3e5c2b..313e697 100644 --- a/lib/communication.c +++ b/lib/communication.c @@ -72,6 +72,23 @@ int service_close (const char *fifopath) return 0; } +struct process * srv_process_copy (const struct process *p) +{ + if (p == NULL) + return NULL; + + struct process * copy = malloc (sizeof(struct process)); + memcpy (copy, p, sizeof (struct process)); + + return copy; +} + +int srv_process_eq (const struct process *p1, const struct process *p2) +{ + return (p1->pid == p2->pid && p1>version == p2->version + && p1->index == p2->index); +} + int service_get_new_process (struct process *proc, const char * spath) { if (spath == NULL) { diff --git a/lib/communication.h b/lib/communication.h index 4ab656c..e9a27b0 100644 --- a/lib/communication.h +++ b/lib/communication.h @@ -36,6 +36,10 @@ struct service { int service_path (char *buf, const char *sname); +struct process * srv_process_copy (struct process *p); + +int srv_process_eq (const struct process *p1, const struct process *p2); + void gen_process_structure (struct process *p , pid_t pid, unsigned int index, unsigned int version); @@ -53,6 +57,7 @@ int service_get_new_process (struct process *proc, const char * spath); void service_get_new_processes (struct process ***, int *nproc, char *spath); void service_free_processes (struct process **, int nproc); + void process_print (struct process *); int process_create (struct process *, int index); // called by the application int process_destroy (struct process *); // called by the application diff --git a/lib/communication.o b/lib/communication.o new file mode 100644 index 0000000000000000000000000000000000000000..a0cea9da474b6b4da1b905dd5750f3a466c7a97e GIT binary patch literal 16952 zcmeI20d!PVddJ^ng3Jb-83iS~K$+FB8x+G#0vHk9nUI(l90;1Ac7eu_OiW6cWRo`% z7jR)GL0-dX_Q>vGTkCR^)6&Z6p(|yPQXL@zyGwggh00p19A#I+pk)O$Dr@?E_kDNf zo6EejxToi|J*O9D=Dpwj-|v3+-tXOe-+TF9RrszvheMOeq1~xv&Lq{e@;fufL2($= zyxP^89zW;O`^O;B6L;Xx{_EiDMYN&!yUrllHwsCQ54sMu4CsBMp5DI(xU^0;KiADq zbn_#$A%uEj$yfH}Vabi3X+BMR_!=y=Y{j9L63?_=;8@98mu~(lkkpSj#rcu=$TX1y zuDmy*&-irX%UJZ~Vdn+?Io*8QGW^0UV|C}Ow$3KqadQ7G(6^XNz|IFC&`sBVEMWaOZCL}Ty^@9>JiA)^dpa*Cl^BIv9S5C=tv+LPFyW=XM7q? zSp8#OuzWL5kavkk!H3`KN3OEWY?3W`5&g;0oKXF}Ve@tU$jjKF`_VJubf}){TAHru zZ7ve#0!i6G-?9~<<pz9}CCk(6U1VHu*THskc1SVEl5H$FWSkXrGUDrJlCU54@V%*y5R1 z36ABqj{`8g48t&Yem>^kZl8}MG&~veWvuokJwHrhczX6Gv*H5+d*eC3Pt%(9{`~31 zP*68}i;4k2(Dr@o>D_`Io>*Sw)6L(#ph3;L`Jr^{NPZC%00IM8S>zJOaNxxQ5zjFC zl|_YkDn>~+e^X-)21dn6*nG!|92xZqqsW`idoOeVd(kS^A1;E+c486L%g0iOyh6Rd zz6i!%j8%4KJ!IZo52bTtns2AZQC#WqBM#5rvGaY#N4-g-M^7|3Qf?T(`93{S=P-(O z^EJ<2XKyn0K`4I7VHCl4o-qr)oyNRSe9!?ska`L9n0wd>9r9Z2&HV-7f8b-$0<%g> zy`BwRsl(YIKlKmd^40sdi2iTZ%f^cViA~Tq4&o4rD?v<8sW*h#fzbV-`$9`Y^A%t(HX&+ZHGMB+4hmmrS$WTDPtv$U z9KfQYo9}uayAOin^0h_8iJYI`;ekljtZM;g-rl>xfVAj&>}NLSa6K^J$!@JEB{mt> zdI56-wj@0_c>2Br*5C@phRZAX5L`UH8YUZiV*Pp^$0v7vU{XsxX`e6Fr%t>ma+ily zLK|LUpD)%YzsP?hS9@`N9`<6s?BA>l{RQ-|(mY#f{w8dGBDyG$96kjx3^T&;Kf-UO z{bz?iSI)3vDvT;Mh4r1?P};jpUoZyb#b9_Gfk(Ww|-y8-p(Bj(rrm*{Qt-O`-=}*M+P-< z*JQ00R_w*sdNpGPZ?WFL)=P0IRu${`$O5s6gqs_<3(LgMW%l~l7UKTqYFSF`HAN{g zUy6m{B9uBI?-}G(SFHChge(kUtVS&MN_Ojzu&*Y|3?43SLWa3Wo3cY(GJ7c(HV(G znkO&w`&Soe@|JX#Hobe+^yXXOe_J$j+z~UhmbUhYZ+f?Hp06sjU_n(~0Vh=t)YR2y zk;S1!RrR&C;o8O3*}S%Dv7)T4Te3jW)K}Krn}zPJy$foLbanZpAF=4FXvdmpf!3vU zN1EpOrnfiyqVlLfYi?_4;pbf|wf44X1l*tA4VJqi5npFlM^mJ`JJOwX>YLtNpivea zLxHwvbEHRL?6bD$LqLQTs|(R26j`%UliAQ+EMIGC@94HlX)7Zw9bFM$3!(^^(HdKf z2(&4EF16U>|DXO3_dxZ61@nBxOIOCCM$9*-q_iY(OKE9LoR-e8fUndam=o{^{M5{e z^y~QUy_%!P>zF)#tZM*f9OP3duLOT9-QG60OPdXc_`$RnzZSZ^oAN^L!nkvh+qbO1 zT^Ks#_J$6-U3ZVmYjPJZptEw2wxb@>&+)6m?cI`B=`P<aAI}`FV!BIOO)<=PuXX z!4|gnLEgh0vZ3pF*q$(mnX(2Fg8=SK%&v+*#rFCGsoW#aqUJvzx98 zxy$2YtKFx9YYW`vKtk?fP=FFjfbU=p7oBFZg_~SV#qjAN)_@?>7rN++MVZE9d3-{J z=_}Tcuqkfr;an_F@C#dg?HK9Slh2Q%&k5|`E&1TtrZJUnfBdRycXERx@6!U|E%Za$ z+lAQnneds2WyDwq9@ zm)k%7CG^;T`Qk~c)rS;YBJeQJM|n3<-SNCu5tBHE^C@p3=4E25631{oguaYl)V7iq zZq+Z-_9)t|U&3zk3hino-H)s`8dt)v(PjO%%A>AGdt-?a=`pmD=0>AYD_Pmyt(A0j z;0ekDB>_$3m&Bs&ZL1>fk3cS#u_RhaQ^)Gnkth_0H9k^Oy?E&@-9}^6DpEyS8(O*= zS4Xhrwh2QwU;FK7E`c-W6M0Q@!oOU}i4PTXB&Q~?!0b$?CLTa%!E4|wkJT&YKwvX$ zc+iF~x8Z=nd%P{ZJUO+3%OMb_hF55IdPJOF-mXW9i}{chi4*rRhKHNQ`51Bi^Q;8% zf8r31pCrCQ;oFGE6#g{v+Z6sR@w*killYGn{&&P5Q}`a@yA-~k_^S#}5(@_) z#9jrC@3GN8V8b6IJ63pZP- z^7_X8O)mXm(tByD6#F3X=PeukX&e5b4Ii=Lqc$9059IQ*5IDALh^B6_?*czdZS=R< z@DSPY)6^!NiX+gTZr!_j(Z18e@=XexY(Nk{~H_o zyGg%u-$ee*B7UCuLgKh5$~C?fqz}3+VHN3@5Fa2e?W`bvO3`b$*N8>ini`F^j%Z1f zylZa2```w+gDvT7>@rHqvUyw7!hX1I?|QhcDbj#9+g2Hw7s!#9wpXf^gctbns|>$p zS_jb?a^p|z~(YBM64 zb~YNVkk!>5iQ-Qb`ZV2bosie9H8gZbJG#B6YrmjdM*jc@*rLCm{L|{AG z(a_#u4GMr4mxyQWx#=Paum+rgd&E{k8V5Nk4Ijj%I~Ko$5UBLh!lHWcrS zLG2GgY|-Nxs2B~zx+0mr!X8SWQRjdooo&r&+8lv$9gn2XMYzJV7+EWi#~ctJ%}57i zq^E)Cng%c?6&B!Sp!LnzO!$_f>{t+4(itmDT%j~=iIw^mAeZ1>1tz*IVUMh7cr3XO zjxpghUIHJonJ7He=Q*OCm5g0_`A$LPT5j_f$?h@&%=g7LjP}tkJL{fjz1F^ zzrjXd#`O5SMB1sa(bqBkbxgm~M&Hf!xF?bJzi*>|lIbTg{WCWD=b0YYR%!pBjs7*J z|0>hJW266walA*7_7}oBgbB011|P}S63215o$<|#&u4rauW+S$Q4*Z-99JDL6< zar9?0<3Ms9z<4Fo?_+kp&bY$`Cy>xjyw{g; zGLi98#=p)upI2{UoX@M@B986Fd369jnCckk^U_LYhqtSPabE8_#(BMuGy7O?5zXU& z#Wib*I!Q@+so(Y8*KD*nSK(Kk^Ns`qrZ>o`Fy+5M*lF= z^ZEAsHu@hiJzwvhvC%)z^n4r-+UQR(uCw;m(EXEoy=-8d$Khti`MU698$QT*kku<6 z-C@1l{#XneNH~r+z(AC%XX8Iy#{{+)>|9{Q&-2aPA&+Sj6`&xB;n;7T*|0!|o|LM%1N0^@5-@x>| z|2H!|_kSDHe}mcoOQx@8{3zo=#+`JZt;SCQan=8?GClX-$Mn4aiRRjPrH{#-iHNSHZ8;4>HcVv>5< z|6vQK`Fzr2Y-5u8Q^YYQF-b1pH9uhCG}lSLT;XBj4=Q{K@n(hB6K_@c1H{`EzMOcc z!hLifWhlIv^gRmKNxx3v?WEtJ@J`~r3O9&vR(KEbEec;pd_dvbiEmZ-0PU-`DSRvO zrxZTovcPtQKSlZ-3g1qAr^0s--=*-K#CI!v7x6s`-%Wg?uXB2*#_&J52A%0%r z=ZIfa_<7b9AFf8$NiFm%kW&d06FEjH-KJClLD|+d_SK;GHZ@t&d z*zpoCRP+;xPf>Uwai7Ac5SRChvR!^^SFxfGl76PbD~ZeZjM9$mx7mt5O!{(#%lA$5 z6%KzkDN<134-l_Z_;TX9!XG3aRyh0-qex2>-b%b);qAoneJv*0-cI83{kG(@i9e|5 zdx$qHd>!#th0FWHc7@Bh?NqqDZ!{FXh3xkzT=vg8g>NPO28C}U-mCDZh;LT-cH&zU zzJvIH!gmtis_rSL)GyA^(v_#TBHBfd}J zGJbp_sxom!r&_tf{dC@`a5)dE>r|5JQ|JHl7^slcd04)$pQ89F-;b;DDc^Ui@gv`B ztNkh8UmsBH%lFqq3YYJ%kdCI) zeUr@a$^-hH48QsC3DUXz@!e=0ZKnG){K0x?dyxKnQM{K&PR)ZOD@BxgC8uzZr{X>Y zZT_L<@tq#p#Ah))$@-sDDwL=4HwcCBSnWSAzO>7cWI35t4+qF7+$7dyd3jyP4D4&3 zYT+bT|L??xL29M=$NtA#TDAX|1H<~|JOYOz9lO^0#d~O;r2q0a0-5rd>pr9MoZ+#m z{Vp-J{o-XJ&FvZ}LtNq^jB3qO9lh}TIR42<9TG<*7lTwAqIfD;(A@y26 q>2KA4Jip9$V4ah!B0;JU!+9or)cSFom8<^U__74jDkW2`|GxmgJRQUU literal 0 HcmV?d00001 diff --git a/lib/queue.h b/lib/queue.h new file mode 100644 index 0000000..373a965 --- /dev/null +++ b/lib/queue.h @@ -0,0 +1,171 @@ +/* $OpenBSD: queue.h,v 1.43 2015/12/28 19:38:40 millert Exp $ */ +/* $NetBSD: queue.h,v 1.11 1996/05/16 05:17:14 mycroft Exp $ */ + +/* + * Copyright (c) 1991, 1993 + * The Regents of the University of California. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the University nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + * @(#)queue.h 8.5 (Berkeley) 8/20/94 + */ + +#ifndef _SYS_QUEUE_H_ +#define _SYS_QUEUE_H_ + +/* + * This file defines five types of data structures: singly-linked lists, + * lists, simple queues, tail queues and XOR simple queues. + * + * + * A singly-linked list is headed by a single forward pointer. The elements + * are singly linked for minimum space and pointer manipulation overhead at + * the expense of O(n) removal for arbitrary elements. New elements can be + * added to the list after an existing element or at the head of the list. + * Elements being removed from the head of the list should use the explicit + * macro for this purpose for optimum efficiency. A singly-linked list may + * only be traversed in the forward direction. Singly-linked lists are ideal + * for applications with large datasets and few or no removals or for + * implementing a LIFO queue. + * + * A list is headed by a single forward pointer (or an array of forward + * pointers for a hash table header). The elements are doubly linked + * so that an arbitrary element can be removed without a need to + * traverse the list. New elements can be added to the list before + * or after an existing element or at the head of the list. A list + * may only be traversed in the forward direction. + * + * A simple queue is headed by a pair of pointers, one to the head of the + * list and the other to the tail of the list. The elements are singly + * linked to save space, so elements can only be removed from the + * head of the list. New elements can be added to the list before or after + * an existing element, at the head of the list, or at the end of the + * list. A simple queue may only be traversed in the forward direction. + * + * A tail queue is headed by a pair of pointers, one to the head of the + * list and the other to the tail of the list. The elements are doubly + * linked so that an arbitrary element can be removed without a need to + * traverse the list. New elements can be added to the list before or + * after an existing element, at the head of the list, or at the end of + * the list. A tail queue may be traversed in either direction. + * + * An XOR simple queue is used in the same way as a regular simple queue. + * The difference is that the head structure also includes a "cookie" that + * is XOR'd with the queue pointer (first, last or next) to generate the + * real pointer value. + * + * For details on the use of these macros, see the queue(3) manual page. + */ + +#if defined(QUEUE_MACRO_DEBUG) || (defined(_KERNEL) && defined(DIAGNOSTIC)) +#define _Q_INVALIDATE(a) (a) = ((void *)-1) +#else +#define _Q_INVALIDATE(a) +#endif + +/* + * List definitions. + */ +#define LIST_HEAD(name, type) \ +struct name { \ + struct type *lh_first; /* first element */ \ +} + +#define LIST_HEAD_INITIALIZER(head) \ + { NULL } + +#define LIST_ENTRY(type) \ +struct { \ + struct type *le_next; /* next element */ \ + struct type **le_prev; /* address of previous next element */ \ +} + +/* + * List access methods. + */ +#define LIST_FIRST(head) ((head)->lh_first) +#define LIST_END(head) NULL +#define LIST_EMPTY(head) (LIST_FIRST(head) == LIST_END(head)) +#define LIST_NEXT(elm, field) ((elm)->field.le_next) + +#define LIST_FOREACH(var, head, field) \ + for((var) = LIST_FIRST(head); \ + (var)!= LIST_END(head); \ + (var) = LIST_NEXT(var, field)) + +#define LIST_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = LIST_FIRST(head); \ + (var) && ((tvar) = LIST_NEXT(var, field), 1); \ + (var) = (tvar)) + +/* + * List functions. + */ +#define LIST_INIT(head) do { \ + LIST_FIRST(head) = LIST_END(head); \ +} while (0) + +#define LIST_INSERT_AFTER(listelm, elm, field) do { \ + if (((elm)->field.le_next = (listelm)->field.le_next) != NULL) \ + (listelm)->field.le_next->field.le_prev = \ + &(elm)->field.le_next; \ + (listelm)->field.le_next = (elm); \ + (elm)->field.le_prev = &(listelm)->field.le_next; \ +} while (0) + +#define LIST_INSERT_BEFORE(listelm, elm, field) do { \ + (elm)->field.le_prev = (listelm)->field.le_prev; \ + (elm)->field.le_next = (listelm); \ + *(listelm)->field.le_prev = (elm); \ + (listelm)->field.le_prev = &(elm)->field.le_next; \ +} while (0) + +#define LIST_INSERT_HEAD(head, elm, field) do { \ + if (((elm)->field.le_next = (head)->lh_first) != NULL) \ + (head)->lh_first->field.le_prev = &(elm)->field.le_next;\ + (head)->lh_first = (elm); \ + (elm)->field.le_prev = &(head)->lh_first; \ +} while (0) + +#define LIST_REMOVE(elm, field) do { \ + if ((elm)->field.le_next != NULL) \ + (elm)->field.le_next->field.le_prev = \ + (elm)->field.le_prev; \ + *(elm)->field.le_prev = (elm)->field.le_next; \ + _Q_INVALIDATE((elm)->field.le_prev); \ + _Q_INVALIDATE((elm)->field.le_next); \ +} while (0) + +#define LIST_REPLACE(elm, elm2, field) do { \ + if (((elm2)->field.le_next = (elm)->field.le_next) != NULL) \ + (elm2)->field.le_next->field.le_prev = \ + &(elm2)->field.le_next; \ + (elm2)->field.le_prev = (elm)->field.le_prev; \ + *(elm2)->field.le_prev = (elm2); \ + _Q_INVALIDATE((elm)->field.le_prev); \ + _Q_INVALIDATE((elm)->field.le_next); \ +} while (0) + +#endif /* _SYS_QUEUE_H_ */ diff --git a/pubsub/Makefile b/pubsub/Makefile new file mode 100644 index 0000000..22d244e --- /dev/null +++ b/pubsub/Makefile @@ -0,0 +1,22 @@ +CC=gcc +CFLAGS=-Wall -g +LDFLAGS= +CFILES=$(wildcard *.c) # CFILES => recompiles everything on a C file change +EXEC=$(basename $(wildcard *.c)) +SOURCES=$(wildcard ../lib/*.c) +OBJECTS=$(SOURCES:.c=.o) +TESTS=$(addsuffix .test, $(EXEC)) + +all: $(SOURCES) $(EXEC) + +$(EXEC): $(OBJECTS) $(CFILES) + $(CC) $(CFLAGS) $(LDFLAGS) $(OBJECTS) $@.c -o $@ + +.c.o: + $(CC) -c $(CFLAGS) $< -o $@ + +clean: + -rm $(OBJECTS) + +mrproper: clean + rm $(EXEC) diff --git a/pubsub/list.c b/pubsub/list.c deleted file mode 100644 index a6ea98f..0000000 --- a/pubsub/list.c +++ /dev/null @@ -1,48 +0,0 @@ -#include - -#include "list.h" - -List* -list_new(size_t element_size) -{ - List* l = malloc(sizeof(*l)); - - l->element_size = element_size; - l->head = NULL; - l->tail = NULL; - l->length = 0; - - return l; -} - -void* -list_append(List* l) -{ - struct link* link = malloc(sizeof(*link) + l->element_size); - - link->next = l->tail; - l->tail = link; - - if (!l->head) - l->tail = link; - - l->length++; - - return (void*) link->value; -} - -void -list_free(List* l) -{ - struct link* next; - struct link* link; - - for (link = l->head; link; link = next) { - next = link->next; - - free(link); - } - - free(l); -} - diff --git a/pubsub/list.h b/pubsub/list.h deleted file mode 100644 index 27d1612..0000000 --- a/pubsub/list.h +++ /dev/null @@ -1,23 +0,0 @@ - -#ifndef LIST_H -#define LIST_H - -struct link { - struct link* next; - char value[]; -}; - -typedef struct { - struct link* head; - struct link* tail; - size_t element_size; - size_t length; -} List; - -List* list_new(size_t); -void* list_append(List*); -void list_remove(List*, size_t); -void list_free(List*); - -#endif - diff --git a/pubsub/pubsubd.c b/pubsub/pubsubd.c index e278f3e..c1263b0 100644 --- a/pubsub/pubsubd.c +++ b/pubsub/pubsubd.c @@ -1,91 +1,191 @@ +#include "../lib/communication.h" #include -#include - -#include "list.h" - -typedef struct { - int test; -} Publisher; - -typedef struct { - int test; -} Subscriber; - const char* service_name = "pubsub"; void ohshit(int rvalue, const char* str) { - fprintf(stderr, "%s\n", str); - - exit(rvalue); + fprintf(stderr, "%s\n", str); + exit(rvalue); } +// init lists +void pubsubd_channels_init (struct channels *chans) { LIST_INIT(chans); } +void pubsubd_subscriber_init (struct app_list *al) { LIST_INIT(al); } + int -main(int argc, char* argv[]) +pubsubd_channels_eq (const struct channel *c1, const struct channel *c2) { - List* subscribers; - List* publishers; - int r; - char s_path[PATH_MAX]; - int s_pipe; + return (strncmp (c1->chan, c2->chan, c1->chanlen) == 0); +} +struct channels * pubsubd_channels_copy (struct channels *c); - (void) argc; - (void) argv; +void +pubsubd_channels_add (struct channels *chans, struct channels *c) +{ + if(!chans || !c) + return; - service_path(s_path, service_name); - - printf("Listening on %s.\n", s_path); - - if ((r = service_create(s_path))) - ohshit(1, "service_create error"); - - publishers = list_new(sizeof(Publisher)); - subscribers = list_new(sizeof(Subscriber)); - - if (!publishers && !subscribers) - ohshit(1, "out of memory, already..."); - - /* ?!?!?!?!? */ - mkfifo(s_path, S_IRUSR); - - s_pipe = open(s_path, S_IRUSR); - - for (;;) { - struct process* proc; - int proc_count, i; - - service_get_new_processes(&proc, &proc_count, s_pipe); - - printf("> %i proc\n", proc_count); - - for (i = 0; i < proc_count; i++) { - size_t message_size = BUFSIZ; - char buffer[BUFSIZ]; - - process_print(proc + i); - - if ((r = process_read(&proc[i], &buffer, &message_size))) { - ohshit(1, "process_read error"); - } - - printf(": %s\n", buffer); - - - } - - service_free_processes(&proc, proc_count); - - break; - } - - close(s_pipe); - - list_free(publishers); - list_free(subscribers); - - service_close(s_path); - - return 0; + struct process *n = pubsubd_channels_copy (c); + LIST_INSERT_HEAD(al, n, entries); } +void +pubsubd_subscriber_del (struct app_list *al, struct process *p) +{ + struct process *todel = srv_subscriber_get (al, p); + if(todel != NULL) { + LIST_REMOVE(todel, entries); + srv_process_free (mfree, todel); + mfree (todel); + todel = NULL; + } +} + +void +pubsubd_subscriber_add (struct app_list *al, struct process *p) +{ + if(!al || !p) + return; + + struct process *n = srv_process_copy (p); + LIST_INSERT_HEAD(al, n, entries); +} + +struct process * +pubsubd_subscriber_get (const struct app_list *al + , const struct process *p) +{ + struct process *np, *res = NULL; + LIST_FOREACH(np, al, entries) { + if(srv_process_eq (np, p)) { + res = np; + } + } + return res; +} + +void +pubsubd_subscriber_del (struct app_list *al, struct process *p) +{ + struct process *todel = srv_subscriber_get (al, p); + if(todel != NULL) { + LIST_REMOVE(todel, entries); + srv_process_free (mfree, todel); + mfree (todel); + todel = NULL; + } +} + +void pubsubd_msg_send (struct service *s, struct message * m, struct process *p) +{ +} +void pubsubd_msg_recv (struct service *s, struct message * m, struct process *p) +{ +} +void pubsub_msg_send (struct service *s, struct message * m) +{ +} +void pubsub_msg_recv (struct service *s, struct message * m) +{ +} + + int +main(int argc, char* argv[]) +{ + // gets the service path, such as /tmp/ + char s_path[PATH_MAX]; + service_path (s_path, service_name); + printf ("Listening on %s.\n", s_path); + + // creates the service named pipe, that listens to client applications + if (service_create (s_path)) + ohshit(1, "service_create error"); + + struct channels chans; + pubsubd_channels_init (&chans); + + for (;;) { + struct process proc; + int proc_count, i; + + service_get_new_process (&proc, s_path); + + printf("> %i proc\n", proc_count); + + for (i = 0; i < proc_count; i++) { + size_t message_size = BUFSIZ; + char buffer[BUFSIZ]; + + process_print(proc + i); + + if (process_read (&proc[i], &buffer, &message_size)) + ohshit(1, "process_read error"); + + printf(": %s\n", buffer); + + + } + + service_free_processes(&proc, proc_count); + } + + // the application will shut down, and remove the service named pipe + if (service_close (s_path)) + ohshit(1, "service_close error"); + + return EXIT_SUCCESS; +} + + +/* + * main loop + * + * opens the application pipes, + * reads then writes the same message, + * then closes the pipes + */ + +void main_loop (const char *spath) +{ + int ret; + struct process proc; + + int cnt = 10; + + while (cnt--) { + // -1 : error, 0 = no new process, 1 = new process + ret = service_get_new_process (&proc, spath); + if (ret == -1) { + fprintf (stderr, "error service_get_new_process\n"); + continue; + } else if (ret == 0) { // that should not happen + continue; + } + + // printf ("before print\n"); + process_print (&proc); + // printf ("after print\n"); + + // about the message + size_t msize = BUFSIZ; + char buf[BUFSIZ]; + bzero(buf, BUFSIZ); + + // printf ("before read\n"); + if ((ret = service_read (&proc, &buf, &msize))) { + fprintf(stdout, "error service_read %d\n", ret); + continue; + } + // printf ("after read\n"); + printf ("read, size %ld : %s\n", msize, buf); + + // printf ("before proc write\n"); + if ((ret = service_write (&proc, &buf, msize))) { + fprintf(stdout, "error service_write %d\n", ret); + continue; + } + // printf ("after proc write\n"); + printf ("\033[32mStill \033[31m%d\033[32m applications to serve\n",cnt); + } +} diff --git a/pubsub/pubsubd.h b/pubsub/pubsubd.h new file mode 100644 index 0000000..4c70462 --- /dev/null +++ b/pubsub/pubsubd.h @@ -0,0 +1,41 @@ +#ifndef __PUBSUBD_H__ +#define __PUBSUBD_H__ + +#include "queue.h" + +struct message { + unsigned char *chan; + size_t chanlen; + unsigned char *data; + size_t datalen; + unsigned char type; // message type : alert, notification, … +}; + +struct channel { + unsigned char *chan; + size_t chanlen; +}; + +struct channels { + struct channel *chan; + LIST_ENTRY(channels) entries; +}; + +int pubsubd_channels_eq (const struct channels *c1, const struct channels *c2); + +struct app_list { + struct process *p; + LIST_ENTRY(app_list) entries; +}; + +void pubsubd_msg_send (struct service *, struct message *msg, struct process *p); +void pubsubd_msg_recv (struct service *, struct message *msg, struct process *p); + +struct process * pubsubd_subscriber_get (const struct app_list * + , const struct process *); +void pubsubd_subscriber_del (struct app_list *al, struct process *p); + +void pubsub_msg_send (struct service *, struct message *msg); +void pubsub_msg_recv (struct service *, struct message *msg); + +#endif