]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSAck.c
new version 0.2.3
[orte.git] / orte / liborte / RTPSAck.c
1 /*
2  *  $Id: RTPSAck.c,v 0.0.0.1            2003/10/07 
3  *
4  *  DEBUG:  section 47                  RTPS message ACK
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte_all.h"
23
24 /**********************************************************************************/
25 int32_t 
26 RTPSAckCreate(uint8_t *rtps_msg,uint32_t max_msg_len,
27     SequenceNumber *seqNumber,
28     ObjectId roid,ObjectId woid,Boolean f_bit) {
29   SequenceNumber        sn_tmp;                    
30                     
31   if (max_msg_len<28) return -1;
32   rtps_msg[0]=(uint8_t)ACK;
33   rtps_msg[1]=ORTE_MY_MBO;
34   if (f_bit) rtps_msg[1]|=2;
35   *((ParameterLength*)(rtps_msg+2))=24;
36   conv_u32(&roid,0);
37   *((ObjectId*)(rtps_msg+4))=roid;
38   conv_u32(&woid,0);
39   *((ObjectId*)(rtps_msg+8))=woid;
40   SeqNumberInc(sn_tmp,*seqNumber);
41   *((SequenceNumber*)(rtps_msg+12))=sn_tmp;
42   *((uint32_t*)(rtps_msg+20))=32;
43   *((uint32_t*)(rtps_msg+24))=0;
44   return 28;
45
46
47 /**********************************************************************************/
48 void 
49 RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
50   GUID_RTPS          readerGUID;
51   CSTWriter          *cstWriter=NULL;
52   CSTRemoteReader    *cstRemoteReader;
53   CSChangeForReader  *csChangeForReader;
54   StateMachineSend   stateMachineSendNew;
55   ObjectId               roid,woid;
56   SequenceNumber     sn;   
57   char               e_bit,f_bit;
58   char               queue=1;
59
60   e_bit=rtps_msg[1] & 0x01;
61   f_bit=(rtps_msg[1] & 0x02)>>1;
62   roid=*((ObjectId*)(rtps_msg+4));              /* readerObjectId */
63   conv_u32(&roid,0);
64   woid=*((ObjectId*)(rtps_msg+8));              /* writerObjectId */
65   conv_u32(&woid,0);
66   sn=*((SequenceNumber*)(rtps_msg+12));         /* Bitmap - SN    */
67   conv_sn(&sn,e_bit);
68   readerGUID.hid=mi->sourceHostId;
69   readerGUID.aid=mi->sourceAppId;
70   readerGUID.oid=roid;
71
72   debug(47,3) ("recv: RTPS ACK%c(0x%x) from 0x%x-0x%x\n",
73                 f_bit ? 'F':'f',
74                 woid,mi->sourceHostId,mi->sourceAppId);
75   
76   //Manager
77   if ((d->guid.aid & 0x03)==MANAGER) {
78     switch (woid) {
79       case OID_WRITE_APPSELF:
80         pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
81         cstWriter=&d->writerApplicationSelf;
82         readerGUID.hid=senderIPAddress;
83         readerGUID.aid=AID_UNKNOWN;
84         readerGUID.oid=roid;
85         break;
86       case OID_WRITE_MGR:
87         pthread_rwlock_wrlock(&d->writerManagers.lock);
88         cstWriter=&d->writerManagers;
89         break;
90       case OID_WRITE_APP:
91         pthread_rwlock_wrlock(&d->writerApplications.lock);
92         cstWriter=&d->writerApplications;
93         break;
94     }
95   }
96   //Application
97   if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION) {
98     switch (roid) {
99       case OID_READ_APP:
100       case OID_READ_APPSELF:
101         pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
102         cstWriter=&d->writerApplicationSelf;
103         break;
104       case OID_READ_PUBL:
105         pthread_rwlock_wrlock(&d->writerPublications.lock);
106         cstWriter=&d->writerPublications;
107         break;
108       case OID_READ_SUBS:
109         pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
110         cstWriter=&d->writerSubscriptions;
111         break;
112       default:
113         if ((woid & 0x07) == OID_PUBLICATION) {
114           GUID_RTPS  guid=d->guid;
115           guid.oid=woid;
116           pthread_rwlock_rdlock(&d->publications.lock);
117           cstWriter=CSTWriter_find(&d->publications,&guid);
118           pthread_rwlock_wrlock(&cstWriter->lock);
119           queue=2;
120         }
121         break;
122     }
123   }
124   if (!cstWriter) {
125     if ((woid & 0x07) == OID_PUBLICATION) 
126       pthread_rwlock_unlock(&d->publications.lock);
127     return;
128   }
129   cstRemoteReader=CSTRemoteReader_find(cstWriter,&readerGUID);
130   if (!cstRemoteReader) {
131     pthread_rwlock_unlock(&cstWriter->lock);
132     if ((woid & 0x07) == OID_PUBLICATION) 
133       pthread_rwlock_unlock(&d->publications.lock);
134     return;
135   }
136   stateMachineSendNew=NOTHNIGTOSEND;
137   csChangeForReader=CSChangeForReader_first(cstRemoteReader);
138   while(csChangeForReader) {
139     if (SeqNumberCmp(csChangeForReader->csChange->sn,sn)<0)   {   //ACK
140       if (csChangeForReader->commStateChFReader!=ACKNOWLEDGED) {
141         CSChangeForReader *csChangeForReaderDestroyed;
142         csChangeForReaderDestroyed=csChangeForReader;
143         csChangeForReader->commStateChFReader=ACKNOWLEDGED;
144         csChangeForReader=
145           CSChangeForReader_next(cstRemoteReader,csChangeForReader);
146         if ((woid & 0x07) == OID_PUBLICATION) {
147           CSTWriterDestroyCSChangeForReader(cstRemoteReader,
148             csChangeForReaderDestroyed,ORTE_TRUE);
149         }
150       } else {
151         csChangeForReader=
152           CSChangeForReader_next(cstRemoteReader,csChangeForReader);
153       }
154     } else {                                                      //NACK
155       csChangeForReader->commStateChFReader=TOSEND;
156       stateMachineSendNew=MUSTSENDDATA;
157       csChangeForReader=
158         CSChangeForReader_next(cstRemoteReader,csChangeForReader);
159     }
160   }
161   if ((cstRemoteReader->commStateSend==NOTHNIGTOSEND) && 
162       (stateMachineSendNew==MUSTSENDDATA)) {
163     cstRemoteReader->commStateSend=stateMachineSendNew;
164     eventDetach(d,
165         cstRemoteReader->objectEntryOID->objectEntryAID,
166         &cstRemoteReader->delayResponceTimer,
167         queue);
168     if (queue==1) {
169       eventAdd(d,
170           cstRemoteReader->objectEntryOID->objectEntryAID,
171           &cstRemoteReader->delayResponceTimer,
172           queue,   //metatraffic timer
173           "CSTWriterSendTimer",
174           CSTWriterSendTimer,
175           &cstRemoteReader->cstWriter->lock,
176           cstRemoteReader,
177           &cstRemoteReader->cstWriter->params.delayResponceTime);               
178     } else {
179       eventAdd(d,
180           cstRemoteReader->objectEntryOID->objectEntryAID,
181           &cstRemoteReader->delayResponceTimer,
182           queue,   //userdata timer
183           "CSTWriterSendStrictTimer",
184           CSTWriterSendStrictTimer,
185           &cstRemoteReader->cstWriter->lock,
186           cstRemoteReader,
187           &cstRemoteReader->cstWriter->params.delayResponceTime);               
188     }
189   } 
190   if (stateMachineSendNew==NOTHNIGTOSEND) {
191     cstRemoteReader->commStateSend=NOTHNIGTOSEND;
192     if (queue==1) {
193       eventDetach(d,
194           cstRemoteReader->objectEntryOID->objectEntryAID,
195           &cstRemoteReader->delayResponceTimer,
196           queue);
197     } else {
198       eventDetach(d,
199           cstRemoteReader->objectEntryOID->objectEntryAID,
200           &cstRemoteReader->repeatAnnounceTimer,
201           queue);
202     }
203   }  
204   if ((!f_bit) && (cstRemoteReader->commStateSend==NOTHNIGTOSEND)) {
205     cstRemoteReader->commStateHB=MUSTSENDHB;
206     eventDetach(d,
207         cstRemoteReader->objectEntryOID->objectEntryAID,
208         &cstRemoteReader->delayResponceTimer,
209         queue);
210     if (queue==1) {
211       eventAdd(d,
212           cstRemoteReader->objectEntryOID->objectEntryAID,
213           &cstRemoteReader->delayResponceTimer,
214           queue,   //metatraffic timer
215           "CSTWriterSendTimer",
216           CSTWriterSendTimer,
217           &cstRemoteReader->cstWriter->lock,
218           cstRemoteReader,
219           &cstRemoteReader->cstWriter->params.delayResponceTime);               
220     } else {
221       eventAdd(d,
222           cstRemoteReader->objectEntryOID->objectEntryAID,
223           &cstRemoteReader->delayResponceTimer,
224           queue,   //userdata timer
225           "CSTWriterSendStrictTimer",
226           CSTWriterSendStrictTimer,
227           &cstRemoteReader->cstWriter->lock,
228           cstRemoteReader,
229           &cstRemoteReader->cstWriter->params.delayResponceTime);               
230     }
231   } 
232   pthread_rwlock_unlock(&cstWriter->lock);
233   if ((woid & 0x07) == OID_PUBLICATION) 
234     pthread_rwlock_unlock(&d->publications.lock);
235
236
237