2 * $Id: RTPSVar.c,v 0.0.0.1 2003/10/07
4 * DEBUG: section 46 RTPS message VAR
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
/* Forward declarations: both helpers are defined further down in this file
 * and are called from RTPSVar when a publication or subscription entry is
 * inserted into the domain's psEntry lists. */
24 void NewPublisher(ORTEDomain *d,ObjectEntryOID *op);
25 void NewSubscriber(ORTEDomain *d,ObjectEntryOID *os);
27 /**********************************************************************************/
/*
 * RTPSVar - process one received RTPS VAR submessage.
 *
 *   d               - domain whose reader/writer tables and object entries
 *                     are looked up and updated
 *   rtps_msg        - start of the submessage; byte 1 carries the flag bits,
 *                     bytes 2-3 the submessage length
 *   mi              - message interpreter state; supplies sourceHostId and
 *                     sourceAppId
 *   senderIPAddress - compared with a remote reader's guid.hid and passed to
 *                     getTypeApp
 *
 * The function picks a CSTReader according to the local application kind
 * (d->guid.aid) and the writer object id, looks up the matching remote
 * writer and, if found, hands the decoded CSChange to it. The selected
 * reader's lock is taken in the branch that selects it and released at the
 * end of the function.
 */
29 RTPSVar(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
30 GUID_RTPS objectGUID,writerGUID;
33 int8_t e_bit,p_bit,a_bit;
35 CSTReader *cstReader=NULL;
36 CSTRemoteWriter *cstRemoteWriter=NULL;
37 CSTRemoteReader *cstRemoteReader=NULL;
39 ObjectEntryAID *objectEntryAID;
40 ObjectEntryOID *objectEntryOID;
/* Flag bits from byte 1: bit 0 -> e_bit, bit 1 -> p_bit, bit 2 -> a_bit. */
42 e_bit=rtps_msg[1] & 0x01;
43 p_bit=(rtps_msg[1] & 0x02)>>1;
44 a_bit=(rtps_msg[1] & 0x04)>>2;
/* 16-bit length at offset 2, converted by conv_u16 according to e_bit. */
45 submsg_len=*((u_int16_t*)(rtps_msg+2));
46 conv_u16(&submsg_len,e_bit);
47 roid=*((ObjectId*)(rtps_msg+4)); /* readerObjectId */
49 woid=*((ObjectId*)(rtps_msg+8)); /* writerObjectId */
/* With flag 0x08 set, host and application id of the object are read from
 * offsets 12 and 16 of the message. The assignments from mi that follow are
 * presumably the alternative branch - TODO confirm against the full file. */
51 if (rtps_msg[1] & 0x08) { /* bit H */
52 objectGUID.hid=*((HostId*)(rtps_msg+12)); /* HostId */
53 conv_u32(&objectGUID.hid,0);
54 objectGUID.aid=*((AppId*)(rtps_msg+16)); /* AppId */
55 conv_u32(&objectGUID.aid,0);
58 objectGUID.hid=mi->sourceHostId;
59 objectGUID.aid=mi->sourceAppId;
61 objectGUID.oid=*((ObjectId*)(rtps_msg+20)); /* ObjectId */
62 conv_u32(&objectGUID.oid,0);
63 sn=*((SequenceNumber*)(rtps_msg+24)); /* writerSN */
/* Host and application id of the writer come from the interpreter state. */
65 writerGUID.hid=mi->sourceHostId;
66 writerGUID.aid=mi->sourceAppId;
69 debug(46,3) ("recv: RTPS_VAR(0x%x) from 0x%x-0x%x sn:%u\n",
70 woid,mi->sourceHostId,mi->sourceAppId,sn.low);
/* Build the CSChange for this submessage: guid of the described object,
 * alive mirrors the A flag, attributes are decoded from offset 32 onwards.
 * NOTE(review): the MALLOC result is dereferenced without a NULL check -
 * confirm whether MALLOC can return NULL. */
73 csChange=(CSChange*)MALLOC(sizeof(CSChange));
74 csChange->cdrStream.buffer=NULL;
75 csChange->guid=objectGUID;
76 if (a_bit) csChange->alive=ORTE_TRUE;
77 else csChange->alive=ORTE_FALSE;
79 parameterDecodeStreamToCSChange(csChange,rtps_msg+32,submsg_len,e_bit);
81 CSChangeAttributes_init_head(csChange);
83 SEQUENCE_NUMBER_NONE(csChange->gapSN);
/* ---- local application is a MANAGER ---- */
86 if ((d->guid.aid & 0x03)==MANAGER) {
/* VAR written by the OID_WRITE_APPSELF writer of another manager. */
87 if ((writerGUID.oid==OID_WRITE_APPSELF) &&
88 ((writerGUID.aid & 0x03)==MANAGER)) {
90 pthread_rwlock_wrlock(&d->readerManagers.lock);
91 pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
92 cstReader=&d->readerManagers;
/* Search writerApplicationSelf for a remote reader whose host id equals
 * the sender address. */
93 gavl_cust_for_each(CSTRemoteReader,
94 &d->writerApplicationSelf,
96 if (cstRemoteReader->guid.hid==senderIPAddress) break;
98 if (cstRemoteReader) {
99 objectEntryOID=objectEntryFind(d,&objectGUID);
100 if (!objectEntryOID) {
101 // create new RemoteReader
102 AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
104 parameterUpdateApplication(csChange,ap);
105 if (generateEvent(d,&objectGUID,(void*)ap,ORTE_TRUE) &&
107 debug(46,2) ("manager 0x%x-0x%x accepted\n",
108 objectGUID.hid,objectGUID.aid);
109 objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)ap);
110 CSTWriterRefreshAllCSChanges(d,cstRemoteReader);
111 CSTReaderAddRemoteWriter(d,cstReader,objectEntryOID,writerGUID.oid);
/* The new manager is also registered with the applications reader and
 * writer, each under its own write lock. */
112 pthread_rwlock_wrlock(&d->readerApplications.lock);
113 pthread_rwlock_wrlock(&d->writerApplications.lock);
114 CSTReaderAddRemoteWriter(d,&d->readerApplications,objectEntryOID,OID_WRITE_APP);
115 CSTWriterAddRemoteReader(d,&d->writerApplications,objectEntryOID,OID_READ_APP);
116 pthread_rwlock_unlock(&d->writerApplications.lock);
117 pthread_rwlock_unlock(&d->readerApplications.lock);
118 //all applications from manager node set expiration timer
119 gavl_cust_for_each(ObjectEntryAID,
120 objectEntryOID->objectEntryHID,objectEntryAID) {
121 objectEntryOID=ObjectEntryOID_find(objectEntryAID,&objectGUID.oid);
122 objectEntryRefreshApp(d,objectEntryOID);
/* Look up the remote writer for this VAR and refresh its object entry.
 * NOTE(review): confirm cstRemoteWriter is checked for NULL before the
 * dereference below. */
128 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
130 objectEntryRefreshApp(d,cstRemoteWriter->objectEntryOID);
134 pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
/* VAR from a managed application's OID_WRITE_APPSELF writer, or from a
 * manager's OID_WRITE_APP writer: handled through readerApplications. */
136 if (((writerGUID.oid==OID_WRITE_APPSELF) &&
137 ((writerGUID.aid & 0x03)==MANAGEDAPPLICATION)) ||
138 ((writerGUID.oid==OID_WRITE_APP) &&
139 ((writerGUID.aid & 0x03)==MANAGER))) {
141 pthread_rwlock_wrlock(&d->readerApplications.lock);
142 cstReader=&d->readerApplications;
143 objectEntryOID=objectEntryFind(d,&objectGUID);
144 if (!objectEntryOID) {
145 AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
147 parameterUpdateApplication(csChange,ap);
148 if (generateEvent(d,&objectGUID,(void*)ap,ORTE_TRUE) &&
150 objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)ap);
/* appMOM is set from getTypeApp; a non-zero value selects the
 * "MOM application" handling below. */
151 objectEntryOID->appMOM=getTypeApp(d,ap,senderIPAddress);
152 if (objectEntryOID->appMOM) {
153 debug(46,2) ("MOM application 0x%x-0x%x accepted\n",
154 objectGUID.hid,objectGUID.aid);
155 //increment vargAppsSequenceNumber and make csChange
156 SeqNumberInc(d->appParams->vargAppsSequenceNumber,
157 d->appParams->vargAppsSequenceNumber);
158 //WAS & WM is locked inside next function
159 appSelfParamChanged(d,ORTE_TRUE,ORTE_FALSE,ORTE_TRUE);
160 CSTReaderAddRemoteWriter(d,cstReader,
161 objectEntryOID,writerGUID.oid);
162 CSTWriterAddRemoteReader(d,&d->writerManagers,
163 objectEntryOID,OID_READ_MGR);
164 pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
165 pthread_rwlock_unlock(&d->writerManagers.lock);
167 debug(46,2) ("OAM application 0x%x-0x%x accepted\n",
168 objectGUID.hid,objectGUID.aid);
170 pthread_rwlock_wrlock(&d->writerApplications.lock);
171 CSTWriterAddRemoteReader(d,&d->writerApplications,objectEntryOID,OID_READ_APP);
172 pthread_rwlock_unlock(&d->writerApplications.lock);
177 if (objectEntryOID) {
178 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
179 if (objectEntryOID->appMOM) {
180 objectEntryRefreshApp(d,objectEntryOID);
182 //turn off expiration timer
184 objectEntryOID->objectEntryAID,
185 &objectEntryOID->expirationPurgeTimer,
187 debug(46,3) ("for application 0x%x-0x%x turn off expiration timer\n",
188 objectEntryOID->guid.hid,objectEntryOID->guid.aid);
/* ---- local application is a MANAGEDAPPLICATION: dispatch on writer oid ---- */
194 if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
195 switch (writerGUID.oid) {
/* Path that works on d->readerManagers. */
197 pthread_rwlock_wrlock(&d->readerManagers.lock);
198 cstReader=&d->readerManagers;
199 objectEntryOID=objectEntryFind(d,&objectGUID);
200 if (!objectEntryOID) {
201 AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
203 parameterUpdateApplication(csChange,ap);
204 if (generateEvent(d,&objectGUID,(void*)ap,ORTE_TRUE) &&
206 debug(46,2) ("new manager 0x%x-0x%x accepted\n",
207 objectGUID.hid,objectGUID.aid);
208 objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)ap);
209 objectEntryOID->private=ORTE_FALSE;
210 pthread_rwlock_wrlock(&d->readerApplications.lock);
211 CSTReaderAddRemoteWriter(d,&d->readerApplications,
212 objectEntryOID,OID_WRITE_APP);
213 pthread_rwlock_unlock(&d->readerApplications.lock);
214 //all applications from manager node set expiration timer
215 gavl_cust_for_each(ObjectEntryAID,
216 objectEntryOID->objectEntryHID,objectEntryAID) {
217 objectEntryOID=ObjectEntryOID_find(objectEntryAID,&objectGUID.oid);
218 objectEntryRefreshApp(d,objectEntryOID);
/* Find the manager's OID_WRITE_APP remote writer in readerApplications and,
 * when it is in the WAITING ack state, work on its repeatActiveQueryTimer. */
224 GUID_RTPS guid_wapp=objectGUID;
225 guid_wapp.oid=OID_WRITE_APP;
226 pthread_rwlock_wrlock(&d->readerApplications.lock);
227 cstRemoteWriter=CSTRemoteWriter_find(&d->readerApplications,&guid_wapp);
228 //setup state of cstRemoteWriter on send ACK to manager
229 if (cstRemoteWriter) {
230 if (cstRemoteWriter->commStateACK==WAITING) {
232 cstRemoteWriter->objectEntryOID->objectEntryAID,
233 &cstRemoteWriter->repeatActiveQueryTimer,
234 1); //metatraffic timer
236 cstRemoteWriter->objectEntryOID->objectEntryAID,
237 &cstRemoteWriter->repeatActiveQueryTimer,
238 1, //metatraffic timer
239 "CSTReaderQueryTimer",
241 &cstRemoteWriter->cstReader->lock,
246 pthread_rwlock_unlock(&d->readerApplications.lock);
248 objectEntryRefreshApp(d,objectEntryOID);
/* A remote writer is added only when none exists yet and the described
 * object belongs to the same host/application as the writer. */
249 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
250 if ((!cstRemoteWriter) &&
251 (objectGUID.hid==writerGUID.hid) && (objectGUID.aid==writerGUID.aid)) {
253 CSTReaderAddRemoteWriter(d,cstReader,objectEntryOID,writerGUID.oid);
/* Path that works on d->readerApplications. */
257 pthread_rwlock_wrlock(&d->readerApplications.lock);
258 cstReader=&d->readerApplications;
259 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
260 if (cstRemoteWriter) {
261 GUID_RTPS guid_tmp=objectGUID;
262 guid_tmp.oid=OID_WRITE_PUBL;
263 objectEntryOID=objectEntryFind(d,&objectGUID);
/* Proceed only if the application's OID_WRITE_PUBL writer is not yet
 * known to readerPublications. */
264 if (!CSTRemoteWriter_find(&d->readerPublications,&guid_tmp)) {
265 if (!objectEntryOID) {
266 AppParams *ap=(AppParams*)MALLOC(sizeof(AppParams));
268 parameterUpdateApplication(csChange,ap);
269 if (generateEvent(d,&objectGUID,(void*)ap,ORTE_TRUE) &&
271 debug(46,2) ("new application 0x%x-0x%x accepted\n",
272 objectGUID.hid,objectGUID.aid);
273 objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)ap);
274 objectEntryOID->private=ORTE_FALSE;
/* Register the application with the publication/subscription readers and
 * writers while all four write locks are held. */
280 pthread_rwlock_wrlock(&d->readerPublications.lock);
281 pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
282 pthread_rwlock_wrlock(&d->writerPublications.lock);
283 pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
284 CSTReaderAddRemoteWriter(d,&d->readerPublications,
285 objectEntryOID,OID_WRITE_PUBL);
286 CSTReaderAddRemoteWriter(d,&d->readerSubscriptions,
287 objectEntryOID,OID_WRITE_SUBS);
288 CSTWriterAddRemoteReader(d,&d->writerPublications,
289 objectEntryOID,OID_READ_PUBL);
290 CSTWriterAddRemoteReader(d,&d->writerSubscriptions,
291 objectEntryOID,OID_READ_SUBS);
292 pthread_rwlock_unlock(&d->readerPublications.lock);
293 pthread_rwlock_unlock(&d->readerSubscriptions.lock);
294 pthread_rwlock_unlock(&d->writerPublications.lock);
295 pthread_rwlock_unlock(&d->writerSubscriptions.lock);
297 if (objectEntryOID) {
298 //turn off expiration timer
300 objectEntryOID->objectEntryAID,
301 &objectEntryOID->expirationPurgeTimer,
303 debug(46,3) ("for application 0x%x-0x%x turn off expiration timer\n",
304 objectEntryOID->guid.hid,
305 objectEntryOID->guid.aid);
/* Path that works on d->readerPublications: a publication is accepted only
 * from an already known remote writer. */
310 pthread_rwlock_wrlock(&d->readerPublications.lock);
311 cstReader=&d->readerPublications;
312 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
313 if (cstRemoteWriter) {
314 objectEntryOID=objectEntryFind(d,&objectGUID);
315 if (!objectEntryOID) {
316 ORTEPublProp *pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
318 parameterUpdatePublication(csChange,pp);
319 if (generateEvent(d,&objectGUID,(void*)pp,ORTE_TRUE) &&
321 debug(46,2) ("new publisher 0x%x-0x%x-0x%x accepted\n",
322 objectGUID.hid,objectGUID.aid,objectGUID.oid);
323 objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)pp);
324 objectEntryOID->private=ORTE_FALSE;
325 pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
326 PublicationList_insert(&d->psEntry,objectEntryOID);
327 pthread_rwlock_unlock(&d->psEntry.publicationsLock);
328 NewPublisher(d,objectEntryOID);
/* Object entry exists already: insert it into the publication list if it
 * is not there yet. */
332 if (!PublicationList_find(&d->psEntry,&objectGUID)) {
333 pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
334 PublicationList_insert(&d->psEntry,objectEntryOID);
335 pthread_rwlock_unlock(&d->psEntry.publicationsLock);
336 NewPublisher(d,objectEntryOID);
/* Path that works on d->readerSubscriptions; mirrors the publication path. */
342 pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
343 cstReader=&d->readerSubscriptions;
344 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
345 if (cstRemoteWriter) {
346 objectEntryOID=objectEntryFind(d,&objectGUID);
347 if (!objectEntryOID) {
348 ORTESubsProp *sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
350 parameterUpdateSubscription(csChange,sp);
351 if (generateEvent(d,&objectGUID,(void*)sp,ORTE_TRUE) &&
353 debug(46,2) ("new subscriber 0x%x-0x%x-0x%x accepted\n",
354 objectGUID.hid,objectGUID.aid,objectGUID.oid);
355 objectEntryOID=objectEntryAdd(d,&objectGUID,(void*)sp);
356 objectEntryOID->private=ORTE_FALSE;
357 pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
358 SubscriptionList_insert(&d->psEntry,objectEntryOID);
359 pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
360 NewSubscriber(d,objectEntryOID);
364 if (!SubscriptionList_find(&d->psEntry,&objectGUID)) {
365 pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
366 SubscriptionList_insert(&d->psEntry,objectEntryOID);
367 pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
368 NewSubscriber(d,objectEntryOID);
/* Common tail. Without a selected reader no lock is held and the function
 * returns; without a remote writer the reader lock is released.
 * NOTE(review): csChange allocated above does not appear to be released on
 * the early return below - confirm against the full file. */
375 if (!cstReader) return;
376 if (!cstRemoteWriter) {
377 pthread_rwlock_unlock(&cstReader->lock);
380 debug(46,10) ("recv: processing CSChange\n");
/* Only a change with a sequence number greater than the remote writer's
 * current one is queued and processed. */
381 if (SeqNumberCmp(sn,cstRemoteWriter->sn)>0) { //have to be sn>writer_sn
382 CSTReaderAddCSChange(cstRemoteWriter,csChange);
383 CSTReaderProcCSChanges(d,cstRemoteWriter);
386 parameterDelete(csChange);
389 pthread_rwlock_unlock(&cstReader->lock);
393 /**********************************************************************************/
/*
 * NewPublisher - connect a publication object entry with matching
 * subscriptions known to the domain.
 *
 *   d  - domain holding the subscription/publication tables
 *   op - object entry of the publication; its attributes are read as
 *        ORTEPublProp
 *
 * Returns immediately when either argument is NULL.
 */
395 NewPublisher(ORTEDomain *d,ObjectEntryOID *op) {
398 CSTWriter *cstWriter=NULL;
399 CSTReader *cstReader=NULL;
402 if ((d==NULL) || (op==NULL)) return;
403 pp=(ORTEPublProp*)op->attributes;
404 //***************************************
406 //try to find if subscription exists
/* With read locks on patternEntry and subscriptions held, look through
 * d->subscriptions for a reader flagged createdByPattern whose topic and
 * type name equal the publication's. */
407 pthread_rwlock_rdlock(&d->patternEntry.lock);
408 pthread_rwlock_rdlock(&d->subscriptions.lock);
409 gavl_cust_for_each(CSTReader,
410 &d->subscriptions,cstReader) {
411 if (cstReader->createdByPattern) {
413 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
414 if ((strcmp(sp->topic,pp->topic)==0) &&
415 (strcmp(sp->typeName,pp->typeName)==0))
419 pthread_rwlock_unlock(&d->subscriptions.lock);
/* No such reader: test every registered pattern with fnmatch against the
 * publication's topic and type name. */
420 if (!cstReader) { //not exists
421 ul_list_for_each(Pattern,&d->patternEntry,pnode) {
422 if ((fnmatch(pnode->topic,pp->topic,0)==0) &&
423 (fnmatch(pnode->type,pp->typeName,0)==0)) {
/* The locks are dropped around the subscriptionCallBack call and taken
 * again afterwards. NOTE(review): readerPublications.lock, htimRootLock and
 * objRootLock are not acquired in this function - presumably they are held
 * by the caller; confirm. */
426 pthread_rwlock_unlock(&d->patternEntry.lock);
427 pthread_rwlock_unlock(&d->readerPublications.lock);
428 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
429 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
430 cstReader=pnode->subscriptionCallBack(
435 cstReader->createdByPattern=ORTE_TRUE;
438 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
439 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
440 pthread_rwlock_wrlock(&d->readerPublications.lock);
441 pthread_rwlock_rdlock(&d->patternEntry.lock);
445 pthread_rwlock_unlock(&d->patternEntry.lock);
/* Walk the subscription list under a read lock and pair this publication
 * with every entry that agrees on topic, type name and type checksum. */
447 pthread_rwlock_rdlock(&d->psEntry.subscriptionsLock);
448 gavl_cust_for_each(SubscriptionList,&d->psEntry,o) {
449 ORTESubsProp *sp=(ORTESubsProp*)o->attributes;
450 if ((strcmp(pp->topic,sp->topic)==0) &&
451 (strcmp(pp->typeName,sp->typeName)==0) &&
452 (pp->typeChecksum==sp->typeChecksum)) {
453 //add Subscription to Publisher (only if private)
455 pthread_rwlock_rdlock(&d->publications.lock);
456 if ((cstWriter=CSTWriter_find(&d->publications,&op->guid))) {
457 pthread_rwlock_wrlock(&cstWriter->lock);
458 if (!CSTRemoteReader_find(cstWriter,&o->guid)) {
459 CSTWriterAddRemoteReader(d,cstWriter,o,o->oid);
460 debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
461 op->guid.hid,op->guid.aid,op->guid.oid,
462 o->guid.hid,o->guid.aid,o->guid.oid);
464 pthread_rwlock_unlock(&cstWriter->lock);
466 pthread_rwlock_unlock(&d->publications.lock);
468 //add Publisher to Subscriber (only if private)
470 pthread_rwlock_rdlock(&d->subscriptions.lock);
471 if ((cstReader=CSTReader_find(&d->subscriptions,&o->guid))) {
472 pthread_rwlock_wrlock(&cstReader->lock);
473 if (!CSTRemoteWriter_find(cstReader,&op->guid)) {
474 CSTReaderAddRemoteWriter(d,cstReader,op,op->oid);
475 debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
476 o->guid.hid,o->guid.aid,o->guid.oid,
477 op->guid.hid,op->guid.aid,op->guid.oid);
479 pthread_rwlock_unlock(&cstReader->lock);
481 pthread_rwlock_unlock(&d->subscriptions.lock);
485 pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
488 /**********************************************************************************/
/*
 * NewSubscriber - connect a subscription object entry with matching
 * publications known to the domain.
 *
 *   d  - domain holding the publication/subscription tables
 *   os - object entry of the subscription; its attributes are read as
 *        ORTESubsProp
 *
 * Returns immediately when either argument is NULL.
 */
490 NewSubscriber(ORTEDomain *d,ObjectEntryOID *os) {
493 CSTWriter *cstWriter;
494 CSTReader *cstReader;
496 if ((d==NULL) || (os==NULL)) return;
497 sp=(ORTESubsProp*)os->attributes;
/* Walk the publication list under a read lock and pair this subscription
 * with every entry that agrees on topic, type name and type checksum. */
498 pthread_rwlock_rdlock(&d->psEntry.publicationsLock);
499 gavl_cust_for_each(PublicationList,&d->psEntry,o) {
500 ORTEPublProp *pp=(ORTEPublProp*)o->attributes;
501 if ((strcmp(sp->topic,pp->topic)==0) &&
502 (strcmp(sp->typeName,pp->typeName)==0) &&
503 (sp->typeChecksum==pp->typeChecksum)) {
504 //add Publication to Subscription (only if private)
/* The remote writer is added only if a reader for os exists in
 * d->subscriptions and does not already know publication o. */
506 pthread_rwlock_rdlock(&d->subscriptions.lock);
507 if ((cstReader=CSTReader_find(&d->subscriptions,&os->guid))) {
508 pthread_rwlock_wrlock(&cstReader->lock);
509 if (!CSTRemoteWriter_find(cstReader,&o->guid)) {
510 CSTReaderAddRemoteWriter(d,cstReader,o,o->oid);
511 debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
512 os->guid.hid,os->guid.aid,os->guid.oid,
513 o->guid.hid,o->guid.aid,o->guid.oid);
515 pthread_rwlock_unlock(&cstReader->lock);
517 pthread_rwlock_unlock(&d->subscriptions.lock);
519 //add Subscriber to Publisher (only if private)
/* The remote reader is added only if a writer for publication o exists in
 * d->publications and does not already know subscription os. */
521 pthread_rwlock_rdlock(&d->publications.lock);
522 if ((cstWriter=CSTWriter_find(&d->publications,&o->guid))) {
523 pthread_rwlock_wrlock(&cstWriter->lock);
524 if (!CSTRemoteReader_find(cstWriter,&os->guid)) {
525 CSTWriterAddRemoteReader(d,cstWriter,os,os->oid);
526 debug(46,2) ("0x%x-0x%x-0x%x accepted 0x%x-0x%x-0x%x\n",
527 o->guid.hid,o->guid.aid,o->guid.oid,
528 os->guid.hid,os->guid.aid,os->guid.oid);
530 pthread_rwlock_unlock(&cstWriter->lock);
532 pthread_rwlock_unlock(&d->publications.lock);
536 pthread_rwlock_unlock(&d->psEntry.publicationsLock);