]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSAck.c
Added prerelease of ORTE-0.2 (Real Time Publisher Subscriber communication protocol...
[orte.git] / orte / liborte / RTPSAck.c
1 /*
2  *  $Id: RTPSAck.c,v 0.0.0.1            2003/10/07 
3  *
4  *  DEBUG:  section 47                  RTPS message ACK
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 /**********************************************************************************/
25 int32_t 
26 RTPSAckCreate(u_int8_t *rtps_msg,u_int32_t max_msg_len,
27     SequenceNumber *seqNumber,
28     ObjectId roid,ObjectId woid,Boolean f_bit) {
29   SequenceNumber        sn_tmp;                    
30                     
31   if (max_msg_len<28) return -1;
32   rtps_msg[0]=(u_int8_t)ACK;
33   rtps_msg[1]=ORTE_MY_MBO;
34   if (f_bit) rtps_msg[1]|=2;
35   *((ParameterLength*)(rtps_msg+2))=24;
36   conv_u32(&roid,0);
37   *((ObjectId*)(rtps_msg+4))=roid;
38   conv_u32(&woid,0);
39   *((ObjectId*)(rtps_msg+8))=woid;
40   SeqNumberInc(sn_tmp,*seqNumber);
41   *((SequenceNumber*)(rtps_msg+12))=sn_tmp;
42   *((u_int32_t*)(rtps_msg+20))=32;
43   *((u_int32_t*)(rtps_msg+24))=0;
44   return 28;
45
46
47 /**********************************************************************************/
48 void 
49 RTPSAck(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
50   GUID_RTPS          readerGUID;
51   CSTWriter          *cstWriter=NULL;
52   CSTRemoteReader    *cstRemoteReader;
53   CSChangeForReader  *csChangeForReader;
54   StateMachineSend   stateMachineSendNew;
55   ObjectId               roid,woid;
56   SequenceNumber     sn;   
57   int8_t             e_bit,f_bit;
58
59   e_bit=rtps_msg[1] & 0x01;
60   f_bit=(rtps_msg[1] & 0x02)>>1;
61   roid=*((ObjectId*)(rtps_msg+4));              /* readerObjectId */
62   conv_u32(&roid,0);
63   woid=*((ObjectId*)(rtps_msg+8));              /* writerObjectId */
64   conv_u32(&woid,0);
65   sn=*((SequenceNumber*)(rtps_msg+12));         /* Bitmap - SN    */
66   conv_sn(&sn,e_bit);
67   readerGUID.hid=mi->sourceHostId;
68   readerGUID.aid=mi->sourceAppId;
69   readerGUID.oid=roid;
70
71   debug(47,3) ("recv: RTPS ACK%c(0x%x) from 0x%x-0x%x\n",
72                 f_bit ? 'F':'f',
73                 woid,mi->sourceHostId,mi->sourceAppId);
74   
75   //Manager
76   if ((d->guid.aid & 0x03)==MANAGER) {
77     switch (woid) {
78       case OID_WRITE_APPSELF:
79         pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
80         cstWriter=&d->writerApplicationSelf;
81         readerGUID.hid=senderIPAddress;
82         readerGUID.aid=AID_UNKNOWN;
83         readerGUID.oid=roid;
84         break;
85       case OID_WRITE_MGR:
86         pthread_rwlock_wrlock(&d->writerManagers.lock);
87         cstWriter=&d->writerManagers;
88         break;
89       case OID_WRITE_APP:
90         pthread_rwlock_wrlock(&d->writerApplications.lock);
91         cstWriter=&d->writerApplications;
92         break;
93     }
94   }
95   //Application
96   if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION) {
97     switch (roid) {
98       case OID_READ_APP:
99       case OID_READ_APPSELF:
100         pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
101         cstWriter=&d->writerApplicationSelf;
102         break;
103       case OID_READ_PUBL:
104         pthread_rwlock_wrlock(&d->writerPublications.lock);
105         cstWriter=&d->writerPublications;
106         break;
107       case OID_READ_SUBS:
108         pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
109         cstWriter=&d->writerSubscriptions;
110         break;
111     }
112   }
113   if (!cstWriter) return;
114   cstRemoteReader=CSTRemoteReader_find(cstWriter,&readerGUID);
115   if (!cstRemoteReader) {
116     pthread_rwlock_unlock(&cstWriter->lock);
117     return;
118   }
119   stateMachineSendNew=NOTHNIGTOSEND;
120   gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
121     if (SeqNumberCmp(csChangeForReader->csChange->sn,sn)<0)   {   //ACK
122       if (csChangeForReader->commStateChFReader!=ACKNOWLEDGED) {
123         csChangeForReader->commStateChFReader=ACKNOWLEDGED;
124 //        csChangeForReader->csChange->acknowledgedCounter++;
125       }    
126     } else {                                                      //NACK
127       csChangeForReader->commStateChFReader=TOSEND;
128       stateMachineSendNew=MUSTSENDDATA;
129     }
130   }
131   if ((cstRemoteReader->commStateSend==NOTHNIGTOSEND) && 
132       (stateMachineSendNew==MUSTSENDDATA)) {
133     cstRemoteReader->commStateSend=stateMachineSendNew;
134     eventDetach(d,
135         cstRemoteReader->objectEntryOID->objectEntryAID,
136         &cstRemoteReader->delayResponceTimer,
137         1);
138     eventAdd(d,
139         cstRemoteReader->objectEntryOID->objectEntryAID,
140         &cstRemoteReader->delayResponceTimer,
141         1,   //metatraffic timer
142         "CSTWriterSendTimer",
143         CSTWriterSendTimer,
144         &cstRemoteReader->cstWriter->lock,
145         cstRemoteReader,
146         &cstRemoteReader->cstWriter->params.delayResponceTime);               
147   } 
148   if ((cstRemoteReader->commStateSend==MUSTSENDDATA) && 
149       (stateMachineSendNew==NOTHNIGTOSEND)) {
150     cstRemoteReader->commStateSend=stateMachineSendNew;
151     eventDetach(d,
152         cstRemoteReader->objectEntryOID->objectEntryAID,
153         &cstRemoteReader->delayResponceTimer,
154         1);
155   }  
156   if ((!f_bit) && (cstRemoteReader->commStateSend==NOTHNIGTOSEND)) {
157     eventDetach(d,
158         cstRemoteReader->objectEntryOID->objectEntryAID,
159         &cstRemoteReader->delayResponceTimer,
160         1);
161     eventAdd(d,
162         cstRemoteReader->objectEntryOID->objectEntryAID,
163         &cstRemoteReader->delayResponceTimer,
164         1,   //metatraffic timer
165         "CSTWriterSendTimer",
166         CSTWriterSendTimer,
167         &cstRemoteReader->cstWriter->lock,
168         cstRemoteReader,
169         &cstRemoteReader->cstWriter->params.delayResponceTime);               
170   } 
171   pthread_rwlock_unlock(&cstWriter->lock);
172
173
174