2 * $Id: RTPSHeardBeat.c,v 0.0.0.1 2003/10/07
4 * DEBUG: section 48 RTPS message HeartBeat
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /**********************************************************************************/
/*
 * RTPSHeartBeatCreate - serialize a HEARTBEAT submessage into cdrCodec.
 *
 * Visible output order: the HEARTBEAT submessage id octet, a flags octet,
 * the unsigned short 24, the reader object id (roid), the writer object id
 * (woid), then fsn and lsn, each written as a high/low pair of unsigned longs.
 *
 * cdrCodec - codec whose buffer receives the submessage
 * fsn, lsn - first and last sequence numbers to advertise
 * roid     - reader object id
 * woid     - writer object id
 * f_bit    - not referenced in the visible lines
 *
 * NOTE(review): this listing omits some original lines (return type, braces,
 * the declaration of flags, the return statements). The comments here
 * describe only the statements that are visible.
 */
36 RTPSHeartBeatCreate(CDR_Codec *cdrCodec,
37 SequenceNumber *fsn, SequenceNumber *lsn,
38 ObjectId roid, ObjectId woid, Boolean f_bit)
40 CDR_Endianness data_endian;
/* Capacity check: 28 more bytes are needed past the current write position.
 * Presumably 4 bytes of submessage header plus the 24 written below as the
 * ushort; the action taken on failure is not visible here. */
43 if (cdrCodec->buf_len < cdrCodec->wptr+28)
47 CDR_put_octet(cdrCodec, HEARTBEAT);
/* The flags octet starts from the endianness of the codec.
 * NOTE(review): f_bit is presumably merged into flags on an omitted line;
 * confirm against the full source. */
50 flags = cdrCodec->data_endian;
53 CDR_put_octet(cdrCodec, flags);
/* Presumably the length of what follows: six unsigned longs
 * (roid, woid, fsn high/low, lsn high/low). */
56 CDR_put_ushort(cdrCodec, 24);
58 /* the following object ids are written in big endian */
/* Remember the endianness of the codec so it can be restored afterwards. */
59 data_endian = cdrCodec->data_endian;
60 cdrCodec->data_endian = FLAG_BIG_ENDIAN;
63 CDR_put_ulong(cdrCodec, roid);
66 CDR_put_ulong(cdrCodec, woid);
/* Back to the original endianness for the sequence numbers. */
68 cdrCodec->data_endian = data_endian;
71 CDR_put_ulong(cdrCodec, fsn->high);
72 CDR_put_ulong(cdrCodec, fsn->low);
75 CDR_put_ulong(cdrCodec, lsn->high);
76 CDR_put_ulong(cdrCodec, lsn->low);
81 /**********************************************************************************/
/*
 * HeartBeatProc - apply a received heartbeat to one reader.
 *
 * Looks up the remote writer identified by writerGUID on cstReader, stores
 * the advertised sequence number range in it, resets its ACK retry counter,
 * brings its current sequence number into line with that range, processes
 * pending changes and, when f_bit is clear and the writer is in the WAITING
 * state, schedules the response timer.
 *
 * cstReader  - reader that received the heartbeat
 * writerGUID - GUID of the writer that sent it
 * fsn, lsn   - first and last sequence numbers carried by the heartbeat
 * f_bit      - when non-zero, the response timer is not scheduled
 *
 * NOTE(review): this listing omits some original lines (return type, braces,
 * other declarations such as queue). The comments here describe only the
 * statements that are visible.
 */
83 HeartBeatProc(CSTReader *cstReader, GUID_RTPS *writerGUID,
84 SequenceNumber *fsn, SequenceNumber *lsn, char f_bit)
86 CSTRemoteWriter *cstRemoteWriter;
90 cstRemoteWriter = CSTRemoteWriter_find(cstReader, writerGUID);
/* NOTE(review): the result of the lookup is dereferenced below and no NULL
 * check is visible in this listing; confirm that one exists on the omitted
 * lines. */
94 cstRemoteWriter->firstSN = *fsn;
95 cstRemoteWriter->lastSN = *lsn;
96 cstRemoteWriter->ACKRetriesCounter = 0;
/* If the current sequence number is past the advertised last one,
 * pull it back to lsn. */
98 if (SeqNumberCmp(cstRemoteWriter->sn, *lsn) > 0)
99 cstRemoteWriter->sn = *lsn;
/* If the current sequence number is before the advertised first one and
 * fsn is not noneSN, derive sn from fsn via SeqNumberDec (presumably
 * fsn - 1; the definition of SeqNumberDec is not visible here). */
