2 * $Id: RTPSHeardBeat.c,v 0.0.0.1 2003/10/07
4 * DEBUG: section 48 RTPS message HeardBeat
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 /**********************************************************************************/
/*
 * RTPSHeardBeatCreate - serialize a HEARTBEAT submessage into rtps_msg.
 *
 * Layout written by the visible statements (byte offsets into rtps_msg):
 *    0      submessage id (HEARTBEAT)
 *    1      flags: ORTE_MY_MBO, with bit 0x02 OR-ed in when f_bit is set
 *    2      ParameterLength, fixed value 24
 *    4      roid (reader object id)
 *    8      woid (writer object id)
 *   12      *firstSeqNumber
 *   20      *lastSeqNumber
 *
 * rtps_msg        destination buffer
 * max_msg_len     space available in rtps_msg; fewer than 28 bytes is rejected
 * firstSeqNumber  copied to offset 12
 * lastSeqNumber   copied to offset 20
 * woid, roid      writer / reader object ids
 * f_bit           non-zero sets flag bit 0x02
 *
 * Returns -1 when max_msg_len < 28.
 *
 * NOTE(review): the return type, the success return value and the closing
 * brace are not visible in this listing, and the embedded numbering skips
 * entries inside the body, so statements may be missing between the stores
 * below - confirm against the complete file before relying on this comment.
 * NOTE(review): presumably the success path returns the 28 bytes written
 * (4 header bytes + the 24 stored as ParameterLength) - TODO confirm.
 */
