]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSGap.c
87db602c4795655ea77ea15a2ae6299dd37b0c27
[orte.git] / orte / liborte / RTPSGap.c
1 /*
2  *  $Id: RTPSGap.c,v 0.0.0.1            2003/10/07 
3  *
4  *  DEBUG:  section 49                  RTPS message GAP
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 /*****************************************************************************/
25 void 
26 RTPSGapAdd(CSTRemoteWriter *cstRemoteWriter,GUID_RTPS *guid,SequenceNumber *fsn,
27     SequenceNumber *sn,u_int32_t numbits,u_int8_t *bitmaps,Boolean e_bit) {
28   SequenceNumber      lsn,ssn;
29   u_int32_t           i;
30   int8_t              bit,bit_last=0;
31   CSChange            *csChange;
32   u_int32_t           bitmap;
33   
34   if (SeqNumberCmp(*sn,cstRemoteWriter->sn)<0) return;//have to be sn>=writer_sn ! 
35   if (SeqNumberCmp(*fsn,*sn)==1) return;              //cannot be fsn>sn ! 
36   if (numbits>256) return;
37   //first case of GAP sn
38   if (SeqNumberCmp(*fsn,*sn)<0) {                        //if fsn<sn  
39     if (!CSChangeFromWriter_find(cstRemoteWriter,fsn)) {
40       if (SeqNumberCmp(*fsn,cstRemoteWriter->sn)>0) {    //have to be sn>writer_sn
41         csChange=(CSChange*)MALLOC(sizeof(CSChange));
42         csChange->cdrStream.buffer=NULL;
43         csChange->sn=*fsn;
44         csChange->guid=*guid;
45         csChange->alive=ORTE_TRUE;
46         SeqNumberSub(csChange->gapSN,*sn,*fsn);         //setup flag GAP
47         CSChangeAttributes_init_head(csChange);
48         CSTReaderAddCSChange(cstRemoteWriter,csChange);
49       }
50     }
51   }
52   //second case of GAP sn
53   lsn=ssn=*sn;bit=0;
54   for(i=0;i<numbits;i++) {
55     bitmap=*(((u_int32_t*)bitmaps)+i/32);
56     conv_u32(&bitmap,e_bit);
57     bit=(bitmap & (1<<(31-i%32))) ? 1:0;
58     if (i>0) {
59       if (bit_last && !bit) {                           //end of GAP     1->0
60         if (!CSChangeFromWriter_find(cstRemoteWriter,&ssn)) {
61           if (SeqNumberCmp(ssn,cstRemoteWriter->sn)>0) {  
62             csChange=(CSChange*)MALLOC(sizeof(CSChange));
63             csChange->cdrStream.buffer=NULL;
64             csChange->sn=ssn;
65             csChange->guid=*guid;
66             csChange->alive=ORTE_TRUE;
67             SeqNumberSub(csChange->gapSN,lsn,ssn);         //setup flag GAP
68             CSChangeAttributes_init_head(csChange);
69             CSTReaderAddCSChange(cstRemoteWriter,csChange);
70           }
71         }
72       } else {
73         if (!bit_last && bit) {                         //begin GAP      0->1
74           ssn=lsn;                                      //start pointer
75         }
76       }
77     }  
78     SeqNumberInc(lsn,lsn);
79     bit_last=bit;
80   }
81   if (bit) {
82     if (!CSChangeFromWriter_find(cstRemoteWriter,&ssn)) {
83       if (SeqNumberCmp(ssn,cstRemoteWriter->sn)>0) {  
84         csChange=(CSChange*)MALLOC(sizeof(CSChange));
85         csChange->cdrStream.buffer=NULL;
86         csChange->sn=ssn;
87         csChange->guid=*guid;
88         csChange->alive=ORTE_TRUE;
89         SeqNumberSub(csChange->gapSN,lsn,ssn);         //setup flag GAP
90         CSChangeAttributes_init_head(csChange);
91         CSTReaderAddCSChange(cstRemoteWriter,csChange);
92       }
93     }
94   }
95 }
96
97 /**********************************************************************************/
98 void 
99 RTPSGap(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
100   CSTReader          *cstReader=NULL;
101   CSTRemoteWriter    *cstRemoteWriter;
102   GUID_RTPS          writerGUID;
103   ObjectId               roid,woid;
104   SequenceNumber     sn,fsn;
105   u_int32_t          numbits;
106   int8_t             e_bit;
107
108   e_bit=rtps_msg[1] & 0x01;
109   roid=*((ObjectId*)(rtps_msg+4));              /* readerObjectId */
110   conv_u32(&roid,0);
111   woid=*((ObjectId*)(rtps_msg+8));              /* writerObjectId */
112   conv_u32(&woid,0);
113   fsn=*((SequenceNumber*)(rtps_msg+12));        /* firstSeqNumber */
114   conv_sn(&fsn,e_bit);
115   sn=*((SequenceNumber*)(rtps_msg+20));         /* Bitmap - SN    */
116   conv_sn(&sn,e_bit);
117   numbits=*((u_int32_t*)(rtps_msg+28));         /* numbits */
118   conv_u32(&numbits,e_bit);
119   writerGUID.hid=mi->sourceHostId;
120   writerGUID.aid=mi->sourceAppId;
121   writerGUID.oid=woid;
122
123   debug(49,3) ("recv: RTPS_GAP(0x%x) from 0x%x-0x%x\n",
124                 woid,mi->sourceHostId,mi->sourceAppId);
125   
126   //Manager
127   if ((d->guid.aid & 0x03)==MANAGER) {
128     if ((writerGUID.oid==OID_WRITE_APPSELF) && 
129         ((writerGUID.aid & 0x03)==MANAGER)) {
130       pthread_rwlock_wrlock(&d->readerManagers.lock);
131       cstReader=&d->readerManagers;
132     }
133     if (((writerGUID.oid==OID_WRITE_APPSELF) &&
134          ((writerGUID.aid & 0x03)==MANAGEDAPPLICATION)) ||
135         ((writerGUID.oid==OID_WRITE_APP) &&
136          ((writerGUID.aid & 0x03)==MANAGER))) {
137       pthread_rwlock_wrlock(&d->readerApplications.lock);
138       cstReader=&d->readerApplications;
139     }
140   }
141   if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
142     switch (writerGUID.oid) {
143       case OID_WRITE_MGR:
144         pthread_rwlock_wrlock(&d->readerManagers.lock);
145         cstReader=&d->readerManagers;
146         break;
147       case OID_WRITE_APP:
148         pthread_rwlock_wrlock(&d->readerApplications.lock);
149         cstReader=&d->readerApplications;
150         break;
151       case OID_WRITE_PUBL:
152         pthread_rwlock_wrlock(&d->readerPublications.lock);
153         cstReader=&d->readerPublications;
154         break;
155       case OID_WRITE_SUBS:
156         pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
157         cstReader=&d->readerSubscriptions;
158         break;
159     }
160   }  
161   if (!cstReader) return;
162   cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
163   if (!cstRemoteWriter) {
164     pthread_rwlock_unlock(&cstReader->lock);
165     return;
166   }
167   RTPSGapAdd(cstRemoteWriter,&writerGUID,&fsn,&sn,numbits,
168              rtps_msg+32,e_bit);
169   CSTReaderProcCSChanges(d,cstRemoteWriter);
170   pthread_rwlock_unlock(&cstReader->lock);
171
172
173