]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSAck.c
Reformat the sources with orte/uncrustify script
[orte.git] / orte / liborte / RTPSAck.c
1 /*
2  *  $Id: RTPSAck.c,v 0.0.0.1            2003/10/07
3  *
4  *  DEBUG:  section 47                  RTPS message ACK
5  *
6  *  -------------------------------------------------------------------
7  *                                ORTE
8  *                      Open Real-Time Ethernet
9  *
10  *                      Copyright (C) 2001-2006
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic
12  *                      http://dce.felk.cvut.cz
13  *                      http://www.ocera.org
14  *
15  *  Author:              Petr Smolik    petr@smoliku.cz
16  *  Advisor:             Pavel Pisa
17  *  Project Responsible: Zdenek Hanzalek
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *
30  */
31
32 #include "orte_all.h"
33
34 /**********************************************************************************/
35 int
36 RTPSAckCreate(CDR_Codec *cdrCodec,
37               SequenceNumber *seqNumber,
38               ObjectId roid, ObjectId woid, Boolean f_bit)
39 {
40   SequenceNumber     sn_tmp;
41   CDR_Endianness     data_endian;
42   CORBA_octet        flags;
43
44   if (cdrCodec->buf_len < cdrCodec->wptr+28)
45     return -1;
46
47   /* submessage id */
48   CDR_put_octet(cdrCodec, ACK);
49
50   /* flags */
51   flags = cdrCodec->data_endian;
52   if (f_bit)
53     flags |= 2;
54   CDR_put_octet(cdrCodec, flags);
55
56   /* length */
57   CDR_put_ushort(cdrCodec, 24);
58
59   /* next data are sent in big endianing */
60   data_endian = cdrCodec->data_endian;
61   cdrCodec->data_endian = FLAG_BIG_ENDIAN;
62
63   /* readerObjectId */
64   CDR_put_ulong(cdrCodec, roid);
65
66   /* writerObjectId */
67   CDR_put_ulong(cdrCodec, woid);
68
69   cdrCodec->data_endian = data_endian;
70
71   SeqNumberInc(sn_tmp, *seqNumber);
72
73   /* SeqNumber */
74   CDR_put_ulong(cdrCodec, sn_tmp.high);
75   CDR_put_ulong(cdrCodec, sn_tmp.low);
76
77   /* bitmap - bits */
78   CDR_put_ulong(cdrCodec, 32);
79   CDR_put_ulong(cdrCodec, 0);
80
81   return 28;
82 }
83
84 /**********************************************************************************/
85 void
86 RTPSAck(ORTEDomain *d, CDR_Codec *cdrCodec, MessageInterpret *mi, IPAddress senderIPAddress)
87 {
88   GUID_RTPS          readerGUID;
89   CSTWriter          *cstWriter = NULL;
90   CSTRemoteReader    *cstRemoteReader;
91   CSChangeForReader  *csChangeForReader;
92   StateMachineSend   stateMachineSendNew;
93   ObjectId           roid, woid;
94   SequenceNumber     sn, isn;
95   char               queue = 1;
96   CDR_Endianness     data_endian;
97   CORBA_octet        flags;
98   char               f_bit;
99
100   /* restore flag possition in submessage */
101   cdrCodec->rptr -= 3;
102
103   /* flags */
104   CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
105   f_bit = flags & 2;
106
107   /* move reading possition to begin of submessage */
108   cdrCodec->rptr += 2;
109
110   /* next data are sent in big endianing */
111   data_endian = cdrCodec->data_endian;
112   cdrCodec->data_endian = FLAG_BIG_ENDIAN;
113
114   /* readerObjectId */
115   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
116
117   /* writerObjectId */
118   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
119
120   cdrCodec->data_endian = data_endian;
121
122   /* SeqNumber */
123   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.high);
124   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.low);
125
126   readerGUID.hid = mi->sourceHostId;
127   readerGUID.aid = mi->sourceAppId;
128   readerGUID.oid = roid;
129
130   debug(47, 3) ("recv: RTPS ACK%c(0x%x) from 0x%x-0x%x\n",
131                 f_bit ? 'F' : 'f',
132                 woid, mi->sourceHostId, mi->sourceAppId);
133
134   /* Manager */
135   if ((d->guid.aid & 0x03) == MANAGER) {
136     switch (woid) {
137       case OID_WRITE_APPSELF:
138         pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
139         cstWriter = &d->writerApplicationSelf;
140         readerGUID.hid = senderIPAddress;
141         readerGUID.aid = AID_UNKNOWN;
142         readerGUID.oid = roid;
143         eventDetach(d,
144                     cstWriter->objectEntryOID->objectEntryAID,
145                     &cstWriter->registrationTimer,
146                     0); //common timer
147         break;
148       case OID_WRITE_MGR:
149         pthread_rwlock_wrlock(&d->writerManagers.lock);
150         cstWriter = &d->writerManagers;
151         break;
152       case OID_WRITE_APP:
153         pthread_rwlock_wrlock(&d->writerApplications.lock);
154         cstWriter = &d->writerApplications;
155         break;
156     }
157   }
158
159   /* Application */
160   if ((d->guid.aid & 0x03) == MANAGEDAPPLICATION) {
161     switch (roid) {
162       case OID_READ_APP:
163       case OID_READ_APPSELF:
164         pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
165         cstWriter = &d->writerApplicationSelf;
166         eventDetach(d,
167                     cstWriter->objectEntryOID->objectEntryAID,
168                     &cstWriter->registrationTimer,
169                     0); //common timer
170         break;
171       case OID_READ_PUBL:
172         pthread_rwlock_wrlock(&d->writerPublications.lock);
173         cstWriter = &d->writerPublications;
174         break;
175       case OID_READ_SUBS:
176         pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
177         cstWriter = &d->writerSubscriptions;
178         break;
179       default:
180         if ((woid & 0x07) == OID_PUBLICATION) {
181           GUID_RTPS  guid = d->guid;
182           guid.oid = woid;
183           pthread_rwlock_rdlock(&d->publications.lock);
184           cstWriter = CSTWriter_find(&d->publications, &guid);
185           pthread_rwlock_wrlock(&cstWriter->lock);
186           queue = 2;
187         }
188         break;
189     }
190   }
191
192   if (!cstWriter) {
193     if ((woid & 0x07) == OID_PUBLICATION)
194       pthread_rwlock_unlock(&d->publications.lock);
195     return;
196   }
197   cstRemoteReader = CSTRemoteReader_find(cstWriter, &readerGUID);
198   if (!cstRemoteReader) {
199     pthread_rwlock_unlock(&cstWriter->lock);
200     if ((woid & 0x07) == OID_PUBLICATION)
201       pthread_rwlock_unlock(&d->publications.lock);
202     return;
203   }
204
205   stateMachineSendNew = NOTHNIGTOSEND;
206   csChangeForReader = CSChangeForReader_first(cstRemoteReader);
207   while (csChangeForReader) {
208     isn = csChangeForReader->csChange->sn;
209     if (SeqNumberCmp(csChangeForReader->csChange->gapSN, noneSN) > 0) {
210       SeqNumberAdd(isn, csChangeForReader->csChange->sn, csChangeForReader->csChange->gapSN);
211       SeqNumberDec(isn, isn);
212     }
213     if (SeqNumberCmp(isn, sn) < 0) {     //ACK
214       if (csChangeForReader->commStateChFReader != ACKNOWLEDGED) {
215         CSChangeForReader *csChangeForReaderDestroyed;
216         csChangeForReaderDestroyed = csChangeForReader;
217         csChangeForReader->commStateChFReader = ACKNOWLEDGED;
218         csChangeForReader =
219           CSChangeForReader_next(cstRemoteReader, csChangeForReader);
220         if ((woid & 0x07) == OID_PUBLICATION) {
221           CSTWriterDestroyCSChangeForReader(
222             csChangeForReaderDestroyed, ORTE_TRUE);
223         }
224       } else {
225         csChangeForReader =
226           CSChangeForReader_next(cstRemoteReader, csChangeForReader);
227       }
228     } else {                                                      //NACK
229       if (csChangeForReader->commStateChFReader != TOSEND) {
230         csChangeForReader->commStateChFReader = TOSEND;
231         cstRemoteReader->commStateToSentCounter++;
232       }
233       stateMachineSendNew = MUSTSENDDATA;
234       csChangeForReader =
235         CSChangeForReader_next(cstRemoteReader, csChangeForReader);
236     }
237   }
238
239   if ((cstRemoteReader->commStateSend == NOTHNIGTOSEND) &&
240       (stateMachineSendNew == MUSTSENDDATA)) {
241     cstRemoteReader->commStateSend = stateMachineSendNew;
242     eventDetach(d,
243                 cstRemoteReader->sobject->objectEntryAID,
244                 &cstRemoteReader->delayResponceTimer,
245                 queue);
246     if (queue == 1) {
247       eventAdd(d,
248                cstRemoteReader->sobject->objectEntryAID,
249                &cstRemoteReader->delayResponceTimer,
250                queue, //metatraffic timer
251                "CSTWriterSendTimer",
252                CSTWriterSendTimer,
253                &cstRemoteReader->cstWriter->lock,
254                cstRemoteReader,
255                &cstRemoteReader->cstWriter->params.delayResponceTime);
256     } else {
257       eventAdd(d,
258                cstRemoteReader->sobject->objectEntryAID,
259                &cstRemoteReader->delayResponceTimer,
260                queue, //userdata timer
261                "CSTWriterSendStrictTimer",
262                CSTWriterSendStrictTimer,
263                &cstRemoteReader->cstWriter->lock,
264                cstRemoteReader,
265                &cstRemoteReader->cstWriter->params.delayResponceTime);
266     }
267   }
268
269   if (stateMachineSendNew == NOTHNIGTOSEND) {
270     cstRemoteReader->commStateSend = NOTHNIGTOSEND;
271     if (queue == 1) {
272       eventDetach(d,
273                   cstRemoteReader->sobject->objectEntryAID,
274                   &cstRemoteReader->delayResponceTimer,
275                   queue);
276     } else {
277       eventDetach(d,
278                   cstRemoteReader->sobject->objectEntryAID,
279                   &cstRemoteReader->repeatAnnounceTimer,
280                   queue);
281     }
282   }
283
284   if ((!f_bit) && (cstRemoteReader->commStateSend == NOTHNIGTOSEND)) {
285     cstRemoteReader->commStateHB = MUSTSENDHB;
286     eventDetach(d,
287                 cstRemoteReader->sobject->objectEntryAID,
288                 &cstRemoteReader->delayResponceTimer,
289                 queue);
290     if (queue == 1) {
291       eventAdd(d,
292                cstRemoteReader->sobject->objectEntryAID,
293                &cstRemoteReader->delayResponceTimer,
294                queue, //metatraffic timer
295                "CSTWriterSendTimer",
296                CSTWriterSendTimer,
297                &cstRemoteReader->cstWriter->lock,
298                cstRemoteReader,
299                &cstRemoteReader->cstWriter->params.delayResponceTime);
300     } else {
301       eventAdd(d,
302                cstRemoteReader->sobject->objectEntryAID,
303                &cstRemoteReader->delayResponceTimer,
304                queue, //userdata timer
305                "CSTWriterSendStrictTimer",
306                CSTWriterSendStrictTimer,
307                &cstRemoteReader->cstWriter->lock,
308                cstRemoteReader,
309                &cstRemoteReader->cstWriter->params.delayResponceTime);
310     }
311   }
312
313   pthread_rwlock_unlock(&cstWriter->lock);
314   if ((woid & 0x07) == OID_PUBLICATION)
315     pthread_rwlock_unlock(&d->publications.lock);
316 }