26 RTPSHeardBeatCreate(uint8_t *rtps_msg,uint32_t max_msg_len,
27 SequenceNumber *firstSeqNumber,SequenceNumber *lastSeqNumber,
28 ObjectId woid,ObjectId roid,Boolean f_bit) {
/* Refuse to write when the caller's buffer cannot hold 28 bytes. */
29 if (max_msg_len<28) return -1;
30 rtps_msg[0]=(uint8_t)HEARTBEAT;
31 rtps_msg[1]=ORTE_MY_MBO;
32 if (f_bit) rtps_msg[1]|=2;
/*
 * NOTE(review): the stores below go through casts of a uint8_t pointer to
 * wider types; this assumes rtps_msg is suitably aligned and that the
 * compiler tolerates the type punning - verify, or consider memcpy.
 */
33 *((ParameterLength*)(rtps_msg+2))=24;
35 *((ObjectId*)(rtps_msg+4))=roid;
37 *((ObjectId*)(rtps_msg+8))=woid;
38 *((SequenceNumber*)(rtps_msg+12))=*firstSeqNumber;
39 *((SequenceNumber*)(rtps_msg+20))=*lastSeqNumber;
43 /**********************************************************************************/
/*
 * HeardBeatProc - apply a received heartbeat to one reader's record of the
 * remote writer identified by writerGUID.
 *
 * cstReader   reader to update; a NULL pointer makes the function return
 * writerGUID  GUID used to look up the CSTRemoteWriter in cstReader
 * fsn, lsn    first / last sequence numbers carried by the heartbeat
 * f_bit       when non-zero, the acknowledgement scheduling at the end of
 *             the function is skipped
 *
 * Nothing is done when the writer is not known to this reader.
 *
 * NOTE(review): the return type, the closing braces, the declaration and
 * assignment of `queue`, and at least one argument of eventDetach/eventAdd
 * are not visible in this listing (the embedded numbering skips entries) -
 * confirm the details below against the complete file.
 */
45 HeardBeatProc(CSTReader *cstReader,GUID_RTPS *writerGUID,
46 SequenceNumber *fsn,SequenceNumber *lsn,char f_bit) {
47 CSTRemoteWriter *cstRemoteWriter;
49 if (!cstReader) return;
50 cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
51 if (!cstRemoteWriter) return;
/* Remember the range announced by the writer and restart ACK retry counting. */
52 cstRemoteWriter->firstSN=*fsn;
53 cstRemoteWriter->lastSN=*lsn;
54 cstRemoteWriter->ACKRetriesCounter=0;
/* The locally tracked sequence number must not exceed the announced last one. */
55 if (SeqNumberCmp(cstRemoteWriter->sn,*lsn)>0)
56 cstRemoteWriter->sn=*lsn;
/*
 * Tracked number is below the announced first one: re-base it from *fsn,
 * unless *fsn equals noneSN.
 * NOTE(review): presumably SeqNumberDec(a,b) stores b-1 into a - confirm
 * against its definition.
 */
57 if (SeqNumberCmp(cstRemoteWriter->sn,*fsn)<0) {
58 if (SeqNumberCmp(*fsn,noneSN)!=0) {
59 SeqNumberDec(cstRemoteWriter->sn,*fsn);
/*
 * Writers whose object id has (oid & 0x07) == OID_PUBLICATION are handled
 * by CSTReaderProcCSChangesIssue.
 * NOTE(review): the CSTReaderProcCSChanges call that follows is presumably
 * the else branch; the line carrying `else` is not visible here.
 */
62 if ((writerGUID->oid & 0x07) == OID_PUBLICATION) {
63 CSTReaderProcCSChangesIssue(cstRemoteWriter,ORTE_FALSE);
65 CSTReaderProcCSChanges(cstReader->domain,cstRemoteWriter);
/*
 * Only when f_bit is clear and the writer record is in state WAITING:
 * move it to ACKPENDING, detach the repeatActiveQueryTimer and
 * delayResponceTimer events, then add delayResponceTimer again with
 * CSTReaderResponceTimer as handler and params.delayResponceTimeMin as
 * the time argument.
 */
67 if ((!f_bit) && (cstRemoteWriter->commStateACK==WAITING)) {
69 cstRemoteWriter->commStateACK=ACKPENDING;
/*
 * NOTE(review): the statement controlled by this `if` is not visible;
 * presumably it selects a different `queue` for publication writers.
 */
70 if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION)
72 eventDetach(cstReader->domain,
73 cstRemoteWriter->objectEntryOID->objectEntryAID,
74 &cstRemoteWriter->repeatActiveQueryTimer,
76 eventDetach(cstReader->domain,
77 cstRemoteWriter->objectEntryOID->objectEntryAID,
78 &cstRemoteWriter->delayResponceTimer,
79 queue); //metatraffic timer
80 eventAdd(cstReader->domain,
81 cstRemoteWriter->objectEntryOID->objectEntryAID,
82 &cstRemoteWriter->delayResponceTimer,
83 queue, //metatraffic timer
84 "CSTReaderResponceTimer",
85 CSTReaderResponceTimer,
86 &cstRemoteWriter->cstReader->lock,
88 &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
92 /**********************************************************************************/
/*
 * RTPSHeardBeat - decode a received HEARTBEAT submessage and pass it to the
 * matching reader(s) of domain d through HeardBeatProc.
 *
 * d         domain whose readers are updated
 * rtps_msg  start of the submessage; fields are read at offsets 1, 4, 8,
 *           12 and 20
 * mi        supplies sourceHostId / sourceAppId for the writer GUID
 *
 * Visible flow:
 *   1. extract e_bit (bit 0) and f_bit (bit 1) from the flags byte, the
 *      reader and writer object ids and the first/last sequence numbers;
 *   2. drop the message when SeqNumberCmp(fsn,lsn) yields 1;
 *   3. pick a CSTReader from the domain's own application kind
 *      (d->guid.aid) and the writer's object id / application kind,
 *      write-locking that reader;
 *   4. for publication writers, walk d->subscriptions under its read lock
 *      and call HeardBeatProc on each entry under that entry's write lock;
 *   5. call HeardBeatProc on the reader selected in step 3 and release
 *      its lock.
 *
 * NOTE(review): this listing omits lines (the embedded numbering skips
 * entries): several local declarations, the assignment of writerGUID.oid,
 * the `case` labels and `break`s of the switch, `else` keywords and
 * closing braces are not visible. The branch structure described above
 * is therefore partly inferred - confirm against the complete file.
 * NOTE(review): no check of the submessage length is visible before
 * rtps_msg+20 is dereferenced - verify that a caller validates it.
 */
94 RTPSHeardBeat(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi) {
97 SequenceNumber fsn,lsn;
99 CSTReader *cstReader=NULL;
/* Flags byte: bit 0 -> e_bit, bit 1 -> f_bit. */
101 e_bit=rtps_msg[1] & 0x01;
102 f_bit=(rtps_msg[1] & 0x02)>>1;
/*
 * NOTE(review): the reads below cast the byte pointer to wider types and
 * assume suitable alignment of rtps_msg - verify. Any conversion that
 * depends on e_bit is not visible in this listing.
 */
103 roid=*((ObjectId*)(rtps_msg+4)); /* readerObjectId */
105 woid=*((ObjectId*)(rtps_msg+8)); /* writerObjectId */
107 fsn=*((SequenceNumber*)(rtps_msg+12)); /* firstSeqNumber */
109 lsn=*((SequenceNumber*)(rtps_msg+20)); /* lastSeqNumber */
111 if (SeqNumberCmp(fsn,lsn)==1) return; // lsn<fsn -> break
112 writerGUID.hid=mi->sourceHostId;
113 writerGUID.aid=mi->sourceAppId;
116 debug(48,3) ("recv: RTPS HB%c(0x%x) from 0x%x-0x%x\n",
117 f_bit ? 'F':'f',woid,mi->sourceHostId,mi->sourceAppId);
/*
 * This domain is a MANAGER:
 *  - OID_WRITE_APPSELF sent by another MANAGER -> readerManagers;
 *  - OID_WRITE_APPSELF sent by a MANAGEDAPPLICATION, or OID_WRITE_APP sent
 *    by a MANAGER -> readerApplications.
 */
119 if ((d->guid.aid & 0x03)==MANAGER) {
120 if ((writerGUID.oid==OID_WRITE_APPSELF) &&
121 ((writerGUID.aid & 0x03)==MANAGER)) {
122 pthread_rwlock_wrlock(&d->readerManagers.lock);
123 cstReader=&d->readerManagers;
125 if (((writerGUID.oid==OID_WRITE_APPSELF) &&
126 ((writerGUID.aid & 0x03)==MANAGEDAPPLICATION)) ||
127 ((writerGUID.oid==OID_WRITE_APP) &&
128 ((writerGUID.aid & 0x03)==MANAGER))) {
129 pthread_rwlock_wrlock(&d->readerApplications.lock);
130 cstReader=&d->readerApplications;
/*
 * This domain is a MANAGEDAPPLICATION: the writer object id selects one of
 * readerManagers, readerApplications, readerPublications or
 * readerSubscriptions.
 * NOTE(review): the case labels that map object ids to these readers are
 * not visible in this listing.
 */
133 if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
134 switch (writerGUID.oid) {
136 pthread_rwlock_wrlock(&d->readerManagers.lock);
137 cstReader=&d->readerManagers;
140 pthread_rwlock_wrlock(&d->readerApplications.lock);
141 cstReader=&d->readerApplications;
144 pthread_rwlock_wrlock(&d->readerPublications.lock);
145 cstReader=&d->readerPublications;
148 pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
149 cstReader=&d->readerSubscriptions;
/*
 * Publication writers: visit every entry of d->subscriptions while holding
 * its read lock; each entry is write-locked only for the duration of its
 * HeardBeatProc call.
 */
152 if ((writerGUID.oid & 0x07) == OID_PUBLICATION) {
153 pthread_rwlock_rdlock(&d->subscriptions.lock);
154 gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
155 pthread_rwlock_wrlock(&cstReader->lock);
156 HeardBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
157 pthread_rwlock_unlock(&cstReader->lock);
159 pthread_rwlock_unlock(&d->subscriptions.lock);
/*
 * Reader selected above: process the heartbeat, then drop its lock.
 * HeardBeatProc itself returns immediately for a NULL reader.
 * NOTE(review): cstReader starts out NULL; unless a guard on a line not
 * visible here protects the unlock below, an unmatched heartbeat would
 * dereference NULL in cstReader->lock - confirm against the complete file.
 */
163 HeardBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
165 pthread_rwlock_unlock(&cstReader->lock);