2 * $Id: RTPSVar.c,v 0.0.0.2 2004/11/24
4 * DEBUG: section 46 RTPS message VAR
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /**********************************************************************************/
/*
 * RTPSVarCreate - serialize csChange as a VAR submessage into cdrCodec.
 *
 * Visible steps, in order:
 *   - write the VAR submessage id octet and a flags octet,
 *   - write roid, woid and the object GUID of csChange (hid and aid only
 *     when guid.oid is OID_APP) with the codec forced to FLAG_BIG_ENDIAN,
 *   - write the sequence number (high word, then low word),
 *   - append the encoded parameters when the CSChange has attributes,
 *   - patch the 16-bit length field at swptr+2 with (len-4) and move the
 *     write pointer to the end of the submessage.
 *
 * NOTE(review): this listing does not contain every line of the function
 * (return type, braces, the declarations/initialization of flags, len and
 * swptr, the bodies of several if statements and the return statements).
 * swptr is presumably cdrCodec->wptr as it was on entry - confirm against
 * the complete source.
 */
36 RTPSVarCreate(CDR_Codec *cdrCodec,ObjectId roid,ObjectId woid,CSChange *csChange)
38 CDR_Endianness data_endian;
/* submessage id */
46 CDR_put_octet(cdrCodec,VAR);
/*
 * The flags octet starts from cdrCodec->data_endian. The statements guarded
 * by the two conditions below are not visible in this listing; presumably
 * they set additional flag bits - TODO confirm.
 */
