rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSAck.c
updated email address - petr@smoliku.cz
[orte.git] / orte / liborte / RTPSAck.c
1 /*
2  *  $Id: RTPSAck.c,v 0.0.0.1            2003/10/07 
3  *
4  *  DEBUG:  section 47                  RTPS message ACK
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr@smoliku.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
34 /**********************************************************************************/
35 int
36 RTPSAckCreate(CDR_Codec *cdrCodec,
37     SequenceNumber *seqNumber,
38     ObjectId roid,ObjectId woid,Boolean f_bit) 
39 {
40   SequenceNumber     sn_tmp;                    
41   CDR_Endianness     data_endian;
42   CORBA_octet        flags;
43
44   if (cdrCodec->buf_len<cdrCodec->wptr+28) return -1;
45
46   /* submessage id */
47   CDR_put_octet(cdrCodec,ACK);
48
49   /* flags */
50   flags=cdrCodec->data_endian;
51   if (f_bit) flags|=2;
52   CDR_put_octet(cdrCodec,flags);
53
54   /* length */
55   CDR_put_ushort(cdrCodec,24);
56
57   /* next data are sent in big endianing */
58   data_endian=cdrCodec->data_endian;
59   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
60
61   /* readerObjectId */
62   CDR_put_ulong(cdrCodec,roid);
63   
64   /* writerObjectId */
65   CDR_put_ulong(cdrCodec,woid);
66
67   cdrCodec->data_endian=data_endian;
68
69   SeqNumberInc(sn_tmp,*seqNumber);
70
71   /* SeqNumber */
72   CDR_put_ulong(cdrCodec,sn_tmp.high);
73   CDR_put_ulong(cdrCodec,sn_tmp.low);
74
75   /* bitmap - bits */
76   CDR_put_ulong(cdrCodec,32);
77   CDR_put_ulong(cdrCodec,0);
78
79   return 28;
80
81
82 /**********************************************************************************/
83 void 
84 RTPSAck(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
85   GUID_RTPS          readerGUID;
86   CSTWriter          *cstWriter=NULL;
87   CSTRemoteReader    *cstRemoteReader;
88   CSChangeForReader  *csChangeForReader;
89   StateMachineSend   stateMachineSendNew;
90   ObjectId           roid,woid;
91   SequenceNumber     sn,isn;   
92   char               queue=1;
93   CDR_Endianness     data_endian;
94   CORBA_octet        flags;
95   char               f_bit;
96
97   /* restore flag possition in submessage */
98   cdrCodec->rptr-=3;
99
100   /* flags */
101   CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
102   f_bit=flags & 2;
103
104   /* move reading possition to begin of submessage */
105   cdrCodec->rptr+=2;
106
107   /* next data are sent in big endianing */
108   data_endian=cdrCodec->data_endian;
109   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
110
111   /* readerObjectId */
112   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
113   
114   /* writerObjectId */
115   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
116
117   cdrCodec->data_endian=data_endian;
118
119   /* SeqNumber */
120   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.high);
121   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.low);
122
123   readerGUID.hid=mi->sourceHostId;
124   readerGUID.aid=mi->sourceAppId;
125   readerGUID.oid=roid;
126
127   debug(47,3) ("recv: RTPS ACK%c(0x%x) from 0x%x-0x%x\n",
128                 f_bit ? 'F':'f',
129                 woid,mi->sourceHostId,mi->sourceAppId);
130   
131   /* Manager */
132   if ((d->guid.aid & 0x03)==MANAGER) {
133     switch (woid) {
134       case OID_WRITE_APPSELF:
135         pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
136         cstWriter=&d->writerApplicationSelf;
137         readerGUID.hid=senderIPAddress;
138         readerGUID.aid=AID_UNKNOWN;
139         readerGUID.oid=roid;
140         eventDetach(d,
141             cstWriter->objectEntryOID->objectEntryAID,
142             &cstWriter->registrationTimer,
143             0);   //common timer
144         break;
145       case OID_WRITE_MGR:
146         pthread_rwlock_wrlock(&d->writerManagers.lock);
147         cstWriter=&d->writerManagers;
148         break;
149       case OID_WRITE_APP:
150         pthread_rwlock_wrlock(&d->writerApplications.lock);
151         cstWriter=&d->writerApplications;
152         break;
153     }
154   }
155
156   /* Application */
157   if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION) {
158     switch (roid) {
159       case OID_READ_APP:
160       case OID_READ_APPSELF:
161         pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
162         cstWriter=&d->writerApplicationSelf;
163         eventDetach(d,
164             cstWriter->objectEntryOID->objectEntryAID,
165             &cstWriter->registrationTimer,
166             0);   //common timer
167         break;
168       case OID_READ_PUBL:
169         pthread_rwlock_wrlock(&d->writerPublications.lock);
170         cstWriter=&d->writerPublications;
171         break;
172       case OID_READ_SUBS:
173         pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
174         cstWriter=&d->writerSubscriptions;
175         break;
176       default:
177         if ((woid & 0x07) == OID_PUBLICATION) {
178           GUID_RTPS  guid=d->guid;
179           guid.oid=woid;
180           pthread_rwlock_rdlock(&d->publications.lock);
181           cstWriter=CSTWriter_find(&d->publications,&guid);
182           pthread_rwlock_wrlock(&cstWriter->lock);
183           queue=2;
184         }
185         break;
186     }
187   }
188
189   if (!cstWriter) {
190     if ((woid & 0x07) == OID_PUBLICATION) 
191       pthread_rwlock_unlock(&d->publications.lock);
192     return;
193   }
194   cstRemoteReader=CSTRemoteReader_find(cstWriter,&readerGUID);
195   if (!cstRemoteReader) {
196     pthread_rwlock_unlock(&cstWriter->lock);
197     if ((woid & 0x07) == OID_PUBLICATION) 
198       pthread_rwlock_unlock(&d->publications.lock);
199     return;
200   }
201
202   stateMachineSendNew=NOTHNIGTOSEND;
203   csChangeForReader=CSChangeForReader_first(cstRemoteReader);
204   while(csChangeForReader) {
205     isn=csChangeForReader->csChange->sn;
206     if (SeqNumberCmp(csChangeForReader->csChange->gapSN,noneSN)>0) {
207       SeqNumberAdd(isn,csChangeForReader->csChange->sn,csChangeForReader->csChange->gapSN);
208       SeqNumberDec(isn,isn);
209     }
210     if (SeqNumberCmp(isn,sn)<0)   {   //ACK
211       if (csChangeForReader->commStateChFReader!=ACKNOWLEDGED) {
212         CSChangeForReader *csChangeForReaderDestroyed;
213         csChangeForReaderDestroyed=csChangeForReader;
214         csChangeForReader->commStateChFReader=ACKNOWLEDGED;
215         csChangeForReader=
216           CSChangeForReader_next(cstRemoteReader,csChangeForReader);
217         if ((woid & 0x07) == OID_PUBLICATION) {
218           CSTWriterDestroyCSChangeForReader(
219             csChangeForReaderDestroyed,ORTE_TRUE);
220         }
221       } else {
222         csChangeForReader=
223           CSChangeForReader_next(cstRemoteReader,csChangeForReader);
224       }
225     } else {                                                      //NACK
226       if (csChangeForReader->commStateChFReader!=TOSEND) {
227         csChangeForReader->commStateChFReader=TOSEND;
228         cstRemoteReader->commStateToSentCounter++;
229       }
230       stateMachineSendNew=MUSTSENDDATA;
231       csChangeForReader=
232         CSChangeForReader_next(cstRemoteReader,csChangeForReader);
233     }
234   }
235
236   if ((cstRemoteReader->commStateSend==NOTHNIGTOSEND) && 
237       (stateMachineSendNew==MUSTSENDDATA)) {
238     cstRemoteReader->commStateSend=stateMachineSendNew;
239     eventDetach(d,
240         cstRemoteReader->sobject->objectEntryAID,
241         &cstRemoteReader->delayResponceTimer,
242         queue);
243     if (queue==1) {
244       eventAdd(d,
245           cstRemoteReader->sobject->objectEntryAID,
246           &cstRemoteReader->delayResponceTimer,
247           queue,   //metatraffic timer
248           "CSTWriterSendTimer",
249           CSTWriterSendTimer,
250           &cstRemoteReader->cstWriter->lock,
251           cstRemoteReader,
252           &cstRemoteReader->cstWriter->params.delayResponceTime);               
253     } else {
254       eventAdd(d,
255           cstRemoteReader->sobject->objectEntryAID,
256           &cstRemoteReader->delayResponceTimer,
257           queue,   //userdata timer
258           "CSTWriterSendStrictTimer",
259           CSTWriterSendStrictTimer,
260           &cstRemoteReader->cstWriter->lock,
261           cstRemoteReader,
262           &cstRemoteReader->cstWriter->params.delayResponceTime);               
263     }
264   } 
265
266   if (stateMachineSendNew==NOTHNIGTOSEND) {
267     cstRemoteReader->commStateSend=NOTHNIGTOSEND;
268     if (queue==1) {
269       eventDetach(d,
270           cstRemoteReader->sobject->objectEntryAID,
271           &cstRemoteReader->delayResponceTimer,
272           queue);
273     } else {
274       eventDetach(d,
275           cstRemoteReader->sobject->objectEntryAID,
276           &cstRemoteReader->repeatAnnounceTimer,
277           queue);
278     }
279   }  
280
281   if ((!f_bit) && (cstRemoteReader->commStateSend==NOTHNIGTOSEND)) {
282     cstRemoteReader->commStateHB=MUSTSENDHB;
283     eventDetach(d,
284         cstRemoteReader->sobject->objectEntryAID,
285         &cstRemoteReader->delayResponceTimer,
286         queue);
287     if (queue==1) {
288       eventAdd(d,
289           cstRemoteReader->sobject->objectEntryAID,
290           &cstRemoteReader->delayResponceTimer,
291           queue,   //metatraffic timer
292           "CSTWriterSendTimer",
293           CSTWriterSendTimer,
294           &cstRemoteReader->cstWriter->lock,
295           cstRemoteReader,
296           &cstRemoteReader->cstWriter->params.delayResponceTime);               
297     } else {
298       eventAdd(d,
299           cstRemoteReader->sobject->objectEntryAID,
300           &cstRemoteReader->delayResponceTimer,
301           queue,   //userdata timer
302           "CSTWriterSendStrictTimer",
303           CSTWriterSendStrictTimer,
304           &cstRemoteReader->cstWriter->lock,
305           cstRemoteReader,
306           &cstRemoteReader->cstWriter->params.delayResponceTime);               
307     }
308   } 
309
310   pthread_rwlock_unlock(&cstWriter->lock);
311   if ((woid & 0x07) == OID_PUBLICATION) 
312     pthread_rwlock_unlock(&d->publications.lock);
313
314
315