]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSVar.c
974b11626c3ed2fc8c2ab04129c975bef30f5199
[orte.git] / orte / liborte / RTPSVar.c
1 /*                            
2  *  $Id: RTPSVar.c,v 0.0.0.1            2003/10/07 
3  *
4  *  DEBUG:  section 46                  RTPS message VAR
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 void NewPublisher(ORTEDomain *d,ObjectEntryOID *op);
25 void NewSubscriber(ORTEDomain *d,ObjectEntryOID *os);
26
27 /**********************************************************************************/
28 void 
29 RTPSVar(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
30   GUID_RTPS          objectGUID,writerGUID;
31   ObjectId           roid,woid;
32   SequenceNumber     sn;   
33   int8_t             e_bit,p_bit,a_bit;
34   u_int16_t          submsg_len;
35   CSTReader          *cstReader=NULL;
36   CSTRemoteWriter    *cstRemoteWriter=NULL;
37   CSTRemoteReader    *cstRemoteReader=NULL;
38   CSChange           *csChange;
39   ObjectEntryAID     *objectEntryAID;
40   ObjectEntryOID     *objectEntryOID;
41
42   e_bit=rtps_msg[1] & 0x01;
43   p_bit=(rtps_msg[1] & 0x02)>>1;
44   a_bit=(rtps_msg[1] & 0x04)>>2;
45   submsg_len=*((u_int16_t*)(rtps_msg+2));
46   conv_u16(&submsg_len,e_bit);
47   roid=*((ObjectId*)(rtps_msg+4));              /* readerObjectId */
48   conv_u32(&roid,0);
49   woid=*((ObjectId*)(rtps_msg+8));              /* writerObjectId */
50   conv_u32(&woid,0);
51   if (rtps_msg[1] & 0x08) {                     /* bit H          */
52     objectGUID.hid=*((HostId*)(rtps_msg+12));      /* HostId         */
53     conv_u32(&objectGUID.hid,0);
54     objectGUID.aid=*((AppId*)(rtps_msg+16));       /* AppId          */
55     conv_u32(&objectGUID.aid,0);
56   } else {
57     rtps_msg-=8;
58     objectGUID.hid=mi->sourceHostId;
59     objectGUID.aid=mi->sourceAppId;
60   }     
61   objectGUID.oid=*((ObjectId*)(rtps_msg+20));      /* ObjectId       */
62   conv_u32(&objectGUID.oid,0);
63   sn=*((SequenceNumber*)(rtps_msg+24));         /* writerSN       */
64   conv_sn(&sn,e_bit);
65   writerGUID.hid=mi->sourceHostId;
66   writerGUID.aid=mi->sourceAppId;
67   writerGUID.oid=woid;
68
69   debug(46,3) ("recv: RTPS_VAR(0x%x) from 0x%x-0x%x sn:%u\n",
70                 woid,mi->sourceHostId,mi->sourceAppId,sn.low);
71   
72   //prepare csChange
73   csChange=(CSChange*)MALLOC(sizeof(CSChange));
74   csChange->cdrStream.buffer=NULL;
75   csChange->guid=objectGUID;
76   if (a_bit) csChange->alive=ORTE_TRUE;
77   else csChange->alive=ORTE_FALSE;
78   if (p_bit)
79     parameterDecodeStreamToCSChange(csChange,rtps_msg+32,submsg_len,e_bit);
80   else
81     CSChangeAttributes_init_head(csChange);
82   csChange->sn=sn;
83   SEQUENCE_NUMBER_NONE(csChange->gapSN);
84
85   //Manager
86   if ((d->guid.aid & 0x03)==MANAGER) {
87     if ((writerGUID.oid==OID_WRITE_APPSELF) && 
88         ((writerGUID.aid & 0x03)==MANAGER)) {
89       //readerManagers
90       pthread_rwlock_wrlock(&d->readerManagers.lock);
91       pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
92       cstReader=&d->readerManagers;
93       gavl_cust_for_each(CSTRemoteReader,
94                          &d->writerApplicationSelf,
95                          cstRemoteReader) {
96         if (cstRemoteReader->guid.hid==senderIPAddress) break;
97       }
98       if (cstRemoteReader) {
99         objectEntryOID=objectEntryFind(d,&objectGUID);
100         if (!objectEntryOID) {
101           //  create new RemoteReader
102           AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
103           AppParamsInit(ap);
104           parameterUpdateApplication(csChange,ap);
105           if (generateEvent(d,&objectGUID,(void*)ap,ORTE_TRUE) &&
106               csChange->alive) {
107             debug(46,2) ("manager 0x%x-0x%x accepted\n",
108                           objectGUID.hid,objectGUID.aid);
109             objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)ap);          
110             CSTWriterRefreshAllCSChanges(d,cstRemoteReader);
111             CSTReaderAddRemoteWriter(d,cstReader,objectEntryOID,writerGUID.oid);
112             pthread_rwlock_wrlock(&d->readerApplications.lock);
113             pthread_rwlock_wrlock(&d->writerApplications.lock);
114             CSTReaderAddRemoteWriter(d,&d->readerApplications,objectEntryOID,OID_WRITE_APP);
115             CSTWriterAddRemoteReader(d,&d->writerApplications,objectEntryOID,OID_READ_APP);
116             pthread_rwlock_unlock(&d->writerApplications.lock);
117             pthread_rwlock_unlock(&d->readerApplications.lock);
118             //all applications from manager node set expiration timer
119             gavl_cust_for_each(ObjectEntryAID,
120                                objectEntryOID->objectEntryHID,objectEntryAID) {
121               ObjectEntryOID *objectEntryOID1;
122               objectEntryOID1=ObjectEntryOID_find(objectEntryAID,&objectGUID.oid);
123               objectEntryRefreshApp(d,objectEntryOID1);
124             }
125           } else {
126             FREE(ap);
127           }
128         }
129         cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
130         if (cstRemoteWriter)
131           objectEntryRefreshApp(d,cstRemoteWriter->objectEntryOID);
132       } else {
133         //deny Manager
134       }
135       pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
136     }
137     if (((writerGUID.oid==OID_WRITE_APPSELF) &&
138          ((writerGUID.aid & 0x03)==MANAGEDAPPLICATION)) ||
139         ((writerGUID.oid==OID_WRITE_APP) &&
140          ((writerGUID.aid & 0x03)==MANAGER))) {
141       //readerApplication
142       pthread_rwlock_wrlock(&d->readerApplications.lock);
143       cstReader=&d->readerApplications;
144       objectEntryOID=objectEntryFind(d,&objectGUID);
145       if (!objectEntryOID) {
146         AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
147         AppParamsInit(ap);
148         parameterUpdateApplication(csChange,ap);
149         if (generateEvent(d,&objectGUID,(void*)ap,ORTE_TRUE) &&
150             csChange->alive) {
151           objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)ap);
152           objectEntryOID->appMOM=getTypeApp(d,ap,senderIPAddress);
153           if (objectEntryOID->appMOM) {
154             debug(46,2) ("MOM application 0x%x-0x%x accepted\n",
155                           objectGUID.hid,objectGUID.aid);
156             //increment vargAppsSequenceNumber and make csChange
157             SeqNumberInc(d->appParams->vargAppsSequenceNumber,
158                          d->appParams->vargAppsSequenceNumber);
159             //WAS & WM is locked inside next function
160             appSelfParamChanged(d,ORTE_TRUE,ORTE_FALSE,ORTE_TRUE,ORTE_TRUE);
161             CSTReaderAddRemoteWriter(d,cstReader,
162                 objectEntryOID,writerGUID.oid);
163             CSTWriterAddRemoteReader(d,&d->writerManagers,
164                 objectEntryOID,OID_READ_MGR);
165             pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
166             pthread_rwlock_unlock(&d->writerManagers.lock);
167           } else {
168             debug(46,2) ("OAM application 0x%x-0x%x accepted\n",
169                           objectGUID.hid,objectGUID.aid);
170           }
171           pthread_rwlock_wrlock(&d->writerApplications.lock);
172           CSTWriterAddRemoteReader(d,&d->writerApplications,objectEntryOID,OID_READ_APP);
173           pthread_rwlock_unlock(&d->writerApplications.lock);
174         } else {
175           FREE(ap);
176         }        
177       }
178       if (objectEntryOID) {
179         cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
180         if (objectEntryOID->appMOM) {
181           objectEntryRefreshApp(d,objectEntryOID);
182         } else {
183           //turn off expiration timer
184           eventDetach(d,
185               objectEntryOID->objectEntryAID,
186               &objectEntryOID->expirationPurgeTimer,
187               0);
188           debug(46,3) ("for application 0x%x-0x%x turn off expiration timer\n",
189                         objectEntryOID->guid.hid,objectEntryOID->guid.aid);
190         }
191       }
192     }
193   }  
194   //ManagedApplication
195   if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
196     switch (writerGUID.oid) {
197       case OID_WRITE_MGR:
198         pthread_rwlock_wrlock(&d->readerManagers.lock);        
199         cstReader=&d->readerManagers;
200         objectEntryOID=objectEntryFind(d,&objectGUID);
201         if (!objectEntryOID) {
202           AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
203           AppParamsInit(ap);
204           parameterUpdateApplication(csChange,ap);
205           if (generateEvent(d,&objectGUID,(void*)ap,ORTE_TRUE) &&
206               csChange->alive) {
207             debug(46,2) ("new manager 0x%x-0x%x accepted\n",
208                           objectGUID.hid,objectGUID.aid);
209             objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)ap);
210             objectEntryOID->privateCreated=ORTE_FALSE;
211             pthread_rwlock_wrlock(&d->readerApplications.lock);
212             CSTReaderAddRemoteWriter(d,&d->readerApplications,
213                                     objectEntryOID,OID_WRITE_APP);
214             pthread_rwlock_unlock(&d->readerApplications.lock);
215             //all applications from manager node set expiration timer
216             gavl_cust_for_each(ObjectEntryAID,
217                                objectEntryOID->objectEntryHID,objectEntryAID) {
218               ObjectEntryOID *objectEntryOID1;
219               objectEntryOID1=ObjectEntryOID_find(objectEntryAID,&objectGUID.oid);
220               objectEntryRefreshApp(d,objectEntryOID1);
221             }
222           } else {
223             FREE(ap);
224           }
225         } else {
226           GUID_RTPS guid_wapp=objectGUID;
227           guid_wapp.oid=OID_WRITE_APP;
228           pthread_rwlock_wrlock(&d->readerApplications.lock);
229           cstRemoteWriter=CSTRemoteWriter_find(&d->readerApplications,&guid_wapp);
230           //setup state of cstRemoteWriter on send ACK to manager
231           if (cstRemoteWriter) {
232             if (cstRemoteWriter->commStateACK==WAITING) {
233               eventDetach(d,
234                   cstRemoteWriter->objectEntryOID->objectEntryAID,
235                   &cstRemoteWriter->repeatActiveQueryTimer,
236                   1);   //metatraffic timer
237               eventAdd(d,
238                   cstRemoteWriter->objectEntryOID->objectEntryAID,
239                   &cstRemoteWriter->repeatActiveQueryTimer,
240                   1,   //metatraffic timer
241                   "CSTReaderQueryTimer",
242                   CSTReaderQueryTimer,
243                   &cstRemoteWriter->cstReader->lock,
244                   cstRemoteWriter,
245                   NULL);               
246             }
247           }
248           pthread_rwlock_unlock(&d->readerApplications.lock);
249         } 
250         objectEntryRefreshApp(d,objectEntryOID);
251         cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
252         if ((!cstRemoteWriter) &&
253             (objectGUID.hid==writerGUID.hid) && (objectGUID.aid==writerGUID.aid)) {
254           cstRemoteWriter=
255               CSTReaderAddRemoteWriter(d,cstReader,objectEntryOID,writerGUID.oid);
256         }
257         break;
258       case OID_WRITE_APP:
259         pthread_rwlock_wrlock(&d->readerApplications.lock);        
260         cstReader=&d->readerApplications;
261         cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
262         if (cstRemoteWriter) {
263           GUID_RTPS guid_tmp=objectGUID;
264           guid_tmp.oid=OID_WRITE_PUBL;
265           objectEntryOID=objectEntryFind(d,&objectGUID);
266           if (!CSTRemoteWriter_find(&d->readerPublications,&guid_tmp)) {
267             if (!objectEntryOID) {
268               AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
269               AppParamsInit(ap);
270               parameterUpdateApplication(csChange,ap);
271               if (generateEvent(d,&objectGUID,(void*)ap,ORTE_TRUE) &&
272                   csChange->alive) {
273                 debug(46,2) ("new application 0x%x-0x%x accepted\n",
274                               objectGUID.hid,objectGUID.aid);
275                 objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)ap);
276                 objectEntryOID->privateCreated=ORTE_FALSE;
277               } else {
278                 FREE(ap);
279                 break;
280               }
281             }
282             pthread_rwlock_wrlock(&d->readerPublications.lock);            
283             pthread_rwlock_wrlock(&d->readerSubscriptions.lock);            
284             pthread_rwlock_wrlock(&d->writerPublications.lock);            
285             pthread_rwlock_wrlock(&d->writerSubscriptions.lock);            
286             CSTReaderAddRemoteWriter(d,&d->readerPublications,
287                                      objectEntryOID,OID_WRITE_PUBL);
288             CSTReaderAddRemoteWriter(d,&d->readerSubscriptions,
289                                      objectEntryOID,OID_WRITE_SUBS);
290             CSTWriterAddRemoteReader(d,&d->writerPublications,
291                                      objectEntryOID,OID_READ_PUBL);
292             CSTWriterAddRemoteReader(d,&d->writerSubscriptions,
293                                      objectEntryOID,OID_READ_SUBS);
294             pthread_rwlock_unlock(&d->readerPublications.lock);            
295             pthread_rwlock_unlock(&d->readerSubscriptions.lock);            
296             pthread_rwlock_unlock(&d->writerPublications.lock);            
297             pthread_rwlock_unlock(&d->writerSubscriptions.lock);            
298           }
299           if (objectEntryOID) {
300             //turn off expiration timer
301             eventDetach(d,
302                 objectEntryOID->objectEntryAID,
303                 &objectEntryOID->expirationPurgeTimer,
304                 0);
305             debug(46,3) ("for application 0x%x-0x%x turn off expiration timer\n",
306                           objectEntryOID->guid.hid,
307                           objectEntryOID->guid.aid);
308           }
309         }
310         break;
311       case OID_WRITE_PUBL:
312         pthread_rwlock_wrlock(&d->readerPublications.lock);        
313         cstReader=&d->readerPublications;
314         cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
315         if (cstRemoteWriter) {
316           objectEntryOID=objectEntryFind(d,&objectGUID);
317           if (!objectEntryOID) {
318             ORTEPublProp *pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
319             PublParamsInit(pp);
320             parameterUpdatePublication(csChange,pp);
321             if (generateEvent(d,&objectGUID,(void*)pp,ORTE_TRUE) &&
322                 csChange->alive) {
323               debug(46,2) ("new publisher 0x%x-0x%x-0x%x accepted\n",
324                             objectGUID.hid,objectGUID.aid,objectGUID.oid);
325               objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)pp);
326               objectEntryOID->privateCreated=ORTE_FALSE;
327               pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
328               PublicationList_insert(&d->psEntry,objectEntryOID);
329               pthread_rwlock_unlock(&d->psEntry.publicationsLock);
330               NewPublisher(d,objectEntryOID);
331             } else
332               FREE(pp);
333           } else {
334             if ((!PublicationList_find(&d->psEntry,&objectGUID)) &&
335                  csChange->alive) {
336               pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
337               PublicationList_insert(&d->psEntry,objectEntryOID);
338               pthread_rwlock_unlock(&d->psEntry.publicationsLock);
339               NewPublisher(d,objectEntryOID);
340             }
341           }
342         }
343         break;
344       case OID_WRITE_SUBS:
345         pthread_rwlock_wrlock(&d->readerSubscriptions.lock);        
346         cstReader=&d->readerSubscriptions;
347         cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
348         if (cstRemoteWriter) {
349           objectEntryOID=objectEntryFind(d,&objectGUID);
350           if (!objectEntryOID) {
351             ORTESubsProp *sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
352             SubsParamsInit(sp);
353             parameterUpdateSubscription(csChange,sp);
354             if (generateEvent(d,&objectGUID,(void*)sp,ORTE_TRUE) &&
355                 csChange->alive) {
356               debug(46,2) ("new subscriber 0x%x-0x%x-0x%x accepted\n",
357                             objectGUID.hid,objectGUID.aid,objectGUID.oid);
358               objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)sp);
359               objectEntryOID->privateCreated=ORTE_FALSE;
360               pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
361               SubscriptionList_insert(&d->psEntry,objectEntryOID);
362               pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
363               NewSubscriber(d,objectEntryOID);
364             } else
365               FREE(sp);
366           } else {
367             if ((!SubscriptionList_find(&d->psEntry,&objectGUID)) && 
368                  csChange->alive) {
369               pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
370               SubscriptionList_insert(&d->psEntry,objectEntryOID);
371               pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
372               NewSubscriber(d,objectEntryOID);
373             }
374           }
375         }
376         break;
377     }
378   }
379   if (!cstReader) return;
380   if (!cstRemoteWriter) {
381     pthread_rwlock_unlock(&cstReader->lock);
382     return;
383   }
384   debug(46,10) ("recv: processing CSChange\n");
385   if (SeqNumberCmp(sn,cstRemoteWriter->sn)>0) { //have to be sn>writer_sn
386     CSTReaderAddCSChange(cstRemoteWriter,csChange);
387     CSTReaderProcCSChanges(d,cstRemoteWriter);
388   } else {
389     //destroy csChange
390     parameterDelete(csChange);
391     FREE(csChange);
392   }
393   pthread_rwlock_unlock(&cstReader->lock);
394
395
396
397 /**********************************************************************************/
398 void 
399 NewPublisher(ORTEDomain *d,ObjectEntryOID *op) {
400   ObjectEntryOID     *o;
401   ORTEPublProp       *pp;
402   CSTWriter          *cstWriter=NULL;
403   CSTReader          *cstReader=NULL;
404   PatternNode        *pnode;
405   
406   if ((d==NULL) || (op==NULL)) return;
407   pp=(ORTEPublProp*)op->attributes;
408   //***************************************
409   //Pattern 
410   //try to find if subscription exists
411   pthread_rwlock_rdlock(&d->patternEntry.lock);
412   pthread_rwlock_rdlock(&d->subscriptions.lock);
413   gavl_cust_for_each(CSTReader,
414                      &d->subscriptions,cstReader) {
415     if (cstReader->createdByPattern) {
416       ORTESubsProp       *sp;
417       sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
418       if ((strcmp(sp->topic,pp->topic)==0) &&
419           (strcmp(sp->typeName,pp->typeName)==0)) 
420         break; //found
421     }
422   }
423   pthread_rwlock_unlock(&d->subscriptions.lock);
424   if (!cstReader) { //not exists
425     ul_list_for_each(Pattern,&d->patternEntry,pnode) {
426       if ((fnmatch(pnode->topic,pp->topic,0)==0) &&
427           (fnmatch(pnode->type,pp->typeName,0)==0)) {
428         //pattern matched
429         // free resources
430         pthread_rwlock_unlock(&d->readerPublications.lock);        
431         pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
432         pthread_rwlock_unlock(&d->objectEntry.objRootLock);    
433         cstReader=pnode->subscriptionCallBack(
434             pp->topic,
435             pp->typeName,
436             pnode->param);
437         if (cstReader) {
438           cstReader->createdByPattern=ORTE_TRUE;
439         }
440         //allocate resources
441         pthread_rwlock_wrlock(&d->objectEntry.objRootLock);    
442         pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
443         pthread_rwlock_wrlock(&d->readerPublications.lock);        
444       }  
445     }
446   }
447   pthread_rwlock_unlock(&d->patternEntry.lock);
448   //Pattern end
449   pthread_rwlock_rdlock(&d->psEntry.subscriptionsLock);
450   gavl_cust_for_each(SubscriptionList,&d->psEntry,o) {
451     ORTESubsProp *sp=(ORTESubsProp*)o->attributes;
452     if ((strcmp(pp->topic,sp->topic)==0) &&
453         (strcmp(pp->typeName,sp->typeName)==0) &&
454         (pp->typeChecksum==sp->typeChecksum)) {
455       //add Subscription to Publisher (only if private)
456       if (op->privateCreated) {
457         pthread_rwlock_rdlock(&d->publications.lock);
458         if ((cstWriter=CSTWriter_find(&d->publications,&op->guid))) {
459           pthread_rwlock_wrlock(&cstWriter->lock);
460           if (!CSTRemoteReader_find(cstWriter,&o->guid)) {
461             CSTWriterAddRemoteReader(d,cstWriter,o,o->oid);
462             debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
463                           op->guid.hid,op->guid.aid,op->guid.oid,
464                           o->guid.hid,o->guid.aid,o->guid.oid);
465           }
466           pthread_rwlock_unlock(&cstWriter->lock);
467         }
468         pthread_rwlock_unlock(&d->publications.lock);
469       }
470       //add Publisher to Subscriber (only if private)
471       if (o->privateCreated) {
472         pthread_rwlock_rdlock(&d->subscriptions.lock);
473         if ((cstReader=CSTReader_find(&d->subscriptions,&o->guid))) {
474           pthread_rwlock_wrlock(&cstReader->lock);
475           if (!CSTRemoteWriter_find(cstReader,&op->guid)) {
476             CSTReaderAddRemoteWriter(d,cstReader,op,op->oid);
477             debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
478                           o->guid.hid,o->guid.aid,o->guid.oid,
479                           op->guid.hid,op->guid.aid,op->guid.oid);
480           }
481           pthread_rwlock_unlock(&cstReader->lock);
482         }
483         pthread_rwlock_unlock(&d->subscriptions.lock);
484       }
485     }
486   } 
487   pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
488 }              
489
490 /**********************************************************************************/
491 void 
492 NewSubscriber(ORTEDomain *d,ObjectEntryOID *os) {
493   ObjectEntryOID     *o;
494   ORTESubsProp       *sp;
495   CSTWriter          *cstWriter;
496   CSTReader          *cstReader;
497   
498   if ((d==NULL) || (os==NULL)) return;
499   sp=(ORTESubsProp*)os->attributes;
500   pthread_rwlock_rdlock(&d->psEntry.publicationsLock);
501   gavl_cust_for_each(PublicationList,&d->psEntry,o) {
502     ORTEPublProp *pp=(ORTEPublProp*)o->attributes;
503     if ((strcmp(sp->topic,pp->topic)==0) &&
504         (strcmp(sp->typeName,pp->typeName)==0) &&
505         (sp->typeChecksum==pp->typeChecksum)) {
506       //add Publication to Subscription (only if private)
507       if (os->privateCreated) {
508         pthread_rwlock_rdlock(&d->subscriptions.lock);
509         if ((cstReader=CSTReader_find(&d->subscriptions,&os->guid))) {
510           pthread_rwlock_wrlock(&cstReader->lock);
511           if (!CSTRemoteWriter_find(cstReader,&o->guid)) {
512             CSTReaderAddRemoteWriter(d,cstReader,o,o->oid);
513             debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
514                           os->guid.hid,os->guid.aid,os->guid.oid,
515                           o->guid.hid,o->guid.aid,o->guid.oid);
516           }
517           pthread_rwlock_unlock(&cstReader->lock);
518         }
519         pthread_rwlock_unlock(&d->subscriptions.lock);
520       }
521       //add Subscriber to Publisher (only if private)
522       if (o->privateCreated) {
523         pthread_rwlock_rdlock(&d->publications.lock);
524         if ((cstWriter=CSTWriter_find(&d->publications,&o->guid))) {
525           pthread_rwlock_wrlock(&cstWriter->lock);
526           if (!CSTRemoteReader_find(cstWriter,&os->guid)) {
527             CSTWriterAddRemoteReader(d,cstWriter,os,os->oid);
528             debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
529                           o->guid.hid,o->guid.aid,o->guid.oid,
530                           os->guid.hid,os->guid.aid,os->guid.oid);
531           }
532           pthread_rwlock_unlock(&cstWriter->lock);
533         }
534         pthread_rwlock_unlock(&d->publications.lock);
535       }
536     }
537   } 
538   pthread_rwlock_unlock(&d->psEntry.publicationsLock);
539 }