2 * $Id: RTPSAck.c,v 0.0.0.1 2003/10/07
4 * DEBUG: section 47 RTPS message ACK
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /**********************************************************************************/
/*
 * RTPSAckCreate - serialize an ACK submessage into cdrCodec.
 *
 * NOTE(review): this listing carries embedded line numbers and is missing
 * lines (return type, braces, the declaration of `flags`, the action taken
 * by the size check, any use of f_bit, and the return statements). The
 * comments below describe only the statements that are visible.
 *
 * Parameters:
 *   cdrCodec  - codec whose buf_len/wptr are checked and that is written to
 *   seqNumber - sequence number; it is passed through SeqNumberInc before
 *               being written (presumably "last received + 1" - confirm)
 *   roid      - reader object id, written first
 *   woid      - writer object id, written second
 *   f_bit     - not referenced by any visible line - TODO confirm its use
 */
36 RTPSAckCreate(CDR_Codec *cdrCodec,
37 SequenceNumber *seqNumber,
38 ObjectId roid, ObjectId woid, Boolean f_bit)
40 SequenceNumber sn_tmp;
41 CDR_Endianness data_endian;
/* true when fewer than 28 bytes remain after wptr; the statement guarded
 * by this test is not visible here. 28 matches the puts below
 * (1 + 1 + 2 + 6 * 4), assuming 1/2/4-byte octet/ushort/ulong. */
44 if (cdrCodec->buf_len < cdrCodec->wptr+28)
/* submessage id octet */
48 CDR_put_octet(cdrCodec, ACK);
/* flags octet, seeded from the codec's current endianness setting */
51 flags = cdrCodec->data_endian;
54 CDR_put_octet(cdrCodec, flags);
/* 16-bit value 24 = 28 total bytes minus the 4 bytes written so far
 * (presumably the submessage length field - confirm) */
57 CDR_put_ushort(cdrCodec, 24);
59 /* the following object ids are written in big-endian byte order */
60 data_endian = cdrCodec->data_endian;
61 cdrCodec->data_endian = FLAG_BIG_ENDIAN;
64 CDR_put_ulong(cdrCodec, roid);
67 CDR_put_ulong(cdrCodec, woid);
/* switch back to the endianness the codec had on entry */
69 cdrCodec->data_endian = data_endian;
/* sn_tmp is derived from *seqNumber (presumably *seqNumber + 1) */
71 SeqNumberInc(sn_tmp, *seqNumber);
/* sequence number goes out as high word, then low word */
74 CDR_put_ulong(cdrCodec, sn_tmp.high);
75 CDR_put_ulong(cdrCodec, sn_tmp.low);
/* two trailing words: 32 then 0 (presumably bitmap bit count and an
 * empty bitmap word - confirm against the protocol description) */
78 CDR_put_ulong(cdrCodec, 32);
79 CDR_put_ulong(cdrCodec, 0);
84 /**********************************************************************************/
/*
 * RTPSAck - process a received ACK submessage.
 *
 * NOTE(review): this listing carries embedded line numbers and is missing
 * many lines (return type, braces, switch/case/else headers, return
 * statements, declarations of flags/roid/woid/queue/f_bit/readerGUID, and
 * the names of the timer functions whose argument lists appear below).
 * The comments describe only what the visible statements establish.
 *
 * Visible outline:
 *   1. read flags, reader/writer object ids (big-endian) and the
 *      sequence number from cdrCodec;
 *   2. select and write-lock the local CSTWriter addressed by woid,
 *      depending on whether this domain is a manager or a managed
 *      application;
 *   3. find the remote reader identified by readerGUID;
 *   4. walk the reader's changes, marking each as ACKNOWLEDGED or TOSEND
 *      by comparison with the received sequence number;
 *   5. update the reader's send / heartbeat state and (re)schedule timers;
 *   6. release the locks.
 *
 * Parameters:
 *   d               - domain owning the writers and their locks
 *   cdrCodec        - codec positioned inside the ACK submessage
 *   mi              - message interpreter state (source host/app ids)
 *   senderIPAddress - used as reader host id in the OID_WRITE_APPSELF case
 */
