2 * $Id: RTPSVar.c,v 0.0.0.1 2003/10/07
4 * DEBUG: section 46 RTPS message VAR
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
/* Forward declarations: both helpers are defined later in this file and are called from RTPSVar(). */
24 void NewPublisher(ORTEDomain *d,ObjectEntryOID *op);
25 void NewSubscriber(ORTEDomain *d,ObjectEntryOID *os);
27 /**********************************************************************************/
/*
 * RTPSVar - handle one received RTPS VAR submessage.
 *
 * d               - local domain whose reader/writer tables are consulted and updated
 * rtps_msg        - start of the VAR submessage; flags are read from byte 1 and
 *                   fields from fixed byte offsets (2, 4, 8, 12, 16, 20, 24, 32)
 * mi              - message interpreter state; supplies sourceHostId/sourceAppId
 * senderIPAddress - compared with remote reader host ids and passed to getTypeApp()
 *
 * The visible code decodes the header, allocates a CSChange carrying the decoded
 * parameters, selects a CSTReader/CSTRemoteWriter pair according to the kind of
 * the local domain and the writer object id (adding object entries for newly
 * announced objects on the way), and finally hands the CSChange to that remote
 * writer when its sequence number is newer than the writer's.
 *
 * NOTE(review): this listing is incomplete - several lines (for example the
 * declarations of submsg_len, roid, woid, sn and csChange, the `case` labels
 * of the switch and the closing braces) are not visible here, so the comments
 * below describe only what the visible statements do.
 */
