2 * $Id: RTPSAck.c,v 0.0.0.1 2003/10/07
4 * DEBUG: section 47 RTPS message ACK
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr.smolik@wo.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /**********************************************************************************/
/*
 * RTPSAckCreate - append an ACK submessage to the codec buffer.
 *
 * NOTE(review): this listing omits some original lines (return type,
 * braces, the declaration of `flags`, the final return), so only the
 * statements that are visible here are described.
 *
 * cdrCodec   codec that is written to; buf_len must be at least wptr+28.
 * seqNumber  sequence number; the value written is derived from it
 *            through SeqNumberInc.
 * roid, woid object ids written after the submessage header
 *            (presumably reader and writer object id - TODO confirm).
 * f_bit      not referenced by any visible line; presumably folded into
 *            `flags` on a line missing from this listing - TODO confirm.
 *
 * Returns -1 when the buffer is too small. The success return value is
 * not visible in this listing.
 */
36 RTPSAckCreate(CDR_Codec *cdrCodec,
37 SequenceNumber *seqNumber,
38 ObjectId roid,ObjectId woid,Boolean f_bit)
40 SequenceNumber sn_tmp;
41 CDR_Endianness data_endian;
/* refuse to write unless 28 octets fit behind the current wptr */
44 if (cdrCodec->buf_len<cdrCodec->wptr+28) return -1;
/* first octet written is the ACK constant */
47 CDR_put_octet(cdrCodec,ACK);
/* the flags octet starts out as the codec's endianness setting */
50 flags=cdrCodec->data_endian;
52 CDR_put_octet(cdrCodec,flags);
/* presumably the length of what follows (28 minus the 4 octets written
   so far) - TODO confirm */
55 CDR_put_ushort(cdrCodec,24);
57 /* the following data is sent in big-endian byte order */
/* remember the codec's endianness so it can be restored afterwards */
58 data_endian=cdrCodec->data_endian;
59 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
62 CDR_put_ulong(cdrCodec,roid);
65 CDR_put_ulong(cdrCodec,woid);
/* back to the endianness the codec had before the two object ids */
67 cdrCodec->data_endian=data_endian;
/* presumably sn_tmp = *seqNumber + 1 - confirm against SeqNumberInc */
69 SeqNumberInc(sn_tmp,*seqNumber);
/* sequence number goes out as high word, then low word */
72 CDR_put_ulong(cdrCodec,sn_tmp.high);
73 CDR_put_ulong(cdrCodec,sn_tmp.low);
/* NOTE(review): looks like a bitmap descriptor (32 bits, all clear) -
   confirm against the RTPS wire format */
76 CDR_put_ulong(cdrCodec,32);
77 CDR_put_ulong(cdrCodec,0);
82 /**********************************************************************************/
/*
 * RTPSAck - process a received ACK submessage.
 *
 * NOTE(review): this listing omits many original lines (return type,
 * closing braces, switch headers, several declarations, the names of
 * the calls whose argument lists remain, return statements). The
 * comments below describe only what the visible lines do; anything
 * about the missing lines is marked as an assumption.
 *
 * Visible behaviour:
 *  - reads the flags octet, roid and woid (big endian) and the sequence
 *    number sn (high, then low) from cdrCodec;
 *  - fills readerGUID from mi->sourceHostId and mi->sourceAppId;
 *  - picks one of the domain's CSTWriter objects and takes its write
 *    lock; for woid values matching OID_PUBLICATION the writer is looked
 *    up in d->publications under that container's read lock;
 *  - finds the remote reader for readerGUID and walks its changes,
 *    marking them ACKNOWLEDGED or TOSEND by comparison against sn;
 *  - updates commStateSend / commStateHB of the remote reader;
 *  - releases the locks taken above.
 *
 * d                domain whose writers are searched.
 * cdrCodec         codec positioned inside the received submessage.
 * mi               supplies sourceHostId / sourceAppId of the sender.
 * senderIPAddress  used as readerGUID.hid in the OID_WRITE_APPSELF case.
 */
