2 * $Id: RTPSIssue.c,v 0.0.0.1 2003/12/08
4 * DEBUG: section 56 message ISSUE
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /**********************************************************************************/
36 RTPSIssueCreateHeader(CDR_Codec *cdrCodec,uint32_t length,
37 ObjectId roid,ObjectId woid,SequenceNumber sn) {
38 CDR_Endianness data_endian;
41 if (cdrCodec->buf_len<cdrCodec->wptr+20) return -1;
44 CDR_put_octet(cdrCodec,ISSUE);
47 flags=cdrCodec->data_endian;
48 CDR_put_octet(cdrCodec,flags);
51 CDR_put_ushort(cdrCodec,(CORBA_unsigned_short)length);
53 data_endian=cdrCodec->data_endian;
54 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
57 CDR_put_ulong(cdrCodec,roid);
60 CDR_put_ulong(cdrCodec,woid);
62 cdrCodec->data_endian=data_endian;
64 CDR_put_ulong(cdrCodec,sn.high);
65 CDR_put_ulong(cdrCodec,sn.low);
69 /**********************************************************************************/
71 RTPSIssue(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
72 GUID_RTPS guid,writerGUID;
74 SequenceNumber sn,sn_tmp;
76 CORBA_unsigned_short submsg_len;
78 CSTRemoteWriter *cstRemoteWriter;
79 CSChange *csChange=NULL;
80 CDR_Endianness data_endian;
82 /* restore flag possition in submessage */
86 CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
88 /* submessage length */
89 CDR_get_ushort(cdrCodec,&submsg_len);
91 /* next data are sent in big endianing */
92 data_endian=cdrCodec->data_endian;
93 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
96 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
99 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
101 cdrCodec->data_endian=data_endian;
104 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.high);
105 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.low);
107 /* at this moment is not supported p_bit */
108 if (flags & 0x02) return; /* p_bit */
110 writerGUID.hid=mi->sourceHostId;
111 writerGUID.aid=mi->sourceAppId;
114 debug(56,3) ("recv: RTPS_ISSUE(0x%x) from 0x%x-0x%x\n",
115 woid,mi->sourceHostId,mi->sourceAppId);
117 pthread_rwlock_rdlock(&d->subscriptions.lock);
121 gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
122 if (roid!=OID_UNKNOWN)
123 cstReader=CSTReader_find(&d->subscriptions,&guid);
126 pthread_rwlock_wrlock(&cstReader->lock);
127 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
128 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
129 if (cstRemoteWriter) {
130 ORTEPublProp *pp,*pps;
131 pp=(ORTEPublProp*)cstRemoteWriter->spobject->attributes;
132 if (cstReader->cstRemoteWriterSubscribed!=NULL) {
133 pps=(ORTEPublProp*)cstReader->cstRemoteWriterSubscribed->
134 spobject->attributes;
135 if ((pp->strength>pps->strength) || (NtpTimeCmp(pps->persistence,zNtpTime)==0)) {
136 cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
139 cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
141 if (cstReader->cstRemoteWriterSubscribed==cstRemoteWriter) {
143 cstReader->objectEntryOID->objectEntryAID,
144 &cstReader->persistenceTimer,
147 cstReader->objectEntryOID->objectEntryAID,
148 &cstReader->persistenceTimer,
150 "CSTReaderPersistenceTimer",
151 CSTReaderPersistenceTimer,
157 if ((SeqNumberCmp(sn,cstRemoteWriter->sn)>0) && //have to be sn>writer_sn
158 (CSChangeFromWriter_find(cstRemoteWriter,&sn)==NULL)) {
160 csChange=(CSChange*)MALLOC(sizeof(CSChange));
161 csChange->guid=writerGUID;
163 SEQUENCE_NUMBER_NONE(csChange->gapSN);
164 CSChangeAttributes_init_head(csChange);
166 CDR_codec_init_static(&csChange->cdrCodec);
167 CDR_buffer_init(&csChange->cdrCodec,
169 csChange->cdrCodec.data_endian=cdrCodec->data_endian;
171 memcpy(csChange->cdrCodec.buffer,
172 &cdrCodec->buffer[cdrCodec->rptr],submsg_len-16);
174 if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) { //sn>=firstSN
175 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
176 if (sp->recvQueueSize>cstRemoteWriter->csChangesCounter) {
178 sn_tmp.low=sp->recvQueueSize;
182 if (SeqNumberCmp(sn,sn_tmp)<=0) { //sn<=(firstSN+QueueSize)
183 csChange->remoteTimePublished=mi->timestamp;
184 csChange->localTimeReceived=getActualNtpTime();
185 CSTReaderAddCSChange(cstRemoteWriter,csChange);
190 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
191 if ((sp->recvQueueSize<=cstRemoteWriter->csChangesCounter) ||
192 (cstReader->cstRemoteWriterSubscribed!=cstRemoteWriter)) {
193 CSChangeFromWriter *csChangeFromWriter;
194 csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
195 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
199 if (sp->recvQueueSize>cstRemoteWriter->csChangesCounter) {
200 csChange->remoteTimePublished=mi->timestamp;
201 csChange->localTimeReceived=getActualNtpTime();
202 CSTReaderAddCSChange(cstRemoteWriter,csChange);
210 FREE(csChange->cdrCodec.buffer);
213 CSTReaderProcCSChangesIssue(cstRemoteWriter,ORTE_FALSE);
215 pthread_rwlock_unlock(&cstReader->lock);
217 break; //break traceing all cstReaders
219 pthread_rwlock_unlock(&d->subscriptions.lock);