2 * $Id: RTPSHeardBeat.c,v 0.0.0.1 2003/10/07
4 * DEBUG: section 48 RTPS message HeartBeat
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr.smolik@wo.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /**********************************************************************************/
/*
 * RTPSHeartBeatCreate - append one HEARTBEAT submessage to cdrCodec.
 *
 * NOTE(review): the line numbers embedded in this listing skip values, so
 * parts of the definition (return type, braces, the declaration of `flags`,
 * the return on success) are not visible here. Confirm against the full file.
 *
 * cdrCodec   codec that receives the data through the CDR_put_* calls
 * fsn, lsn   sequence numbers; for each one `high` is written before `low`
 * roid, woid object ids, each written as one ulong
 * f_bit      not referenced in the visible lines; presumably merged into
 *            `flags` on a line missing from this listing - TODO confirm
 *
 * Returns -1 when buf_len is smaller than wptr+28. The value returned
 * otherwise is not visible in this listing.
 */
36 RTPSHeartBeatCreate(CDR_Codec *cdrCodec,
37 SequenceNumber *fsn,SequenceNumber *lsn,
38 ObjectId roid,ObjectId woid,Boolean f_bit)
40 CDR_Endianness data_endian;
/* refuse to write when fewer than 28 units remain after wptr; presumably
   1 + 1 + 2 header bytes plus six 4-byte ulongs - TODO confirm sizes */
43 if (cdrCodec->buf_len<cdrCodec->wptr+28) return -1;
/* first octet: the HEARTBEAT constant */
46 CDR_put_octet(cdrCodec,HEARTBEAT);
/* second octet: flags, seeded from the codec's current data_endian value */
49 flags=cdrCodec->data_endian;
51 CDR_put_octet(cdrCodec,flags);
/* ushort 24; presumably the size of the six ulongs written below */
54 CDR_put_ushort(cdrCodec,24);
56 /* the two object ids are written with data_endian forced to big-endian */
/* remember the current setting so it can be restored after the object ids */
57 data_endian=cdrCodec->data_endian;
58 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
61 CDR_put_ulong(cdrCodec,roid);
64 CDR_put_ulong(cdrCodec,woid);
/* back to the endianness the codec had on entry */
66 cdrCodec->data_endian=data_endian;
/* first sequence number: high word, then low word */
69 CDR_put_ulong(cdrCodec,fsn->high);
70 CDR_put_ulong(cdrCodec,fsn->low);
/* last sequence number: high word, then low word */
73 CDR_put_ulong(cdrCodec,lsn->high);
74 CDR_put_ulong(cdrCodec,lsn->low);
79 /**********************************************************************************/
/*
 * HeartBeatProc - apply a received heartbeat to the matching remote writer
 * of one reader.
 *
 * NOTE(review): the line numbers embedded in this listing skip values; the
 * return type, some braces, an `else`, the declaration/assignment of `queue`
 * and some call arguments are not visible here.
 *
 * cstReader   reader to update; the function returns at once when it is NULL
 * writerGUID  key passed to CSTRemoteWriter_find; the function returns when
 *             no remote writer is found
 * fsn, lsn    copied into the remote writer's firstSN / lastSN
 * f_bit       when zero and commStateACK is WAITING, the response timer
 *             below is (re)scheduled
 */