84 RTPSAck(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
86 CSTWriter *cstWriter=NULL;
87 CSTRemoteReader *cstRemoteReader;
88 CSChangeForReader *csChangeForReader;
89 StateMachineSend stateMachineSendNew;
91 SequenceNumber sn,isn;
93 CDR_Endianness data_endian;
97 /* restore flag position in submessage */
101 CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
104 /* move reading position to the beginning of the submessage */
107 /* the following data is encoded in big-endian byte order */
/* remember the codec's endianness so it can be restored afterwards */
108 data_endian=cdrCodec->data_endian;
109 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
112 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
115 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
/* back to the endianness the codec had before the two object ids */
117 cdrCodec->data_endian=data_endian;
/* sequence number arrives as high word, then low word */
120 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.high);
121 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.low);
/* by default the remote reader is identified by the message source */
123 readerGUID.hid=mi->sourceHostId;
124 readerGUID.aid=mi->sourceAppId;
127 debug(47,3) ("recv: RTPS ACK%c(0x%x) from 0x%x-0x%x\n",
129 woid,mi->sourceHostId,mi->sourceAppId);
/* writer selection when this domain participant is a MANAGER;
   presumably a switch over an object id whose header line is missing
   from this listing - TODO confirm */
132 if ((d->guid.aid & 0x03)==MANAGER) {
134 case OID_WRITE_APPSELF:
135 pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
136 cstWriter=&d->writerApplicationSelf;
/* in this case the reader is keyed by the sender's IP address with an
   unknown application id instead of the message source ids */
137 readerGUID.hid=senderIPAddress;
138 readerGUID.aid=AID_UNKNOWN;
/* arguments of a call on registrationTimer whose name is not visible */
141 cstWriter->objectEntryOID->objectEntryAID,
142 &cstWriter->registrationTimer,
146 pthread_rwlock_wrlock(&d->writerManagers.lock);
147 cstWriter=&d->writerManagers;
150 pthread_rwlock_wrlock(&d->writerApplications.lock);
151 cstWriter=&d->writerApplications;
/* writer selection when this domain participant is a MANAGEDAPPLICATION */
157 if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION) {
160 case OID_READ_APPSELF:
161 pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
162 cstWriter=&d->writerApplicationSelf;
/* arguments of a call on registrationTimer whose name is not visible */
164 cstWriter->objectEntryOID->objectEntryAID,
165 &cstWriter->registrationTimer,
169 pthread_rwlock_wrlock(&d->writerPublications.lock);
170 cstWriter=&d->writerPublications;
173 pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
174 cstWriter=&d->writerSubscriptions;
/* user-data publications: the writer is looked up in d->publications,
   whose read lock stays held until the matching unlocks further down */
177 if ((woid & 0x07) == OID_PUBLICATION) {
178 GUID_RTPS guid=d->guid;
180 pthread_rwlock_rdlock(&d->publications.lock);
181 cstWriter=CSTWriter_find(&d->publications,&guid);
/* NOTE(review): cstWriter is dereferenced here and no NULL check is
   visible between the lookup and this lock - confirm that the lookup
   cannot fail at this point, or that a guard exists in the full file */
182 pthread_rwlock_wrlock(&cstWriter->lock);
/* presumably the bail-out path for "no writer found" - TODO confirm */
190 if ((woid & 0x07) == OID_PUBLICATION)
191 pthread_rwlock_unlock(&d->publications.lock);
194 cstRemoteReader=CSTRemoteReader_find(cstWriter,&readerGUID);
/* unknown remote reader: release everything that was locked above */
195 if (!cstRemoteReader) {
196 pthread_rwlock_unlock(&cstWriter->lock);
197 if ((woid & 0x07) == OID_PUBLICATION)
198 pthread_rwlock_unlock(&d->publications.lock);
/* walk every change queued for this reader and classify it against sn */
202 stateMachineSendNew=NOTHNIGTOSEND;
203 csChangeForReader=CSChangeForReader_first(cstRemoteReader);
204 while(csChangeForReader) {
205 isn=csChangeForReader->csChange->sn;
/* for a change with a gap, isn is recomputed from sn and gapSN and then
   decremented (presumably the last sequence number covered by the gap -
   confirm against SeqNumberAdd / SeqNumberDec) */
206 if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)>0) {
207 SeqNumberAdd(isn,csChangeForReader->csChange->sn,csChangeForReader->csChange->gapSN);
208 SeqNumberDec(isn,isn);
/* changes that compare below the received sn are treated as acknowledged */
210 if (SeqNumberCmp(isn,sn)<0) { //ACK
211 if (csChangeForReader->commStateChFReader!=ACKNOWLEDGED) {
212 CSChangeForReader *csChangeForReaderDestroyed;
/* keep a pointer to the current entry and advance the iterator first,
   so the destroy call below does not touch the entry being iterated */
213 csChangeForReaderDestroyed=csChangeForReader;
214 csChangeForReader->commStateChFReader=ACKNOWLEDGED;
216 CSChangeForReader_next(cstRemoteReader,csChangeForReader);
217 if ((woid & 0x07) == OID_PUBLICATION) {
218 CSTWriterDestroyCSChangeForReader(
219 csChangeForReaderDestroyed,ORTE_TRUE);
223 CSChangeForReader_next(cstRemoteReader,csChangeForReader);
/* presumably the branch for changes not covered by sn (the else line is
   missing here): flag them for sending and count each newly flagged one */
226 if (csChangeForReader->commStateChFReader!=TOSEND) {
227 csChangeForReader->commStateChFReader=TOSEND;
228 cstRemoteReader->commStateToSentCounter++;
230 stateMachineSendNew=MUSTSENDDATA;
232 CSChangeForReader_next(cstRemoteReader,csChangeForReader);
/* the reader was idle and now has data to send: switch its send state;
   the argument lists below belong to calls on delayResponceTimer whose
   names are not visible (presumably detach, then re-arm) - TODO confirm */
236 if ((cstRemoteReader->commStateSend==NOTHNIGTOSEND) &&
237 (stateMachineSendNew==MUSTSENDDATA)) {
238 cstRemoteReader->commStateSend=stateMachineSendNew;
240 cstRemoteReader->sobject->objectEntryAID,
241 &cstRemoteReader->delayResponceTimer,
245 cstRemoteReader->sobject->objectEntryAID,
246 &cstRemoteReader->delayResponceTimer,
247 queue, //metatraffic timer
248 "CSTWriterSendTimer",
250 &cstRemoteReader->cstWriter->lock,
252 &cstRemoteReader->cstWriter->params.delayResponceTime);
255 cstRemoteReader->sobject->objectEntryAID,
256 &cstRemoteReader->delayResponceTimer,
257 queue, //userdata timer
258 "CSTWriterSendStrictTimer",
259 CSTWriterSendStrictTimer,
260 &cstRemoteReader->cstWriter->lock,
262 &cstRemoteReader->cstWriter->params.delayResponceTime);
/* nothing left to send: reset the send state; the argument lists refer
   to delayResponceTimer and repeatAnnounceTimer, the call names are not
   visible */
266 if (stateMachineSendNew==NOTHNIGTOSEND) {
267 cstRemoteReader->commStateSend=NOTHNIGTOSEND;
270 cstRemoteReader->sobject->objectEntryAID,
271 &cstRemoteReader->delayResponceTimer,
275 cstRemoteReader->sobject->objectEntryAID,
276 &cstRemoteReader->repeatAnnounceTimer,
/* f_bit clear and nothing to send: mark that a heartbeat must be sent;
   the same pair of timer argument lists as above follows */
281 if ((!f_bit) && (cstRemoteReader->commStateSend==NOTHNIGTOSEND)) {
282 cstRemoteReader->commStateHB=MUSTSENDHB;
284 cstRemoteReader->sobject->objectEntryAID,
285 &cstRemoteReader->delayResponceTimer,
289 cstRemoteReader->sobject->objectEntryAID,
290 &cstRemoteReader->delayResponceTimer,
291 queue, //metatraffic timer
292 "CSTWriterSendTimer",
294 &cstRemoteReader->cstWriter->lock,
296 &cstRemoteReader->cstWriter->params.delayResponceTime);
299 cstRemoteReader->sobject->objectEntryAID,
300 &cstRemoteReader->delayResponceTimer,
301 queue, //userdata timer
302 "CSTWriterSendStrictTimer",
303 CSTWriterSendStrictTimer,
304 &cstRemoteReader->cstWriter->lock,
306 &cstRemoteReader->cstWriter->params.delayResponceTime);
/* normal exit: drop the writer lock, then the publications read lock if
   it was taken for a publication writer */
310 pthread_rwlock_unlock(&cstWriter->lock);
311 if ((woid & 0x07) == OID_PUBLICATION)
312 pthread_rwlock_unlock(&d->publications.lock);