2 * $Id: RTPSIssue.c,v 0.0.0.1 2003/12/08
4 * DEBUG: section 56 message ISSUE
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /**********************************************************************************/
36 RTPSIssueCreateHeader(CDR_Codec *cdrCodec, uint32_t length,
37 ObjectId roid, ObjectId woid, SequenceNumber sn)
39 CDR_Endianness data_endian;
42 if (cdrCodec->buf_len < cdrCodec->wptr+20)
46 CDR_put_octet(cdrCodec, ISSUE);
49 flags = cdrCodec->data_endian;
50 CDR_put_octet(cdrCodec, flags);
53 CDR_put_ushort(cdrCodec, (CORBA_unsigned_short)length);
55 data_endian = cdrCodec->data_endian;
56 cdrCodec->data_endian = FLAG_BIG_ENDIAN;
59 CDR_put_ulong(cdrCodec, roid);
62 CDR_put_ulong(cdrCodec, woid);
64 cdrCodec->data_endian = data_endian;
66 CDR_put_ulong(cdrCodec, sn.high);
67 CDR_put_ulong(cdrCodec, sn.low);
71 /**********************************************************************************/
73 RTPSIssue(ORTEDomain *d, CDR_Codec *cdrCodec, MessageInterpret *mi, IPAddress senderIPAddress)
75 GUID_RTPS guid, writerGUID;
77 SequenceNumber sn, sn_tmp;
79 CORBA_unsigned_short submsg_len;
81 CSTRemoteWriter *cstRemoteWriter;
82 CSChange *csChange = NULL;
83 CDR_Endianness data_endian;
85 /* restore flag possition in submessage */
89 CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
91 /* submessage length */
92 CDR_get_ushort(cdrCodec, &submsg_len);
94 /* next data are sent in big endianing */
95 data_endian = cdrCodec->data_endian;
96 cdrCodec->data_endian = FLAG_BIG_ENDIAN;
99 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
102 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
104 cdrCodec->data_endian = data_endian;
107 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.high);
108 CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.low);
110 /* at this moment is not supported p_bit */
114 writerGUID.hid = mi->sourceHostId;
115 writerGUID.aid = mi->sourceAppId;
116 writerGUID.oid = woid;
118 debug(56, 3) ("recv: RTPS_ISSUE(0x%x) from 0x%x-0x%x\n",
119 woid, mi->sourceHostId, mi->sourceAppId);
121 pthread_rwlock_rdlock(&d->subscriptions.lock);
125 gavl_cust_for_each(CSTReader, &d->subscriptions, cstReader) {
126 if (roid != OID_UNKNOWN)
127 cstReader = CSTReader_find(&d->subscriptions, &guid);
130 pthread_rwlock_wrlock(&cstReader->lock);
131 sp = (ORTESubsProp *)cstReader->objectEntryOID->attributes;
132 cstRemoteWriter = CSTRemoteWriter_find(cstReader, &writerGUID);
133 if (cstRemoteWriter) {
134 ORTEPublProp *pp, *pps;
135 pp = (ORTEPublProp *)cstRemoteWriter->spobject->attributes;
136 if (cstReader->cstRemoteWriterSubscribed != NULL) {
137 pps = (ORTEPublProp *)cstReader->cstRemoteWriterSubscribed->
138 spobject->attributes;
139 if ((pp->strength > pps->strength) || (NtpTimeCmp(pps->persistence, zNtpTime) == 0)) {
140 cstReader->cstRemoteWriterSubscribed = cstRemoteWriter;
143 cstReader->cstRemoteWriterSubscribed = cstRemoteWriter;
145 if (cstReader->cstRemoteWriterSubscribed == cstRemoteWriter) {
147 cstReader->objectEntryOID->objectEntryAID,
148 &cstReader->persistenceTimer,
151 cstReader->objectEntryOID->objectEntryAID,
152 &cstReader->persistenceTimer,
154 "CSTReaderPersistenceTimer",
155 CSTReaderPersistenceTimer,
161 if ((SeqNumberCmp(sn, cstRemoteWriter->sn) > 0) && //have to be sn>writer_sn
162 (CSChangeFromWriter_find(cstRemoteWriter, &sn) == NULL)) {
164 csChange = (CSChange *)MALLOC(sizeof(CSChange));
165 csChange->guid = writerGUID;
167 SEQUENCE_NUMBER_NONE(csChange->gapSN);
168 CSChangeAttributes_init_head(csChange);
170 CDR_codec_init_static(&csChange->cdrCodec);
171 CDR_buffer_init(&csChange->cdrCodec,
173 csChange->cdrCodec.data_endian = cdrCodec->data_endian;
175 memcpy(csChange->cdrCodec.buffer,
176 &cdrCodec->buffer[cdrCodec->rptr], submsg_len-16);
178 if (SeqNumberCmp(sn, cstRemoteWriter->firstSN) >= 0) { //sn>=firstSN
179 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT) != 0) {
180 if (sp->recvQueueSize > cstRemoteWriter->csChangesCounter) {
182 sn_tmp.low = sp->recvQueueSize;
186 if (SeqNumberCmp(sn, sn_tmp) <= 0) { //sn<=(firstSN+QueueSize)
187 csChange->remoteTimePublished = mi->timestamp;
188 csChange->localTimeReceived = getActualNtpTime();
189 CSTReaderAddCSChange(cstRemoteWriter, csChange);
194 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS) != 0) {
195 if ((sp->recvQueueSize <= cstRemoteWriter->csChangesCounter) ||
196 (cstReader->cstRemoteWriterSubscribed != cstRemoteWriter)) {
197 CSChangeFromWriter *csChangeFromWriter;
198 csChangeFromWriter = CSChangeFromWriter_first(cstRemoteWriter);
199 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
203 if (sp->recvQueueSize > cstRemoteWriter->csChangesCounter) {
204 csChange->remoteTimePublished = mi->timestamp;
205 csChange->localTimeReceived = getActualNtpTime();
206 CSTReaderAddCSChange(cstRemoteWriter, csChange);
214 FREE(csChange->cdrCodec.buffer);
217 CSTReaderProcCSChangesIssue(cstRemoteWriter, ORTE_FALSE);
219 pthread_rwlock_unlock(&cstReader->lock);
221 break; //break traceing all cstReaders
223 pthread_rwlock_unlock(&d->subscriptions.lock);