2 * $Id: RTPSAck.c,v 0.0.0.1 2003/10/07
4 * DEBUG: section 47 RTPS message ACK
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr.smolik@wo.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /**********************************************************************************/
/*
 * RTPSAckCreate - serialize an ACK submessage into cdrCodec.
 *
 * Visible behavior: returns -1 when fewer than 28 bytes are available in
 * the codec buffer past the current write position. Otherwise it writes,
 * in order: the submessage id octet (ACK), a flags octet, a 16-bit length
 * of 24, the reader and writer object ids, the sequence number produced
 * by SeqNumberInc from *seqNumber, and two trailing 32-bit values.
 *
 * NOTE(review): this excerpt is missing lines (return type, braces, the
 * declaration of flags, any use of f_bit, and the success return value),
 * so those parts cannot be documented from here.
 */
36 RTPSAckCreate(CDR_Codec *cdrCodec,
37 SequenceNumber *seqNumber,
38 ObjectId roid,ObjectId woid,Boolean f_bit)
40 SequenceNumber sn_tmp;
41 CDR_Endianness data_endian;
/* Refuse to write unless 28 more bytes fit: one octet + one octet + one
 * ushort + six ulongs are written below (assuming 4-byte ulongs). */
44 if (cdrCodec->buf_len<cdrCodec->wptr+28) return -1;
/* Submessage id. */
47 CDR_put_octet(cdrCodec,ACK);
/* The flags octet starts from the codec's current data endianness.
 * NOTE(review): whether f_bit is merged into flags is not visible here. */
50 flags=cdrCodec->data_endian;
52 CDR_put_octet(cdrCodec,flags);
/* Length field; 24 matches the six ulong values written after it. */
55 CDR_put_ushort(cdrCodec,24);
57 /* the object ids are always written big-endian; remember the current
   endianness so it can be restored afterwards */
58 data_endian=cdrCodec->data_endian;
59 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
/* Reader object id, then writer object id. */
62 CDR_put_ulong(cdrCodec,roid);
65 CDR_put_ulong(cdrCodec,woid);
/* Back to the endianness the codec had on entry. */
67 cdrCodec->data_endian=data_endian;
/* presumably sn_tmp = *seqNumber + 1 -- TODO confirm against the
 * SeqNumberInc definition */
69 SeqNumberInc(sn_tmp,*seqNumber);
/* Sequence number, high word first. */
72 CDR_put_ulong(cdrCodec,sn_tmp.high);
73 CDR_put_ulong(cdrCodec,sn_tmp.low);
/* NOTE(review): looks like a bitmap descriptor (32 bits, all clear);
 * confirm against the RTPS ACK submessage layout. */
76 CDR_put_ulong(cdrCodec,32);
77 CDR_put_ulong(cdrCodec,0);
82 /**********************************************************************************/
/*
 * RTPSAck - process a received ACK submessage.
 *
 * Visible behavior: reads the flags octet, the reader and writer object
 * ids (big-endian) and a sequence number from cdrCodec; builds the remote
 * reader GUID from the message interpreter state; selects and write-locks
 * a CSTWriter depending on the role bits of d->guid.aid; looks up the
 * remote reader in that writer; walks the reader's change list updating
 * each change's communication state against the received sequence number;
 * then updates the reader's send/heartbeat state and the related timers
 * before releasing the locks.
 *
 * NOTE(review): this excerpt is missing many lines (return type, several
 * local declarations, switch/else/return statements, closing braces and
 * the names of the timer calls). Comments below describe only what the
 * visible lines show; anything else is marked as an assumption.
 */