86 RTPSAck(ORTEDomain *d, CDR_Codec *cdrCodec, MessageInterpret *mi, IPAddress senderIPAddress)
89 CSTWriter *cstWriter = NULL;
90 CSTRemoteReader *cstRemoteReader;
91 CSChangeForReader *csChangeForReader;
92 StateMachineSend stateMachineSendNew;
94 SequenceNumber sn, isn;
96 CDR_Endianness data_endian;
100 /* restore flag position in submessage */
104 CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
107 /* move reading position to the beginning of the submessage */
110 /* the following object ids are read in big-endian byte order */
111 data_endian = cdrCodec->data_endian;
112 cdrCodec->data_endian = FLAG_BIG_ENDIAN;
115 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
118 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
/* back to the endianness the codec had before the object ids */
120 cdrCodec->data_endian = data_endian;
/* sequence number: high word first, then low word */
123 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.high);
124 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.low);
/* GUID of the acknowledging reader: host/app ids come from the message
 * interpreter state, the object id from the submessage itself */
126 readerGUID.hid = mi->sourceHostId;
127 readerGUID.aid = mi->sourceAppId;
128 readerGUID.oid = roid;
130 debug(47, 3) ("recv: RTPS ACK%c(0x%x) from 0x%x-0x%x\n",
132 woid, mi->sourceHostId, mi->sourceAppId);
/* this domain is a manager: choose the writer and take its write lock
 * (the switch header and the other case labels are not visible) */
