rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSVar.c
Reformat the sources with orte/uncrustify script
[orte.git] / orte / liborte / RTPSVar.c
1 /*
2  *  $Id: RTPSVar.c,v 0.0.0.2            2004/11/24
3  *
4  *  DEBUG:  section 46                  RTPS message VAR
5  *
6  *  -------------------------------------------------------------------
7  *                                ORTE
8  *                      Open Real-Time Ethernet
9  *
10  *                      Copyright (C) 2001-2006
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic
12  *                      http://dce.felk.cvut.cz
13  *                      http://www.ocera.org
14  *
15  *  Author:              Petr Smolik    petr@smoliku.cz
16  *  Advisor:             Pavel Pisa
17  *  Project Responsible: Zdenek Hanzalek
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *
30  */
31
32 #include "orte_all.h"
33
34 /**********************************************************************************/
35 int
36 RTPSVarCreate(CDR_Codec *cdrCodec, ObjectId roid, ObjectId woid, CSChange *csChange)
37 {
38   CDR_Endianness     data_endian;
39   CORBA_octet        flags;
40   int                len, swptr;
41
42
43   swptr = cdrCodec->wptr;
44
45   /* submessage id */
46   CDR_put_octet(cdrCodec, VAR);
47
48   /* flags */
49   flags = cdrCodec->data_endian;
50   if (!CSChangeAttributes_is_empty(csChange))
51     flags |= 2;
52   if (csChange->alive)
53     flags |= 4;
54   if (csChange->guid.oid == OID_APP)
55     flags |= 8;
56   CDR_put_octet(cdrCodec, flags);
57
58   /* length */
59   cdrCodec->wptr += 2;
60
61   /* next data are sent in big endianing */
62   data_endian = cdrCodec->data_endian;
63   cdrCodec->data_endian = FLAG_BIG_ENDIAN;
64
65   /* readerObjectId */
66   CDR_put_ulong(cdrCodec, roid);
67
68   /* writerObjectId */
69   CDR_put_ulong(cdrCodec, woid);
70
71   if (csChange->guid.oid == OID_APP) {
72     /* hid */
73     CDR_put_ulong(cdrCodec, csChange->guid.hid);
74
75     /* aid */
76     CDR_put_ulong(cdrCodec, csChange->guid.aid);
77   }
78
79   /* oid */
80   CDR_put_ulong(cdrCodec, csChange->guid.oid);
81
82   cdrCodec->data_endian = data_endian;
83
84   /* seqNumber */
85   CDR_put_ulong(cdrCodec, csChange->sn.high);
86   if (CDR_put_ulong(cdrCodec, csChange->sn.low) == CORBA_FALSE) {
87     cdrCodec->wptr = swptr;
88     return -1;
89   }
90
91   /* parameters */
92   if (!CSChangeAttributes_is_empty(csChange)) {
93     if (parameterCodeCodecFromCSChange(csChange, cdrCodec) < 0) {
94       cdrCodec->wptr = swptr;
95       return -1;
96     }
97   }
98
99   /* count length of message */
100   len = cdrCodec->wptr-swptr;
101
102   /* length */
103   cdrCodec->wptr = swptr+2;
104   CDR_put_ushort(cdrCodec, (CORBA_unsigned_short)(len-4));
105
106   cdrCodec->wptr = swptr+len;
107
108   return len;
109 }
110
111 /**********************************************************************************/
112 void
113 RTPSVarFinish(CSTReader *cstReader, CSTRemoteWriter *cstRemoteWriter,
114               CSChange *csChange)
115 {
116
117   if (cstReader && cstRemoteWriter) {
118     debug(46, 10) ("recv: processing CSChange\n");
119     if (SeqNumberCmp(csChange->sn, cstRemoteWriter->sn) > 0) { //have to be sn>writer_sn
120       CSTReaderAddCSChange(cstRemoteWriter, csChange);
121       CSTReaderProcCSChanges(cstReader->domain, cstRemoteWriter);
122       csChange = NULL;
123     }
124   }
125
126   if (csChange) {
127     //destroy csChange if any
128     parameterDelete(csChange);
129     FREE(csChange);
130   }
131   if (cstReader) {
132     pthread_rwlock_unlock(&cstReader->lock);
133     return;
134   }
135 }
136
137 /**********************************************************************************/
138 void
139 RTPSVarManager(ORTEDomain *d, CSChange *csChange, GUID_RTPS *writerGUID,
140                IPAddress senderIPAddress)
141 {
142   CSTReader          *cstReader = NULL;
143   CSTRemoteReader    *cstRemoteReader = NULL;
144   CSTRemoteWriter    *cstRemoteWriter = NULL;
145   ObjectEntryAID     *objectEntryAID;
146   ObjectEntryOID     *objectEntryOID;
147   Boolean            usedMulticast = ORTE_FALSE;
148   int                i;
149
150   if ((d->guid.aid & 0x03) != MANAGER)
151     return;
152
153   /* readerManagers */
154   if ((writerGUID->oid == OID_WRITE_APPSELF) &&
155       ((writerGUID->aid & 0x03) == MANAGER)) {
156     pthread_rwlock_wrlock(&d->readerManagers.lock);
157     pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
158     cstReader = &d->readerManagers;
159     gavl_cust_for_each(CSTRemoteReader,
160                        &d->writerApplicationSelf,
161                        cstRemoteReader) {
162       AppParams *ap = (AppParams *)cstRemoteReader->sobject->attributes;
163
164       for (i = 0; i < ap->unicastIPAddressCount; i++) {
165         if (ap->unicastIPAddressList[i] == senderIPAddress) {
166           break;
167         }
168       }
169       if (i != ap->unicastIPAddressCount)
170         break;
171       if (matchMulticastAddresses(cstRemoteReader->sobject,
172                                   cstRemoteReader->cstWriter->objectEntryOID)) {
173         usedMulticast = ORTE_TRUE;
174         break;
175       }
176     }
177     if (cstRemoteReader) {
178       objectEntryOID = objectEntryFind(d, &csChange->guid);
179       if (!objectEntryOID) {
180         //  create new RemoteReader
181         AppParams *ap = (AppParams *)MALLOC(sizeof(AppParams));
182         AppParamsInit(ap);
183         parameterUpdateApplication(csChange, ap);
184         if (generateEvent(d, &csChange->guid, (void *)ap, csChange->alive) &&
185             csChange->alive) {
186           debug(46, 2) ("manager 0x%x-0x%x accepted\n",
187                         csChange->guid.hid, csChange->guid.aid);
188           objectEntryOID = objectEntryAdd(d, &csChange->guid, (void *)ap);
189           CSTWriterRefreshAllCSChanges(d, cstRemoteReader);
190           CSTReaderAddRemoteWriter(d, cstReader, objectEntryOID, writerGUID->oid);
191           pthread_rwlock_wrlock(&d->readerApplications.lock);
192           pthread_rwlock_wrlock(&d->writerApplications.lock);
193           CSTReaderAddRemoteWriter(d, &d->readerApplications, objectEntryOID, OID_WRITE_APP);
194           if (usedMulticast) {
195             /* connect to virual objectEntryOID of multicast manager */
196             CSTWriterAddRemoteReader(d,
197                                      &d->writerApplications,
198                                      objectEntryOID,
199                                      OID_READ_APP,
200                                      cstRemoteReader->sobject);
201           } else {
202             CSTWriterAddRemoteReader(d, &d->writerApplications,
203                                      objectEntryOID,
204                                      OID_READ_APP,
205                                      objectEntryOID);
206           }
207           pthread_rwlock_unlock(&d->writerApplications.lock);
208           pthread_rwlock_unlock(&d->readerApplications.lock);
209           //all applications from manager node set expiration timer
210           gavl_cust_for_each(ObjectEntryAID,
211                              objectEntryOID->objectEntryHID, objectEntryAID) {
212             ObjectEntryOID *objectEntryOID1;
213
214             objectEntryOID1 = ObjectEntryOID_find(objectEntryAID, &csChange->guid.oid);
215             objectEntryRefreshApp(d, objectEntryOID1);
216           }
217         } else {
218           FREE(ap);
219         }
220       }
221       cstRemoteWriter = CSTRemoteWriter_find(cstReader, writerGUID);
222       if ((cstRemoteWriter) && (csChange->alive == ORTE_TRUE))
223         objectEntryRefreshApp(d, cstRemoteWriter->spobject);
224     } else {
225       //deny Manager
226     }
227     pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
228   }
229
230
231   /* readerApplication */
232   if (((writerGUID->oid == OID_WRITE_APPSELF) &&
233        ((writerGUID->aid & 0x03) == MANAGEDAPPLICATION)) ||
234       ((writerGUID->oid == OID_WRITE_APP) &&
235        ((writerGUID->aid & 0x03) == MANAGER))) {
236
237     pthread_rwlock_wrlock(&d->readerApplications.lock);
238     cstReader = &d->readerApplications;
239     objectEntryOID = objectEntryFind(d, &csChange->guid);
240     if (!objectEntryOID) {
241       AppParams *ap = (AppParams *)MALLOC(sizeof(AppParams));
242       AppParamsInit(ap);
243       parameterUpdateApplication(csChange, ap);
244       if (generateEvent(d, &csChange->guid, (void *)ap, csChange->alive) &&
245           csChange->alive) {
246         objectEntryOID = objectEntryAdd(d, &csChange->guid, (void *)ap);
247         objectEntryOID->appMOM = getTypeApp(d, ap, senderIPAddress);
248         if (objectEntryOID->appMOM) {
249           debug(46, 2) ("MOM application 0x%x-0x%x accepted\n",
250                         csChange->guid.hid, csChange->guid.aid);
251           //increment vargAppsSequenceNumber and make csChange
252           SeqNumberInc(d->appParams->vargAppsSequenceNumber,
253                        d->appParams->vargAppsSequenceNumber);
254           //WAS & WM is locked inside next function
255           appSelfParamChanged(d, ORTE_TRUE, ORTE_FALSE, ORTE_TRUE, ORTE_TRUE);
256           CSTReaderAddRemoteWriter(d, cstReader,
257                                    objectEntryOID, writerGUID->oid);
258           CSTWriterAddRemoteReader(d, &d->writerManagers,
259                                    objectEntryOID,
260                                    OID_READ_MGR,
261                                    objectEntryOID);
262           pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
263           pthread_rwlock_unlock(&d->writerManagers.lock);
264         } else {
265           debug(46, 2) ("OAM application 0x%x-0x%x accepted\n",
266                         csChange->guid.hid, csChange->guid.aid);
267         }
268         pthread_rwlock_wrlock(&d->writerApplications.lock);
269         CSTWriterAddRemoteReader(d, &d->writerApplications,
270                                  objectEntryOID,
271                                  OID_READ_APP,
272                                  objectEntryOID);
273         pthread_rwlock_unlock(&d->writerApplications.lock);
274       } else {
275         FREE(ap);
276       }
277     }
278     if (objectEntryOID) {
279       cstRemoteWriter = CSTRemoteWriter_find(cstReader, writerGUID);
280       if (objectEntryOID->appMOM) {
281         if (csChange->alive == ORTE_TRUE)
282           objectEntryRefreshApp(d, objectEntryOID);
283       } else {
284         //turn off expiration timer
285         eventDetach(d,
286                     objectEntryOID->objectEntryAID,
287                     &objectEntryOID->expirationPurgeTimer,
288                     0);
289         debug(46, 3) ("for application 0x%x-0x%x turn off expiration timer\n",
290                       objectEntryOID->guid.hid, objectEntryOID->guid.aid);
291       }
292     }
293   }
294
295   /* try to proc csChange */
296   RTPSVarFinish(cstReader, cstRemoteWriter, csChange);
297 }
298
299 /**********************************************************************************/
300 void
301 NewPublisher(ORTEDomain *d, ObjectEntryOID *op)
302 {
303   ObjectEntryOID     *o;
304   ORTEPublProp       *pp;
305   CSTWriter          *cstWriter = NULL;
306   CSTReader          *cstReader = NULL;
307   PatternNode        *pnode;
308
309   if ((d == NULL) || (op == NULL))
310     return;
311   pp = (ORTEPublProp *)op->attributes;
312   //***************************************
313   //Pattern
314   //try to find if subscription exists
315   pthread_rwlock_rdlock(&d->patternEntry.lock);
316   pthread_rwlock_rdlock(&d->subscriptions.lock);
317   gavl_cust_for_each(CSTReader,
318                      &d->subscriptions, cstReader) {
319     if (cstReader->createdByPattern) {
320       ORTESubsProp       *sp;
321       sp = (ORTESubsProp *)cstReader->objectEntryOID->attributes;
322       if ((strcmp((const char *)sp->topic, (const char *)pp->topic) == 0) &&
323           (strcmp((const char *)sp->typeName, (const char *)pp->typeName) == 0))
324         break;  //found
325     }
326   }
327   pthread_rwlock_unlock(&d->subscriptions.lock);
328   if (!cstReader) { //not exists
329     ul_list_for_each(Pattern, &d->patternEntry, pnode) {
330       if ((fnmatch((const char *)pnode->topic, (const char *)pp->topic, 0) == 0) &&
331           (fnmatch((const char *)pnode->type, (const char *)pp->typeName, 0) == 0)) {
332         //pattern matched
333         // free resources
334         pthread_rwlock_unlock(&d->readerPublications.lock);
335         pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
336         pthread_rwlock_unlock(&d->objectEntry.objRootLock);
337         cstReader = pnode->subscriptionCallBack(
338           (char *)pp->topic,
339           (char *)pp->typeName,
340           pnode->param);
341         if (cstReader) {
342           cstReader->createdByPattern = ORTE_TRUE;
343         }
344         //allocate resources
345         pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
346         pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
347         pthread_rwlock_wrlock(&d->readerPublications.lock);
348       }
349     }
350   }
351   pthread_rwlock_unlock(&d->patternEntry.lock);
352   //Pattern end
353   pthread_rwlock_rdlock(&d->psEntry.subscriptionsLock);
354   gavl_cust_for_each(SubscriptionList, &d->psEntry, o) {
355     ORTESubsProp *sp = (ORTESubsProp *)o->attributes;
356
357     if ((strcmp((const char *)pp->topic, (const char *)sp->topic) == 0) &&
358         (strcmp((const char *)pp->typeName, (const char *)sp->typeName) == 0) &&
359         (pp->typeChecksum == sp->typeChecksum)) {
360       //add Subscription to Publisher (only if private)
361       if (op->privateCreated) {
362         pthread_rwlock_rdlock(&d->publications.lock);
363         if ((cstWriter = CSTWriter_find(&d->publications, &op->guid))) {
364           pthread_rwlock_wrlock(&cstWriter->lock);
365           if (!CSTRemoteReader_find(cstWriter, &o->guid)) {
366             ObjectEntryOID     *so;
367
368             so = getSubsO2SRemoteReader(d, o, sp);
369             CSTWriterAddRemoteReader(d, cstWriter, o,
370                                      o->oid, so);
371             debug(46, 2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
372                           op->guid.hid, op->guid.aid, op->guid.oid,
373                           o->guid.hid, o->guid.aid, o->guid.oid);
374           }
375           pthread_rwlock_unlock(&cstWriter->lock);
376         }
377         pthread_rwlock_unlock(&d->publications.lock);
378       }
379       //add Publisher to Subscriber (only if private)
380       if (o->privateCreated) {
381         pthread_rwlock_rdlock(&d->subscriptions.lock);
382         if ((cstReader = CSTReader_find(&d->subscriptions, &o->guid))) {
383           pthread_rwlock_wrlock(&cstReader->lock);
384           if (!CSTRemoteWriter_find(cstReader, &op->guid)) {
385             CSTReaderAddRemoteWriter(d, cstReader, op, op->oid);
386             debug(46, 2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
387                           o->guid.hid, o->guid.aid, o->guid.oid,
388                           op->guid.hid, op->guid.aid, op->guid.oid);
389           }
390           pthread_rwlock_unlock(&cstReader->lock);
391         }
392         pthread_rwlock_unlock(&d->subscriptions.lock);
393       }
394     }
395   }
396   pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
397 }
398
399 /**********************************************************************************/
400 void
401 NewSubscriber(ORTEDomain *d, ObjectEntryOID *os)
402 {
403   ObjectEntryOID     *o;
404   ORTESubsProp       *sp;
405   CSTWriter          *cstWriter;
406   CSTReader          *cstReader;
407
408   if ((d == NULL) || (os == NULL))
409     return;
410   sp = (ORTESubsProp *)os->attributes;
411   pthread_rwlock_rdlock(&d->psEntry.publicationsLock);
412   gavl_cust_for_each(PublicationList, &d->psEntry, o) {
413     ORTEPublProp *pp = (ORTEPublProp *)o->attributes;
414
415     if ((strcmp((const char *)sp->topic, (const char *)pp->topic) == 0) &&
416         (strcmp((const char *)sp->typeName, (const char *)pp->typeName) == 0) &&
417         (sp->typeChecksum == pp->typeChecksum)) {
418       //add Publication to Subscription (only if private)
419       if (os->privateCreated) {
420         pthread_rwlock_rdlock(&d->subscriptions.lock);
421         if ((cstReader = CSTReader_find(&d->subscriptions, &os->guid))) {
422           pthread_rwlock_wrlock(&cstReader->lock);
423           if (!CSTRemoteWriter_find(cstReader, &o->guid)) {
424             CSTReaderAddRemoteWriter(d, cstReader, o, o->oid);
425             debug(46, 2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
426                           os->guid.hid, os->guid.aid, os->guid.oid,
427                           o->guid.hid, o->guid.aid, o->guid.oid);
428           }
429           pthread_rwlock_unlock(&cstReader->lock);
430         }
431         pthread_rwlock_unlock(&d->subscriptions.lock);
432       }
433       //add Subscriber to Publisher (only if private)
434       if (o->privateCreated) {
435         pthread_rwlock_rdlock(&d->publications.lock);
436         if ((cstWriter = CSTWriter_find(&d->publications, &o->guid))) {
437           pthread_rwlock_wrlock(&cstWriter->lock);
438           if (!CSTRemoteReader_find(cstWriter, &os->guid)) {
439             ObjectEntryOID     *sos;
440
441             sos = getSubsO2SRemoteReader(d, os, sp);
442             CSTWriterAddRemoteReader(d, cstWriter, os,
443                                      os->oid, sos);
444             debug(46, 2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
445                           GUID_PRINTF(o->guid),
446                           GUID_PRINTF(os->guid));
447           }
448           pthread_rwlock_unlock(&cstWriter->lock);
449         }
450         pthread_rwlock_unlock(&d->publications.lock);
451       }
452     }
453   }
454   pthread_rwlock_unlock(&d->psEntry.publicationsLock);
455 }
456
457 /**********************************************************************************/
458 void
459 RTPSVarApp(ORTEDomain *d, CSChange *csChange, GUID_RTPS *writerGUID)
460 {
461   CSTReader          *cstReader = NULL;
462   CSTRemoteWriter    *cstRemoteWriter = NULL;
463   ObjectEntryAID     *objectEntryAID;
464   ObjectEntryOID     *objectEntryOID, *sobjectEntryOID;
465
466   if ((d->guid.aid & 3) != MANAGEDAPPLICATION)
467     return;
468
469   switch (writerGUID->oid) {
470     case OID_WRITE_MGR:
471       pthread_rwlock_wrlock(&d->readerManagers.lock);
472       cstReader = &d->readerManagers;
473       objectEntryOID = objectEntryFind(d, &csChange->guid);
474       if (!objectEntryOID) {
475         AppParams *ap = (AppParams *)MALLOC(sizeof(AppParams));
476         AppParamsInit(ap);
477         parameterUpdateApplication(csChange, ap);
478         if (generateEvent(d, &csChange->guid, (void *)ap, csChange->alive) &&
479             csChange->alive) {
480           debug(46, 2) ("new manager 0x%x-0x%x accepted\n",
481                         csChange->guid.hid, csChange->guid.aid);
482           objectEntryOID = objectEntryAdd(d, &csChange->guid, (void *)ap);
483           objectEntryOID->privateCreated = ORTE_FALSE;
484           pthread_rwlock_wrlock(&d->readerApplications.lock);
485           CSTReaderAddRemoteWriter(d, &d->readerApplications,
486                                    objectEntryOID, OID_WRITE_APP);
487           pthread_rwlock_unlock(&d->readerApplications.lock);
488           //all applications from manager node set expiration timer
489           gavl_cust_for_each(ObjectEntryAID,
490                              objectEntryOID->objectEntryHID, objectEntryAID) {
491             ObjectEntryOID *objectEntryOID1;
492
493             objectEntryOID1 = ObjectEntryOID_find(objectEntryAID, &csChange->guid.oid);
494             objectEntryRefreshApp(d, objectEntryOID1);
495           }
496         } else {
497           FREE(ap);
498         }
499       } else {
500         GUID_RTPS guid_wapp = csChange->guid;
501         guid_wapp.oid = OID_WRITE_APP;
502         pthread_rwlock_wrlock(&d->readerApplications.lock);
503         cstRemoteWriter = CSTRemoteWriter_find(&d->readerApplications, &guid_wapp);
504         //setup state of cstRemoteWriter on send ACK to manager
505         if (cstRemoteWriter) {
506           if (cstRemoteWriter->commStateACK == WAITING) {
507             eventDetach(d,
508                         cstRemoteWriter->spobject->objectEntryAID,
509                         &cstRemoteWriter->repeatActiveQueryTimer,
510                         1); //metatraffic timer
511             eventAdd(d,
512                      cstRemoteWriter->spobject->objectEntryAID,
513                      &cstRemoteWriter->repeatActiveQueryTimer,
514                      1, //metatraffic timer
515                      "CSTReaderQueryTimer",
516                      CSTReaderQueryTimer,
517                      &cstRemoteWriter->cstReader->lock,
518                      cstRemoteWriter,
519                      NULL);
520           }
521         }
522         pthread_rwlock_unlock(&d->readerApplications.lock);
523       }
524       if (csChange->alive == ORTE_TRUE)
525         objectEntryRefreshApp(d, objectEntryOID);
526       cstRemoteWriter = CSTRemoteWriter_find(cstReader, writerGUID);
527       if ((!cstRemoteWriter) &&
528           (csChange->guid.hid == writerGUID->hid) &&
529           (csChange->guid.aid == writerGUID->aid)) {
530         cstRemoteWriter =
531           CSTReaderAddRemoteWriter(d, cstReader, objectEntryOID, writerGUID->oid);
532       }
533       break;
534     case OID_WRITE_APP:
535       pthread_rwlock_wrlock(&d->readerApplications.lock);
536       cstReader = &d->readerApplications;
537       cstRemoteWriter = CSTRemoteWriter_find(cstReader, writerGUID);
538       if (cstRemoteWriter) {
539         AppParams *ap;
540         GUID_RTPS guid_tmp = csChange->guid;
541         guid_tmp.oid = OID_WRITE_PUBL;
542         objectEntryOID = objectEntryFind(d, &csChange->guid);
543         if (!CSTRemoteWriter_find(&d->readerPublications, &guid_tmp)) {
544           if (!objectEntryOID) {
545             ap = (AppParams *)MALLOC(sizeof(AppParams));
546             AppParamsInit(ap);
547             parameterUpdateApplication(csChange, ap);
548             if (generateEvent(d, &csChange->guid, (void *)ap, csChange->alive) &&
549                 csChange->alive) {
550               debug(46, 2) ("new application 0x%x-0x%x accepted\n",
551                             csChange->guid.hid, csChange->guid.aid);
552               objectEntryOID = objectEntryAdd(d, &csChange->guid, (void *)ap);
553               objectEntryOID->privateCreated = ORTE_FALSE;
554             } else {
555               FREE(ap);
556               break;
557             }
558           }
559
560           ap = (AppParams *)objectEntryOID->attributes;
561           sobjectEntryOID = getAppO2SRemoteReader(d, objectEntryOID, ap);
562
563           pthread_rwlock_wrlock(&d->readerPublications.lock);
564           pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
565           pthread_rwlock_wrlock(&d->writerPublications.lock);
566           pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
567           CSTReaderAddRemoteWriter(d, &d->readerPublications,
568                                    objectEntryOID, OID_WRITE_PUBL);
569           CSTReaderAddRemoteWriter(d, &d->readerSubscriptions,
570                                    objectEntryOID, OID_WRITE_SUBS);
571           CSTWriterAddRemoteReader(d, &d->writerPublications,
572                                    objectEntryOID,
573                                    OID_READ_PUBL,
574                                    sobjectEntryOID);
575           CSTWriterAddRemoteReader(d, &d->writerSubscriptions,
576                                    objectEntryOID,
577                                    OID_READ_SUBS,
578                                    sobjectEntryOID);
579           pthread_rwlock_unlock(&d->readerPublications.lock);
580           pthread_rwlock_unlock(&d->readerSubscriptions.lock);
581           pthread_rwlock_unlock(&d->writerPublications.lock);
582           pthread_rwlock_unlock(&d->writerSubscriptions.lock);
583         }
584         if (objectEntryOID) {
585           //turn off expiration timer
586           eventDetach(d,
587                       objectEntryOID->objectEntryAID,
588                       &objectEntryOID->expirationPurgeTimer,
589                       0);
590           debug(46, 3) ("for application 0x%x-0x%x turn off expiration timer\n",
591                         objectEntryOID->guid.hid,
592                         objectEntryOID->guid.aid);
593         }
594       }
595       break;
596     case OID_WRITE_PUBL:
597       pthread_rwlock_wrlock(&d->readerPublications.lock);
598       cstReader = &d->readerPublications;
599       cstRemoteWriter = CSTRemoteWriter_find(cstReader, writerGUID);
600       if (cstRemoteWriter) {
601         objectEntryOID = objectEntryFind(d, &csChange->guid);
602         if (!objectEntryOID) {
603           ORTEPublProp *pp = (ORTEPublProp *)MALLOC(sizeof(ORTEPublProp));
604           PublParamsInit(pp);
605           parameterUpdatePublication(csChange, pp);
606           if (generateEvent(d, &csChange->guid, (void *)pp, csChange->alive) &&
607               csChange->alive) {
608             debug(46, 2) ("new publisher 0x%x-0x%x-0x%x accepted\n",
609                           GUID_PRINTF(csChange->guid));
610             objectEntryOID = objectEntryAdd(d, &csChange->guid, (void *)pp);
611             objectEntryOID->privateCreated = ORTE_FALSE;
612             pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
613             PublicationList_insert(&d->psEntry, objectEntryOID);
614             pthread_rwlock_unlock(&d->psEntry.publicationsLock);
615             NewPublisher(d, objectEntryOID);
616           } else
617             FREE(pp);
618         } else {
619           if ((!PublicationList_find(&d->psEntry, &csChange->guid)) &&
620               csChange->alive) {
621             pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
622             PublicationList_insert(&d->psEntry, objectEntryOID);
623             pthread_rwlock_unlock(&d->psEntry.publicationsLock);
624             NewPublisher(d, objectEntryOID);
625           }
626         }
627       }
628       break;
629     case OID_WRITE_SUBS:
630       pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
631       cstReader = &d->readerSubscriptions;
632       cstRemoteWriter = CSTRemoteWriter_find(cstReader, writerGUID);
633       if (cstRemoteWriter) {
634         objectEntryOID = objectEntryFind(d, &csChange->guid);
635         if (!objectEntryOID) {
636           ORTESubsProp *sp = (ORTESubsProp *)MALLOC(sizeof(ORTESubsProp));
637           SubsParamsInit(sp);
638           parameterUpdateSubscription(csChange, sp);
639           if (generateEvent(d, &csChange->guid, (void *)sp, csChange->alive) &&
640               csChange->alive) {
641             debug(46, 2) ("new subscriber 0x%x-0x%x-0x%x accepted\n",
642                           GUID_PRINTF(csChange->guid));
643             objectEntryOID = objectEntryAdd(d, &csChange->guid, (void *)sp);
644             objectEntryOID->privateCreated = ORTE_FALSE;
645             pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
646             SubscriptionList_insert(&d->psEntry, objectEntryOID);
647             pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
648             NewSubscriber(d, objectEntryOID);
649           } else
650             FREE(sp);
651         } else {
652           if ((!SubscriptionList_find(&d->psEntry, &csChange->guid)) &&
653               csChange->alive) {
654             pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
655             SubscriptionList_insert(&d->psEntry, objectEntryOID);
656             pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
657             NewSubscriber(d, objectEntryOID);
658           }
659         }
660       }
661       break;
662   }
663
664   /* try to proc csChange */
665   RTPSVarFinish(cstReader, cstRemoteWriter, csChange);
666 }
667
668
669 /**********************************************************************************/
670 void
671 RTPSVar(ORTEDomain *d, CDR_Codec *cdrCodec, MessageInterpret *mi, IPAddress senderIPAddress)
672 {
673   GUID_RTPS          objectGUID, writerGUID;
674   ObjectId           roid, woid;
675   SequenceNumber     sn;
676   char               p_bit, a_bit, h_bit;
677   CORBA_unsigned_short submsg_len;
678   CSChange           *csChange;
679   CDR_Endianness     data_endian;
680   CORBA_octet        flags;
681
682   /* restore flag possition in submessage */
683   cdrCodec->rptr -= 3;
684
685   /* flags */
686   CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
687   p_bit = flags & 2;
688   a_bit = flags & 4;
689   h_bit = flags & 8;
690
691   /* submessage length */
692   CDR_get_ushort(cdrCodec, &submsg_len);
693
694   /* next data are sent in big endianing */
695   data_endian = cdrCodec->data_endian;
696   cdrCodec->data_endian = FLAG_BIG_ENDIAN;
697
698   /* readerObjectId */
699   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
700
701   /* writerObjectId */
702   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
703
704   if (h_bit) {
705     /* HostId         */
706     CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&objectGUID.hid);
707
708     /* AppId          */
709     CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&objectGUID.aid);
710
711   } else {
712     objectGUID.hid = mi->sourceHostId;
713     objectGUID.aid = mi->sourceAppId;
714   }
715
716   /* ObjectId       */
717   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&objectGUID.oid);
718
719   cdrCodec->data_endian = data_endian;
720
721   /* writerSN       */
722   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.high);
723   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.low);
724
725   writerGUID.hid = mi->sourceHostId;
726   writerGUID.aid = mi->sourceAppId;
727   writerGUID.oid = woid;
728
729   debug(46, 3) ("recv: RTPS_VAR(0x%x) from 0x%x-0x%x sn:%u\n",
730                 woid, mi->sourceHostId, mi->sourceAppId, sn.low);
731
732   //prepare csChange
733   csChange = (CSChange *)MALLOC(sizeof(CSChange));
734   csChange->cdrCodec.buffer = NULL;
735   csChange->guid = objectGUID;
736   if (a_bit)
737     csChange->alive = ORTE_TRUE;
738   else
739     csChange->alive = ORTE_FALSE;
740   if (p_bit)
741     parameterDecodeCodecToCSChange(csChange, cdrCodec);
742   else
743     CSChangeAttributes_init_head(csChange);
744   csChange->sn = sn;
745   SEQUENCE_NUMBER_NONE(csChange->gapSN);
746
747   /* Manager */
748   RTPSVarManager(d, csChange, &writerGUID, senderIPAddress);
749
750   /* Application */
751   RTPSVarApp(d, csChange, &writerGUID);
752 }