]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSVar.c
Update of ORTE. Configured to compile for Linux out of box.
[orte.git] / orte / liborte / RTPSVar.c
1 /*                            
2  *  $Id: RTPSVar.c,v 0.0.0.1            2003/10/07 
3  *
4  *  DEBUG:  section 46                  RTPS message VAR
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 void NewPublisher(ORTEDomain *d,ObjectEntryOID *op);
25 void NewSubscriber(ORTEDomain *d,ObjectEntryOID *os);
26
27 /**********************************************************************************/
28 void 
29 RTPSVar(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
30   GUID_RTPS          objectGUID,writerGUID;
31   ObjectId           roid,woid;
32   SequenceNumber     sn;   
33   int8_t             e_bit,p_bit,a_bit;
34   u_int16_t          submsg_len;
35   CSTReader          *cstReader=NULL;
36   CSTRemoteWriter    *cstRemoteWriter=NULL;
37   CSTRemoteReader    *cstRemoteReader=NULL;
38   CSChange           *csChange;
39   ObjectEntryAID     *objectEntryAID;
40   ObjectEntryOID     *objectEntryOID;
41
42   e_bit=rtps_msg[1] & 0x01;
43   p_bit=(rtps_msg[1] & 0x02)>>1;
44   a_bit=(rtps_msg[1] & 0x04)>>2;
45   submsg_len=*((u_int16_t*)(rtps_msg+2));
46   conv_u16(&submsg_len,e_bit);
47   roid=*((ObjectId*)(rtps_msg+4));              /* readerObjectId */
48   conv_u32(&roid,0);
49   woid=*((ObjectId*)(rtps_msg+8));              /* writerObjectId */
50   conv_u32(&woid,0);
51   if (rtps_msg[1] & 0x08) {                     /* bit H          */
52     objectGUID.hid=*((HostId*)(rtps_msg+12));      /* HostId         */
53     conv_u32(&objectGUID.hid,0);
54     objectGUID.aid=*((AppId*)(rtps_msg+16));       /* AppId          */
55     conv_u32(&objectGUID.aid,0);
56   } else {
57     rtps_msg-=8;
58     objectGUID.hid=mi->sourceHostId;
59     objectGUID.aid=mi->sourceAppId;
60   }     
61   objectGUID.oid=*((ObjectId*)(rtps_msg+20));      /* ObjectId       */
62   conv_u32(&objectGUID.oid,0);
63   sn=*((SequenceNumber*)(rtps_msg+24));         /* writerSN       */
64   conv_sn(&sn,e_bit);
65   writerGUID.hid=mi->sourceHostId;
66   writerGUID.aid=mi->sourceAppId;
67   writerGUID.oid=woid;
68
69   debug(46,3) ("recv: RTPS_VAR(0x%x) from 0x%x-0x%x sn:%u\n",
70                 woid,mi->sourceHostId,mi->sourceAppId,sn.low);
71   
72   //prepare csChange
73   csChange=(CSChange*)MALLOC(sizeof(CSChange));
74   csChange->cdrStream.buffer=NULL;
75   csChange->guid=objectGUID;
76   if (a_bit) csChange->alive=ORTE_TRUE;
77   else csChange->alive=ORTE_FALSE;
78   if (p_bit)
79     parameterDecodeStreamToCSChange(csChange,rtps_msg+32,submsg_len,e_bit);
80   else
81     CSChangeAttributes_init_head(csChange);
82   csChange->sn=sn;
83   SEQUENCE_NUMBER_NONE(csChange->gapSN);
84
85   //Manager
86   if ((d->guid.aid & 0x03)==MANAGER) {
87     if ((writerGUID.oid==OID_WRITE_APPSELF) && 
88         ((writerGUID.aid & 0x03)==MANAGER)) {
89       //readerManagers
90       pthread_rwlock_wrlock(&d->readerManagers.lock);
91       pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
92       cstReader=&d->readerManagers;
93       gavl_cust_for_each(CSTRemoteReader,
94                          &d->writerApplicationSelf,
95                          cstRemoteReader) {
96         if (cstRemoteReader->guid.hid==senderIPAddress) break;
97       }
98       if (cstRemoteReader) {
99         objectEntryOID=objectEntryFind(d,&objectGUID);
100         if (!objectEntryOID) {
101           //  create new RemoteReader
102           AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
103           AppParamsInit(ap);
104           parameterUpdateApplication(csChange,ap);
105           if (generateEvent(d,&objectGUID,(void*)ap,ORTE_TRUE) &&
106               csChange->alive) {
107             debug(46,2) ("manager 0x%x-0x%x accepted\n",
108                           objectGUID.hid,objectGUID.aid);
109             objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)ap);          
110             CSTWriterRefreshAllCSChanges(d,cstRemoteReader);
111             CSTReaderAddRemoteWriter(d,cstReader,objectEntryOID,writerGUID.oid);
112             pthread_rwlock_wrlock(&d->readerApplications.lock);
113             pthread_rwlock_wrlock(&d->writerApplications.lock);
114             CSTReaderAddRemoteWriter(d,&d->readerApplications,objectEntryOID,OID_WRITE_APP);
115             CSTWriterAddRemoteReader(d,&d->writerApplications,objectEntryOID,OID_READ_APP);
116             pthread_rwlock_unlock(&d->writerApplications.lock);
117             pthread_rwlock_unlock(&d->readerApplications.lock);
118             //all applications from manager node set expiration timer
119             gavl_cust_for_each(ObjectEntryAID,
120                                objectEntryOID->objectEntryHID,objectEntryAID) {
121               objectEntryOID=ObjectEntryOID_find(objectEntryAID,&objectGUID.oid);
122               objectEntryRefreshApp(d,objectEntryOID);
123             }
124           } else {
125             FREE(ap);
126           }
127         }
128         cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
129         if (cstRemoteWriter)
130           objectEntryRefreshApp(d,cstRemoteWriter->objectEntryOID);
131       } else {
132         //deny Manager
133       }
134       pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
135     }
136     if (((writerGUID.oid==OID_WRITE_APPSELF) &&
137          ((writerGUID.aid & 0x03)==MANAGEDAPPLICATION)) ||
138         ((writerGUID.oid==OID_WRITE_APP) &&
139          ((writerGUID.aid & 0x03)==MANAGER))) {
140       //readerApplication
141       pthread_rwlock_wrlock(&d->readerApplications.lock);
142       cstReader=&d->readerApplications;
143       objectEntryOID=objectEntryFind(d,&objectGUID);
144       if (!objectEntryOID) {
145         AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
146         AppParamsInit(ap);
147         parameterUpdateApplication(csChange,ap);
148         if (generateEvent(d,&objectGUID,(void*)ap,ORTE_TRUE) &&
149             csChange->alive) {
150           objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)ap);
151           objectEntryOID->appMOM=getTypeApp(d,ap,senderIPAddress);
152           if (objectEntryOID->appMOM) {
153             debug(46,2) ("MOM application 0x%x-0x%x accepted\n",
154                           objectGUID.hid,objectGUID.aid);
155             //increment vargAppsSequenceNumber and make csChange
156             SeqNumberInc(d->appParams->vargAppsSequenceNumber,
157                          d->appParams->vargAppsSequenceNumber);
158             //WAS & WM is locked inside next function
159             appSelfParamChanged(d,ORTE_TRUE,ORTE_FALSE,ORTE_TRUE);
160             CSTReaderAddRemoteWriter(d,cstReader,
161                 objectEntryOID,writerGUID.oid);
162             CSTWriterAddRemoteReader(d,&d->writerManagers,
163                 objectEntryOID,OID_READ_MGR);
164             pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
165             pthread_rwlock_unlock(&d->writerManagers.lock);
166           } else {
167             debug(46,2) ("OAM application 0x%x-0x%x accepted\n",
168                           objectGUID.hid,objectGUID.aid);
169           }
170           pthread_rwlock_wrlock(&d->writerApplications.lock);
171           CSTWriterAddRemoteReader(d,&d->writerApplications,objectEntryOID,OID_READ_APP);
172           pthread_rwlock_unlock(&d->writerApplications.lock);
173         } else {
174           FREE(ap);
175         }        
176       }
177       if (objectEntryOID) {
178         cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
179         if (objectEntryOID->appMOM) {
180           objectEntryRefreshApp(d,objectEntryOID);
181         } else {
182           //turn off expiration timer
183           eventDetach(d,
184               objectEntryOID->objectEntryAID,
185               &objectEntryOID->expirationPurgeTimer,
186               0);
187           debug(46,3) ("for application 0x%x-0x%x turn off expiration timer\n",
188                         objectEntryOID->guid.hid,objectEntryOID->guid.aid);
189         }
190       }
191     }
192   }  
193   //ManagedApplication
194   if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
195     switch (writerGUID.oid) {
196       case OID_WRITE_MGR:
197         pthread_rwlock_wrlock(&d->readerManagers.lock);        
198         cstReader=&d->readerManagers;
199         objectEntryOID=objectEntryFind(d,&objectGUID);
200         if (!objectEntryOID) {
201           AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
202           AppParamsInit(ap);
203           parameterUpdateApplication(csChange,ap);
204           if (generateEvent(d,&objectGUID,(void*)ap,ORTE_TRUE) &&
205               csChange->alive) {
206             debug(46,2) ("new manager 0x%x-0x%x accepted\n",
207                           objectGUID.hid,objectGUID.aid);
208             objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)ap);
209             objectEntryOID->private=ORTE_FALSE;
210             pthread_rwlock_wrlock(&d->readerApplications.lock);
211             CSTReaderAddRemoteWriter(d,&d->readerApplications,
212                                     objectEntryOID,OID_WRITE_APP);
213             pthread_rwlock_unlock(&d->readerApplications.lock);
214             //all applications from manager node set expiration timer
215             gavl_cust_for_each(ObjectEntryAID,
216                                objectEntryOID->objectEntryHID,objectEntryAID) {
217               objectEntryOID=ObjectEntryOID_find(objectEntryAID,&objectGUID.oid);
218               objectEntryRefreshApp(d,objectEntryOID);
219             }
220           } else {
221             FREE(ap);
222           }
223         } else {
224           GUID_RTPS guid_wapp=objectGUID;
225           guid_wapp.oid=OID_WRITE_APP;
226           pthread_rwlock_wrlock(&d->readerApplications.lock);
227           cstRemoteWriter=CSTRemoteWriter_find(&d->readerApplications,&guid_wapp);
228           //setup state of cstRemoteWriter on send ACK to manager
229           if (cstRemoteWriter) {
230             if (cstRemoteWriter->commStateACK==WAITING) {
231               eventDetach(d,
232                   cstRemoteWriter->objectEntryOID->objectEntryAID,
233                   &cstRemoteWriter->repeatActiveQueryTimer,
234                   1);   //metatraffic timer
235               eventAdd(d,
236                   cstRemoteWriter->objectEntryOID->objectEntryAID,
237                   &cstRemoteWriter->repeatActiveQueryTimer,
238                   1,   //metatraffic timer
239                   "CSTReaderQueryTimer",
240                   CSTReaderQueryTimer,
241                   &cstRemoteWriter->cstReader->lock,
242                   cstRemoteWriter,
243                   NULL);               
244             }
245           }
246           pthread_rwlock_unlock(&d->readerApplications.lock);
247         } 
248         objectEntryRefreshApp(d,objectEntryOID);
249         cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
250         if ((!cstRemoteWriter) &&
251             (objectGUID.hid==writerGUID.hid) && (objectGUID.aid==writerGUID.aid)) {
252           cstRemoteWriter=
253               CSTReaderAddRemoteWriter(d,cstReader,objectEntryOID,writerGUID.oid);
254         }
255         break;
256       case OID_WRITE_APP:
257         pthread_rwlock_wrlock(&d->readerApplications.lock);        
258         cstReader=&d->readerApplications;
259         cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
260         if (cstRemoteWriter) {
261           GUID_RTPS guid_tmp=objectGUID;
262           guid_tmp.oid=OID_WRITE_PUBL;
263           objectEntryOID=objectEntryFind(d,&objectGUID);
264           if (!CSTRemoteWriter_find(&d->readerPublications,&guid_tmp)) {
265             if (!objectEntryOID) {
266               AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
267               AppParamsInit(ap);
268               parameterUpdateApplication(csChange,ap);
269               if (generateEvent(d,&objectGUID,(void*)ap,ORTE_TRUE) &&
270                   csChange->alive) {
271                 debug(46,2) ("new application 0x%x-0x%x accepted\n",
272                               objectGUID.hid,objectGUID.aid);
273                 objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)ap);
274                 objectEntryOID->private=ORTE_FALSE;
275               } else {
276                 FREE(ap);
277                 break;
278               }
279             }
280             pthread_rwlock_wrlock(&d->readerPublications.lock);            
281             pthread_rwlock_wrlock(&d->readerSubscriptions.lock);            
282             pthread_rwlock_wrlock(&d->writerPublications.lock);            
283             pthread_rwlock_wrlock(&d->writerSubscriptions.lock);            
284             CSTReaderAddRemoteWriter(d,&d->readerPublications,
285                                      objectEntryOID,OID_WRITE_PUBL);
286             CSTReaderAddRemoteWriter(d,&d->readerSubscriptions,
287                                      objectEntryOID,OID_WRITE_SUBS);
288             CSTWriterAddRemoteReader(d,&d->writerPublications,
289                                      objectEntryOID,OID_READ_PUBL);
290             CSTWriterAddRemoteReader(d,&d->writerSubscriptions,
291                                      objectEntryOID,OID_READ_SUBS);
292             pthread_rwlock_unlock(&d->readerPublications.lock);            
293             pthread_rwlock_unlock(&d->readerSubscriptions.lock);            
294             pthread_rwlock_unlock(&d->writerPublications.lock);            
295             pthread_rwlock_unlock(&d->writerSubscriptions.lock);            
296           }
297           if (objectEntryOID) {
298             //turn off expiration timer
299             eventDetach(d,
300                 objectEntryOID->objectEntryAID,
301                 &objectEntryOID->expirationPurgeTimer,
302                 0);
303             debug(46,3) ("for application 0x%x-0x%x turn off expiration timer\n",
304                           objectEntryOID->guid.hid,
305                           objectEntryOID->guid.aid);
306           }
307         }
308         break;
309       case OID_WRITE_PUBL:
310         pthread_rwlock_wrlock(&d->readerPublications.lock);        
311         cstReader=&d->readerPublications;
312         cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
313         if (cstRemoteWriter) {
314           objectEntryOID=objectEntryFind(d,&objectGUID);
315           if (!objectEntryOID) {
316             ORTEPublProp *pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
317             PublParamsInit(pp);
318             parameterUpdatePublication(csChange,pp);
319             if (generateEvent(d,&objectGUID,(void*)pp,ORTE_TRUE) &&
320                 csChange->alive) {
321               debug(46,2) ("new publisher 0x%x-0x%x-0x%x accepted\n",
322                             objectGUID.hid,objectGUID.aid,objectGUID.oid);
323               objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)pp);
324               objectEntryOID->private=ORTE_FALSE;
325               pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
326               PublicationList_insert(&d->psEntry,objectEntryOID);
327               pthread_rwlock_unlock(&d->psEntry.publicationsLock);
328               NewPublisher(d,objectEntryOID);
329             } else
330               FREE(pp);
331           } else {
332             if (!PublicationList_find(&d->psEntry,&objectGUID)) {
333               pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
334               PublicationList_insert(&d->psEntry,objectEntryOID);
335               pthread_rwlock_unlock(&d->psEntry.publicationsLock);
336               NewPublisher(d,objectEntryOID);
337             }
338           }
339         }
340         break;
341       case OID_WRITE_SUBS:
342         pthread_rwlock_wrlock(&d->readerSubscriptions.lock);        
343         cstReader=&d->readerSubscriptions;
344         cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
345         if (cstRemoteWriter) {
346           objectEntryOID=objectEntryFind(d,&objectGUID);
347           if (!objectEntryOID) {
348             ORTESubsProp *sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
349             SubsParamsInit(sp);
350             parameterUpdateSubscription(csChange,sp);
351             if (generateEvent(d,&objectGUID,(void*)sp,ORTE_TRUE) &&
352                 csChange->alive) {
353               debug(46,2) ("new subscriber 0x%x-0x%x-0x%x accepted\n",
354                             objectGUID.hid,objectGUID.aid,objectGUID.oid);
355               objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)sp);
356               objectEntryOID->private=ORTE_FALSE;
357               pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
358               SubscriptionList_insert(&d->psEntry,objectEntryOID);
359               pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
360               NewSubscriber(d,objectEntryOID);
361             } else
362               FREE(sp);
363           } else {
364             if (!SubscriptionList_find(&d->psEntry,&objectGUID)) {
365               pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
366               SubscriptionList_insert(&d->psEntry,objectEntryOID);
367               pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
368               NewSubscriber(d,objectEntryOID);
369             }
370           }
371         }
372         break;
373     }
374   }
375   if (!cstReader) return;
376   if (!cstRemoteWriter) {
377     pthread_rwlock_unlock(&cstReader->lock);
378     return;
379   }
380   debug(46,10) ("recv: processing CSChange\n");
381   if (SeqNumberCmp(sn,cstRemoteWriter->sn)>0) { //have to be sn>writer_sn
382     CSTReaderAddCSChange(cstRemoteWriter,csChange);
383     CSTReaderProcCSChanges(d,cstRemoteWriter);
384   } else {
385     //destroy csChange
386     parameterDelete(csChange);
387     FREE(csChange);
388   }
389   pthread_rwlock_unlock(&cstReader->lock);
390
391
392
393 /**********************************************************************************/
394 void 
395 NewPublisher(ORTEDomain *d,ObjectEntryOID *op) {
396   ObjectEntryOID     *o;
397   ORTEPublProp       *pp;
398   CSTWriter          *cstWriter=NULL;
399   CSTReader          *cstReader=NULL;
400   PatternNode        *pnode;
401   
402   if ((d==NULL) || (op==NULL)) return;
403   pp=(ORTEPublProp*)op->attributes;
404   //***************************************
405   //Pattern 
406   //try to find if subscription exists
407   pthread_rwlock_rdlock(&d->patternEntry.lock);
408   pthread_rwlock_rdlock(&d->subscriptions.lock);
409   gavl_cust_for_each(CSTReader,
410                      &d->subscriptions,cstReader) {
411     if (cstReader->createdByPattern) {
412       ORTESubsProp       *sp;
413       sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
414       if ((strcmp(sp->topic,pp->topic)==0) &&
415           (strcmp(sp->typeName,pp->typeName)==0)) 
416         break; //found
417     }
418   }
419   pthread_rwlock_unlock(&d->subscriptions.lock);
420   if (!cstReader) { //not exists
421     ul_list_for_each(Pattern,&d->patternEntry,pnode) {
422       if ((fnmatch(pnode->topic,pp->topic,0)==0) &&
423           (fnmatch(pnode->type,pp->typeName,0)==0)) {
424         //pattern matched
425         // free resources
426         pthread_rwlock_unlock(&d->patternEntry.lock);        
427         pthread_rwlock_unlock(&d->readerPublications.lock);        
428         pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
429         pthread_rwlock_unlock(&d->objectEntry.objRootLock);    
430         cstReader=pnode->subscriptionCallBack(
431             pp->topic,
432             pp->typeName,
433             pnode->param);
434         if (cstReader) {
435           cstReader->createdByPattern=ORTE_TRUE;
436         }
437         //allocate resources
438         pthread_rwlock_wrlock(&d->objectEntry.objRootLock);    
439         pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
440         pthread_rwlock_wrlock(&d->readerPublications.lock);        
441         pthread_rwlock_rdlock(&d->patternEntry.lock);
442       }  
443     }
444   }
445   pthread_rwlock_unlock(&d->patternEntry.lock);
446   //Pattern end
447   pthread_rwlock_rdlock(&d->psEntry.subscriptionsLock);
448   gavl_cust_for_each(SubscriptionList,&d->psEntry,o) {
449     ORTESubsProp *sp=(ORTESubsProp*)o->attributes;
450     if ((strcmp(pp->topic,sp->topic)==0) &&
451         (strcmp(pp->typeName,sp->typeName)==0) &&
452         (pp->typeChecksum==sp->typeChecksum)) {
453       //add Subscription to Publisher (only if private)
454       if (op->private) {
455         pthread_rwlock_rdlock(&d->publications.lock);
456         if ((cstWriter=CSTWriter_find(&d->publications,&op->guid))) {
457           pthread_rwlock_wrlock(&cstWriter->lock);
458           if (!CSTRemoteReader_find(cstWriter,&o->guid)) {
459             CSTWriterAddRemoteReader(d,cstWriter,o,o->oid);
460             debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
461                           op->guid.hid,op->guid.aid,op->guid.oid,
462                           o->guid.hid,o->guid.aid,o->guid.oid);
463           }
464           pthread_rwlock_unlock(&cstWriter->lock);
465         }
466         pthread_rwlock_unlock(&d->publications.lock);
467       }
468       //add Publisher to Subscriber (only if private)
469       if (o->private) {
470         pthread_rwlock_rdlock(&d->subscriptions.lock);
471         if ((cstReader=CSTReader_find(&d->subscriptions,&o->guid))) {
472           pthread_rwlock_wrlock(&cstReader->lock);
473           if (!CSTRemoteWriter_find(cstReader,&op->guid)) {
474             CSTReaderAddRemoteWriter(d,cstReader,op,op->oid);
475             debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
476                           o->guid.hid,o->guid.aid,o->guid.oid,
477                           op->guid.hid,op->guid.aid,op->guid.oid);
478           }
479           pthread_rwlock_unlock(&cstReader->lock);
480         }
481         pthread_rwlock_unlock(&d->subscriptions.lock);
482       }
483     }
484   } 
485   pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
486 }              
487
488 /**********************************************************************************/
489 void 
490 NewSubscriber(ORTEDomain *d,ObjectEntryOID *os) {
491   ObjectEntryOID     *o;
492   ORTESubsProp       *sp;
493   CSTWriter          *cstWriter;
494   CSTReader          *cstReader;
495   
496   if ((d==NULL) || (os==NULL)) return;
497   sp=(ORTESubsProp*)os->attributes;
498   pthread_rwlock_rdlock(&d->psEntry.publicationsLock);
499   gavl_cust_for_each(PublicationList,&d->psEntry,o) {
500     ORTEPublProp *pp=(ORTEPublProp*)o->attributes;
501     if ((strcmp(sp->topic,pp->topic)==0) &&
502         (strcmp(sp->typeName,pp->typeName)==0) &&
503         (sp->typeChecksum==pp->typeChecksum)) {
504       //add Publication to Subscription (only if private)
505       if (os->private) {
506         pthread_rwlock_rdlock(&d->subscriptions.lock);
507         if ((cstReader=CSTReader_find(&d->subscriptions,&os->guid))) {
508           pthread_rwlock_wrlock(&cstReader->lock);
509           if (!CSTRemoteWriter_find(cstReader,&o->guid)) {
510             CSTReaderAddRemoteWriter(d,cstReader,o,o->oid);
511             debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
512                           os->guid.hid,os->guid.aid,os->guid.oid,
513                           o->guid.hid,o->guid.aid,o->guid.oid);
514           }
515           pthread_rwlock_unlock(&cstReader->lock);
516         }
517         pthread_rwlock_unlock(&d->subscriptions.lock);
518       }
519       //add Subscriber to Publisher (only if private)
520       if (o->private) {
521         pthread_rwlock_rdlock(&d->publications.lock);
522         if ((cstWriter=CSTWriter_find(&d->publications,&o->guid))) {
523           pthread_rwlock_wrlock(&cstWriter->lock);
524           if (!CSTRemoteReader_find(cstWriter,&os->guid)) {
525             CSTWriterAddRemoteReader(d,cstWriter,os,os->oid);
526             debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
527                           o->guid.hid,o->guid.aid,o->guid.oid,
528                           os->guid.hid,os->guid.aid,os->guid.oid);
529           }
530           pthread_rwlock_unlock(&cstWriter->lock);
531         }
532         pthread_rwlock_unlock(&d->publications.lock);
533       }
534     }
535   } 
536   pthread_rwlock_unlock(&d->psEntry.publicationsLock);
537 }