rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSGap.c
Migration to new version of OMK system.
[orte.git] / orte / liborte / RTPSGap.c
1 /*
2  *  $Id: RTPSGap.c,v 0.0.0.1            2003/10/07 
3  *
4  *  DEBUG:  section 49                  RTPS message GAP
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte_all.h"
23
24 /**********************************************************************************/
25 int 
26 RTPSGapCreate(CDR_Codec *cdrCodec,ObjectId roid,ObjectId woid,CSChange *csChange) 
27 {
28   CDR_Endianness     data_endian;
29   SequenceNumber     bsn;
30   CORBA_octet        flags;
31
32   if (cdrCodec->buf_len<cdrCodec->wptr+32) return -1;
33
34   /* submessage id */
35   CDR_put_octet(cdrCodec,GAP);
36
37   /* flags */
38   flags=cdrCodec->data_endian;
39   CDR_put_octet(cdrCodec,flags);
40
41   /* length */
42   CDR_put_ushort(cdrCodec,28);
43
44   /* next data are sent in big endianing */
45   data_endian=cdrCodec->data_endian;
46   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
47
48   /* readerObjectId */
49   CDR_put_ulong(cdrCodec,roid);
50   
51   /* writerObjectId */
52   CDR_put_ulong(cdrCodec,woid);
53
54   cdrCodec->data_endian=data_endian;
55
56   /* firstSeqNumber */
57   CDR_put_ulong(cdrCodec,csChange->sn.high);
58   CDR_put_ulong(cdrCodec,csChange->sn.low);
59
60   /* bitmap sn */
61   SeqNumberAdd(bsn,  
62                csChange->sn,
63                csChange->gapSN);
64   CDR_put_ulong(cdrCodec,bsn.high);
65   CDR_put_ulong(cdrCodec,bsn.low);
66
67   /* numbits */
68   CDR_put_ulong(cdrCodec,0);
69
70   return 32;
71 }
72
73 /*****************************************************************************/
74 void 
75 RTPSGapAdd(CSTRemoteWriter *cstRemoteWriter,GUID_RTPS *guid,SequenceNumber *fsn,
76     SequenceNumber *sn,uint32_t numbits,CDR_Codec *cdrCodec) {
77   SequenceNumber      lsn,ssn;
78   uint32_t            i;
79   int8_t              bit,bit_last=0;
80   CSChange            *csChange;
81   uint32_t            bitmap;
82   
83   if (SeqNumberCmp(*sn,cstRemoteWriter->sn)<0) return;//have to be sn>=writer_sn ! 
84   if (SeqNumberCmp(*fsn,*sn)==1) return;              //cannot be fsn>sn ! 
85   if (numbits>256) return;
86
87   //first case of GAP sn
88   if (SeqNumberCmp(*fsn,*sn)<0) {                        //if fsn<sn  
89     if (!CSChangeFromWriter_find(cstRemoteWriter,fsn)) {
90       if (SeqNumberCmp(*fsn,cstRemoteWriter->sn)>0) {    //have to be sn>writer_sn
91         csChange=(CSChange*)MALLOC(sizeof(CSChange));
92         csChange->cdrCodec.buffer=NULL;
93         csChange->sn=*fsn;
94         csChange->guid=*guid;
95         csChange->alive=ORTE_TRUE;
96         SeqNumberSub(csChange->gapSN,*sn,*fsn);         //setup flag GAP
97         CSChangeAttributes_init_head(csChange);
98         CSTReaderAddCSChange(cstRemoteWriter,csChange);
99       }
100     }
101   }
102
103   //second case of GAP sn
104   lsn=ssn=*sn;bit=0;
105   for(i=0;i<numbits;i++) {
106     if ((i%32)==0) 
107       CDR_get_ulong(cdrCodec,&bitmap);
108
109     bit=(bitmap & (1<<(31-i%32))) ? 1:0;
110     if (i>0) {
111       if (bit_last && !bit) {                           //end of GAP     1->0
112         if (!CSChangeFromWriter_find(cstRemoteWriter,&ssn)) {
113           if (SeqNumberCmp(ssn,cstRemoteWriter->sn)>0) {  
114             csChange=(CSChange*)MALLOC(sizeof(CSChange));
115             csChange->cdrCodec.buffer=NULL;
116             csChange->sn=ssn;
117             csChange->guid=*guid;
118             csChange->alive=ORTE_TRUE;
119             SeqNumberSub(csChange->gapSN,lsn,ssn);         //setup flag GAP
120             CSChangeAttributes_init_head(csChange);
121             CSTReaderAddCSChange(cstRemoteWriter,csChange);
122           }
123         }
124       } else {
125         if (!bit_last && bit) {                         //begin GAP      0->1
126           ssn=lsn;                                      //start pointer
127         }
128       }
129     }  
130     SeqNumberInc(lsn,lsn);
131     bit_last=bit;
132   }
133
134   if (bit) {
135     if (!CSChangeFromWriter_find(cstRemoteWriter,&ssn)) {
136       if (SeqNumberCmp(ssn,cstRemoteWriter->sn)>0) {  
137         csChange=(CSChange*)MALLOC(sizeof(CSChange));
138         csChange->cdrCodec.buffer=NULL;
139         csChange->sn=ssn;
140         csChange->guid=*guid;
141         csChange->alive=ORTE_TRUE;
142         SeqNumberSub(csChange->gapSN,lsn,ssn);         //setup flag GAP
143         CSChangeAttributes_init_head(csChange);
144         CSTReaderAddCSChange(cstRemoteWriter,csChange);
145       }
146     }
147   }
148 }
149
150 /**********************************************************************************/
151 void 
152 RTPSGap(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
153   CSTReader          *cstReader=NULL;
154   CSTRemoteWriter    *cstRemoteWriter;
155   GUID_RTPS          writerGUID;
156   ObjectId           roid,woid;
157   SequenceNumber     sn,fsn;
158   uint32_t           numbits;
159   CDR_Endianness     data_endian;
160
161   /* next data are sent in big endianing */
162   data_endian=cdrCodec->data_endian;
163   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
164
165   /* readerObjectId */
166   CDR_get_ulong(cdrCodec,&roid);
167   
168   /* writerObjectId */
169   CDR_get_ulong(cdrCodec,&woid);
170
171   cdrCodec->data_endian=data_endian;
172
173   /* firstSeqNumber */
174   CDR_get_ulong(cdrCodec,&fsn.high);
175   CDR_get_ulong(cdrCodec,&fsn.low);
176
177   /* Bitmap - SN  */
178   CDR_get_ulong(cdrCodec,&sn.high);
179   CDR_get_ulong(cdrCodec,&sn.low);
180
181   /* numbits  */
182   CDR_get_ulong(cdrCodec,&numbits);
183
184   writerGUID.hid=mi->sourceHostId;
185   writerGUID.aid=mi->sourceAppId;
186   writerGUID.oid=woid;
187
188   debug(49,3) ("recv: RTPS_GAP(0x%x) from 0x%x-0x%x\n",
189                 woid,mi->sourceHostId,mi->sourceAppId);
190   
191   /* Manager */
192   if ((d->guid.aid & 0x03)==MANAGER) {
193     if ((writerGUID.oid==OID_WRITE_APPSELF) && 
194         ((writerGUID.aid & 0x03)==MANAGER)) {
195       pthread_rwlock_wrlock(&d->readerManagers.lock);
196       cstReader=&d->readerManagers;
197     }
198     if (((writerGUID.oid==OID_WRITE_APPSELF) &&
199          ((writerGUID.aid & 0x03)==MANAGEDAPPLICATION)) ||
200         ((writerGUID.oid==OID_WRITE_APP) &&
201          ((writerGUID.aid & 0x03)==MANAGER))) {
202       pthread_rwlock_wrlock(&d->readerApplications.lock);
203       cstReader=&d->readerApplications;
204     }
205   }
206
207   /* Application */
208   if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
209     switch (writerGUID.oid) {
210       case OID_WRITE_MGR:
211         pthread_rwlock_wrlock(&d->readerManagers.lock);
212         cstReader=&d->readerManagers;
213         break;
214       case OID_WRITE_APP:
215         pthread_rwlock_wrlock(&d->readerApplications.lock);
216         cstReader=&d->readerApplications;
217         break;
218       case OID_WRITE_PUBL:
219         pthread_rwlock_wrlock(&d->readerPublications.lock);
220         cstReader=&d->readerPublications;
221         break;
222       case OID_WRITE_SUBS:
223         pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
224         cstReader=&d->readerSubscriptions;
225         break;
226     }
227   }  
228
229   if (!cstReader) return;
230   cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
231   if (!cstRemoteWriter) {
232     pthread_rwlock_unlock(&cstReader->lock);
233     return;
234   }
235
236   RTPSGapAdd(cstRemoteWriter,&writerGUID,&fsn,&sn,numbits,
237              cdrCodec);
238
239   CSTReaderProcCSChanges(d,cstRemoteWriter);
240   pthread_rwlock_unlock(&cstReader->lock);
241
242
243