2 * $Id: RTPSGap.c,v 0.0.0.1 2003/10/07
4 * DEBUG: section 49 RTPS message GAP
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 /**********************************************************************************/
/*
 * RTPSGapCreate - serialize one GAP submessage for csChange into cdrCodec.
 *
 * Written in order: the GAP submessage id octet, a flags octet holding the
 * codec's current data_endian, the ushort 28, roid and woid (both written
 * with the codec forced to FLAG_BIG_ENDIAN), csChange->sn as high/low,
 * bsn as high/low, and a final ulong 0.
 *
 * Returns -1 without writing anything when buf_len is smaller than wptr+32.
 * The success return value is on a line not visible in this listing.
 *
 * NOTE(review): the declarations of flags and bsn, and the statement that
 * computes bsn, are not visible in this listing - confirm against the
 * full file.
 */
26 RTPSGapCreate(CDR_Codec *cdrCodec,ObjectId roid,ObjectId woid,CSChange *csChange)
28 CDR_Endianness data_endian;
/* bail out unless 32 more bytes fit behind the write pointer; presumably
   the 4 header bytes (octet+octet+ushort) plus the 28 announced below */
32 if (cdrCodec->buf_len<cdrCodec->wptr+32) return -1;
/* submessage id */
35 CDR_put_octet(cdrCodec,GAP);
/* flags octet: carries the codec's current endianness setting */
38 flags=cdrCodec->data_endian;
39 CDR_put_octet(cdrCodec,flags);
/* length field; presumably the byte count of the seven ulongs that follow
   (7 x 4), assuming CDR_put_ulong emits 4 bytes - TODO confirm */
42 CDR_put_ushort(cdrCodec,28);
44 /* the next two fields (object ids) are sent big-endian */
/* remember the codec's endianness so it can be restored afterwards */
45 data_endian=cdrCodec->data_endian;
46 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
/* reader object id */
49 CDR_put_ulong(cdrCodec,roid);
/* writer object id */
52 CDR_put_ulong(cdrCodec,woid);
/* back to the endianness the codec had on entry */
54 cdrCodec->data_endian=data_endian;
/* first sequence-number pair: the change's own sn */
57 CDR_put_ulong(cdrCodec,csChange->sn.high);
58 CDR_put_ulong(cdrCodec,csChange->sn.low);
/* second sequence-number pair: bsn (its computation is not shown here) */
64 CDR_put_ulong(cdrCodec,bsn.high);
65 CDR_put_ulong(cdrCodec,bsn.low);
/* trailing ulong 0; it sits where RTPSGap reads numbits, so no bitmap
   words follow */
68 CDR_put_ulong(cdrCodec,0);
73 /*****************************************************************************/
/*
 * RTPSGapAdd - turn the contents of a received GAP into gap-marking
 * CSChange records on cstRemoteWriter.
 *
 * cstRemoteWriter  remote writer the records are added to
 * guid             GUID copied into the created records
 * fsn, sn          the two sequence numbers carried by the GAP
 * numbits          number of valid bits in the bitmap that follows
 * cdrCodec         codec positioned at the bitmap words
 *
 * Nothing is done when *sn is below the writer's sn, when *fsn is greater
 * than *sn, or when numbits exceeds 256.
 *
 * Two sources of records are handled: the range described by fsn and sn,
 * and every run of 1 bits in the bitmap. A record is only created when
 * CSChangeFromWriter_find reports no entry for the start number and that
 * start number is greater than the writer's sn.
 *
 * NOTE(review): several lines of this function are not visible in this
 * listing (declarations of i, csChange and bitmap, the initial values of
 * lsn/ssn, the update of bit_last, some guarding conditions and the
 * braces) - confirm details against the full file.
 * NOTE(review): every MALLOC result below is dereferenced with no NULL
 * check visible here - confirm MALLOC cannot return NULL.
 */