84 RTPSAck(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
86 CSTWriter *cstWriter=NULL;
87 CSTRemoteReader *cstRemoteReader;
88 CSChangeForReader *csChangeForReader;
89 StateMachineSend stateMachineSendNew;
93 CDR_Endianness data_endian;
97 /* restore flag position in submessage */
101 CDR_get_octet(cdrCodec,&flags);
104 /* move reading position to the beginning of the submessage */
107 /* the object ids are always sent big-endian; remember the current
    endianness so it can be restored afterwards */
108 data_endian=cdrCodec->data_endian;
109 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
/* Reader object id, then writer object id. */
112 CDR_get_ulong(cdrCodec,&roid);
115 CDR_get_ulong(cdrCodec,&woid);
/* Back to the endianness the codec had before the object ids. */
117 cdrCodec->data_endian=data_endian;
/* Acknowledged sequence number, high word first. */
120 CDR_get_ulong(cdrCodec,&sn.high);
121 CDR_get_ulong(cdrCodec,&sn.low);
/* By default the remote reader is identified by the message source. */
123 readerGUID.hid=mi->sourceHostId;
124 readerGUID.aid=mi->sourceAppId;
127 debug(47,3) ("recv: RTPS ACK%c(0x%x) from 0x%x-0x%x\n",
129 woid,mi->sourceHostId,mi->sourceAppId);
/* Manager role: pick the writer being acknowledged and take its write
 * lock. NOTE(review): the switch expression and the other case labels
 * are not visible in this excerpt. */
132 if ((d->guid.aid & 0x03)==MANAGER) {
134 case OID_WRITE_APPSELF:
135 pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
136 cstWriter=&d->writerApplicationSelf;
/* In this case the reader GUID is rebuilt from the sender IP address
 * with an unknown application id. */
137 readerGUID.hid=senderIPAddress;
138 readerGUID.aid=AID_UNKNOWN;
141 cstWriter->objectEntryOID->objectEntryAID,
142 &cstWriter->registrationTimer,
146 pthread_rwlock_wrlock(&d->writerManagers.lock);
147 cstWriter=&d->writerManagers;
150 pthread_rwlock_wrlock(&d->writerApplications.lock);
151 cstWriter=&d->writerApplications;
/* Managed-application role: same selection for this role's writers. */
157 if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION) {
160 case OID_READ_APPSELF:
161 pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
162 cstWriter=&d->writerApplicationSelf;
164 cstWriter->objectEntryOID->objectEntryAID,
165 &cstWriter->registrationTimer,
169 pthread_rwlock_wrlock(&d->writerPublications.lock);
170 cstWriter=&d->writerPublications;
173 pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
174 cstWriter=&d->writerSubscriptions;
/* User publication: the writer is looked up in d->publications under
 * that container's read lock, then the writer itself is write-locked.
 * NOTE(review): how guid is adjusted before the lookup and whether the
 * lookup result is NULL-checked are not visible here. */
177 if ((woid & 0x07) == OID_PUBLICATION) {
178 GUID_RTPS guid=d->guid;
180 pthread_rwlock_rdlock(&d->publications.lock);
181 cstWriter=CSTWriter_find(&d->publications,&guid);
182 pthread_rwlock_wrlock(&cstWriter->lock);
/* Releases the publications lock on a path whose condition is not
 * visible here (presumably when no writer was selected -- TODO confirm). */
190 if ((woid & 0x07) == OID_PUBLICATION)
191 pthread_rwlock_unlock(&d->publications.lock);
/* Find the remote reader this ACK came from; when it is unknown, drop
 * the locks taken above. */
194 cstRemoteReader=CSTRemoteReader_find(cstWriter,&readerGUID);
195 if (!cstRemoteReader) {
196 pthread_rwlock_unlock(&cstWriter->lock);
197 if ((woid & 0x07) == OID_PUBLICATION)
198 pthread_rwlock_unlock(&d->publications.lock);
/* Walk every change queued for this reader, starting from the
 * assumption that nothing needs to be sent. */
202 stateMachineSendNew=NOTHNIGTOSEND;
203 csChangeForReader=CSChangeForReader_first(cstRemoteReader);
204 while(csChangeForReader) {
/* Changes whose sequence number is below the received one are
 * acknowledged. */
205 if (SeqNumberCmp(csChangeForReader->csChange->sn,sn)<0) { //ACK
206 if (csChangeForReader->commStateChFReader!=ACKNOWLEDGED) {
/* Keep a handle on the current entry and advance first, because the
 * entry may be destroyed below for publications. */
207 CSChangeForReader *csChangeForReaderDestroyed;
208 csChangeForReaderDestroyed=csChangeForReader;
209 csChangeForReader->commStateChFReader=ACKNOWLEDGED;
211 CSChangeForReader_next(cstRemoteReader,csChangeForReader);
212 if ((woid & 0x07) == OID_PUBLICATION) {
213 CSTWriterDestroyCSChangeForReader(
214 csChangeForReaderDestroyed,ORTE_TRUE);
218 CSChangeForReader_next(cstRemoteReader,csChangeForReader);
/* NOTE(review): presumably the else branch for changes at or above the
 * received sequence number; they are flagged TOSEND and the pending
 * counter is bumped only on a state transition. */
221 if (csChangeForReader->commStateChFReader!=TOSEND) {
222 csChangeForReader->commStateChFReader=TOSEND;
223 cstRemoteReader->commStateToSentCounter++;
225 stateMachineSendNew=MUSTSENDDATA;
227 CSChangeForReader_next(cstRemoteReader,csChangeForReader);
/* The reader was idle and data must now be sent: switch the send state.
 * The lines below are arguments of timer calls whose names are not
 * visible here; two variants appear, labelled for metatraffic and for
 * user data. */
231 if ((cstRemoteReader->commStateSend==NOTHNIGTOSEND) &&
232 (stateMachineSendNew==MUSTSENDDATA)) {
233 cstRemoteReader->commStateSend=stateMachineSendNew;
235 cstRemoteReader->sobject->objectEntryAID,
236 &cstRemoteReader->delayResponceTimer,
240 cstRemoteReader->sobject->objectEntryAID,
241 &cstRemoteReader->delayResponceTimer,
242 queue, //metatraffic timer
243 "CSTWriterSendTimer",
245 &cstRemoteReader->cstWriter->lock,
247 &cstRemoteReader->cstWriter->params.delayResponceTime);
250 cstRemoteReader->sobject->objectEntryAID,
251 &cstRemoteReader->delayResponceTimer,
252 queue, //userdata timer
253 "CSTWriterSendStrictTimer",
254 CSTWriterSendStrictTimer,
255 &cstRemoteReader->cstWriter->lock,
257 &cstRemoteReader->cstWriter->params.delayResponceTime);
/* Nothing left to send: reset the send state. The delay-response and
 * repeat-announce timers are passed to calls not visible here. */
261 if (stateMachineSendNew==NOTHNIGTOSEND) {
262 cstRemoteReader->commStateSend=NOTHNIGTOSEND;
265 cstRemoteReader->sobject->objectEntryAID,
266 &cstRemoteReader->delayResponceTimer,
270 cstRemoteReader->sobject->objectEntryAID,
271 &cstRemoteReader->repeatAnnounceTimer,
/* When f_bit is clear and nothing is pending, a heartbeat is marked as
 * required. NOTE(review): f_bit is presumably derived from the flags
 * octet read above; its assignment is not visible here. */
276 if ((!f_bit) && (cstRemoteReader->commStateSend==NOTHNIGTOSEND)) {
277 cstRemoteReader->commStateHB=MUSTSENDHB;
279 cstRemoteReader->sobject->objectEntryAID,
280 &cstRemoteReader->delayResponceTimer,
284 cstRemoteReader->sobject->objectEntryAID,
285 &cstRemoteReader->delayResponceTimer,
286 queue, //metatraffic timer
287 "CSTWriterSendTimer",
289 &cstRemoteReader->cstWriter->lock,
291 &cstRemoteReader->cstWriter->params.delayResponceTime);
294 cstRemoteReader->sobject->objectEntryAID,
295 &cstRemoteReader->delayResponceTimer,
296 queue, //userdata timer
297 "CSTWriterSendStrictTimer",
298 CSTWriterSendStrictTimer,
299 &cstRemoteReader->cstWriter->lock,
301 &cstRemoteReader->cstWriter->params.delayResponceTime);
/* Release the writer lock and, for publications, the container lock
 * taken during lookup. */
305 pthread_rwlock_unlock(&cstWriter->lock);
306 if ((woid & 0x07) == OID_PUBLICATION)
307 pthread_rwlock_unlock(&d->publications.lock);