]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSAck.c
changed name to Open Real-time Ethernet, some source header arranging
[orte.git] / orte / liborte / RTPSAck.c
/*
 *  $Id: RTPSAck.c,v 0.0.0.1            2003/10/07 
 *
 *  DEBUG:  section 47                  RTPS message ACK
 *
 *  -------------------------------------------------------------------  
 *                                ORTE                                 
 *                      Open Real-Time Ethernet                       
 *                                                                    
 *                      Copyright (C) 2001-2006                       
 *  Department of Control Engineering FEE CTU Prague, Czech Republic  
 *                      http://dce.felk.cvut.cz                       
 *                      http://www.ocera.org                          
 *                                                                    
 *  Author:              Petr Smolik    petr.smolik@wo.cz             
 *  Advisor:             Pavel Pisa                                   
 *  Project Responsible: Zdenek Hanzalek                              
 *  --------------------------------------------------------------------
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *  
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *  
 */ 
31
32 #include "orte_all.h"
33
34 /**********************************************************************************/
35 int
36 RTPSAckCreate(CDR_Codec *cdrCodec,
37     SequenceNumber *seqNumber,
38     ObjectId roid,ObjectId woid,Boolean f_bit) 
39 {
40   SequenceNumber     sn_tmp;                    
41   CDR_Endianness     data_endian;
42   CORBA_octet        flags;
43
44   if (cdrCodec->buf_len<cdrCodec->wptr+28) return -1;
45
46   /* submessage id */
47   CDR_put_octet(cdrCodec,ACK);
48
49   /* flags */
50   flags=cdrCodec->data_endian;
51   if (f_bit) flags|=2;
52   CDR_put_octet(cdrCodec,flags);
53
54   /* length */
55   CDR_put_ushort(cdrCodec,24);
56
57   /* next data are sent in big endianing */
58   data_endian=cdrCodec->data_endian;
59   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
60
61   /* readerObjectId */
62   CDR_put_ulong(cdrCodec,roid);
63   
64   /* writerObjectId */
65   CDR_put_ulong(cdrCodec,woid);
66
67   cdrCodec->data_endian=data_endian;
68
69   SeqNumberInc(sn_tmp,*seqNumber);
70
71   /* SeqNumber */
72   CDR_put_ulong(cdrCodec,sn_tmp.high);
73   CDR_put_ulong(cdrCodec,sn_tmp.low);
74
75   /* bitmap - bits */
76   CDR_put_ulong(cdrCodec,32);
77   CDR_put_ulong(cdrCodec,0);
78
79   return 28;
80
81
82 /**********************************************************************************/
83 void 
84 RTPSAck(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
85   GUID_RTPS          readerGUID;
86   CSTWriter          *cstWriter=NULL;
87   CSTRemoteReader    *cstRemoteReader;
88   CSChangeForReader  *csChangeForReader;
89   StateMachineSend   stateMachineSendNew;
90   ObjectId           roid,woid;
91   SequenceNumber     sn;   
92   char               queue=1;
93   CDR_Endianness     data_endian;
94   CORBA_octet        flags;
95   char               f_bit;
96
97   /* restore flag possition in submessage */
98   cdrCodec->rptr-=3;
99
100   /* flags */
101   CDR_get_octet(cdrCodec,&flags);
102   f_bit=flags & 2;
103
104   /* move reading possition to begin of submessage */
105   cdrCodec->rptr+=2;
106
107   /* next data are sent in big endianing */
108   data_endian=cdrCodec->data_endian;
109   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
110
111   /* readerObjectId */
112   CDR_get_ulong(cdrCodec,&roid);
113   
114   /* writerObjectId */
115   CDR_get_ulong(cdrCodec,&woid);
116
117   cdrCodec->data_endian=data_endian;
118
119   /* SeqNumber */
120   CDR_get_ulong(cdrCodec,&sn.high);
121   CDR_get_ulong(cdrCodec,&sn.low);
122
123   readerGUID.hid=mi->sourceHostId;
124   readerGUID.aid=mi->sourceAppId;
125   readerGUID.oid=roid;
126
127   debug(47,3) ("recv: RTPS ACK%c(0x%x) from 0x%x-0x%x\n",
128                 f_bit ? 'F':'f',
129                 woid,mi->sourceHostId,mi->sourceAppId);
130   
131   /* Manager */
132   if ((d->guid.aid & 0x03)==MANAGER) {
133     switch (woid) {
134       case OID_WRITE_APPSELF:
135         pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
136         cstWriter=&d->writerApplicationSelf;
137         readerGUID.hid=senderIPAddress;
138         readerGUID.aid=AID_UNKNOWN;
139         readerGUID.oid=roid;
140         eventDetach(d,
141             cstWriter->objectEntryOID->objectEntryAID,
142             &cstWriter->registrationTimer,
143             0);   //common timer
144         break;
145       case OID_WRITE_MGR:
146         pthread_rwlock_wrlock(&d->writerManagers.lock);
147         cstWriter=&d->writerManagers;
148         break;
149       case OID_WRITE_APP:
150         pthread_rwlock_wrlock(&d->writerApplications.lock);
151         cstWriter=&d->writerApplications;
152         break;
153     }
154   }
155
156   /* Application */
157   if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION) {
158     switch (roid) {
159       case OID_READ_APP:
160       case OID_READ_APPSELF:
161         pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
162         cstWriter=&d->writerApplicationSelf;
163         eventDetach(d,
164             cstWriter->objectEntryOID->objectEntryAID,
165             &cstWriter->registrationTimer,
166             0);   //common timer
167         break;
168       case OID_READ_PUBL:
169         pthread_rwlock_wrlock(&d->writerPublications.lock);
170         cstWriter=&d->writerPublications;
171         break;
172       case OID_READ_SUBS:
173         pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
174         cstWriter=&d->writerSubscriptions;
175         break;
176       default:
177         if ((woid & 0x07) == OID_PUBLICATION) {
178           GUID_RTPS  guid=d->guid;
179           guid.oid=woid;
180           pthread_rwlock_rdlock(&d->publications.lock);
181           cstWriter=CSTWriter_find(&d->publications,&guid);
182           pthread_rwlock_wrlock(&cstWriter->lock);
183           queue=2;
184         }
185         break;
186     }
187   }
188
189   if (!cstWriter) {
190     if ((woid & 0x07) == OID_PUBLICATION) 
191       pthread_rwlock_unlock(&d->publications.lock);
192     return;
193   }
194   cstRemoteReader=CSTRemoteReader_find(cstWriter,&readerGUID);
195   if (!cstRemoteReader) {
196     pthread_rwlock_unlock(&cstWriter->lock);
197     if ((woid & 0x07) == OID_PUBLICATION) 
198       pthread_rwlock_unlock(&d->publications.lock);
199     return;
200   }
201
202   stateMachineSendNew=NOTHNIGTOSEND;
203   csChangeForReader=CSChangeForReader_first(cstRemoteReader);
204   while(csChangeForReader) {
205     if (SeqNumberCmp(csChangeForReader->csChange->sn,sn)<0)   {   //ACK
206       if (csChangeForReader->commStateChFReader!=ACKNOWLEDGED) {
207         CSChangeForReader *csChangeForReaderDestroyed;
208         csChangeForReaderDestroyed=csChangeForReader;
209         csChangeForReader->commStateChFReader=ACKNOWLEDGED;
210         csChangeForReader=
211           CSChangeForReader_next(cstRemoteReader,csChangeForReader);
212         if ((woid & 0x07) == OID_PUBLICATION) {
213           CSTWriterDestroyCSChangeForReader(
214             csChangeForReaderDestroyed,ORTE_TRUE);
215         }
216       } else {
217         csChangeForReader=
218           CSChangeForReader_next(cstRemoteReader,csChangeForReader);
219       }
220     } else {                                                      //NACK
221       if (csChangeForReader->commStateChFReader!=TOSEND) {
222         csChangeForReader->commStateChFReader=TOSEND;
223         cstRemoteReader->commStateToSentCounter++;
224       }
225       stateMachineSendNew=MUSTSENDDATA;
226       csChangeForReader=
227         CSChangeForReader_next(cstRemoteReader,csChangeForReader);
228     }
229   }
230
231   if ((cstRemoteReader->commStateSend==NOTHNIGTOSEND) && 
232       (stateMachineSendNew==MUSTSENDDATA)) {
233     cstRemoteReader->commStateSend=stateMachineSendNew;
234     eventDetach(d,
235         cstRemoteReader->sobject->objectEntryAID,
236         &cstRemoteReader->delayResponceTimer,
237         queue);
238     if (queue==1) {
239       eventAdd(d,
240           cstRemoteReader->sobject->objectEntryAID,
241           &cstRemoteReader->delayResponceTimer,
242           queue,   //metatraffic timer
243           "CSTWriterSendTimer",
244           CSTWriterSendTimer,
245           &cstRemoteReader->cstWriter->lock,
246           cstRemoteReader,
247           &cstRemoteReader->cstWriter->params.delayResponceTime);               
248     } else {
249       eventAdd(d,
250           cstRemoteReader->sobject->objectEntryAID,
251           &cstRemoteReader->delayResponceTimer,
252           queue,   //userdata timer
253           "CSTWriterSendStrictTimer",
254           CSTWriterSendStrictTimer,
255           &cstRemoteReader->cstWriter->lock,
256           cstRemoteReader,
257           &cstRemoteReader->cstWriter->params.delayResponceTime);               
258     }
259   } 
260
261   if (stateMachineSendNew==NOTHNIGTOSEND) {
262     cstRemoteReader->commStateSend=NOTHNIGTOSEND;
263     if (queue==1) {
264       eventDetach(d,
265           cstRemoteReader->sobject->objectEntryAID,
266           &cstRemoteReader->delayResponceTimer,
267           queue);
268     } else {
269       eventDetach(d,
270           cstRemoteReader->sobject->objectEntryAID,
271           &cstRemoteReader->repeatAnnounceTimer,
272           queue);
273     }
274   }  
275
276   if ((!f_bit) && (cstRemoteReader->commStateSend==NOTHNIGTOSEND)) {
277     cstRemoteReader->commStateHB=MUSTSENDHB;
278     eventDetach(d,
279         cstRemoteReader->sobject->objectEntryAID,
280         &cstRemoteReader->delayResponceTimer,
281         queue);
282     if (queue==1) {
283       eventAdd(d,
284           cstRemoteReader->sobject->objectEntryAID,
285           &cstRemoteReader->delayResponceTimer,
286           queue,   //metatraffic timer
287           "CSTWriterSendTimer",
288           CSTWriterSendTimer,
289           &cstRemoteReader->cstWriter->lock,
290           cstRemoteReader,
291           &cstRemoteReader->cstWriter->params.delayResponceTime);               
292     } else {
293       eventAdd(d,
294           cstRemoteReader->sobject->objectEntryAID,
295           &cstRemoteReader->delayResponceTimer,
296           queue,   //userdata timer
297           "CSTWriterSendStrictTimer",
298           CSTWriterSendStrictTimer,
299           &cstRemoteReader->cstWriter->lock,
300           cstRemoteReader,
301           &cstRemoteReader->cstWriter->params.delayResponceTime);               
302     }
303   } 
304
305   pthread_rwlock_unlock(&cstWriter->lock);
306   if ((woid & 0x07) == OID_PUBLICATION) 
307     pthread_rwlock_unlock(&d->publications.lock);
308
309
310