49 flags=cdrCodec->data_endian;
50 if (!CSChangeAttributes_is_empty(csChange))
54 if (csChange->guid.oid==OID_APP)
56 CDR_put_octet(cdrCodec,flags);
61 /* the following fields are written in big-endian byte order */
/* remember the codec's byte order so it can be restored after the ids */
62 data_endian=cdrCodec->data_endian;
63 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
/* reader object id */
66 CDR_put_ulong(cdrCodec,roid);
/* writer object id */
69 CDR_put_ulong(cdrCodec,woid);
/* host id and application id are written only for OID_APP objects */
71 if (csChange->guid.oid==OID_APP) {
73 CDR_put_ulong(cdrCodec,csChange->guid.hid);
76 CDR_put_ulong(cdrCodec,csChange->guid.aid);
/* object id */
80 CDR_put_ulong(cdrCodec,csChange->guid.oid);
/* restore the byte order that was active before the ids were written */
82 cdrCodec->data_endian=data_endian;
/* sequence number: high word, then low word */
85 CDR_put_ulong(cdrCodec,csChange->sn.high);
/* a CORBA_FALSE result of the last put is handled here; the handling code is not visible in this listing */
86 if (CDR_put_ulong(cdrCodec,csChange->sn.low)==CORBA_FALSE) {
/* parameters are encoded only when the CSChange carries attributes; a negative result is handled by code not visible here */
92 if (!CSChangeAttributes_is_empty(csChange)) {
93 if (parameterCodeCodecFromCSChange(csChange,cdrCodec)<0) {
99 /* count length of message */
100 len=cdrCodec->wptr-swptr;
/* step back to the length field at swptr+2 and store len-4 there */
103 cdrCodec->wptr=swptr+2;
104 CDR_put_ushort(cdrCodec,(CORBA_unsigned_short)(len-4));
/* move the write pointer back to the end of the submessage */
106 cdrCodec->wptr=swptr+len;
111 /**********************************************************************************/
/*
 * RTPSVarFinish - last step of handling a received VAR submessage.
 *
 * When both cstReader and cstRemoteWriter are non-NULL and the sequence
 * number of csChange compares greater than cstRemoteWriter->sn, the change
 * is added to the remote writer and the queued changes are processed.
 * The visible code also deletes a csChange via parameterDelete() and
 * unlocks cstReader->lock.
 *
 * NOTE(review): the lines between the visible statements (closing braces,
 * any else branch, any NULL guard around the unlock) are not present in
 * this listing, so the exact conditions under which the csChange is
 * destroyed and the lock released must be confirmed against the complete
 * source.
 */
113 RTPSVarFinish(CSTReader *cstReader,CSTRemoteWriter *cstRemoteWriter,
114 CSChange *csChange) {
116 if (cstReader && cstRemoteWriter) {
117 debug(46,10) ("recv: processing CSChange\n");
/* SeqNumberCmp(...)>0 is required before the change is queued and processed */
118 if (SeqNumberCmp(csChange->sn,cstRemoteWriter->sn)>0) { //have to be sn>writer_sn
119 CSTReaderAddCSChange(cstRemoteWriter,csChange);
120 CSTReaderProcCSChanges(cstReader->domain,cstRemoteWriter);
126 //destroy csChange if any
127 parameterDelete(csChange);
/*
 * Releases the lock of the reader passed in. RTPSVarManager and RTPSVarApp
 * write-lock the reader's lock before assigning cstReader and calling this
 * function.
 */
131 pthread_rwlock_unlock(&cstReader->lock);
136 /**********************************************************************************/
/*
 * RTPSVarManager - handle a received VAR csChange when the local domain
 * acts as a manager.
 *
 * Returns immediately unless (d->guid.aid & 0x03) equals MANAGER.
 * Two cases are visible, selected by writerGUID:
 *   - writer oid OID_WRITE_APPSELF from a MANAGER: handled through
 *     d->readerManagers (another manager announcing itself),
 *   - writer oid OID_WRITE_APPSELF from a MANAGEDAPPLICATION, or writer oid
 *     OID_WRITE_APP from a MANAGER: handled through d->readerApplications.
 * In both cases an unknown object is allocated, filled from the csChange,
 * offered to generateEvent() and, if accepted, registered and connected to
 * the relevant readers/writers. The function ends by passing the selected
 * reader, the remote writer found for writerGUID and the csChange to
 * RTPSVarFinish().
 *
 * NOTE(review): this listing lacks many lines of the function (return type,
 * braces, else branches, parts of several argument lists, declaration of
 * i). Comments below describe only what the visible statements show;
 * anything about the missing lines is marked as an assumption.
 */
138 RTPSVarManager(ORTEDomain *d,CSChange *csChange,GUID_RTPS *writerGUID,
139 IPAddress senderIPAddress)
141 CSTReader *cstReader=NULL;
142 CSTRemoteReader *cstRemoteReader=NULL;
143 CSTRemoteWriter *cstRemoteWriter=NULL;
144 ObjectEntryAID *objectEntryAID;
145 ObjectEntryOID *objectEntryOID;
146 Boolean usedMulticast=ORTE_FALSE;
/* only a domain whose aid marks it as MANAGER processes the message here */
149 if ((d->guid.aid & 0x03)!=MANAGER) return;
/* case 1: message written by the application-self writer of a manager */
152 if ((writerGUID->oid==OID_WRITE_APPSELF) &&
153 ((writerGUID->aid & 0x03)==MANAGER)) {
154 pthread_rwlock_wrlock(&d->readerManagers.lock);
155 pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
156 cstReader=&d->readerManagers;
/*
 * Walk the remote readers of writerApplicationSelf and compare the sender's
 * IP address with each reader's unicast address list; a multicast address
 * match sets usedMulticast. The loop-exit statements are not visible here
 * (presumably break on a match - TODO confirm).
 */
157 gavl_cust_for_each(CSTRemoteReader,
158 &d->writerApplicationSelf,
160 AppParams *ap=(AppParams*)cstRemoteReader->sobject->attributes;
161 for (i=0;i<ap->unicastIPAddressCount;i++) {
162 if (ap->unicastIPAddressList[i]==senderIPAddress) {
/* i differs from the count when the inner loop stopped early */
166 if (i!=ap->unicastIPAddressCount)
168 if (matchMulticastAddresses(cstRemoteReader->sobject,
169 cstRemoteReader->cstWriter->objectEntryOID)) {
170 usedMulticast=ORTE_TRUE;
/* continue only when a remote reader was selected by the search above */
174 if (cstRemoteReader) {
175 objectEntryOID=objectEntryFind(d,&csChange->guid);
176 if (!objectEntryOID) {
177 // create new RemoteReader
178 AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
180 parameterUpdateApplication(csChange,ap);
/* the second operand of this && is on a line not visible in this listing */
181 if (generateEvent(d,&csChange->guid,(void*)ap,csChange->alive) &&
183 debug(46,2) ("manager 0x%x-0x%x accepted\n",
184 csChange->guid.hid,csChange->guid.aid);
185 objectEntryOID=objectEntryAdd(d,&csChange->guid,(void*)ap);
186 CSTWriterRefreshAllCSChanges(d,cstRemoteReader);
187 CSTReaderAddRemoteWriter(d,cstReader,objectEntryOID,writerGUID->oid);
/* connect the new manager to the application reader/writer pair */
188 pthread_rwlock_wrlock(&d->readerApplications.lock);
189 pthread_rwlock_wrlock(&d->writerApplications.lock);
190 CSTReaderAddRemoteWriter(d,&d->readerApplications,objectEntryOID,OID_WRITE_APP);
/*
 * Two alternative CSTWriterAddRemoteReader calls follow; the condition
 * selecting between them is not visible (presumably usedMulticast - TODO
 * confirm) and their argument lists are incomplete in this listing.
 */
192 /* connect to virtual objectEntryOID of multicast manager */
193 CSTWriterAddRemoteReader(d,
194 &d->writerApplications,
197 cstRemoteReader->sobject);
199 CSTWriterAddRemoteReader(d,&d->writerApplications,
204 pthread_rwlock_unlock(&d->writerApplications.lock);
205 pthread_rwlock_unlock(&d->readerApplications.lock);
206 //all applications from manager node set expiration timer
207 gavl_cust_for_each(ObjectEntryAID,
208 objectEntryOID->objectEntryHID,objectEntryAID) {
209 ObjectEntryOID *objectEntryOID1;
210 objectEntryOID1=ObjectEntryOID_find(objectEntryAID,&csChange->guid.oid);
211 objectEntryRefreshApp(d,objectEntryOID1);
/* refresh the application entry of a known remote writer when the change reports the object alive */
217 cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
218 if ((cstRemoteWriter) && (csChange->alive==ORTE_TRUE))
219 objectEntryRefreshApp(d,cstRemoteWriter->spobject);
/* readerManagers.lock stays held; RTPSVarFinish unlocks cstReader->lock */
223 pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
227 /* readerApplication */
/* case 2: an application announcing itself, or a manager forwarding application data */
228 if (((writerGUID->oid==OID_WRITE_APPSELF) &&
229 ((writerGUID->aid & 0x03)==MANAGEDAPPLICATION)) ||
230 ((writerGUID->oid==OID_WRITE_APP) &&
231 ((writerGUID->aid & 0x03)==MANAGER))) {
233 pthread_rwlock_wrlock(&d->readerApplications.lock);
234 cstReader=&d->readerApplications;
235 objectEntryOID=objectEntryFind(d,&csChange->guid);
236 if (!objectEntryOID) {
/* unknown application: build its parameters from the csChange */
237 AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
239 parameterUpdateApplication(csChange,ap);
/* the second operand of this && is on a line not visible in this listing */
240 if (generateEvent(d,&csChange->guid,(void*)ap,csChange->alive) &&
242 objectEntryOID=objectEntryAdd(d,&csChange->guid,(void*)ap);
/* appMOM is the result of getTypeApp(); a non-zero value selects the "MOM application" path below */
243 objectEntryOID->appMOM=getTypeApp(d,ap,senderIPAddress);
244 if (objectEntryOID->appMOM) {
245 debug(46,2) ("MOM application 0x%x-0x%x accepted\n",
246 csChange->guid.hid,csChange->guid.aid);
247 //increment vargAppsSequenceNumber and make csChange
248 SeqNumberInc(d->appParams->vargAppsSequenceNumber,
249 d->appParams->vargAppsSequenceNumber);
250 //WAS & WM is locked inside next function
251 appSelfParamChanged(d,ORTE_TRUE,ORTE_FALSE,ORTE_TRUE,ORTE_TRUE);
252 CSTReaderAddRemoteWriter(d,cstReader,
253 objectEntryOID,writerGUID->oid);
254 CSTWriterAddRemoteReader(d,&d->writerManagers,
/* release the two writer locks that, per the comment above, were taken inside appSelfParamChanged() */
258 pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
259 pthread_rwlock_unlock(&d->writerManagers.lock);
/* NOTE(review): presumably the else branch of the appMOM test - the else line is not visible */
261 debug(46,2) ("OAM application 0x%x-0x%x accepted\n",
262 csChange->guid.hid,csChange->guid.aid);
264 pthread_rwlock_wrlock(&d->writerApplications.lock);
265 CSTWriterAddRemoteReader(d,&d->writerApplications,
269 pthread_rwlock_unlock(&d->writerApplications.lock);
274 if (objectEntryOID) {
275 cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
276 if (objectEntryOID->appMOM) {
277 if (csChange->alive==ORTE_TRUE)
278 objectEntryRefreshApp(d,objectEntryOID);
/* NOTE(review): the name of the call whose arguments follow is not visible in this listing */
280 //turn off expiration timer
282 objectEntryOID->objectEntryAID,
283 &objectEntryOID->expirationPurgeTimer,
285 debug(46,3) ("for application 0x%x-0x%x turn off expiration timer\n",
286 objectEntryOID->guid.hid,objectEntryOID->guid.aid);
291 /* try to proc csChange */
292 RTPSVarFinish(cstReader,cstRemoteWriter,csChange);
295 /**********************************************************************************/
/*
 * NewPublisher - connect a newly known publication (op) with matching
 * subscriptions of domain d.
 *
 * Does nothing when d or op is NULL. Visible phases:
 *   1. Pattern phase: search d->subscriptions for a pattern-created reader
 *      whose topic and typeName equal the publication's; if none is found,
 *      walk d->patternEntry and, for a pattern matching topic and type via
 *      fnmatch(), call the pattern's subscriptionCallBack and mark the
 *      returned reader as createdByPattern.
 *   2. Matching phase: for every entry of the subscription list in
 *      d->psEntry with equal topic, typeName and typeChecksum, add the
 *      subscription as remote reader of the publication's writer (when op
 *      is privateCreated) and the publication as remote writer of the
 *      subscription's reader (when the subscription is privateCreated).
 *
 * NOTE(review): this listing lacks lines of the function (return type,
 * declarations of pp, sp, pnode, o and so, braces, loop exits, parts of
 * argument lists); statements about missing lines are marked as
 * assumptions.
 */
297 NewPublisher(ORTEDomain *d,ObjectEntryOID *op) {
300 CSTWriter *cstWriter=NULL;
301 CSTReader *cstReader=NULL;
304 if ((d==NULL) || (op==NULL)) return;
305 pp=(ORTEPublProp*)op->attributes;
306 //***************************************
308 //try to find if subscription exists
309 pthread_rwlock_rdlock(&d->patternEntry.lock);
310 pthread_rwlock_rdlock(&d->subscriptions.lock);
311 gavl_cust_for_each(CSTReader,
312 &d->subscriptions,cstReader) {
313 if (cstReader->createdByPattern) {
315 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
/* the statement executed on a match is not visible (presumably a break that leaves cstReader non-NULL - TODO confirm) */
316 if ((strcmp((const char *)sp->topic, (const char*)pp->topic)==0) &&
317 (strcmp((const char *)sp->typeName, (const char*)pp->typeName)==0))
321 pthread_rwlock_unlock(&d->subscriptions.lock);
322 if (!cstReader) { //not exists
323 ul_list_for_each(Pattern,&d->patternEntry,pnode) {
/* fnmatch() returns 0 when the string matches the pattern */
324 if ((fnmatch((const char *)pnode->topic, (const char*)pp->topic,0)==0) &&
325 (fnmatch((const char *)pnode->type, (const char*)pp->typeName,0)==0)) {
/*
 * Three locks are released around the user callback and re-acquired in the
 * opposite order afterwards. They are not acquired in this function, so
 * the caller presumably holds them on entry - TODO confirm.
 */
328 pthread_rwlock_unlock(&d->readerPublications.lock);
329 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
330 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
331 cstReader=pnode->subscriptionCallBack(
333 (char *)pp->typeName,
336 cstReader->createdByPattern=ORTE_TRUE;
339 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
340 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
341 pthread_rwlock_wrlock(&d->readerPublications.lock);
345 pthread_rwlock_unlock(&d->patternEntry.lock);
/* matching phase: compare the publication with every known subscription */
347 pthread_rwlock_rdlock(&d->psEntry.subscriptionsLock);
348 gavl_cust_for_each(SubscriptionList,&d->psEntry,o) {
349 ORTESubsProp *sp=(ORTESubsProp*)o->attributes;
350 if ((strcmp((const char *)pp->topic, (const char*)sp->topic)==0) &&
351 (strcmp((const char *)pp->typeName, (const char*)sp->typeName)==0) &&
352 (pp->typeChecksum==sp->typeChecksum)) {
353 //add Subscription to Publisher (only if private)
354 if (op->privateCreated) {
355 pthread_rwlock_rdlock(&d->publications.lock);
356 if ((cstWriter=CSTWriter_find(&d->publications,&op->guid))) {
357 pthread_rwlock_wrlock(&cstWriter->lock);
/* add the subscription only if the writer does not know it yet */
358 if (!CSTRemoteReader_find(cstWriter,&o->guid)) {
361 so=getSubsO2SRemoteReader(d,o,sp);
362 CSTWriterAddRemoteReader(d,cstWriter,o,
364 debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
365 op->guid.hid,op->guid.aid,op->guid.oid,
366 o->guid.hid,o->guid.aid,o->guid.oid);
368 pthread_rwlock_unlock(&cstWriter->lock);
370 pthread_rwlock_unlock(&d->publications.lock);
372 //add Publisher to Subscriber (only if private)
373 if (o->privateCreated) {
374 pthread_rwlock_rdlock(&d->subscriptions.lock);
375 if ((cstReader=CSTReader_find(&d->subscriptions,&o->guid))) {
376 pthread_rwlock_wrlock(&cstReader->lock);
/* add the publication only if the reader does not know it yet */
377 if (!CSTRemoteWriter_find(cstReader,&op->guid)) {
378 CSTReaderAddRemoteWriter(d,cstReader,op,op->oid);
379 debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
380 o->guid.hid,o->guid.aid,o->guid.oid,
381 op->guid.hid,op->guid.aid,op->guid.oid);
383 pthread_rwlock_unlock(&cstReader->lock);
385 pthread_rwlock_unlock(&d->subscriptions.lock);
389 pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
392 /**********************************************************************************/
/*
 * NewSubscriber - connect a newly known subscription (os) with matching
 * publications of domain d.
 *
 * Does nothing when d or os is NULL. For every entry of the publication
 * list in d->psEntry whose topic, typeName and typeChecksum equal those of
 * the subscription:
 *   - when os is privateCreated, the publication is added as remote writer
 *     of the subscription's reader found in d->subscriptions,
 *   - when the publication is privateCreated, the subscription is added as
 *     remote reader of the publication's writer found in d->publications.
 * Each addition is skipped when the peer is already registered.
 *
 * NOTE(review): this listing lacks lines of the function (return type,
 * declarations of sp, o and sos, braces, part of one argument list).
 */
394 NewSubscriber(ORTEDomain *d,ObjectEntryOID *os) {
397 CSTWriter *cstWriter;
398 CSTReader *cstReader;
400 if ((d==NULL) || (os==NULL)) return;
401 sp=(ORTESubsProp*)os->attributes;
/* the publication list is read-locked for the whole walk */
402 pthread_rwlock_rdlock(&d->psEntry.publicationsLock);
403 gavl_cust_for_each(PublicationList,&d->psEntry,o) {
404 ORTEPublProp *pp=(ORTEPublProp*)o->attributes;
405 if ((strcmp((const char *)sp->topic, (const char*)pp->topic)==0) &&
406 (strcmp((const char *)sp->typeName, (const char*)pp->typeName)==0) &&
407 (sp->typeChecksum==pp->typeChecksum)) {
408 //add Publication to Subscription (only if private)
409 if (os->privateCreated) {
410 pthread_rwlock_rdlock(&d->subscriptions.lock);
411 if ((cstReader=CSTReader_find(&d->subscriptions,&os->guid))) {
412 pthread_rwlock_wrlock(&cstReader->lock);
/* add the publication only if the reader does not know it yet */
413 if (!CSTRemoteWriter_find(cstReader,&o->guid)) {
414 CSTReaderAddRemoteWriter(d,cstReader,o,o->oid);
415 debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
416 os->guid.hid,os->guid.aid,os->guid.oid,
417 o->guid.hid,o->guid.aid,o->guid.oid);
419 pthread_rwlock_unlock(&cstReader->lock);
421 pthread_rwlock_unlock(&d->subscriptions.lock);
423 //add Subscriber to Publisher (only if private)
424 if (o->privateCreated) {
425 pthread_rwlock_rdlock(&d->publications.lock);
426 if ((cstWriter=CSTWriter_find(&d->publications,&o->guid))) {
427 pthread_rwlock_wrlock(&cstWriter->lock);
/* add the subscription only if the writer does not know it yet */
428 if (!CSTRemoteReader_find(cstWriter,&os->guid)) {
431 sos=getSubsO2SRemoteReader(d,os,sp);
432 CSTWriterAddRemoteReader(d,cstWriter,os,
434 debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
435 GUID_PRINTF(o->guid),
436 GUID_PRINTF(os->guid));
438 pthread_rwlock_unlock(&cstWriter->lock);
440 pthread_rwlock_unlock(&d->publications.lock);
444 pthread_rwlock_unlock(&d->psEntry.publicationsLock);
447 /**********************************************************************************/
/*
 * RTPSVarApp - handle a received VAR csChange when the local domain acts
 * as a managed application.
 *
 * Returns immediately unless (d->guid.aid & 3) equals MANAGEDAPPLICATION.
 * The work is dispatched by a switch on writerGUID->oid. Four branches are
 * visible, each write-locking one reader and assigning it to cstReader:
 * d->readerManagers, d->readerApplications, d->readerPublications and
 * d->readerSubscriptions. Each branch registers an object that is not yet
 * known (after generateEvent() accepted it) and connects it to the related
 * readers/writers. The function ends by passing the selected reader, the
 * remote writer found and the csChange to RTPSVarFinish().
 *
 * NOTE(review): this listing lacks many lines of the function (return type,
 * the case labels of the switch, braces, else/break lines, declaration of
 * ap, names of some calls and parts of argument lists). Which oid value
 * selects which branch must be confirmed against the complete source.
 */
449 RTPSVarApp(ORTEDomain *d,CSChange *csChange,GUID_RTPS *writerGUID)
451 CSTReader *cstReader=NULL;
452 CSTRemoteWriter *cstRemoteWriter=NULL;
453 ObjectEntryAID *objectEntryAID;
454 ObjectEntryOID *objectEntryOID,*sobjectEntryOID;
/* only a domain whose aid marks it as MANAGEDAPPLICATION processes the message here */
456 if ((d->guid.aid & 3)!=MANAGEDAPPLICATION) return;
458 switch (writerGUID->oid) {
/* ---- branch using readerManagers (case label not visible) ---- */
460 pthread_rwlock_wrlock(&d->readerManagers.lock);
461 cstReader=&d->readerManagers;
462 objectEntryOID=objectEntryFind(d,&csChange->guid);
463 if (!objectEntryOID) {
/* unknown manager: build its parameters from the csChange */
464 AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
466 parameterUpdateApplication(csChange,ap);
/* the second operand of this && is on a line not visible in this listing */
467 if (generateEvent(d,&csChange->guid,(void*)ap,csChange->alive) &&
469 debug(46,2) ("new manager 0x%x-0x%x accepted\n",
470 csChange->guid.hid,csChange->guid.aid);
471 objectEntryOID=objectEntryAdd(d,&csChange->guid,(void*)ap);
472 objectEntryOID->privateCreated=ORTE_FALSE;
473 pthread_rwlock_wrlock(&d->readerApplications.lock);
474 CSTReaderAddRemoteWriter(d,&d->readerApplications,
475 objectEntryOID,OID_WRITE_APP);
476 pthread_rwlock_unlock(&d->readerApplications.lock);
477 //all applications from manager node set expiration timer
478 gavl_cust_for_each(ObjectEntryAID,
479 objectEntryOID->objectEntryHID,objectEntryAID) {
480 ObjectEntryOID *objectEntryOID1;
481 objectEntryOID1=ObjectEntryOID_find(objectEntryAID,&csChange->guid.oid);
482 objectEntryRefreshApp(d,objectEntryOID1);
/* look up the manager's OID_WRITE_APP writer among the remote writers of readerApplications */
488 GUID_RTPS guid_wapp=csChange->guid;
489 guid_wapp.oid=OID_WRITE_APP;
490 pthread_rwlock_wrlock(&d->readerApplications.lock);
491 cstRemoteWriter=CSTRemoteWriter_find(&d->readerApplications,&guid_wapp);
492 //setup state of cstRemoteWriter on send ACK to manager
493 if (cstRemoteWriter) {
494 if (cstRemoteWriter->commStateACK==WAITING) {
/*
 * NOTE(review): the names of the two calls whose arguments follow are not
 * visible in this listing; both operate on repeatActiveQueryTimer.
 */
496 cstRemoteWriter->spobject->objectEntryAID,
497 &cstRemoteWriter->repeatActiveQueryTimer,
498 1); //metatraffic timer
500 cstRemoteWriter->spobject->objectEntryAID,
501 &cstRemoteWriter->repeatActiveQueryTimer,
502 1, //metatraffic timer
503 "CSTReaderQueryTimer",
505 &cstRemoteWriter->cstReader->lock,
510 pthread_rwlock_unlock(&d->readerApplications.lock);
512 if (csChange->alive==ORTE_TRUE)
513 objectEntryRefreshApp(d,objectEntryOID);
514 cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
/* register the writer when it is unknown and the change describes the sending application itself (same hid and aid) */
515 if ((!cstRemoteWriter) &&
516 (csChange->guid.hid==writerGUID->hid) &&
517 (csChange->guid.aid==writerGUID->aid)) {
519 CSTReaderAddRemoteWriter(d,cstReader,objectEntryOID,writerGUID->oid);
/* ---- branch using readerApplications (case label not visible) ---- */
523 pthread_rwlock_wrlock(&d->readerApplications.lock);
524 cstReader=&d->readerApplications;
525 cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
526 if (cstRemoteWriter) {
528 GUID_RTPS guid_tmp=csChange->guid;
529 guid_tmp.oid=OID_WRITE_PUBL;
530 objectEntryOID=objectEntryFind(d,&csChange->guid);
/* proceed only when readerPublications has no remote writer yet for this application's OID_WRITE_PUBL guid */
531 if (!CSTRemoteWriter_find(&d->readerPublications,&guid_tmp)) {
532 if (!objectEntryOID) {
533 ap=(AppParams*)MALLOC(sizeof(AppParams));
535 parameterUpdateApplication(csChange,ap);
/* the second operand of this && is on a line not visible in this listing */
536 if (generateEvent(d,&csChange->guid,(void*)ap,csChange->alive) &&
538 debug(46,2) ("new application 0x%x-0x%x accepted\n",
539 csChange->guid.hid,csChange->guid.aid);
540 objectEntryOID=objectEntryAdd(d,&csChange->guid,(void*)ap);
541 objectEntryOID->privateCreated=ORTE_FALSE;
548 ap=(AppParams*)objectEntryOID->attributes;
549 sobjectEntryOID=getAppO2SRemoteReader(d,objectEntryOID,ap);
/* connect the application to the publication/subscription readers and writers */
551 pthread_rwlock_wrlock(&d->readerPublications.lock);
552 pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
553 pthread_rwlock_wrlock(&d->writerPublications.lock);
554 pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
555 CSTReaderAddRemoteWriter(d,&d->readerPublications,
556 objectEntryOID,OID_WRITE_PUBL);
557 CSTReaderAddRemoteWriter(d,&d->readerSubscriptions,
558 objectEntryOID,OID_WRITE_SUBS);
559 CSTWriterAddRemoteReader(d,&d->writerPublications,
563 CSTWriterAddRemoteReader(d,&d->writerSubscriptions,
567 pthread_rwlock_unlock(&d->readerPublications.lock);
568 pthread_rwlock_unlock(&d->readerSubscriptions.lock);
569 pthread_rwlock_unlock(&d->writerPublications.lock);
570 pthread_rwlock_unlock(&d->writerSubscriptions.lock);
572 if (objectEntryOID) {
/* NOTE(review): the name of the call whose arguments follow is not visible in this listing */
573 //turn off expiration timer
575 objectEntryOID->objectEntryAID,
576 &objectEntryOID->expirationPurgeTimer,
578 debug(46,3) ("for application 0x%x-0x%x turn off expiration timer\n",
579 objectEntryOID->guid.hid,
580 objectEntryOID->guid.aid);
/* ---- branch using readerPublications (case label not visible) ---- */
585 pthread_rwlock_wrlock(&d->readerPublications.lock);
586 cstReader=&d->readerPublications;
587 cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
588 if (cstRemoteWriter) {
589 objectEntryOID=objectEntryFind(d,&csChange->guid);
590 if (!objectEntryOID) {
/* unknown publication: build its properties from the csChange */
591 ORTEPublProp *pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
593 parameterUpdatePublication(csChange,pp);
/* the second operand of this && is on a line not visible in this listing */
594 if (generateEvent(d,&csChange->guid,(void*)pp,csChange->alive) &&
596 debug(46,2) ("new publisher 0x%x-0x%x-0x%x accepted\n",
597 GUID_PRINTF(csChange->guid));
598 objectEntryOID=objectEntryAdd(d,&csChange->guid,(void*)pp);
599 objectEntryOID->privateCreated=ORTE_FALSE;
600 pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
601 PublicationList_insert(&d->psEntry,objectEntryOID);
602 pthread_rwlock_unlock(&d->psEntry.publicationsLock);
603 NewPublisher(d,objectEntryOID);
/* object entry already exists but is not yet in the publication list; the second operand of this && is not visible */
607 if ((!PublicationList_find(&d->psEntry,&csChange->guid)) &&
609 pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
610 PublicationList_insert(&d->psEntry,objectEntryOID);
611 pthread_rwlock_unlock(&d->psEntry.publicationsLock);
612 NewPublisher(d,objectEntryOID);
/* ---- branch using readerSubscriptions (case label not visible) ---- */
618 pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
619 cstReader=&d->readerSubscriptions;
620 cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
621 if (cstRemoteWriter) {
622 objectEntryOID=objectEntryFind(d,&csChange->guid);
623 if (!objectEntryOID) {
/* unknown subscription: build its properties from the csChange */
624 ORTESubsProp *sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
626 parameterUpdateSubscription(csChange,sp);
/* the second operand of this && is on a line not visible in this listing */
627 if (generateEvent(d,&csChange->guid,(void*)sp,csChange->alive) &&
629 debug(46,2) ("new subscriber 0x%x-0x%x-0x%x accepted\n",
630 GUID_PRINTF(csChange->guid));
631 objectEntryOID=objectEntryAdd(d,&csChange->guid,(void*)sp);
632 objectEntryOID->privateCreated=ORTE_FALSE;
633 pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
634 SubscriptionList_insert(&d->psEntry,objectEntryOID);
635 pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
636 NewSubscriber(d,objectEntryOID);
/* object entry already exists but is not yet in the subscription list; the second operand of this && is not visible */
640 if ((!SubscriptionList_find(&d->psEntry,&csChange->guid)) &&
642 pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
643 SubscriptionList_insert(&d->psEntry,objectEntryOID);
644 pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
645 NewSubscriber(d,objectEntryOID);
652 /* try to proc csChange */
653 RTPSVarFinish(cstReader,cstRemoteWriter,csChange);
657 /**********************************************************************************/
/*
 * RTPSVar - decode a received VAR submessage from cdrCodec and dispatch it.
 *
 * Visible steps: read the flags octet and the 16-bit submessage length,
 * read roid, woid and the object GUID with the codec forced to
 * FLAG_BIG_ENDIAN, read the sequence number, take the writer's hid/aid from
 * the message interpreter state (mi), allocate a CSChange, fill in guid and
 * alive, decode the parameters and hand the change to RTPSVarManager() and
 * RTPSVarApp(). Both handlers return immediately when the local domain's
 * role does not match.
 *
 * NOTE(review): this listing lacks lines of the function (return type,
 * declarations of flags, roid, woid, sn and csChange, extraction of
 * p_bit/a_bit/h_bit from flags, the conditions around several statements,
 * braces, and any lines after the last visible call). In particular it is
 * not visible under which conditions the GUID is read from the message
 * versus taken from mi, nor whether the two dispatch calls are conditional.
 */
659 RTPSVar(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
660 GUID_RTPS objectGUID,writerGUID;
663 char p_bit,a_bit,h_bit;
664 CORBA_unsigned_short submsg_len;
666 CDR_Endianness data_endian;
669 /* restore flag position in submessage */
673 CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
678 /* submessage length */
679 CDR_get_ushort(cdrCodec,&submsg_len);
681 /* the following fields are read in big-endian byte order */
/* remember the codec's byte order so it can be restored after the ids */
682 data_endian=cdrCodec->data_endian;
683 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
/* reader object id */
686 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
/* writer object id */
689 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
/*
 * Object hid/aid: either read from the message (next two calls) or copied
 * from the message source (two assignments after them). The selecting
 * condition is not visible; presumably it is h_bit - TODO confirm.
 */
693 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&objectGUID.hid);
696 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&objectGUID.aid);
699 objectGUID.hid=mi->sourceHostId;
700 objectGUID.aid=mi->sourceAppId;
/* object id */
704 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&objectGUID.oid);
/* restore the byte order that was active before the ids were read */
706 cdrCodec->data_endian=data_endian;
/* sequence number: high word, then low word */
709 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.high);
710 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.low);
/* the writer is identified by the source host/application of the message */
712 writerGUID.hid=mi->sourceHostId;
713 writerGUID.aid=mi->sourceAppId;
716 debug(46,3) ("recv: RTPS_VAR(0x%x) from 0x%x-0x%x sn:%u\n",
717 woid,mi->sourceHostId,mi->sourceAppId,sn.low);
/* build the CSChange that is handed to the role-specific handlers */
720 csChange=(CSChange*)MALLOC(sizeof(CSChange));
721 csChange->cdrCodec.buffer=NULL;
722 csChange->guid=objectGUID;
723 if (a_bit) csChange->alive=ORTE_TRUE;
724 else csChange->alive=ORTE_FALSE;
/* NOTE(review): presumably decoding applies when parameters are present and the empty-list init otherwise - the condition is not visible */
726 parameterDecodeCodecToCSChange(csChange,cdrCodec);
728 CSChangeAttributes_init_head(csChange);
730 SEQUENCE_NUMBER_NONE(csChange->gapSN);
/* dispatch; each handler checks the local domain's role itself */
733 RTPSVarManager(d,csChange,&writerGUID,senderIPAddress);
736 RTPSVarApp(d,csChange,&writerGUID);