135 if ((d->guid.aid & 0x03) == MANAGER) {
137 case OID_WRITE_APPSELF:
138 pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
139 cstWriter = &d->writerApplicationSelf;
/* in this case the reader is identified by the sender's IP address and
 * an unknown application id instead of the ids from mi */
140 readerGUID.hid = senderIPAddress;
141 readerGUID.aid = AID_UNKNOWN;
142 readerGUID.oid = roid;
/* arguments of a call on the writer's registrationTimer; the function
 * name is not visible in this listing */
144 cstWriter->objectEntryOID->objectEntryAID,
145 &cstWriter->registrationTimer,
149 pthread_rwlock_wrlock(&d->writerManagers.lock);
150 cstWriter = &d->writerManagers;
153 pthread_rwlock_wrlock(&d->writerApplications.lock);
154 cstWriter = &d->writerApplications;
/* this domain is a managed application: same selection for its writers */
160 if ((d->guid.aid & 0x03) == MANAGEDAPPLICATION) {
163 case OID_READ_APPSELF:
164 pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
165 cstWriter = &d->writerApplicationSelf;
167 cstWriter->objectEntryOID->objectEntryAID,
168 &cstWriter->registrationTimer,
172 pthread_rwlock_wrlock(&d->writerPublications.lock);
173 cstWriter = &d->writerPublications;
176 pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
177 cstWriter = &d->writerSubscriptions;
/* user-data publication: look the writer up by GUID while holding the
 * publications read lock, then write-lock the writer that was found.
 * NOTE(review): any adjustment of guid and any NULL check on the lookup
 * result are not visible in this listing - confirm in the full file. */
180 if ((woid & 0x07) == OID_PUBLICATION) {
181 GUID_RTPS guid = d->guid;
183 pthread_rwlock_rdlock(&d->publications.lock);
184 cstWriter = CSTWriter_find(&d->publications, &guid);
185 pthread_rwlock_wrlock(&cstWriter->lock);
/* releases the publications lock taken above; the condition guarding
 * this path is not visible (presumably "no writer found" - confirm) */
193 if ((woid & 0x07) == OID_PUBLICATION)
194 pthread_rwlock_unlock(&d->publications.lock);
/* locate the remote reader; when it is unknown, drop the writer lock and,
 * for publications, the publications lock as well */
197 cstRemoteReader = CSTRemoteReader_find(cstWriter, &readerGUID);
198 if (!cstRemoteReader) {
199 pthread_rwlock_unlock(&cstWriter->lock);
200 if ((woid & 0x07) == OID_PUBLICATION)
201 pthread_rwlock_unlock(&d->publications.lock);
/* walk every change held for this reader; stateMachineSendNew stays
 * NOTHNIGTOSEND unless some change has to be (re)sent */
205 stateMachineSendNew = NOTHNIGTOSEND;
206 csChangeForReader = CSChangeForReader_first(cstRemoteReader);
207 while (csChangeForReader) {
/* isn starts as the change's sn; for a change with gapSN above noneSN it
 * is recomputed from sn and gapSN and then decremented (presumably the
 * last sequence number covered by the gap, sn + gapSN - 1) */
208 isn = csChangeForReader->csChange->sn;
209 if (SeqNumberCmp(csChangeForReader->csChange->gapSN, noneSN) > 0) {
210 SeqNumberAdd(isn, csChangeForReader->csChange->sn, csChangeForReader->csChange->gapSN);
211 SeqNumberDec(isn, isn);
/* change lies below the received sequence number: acknowledged */
213 if (SeqNumberCmp(isn, sn) < 0) { //ACK
214 if (csChangeForReader->commStateChFReader != ACKNOWLEDGED) {
215 CSChangeForReader *csChangeForReaderDestroyed;
216 csChangeForReaderDestroyed = csChangeForReader;
217 csChangeForReader->commStateChFReader = ACKNOWLEDGED;
/* the iterator is advanced before the saved entry is destroyed
 * (presumably so iteration never touches the destroyed entry); only
 * publication writers destroy the acknowledged entry */
219 CSChangeForReader_next(cstRemoteReader, csChangeForReader);
220 if ((woid & 0x07) == OID_PUBLICATION) {
221 CSTWriterDestroyCSChangeForReader(
222 csChangeForReaderDestroyed, ORTE_TRUE);
226 CSChangeForReader_next(cstRemoteReader, csChangeForReader);
/* remaining branch (its else header is not visible): the change is
 * flagged TOSEND, counted once, and a data send is requested */
229 if (csChangeForReader->commStateChFReader != TOSEND) {
230 csChangeForReader->commStateChFReader = TOSEND;
231 cstRemoteReader->commStateToSentCounter++;
233 stateMachineSendNew = MUSTSENDDATA;
235 CSChangeForReader_next(cstRemoteReader, csChangeForReader);
/* reader was idle and data must be sent: switch its send state and issue
 * timer calls on delayResponceTimer (function names not visible; the
 * existing comments indicate `queue` selects metatraffic vs userdata) */
239 if ((cstRemoteReader->commStateSend == NOTHNIGTOSEND) &&
240 (stateMachineSendNew == MUSTSENDDATA)) {
241 cstRemoteReader->commStateSend = stateMachineSendNew;
243 cstRemoteReader->sobject->objectEntryAID,
244 &cstRemoteReader->delayResponceTimer,
248 cstRemoteReader->sobject->objectEntryAID,
249 &cstRemoteReader->delayResponceTimer,
250 queue, //metatraffic timer
251 "CSTWriterSendTimer",
253 &cstRemoteReader->cstWriter->lock,
255 &cstRemoteReader->cstWriter->params.delayResponceTime);
258 cstRemoteReader->sobject->objectEntryAID,
259 &cstRemoteReader->delayResponceTimer,
260 queue, //userdata timer
261 "CSTWriterSendStrictTimer",
262 CSTWriterSendStrictTimer,
263 &cstRemoteReader->cstWriter->lock,
265 &cstRemoteReader->cstWriter->params.delayResponceTime);
/* nothing needs sending: mark the reader idle; the argument lists below
 * belong to calls on delayResponceTimer and repeatAnnounceTimer whose
 * function names are not visible */
269 if (stateMachineSendNew == NOTHNIGTOSEND) {
270 cstRemoteReader->commStateSend = NOTHNIGTOSEND;
273 cstRemoteReader->sobject->objectEntryAID,
274 &cstRemoteReader->delayResponceTimer,
278 cstRemoteReader->sobject->objectEntryAID,
279 &cstRemoteReader->repeatAnnounceTimer,
/* f_bit clear and nothing to send: request a heartbeat and issue timer
 * calls on delayResponceTimer (f_bit's derivation is not visible;
 * presumably taken from the flags octet - confirm) */
284 if ((!f_bit) && (cstRemoteReader->commStateSend == NOTHNIGTOSEND)) {
285 cstRemoteReader->commStateHB = MUSTSENDHB;
287 cstRemoteReader->sobject->objectEntryAID,
288 &cstRemoteReader->delayResponceTimer,
292 cstRemoteReader->sobject->objectEntryAID,
293 &cstRemoteReader->delayResponceTimer,
294 queue, //metatraffic timer
295 "CSTWriterSendTimer",
297 &cstRemoteReader->cstWriter->lock,
299 &cstRemoteReader->cstWriter->params.delayResponceTime);
302 cstRemoteReader->sobject->objectEntryAID,
303 &cstRemoteReader->delayResponceTimer,
304 queue, //userdata timer
305 "CSTWriterSendStrictTimer",
306 CSTWriterSendStrictTimer,
307 &cstRemoteReader->cstWriter->lock,
309 &cstRemoteReader->cstWriter->params.delayResponceTime);
/* done: release the writer lock, then the publications lock if taken */
313 pthread_rwlock_unlock(&cstWriter->lock);
314 if ((woid & 0x07) == OID_PUBLICATION)
315 pthread_rwlock_unlock(&d->publications.lock);