2 * $Id: RTPSGap.c,v 0.0.0.1 2003/10/07
4 * DEBUG: section 49 RTPS message GAP
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr.smolik@wo.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /**********************************************************************************/
36 RTPSGapCreate(CDR_Codec *cdrCodec,ObjectId roid,ObjectId woid,CSChange *csChange)
38 CDR_Endianness data_endian;
42 if (cdrCodec->buf_len<cdrCodec->wptr+32) return -1;
45 CDR_put_octet(cdrCodec,GAP);
48 flags=cdrCodec->data_endian;
49 CDR_put_octet(cdrCodec,flags);
52 CDR_put_ushort(cdrCodec,28);
54 /* next data are sent in big endianing */
55 data_endian=cdrCodec->data_endian;
56 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
59 CDR_put_ulong(cdrCodec,roid);
62 CDR_put_ulong(cdrCodec,woid);
64 cdrCodec->data_endian=data_endian;
67 CDR_put_ulong(cdrCodec,csChange->sn.high);
68 CDR_put_ulong(cdrCodec,csChange->sn.low);
74 CDR_put_ulong(cdrCodec,bsn.high);
75 CDR_put_ulong(cdrCodec,bsn.low);
78 CDR_put_ulong(cdrCodec,0);
83 /*****************************************************************************/
85 RTPSGapAdd(CSTRemoteWriter *cstRemoteWriter,GUID_RTPS *guid,SequenceNumber *fsn,
86 SequenceNumber *sn,uint32_t numbits,CDR_Codec *cdrCodec) {
87 SequenceNumber lsn,ssn;
89 int8_t bit,bit_last=0;
93 if (SeqNumberCmp(*sn,cstRemoteWriter->sn)<0) return;//have to be sn>=writer_sn !
94 if (SeqNumberCmp(*fsn,*sn)==1) return; //cannot be fsn>sn !
95 if (numbits>256) return;
97 //first case of GAP sn
98 if (SeqNumberCmp(*fsn,*sn)<0) { //if fsn<sn
99 if (CSChangeFromWriter_find(cstRemoteWriter,fsn)) {
100 CSTReaderDestroyCSChange(cstRemoteWriter,
103 csChange=(CSChange*)MALLOC(sizeof(CSChange));
104 csChange->cdrCodec.buffer=NULL;
106 csChange->guid=*guid;
107 csChange->alive=ORTE_TRUE;
108 SeqNumberSub(csChange->gapSN,*sn,*fsn); //setup flag GAP
109 CSChangeAttributes_init_head(csChange);
110 CSTReaderAddCSChange(cstRemoteWriter,csChange);
113 //second case of GAP sn
115 for(i=0;i<numbits;i++) {
117 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&bitmap);
119 bit=(bitmap & (1<<(31-i%32))) ? 1:0;
121 if (bit_last && !bit) { //end of GAP 1->0
122 if (!CSChangeFromWriter_find(cstRemoteWriter,&ssn)) {
123 if (SeqNumberCmp(ssn,cstRemoteWriter->sn)>0) {
124 csChange=(CSChange*)MALLOC(sizeof(CSChange));
125 csChange->cdrCodec.buffer=NULL;
127 csChange->guid=*guid;
128 csChange->alive=ORTE_TRUE;
129 SeqNumberSub(csChange->gapSN,lsn,ssn); //setup flag GAP
130 CSChangeAttributes_init_head(csChange);
131 CSTReaderAddCSChange(cstRemoteWriter,csChange);
135 if (!bit_last && bit) { //begin GAP 0->1
136 ssn=lsn; //start pointer
140 SeqNumberInc(lsn,lsn);
145 if (!CSChangeFromWriter_find(cstRemoteWriter,&ssn)) {
146 if (SeqNumberCmp(ssn,cstRemoteWriter->sn)>0) {
147 csChange=(CSChange*)MALLOC(sizeof(CSChange));
148 csChange->cdrCodec.buffer=NULL;
150 csChange->guid=*guid;
151 csChange->alive=ORTE_TRUE;
152 SeqNumberSub(csChange->gapSN,lsn,ssn); //setup flag GAP
153 CSChangeAttributes_init_head(csChange);
154 CSTReaderAddCSChange(cstRemoteWriter,csChange);
160 /**********************************************************************************/
162 RTPSGap(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
163 CSTReader *cstReader=NULL;
164 CSTRemoteWriter *cstRemoteWriter;
165 GUID_RTPS writerGUID;
167 SequenceNumber sn,fsn;
169 CDR_Endianness data_endian;
171 /* next data are sent in big endianing */
172 data_endian=cdrCodec->data_endian;
173 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
176 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
179 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
181 cdrCodec->data_endian=data_endian;
184 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&fsn.high);
185 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&fsn.low);
188 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.high);
189 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.low);
192 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&numbits);
194 writerGUID.hid=mi->sourceHostId;
195 writerGUID.aid=mi->sourceAppId;
198 debug(49,3) ("recv: RTPS_GAP(0x%x) from 0x%x-0x%x fSN:%d, bSN:%d, numbits:%d\n",
199 woid,mi->sourceHostId,mi->sourceAppId,fsn.low,sn.low,numbits);
202 if ((d->guid.aid & 0x03)==MANAGER) {
203 if ((writerGUID.oid==OID_WRITE_APPSELF) &&
204 ((writerGUID.aid & 0x03)==MANAGER)) {
205 pthread_rwlock_wrlock(&d->readerManagers.lock);
206 cstReader=&d->readerManagers;
208 if (((writerGUID.oid==OID_WRITE_APPSELF) &&
209 ((writerGUID.aid & 0x03)==MANAGEDAPPLICATION)) ||
210 ((writerGUID.oid==OID_WRITE_APP) &&
211 ((writerGUID.aid & 0x03)==MANAGER))) {
212 pthread_rwlock_wrlock(&d->readerApplications.lock);
213 cstReader=&d->readerApplications;
218 if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
219 switch (writerGUID.oid) {
221 pthread_rwlock_wrlock(&d->readerManagers.lock);
222 cstReader=&d->readerManagers;
225 pthread_rwlock_wrlock(&d->readerApplications.lock);
226 cstReader=&d->readerApplications;
229 pthread_rwlock_wrlock(&d->readerPublications.lock);
230 cstReader=&d->readerPublications;
233 pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
234 cstReader=&d->readerSubscriptions;
239 if (!cstReader) return;
240 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
241 if (!cstRemoteWriter) {
242 pthread_rwlock_unlock(&cstReader->lock);
246 RTPSGapAdd(cstRemoteWriter,&writerGUID,&fsn,&sn,numbits,
249 CSTReaderProcCSChanges(d,cstRemoteWriter);
250 pthread_rwlock_unlock(&cstReader->lock);