2 * $Id: RTPSAck.c,v 0.0.0.1 2003/10/07
4 * DEBUG: section 47 RTPS message ACK
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 /**********************************************************************************/
26 RTPSAckCreate(uint8_t *rtps_msg,uint32_t max_msg_len,
27 SequenceNumber *seqNumber,
28 ObjectId roid,ObjectId woid,Boolean f_bit) {
29 SequenceNumber sn_tmp;
31 if (max_msg_len<28) return -1;
32 rtps_msg[0]=(uint8_t)ACK;
33 rtps_msg[1]=ORTE_MY_MBO;
34 if (f_bit) rtps_msg[1]|=2;
35 *((ParameterLength*)(rtps_msg+2))=24;
37 *((ObjectId*)(rtps_msg+4))=roid;
39 *((ObjectId*)(rtps_msg+8))=woid;
40 SeqNumberInc(sn_tmp,*seqNumber);
41 *((SequenceNumber*)(rtps_msg+12))=sn_tmp;
42 *((uint32_t*)(rtps_msg+20))=32;
43 *((uint32_t*)(rtps_msg+24))=0;
47 /**********************************************************************************/
49 RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
51 CSTWriter *cstWriter=NULL;
52 CSTRemoteReader *cstRemoteReader;
53 CSChangeForReader *csChangeForReader;
54 StateMachineSend stateMachineSendNew;
60 e_bit=rtps_msg[1] & 0x01;
61 f_bit=(rtps_msg[1] & 0x02)>>1;
62 roid=*((ObjectId*)(rtps_msg+4)); /* readerObjectId */
64 woid=*((ObjectId*)(rtps_msg+8)); /* writerObjectId */
66 sn=*((SequenceNumber*)(rtps_msg+12)); /* Bitmap - SN */
68 readerGUID.hid=mi->sourceHostId;
69 readerGUID.aid=mi->sourceAppId;
72 debug(47,3) ("recv: RTPS ACK%c(0x%x) from 0x%x-0x%x\n",
74 woid,mi->sourceHostId,mi->sourceAppId);
77 if ((d->guid.aid & 0x03)==MANAGER) {
79 case OID_WRITE_APPSELF:
80 pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
81 cstWriter=&d->writerApplicationSelf;
82 readerGUID.hid=senderIPAddress;
83 readerGUID.aid=AID_UNKNOWN;
87 pthread_rwlock_wrlock(&d->writerManagers.lock);
88 cstWriter=&d->writerManagers;
91 pthread_rwlock_wrlock(&d->writerApplications.lock);
92 cstWriter=&d->writerApplications;
97 if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION) {
100 case OID_READ_APPSELF:
101 pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
102 cstWriter=&d->writerApplicationSelf;
105 pthread_rwlock_wrlock(&d->writerPublications.lock);
106 cstWriter=&d->writerPublications;
109 pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
110 cstWriter=&d->writerSubscriptions;
113 if ((woid & 0x07) == OID_PUBLICATION) {
114 GUID_RTPS guid=d->guid;
116 pthread_rwlock_rdlock(&d->publications.lock);
117 cstWriter=CSTWriter_find(&d->publications,&guid);
118 pthread_rwlock_wrlock(&cstWriter->lock);
125 if ((woid & 0x07) == OID_PUBLICATION)
126 pthread_rwlock_unlock(&d->publications.lock);
129 cstRemoteReader=CSTRemoteReader_find(cstWriter,&readerGUID);
130 if (!cstRemoteReader) {
131 pthread_rwlock_unlock(&cstWriter->lock);
132 if ((woid & 0x07) == OID_PUBLICATION)
133 pthread_rwlock_unlock(&d->publications.lock);
136 stateMachineSendNew=NOTHNIGTOSEND;
137 csChangeForReader=CSChangeForReader_first(cstRemoteReader);
138 while(csChangeForReader) {
139 if (SeqNumberCmp(csChangeForReader->csChange->sn,sn)<0) { //ACK
140 if (csChangeForReader->commStateChFReader!=ACKNOWLEDGED) {
141 CSChangeForReader *csChangeForReaderDestroyed;
142 csChangeForReaderDestroyed=csChangeForReader;
143 csChangeForReader->commStateChFReader=ACKNOWLEDGED;
145 CSChangeForReader_next(cstRemoteReader,csChangeForReader);
146 if ((woid & 0x07) == OID_PUBLICATION) {
147 CSTWriterDestroyCSChangeForReader(cstRemoteReader,
148 csChangeForReaderDestroyed,ORTE_TRUE);
152 CSChangeForReader_next(cstRemoteReader,csChangeForReader);
155 csChangeForReader->commStateChFReader=TOSEND;
156 stateMachineSendNew=MUSTSENDDATA;
158 CSChangeForReader_next(cstRemoteReader,csChangeForReader);
161 if ((cstRemoteReader->commStateSend==NOTHNIGTOSEND) &&
162 (stateMachineSendNew==MUSTSENDDATA)) {
163 cstRemoteReader->commStateSend=stateMachineSendNew;
165 cstRemoteReader->objectEntryOID->objectEntryAID,
166 &cstRemoteReader->delayResponceTimer,
170 cstRemoteReader->objectEntryOID->objectEntryAID,
171 &cstRemoteReader->delayResponceTimer,
172 queue, //metatraffic timer
173 "CSTWriterSendTimer",
175 &cstRemoteReader->cstWriter->lock,
177 &cstRemoteReader->cstWriter->params.delayResponceTime);
180 cstRemoteReader->objectEntryOID->objectEntryAID,
181 &cstRemoteReader->delayResponceTimer,
182 queue, //userdata timer
183 "CSTWriterSendStrictTimer",
184 CSTWriterSendStrictTimer,
185 &cstRemoteReader->cstWriter->lock,
187 &cstRemoteReader->cstWriter->params.delayResponceTime);
190 if (stateMachineSendNew==NOTHNIGTOSEND) {
191 cstRemoteReader->commStateSend=NOTHNIGTOSEND;
194 cstRemoteReader->objectEntryOID->objectEntryAID,
195 &cstRemoteReader->delayResponceTimer,
199 cstRemoteReader->objectEntryOID->objectEntryAID,
200 &cstRemoteReader->repeatAnnounceTimer,
204 if ((!f_bit) && (cstRemoteReader->commStateSend==NOTHNIGTOSEND)) {
205 cstRemoteReader->commStateHB=MUSTSENDHB;
207 cstRemoteReader->objectEntryOID->objectEntryAID,
208 &cstRemoteReader->delayResponceTimer,
212 cstRemoteReader->objectEntryOID->objectEntryAID,
213 &cstRemoteReader->delayResponceTimer,
214 queue, //metatraffic timer
215 "CSTWriterSendTimer",
217 &cstRemoteReader->cstWriter->lock,
219 &cstRemoteReader->cstWriter->params.delayResponceTime);
222 cstRemoteReader->objectEntryOID->objectEntryAID,
223 &cstRemoteReader->delayResponceTimer,
224 queue, //userdata timer
225 "CSTWriterSendStrictTimer",
226 CSTWriterSendStrictTimer,
227 &cstRemoteReader->cstWriter->lock,
229 &cstRemoteReader->cstWriter->params.delayResponceTime);
232 pthread_rwlock_unlock(&cstWriter->lock);
233 if ((woid & 0x07) == OID_PUBLICATION)
234 pthread_rwlock_unlock(&d->publications.lock);