2 * $Id: RTPSIssue.c,v 0.0.0.1 2003/12/08
4 * DEBUG: section 56 message ISSUE
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 /**********************************************************************************/
/*
 * RTPSIssueCreateHeader - write the header of an ISSUE submessage
 * into cdrCodec.
 *
 * Fields are written in this order: submessage id (ISSUE), flags octet,
 * length (ushort), reader object id, writer object id, sequence number
 * (high word first, then low word).
 *
 *   cdrCodec  codec that receives the header
 *   length    value stored in the submessage length field
 *   roid      reader object id
 *   woid      writer object id
 *   sn        sequence number of the issue
 *
 * Returns -1 when the buffer space check fails. The success return value
 * is not visible in this excerpt.
 *
 * NOTE(review): this excerpt is missing lines (return type, declaration
 * of `flags`, closing brace) - confirm details against the full file.
 */
26 RTPSIssueCreateHeader(CDR_Codec *cdrCodec,uint32_t length,
27 ObjectId roid,ObjectId woid,SequenceNumber sn) {
28 CDR_Endianness data_endian;
/* Refuse to write unless 20 bytes fit after wptr: 1 (id) + 1 (flags) +
 * 2 (length) + 4 (roid) + 4 (woid) + 8 (sn) - assuming CDR_put_ulong
 * emits 4 bytes; TODO confirm. */
31 if (cdrCodec->buf_len<cdrCodec->wptr+20) return -1;
/* submessage id */
34 CDR_put_octet(cdrCodec,ISSUE);
/* the flags octet is set to the codec's endianness value only */
37 flags=cdrCodec->data_endian;
38 CDR_put_octet(cdrCodec,flags);
/* NOTE(review): length is uint32_t but is written with CDR_put_ushort -
 * presumably narrowed to 16 bits; confirm callers never exceed that. */
41 CDR_put_ushort(cdrCodec,length);
/* Both object ids are written big-endian regardless of the codec's
 * setting; remember the current endianness so it can be restored. */
43 data_endian=cdrCodec->data_endian;
44 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
/* reader object id */
47 CDR_put_ulong(cdrCodec,roid);
/* writer object id */
50 CDR_put_ulong(cdrCodec,woid);
/* back to the codec's original endianness for the sequence number */
52 cdrCodec->data_endian=data_endian;
54 CDR_put_ulong(cdrCodec,sn.high);
55 CDR_put_ulong(cdrCodec,sn.low);
59 /**********************************************************************************/
/*
 * RTPSIssue - handle a received ISSUE submessage.
 *
 * Reads the submessage header from cdrCodec (flags, length, reader and
 * writer object ids, sequence number). Then, while holding the
 * subscriptions lock, it looks up the CSTReader(s) and the remote writer
 * matching the sender. When the sequence number is new for that writer,
 * the payload is copied into a freshly allocated CSChange and queued
 * according to the subscription's requested reliability.
 *
 *   d                domain whose subscriptions are searched
 *   cdrCodec         codec holding the received message
 *   mi               message interpreter state; supplies the source host
 *                    id, source app id and timestamp
 *   senderIPAddress  not referenced in the lines visible here
 *
 * Returns early, before taking any lock, when the p_bit (0x02) is set in
 * the flags.
 *
 * NOTE(review): this excerpt is missing many lines (return type, several
 * declarations, call names, else branches, closing braces). Comments
 * marked "not visible" describe gaps - confirm against the full file.
 */
61 RTPSIssue(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
62 GUID_RTPS guid,writerGUID;
64 SequenceNumber sn,sn_tmp;
66 CORBA_unsigned_short submsg_len;
68 CSTRemoteWriter *cstRemoteWriter;
69 CSChange *csChange=NULL;
70 CDR_Endianness data_endian;
72 /* restore flag position in submessage */
76 CDR_get_octet(cdrCodec,&flags);
78 /* submessage length */
79 CDR_get_ushort(cdrCodec,&submsg_len);
81 /* the object ids that follow are sent big-endian */
82 data_endian=cdrCodec->data_endian;
83 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
/* reader object id */
86 CDR_get_ulong(cdrCodec,&roid);
/* writer object id */
89 CDR_get_ulong(cdrCodec,&woid);
/* back to the codec's own endianness for the sequence number */
91 cdrCodec->data_endian=data_endian;
94 CDR_get_ulong(cdrCodec,&sn.high);
95 CDR_get_ulong(cdrCodec,&sn.low);
97 /* p_bit is not supported at the moment */
98 if (flags & 0x02) return; /* p_bit */
/* Identify the sending writer by the host and application the message
 * came from. The remaining GUID fields and the setup of `guid` are not
 * visible in this excerpt. */
100 writerGUID.hid=mi->sourceHostId;
101 writerGUID.aid=mi->sourceAppId;
104 debug(56,3) ("recv: RTPS_ISSUE(0x%x) from 0x%x-0x%x\n",
105 woid,mi->sourceHostId,mi->sourceAppId);
/* the subscription set is held under a read lock for the whole search */
107 pthread_rwlock_rdlock(&d->subscriptions.lock);
/* Walk all CSTReaders. When the submessage names a specific reader
 * (roid != OID_UNKNOWN) the loop variable is replaced by a direct lookup
 * on `guid`. NOTE(review): no NULL check on that lookup is visible in
 * this excerpt before the reader is locked. */
111 gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
112 if (roid!=OID_UNKNOWN)
113 cstReader=CSTReader_find(&d->subscriptions,&guid);
/* the reader is modified below, so take its write lock */
116 pthread_rwlock_wrlock(&cstReader->lock);
117 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
/* only writers already known to this reader are processed */
118 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
119 if (cstRemoteWriter) {
120 ORTEPublProp *pp,*pps;
121 pp=(ORTEPublProp*)cstRemoteWriter->spobject->attributes;
/* Pick the writer this reader is subscribed to: a writer with greater
 * strength replaces the current one. The second assignment below
 * presumably belongs to the "nothing subscribed yet" branch, whose
 * `else` is not visible in this excerpt. */
122 if (cstReader->cstRemoteWriterSubscribed!=NULL) {
123 pps=(ORTEPublProp*)cstReader->cstRemoteWriterSubscribed->
124 spobject->attributes;
125 if (pp->strength>pps->strength) {
126 cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
129 cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
/* When the sender is the subscribed writer, the reader's persistence
 * timer is handled. Only the arguments of the timer calls are visible
 * here, not the call names - presumably a remove followed by a re-add;
 * TODO confirm. */
131 if (cstReader->cstRemoteWriterSubscribed==cstRemoteWriter) {
133 cstReader->objectEntryOID->objectEntryAID,
134 &cstReader->persistenceTimer,
137 cstReader->objectEntryOID->objectEntryAID,
138 &cstReader->persistenceTimer,
140 "CSTReaderPersistenceTimer",
141 CSTReaderPersistenceTimer,
/* accept the issue only if it is newer than the writer's sn and no
 * change with the same sn is already stored for that writer */
147 if ((SeqNumberCmp(sn,cstRemoteWriter->sn)>0) && //sn must be > writer_sn
148 (CSChangeFromWriter_find(cstRemoteWriter,&sn)==NULL)) {
/* NOTE(review): no check of the MALLOC result is visible in this
 * excerpt before csChange is dereferenced. */
150 csChange=(CSChange*)MALLOC(sizeof(CSChange));
151 csChange->guid=writerGUID;
153 SEQUENCE_NUMBER_NONE(csChange->gapSN);
154 CSChangeAttributes_init_head(csChange);
/* give the change its own codec and buffer (the size argument of
 * CDR_buffer_init is not visible in this excerpt) */
156 CDR_codec_init_static(&csChange->cdrCodec);
157 CDR_buffer_init(&csChange->cdrCodec,
159 csChange->cdrCodec.data_endian=cdrCodec->data_endian;
/* Copy the payload that follows the header. 16 matches the four ulongs
 * read above (roid, woid, sn.high, sn.low), assuming 4 bytes each.
 * NOTE(review): no lower-bound check on submsg_len is visible here; a
 * value below 16 would make the length wrap - confirm. */
161 memcpy(csChange->cdrCodec.buffer,
162 &cdrCodec->buffer[cdrCodec->rptr],submsg_len-16);
164 if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) { //sn>=firstSN
/* strict reliability: queue only while there is room and the sn falls
 * within the window; the rest of the sn_tmp computation is not visible
 * in this excerpt */
165 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
166 if (sp->recvQueueSize>cstRemoteWriter->csChangesCounter) {
168 sn_tmp.low=sp->recvQueueSize;
172 if (SeqNumberCmp(sn,sn_tmp)<=0) { //sn<=(firstSN+QueueSize)
173 csChange->remoteTimePublished=mi->timestamp;
174 csChange->localTimeReceived=getActualNtpTime();
175 CSTReaderAddCSChange(cstRemoteWriter,csChange);
/* best effort: when the queue is full, or the sender is not the
 * subscribed writer, the writer's first stored change is destroyed;
 * the new change is then queued if there is room */
180 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
181 if ((sp->recvQueueSize<=cstRemoteWriter->csChangesCounter) ||
182 (cstReader->cstRemoteWriterSubscribed!=cstRemoteWriter)) {
183 CSChangeFromWriter *csChangeFromWriter;
184 csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
185 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
189 if (sp->recvQueueSize>cstRemoteWriter->csChangesCounter) {
190 csChange->remoteTimePublished=mi->timestamp;
191 csChange->localTimeReceived=getActualNtpTime();
192 CSTReaderAddCSChange(cstRemoteWriter,csChange);
/* release the payload buffer; the guarding condition is not visible in
 * this excerpt - presumably the change was not queued; TODO confirm */
200 FREE(csChange->cdrCodec.buffer);
203 CSTReaderProcCSChangesIssue(cstRemoteWriter,ORTE_FALSE);
205 pthread_rwlock_unlock(&cstReader->lock);
/* the condition guarding this break is not visible in this excerpt */
207 break; //stop iterating over cstReaders
209 pthread_rwlock_unlock(&d->subscriptions.lock);