75 RTPSGapAdd(CSTRemoteWriter *cstRemoteWriter,GUID_RTPS *guid,SequenceNumber *fsn,
76 SequenceNumber *sn,uint32_t numbits,CDR_Codec *cdrCodec) {
/* lsn: sequence number matching the current bit; ssn: start of the
   current run of 1 bits */
77 SequenceNumber lsn,ssn;
/* bit_last holds the previous bit so 0->1 and 1->0 edges can be detected */
79 int8_t bit,bit_last=0;
83 if (SeqNumberCmp(*sn,cstRemoteWriter->sn)<0) return;//have to be sn>=writer_sn !
84 if (SeqNumberCmp(*fsn,*sn)==1) return; //cannot be fsn>sn !
/* reject bit sets longer than 256 bits */
85 if (numbits>256) return;
87 //first case of GAP sn
88 if (SeqNumberCmp(*fsn,*sn)<0) { //if fsn<sn
/* skip when a change for fsn is already present */
89 if (!CSChangeFromWriter_find(cstRemoteWriter,fsn)) {
90 if (SeqNumberCmp(*fsn,cstRemoteWriter->sn)>0) { //have to be fsn>writer_sn
91 csChange=(CSChange*)MALLOC(sizeof(CSChange));
/* the record carries no payload buffer */
92 csChange->cdrCodec.buffer=NULL;
95 csChange->alive=ORTE_TRUE;
/* gapSN is filled from sn and fsn; presumably the difference sn-fsn,
   assuming SeqNumberSub(dst,a,b) stores a-b - TODO confirm */
96 SeqNumberSub(csChange->gapSN,*sn,*fsn); //setup flag GAP
97 CSChangeAttributes_init_head(csChange);
98 CSTReaderAddCSChange(cstRemoteWriter,csChange);
103 //second case of GAP sn
/* walk the bitmap one bit at a time */
105 for(i=0;i<numbits;i++) {
/* fetch a bitmap word from the message; the condition guarding this read
   is on a line not shown (presumably once per 32 bits - confirm) */
107 CDR_get_ulong(cdrCodec,&bitmap);
/* bits are taken most-significant first within each 32-bit word.
   NOTE(review): for i%32==0 this evaluates 1<<31 on a signed int, which
   ISO C leaves undefined; an unsigned constant (1u) would avoid that */
109 bit=(bitmap & (1<<(31-i%32))) ? 1:0;
111 if (bit_last && !bit) { //end of GAP 1->0
/* a run of 1 bits just ended: record it unless ssn is already known */
112 if (!CSChangeFromWriter_find(cstRemoteWriter,&ssn)) {
113 if (SeqNumberCmp(ssn,cstRemoteWriter->sn)>0) {
114 csChange=(CSChange*)MALLOC(sizeof(CSChange));
115 csChange->cdrCodec.buffer=NULL;
117 csChange->guid=*guid;
118 csChange->alive=ORTE_TRUE;
/* gapSN is filled from lsn and ssn (current position and run start) */
119 SeqNumberSub(csChange->gapSN,lsn,ssn); //setup flag GAP
120 CSChangeAttributes_init_head(csChange);
121 CSTReaderAddCSChange(cstRemoteWriter,csChange);
125 if (!bit_last && bit) { //begin GAP 0->1
126 ssn=lsn; //start pointer
/* advance to the sequence number of the next bit */
130 SeqNumberInc(lsn,lsn);
/* after the loop: same record creation once more; the condition guarding
   it is on a line not shown (presumably a run still open at the end of
   the bitmap - confirm) */
135 if (!CSChangeFromWriter_find(cstRemoteWriter,&ssn)) {
136 if (SeqNumberCmp(ssn,cstRemoteWriter->sn)>0) {
137 csChange=(CSChange*)MALLOC(sizeof(CSChange));
138 csChange->cdrCodec.buffer=NULL;
140 csChange->guid=*guid;
141 csChange->alive=ORTE_TRUE;
142 SeqNumberSub(csChange->gapSN,lsn,ssn); //setup flag GAP
143 CSChangeAttributes_init_head(csChange);
144 CSTReaderAddCSChange(cstRemoteWriter,csChange);
150 /**********************************************************************************/
/*
 * RTPSGap - handle a received GAP submessage.
 *
 * d                local domain; its guid and reader objects are used
 * cdrCodec         codec positioned at the body of the GAP submessage
 * mi               interpreter state supplying sourceHostId/sourceAppId
 * senderIPAddress  not referenced in the lines visible here
 *
 * Steps: decode roid, woid, fsn, sn and numbits; build the writer GUID
 * from mi; pick the CSTReader matching the local role and the writer, and
 * take its write lock; look up the remote writer; feed the GAP to
 * RTPSGapAdd; run CSTReaderProcCSChanges; release the lock.
 *
 * NOTE(review): several lines are not visible in this listing (the
 * declarations of roid, woid and numbits, the assignment of
 * writerGUID.oid, the case labels of the switch, else/brace structure and
 * the end of the RTPSGapAdd call) - confirm details against the full file.
 */
152 RTPSGap(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
/* stays NULL when no reader matches the writer */
153 CSTReader *cstReader=NULL;
154 CSTRemoteWriter *cstRemoteWriter;
155 GUID_RTPS writerGUID;
157 SequenceNumber sn,fsn;
159 CDR_Endianness data_endian;
161 /* the next two fields (object ids) are sent big-endian */
/* remember the codec's endianness so it can be restored afterwards */
162 data_endian=cdrCodec->data_endian;
163 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
/* reader object id (not used further in the lines visible here) */
166 CDR_get_ulong(cdrCodec,&roid);
/* writer object id */
169 CDR_get_ulong(cdrCodec,&woid);
/* back to the endianness the codec had on entry */
171 cdrCodec->data_endian=data_endian;
/* first sequence-number pair */
174 CDR_get_ulong(cdrCodec,&fsn.high);
175 CDR_get_ulong(cdrCodec,&fsn.low);
/* second sequence-number pair */
178 CDR_get_ulong(cdrCodec,&sn.high);
179 CDR_get_ulong(cdrCodec,&sn.low);
/* number of valid bits in the bitmap that follows */
182 CDR_get_ulong(cdrCodec,&numbits);
/* host and application ids of the writer come from the message source;
   writerGUID.oid is presumably set from woid on a line not shown */
184 writerGUID.hid=mi->sourceHostId;
185 writerGUID.aid=mi->sourceAppId;
188 debug(49,3) ("recv: RTPS_GAP(0x%x) from 0x%x-0x%x\n",
189 woid,mi->sourceHostId,mi->sourceAppId);
/* local side is a manager: the low two bits of aid select the role */
192 if ((d->guid.aid & 0x03)==MANAGER) {
/* writer is another manager's APPSELF writer -> readerManagers */
193 if ((writerGUID.oid==OID_WRITE_APPSELF) &&
194 ((writerGUID.aid & 0x03)==MANAGER)) {
/* every selection below takes the reader's write lock before cstReader is
   set; it is released on the exit paths at the end of the function */
195 pthread_rwlock_wrlock(&d->readerManagers.lock);
196 cstReader=&d->readerManagers;
/* writer is a managed application's APPSELF writer, or a manager's APP
   writer -> readerApplications */
198 if (((writerGUID.oid==OID_WRITE_APPSELF) &&
199 ((writerGUID.aid & 0x03)==MANAGEDAPPLICATION)) ||
200 ((writerGUID.oid==OID_WRITE_APP) &&
201 ((writerGUID.aid & 0x03)==MANAGER))) {
202 pthread_rwlock_wrlock(&d->readerApplications.lock);
203 cstReader=&d->readerApplications;
/* local side is a managed application: the reader is chosen by the
   writer's object id (the case labels are on lines not shown) */
208 if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
209 switch (writerGUID.oid) {
211 pthread_rwlock_wrlock(&d->readerManagers.lock);
212 cstReader=&d->readerManagers;
215 pthread_rwlock_wrlock(&d->readerApplications.lock);
216 cstReader=&d->readerApplications;
219 pthread_rwlock_wrlock(&d->readerPublications.lock);
220 cstReader=&d->readerPublications;
223 pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
224 cstReader=&d->readerSubscriptions;
/* no reader selected: in the visible code no lock was taken either */
229 if (!cstReader) return;
230 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
/* unknown writer: drop the lock; the return presumably follows on a line
   not shown */
231 if (!cstRemoteWriter) {
232 pthread_rwlock_unlock(&cstReader->lock);
/* record the gaps announced by this message (remaining arguments of the
   call are on a line not shown) */
236 RTPSGapAdd(cstRemoteWriter,&writerGUID,&fsn,&sn,numbits,
/* process the changes now queued for this writer, then release the lock */
239 CSTReaderProcCSChanges(d,cstRemoteWriter);
240 pthread_rwlock_unlock(&cstReader->lock);