]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSGap.c
804c5d80b49a05a378b244ec7864b5f92b0a6591
[orte.git] / orte / liborte / RTPSGap.c
1 /*
2  *  $Id: RTPSGap.c,v 0.0.0.1            2003/10/07 
3  *
4  *  DEBUG:  section 49                  RTPS message GAP
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr@smoliku.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
34 /**********************************************************************************/
35 int 
36 RTPSGapCreate(CDR_Codec *cdrCodec,ObjectId roid,ObjectId woid,CSChange *csChange) 
37 {
38   CDR_Endianness     data_endian;
39   SequenceNumber     bsn;
40   CORBA_octet        flags;
41
42   if (cdrCodec->buf_len<cdrCodec->wptr+32) return -1;
43
44   /* submessage id */
45   CDR_put_octet(cdrCodec,GAP);
46
47   /* flags */
48   flags=cdrCodec->data_endian;
49   CDR_put_octet(cdrCodec,flags);
50
51   /* length */
52   CDR_put_ushort(cdrCodec,28);
53
54   /* next data are sent in big endianing */
55   data_endian=cdrCodec->data_endian;
56   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
57
58   /* readerObjectId */
59   CDR_put_ulong(cdrCodec,roid);
60   
61   /* writerObjectId */
62   CDR_put_ulong(cdrCodec,woid);
63
64   cdrCodec->data_endian=data_endian;
65
66   /* firstSeqNumber */
67   CDR_put_ulong(cdrCodec,csChange->sn.high);
68   CDR_put_ulong(cdrCodec,csChange->sn.low);
69
70   /* bitmap sn */
71   SeqNumberAdd(bsn,  
72                csChange->sn,
73                csChange->gapSN);
74   CDR_put_ulong(cdrCodec,bsn.high);
75   CDR_put_ulong(cdrCodec,bsn.low);
76
77   /* numbits */
78   CDR_put_ulong(cdrCodec,0);
79
80   return 32;
81 }
82
83 /*****************************************************************************/
84 void 
85 RTPSGapAdd(CSTRemoteWriter *cstRemoteWriter,GUID_RTPS *guid,SequenceNumber *fsn,
86     SequenceNumber *sn,uint32_t numbits,CDR_Codec *cdrCodec) {
87   SequenceNumber      lsn,ssn;
88   uint32_t            i;
89   int8_t              bit,bit_last=0;
90   CSChange            *csChange;
91   uint32_t            bitmap;
92   
93   if (SeqNumberCmp(*sn,cstRemoteWriter->sn)<0) return;//have to be sn>=writer_sn ! 
94   if (SeqNumberCmp(*fsn,*sn)==1) return;              //cannot be fsn>sn ! 
95   if (numbits>256) return;
96
97   //first case of GAP sn
98   if (SeqNumberCmp(*fsn,*sn)<0) {                        //if fsn<sn  
99     if (CSChangeFromWriter_find(cstRemoteWriter,fsn)) {
100       CSTReaderDestroyCSChange(cstRemoteWriter, 
101                                fsn,ORTE_FALSE);
102     }
103     csChange=(CSChange*)MALLOC(sizeof(CSChange));
104     csChange->cdrCodec.buffer=NULL;
105     csChange->sn=*fsn;
106     csChange->guid=*guid;
107     csChange->alive=ORTE_TRUE;
108     SeqNumberSub(csChange->gapSN,*sn,*fsn);            //setup flag GAP
109     CSChangeAttributes_init_head(csChange);
110     CSTReaderAddCSChange(cstRemoteWriter,csChange);
111   }
112
113   //second case of GAP sn
114   lsn=ssn=*sn;bit=0;
115   for(i=0;i<numbits;i++) {
116     if ((i%32)==0) 
117       CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&bitmap);
118
119     bit=(bitmap & (1<<(31-i%32))) ? 1:0;
120     if (i>0) {
121       if (bit_last && !bit) {                           //end of GAP     1->0
122         if (!CSChangeFromWriter_find(cstRemoteWriter,&ssn)) {
123           if (SeqNumberCmp(ssn,cstRemoteWriter->sn)>0) {  
124             csChange=(CSChange*)MALLOC(sizeof(CSChange));
125             csChange->cdrCodec.buffer=NULL;
126             csChange->sn=ssn;
127             csChange->guid=*guid;
128             csChange->alive=ORTE_TRUE;
129             SeqNumberSub(csChange->gapSN,lsn,ssn);         //setup flag GAP
130             CSChangeAttributes_init_head(csChange);
131             CSTReaderAddCSChange(cstRemoteWriter,csChange);
132           }
133         }
134       } else {
135         if (!bit_last && bit) {                         //begin GAP      0->1
136           ssn=lsn;                                      //start pointer
137         }
138       }
139     }  
140     SeqNumberInc(lsn,lsn);
141     bit_last=bit;
142   }
143
144   if (bit) {
145     if (!CSChangeFromWriter_find(cstRemoteWriter,&ssn)) {
146       if (SeqNumberCmp(ssn,cstRemoteWriter->sn)>0) {  
147         csChange=(CSChange*)MALLOC(sizeof(CSChange));
148         csChange->cdrCodec.buffer=NULL;
149         csChange->sn=ssn;
150         csChange->guid=*guid;
151         csChange->alive=ORTE_TRUE;
152         SeqNumberSub(csChange->gapSN,lsn,ssn);         //setup flag GAP
153         CSChangeAttributes_init_head(csChange);
154         CSTReaderAddCSChange(cstRemoteWriter,csChange);
155       }
156     }
157   }
158 }
159
160 /**********************************************************************************/
161 void 
162 RTPSGap(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
163   CSTReader          *cstReader=NULL;
164   CSTRemoteWriter    *cstRemoteWriter;
165   GUID_RTPS          writerGUID;
166   ObjectId           roid,woid;
167   SequenceNumber     sn,fsn;
168   uint32_t           numbits;
169   CDR_Endianness     data_endian;
170
171   /* next data are sent in big endianing */
172   data_endian=cdrCodec->data_endian;
173   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
174
175   /* readerObjectId */
176   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
177   
178   /* writerObjectId */
179   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
180
181   cdrCodec->data_endian=data_endian;
182
183   /* firstSeqNumber */
184   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&fsn.high);
185   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&fsn.low);
186
187   /* Bitmap - SN  */
188   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.high);
189   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.low);
190
191   /* numbits  */
192   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&numbits);
193
194   writerGUID.hid=mi->sourceHostId;
195   writerGUID.aid=mi->sourceAppId;
196   writerGUID.oid=woid;
197
198   debug(49,3) ("recv: RTPS_GAP(0x%x) from 0x%x-0x%x fSN:%d, bSN:%d, numbits:%d\n",
199                 woid,mi->sourceHostId,mi->sourceAppId,fsn.low,sn.low,numbits);
200   
201   /* Manager */
202   if ((d->guid.aid & 0x03)==MANAGER) {
203     if ((writerGUID.oid==OID_WRITE_APPSELF) && 
204         ((writerGUID.aid & 0x03)==MANAGER)) {
205       pthread_rwlock_wrlock(&d->readerManagers.lock);
206       cstReader=&d->readerManagers;
207     }
208     if (((writerGUID.oid==OID_WRITE_APPSELF) &&
209          ((writerGUID.aid & 0x03)==MANAGEDAPPLICATION)) ||
210         ((writerGUID.oid==OID_WRITE_APP) &&
211          ((writerGUID.aid & 0x03)==MANAGER))) {
212       pthread_rwlock_wrlock(&d->readerApplications.lock);
213       cstReader=&d->readerApplications;
214     }
215   }
216
217   /* Application */
218   if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
219     switch (writerGUID.oid) {
220       case OID_WRITE_MGR:
221         pthread_rwlock_wrlock(&d->readerManagers.lock);
222         cstReader=&d->readerManagers;
223         break;
224       case OID_WRITE_APP:
225         pthread_rwlock_wrlock(&d->readerApplications.lock);
226         cstReader=&d->readerApplications;
227         break;
228       case OID_WRITE_PUBL:
229         pthread_rwlock_wrlock(&d->readerPublications.lock);
230         cstReader=&d->readerPublications;
231         break;
232       case OID_WRITE_SUBS:
233         pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
234         cstReader=&d->readerSubscriptions;
235         break;
236     }
237   }  
238
239   if (!cstReader) return;
240   cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
241   if (!cstRemoteWriter) {
242     pthread_rwlock_unlock(&cstReader->lock);
243     return;
244   }
245
246   RTPSGapAdd(cstRemoteWriter,&writerGUID,&fsn,&sn,numbits,
247              cdrCodec);
248
249   CSTReaderProcCSChanges(d,cstRemoteWriter);
250   pthread_rwlock_unlock(&cstReader->lock);
251
252
253