81 HeartBeatProc(CSTReader *cstReader,GUID_RTPS *writerGUID,
82 SequenceNumber *fsn,SequenceNumber *lsn,char f_bit) {
83 CSTRemoteWriter *cstRemoteWriter;
85 if (!cstReader) return;
86 cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
87 if (!cstRemoteWriter) return;
/* record the announced range and reset the ACK retry counter */
89 cstRemoteWriter->firstSN=*fsn;
90 cstRemoteWriter->lastSN=*lsn;
91 cstRemoteWriter->ACKRetriesCounter=0;
/* pull sn down to lsn when it compares greater than lsn */
93 if (SeqNumberCmp(cstRemoteWriter->sn,*lsn)>0)
94 cstRemoteWriter->sn=*lsn;
/* when sn compares lower than fsn and fsn differs from noneSN, sn is
   recomputed from fsn via SeqNumberDec (presumably fsn-1 - TODO confirm) */
95 if (SeqNumberCmp(cstRemoteWriter->sn,*fsn)<0) {
96 if (SeqNumberCmp(*fsn,noneSN)!=0) {
97 SeqNumberDec(cstRemoteWriter->sn,*fsn);
/* writers whose oid low three bits equal OID_PUBLICATION go through
   CSTReaderProcCSChangesIssue; the CSTReaderProcCSChanges call below is
   presumably the `else` branch (the `else` line is not visible here) */
101 if ((writerGUID->oid & 0x07) == OID_PUBLICATION) {
102 CSTReaderProcCSChangesIssue(cstRemoteWriter,ORTE_FALSE);
104 CSTReaderProcCSChanges(cstReader->domain,cstRemoteWriter);
/* only when f_bit is clear and the writer is in WAITING state */
107 if ((!f_bit) && (cstRemoteWriter->commStateACK==WAITING)) {
109 cstRemoteWriter->commStateACK=ACKPENDING;
/* NOTE(review): the statement governed by this `if` is on a line missing
   from the listing; presumably it selects `queue` - TODO confirm */
110 if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION)
/* detach repeatActiveQueryTimer (the last argument is not visible here) */
112 eventDetach(cstReader->domain,
113 cstRemoteWriter->spobject->objectEntryAID,
114 &cstRemoteWriter->repeatActiveQueryTimer,
/* detach delayResponceTimer before it is added again below */
116 eventDetach(cstReader->domain,
117 cstRemoteWriter->spobject->objectEntryAID,
118 &cstRemoteWriter->delayResponceTimer,
119 queue); //metatraffic timer
/* schedule CSTReaderResponceTimer on delayResponceTimer, passing the
   reader's lock and params.delayResponceTimeMin (one argument between
   them is not visible in this listing) */
120 eventAdd(cstReader->domain,
121 cstRemoteWriter->spobject->objectEntryAID,
122 &cstRemoteWriter->delayResponceTimer,
123 queue, //metatraffic timer
124 "CSTReaderResponceTimer",
125 CSTReaderResponceTimer,
126 &cstRemoteWriter->cstReader->lock,
128 &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
132 /**********************************************************************************/
/*
 * RTPSHeartBeat - decode a received HEARTBEAT submessage from cdrCodec and
 * hand it to HeartBeatProc for the reader(s) selected from domain d.
 *
 * NOTE(review): the line numbers embedded in this listing skip values; the
 * return type, the declarations of flags/roid/woid/f_bit, the assignment of
 * writerGUID.oid, the `case` labels of the switch and several braces/guards
 * are not visible here. Confirm details against the full file.
 *
 * d         domain whose readers (readerManagers, readerApplications,
 *           readerPublications, readerSubscriptions, subscriptions) are used
 * cdrCodec  codec the submessage fields are read from
 * mi        supplies sourceHostId / sourceAppId for the writer GUID
 */
134 RTPSHeartBeat(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi) {
135 GUID_RTPS writerGUID;
137 SequenceNumber fsn,lsn;
138 CSTReader *cstReader=NULL;
139 CDR_Endianness data_endian;
143 /* restore flag position in submessage */
147 CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
150 /* move reading position to begin of submessage */
153 /* the two object ids are read with data_endian forced to big-endian */
154 data_endian=cdrCodec->data_endian;
155 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
158 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
161 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
/* restore the endianness the codec had before the object ids */
163 cdrCodec->data_endian=data_endian;
/* first sequence number: high word, then low word */
166 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&fsn.high);
167 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&fsn.low);
/* last sequence number: high word, then low word */
170 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&lsn.high);
171 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&lsn.low);
173 if (SeqNumberCmp(fsn,lsn)==1) return; // lsn<fsn -> break
/* host and application ids of the writer come from the message interpreter;
   the assignment of writerGUID.oid is not visible in this listing */
174 writerGUID.hid=mi->sourceHostId;
175 writerGUID.aid=mi->sourceAppId;
178 debug(48,3) ("recv: RTPS HB%c(0x%x) from 0x%x-0x%x\n",
179 f_bit ? 'F':'f',woid,mi->sourceHostId,mi->sourceAppId);
/* local domain is a MANAGER (low two bits of d->guid.aid) */
181 if ((d->guid.aid & 0x03)==MANAGER) {
/* OID_WRITE_APPSELF from another manager -> readerManagers */
182 if ((writerGUID.oid==OID_WRITE_APPSELF) &&
183 ((writerGUID.aid & 0x03)==MANAGER)) {
184 pthread_rwlock_wrlock(&d->readerManagers.lock);
185 cstReader=&d->readerManagers;
/* OID_WRITE_APPSELF from a managed application, or OID_WRITE_APP from a
   manager -> readerApplications */
187 if (((writerGUID.oid==OID_WRITE_APPSELF) &&
188 ((writerGUID.aid & 0x03)==MANAGEDAPPLICATION)) ||
189 ((writerGUID.oid==OID_WRITE_APP) &&
190 ((writerGUID.aid & 0x03)==MANAGER))) {
191 pthread_rwlock_wrlock(&d->readerApplications.lock);
192 cstReader=&d->readerApplications;
/* local domain is a MANAGEDAPPLICATION: the writer oid selects one of the
   four readers below (the `case` labels are not visible in this listing) */
196 if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
197 switch (writerGUID.oid) {
199 pthread_rwlock_wrlock(&d->readerManagers.lock);
200 cstReader=&d->readerManagers;
203 pthread_rwlock_wrlock(&d->readerApplications.lock);
204 cstReader=&d->readerApplications;
207 pthread_rwlock_wrlock(&d->readerPublications.lock);
208 cstReader=&d->readerPublications;
211 pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
212 cstReader=&d->readerSubscriptions;
/* writer oid marks a publication: process the heartbeat for every CSTReader
   in d->subscriptions, holding the container read lock and each reader's
   write lock around the call */
216 if ((writerGUID.oid & 0x07) == OID_PUBLICATION) {
217 pthread_rwlock_rdlock(&d->subscriptions.lock);
218 gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
219 pthread_rwlock_wrlock(&cstReader->lock);
220 HeartBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
221 pthread_rwlock_unlock(&cstReader->lock);
223 pthread_rwlock_unlock(&d->subscriptions.lock);
/* process the heartbeat for the reader selected (and write-locked) above,
   then release that reader's lock.
   NOTE(review): cstReader is initialised to NULL; any guard or branch around
   these two statements is not visible in this listing - confirm that the
   unlock cannot run with a NULL cstReader */
228 HeartBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
231 pthread_rwlock_unlock(&cstReader->lock);