100 if (SeqNumberCmp(cstRemoteWriter->sn, *fsn) < 0) {
101 if (SeqNumberCmp(*fsn, noneSN) != 0) {
102 SeqNumberDec(cstRemoteWriter->sn, *fsn);
/* Writers whose oid has the publication kind use the issue-specific
 * processing. The other call below is presumably the alternative branch;
 * the branch keyword itself is not visible in this listing. */
106 if ((writerGUID->oid & 0x07) == OID_PUBLICATION) {
107 CSTReaderProcCSChangesIssue(cstRemoteWriter, ORTE_FALSE);
109 CSTReaderProcCSChanges(cstReader->domain, cstRemoteWriter);
/* Only when f_bit is clear and the writer is WAITING: switch to ACKPENDING
 * and schedule the response timer. */
112 if ((!f_bit) && (cstRemoteWriter->commStateACK == WAITING)) {
114 cstRemoteWriter->commStateACK = ACKPENDING;
115 if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION)
/* Detach the repeat-active-query timer (trailing arguments not visible). */
117 eventDetach(cstReader->domain,
118 cstRemoteWriter->spobject->objectEntryAID,
119 &cstRemoteWriter->repeatActiveQueryTimer,
/* Detach the response timer before adding it again just below. */
121 eventDetach(cstReader->domain,
122 cstRemoteWriter->spobject->objectEntryAID,
123 &cstRemoteWriter->delayResponceTimer,
124 queue); //metatraffic timer
/* Schedule CSTReaderResponceTimer using the delayResponceTimeMin
 * parameter of the reader. */
125 eventAdd(cstReader->domain,
126 cstRemoteWriter->spobject->objectEntryAID,
127 &cstRemoteWriter->delayResponceTimer,
128 queue, //metatraffic timer
129 "CSTReaderResponceTimer",
130 CSTReaderResponceTimer,
131 &cstRemoteWriter->cstReader->lock,
133 &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
137 /**********************************************************************************/
/*
 * RTPSHeartBeat - decode a received HEARTBEAT submessage and dispatch it.
 *
 * Reads the flags octet, the reader and writer object ids and the first and
 * last sequence numbers from cdrCodec, builds the writer GUID from the
 * source host and application ids held in mi, selects the reader that the
 * heartbeat belongs to and passes the data to HeartBeatProc while holding
 * the lock of that reader.
 *
 * d        - local domain; its guid.aid decides which readers are candidates
 * cdrCodec - codec positioned inside the received submessage
 * mi       - message interpreter state supplying sourceHostId/sourceAppId
 *
 * NOTE(review): this listing omits some original lines (return type, braces,
 * declarations of flags, roid, woid and f_bit, the case labels of the
 * switch, buffer repositioning). The comments here describe only the
 * statements that are visible.
 */
139 RTPSHeartBeat(ORTEDomain *d, CDR_Codec *cdrCodec, MessageInterpret *mi)
141 GUID_RTPS writerGUID;
143 SequenceNumber fsn, lsn;
144 CSTReader *cstReader = NULL;
145 CDR_Endianness data_endian;
149 /* restore flag position in submessage */
153 CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
156 /* move reading position to the beginning of the submessage */
159 /* the following object ids are sent in big endian */
/* Remember the endianness of the codec so it can be restored afterwards. */
160 data_endian = cdrCodec->data_endian;
161 cdrCodec->data_endian = FLAG_BIG_ENDIAN;
164 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
167 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
/* Back to the original endianness for the sequence numbers. */
169 cdrCodec->data_endian = data_endian;
172 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&fsn.high);
173 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&fsn.low);
176 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&lsn.high);
177 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&lsn.low);
/* Reject a heartbeat whose first sequence number is greater than its last. */
179 if (SeqNumberCmp(fsn, lsn) == 1)
180 return; // lsn<fsn -> break
/* The writer GUID combines the sender host/application ids with the
 * writer object id read from the submessage. */
181 writerGUID.hid = mi->sourceHostId;
182 writerGUID.aid = mi->sourceAppId;
183 writerGUID.oid = woid;
185 debug(48, 3) ("recv: RTPS HB%c(0x%x) from 0x%x-0x%x\n",
186 f_bit ? 'F' : 'f', woid, mi->sourceHostId, mi->sourceAppId);
/* Local domain is a manager: pick readerManagers or readerApplications
 * from the writer oid and the kind of the sending application, taking the
 * write lock of the chosen reader. */
188 if ((d->guid.aid & 0x03) == MANAGER) {
189 if ((writerGUID.oid == OID_WRITE_APPSELF) &&
190 ((writerGUID.aid & 0x03) == MANAGER)) {
191 pthread_rwlock_wrlock(&d->readerManagers.lock);
192 cstReader = &d->readerManagers;
194 if (((writerGUID.oid == OID_WRITE_APPSELF) &&
195 ((writerGUID.aid & 0x03) == MANAGEDAPPLICATION)) ||
196 ((writerGUID.oid == OID_WRITE_APP) &&
197 ((writerGUID.aid & 0x03) == MANAGER))) {
198 pthread_rwlock_wrlock(&d->readerApplications.lock);
199 cstReader = &d->readerApplications;
/* Local domain is a managed application: the writer oid selects one of the
 * four built-in readers. The case labels are not visible in this listing. */
203 if ((d->guid.aid & 3) == MANAGEDAPPLICATION) {
204 switch (writerGUID.oid) {
206 pthread_rwlock_wrlock(&d->readerManagers.lock);
207 cstReader = &d->readerManagers;
210 pthread_rwlock_wrlock(&d->readerApplications.lock);
211 cstReader = &d->readerApplications;
214 pthread_rwlock_wrlock(&d->readerPublications.lock);
215 cstReader = &d->readerPublications;
218 pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
219 cstReader = &d->readerSubscriptions;
/* Heartbeat from a publication writer: offer it to every reader in
 * d->subscriptions. The collection is read-locked for the iteration and
 * each reader is write-locked around its HeartBeatProc call. */
223 if ((writerGUID.oid & 0x07) == OID_PUBLICATION) {
224 pthread_rwlock_rdlock(&d->subscriptions.lock);
225 gavl_cust_for_each(CSTReader, &d->subscriptions, cstReader) {
226 pthread_rwlock_wrlock(&cstReader->lock);
227 HeartBeatProc(cstReader, &writerGUID, &fsn, &lsn, f_bit);
228 pthread_rwlock_unlock(&cstReader->lock);
230 pthread_rwlock_unlock(&d->subscriptions.lock);
/* Process the heartbeat on the reader selected above, then release the
 * lock taken when it was selected.
 * NOTE(review): cstReader starts as NULL and no guard is visible around
 * these two statements; confirm that the omitted lines check it. */
235 HeartBeatProc(cstReader, &writerGUID, &fsn, &lsn, f_bit);
238 pthread_rwlock_unlock(&cstReader->lock);