29 RTPSVar(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
30 GUID_RTPS objectGUID,writerGUID;
33 int8_t e_bit,p_bit,a_bit;
35 CSTReader *cstReader=NULL;
36 CSTRemoteWriter *cstRemoteWriter=NULL;
37 CSTRemoteReader *cstRemoteReader=NULL;
39 ObjectEntryAID *objectEntryAID;
40 ObjectEntryOID *objectEntryOID;
/* Flag bits taken from byte 1 of the submessage: bit 0, bit 1 and bit 2. */
42 e_bit=rtps_msg[1] & 0x01;
43 p_bit=(rtps_msg[1] & 0x02)>>1;
44 a_bit=(rtps_msg[1] & 0x04)>>2;
/* Submessage length at offset 2; e_bit is passed to the conversion
   (presumably it selects the byte order - confirm against conv_u16). */
45 submsg_len=*((u_int16_t*)(rtps_msg+2));
46 conv_u16(&submsg_len,e_bit);
47 roid=*((ObjectId*)(rtps_msg+4)); /* readerObjectId */
49 woid=*((ObjectId*)(rtps_msg+8)); /* writerObjectId */
/* With bit H set, the host id and application id of the object are read from the message. */
51 if (rtps_msg[1] & 0x08) { /* bit H */
52 objectGUID.hid=*((HostId*)(rtps_msg+12)); /* HostId */
53 conv_u32(&objectGUID.hid,0);
54 objectGUID.aid=*((AppId*)(rtps_msg+16)); /* AppId */
55 conv_u32(&objectGUID.aid,0);
/* NOTE(review): presumably the else branch (the line introducing it is not
   visible): take the ids of the message source instead. */
58 objectGUID.hid=mi->sourceHostId;
59 objectGUID.aid=mi->sourceAppId;
61 objectGUID.oid=*((ObjectId*)(rtps_msg+20)); /* ObjectId */
62 conv_u32(&objectGUID.oid,0);
63 sn=*((SequenceNumber*)(rtps_msg+24)); /* writerSN */
/* Host id and application id of the writer are those of the message source. */
65 writerGUID.hid=mi->sourceHostId;
66 writerGUID.aid=mi->sourceAppId;
69 debug(46,3) ("recv: RTPS_VAR(0x%x) from 0x%x-0x%x sn:%u\n",
70 woid,mi->sourceHostId,mi->sourceAppId,sn.low);
/* Build the CSChange for this submessage.
   NOTE(review): the MALLOC result is dereferenced on the next line without a NULL check. */
73 csChange=(CSChange*)MALLOC(sizeof(CSChange));
74 csChange->cdrStream.buffer=NULL;
75 csChange->guid=objectGUID;
/* The alive flag of the change follows the a_bit flag. */
76 if (a_bit) csChange->alive=ORTE_TRUE;
77 else csChange->alive=ORTE_FALSE;
/* Decode the parameter stream that starts at offset 32 into the change. */
79 parameterDecodeStreamToCSChange(csChange,rtps_msg+32,submsg_len,e_bit);
81 CSChangeAttributes_init_head(csChange);
83 SEQUENCE_NUMBER_NONE(csChange->gapSN);
/* ---- the low two bits of the local application id equal MANAGER ---- */
86 if ((d->guid.aid & 0x03)==MANAGER) {
/* Writer is the OID_WRITE_APPSELF writer of an application whose id also marks it as MANAGER. */
87 if ((writerGUID.oid==OID_WRITE_APPSELF) &&
88 ((writerGUID.aid & 0x03)==MANAGER)) {
90 pthread_rwlock_wrlock(&d->readerManagers.lock);
91 pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
92 cstReader=&d->readerManagers;
/* Search the remote readers of writerApplicationSelf for one whose host id equals the sender address. */
93 gavl_cust_for_each(CSTRemoteReader,
94 &d->writerApplicationSelf,
96 if (cstRemoteReader->guid.hid==senderIPAddress) break;
98 if (cstRemoteReader) {
99 objectEntryOID=objectEntryFind(d,&objectGUID);
100 if (!objectEntryOID) {
101 // create new RemoteReader
102 AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
104 parameterUpdateApplication(csChange,ap);
/* NOTE(review): the second operand of this && condition is not visible in this listing. */
105 if (generateEvent(d,&objectGUID,(void*)ap,ORTE_TRUE) &&
107 debug(46,2) ("manager 0x%x-0x%x accepted\n",
108 objectGUID.hid,objectGUID.aid);
109 objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)ap);
110 CSTWriterRefreshAllCSChanges(d,cstRemoteReader);
111 CSTReaderAddRemoteWriter(d,cstReader,objectEntryOID,writerGUID.oid);
/* Register the new entry with the applications reader and writer while holding both locks. */
112 pthread_rwlock_wrlock(&d->readerApplications.lock);
113 pthread_rwlock_wrlock(&d->writerApplications.lock);
114 CSTReaderAddRemoteWriter(d,&d->readerApplications,objectEntryOID,OID_WRITE_APP);
115 CSTWriterAddRemoteReader(d,&d->writerApplications,objectEntryOID,OID_READ_APP);
116 pthread_rwlock_unlock(&d->writerApplications.lock);
117 pthread_rwlock_unlock(&d->readerApplications.lock);
118 //all applications from manager node set expiration timer
119 gavl_cust_for_each(ObjectEntryAID,
120 objectEntryOID->objectEntryHID,objectEntryAID) {
121 ObjectEntryOID *objectEntryOID1;
/* NOTE(review): the lookup result is passed on without a visible NULL check. */
122 objectEntryOID1=ObjectEntryOID_find(objectEntryAID,&objectGUID.oid);
123 objectEntryRefreshApp(d,objectEntryOID1);
129 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
/* NOTE(review): no guard on cstRemoteWriter is visible before this dereference - confirm in the full source. */
131 objectEntryRefreshApp(d,cstRemoteWriter->objectEntryOID);
135 pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
/* Writer is the OID_WRITE_APPSELF writer of a MANAGEDAPPLICATION, or the OID_WRITE_APP writer of a MANAGER. */
137 if (((writerGUID.oid==OID_WRITE_APPSELF) &&
138 ((writerGUID.aid & 0x03)==MANAGEDAPPLICATION)) ||
139 ((writerGUID.oid==OID_WRITE_APP) &&
140 ((writerGUID.aid & 0x03)==MANAGER))) {
142 pthread_rwlock_wrlock(&d->readerApplications.lock);
143 cstReader=&d->readerApplications;
144 objectEntryOID=objectEntryFind(d,&objectGUID);
145 if (!objectEntryOID) {
146 AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
148 parameterUpdateApplication(csChange,ap);
149 if (generateEvent(d,&objectGUID,(void*)ap,ORTE_TRUE) &&
151 objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)ap);
/* appMOM is set from getTypeApp() and selects between the "MOM" and "OAM" handling below. */
152 objectEntryOID->appMOM=getTypeApp(d,ap,senderIPAddress);
153 if (objectEntryOID->appMOM) {
154 debug(46,2) ("MOM application 0x%x-0x%x accepted\n",
155 objectGUID.hid,objectGUID.aid);
156 //increment vargAppsSequenceNumber and make csChange
157 SeqNumberInc(d->appParams->vargAppsSequenceNumber,
158 d->appParams->vargAppsSequenceNumber);
159 //WAS & WM is locked inside next function
160 appSelfParamChanged(d,ORTE_TRUE,ORTE_FALSE,ORTE_TRUE,ORTE_TRUE);
161 CSTReaderAddRemoteWriter(d,cstReader,
162 objectEntryOID,writerGUID.oid);
163 CSTWriterAddRemoteReader(d,&d->writerManagers,
164 objectEntryOID,OID_READ_MGR);
/* NOTE(review): no matching wrlock calls for these two unlocks are visible in
   this function; the comment above says the locks are taken inside appSelfParamChanged(). */
165 pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
166 pthread_rwlock_unlock(&d->writerManagers.lock);
168 debug(46,2) ("OAM application 0x%x-0x%x accepted\n",
169 objectGUID.hid,objectGUID.aid);
171 pthread_rwlock_wrlock(&d->writerApplications.lock);
172 CSTWriterAddRemoteReader(d,&d->writerApplications,objectEntryOID,OID_READ_APP);
173 pthread_rwlock_unlock(&d->writerApplications.lock);
178 if (objectEntryOID) {
179 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
180 if (objectEntryOID->appMOM) {
181 objectEntryRefreshApp(d,objectEntryOID);
183 //turn off expiration timer
/* NOTE(review): the head of the call these arguments belong to is not visible in this listing. */
185 objectEntryOID->objectEntryAID,
186 &objectEntryOID->expirationPurgeTimer,
188 debug(46,3) ("for application 0x%x-0x%x turn off expiration timer\n",
189 objectEntryOID->guid.hid,objectEntryOID->guid.aid);
/* ---- the low two bits of the local application id equal MANAGEDAPPLICATION ---- */
195 if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
/* Dispatch on the writer object id. NOTE(review): the `case` labels are not
   visible; each section below starts by locking the reader it works on. */
196 switch (writerGUID.oid) {
/* Section working on d->readerManagers. */
198 pthread_rwlock_wrlock(&d->readerManagers.lock);
199 cstReader=&d->readerManagers;
200 objectEntryOID=objectEntryFind(d,&objectGUID);
201 if (!objectEntryOID) {
202 AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
204 parameterUpdateApplication(csChange,ap);
205 if (generateEvent(d,&objectGUID,(void*)ap,ORTE_TRUE) &&
207 debug(46,2) ("new manager 0x%x-0x%x accepted\n",
208 objectGUID.hid,objectGUID.aid);
209 objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)ap);
210 objectEntryOID->privateCreated=ORTE_FALSE;
211 pthread_rwlock_wrlock(&d->readerApplications.lock);
212 CSTReaderAddRemoteWriter(d,&d->readerApplications,
213 objectEntryOID,OID_WRITE_APP);
214 pthread_rwlock_unlock(&d->readerApplications.lock);
215 //all applications from manager node set expiration timer
216 gavl_cust_for_each(ObjectEntryAID,
217 objectEntryOID->objectEntryHID,objectEntryAID) {
218 ObjectEntryOID *objectEntryOID1;
219 objectEntryOID1=ObjectEntryOID_find(objectEntryAID,&objectGUID.oid);
220 objectEntryRefreshApp(d,objectEntryOID1);
/* Look up the remote writer with the same host/app id but object id OID_WRITE_APP in readerApplications. */
226 GUID_RTPS guid_wapp=objectGUID;
227 guid_wapp.oid=OID_WRITE_APP;
228 pthread_rwlock_wrlock(&d->readerApplications.lock);
229 cstRemoteWriter=CSTRemoteWriter_find(&d->readerApplications,&guid_wapp);
230 //setup state of cstRemoteWriter on send ACK to manager
231 if (cstRemoteWriter) {
232 if (cstRemoteWriter->commStateACK==WAITING) {
/* NOTE(review): the heads of the two calls taking the arguments below (both
   operate on repeatActiveQueryTimer) are not visible in this listing. */
234 cstRemoteWriter->objectEntryOID->objectEntryAID,
235 &cstRemoteWriter->repeatActiveQueryTimer,
236 1); //metatraffic timer
238 cstRemoteWriter->objectEntryOID->objectEntryAID,
239 &cstRemoteWriter->repeatActiveQueryTimer,
240 1, //metatraffic timer
241 "CSTReaderQueryTimer",
243 &cstRemoteWriter->cstReader->lock,
248 pthread_rwlock_unlock(&d->readerApplications.lock);
250 objectEntryRefreshApp(d,objectEntryOID);
251 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
/* Add the writer to the reader only when it is not known yet and the object
   has the same host id and application id as the writer. */
252 if ((!cstRemoteWriter) &&
253 (objectGUID.hid==writerGUID.hid) && (objectGUID.aid==writerGUID.aid)) {
255 CSTReaderAddRemoteWriter(d,cstReader,objectEntryOID,writerGUID.oid);
/* Section working on d->readerApplications. */
259 pthread_rwlock_wrlock(&d->readerApplications.lock);
260 cstReader=&d->readerApplications;
261 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
262 if (cstRemoteWriter) {
263 GUID_RTPS guid_tmp=objectGUID;
264 guid_tmp.oid=OID_WRITE_PUBL;
265 objectEntryOID=objectEntryFind(d,&objectGUID);
/* Proceed only when readerPublications has no remote writer for this application's OID_WRITE_PUBL yet. */
266 if (!CSTRemoteWriter_find(&d->readerPublications,&guid_tmp)) {
267 if (!objectEntryOID) {
268 AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
270 parameterUpdateApplication(csChange,ap);
271 if (generateEvent(d,&objectGUID,(void*)ap,ORTE_TRUE) &&
273 debug(46,2) ("new application 0x%x-0x%x accepted\n",
274 objectGUID.hid,objectGUID.aid);
275 objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)ap);
276 objectEntryOID->privateCreated=ORTE_FALSE;
/* Attach the application to the publication/subscription readers and writers
   with all four locks held; the unlocks follow in acquisition order. */
282 pthread_rwlock_wrlock(&d->readerPublications.lock);
283 pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
284 pthread_rwlock_wrlock(&d->writerPublications.lock);
285 pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
286 CSTReaderAddRemoteWriter(d,&d->readerPublications,
287 objectEntryOID,OID_WRITE_PUBL);
288 CSTReaderAddRemoteWriter(d,&d->readerSubscriptions,
289 objectEntryOID,OID_WRITE_SUBS);
290 CSTWriterAddRemoteReader(d,&d->writerPublications,
291 objectEntryOID,OID_READ_PUBL);
292 CSTWriterAddRemoteReader(d,&d->writerSubscriptions,
293 objectEntryOID,OID_READ_SUBS);
294 pthread_rwlock_unlock(&d->readerPublications.lock);
295 pthread_rwlock_unlock(&d->readerSubscriptions.lock);
296 pthread_rwlock_unlock(&d->writerPublications.lock);
297 pthread_rwlock_unlock(&d->writerSubscriptions.lock);
299 if (objectEntryOID) {
300 //turn off expiration timer
302 objectEntryOID->objectEntryAID,
303 &objectEntryOID->expirationPurgeTimer,
305 debug(46,3) ("for application 0x%x-0x%x turn off expiration timer\n",
306 objectEntryOID->guid.hid,
307 objectEntryOID->guid.aid);
/* Section working on d->readerPublications: a publication is only handled
   when its writer is already a known remote writer of that reader. */
312 pthread_rwlock_wrlock(&d->readerPublications.lock);
313 cstReader=&d->readerPublications;
314 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
315 if (cstRemoteWriter) {
316 objectEntryOID=objectEntryFind(d,&objectGUID);
317 if (!objectEntryOID) {
318 ORTEPublProp *pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
320 parameterUpdatePublication(csChange,pp);
321 if (generateEvent(d,&objectGUID,(void*)pp,ORTE_TRUE) &&
323 debug(46,2) ("new publisher 0x%x-0x%x-0x%x accepted\n",
324 objectGUID.hid,objectGUID.aid,objectGUID.oid);
325 objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)pp);
326 objectEntryOID->privateCreated=ORTE_FALSE;
/* Insert into the publication list under its lock, then match it against subscriptions. */
327 pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
328 PublicationList_insert(&d->psEntry,objectEntryOID);
329 pthread_rwlock_unlock(&d->psEntry.publicationsLock);
330 NewPublisher(d,objectEntryOID);
/* Entry exists but is not in the publication list yet.
   NOTE(review): the second operand of this condition is not visible. */
334 if ((!PublicationList_find(&d->psEntry,&objectGUID)) &&
336 pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
337 PublicationList_insert(&d->psEntry,objectEntryOID);
338 pthread_rwlock_unlock(&d->psEntry.publicationsLock);
339 NewPublisher(d,objectEntryOID);
/* Section working on d->readerSubscriptions; mirrors the publication section above. */
345 pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
346 cstReader=&d->readerSubscriptions;
347 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
348 if (cstRemoteWriter) {
349 objectEntryOID=objectEntryFind(d,&objectGUID);
350 if (!objectEntryOID) {
351 ORTESubsProp *sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
353 parameterUpdateSubscription(csChange,sp);
354 if (generateEvent(d,&objectGUID,(void*)sp,ORTE_TRUE) &&
356 debug(46,2) ("new subscriber 0x%x-0x%x-0x%x accepted\n",
357 objectGUID.hid,objectGUID.aid,objectGUID.oid);
358 objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)sp);
359 objectEntryOID->privateCreated=ORTE_FALSE;
360 pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
361 SubscriptionList_insert(&d->psEntry,objectEntryOID);
362 pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
363 NewSubscriber(d,objectEntryOID);
367 if ((!SubscriptionList_find(&d->psEntry,&objectGUID)) &&
369 pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
370 SubscriptionList_insert(&d->psEntry,objectEntryOID);
371 pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
372 NewSubscriber(d,objectEntryOID);
/* No reader was selected above, so no reader lock is held: leave.
   NOTE(review): no visible line releases csChange before this return -
   possible leak, confirm against the full source. */
379 if (!cstReader) return;
/* A reader was locked but the writer is unknown: release the reader lock
   (presumably followed by a return on a line not visible here). */
380 if (!cstRemoteWriter) {
381 pthread_rwlock_unlock(&cstReader->lock);
384 debug(46,10) ("recv: processing CSChange\n");
/* Accept the change only when its sequence number is greater than the remote writer's. */
385 if (SeqNumberCmp(sn,cstRemoteWriter->sn)>0) { //have to be sn>writer_sn
386 CSTReaderAddCSChange(cstRemoteWriter,csChange);
387 CSTReaderProcCSChanges(d,cstRemoteWriter);
/* NOTE(review): presumably the else branch of the comparison above (the line
   introducing it is not visible): discard the parameters of a stale change. */
390 parameterDelete(csChange);
/* Release the lock of whichever reader was selected above. */
393 pthread_rwlock_unlock(&cstReader->lock);
397 /**********************************************************************************/
/*
 * NewPublisher - match a publication against the known subscriptions.
 *
 * d  - domain holding the pattern list, the local publications/subscriptions
 *      and the psEntry subscription list
 * op - object entry of the publication; its attributes are read as ORTEPublProp
 *
 * Returns immediately when d or op is NULL (the prototype earlier in this
 * file declares the return type as void).
 *
 * Step 1: look in d->subscriptions for a reader created by a pattern with the
 * same topic and type name; when none is found, walk the patterns and invoke
 * the subscription callback of a pattern whose topic and type match via fnmatch().
 * Step 2: walk the psEntry subscription list and, for every subscription with
 * equal topic, type name and type checksum, connect the pair on whichever side
 * has privateCreated set.
 *
 * NOTE(review): some lines of this listing (local declarations of pp, sp,
 * pnode and o, loop exits, callback arguments, closing braces) are not visible here.
 */
399 NewPublisher(ORTEDomain *d,ObjectEntryOID *op) {
402 CSTWriter *cstWriter=NULL;
403 CSTReader *cstReader=NULL;
406 if ((d==NULL) || (op==NULL)) return;
407 pp=(ORTEPublProp*)op->attributes;
408 //***************************************
410 //try to find if subscription exists
411 pthread_rwlock_rdlock(&d->patternEntry.lock);
412 pthread_rwlock_rdlock(&d->subscriptions.lock);
413 gavl_cust_for_each(CSTReader,
414 &d->subscriptions,cstReader) {
/* Only readers that were created by a pattern are compared. */
415 if (cstReader->createdByPattern) {
417 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
/* NOTE(review): the statement controlled by this condition is not visible;
   presumably it leaves the loop with cstReader pointing at the match. */
418 if ((strcmp(sp->topic,pp->topic)==0) &&
419 (strcmp(sp->typeName,pp->typeName)==0))
423 pthread_rwlock_unlock(&d->subscriptions.lock);
424 if (!cstReader) { //not exists
425 ul_list_for_each(Pattern,&d->patternEntry,pnode) {
/* The pattern's topic and type are used as fnmatch() patterns against the publication. */
426 if ((fnmatch(pnode->topic,pp->topic,0)==0) &&
427 (fnmatch(pnode->type,pp->typeName,0)==0)) {
/* These three locks are released around the user callback and re-acquired
   afterwards. They are not taken in this function: RTPSVar() holds
   readerPublications.lock when it calls here; the two objectEntry locks are
   presumably held further up the call chain - TODO confirm. */
430 pthread_rwlock_unlock(&d->readerPublications.lock);
431 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
432 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
433 cstReader=pnode->subscriptionCallBack(
/* NOTE(review): no NULL check on the callback result is visible before this store. */
438 cstReader->createdByPattern=ORTE_TRUE;
441 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
442 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
443 pthread_rwlock_wrlock(&d->readerPublications.lock);
447 pthread_rwlock_unlock(&d->patternEntry.lock);
/* Step 2: connect the publication with every matching subscription in psEntry. */
449 pthread_rwlock_rdlock(&d->psEntry.subscriptionsLock);
450 gavl_cust_for_each(SubscriptionList,&d->psEntry,o) {
451 ORTESubsProp *sp=(ORTESubsProp*)o->attributes;
452 if ((strcmp(pp->topic,sp->topic)==0) &&
453 (strcmp(pp->typeName,sp->typeName)==0) &&
454 (pp->typeChecksum==sp->typeChecksum)) {
455 //add Subscription to Publisher (only if private)
456 if (op->privateCreated) {
457 pthread_rwlock_rdlock(&d->publications.lock);
458 if ((cstWriter=CSTWriter_find(&d->publications,&op->guid))) {
459 pthread_rwlock_wrlock(&cstWriter->lock);
/* Skip subscriptions already registered as remote readers of this writer. */
460 if (!CSTRemoteReader_find(cstWriter,&o->guid)) {
461 CSTWriterAddRemoteReader(d,cstWriter,o,o->oid);
462 debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
463 op->guid.hid,op->guid.aid,op->guid.oid,
464 o->guid.hid,o->guid.aid,o->guid.oid);
466 pthread_rwlock_unlock(&cstWriter->lock);
468 pthread_rwlock_unlock(&d->publications.lock);
470 //add Publisher to Subscriber (only if private)
471 if (o->privateCreated) {
472 pthread_rwlock_rdlock(&d->subscriptions.lock);
473 if ((cstReader=CSTReader_find(&d->subscriptions,&o->guid))) {
474 pthread_rwlock_wrlock(&cstReader->lock);
/* Skip publications already registered as remote writers of this reader. */
475 if (!CSTRemoteWriter_find(cstReader,&op->guid)) {
476 CSTReaderAddRemoteWriter(d,cstReader,op,op->oid);
477 debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
478 o->guid.hid,o->guid.aid,o->guid.oid,
479 op->guid.hid,op->guid.aid,op->guid.oid);
481 pthread_rwlock_unlock(&cstReader->lock);
483 pthread_rwlock_unlock(&d->subscriptions.lock);
487 pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
490 /**********************************************************************************/
/*
 * NewSubscriber - match a subscription against the known publications.
 *
 * d  - domain holding the local publications/subscriptions and the psEntry
 *      publication list
 * os - object entry of the subscription; its attributes are read as ORTESubsProp
 *
 * Returns immediately when d or os is NULL (the prototype earlier in this
 * file declares the return type as void).
 *
 * Walks the psEntry publication list under publicationsLock and, for every
 * publication with equal topic, type name and type checksum, connects the pair
 * on whichever side has privateCreated set.
 *
 * NOTE(review): some lines of this listing (declarations of sp and o, closing
 * braces) are not visible here.
 */
492 NewSubscriber(ORTEDomain *d,ObjectEntryOID *os) {
495 CSTWriter *cstWriter;
496 CSTReader *cstReader;
498 if ((d==NULL) || (os==NULL)) return;
499 sp=(ORTESubsProp*)os->attributes;
500 pthread_rwlock_rdlock(&d->psEntry.publicationsLock);
501 gavl_cust_for_each(PublicationList,&d->psEntry,o) {
502 ORTEPublProp *pp=(ORTEPublProp*)o->attributes;
503 if ((strcmp(sp->topic,pp->topic)==0) &&
504 (strcmp(sp->typeName,pp->typeName)==0) &&
505 (sp->typeChecksum==pp->typeChecksum)) {
506 //add Publication to Subscription (only if private)
507 if (os->privateCreated) {
508 pthread_rwlock_rdlock(&d->subscriptions.lock);
509 if ((cstReader=CSTReader_find(&d->subscriptions,&os->guid))) {
510 pthread_rwlock_wrlock(&cstReader->lock);
/* Skip publications already registered as remote writers of this reader. */
511 if (!CSTRemoteWriter_find(cstReader,&o->guid)) {
512 CSTReaderAddRemoteWriter(d,cstReader,o,o->oid);
513 debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
514 os->guid.hid,os->guid.aid,os->guid.oid,
515 o->guid.hid,o->guid.aid,o->guid.oid);
517 pthread_rwlock_unlock(&cstReader->lock);
519 pthread_rwlock_unlock(&d->subscriptions.lock);
521 //add Subscriber to Publisher (only if private)
522 if (o->privateCreated) {
523 pthread_rwlock_rdlock(&d->publications.lock);
524 if ((cstWriter=CSTWriter_find(&d->publications,&o->guid))) {
525 pthread_rwlock_wrlock(&cstWriter->lock);
/* Skip subscriptions already registered as remote readers of this writer. */
526 if (!CSTRemoteReader_find(cstWriter,&os->guid)) {
527 CSTWriterAddRemoteReader(d,cstWriter,os,os->oid);
528 debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
529 o->guid.hid,o->guid.aid,o->guid.oid,
530 os->guid.hid,os->guid.aid,os->guid.oid);
532 pthread_rwlock_unlock(&cstWriter->lock);
534 pthread_rwlock_unlock(&d->publications.lock);
538 pthread_rwlock_unlock(&d->psEntry.publicationsLock);