2 * $Id: RTPSVar.c,v 0.0.0.2 2004/11/24
4 * DEBUG: section 46 RTPS message VAR
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /**********************************************************************************/
36 RTPSVarCreate(CDR_Codec *cdrCodec, ObjectId roid, ObjectId woid, CSChange *csChange)
38 CDR_Endianness data_endian;
43 swptr = cdrCodec->wptr;
46 CDR_put_octet(cdrCodec, VAR);
49 flags = cdrCodec->data_endian;
50 if (!CSChangeAttributes_is_empty(csChange))
54 if (csChange->guid.oid == OID_APP)
56 CDR_put_octet(cdrCodec, flags);
61 /* next data are sent in big endianing */
62 data_endian = cdrCodec->data_endian;
63 cdrCodec->data_endian = FLAG_BIG_ENDIAN;
66 CDR_put_ulong(cdrCodec, roid);
69 CDR_put_ulong(cdrCodec, woid);
71 if (csChange->guid.oid == OID_APP) {
73 CDR_put_ulong(cdrCodec, csChange->guid.hid);
76 CDR_put_ulong(cdrCodec, csChange->guid.aid);
80 CDR_put_ulong(cdrCodec, csChange->guid.oid);
82 cdrCodec->data_endian = data_endian;
85 CDR_put_ulong(cdrCodec, csChange->sn.high);
86 if (CDR_put_ulong(cdrCodec, csChange->sn.low) == CORBA_FALSE) {
87 cdrCodec->wptr = swptr;
92 if (!CSChangeAttributes_is_empty(csChange)) {
93 if (parameterCodeCodecFromCSChange(csChange, cdrCodec) < 0) {
94 cdrCodec->wptr = swptr;
99 /* count length of message */
100 len = cdrCodec->wptr-swptr;
103 cdrCodec->wptr = swptr+2;
104 CDR_put_ushort(cdrCodec, (CORBA_unsigned_short)(len-4));
106 cdrCodec->wptr = swptr+len;
111 /**********************************************************************************/
113 RTPSVarFinish(CSTReader *cstReader, CSTRemoteWriter *cstRemoteWriter,
117 if (cstReader && cstRemoteWriter) {
118 debug(46, 10) ("recv: processing CSChange\n");
119 if (SeqNumberCmp(csChange->sn, cstRemoteWriter->sn) > 0) { //have to be sn>writer_sn
120 CSTReaderAddCSChange(cstRemoteWriter, csChange);
121 CSTReaderProcCSChanges(cstReader->domain, cstRemoteWriter);
127 //destroy csChange if any
128 parameterDelete(csChange);
132 pthread_rwlock_unlock(&cstReader->lock);
137 /**********************************************************************************/
139 RTPSVarManager(ORTEDomain *d, CSChange *csChange, GUID_RTPS *writerGUID,
140 IPAddress senderIPAddress)
142 CSTReader *cstReader = NULL;
143 CSTRemoteReader *cstRemoteReader = NULL;
144 CSTRemoteWriter *cstRemoteWriter = NULL;
145 ObjectEntryAID *objectEntryAID;
146 ObjectEntryOID *objectEntryOID;
147 Boolean usedMulticast = ORTE_FALSE;
150 if ((d->guid.aid & 0x03) != MANAGER)
154 if ((writerGUID->oid == OID_WRITE_APPSELF) &&
155 ((writerGUID->aid & 0x03) == MANAGER)) {
156 pthread_rwlock_wrlock(&d->readerManagers.lock);
157 pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
158 cstReader = &d->readerManagers;
159 gavl_cust_for_each(CSTRemoteReader,
160 &d->writerApplicationSelf,
162 AppParams *ap = (AppParams *)cstRemoteReader->sobject->attributes;
164 for (i = 0; i < ap->unicastIPAddressCount; i++) {
165 if (ap->unicastIPAddressList[i] == senderIPAddress) {
169 if (i != ap->unicastIPAddressCount)
171 if (matchMulticastAddresses(cstRemoteReader->sobject,
172 cstRemoteReader->cstWriter->objectEntryOID)) {
173 usedMulticast = ORTE_TRUE;
177 if (cstRemoteReader) {
178 objectEntryOID = objectEntryFind(d, &csChange->guid);
179 if (!objectEntryOID) {
180 // create new RemoteReader
181 AppParams *ap = (AppParams *)MALLOC(sizeof(AppParams));
183 parameterUpdateApplication(csChange, ap);
184 if (generateEvent(d, &csChange->guid, (void *)ap, csChange->alive) &&
186 debug(46, 2) ("manager 0x%x-0x%x accepted\n",
187 csChange->guid.hid, csChange->guid.aid);
188 objectEntryOID = objectEntryAdd(d, &csChange->guid, (void *)ap);
189 CSTWriterRefreshAllCSChanges(d, cstRemoteReader);
190 CSTReaderAddRemoteWriter(d, cstReader, objectEntryOID, writerGUID->oid);
191 pthread_rwlock_wrlock(&d->readerApplications.lock);
192 pthread_rwlock_wrlock(&d->writerApplications.lock);
193 CSTReaderAddRemoteWriter(d, &d->readerApplications, objectEntryOID, OID_WRITE_APP);
195 /* connect to virual objectEntryOID of multicast manager */
196 CSTWriterAddRemoteReader(d,
197 &d->writerApplications,
200 cstRemoteReader->sobject);
202 CSTWriterAddRemoteReader(d, &d->writerApplications,
207 pthread_rwlock_unlock(&d->writerApplications.lock);
208 pthread_rwlock_unlock(&d->readerApplications.lock);
209 //all applications from manager node set expiration timer
210 gavl_cust_for_each(ObjectEntryAID,
211 objectEntryOID->objectEntryHID, objectEntryAID) {
212 ObjectEntryOID *objectEntryOID1;
214 objectEntryOID1 = ObjectEntryOID_find(objectEntryAID, &csChange->guid.oid);
215 objectEntryRefreshApp(d, objectEntryOID1);
221 cstRemoteWriter = CSTRemoteWriter_find(cstReader, writerGUID);
222 if ((cstRemoteWriter) && (csChange->alive == ORTE_TRUE))
223 objectEntryRefreshApp(d, cstRemoteWriter->spobject);
227 pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
231 /* readerApplication */
232 if (((writerGUID->oid == OID_WRITE_APPSELF) &&
233 ((writerGUID->aid & 0x03) == MANAGEDAPPLICATION)) ||
234 ((writerGUID->oid == OID_WRITE_APP) &&
235 ((writerGUID->aid & 0x03) == MANAGER))) {
237 pthread_rwlock_wrlock(&d->readerApplications.lock);
238 cstReader = &d->readerApplications;
239 objectEntryOID = objectEntryFind(d, &csChange->guid);
240 if (!objectEntryOID) {
241 AppParams *ap = (AppParams *)MALLOC(sizeof(AppParams));
243 parameterUpdateApplication(csChange, ap);
244 if (generateEvent(d, &csChange->guid, (void *)ap, csChange->alive) &&
246 objectEntryOID = objectEntryAdd(d, &csChange->guid, (void *)ap);
247 objectEntryOID->appMOM = getTypeApp(d, ap, senderIPAddress);
248 if (objectEntryOID->appMOM) {
249 debug(46, 2) ("MOM application 0x%x-0x%x accepted\n",
250 csChange->guid.hid, csChange->guid.aid);
251 //increment vargAppsSequenceNumber and make csChange
252 SeqNumberInc(d->appParams->vargAppsSequenceNumber,
253 d->appParams->vargAppsSequenceNumber);
254 //WAS & WM is locked inside next function
255 appSelfParamChanged(d, ORTE_TRUE, ORTE_FALSE, ORTE_TRUE, ORTE_TRUE);
256 CSTReaderAddRemoteWriter(d, cstReader,
257 objectEntryOID, writerGUID->oid);
258 CSTWriterAddRemoteReader(d, &d->writerManagers,
262 pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
263 pthread_rwlock_unlock(&d->writerManagers.lock);
265 debug(46, 2) ("OAM application 0x%x-0x%x accepted\n",
266 csChange->guid.hid, csChange->guid.aid);
268 pthread_rwlock_wrlock(&d->writerApplications.lock);
269 CSTWriterAddRemoteReader(d, &d->writerApplications,
273 pthread_rwlock_unlock(&d->writerApplications.lock);
278 if (objectEntryOID) {
279 cstRemoteWriter = CSTRemoteWriter_find(cstReader, writerGUID);
280 if (objectEntryOID->appMOM) {
281 if (csChange->alive == ORTE_TRUE)
282 objectEntryRefreshApp(d, objectEntryOID);
284 //turn off expiration timer
286 objectEntryOID->objectEntryAID,
287 &objectEntryOID->expirationPurgeTimer,
289 debug(46, 3) ("for application 0x%x-0x%x turn off expiration timer\n",
290 objectEntryOID->guid.hid, objectEntryOID->guid.aid);
295 /* try to proc csChange */
296 RTPSVarFinish(cstReader, cstRemoteWriter, csChange);
299 /**********************************************************************************/
301 NewPublisher(ORTEDomain *d, ObjectEntryOID *op)
305 CSTWriter *cstWriter = NULL;
306 CSTReader *cstReader = NULL;
309 if ((d == NULL) || (op == NULL))
311 pp = (ORTEPublProp *)op->attributes;
312 //***************************************
314 //try to find if subscription exists
315 pthread_rwlock_rdlock(&d->patternEntry.lock);
316 pthread_rwlock_rdlock(&d->subscriptions.lock);
317 gavl_cust_for_each(CSTReader,
318 &d->subscriptions, cstReader) {
319 if (cstReader->createdByPattern) {
321 sp = (ORTESubsProp *)cstReader->objectEntryOID->attributes;
322 if ((strcmp((const char *)sp->topic, (const char *)pp->topic) == 0) &&
323 (strcmp((const char *)sp->typeName, (const char *)pp->typeName) == 0))
327 pthread_rwlock_unlock(&d->subscriptions.lock);
328 if (!cstReader) { //not exists
329 ul_list_for_each(Pattern, &d->patternEntry, pnode) {
330 if ((fnmatch((const char *)pnode->topic, (const char *)pp->topic, 0) == 0) &&
331 (fnmatch((const char *)pnode->type, (const char *)pp->typeName, 0) == 0)) {
334 pthread_rwlock_unlock(&d->readerPublications.lock);
335 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
336 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
337 cstReader = pnode->subscriptionCallBack(
339 (char *)pp->typeName,
342 cstReader->createdByPattern = ORTE_TRUE;
345 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
346 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
347 pthread_rwlock_wrlock(&d->readerPublications.lock);
351 pthread_rwlock_unlock(&d->patternEntry.lock);
353 pthread_rwlock_rdlock(&d->psEntry.subscriptionsLock);
354 gavl_cust_for_each(SubscriptionList, &d->psEntry, o) {
355 ORTESubsProp *sp = (ORTESubsProp *)o->attributes;
357 if ((strcmp((const char *)pp->topic, (const char *)sp->topic) == 0) &&
358 (strcmp((const char *)pp->typeName, (const char *)sp->typeName) == 0) &&
359 (pp->typeChecksum == sp->typeChecksum)) {
360 //add Subscription to Publisher (only if private)
361 if (op->privateCreated) {
362 pthread_rwlock_rdlock(&d->publications.lock);
363 if ((cstWriter = CSTWriter_find(&d->publications, &op->guid))) {
364 pthread_rwlock_wrlock(&cstWriter->lock);
365 if (!CSTRemoteReader_find(cstWriter, &o->guid)) {
368 so = getSubsO2SRemoteReader(d, o, sp);
369 CSTWriterAddRemoteReader(d, cstWriter, o,
371 debug(46, 2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
372 op->guid.hid, op->guid.aid, op->guid.oid,
373 o->guid.hid, o->guid.aid, o->guid.oid);
375 pthread_rwlock_unlock(&cstWriter->lock);
377 pthread_rwlock_unlock(&d->publications.lock);
379 //add Publisher to Subscriber (only if private)
380 if (o->privateCreated) {
381 pthread_rwlock_rdlock(&d->subscriptions.lock);
382 if ((cstReader = CSTReader_find(&d->subscriptions, &o->guid))) {
383 pthread_rwlock_wrlock(&cstReader->lock);
384 if (!CSTRemoteWriter_find(cstReader, &op->guid)) {
385 CSTReaderAddRemoteWriter(d, cstReader, op, op->oid);
386 debug(46, 2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
387 o->guid.hid, o->guid.aid, o->guid.oid,
388 op->guid.hid, op->guid.aid, op->guid.oid);
390 pthread_rwlock_unlock(&cstReader->lock);
392 pthread_rwlock_unlock(&d->subscriptions.lock);
396 pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
399 /**********************************************************************************/
401 NewSubscriber(ORTEDomain *d, ObjectEntryOID *os)
405 CSTWriter *cstWriter;
406 CSTReader *cstReader;
408 if ((d == NULL) || (os == NULL))
410 sp = (ORTESubsProp *)os->attributes;
411 pthread_rwlock_rdlock(&d->psEntry.publicationsLock);
412 gavl_cust_for_each(PublicationList, &d->psEntry, o) {
413 ORTEPublProp *pp = (ORTEPublProp *)o->attributes;
415 if ((strcmp((const char *)sp->topic, (const char *)pp->topic) == 0) &&
416 (strcmp((const char *)sp->typeName, (const char *)pp->typeName) == 0) &&
417 (sp->typeChecksum == pp->typeChecksum)) {
418 //add Publication to Subscription (only if private)
419 if (os->privateCreated) {
420 pthread_rwlock_rdlock(&d->subscriptions.lock);
421 if ((cstReader = CSTReader_find(&d->subscriptions, &os->guid))) {
422 pthread_rwlock_wrlock(&cstReader->lock);
423 if (!CSTRemoteWriter_find(cstReader, &o->guid)) {
424 CSTReaderAddRemoteWriter(d, cstReader, o, o->oid);
425 debug(46, 2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
426 os->guid.hid, os->guid.aid, os->guid.oid,
427 o->guid.hid, o->guid.aid, o->guid.oid);
429 pthread_rwlock_unlock(&cstReader->lock);
431 pthread_rwlock_unlock(&d->subscriptions.lock);
433 //add Subscriber to Publisher (only if private)
434 if (o->privateCreated) {
435 pthread_rwlock_rdlock(&d->publications.lock);
436 if ((cstWriter = CSTWriter_find(&d->publications, &o->guid))) {
437 pthread_rwlock_wrlock(&cstWriter->lock);
438 if (!CSTRemoteReader_find(cstWriter, &os->guid)) {
441 sos = getSubsO2SRemoteReader(d, os, sp);
442 CSTWriterAddRemoteReader(d, cstWriter, os,
444 debug(46, 2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
445 GUID_PRINTF(o->guid),
446 GUID_PRINTF(os->guid));
448 pthread_rwlock_unlock(&cstWriter->lock);
450 pthread_rwlock_unlock(&d->publications.lock);
454 pthread_rwlock_unlock(&d->psEntry.publicationsLock);
457 /**********************************************************************************/
459 RTPSVarApp(ORTEDomain *d, CSChange *csChange, GUID_RTPS *writerGUID)
461 CSTReader *cstReader = NULL;
462 CSTRemoteWriter *cstRemoteWriter = NULL;
463 ObjectEntryAID *objectEntryAID;
464 ObjectEntryOID *objectEntryOID, *sobjectEntryOID;
466 if ((d->guid.aid & 3) != MANAGEDAPPLICATION)
469 switch (writerGUID->oid) {
471 pthread_rwlock_wrlock(&d->readerManagers.lock);
472 cstReader = &d->readerManagers;
473 objectEntryOID = objectEntryFind(d, &csChange->guid);
474 if (!objectEntryOID) {
475 AppParams *ap = (AppParams *)MALLOC(sizeof(AppParams));
477 parameterUpdateApplication(csChange, ap);
478 if (generateEvent(d, &csChange->guid, (void *)ap, csChange->alive) &&
480 debug(46, 2) ("new manager 0x%x-0x%x accepted\n",
481 csChange->guid.hid, csChange->guid.aid);
482 objectEntryOID = objectEntryAdd(d, &csChange->guid, (void *)ap);
483 objectEntryOID->privateCreated = ORTE_FALSE;
484 pthread_rwlock_wrlock(&d->readerApplications.lock);
485 CSTReaderAddRemoteWriter(d, &d->readerApplications,
486 objectEntryOID, OID_WRITE_APP);
487 pthread_rwlock_unlock(&d->readerApplications.lock);
488 //all applications from manager node set expiration timer
489 gavl_cust_for_each(ObjectEntryAID,
490 objectEntryOID->objectEntryHID, objectEntryAID) {
491 ObjectEntryOID *objectEntryOID1;
493 objectEntryOID1 = ObjectEntryOID_find(objectEntryAID, &csChange->guid.oid);
494 objectEntryRefreshApp(d, objectEntryOID1);
500 GUID_RTPS guid_wapp = csChange->guid;
501 guid_wapp.oid = OID_WRITE_APP;
502 pthread_rwlock_wrlock(&d->readerApplications.lock);
503 cstRemoteWriter = CSTRemoteWriter_find(&d->readerApplications, &guid_wapp);
504 //setup state of cstRemoteWriter on send ACK to manager
505 if (cstRemoteWriter) {
506 if (cstRemoteWriter->commStateACK == WAITING) {
508 cstRemoteWriter->spobject->objectEntryAID,
509 &cstRemoteWriter->repeatActiveQueryTimer,
510 1); //metatraffic timer
512 cstRemoteWriter->spobject->objectEntryAID,
513 &cstRemoteWriter->repeatActiveQueryTimer,
514 1, //metatraffic timer
515 "CSTReaderQueryTimer",
517 &cstRemoteWriter->cstReader->lock,
522 pthread_rwlock_unlock(&d->readerApplications.lock);
524 if (csChange->alive == ORTE_TRUE)
525 objectEntryRefreshApp(d, objectEntryOID);
526 cstRemoteWriter = CSTRemoteWriter_find(cstReader, writerGUID);
527 if ((!cstRemoteWriter) &&
528 (csChange->guid.hid == writerGUID->hid) &&
529 (csChange->guid.aid == writerGUID->aid)) {
531 CSTReaderAddRemoteWriter(d, cstReader, objectEntryOID, writerGUID->oid);
535 pthread_rwlock_wrlock(&d->readerApplications.lock);
536 cstReader = &d->readerApplications;
537 cstRemoteWriter = CSTRemoteWriter_find(cstReader, writerGUID);
538 if (cstRemoteWriter) {
540 GUID_RTPS guid_tmp = csChange->guid;
541 guid_tmp.oid = OID_WRITE_PUBL;
542 objectEntryOID = objectEntryFind(d, &csChange->guid);
543 if (!CSTRemoteWriter_find(&d->readerPublications, &guid_tmp)) {
544 if (!objectEntryOID) {
545 ap = (AppParams *)MALLOC(sizeof(AppParams));
547 parameterUpdateApplication(csChange, ap);
548 if (generateEvent(d, &csChange->guid, (void *)ap, csChange->alive) &&
550 debug(46, 2) ("new application 0x%x-0x%x accepted\n",
551 csChange->guid.hid, csChange->guid.aid);
552 objectEntryOID = objectEntryAdd(d, &csChange->guid, (void *)ap);
553 objectEntryOID->privateCreated = ORTE_FALSE;
560 ap = (AppParams *)objectEntryOID->attributes;
561 sobjectEntryOID = getAppO2SRemoteReader(d, objectEntryOID, ap);
563 pthread_rwlock_wrlock(&d->readerPublications.lock);
564 pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
565 pthread_rwlock_wrlock(&d->writerPublications.lock);
566 pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
567 CSTReaderAddRemoteWriter(d, &d->readerPublications,
568 objectEntryOID, OID_WRITE_PUBL);
569 CSTReaderAddRemoteWriter(d, &d->readerSubscriptions,
570 objectEntryOID, OID_WRITE_SUBS);
571 CSTWriterAddRemoteReader(d, &d->writerPublications,
575 CSTWriterAddRemoteReader(d, &d->writerSubscriptions,
579 pthread_rwlock_unlock(&d->readerPublications.lock);
580 pthread_rwlock_unlock(&d->readerSubscriptions.lock);
581 pthread_rwlock_unlock(&d->writerPublications.lock);
582 pthread_rwlock_unlock(&d->writerSubscriptions.lock);
584 if (objectEntryOID) {
585 //turn off expiration timer
587 objectEntryOID->objectEntryAID,
588 &objectEntryOID->expirationPurgeTimer,
590 debug(46, 3) ("for application 0x%x-0x%x turn off expiration timer\n",
591 objectEntryOID->guid.hid,
592 objectEntryOID->guid.aid);
597 pthread_rwlock_wrlock(&d->readerPublications.lock);
598 cstReader = &d->readerPublications;
599 cstRemoteWriter = CSTRemoteWriter_find(cstReader, writerGUID);
600 if (cstRemoteWriter) {
601 objectEntryOID = objectEntryFind(d, &csChange->guid);
602 if (!objectEntryOID) {
603 ORTEPublProp *pp = (ORTEPublProp *)MALLOC(sizeof(ORTEPublProp));
605 parameterUpdatePublication(csChange, pp);
606 if (generateEvent(d, &csChange->guid, (void *)pp, csChange->alive) &&
608 debug(46, 2) ("new publisher 0x%x-0x%x-0x%x accepted\n",
609 GUID_PRINTF(csChange->guid));
610 objectEntryOID = objectEntryAdd(d, &csChange->guid, (void *)pp);
611 objectEntryOID->privateCreated = ORTE_FALSE;
612 pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
613 PublicationList_insert(&d->psEntry, objectEntryOID);
614 pthread_rwlock_unlock(&d->psEntry.publicationsLock);
615 NewPublisher(d, objectEntryOID);
619 if ((!PublicationList_find(&d->psEntry, &csChange->guid)) &&
621 pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
622 PublicationList_insert(&d->psEntry, objectEntryOID);
623 pthread_rwlock_unlock(&d->psEntry.publicationsLock);
624 NewPublisher(d, objectEntryOID);
630 pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
631 cstReader = &d->readerSubscriptions;
632 cstRemoteWriter = CSTRemoteWriter_find(cstReader, writerGUID);
633 if (cstRemoteWriter) {
634 objectEntryOID = objectEntryFind(d, &csChange->guid);
635 if (!objectEntryOID) {
636 ORTESubsProp *sp = (ORTESubsProp *)MALLOC(sizeof(ORTESubsProp));
638 parameterUpdateSubscription(csChange, sp);
639 if (generateEvent(d, &csChange->guid, (void *)sp, csChange->alive) &&
641 debug(46, 2) ("new subscriber 0x%x-0x%x-0x%x accepted\n",
642 GUID_PRINTF(csChange->guid));
643 objectEntryOID = objectEntryAdd(d, &csChange->guid, (void *)sp);
644 objectEntryOID->privateCreated = ORTE_FALSE;
645 pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
646 SubscriptionList_insert(&d->psEntry, objectEntryOID);
647 pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
648 NewSubscriber(d, objectEntryOID);
652 if ((!SubscriptionList_find(&d->psEntry, &csChange->guid)) &&
654 pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
655 SubscriptionList_insert(&d->psEntry, objectEntryOID);
656 pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
657 NewSubscriber(d, objectEntryOID);
664 /* try to proc csChange */
665 RTPSVarFinish(cstReader, cstRemoteWriter, csChange);
669 /**********************************************************************************/
671 RTPSVar(ORTEDomain *d, CDR_Codec *cdrCodec, MessageInterpret *mi, IPAddress senderIPAddress)
673 GUID_RTPS objectGUID, writerGUID;
676 char p_bit, a_bit, h_bit;
677 CORBA_unsigned_short submsg_len;
679 CDR_Endianness data_endian;
682 /* restore flag possition in submessage */
686 CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
691 /* submessage length */
692 CDR_get_ushort(cdrCodec, &submsg_len);
694 /* next data are sent in big endianing */
695 data_endian = cdrCodec->data_endian;
696 cdrCodec->data_endian = FLAG_BIG_ENDIAN;
699 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
702 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
706 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&objectGUID.hid);
709 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&objectGUID.aid);
712 objectGUID.hid = mi->sourceHostId;
713 objectGUID.aid = mi->sourceAppId;
717 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&objectGUID.oid);
719 cdrCodec->data_endian = data_endian;
722 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.high);
723 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.low);
725 writerGUID.hid = mi->sourceHostId;
726 writerGUID.aid = mi->sourceAppId;
727 writerGUID.oid = woid;
729 debug(46, 3) ("recv: RTPS_VAR(0x%x) from 0x%x-0x%x sn:%u\n",
730 woid, mi->sourceHostId, mi->sourceAppId, sn.low);
733 csChange = (CSChange *)MALLOC(sizeof(CSChange));
734 csChange->cdrCodec.buffer = NULL;
735 csChange->guid = objectGUID;
737 csChange->alive = ORTE_TRUE;
739 csChange->alive = ORTE_FALSE;
741 parameterDecodeCodecToCSChange(csChange, cdrCodec);
743 CSChangeAttributes_init_head(csChange);
745 SEQUENCE_NUMBER_NONE(csChange->gapSN);
748 RTPSVarManager(d, csChange, &writerGUID, senderIPAddress);
751 RTPSVarApp(d, csChange, &writerGUID);