]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSVar.c
ef70b94b1dfb12200bd616b57f2b2265aecf53a2
[orte.git] / orte / liborte / RTPSVar.c
1 /*                            
2  *  $Id: RTPSVar.c,v 0.0.0.2            2004/11/24 
3  *
4  *  DEBUG:  section 46                  RTPS message VAR
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr@smoliku.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
34 /**********************************************************************************/
35 int 
36 RTPSVarCreate(CDR_Codec *cdrCodec,ObjectId roid,ObjectId woid,CSChange *csChange) 
37 {
38   CDR_Endianness     data_endian;
39   CORBA_octet        flags;
40   int                len,swptr;
41
42
43   swptr=cdrCodec->wptr;
44
45   /* submessage id */
46   CDR_put_octet(cdrCodec,VAR);
47
48   /* flags */
49   flags=cdrCodec->data_endian;
50   if (!CSChangeAttributes_is_empty(csChange))
51     flags|=2;
52   if (csChange->alive) 
53     flags|=4;
54   if (csChange->guid.oid==OID_APP) 
55     flags|=8;
56   CDR_put_octet(cdrCodec,flags);
57
58   /* length */
59   cdrCodec->wptr+=2;
60
61   /* next data are sent in big endianing */
62   data_endian=cdrCodec->data_endian;
63   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
64
65   /* readerObjectId */
66   CDR_put_ulong(cdrCodec,roid);
67   
68   /* writerObjectId */
69   CDR_put_ulong(cdrCodec,woid);
70
71   if (csChange->guid.oid==OID_APP) {
72      /* hid */
73      CDR_put_ulong(cdrCodec,csChange->guid.hid);
74
75      /* aid */
76      CDR_put_ulong(cdrCodec,csChange->guid.aid);
77   }
78
79   /* oid */
80   CDR_put_ulong(cdrCodec,csChange->guid.oid);
81
82   cdrCodec->data_endian=data_endian;
83
84   /* seqNumber */
85   CDR_put_ulong(cdrCodec,csChange->sn.high);
86   if (CDR_put_ulong(cdrCodec,csChange->sn.low)==CORBA_FALSE) {
87     cdrCodec->wptr=swptr;
88     return -1;
89   }
90
91   /* parameters */
92   if (!CSChangeAttributes_is_empty(csChange)) {
93     if (parameterCodeCodecFromCSChange(csChange,cdrCodec)<0) {
94       cdrCodec->wptr=swptr;
95       return -1;
96     }
97   }
98  
99   /* count length of message */
100   len=cdrCodec->wptr-swptr;
101  
102   /* length */
103   cdrCodec->wptr=swptr+2;
104   CDR_put_ushort(cdrCodec,(CORBA_unsigned_short)(len-4));
105
106   cdrCodec->wptr=swptr+len;
107
108   return len;
109 }
110
111 /**********************************************************************************/
112 void 
113 RTPSVarFinish(CSTReader *cstReader,CSTRemoteWriter *cstRemoteWriter,
114     CSChange *csChange) {
115
116   if (cstReader && cstRemoteWriter) {
117     debug(46,10) ("recv: processing CSChange\n");
118     if (SeqNumberCmp(csChange->sn,cstRemoteWriter->sn)>0) { //have to be sn>writer_sn
119       CSTReaderAddCSChange(cstRemoteWriter,csChange);
120       CSTReaderProcCSChanges(cstReader->domain,cstRemoteWriter);
121       csChange=NULL;
122     }
123   }  
124
125   if (csChange) {
126     //destroy csChange if any
127     parameterDelete(csChange);
128     FREE(csChange);
129   }
130   if (cstReader) {
131     pthread_rwlock_unlock(&cstReader->lock);
132     return;
133   }
134 }
135
136 /**********************************************************************************/
137 void 
138 RTPSVarManager(ORTEDomain *d,CSChange *csChange,GUID_RTPS *writerGUID,
139     IPAddress senderIPAddress) 
140 {
141   CSTReader          *cstReader=NULL;
142   CSTRemoteReader    *cstRemoteReader=NULL;
143   CSTRemoteWriter    *cstRemoteWriter=NULL;
144   ObjectEntryAID     *objectEntryAID;
145   ObjectEntryOID     *objectEntryOID;
146   Boolean            usedMulticast=ORTE_FALSE;
147   int                i;
148
149   if ((d->guid.aid & 0x03)!=MANAGER) return;
150
151   /* readerManagers */
152   if ((writerGUID->oid==OID_WRITE_APPSELF) && 
153       ((writerGUID->aid & 0x03)==MANAGER)) {
154     pthread_rwlock_wrlock(&d->readerManagers.lock);
155     pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
156     cstReader=&d->readerManagers;
157     gavl_cust_for_each(CSTRemoteReader,
158                        &d->writerApplicationSelf,
159                        cstRemoteReader) {
160       AppParams *ap=(AppParams*)cstRemoteReader->sobject->attributes;
161       for (i=0;i<ap->unicastIPAddressCount;i++) {
162         if (ap->unicastIPAddressList[i]==senderIPAddress) {
163           break;
164         }
165       }
166       if (i!=ap->unicastIPAddressCount) 
167         break;
168       if (matchMulticastAddresses(cstRemoteReader->sobject,
169                                   cstRemoteReader->cstWriter->objectEntryOID)) {
170         usedMulticast=ORTE_TRUE;
171         break;
172       }
173     }
174     if (cstRemoteReader) {
175       objectEntryOID=objectEntryFind(d,&csChange->guid);
176       if (!objectEntryOID) {
177         //  create new RemoteReader
178         AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
179         AppParamsInit(ap);
180         parameterUpdateApplication(csChange,ap);
181         if (generateEvent(d,&csChange->guid,(void*)ap,csChange->alive) &&
182             csChange->alive) {
183           debug(46,2) ("manager 0x%x-0x%x accepted\n",
184                         csChange->guid.hid,csChange->guid.aid);
185           objectEntryOID=objectEntryAdd(d,&csChange->guid,(void*)ap);          
186           CSTWriterRefreshAllCSChanges(d,cstRemoteReader);
187           CSTReaderAddRemoteWriter(d,cstReader,objectEntryOID,writerGUID->oid);
188           pthread_rwlock_wrlock(&d->readerApplications.lock);
189           pthread_rwlock_wrlock(&d->writerApplications.lock);
190           CSTReaderAddRemoteWriter(d,&d->readerApplications,objectEntryOID,OID_WRITE_APP);
191           if (usedMulticast) {
192             /* connect to virual objectEntryOID of multicast manager */
193             CSTWriterAddRemoteReader(d,
194                                      &d->writerApplications,
195                                      objectEntryOID,
196                                      OID_READ_APP,
197                                      cstRemoteReader->sobject);
198           } else {
199             CSTWriterAddRemoteReader(d,&d->writerApplications,
200                                      objectEntryOID,
201                                      OID_READ_APP,
202                                      objectEntryOID);
203           }
204           pthread_rwlock_unlock(&d->writerApplications.lock);
205           pthread_rwlock_unlock(&d->readerApplications.lock);
206           //all applications from manager node set expiration timer
207           gavl_cust_for_each(ObjectEntryAID,
208                              objectEntryOID->objectEntryHID,objectEntryAID) {
209             ObjectEntryOID *objectEntryOID1;
210             objectEntryOID1=ObjectEntryOID_find(objectEntryAID,&csChange->guid.oid);
211             objectEntryRefreshApp(d,objectEntryOID1);
212           }
213         } else {
214           FREE(ap);
215         }
216       }
217       cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
218       if ((cstRemoteWriter) && (csChange->alive==ORTE_TRUE))
219         objectEntryRefreshApp(d,cstRemoteWriter->spobject);
220     } else {
221       //deny Manager
222     }
223     pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
224   }
225
226
227   /* readerApplication */
228   if (((writerGUID->oid==OID_WRITE_APPSELF) &&
229        ((writerGUID->aid & 0x03)==MANAGEDAPPLICATION)) ||
230       ((writerGUID->oid==OID_WRITE_APP) &&
231        ((writerGUID->aid & 0x03)==MANAGER))) {
232
233     pthread_rwlock_wrlock(&d->readerApplications.lock);
234     cstReader=&d->readerApplications;
235     objectEntryOID=objectEntryFind(d,&csChange->guid);
236     if (!objectEntryOID) {
237       AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
238       AppParamsInit(ap);
239       parameterUpdateApplication(csChange,ap);
240       if (generateEvent(d,&csChange->guid,(void*)ap,csChange->alive) &&
241           csChange->alive) {
242         objectEntryOID=objectEntryAdd(d,&csChange->guid,(void*)ap);
243         objectEntryOID->appMOM=getTypeApp(d,ap,senderIPAddress);
244         if (objectEntryOID->appMOM) {
245           debug(46,2) ("MOM application 0x%x-0x%x accepted\n",
246                         csChange->guid.hid,csChange->guid.aid);
247           //increment vargAppsSequenceNumber and make csChange
248           SeqNumberInc(d->appParams->vargAppsSequenceNumber,
249                        d->appParams->vargAppsSequenceNumber);
250           //WAS & WM is locked inside next function
251           appSelfParamChanged(d,ORTE_TRUE,ORTE_FALSE,ORTE_TRUE,ORTE_TRUE);
252           CSTReaderAddRemoteWriter(d,cstReader,
253               objectEntryOID,writerGUID->oid);
254           CSTWriterAddRemoteReader(d,&d->writerManagers,
255               objectEntryOID,
256               OID_READ_MGR,
257               objectEntryOID);
258           pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
259           pthread_rwlock_unlock(&d->writerManagers.lock);
260         } else {
261           debug(46,2) ("OAM application 0x%x-0x%x accepted\n",
262                         csChange->guid.hid,csChange->guid.aid);
263         }
264         pthread_rwlock_wrlock(&d->writerApplications.lock);
265         CSTWriterAddRemoteReader(d,&d->writerApplications,
266             objectEntryOID,
267             OID_READ_APP,
268             objectEntryOID);
269         pthread_rwlock_unlock(&d->writerApplications.lock);
270       } else {
271         FREE(ap);
272       }        
273     }
274     if (objectEntryOID) {
275       cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
276       if (objectEntryOID->appMOM) {
277         if (csChange->alive==ORTE_TRUE)
278           objectEntryRefreshApp(d,objectEntryOID);
279       } else {
280       //turn off expiration timer
281         eventDetach(d,
282             objectEntryOID->objectEntryAID,
283             &objectEntryOID->expirationPurgeTimer,
284             0);
285         debug(46,3) ("for application 0x%x-0x%x turn off expiration timer\n",
286                       objectEntryOID->guid.hid,objectEntryOID->guid.aid);
287       }
288     }
289   }
290
291   /* try to proc csChange */
292   RTPSVarFinish(cstReader,cstRemoteWriter,csChange);
293 }
294
295 /**********************************************************************************/
296 void 
297 NewPublisher(ORTEDomain *d,ObjectEntryOID *op) {
298   ObjectEntryOID     *o;
299   ORTEPublProp       *pp;
300   CSTWriter          *cstWriter=NULL;
301   CSTReader          *cstReader=NULL;
302   PatternNode        *pnode;
303   
304   if ((d==NULL) || (op==NULL)) return;
305   pp=(ORTEPublProp*)op->attributes;
306   //***************************************
307   //Pattern 
308   //try to find if subscription exists
309   pthread_rwlock_rdlock(&d->patternEntry.lock);
310   pthread_rwlock_rdlock(&d->subscriptions.lock);
311   gavl_cust_for_each(CSTReader,
312                      &d->subscriptions,cstReader) {
313     if (cstReader->createdByPattern) {
314       ORTESubsProp       *sp;
315       sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
316       if ((strcmp((const char *)sp->topic, (const char*)pp->topic)==0) &&
317           (strcmp((const char *)sp->typeName, (const char*)pp->typeName)==0)) 
318         break; //found
319     }
320   }
321   pthread_rwlock_unlock(&d->subscriptions.lock);
322   if (!cstReader) { //not exists
323     ul_list_for_each(Pattern,&d->patternEntry,pnode) {
324       if ((fnmatch((const char *)pnode->topic, (const char*)pp->topic,0)==0) &&
325           (fnmatch((const char *)pnode->type, (const char*)pp->typeName,0)==0)) {
326         //pattern matched
327         // free resources
328         pthread_rwlock_unlock(&d->readerPublications.lock);        
329         pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
330         pthread_rwlock_unlock(&d->objectEntry.objRootLock);    
331         cstReader=pnode->subscriptionCallBack(
332             (char *)pp->topic,
333             (char *)pp->typeName,
334             pnode->param);
335         if (cstReader) {
336           cstReader->createdByPattern=ORTE_TRUE;
337         }
338         //allocate resources
339         pthread_rwlock_wrlock(&d->objectEntry.objRootLock);    
340         pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
341         pthread_rwlock_wrlock(&d->readerPublications.lock);        
342       }  
343     }
344   }
345   pthread_rwlock_unlock(&d->patternEntry.lock);
346   //Pattern end
347   pthread_rwlock_rdlock(&d->psEntry.subscriptionsLock);
348   gavl_cust_for_each(SubscriptionList,&d->psEntry,o) {
349     ORTESubsProp *sp=(ORTESubsProp*)o->attributes;
350     if ((strcmp((const char *)pp->topic, (const char*)sp->topic)==0) &&
351         (strcmp((const char *)pp->typeName, (const char*)sp->typeName)==0) &&
352         (pp->typeChecksum==sp->typeChecksum)) {
353       //add Subscription to Publisher (only if private)
354       if (op->privateCreated) {
355         pthread_rwlock_rdlock(&d->publications.lock);
356         if ((cstWriter=CSTWriter_find(&d->publications,&op->guid))) {
357           pthread_rwlock_wrlock(&cstWriter->lock);
358           if (!CSTRemoteReader_find(cstWriter,&o->guid)) {
359             ObjectEntryOID     *so;
360
361             so=getSubsO2SRemoteReader(d,o,sp);
362             CSTWriterAddRemoteReader(d,cstWriter,o,
363                     o->oid,so);
364             debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
365                           op->guid.hid,op->guid.aid,op->guid.oid,
366                           o->guid.hid,o->guid.aid,o->guid.oid);
367           }
368           pthread_rwlock_unlock(&cstWriter->lock);
369         }
370         pthread_rwlock_unlock(&d->publications.lock);
371       }
372       //add Publisher to Subscriber (only if private)
373       if (o->privateCreated) {
374         pthread_rwlock_rdlock(&d->subscriptions.lock);
375         if ((cstReader=CSTReader_find(&d->subscriptions,&o->guid))) {
376           pthread_rwlock_wrlock(&cstReader->lock);
377           if (!CSTRemoteWriter_find(cstReader,&op->guid)) {
378             CSTReaderAddRemoteWriter(d,cstReader,op,op->oid);
379             debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
380                           o->guid.hid,o->guid.aid,o->guid.oid,
381                           op->guid.hid,op->guid.aid,op->guid.oid);
382           }
383           pthread_rwlock_unlock(&cstReader->lock);
384         }
385         pthread_rwlock_unlock(&d->subscriptions.lock);
386       }
387     }
388   } 
389   pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
390 }              
391
392 /**********************************************************************************/
393 void 
394 NewSubscriber(ORTEDomain *d,ObjectEntryOID *os) {
395   ObjectEntryOID     *o;
396   ORTESubsProp       *sp;
397   CSTWriter          *cstWriter;
398   CSTReader          *cstReader;
399   
400   if ((d==NULL) || (os==NULL)) return;
401   sp=(ORTESubsProp*)os->attributes;
402   pthread_rwlock_rdlock(&d->psEntry.publicationsLock);
403   gavl_cust_for_each(PublicationList,&d->psEntry,o) {
404     ORTEPublProp *pp=(ORTEPublProp*)o->attributes;
405     if ((strcmp((const char *)sp->topic, (const char*)pp->topic)==0) &&
406         (strcmp((const char *)sp->typeName, (const char*)pp->typeName)==0) &&
407         (sp->typeChecksum==pp->typeChecksum)) {
408       //add Publication to Subscription (only if private)
409       if (os->privateCreated) {
410         pthread_rwlock_rdlock(&d->subscriptions.lock);
411         if ((cstReader=CSTReader_find(&d->subscriptions,&os->guid))) {
412           pthread_rwlock_wrlock(&cstReader->lock);
413           if (!CSTRemoteWriter_find(cstReader,&o->guid)) {
414             CSTReaderAddRemoteWriter(d,cstReader,o,o->oid);
415             debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
416                           os->guid.hid,os->guid.aid,os->guid.oid,
417                           o->guid.hid,o->guid.aid,o->guid.oid);
418           }
419           pthread_rwlock_unlock(&cstReader->lock);
420         }
421         pthread_rwlock_unlock(&d->subscriptions.lock);
422       }
423       //add Subscriber to Publisher (only if private)
424       if (o->privateCreated) {
425         pthread_rwlock_rdlock(&d->publications.lock);
426         if ((cstWriter=CSTWriter_find(&d->publications,&o->guid))) {
427           pthread_rwlock_wrlock(&cstWriter->lock);
428           if (!CSTRemoteReader_find(cstWriter,&os->guid)) {
429             ObjectEntryOID     *sos;
430
431             sos=getSubsO2SRemoteReader(d,os,sp);
432             CSTWriterAddRemoteReader(d,cstWriter,os,
433                     os->oid,sos);
434             debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
435                           GUID_PRINTF(o->guid),
436                           GUID_PRINTF(os->guid));
437           }
438           pthread_rwlock_unlock(&cstWriter->lock);
439         }
440         pthread_rwlock_unlock(&d->publications.lock);
441       }
442     }
443   } 
444   pthread_rwlock_unlock(&d->psEntry.publicationsLock);
445 }
446
447 /**********************************************************************************/
448 void 
449 RTPSVarApp(ORTEDomain *d,CSChange *csChange,GUID_RTPS *writerGUID) 
450 {
451   CSTReader          *cstReader=NULL;
452   CSTRemoteWriter    *cstRemoteWriter=NULL;
453   ObjectEntryAID     *objectEntryAID;
454   ObjectEntryOID     *objectEntryOID,*sobjectEntryOID;
455
456   if ((d->guid.aid & 3)!=MANAGEDAPPLICATION) return;
457
458   switch (writerGUID->oid) {
459     case OID_WRITE_MGR:
460       pthread_rwlock_wrlock(&d->readerManagers.lock);        
461       cstReader=&d->readerManagers;
462       objectEntryOID=objectEntryFind(d,&csChange->guid);
463       if (!objectEntryOID) {
464         AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
465         AppParamsInit(ap);
466         parameterUpdateApplication(csChange,ap);
467         if (generateEvent(d,&csChange->guid,(void*)ap,csChange->alive) &&
468             csChange->alive) {
469           debug(46,2) ("new manager 0x%x-0x%x accepted\n",
470                         csChange->guid.hid,csChange->guid.aid);
471           objectEntryOID=objectEntryAdd(d,&csChange->guid,(void*)ap);
472           objectEntryOID->privateCreated=ORTE_FALSE;
473           pthread_rwlock_wrlock(&d->readerApplications.lock);
474           CSTReaderAddRemoteWriter(d,&d->readerApplications,
475                                   objectEntryOID,OID_WRITE_APP);
476           pthread_rwlock_unlock(&d->readerApplications.lock);
477           //all applications from manager node set expiration timer
478           gavl_cust_for_each(ObjectEntryAID,
479                              objectEntryOID->objectEntryHID,objectEntryAID) {
480             ObjectEntryOID *objectEntryOID1;
481             objectEntryOID1=ObjectEntryOID_find(objectEntryAID,&csChange->guid.oid);
482             objectEntryRefreshApp(d,objectEntryOID1);
483           }
484         } else {
485           FREE(ap);
486         }
487       } else {
488         GUID_RTPS guid_wapp=csChange->guid;
489         guid_wapp.oid=OID_WRITE_APP;
490         pthread_rwlock_wrlock(&d->readerApplications.lock);
491         cstRemoteWriter=CSTRemoteWriter_find(&d->readerApplications,&guid_wapp);
492         //setup state of cstRemoteWriter on send ACK to manager
493         if (cstRemoteWriter) {
494           if (cstRemoteWriter->commStateACK==WAITING) {
495             eventDetach(d,
496                 cstRemoteWriter->spobject->objectEntryAID,
497                 &cstRemoteWriter->repeatActiveQueryTimer,
498                 1);   //metatraffic timer
499             eventAdd(d,
500                 cstRemoteWriter->spobject->objectEntryAID,
501                 &cstRemoteWriter->repeatActiveQueryTimer,
502                 1,   //metatraffic timer
503                 "CSTReaderQueryTimer",
504                 CSTReaderQueryTimer,
505                 &cstRemoteWriter->cstReader->lock,
506                 cstRemoteWriter,
507                 NULL);               
508           }
509         }
510         pthread_rwlock_unlock(&d->readerApplications.lock);
511       } 
512       if (csChange->alive==ORTE_TRUE)
513         objectEntryRefreshApp(d,objectEntryOID);
514       cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
515       if ((!cstRemoteWriter) &&
516           (csChange->guid.hid==writerGUID->hid) && 
517           (csChange->guid.aid==writerGUID->aid)) {
518         cstRemoteWriter=
519             CSTReaderAddRemoteWriter(d,cstReader,objectEntryOID,writerGUID->oid);
520       }
521       break;
522     case OID_WRITE_APP:
523       pthread_rwlock_wrlock(&d->readerApplications.lock);        
524       cstReader=&d->readerApplications;
525       cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
526       if (cstRemoteWriter) {
527         AppParams *ap;
528         GUID_RTPS guid_tmp=csChange->guid;
529         guid_tmp.oid=OID_WRITE_PUBL;
530         objectEntryOID=objectEntryFind(d,&csChange->guid);
531         if (!CSTRemoteWriter_find(&d->readerPublications,&guid_tmp)) {
532           if (!objectEntryOID) {
533             ap=(AppParams*)MALLOC(sizeof(AppParams));
534             AppParamsInit(ap);
535             parameterUpdateApplication(csChange,ap);
536             if (generateEvent(d,&csChange->guid,(void*)ap,csChange->alive) &&
537                 csChange->alive) {
538               debug(46,2) ("new application 0x%x-0x%x accepted\n",
539                             csChange->guid.hid,csChange->guid.aid);
540               objectEntryOID=objectEntryAdd(d,&csChange->guid,(void*)ap);
541               objectEntryOID->privateCreated=ORTE_FALSE;
542             } else {
543               FREE(ap);
544               break;
545             }
546           }
547
548           ap=(AppParams*)objectEntryOID->attributes;
549           sobjectEntryOID=getAppO2SRemoteReader(d,objectEntryOID,ap);
550
551           pthread_rwlock_wrlock(&d->readerPublications.lock);            
552           pthread_rwlock_wrlock(&d->readerSubscriptions.lock);            
553           pthread_rwlock_wrlock(&d->writerPublications.lock);            
554           pthread_rwlock_wrlock(&d->writerSubscriptions.lock);            
555           CSTReaderAddRemoteWriter(d,&d->readerPublications,
556                                    objectEntryOID,OID_WRITE_PUBL);
557           CSTReaderAddRemoteWriter(d,&d->readerSubscriptions,
558                                    objectEntryOID,OID_WRITE_SUBS);
559           CSTWriterAddRemoteReader(d,&d->writerPublications,
560                                    objectEntryOID,
561                                    OID_READ_PUBL,
562                                    sobjectEntryOID);
563           CSTWriterAddRemoteReader(d,&d->writerSubscriptions,
564                                    objectEntryOID,
565                                    OID_READ_SUBS,
566                                    sobjectEntryOID);
567           pthread_rwlock_unlock(&d->readerPublications.lock);            
568           pthread_rwlock_unlock(&d->readerSubscriptions.lock);            
569           pthread_rwlock_unlock(&d->writerPublications.lock);            
570           pthread_rwlock_unlock(&d->writerSubscriptions.lock);            
571         }
572         if (objectEntryOID) {
573           //turn off expiration timer
574           eventDetach(d,
575               objectEntryOID->objectEntryAID,
576               &objectEntryOID->expirationPurgeTimer,
577               0);
578           debug(46,3) ("for application 0x%x-0x%x turn off expiration timer\n",
579                         objectEntryOID->guid.hid,
580                         objectEntryOID->guid.aid);
581         }
582       }
583       break;
584     case OID_WRITE_PUBL:
585       pthread_rwlock_wrlock(&d->readerPublications.lock);        
586       cstReader=&d->readerPublications;
587       cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
588       if (cstRemoteWriter) {
589         objectEntryOID=objectEntryFind(d,&csChange->guid);
590         if (!objectEntryOID) {
591           ORTEPublProp *pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
592           PublParamsInit(pp);
593           parameterUpdatePublication(csChange,pp);
594           if (generateEvent(d,&csChange->guid,(void*)pp,csChange->alive) &&
595               csChange->alive) {
596             debug(46,2) ("new publisher 0x%x-0x%x-0x%x accepted\n",
597                           GUID_PRINTF(csChange->guid));
598             objectEntryOID=objectEntryAdd(d,&csChange->guid,(void*)pp);
599             objectEntryOID->privateCreated=ORTE_FALSE;
600             pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
601             PublicationList_insert(&d->psEntry,objectEntryOID);
602             pthread_rwlock_unlock(&d->psEntry.publicationsLock);
603             NewPublisher(d,objectEntryOID);
604           } else
605             FREE(pp);
606         } else {
607           if ((!PublicationList_find(&d->psEntry,&csChange->guid)) &&
608                csChange->alive) {
609             pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
610             PublicationList_insert(&d->psEntry,objectEntryOID);
611             pthread_rwlock_unlock(&d->psEntry.publicationsLock);
612             NewPublisher(d,objectEntryOID);
613           }
614         }
615       }
616       break;
617     case OID_WRITE_SUBS:
618       pthread_rwlock_wrlock(&d->readerSubscriptions.lock);        
619       cstReader=&d->readerSubscriptions;
620       cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
621       if (cstRemoteWriter) {
622         objectEntryOID=objectEntryFind(d,&csChange->guid);
623         if (!objectEntryOID) {
624           ORTESubsProp *sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
625           SubsParamsInit(sp);
626           parameterUpdateSubscription(csChange,sp);
627           if (generateEvent(d,&csChange->guid,(void*)sp,csChange->alive) &&
628               csChange->alive) {
629             debug(46,2) ("new subscriber 0x%x-0x%x-0x%x accepted\n",
630                           GUID_PRINTF(csChange->guid));
631             objectEntryOID=objectEntryAdd(d,&csChange->guid,(void*)sp);
632             objectEntryOID->privateCreated=ORTE_FALSE;
633             pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
634             SubscriptionList_insert(&d->psEntry,objectEntryOID);
635             pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
636             NewSubscriber(d,objectEntryOID);
637           } else
638             FREE(sp);
639         } else {
640           if ((!SubscriptionList_find(&d->psEntry,&csChange->guid)) && 
641                csChange->alive) {
642             pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
643             SubscriptionList_insert(&d->psEntry,objectEntryOID);
644             pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
645             NewSubscriber(d,objectEntryOID);
646           }
647         }
648       }
649       break;
650   }
651
652   /* try to proc csChange */
653   RTPSVarFinish(cstReader,cstRemoteWriter,csChange);
654 }
655
656
657 /**********************************************************************************/
658 void 
659 RTPSVar(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
660   GUID_RTPS          objectGUID,writerGUID;
661   ObjectId           roid,woid;
662   SequenceNumber     sn;   
663   char               p_bit,a_bit,h_bit;
664   CORBA_unsigned_short submsg_len;
665   CSChange           *csChange;
666   CDR_Endianness     data_endian;
667   CORBA_octet        flags;
668
669   /* restore flag possition in submessage */
670   cdrCodec->rptr-=3;
671
672   /* flags */
673   CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
674   p_bit=flags & 2;
675   a_bit=flags & 4;
676   h_bit=flags & 8;
677
678   /* submessage length */
679   CDR_get_ushort(cdrCodec,&submsg_len);
680
681   /* next data are sent in big endianing */
682   data_endian=cdrCodec->data_endian;
683   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
684
685   /* readerObjectId */
686   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
687   
688   /* writerObjectId */
689   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
690
691   if (h_bit) {
692      /* HostId         */
693      CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&objectGUID.hid);
694
695      /* AppId          */
696      CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&objectGUID.aid);
697
698   } else {
699     objectGUID.hid=mi->sourceHostId;
700     objectGUID.aid=mi->sourceAppId;
701   }
702
703    /* ObjectId       */
704   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&objectGUID.oid);
705
706   cdrCodec->data_endian=data_endian;
707
708   /* writerSN       */
709   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.high);
710   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.low);
711
712   writerGUID.hid=mi->sourceHostId;
713   writerGUID.aid=mi->sourceAppId;
714   writerGUID.oid=woid;
715
716   debug(46,3) ("recv: RTPS_VAR(0x%x) from 0x%x-0x%x sn:%u\n",
717                 woid,mi->sourceHostId,mi->sourceAppId,sn.low);
718   
719   //prepare csChange
720   csChange=(CSChange*)MALLOC(sizeof(CSChange));
721   csChange->cdrCodec.buffer=NULL;
722   csChange->guid=objectGUID;
723   if (a_bit) csChange->alive=ORTE_TRUE;
724   else csChange->alive=ORTE_FALSE;
725   if (p_bit)
726     parameterDecodeCodecToCSChange(csChange,cdrCodec);
727   else
728     CSChangeAttributes_init_head(csChange);
729   csChange->sn=sn;
730   SEQUENCE_NUMBER_NONE(csChange->gapSN);
731
732   /* Manager */
733   RTPSVarManager(d,csChange,&writerGUID,senderIPAddress);
734
735   /* Application */
736   RTPSVarApp(d,csChange,&writerGUID);
737
738