2 * $Id: RTPSIssue.c,v 0.0.0.1 2003/12/08
4 * DEBUG: section 56 message ISSUE
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
23 /**********************************************************************************/
/*
 * RTPSIssueCreateHeader - fill in the header fields of an ISSUE submessage.
 *
 * rtps_msg     destination buffer; written at offsets 0, 1, 2, 4, 8 and 12
 * max_msg_len  space available in rtps_msg; a value below 20 makes the
 *              function return -1 before anything is written
 * length       narrowed to 16 bits by the cast and stored at offset 2
 * roid         object id stored at offset 4 (RTPSIssue() in this file reads
 *              that offset back as the readerObjectId)
 * woid         object id stored at offset 8 (read back as the writerObjectId)
 * sn           sequence number stored at offset 12
 *
 * Returns -1 when max_msg_len is smaller than 20.
 *
 * NOTE(review): this listing has gaps - the return type, the success return
 * value and the closing brace of this function are not visible here, and
 * statements may be missing between the visible ones. Confirm against the
 * complete file before relying on this description.
 */
25 RTPSIssueCreateHeader(u_int8_t *rtps_msg,u_int32_t max_msg_len,u_int32_t length,
26 ObjectId roid,ObjectId woid,SequenceNumber sn) {
/* Refuse to write when fewer than 20 bytes are available. Presumably 20 is
 * the size of the fields written below - TODO confirm the type sizes. */
28 if (max_msg_len<20) return -1;
/* Offset 0 carries the ISSUE constant. */
29 rtps_msg[0]=(u_int8_t)ISSUE;
/* Offset 1 is set to ORTE_MY_MBO; RTPSIssue() reads bit 0 of this byte as
 * e_bit and compares it with ORTE_MY_MBO to decide on byte swapping. */
30 rtps_msg[1]=ORTE_MY_MBO;
/* Offset 2: length. Values that do not fit in 16 bits are truncated by the
 * cast; no range check is visible here. */
31 *((ParameterLength*)(rtps_msg+2))=(u_int16_t)length;
/* Offsets 4 and 8: the two object ids, stored through casted pointers. */
33 *((ObjectId*)(rtps_msg+4))=roid;
35 *((ObjectId*)(rtps_msg+8))=woid;
/* Offset 12: sequence number. */
36 *((SequenceNumber*)(rtps_msg+12))=sn;
40 /**********************************************************************************/
/*
 * RTPSIssue - handle a received ISSUE submessage.
 *
 * d                domain whose subscriptions container is searched for the
 *                  reader(s) the issue is delivered to
 * rtps_msg         start of the submessage: flags at offset 1, length at
 *                  offset 2, reader object id at offset 4, writer object id
 *                  at offset 8, sequence number at offset 12; the payload is
 *                  copied starting at offset 20
 * mi               message interpreter state; sourceHostId, sourceAppId and
 *                  timestamp are read from it
 * senderIPAddress  not referenced in the lines visible here
 *
 * Outline of the visible logic: decode the header, locate a reader, pick
 * which remote writer the reader is subscribed to by comparing strength,
 * copy the payload into a new CSChange and queue it according to the
 * requested reliability (strict or best effort), then let
 * CSTReaderProcCSChangesIssue() process the writer's changes.
 *
 * NOTE(review): this listing has gaps. The return type, the declarations of
 * e_bit, p_bit, submsg_len, roid, woid, cstReader and sp, several else
 * branches and closing braces, and parts of some call expressions are not
 * visible. Comments below describe only what the visible lines show and mark
 * assumptions explicitly.
 */
42 RTPSIssue(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
43 GUID_RTPS guid,writerGUID;
45 SequenceNumber sn,sn_tmp;
49 CSTRemoteWriter *cstRemoteWriter;
50 CSChange *csChange=NULL;
/* Flags byte: bit 0 is e_bit (compared with ORTE_MY_MBO further down to
 * decide whether the payload needs byte swapping), bit 1 is p_bit. */
52 e_bit=rtps_msg[1] & 0x01;
53 p_bit=(rtps_msg[1] & 0x02)>>1;
/* Length field at offset 2, passed through conv_u16 together with e_bit -
 * presumably a conversion to host byte order; confirm against conv_u16. */
54 submsg_len=*((u_int16_t*)(rtps_msg+2));
55 conv_u16(&submsg_len,e_bit);
56 roid=*((ObjectId*)(rtps_msg+4)); /* readerObjectId */
58 woid=*((ObjectId*)(rtps_msg+8)); /* writerObjectId */
60 sn=*((SequenceNumber*)(rtps_msg+12)); /* sequence number */
62 if (p_bit) return; /* p_bit is not supported at the moment */
/* The writer is identified by the sender's host and application ids.
 * NOTE(review): no assignment of the remaining GUID members of writerGUID,
 * nor any assignment to guid, is visible in this listing. */
63 writerGUID.hid=mi->sourceHostId;
64 writerGUID.aid=mi->sourceAppId;
67 debug(56,3) ("recv: RTPS_ISSUE(0x%x) from 0x%x-0x%x\n",
68 woid,mi->sourceHostId,mi->sourceAppId);
/* Hold the subscriptions read lock while the readers are searched. */
70 pthread_rwlock_rdlock(&d->subscriptions.lock);
/* Walk the readers; when the message names a reader (roid is not
 * OID_UNKNOWN) that reader is looked up directly by guid instead. */
73 gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
74 if (roid!=OID_UNKNOWN)
75 cstReader=CSTReader_find(&d->subscriptions,&guid);
/* NOTE(review): no NULL check of cstReader is visible before it is locked;
 * one may exist on a line missing from this listing. */
78 pthread_rwlock_wrlock(&cstReader->lock);
79 sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
80 cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
/* Only issues from a remote writer already known to this reader are handled. */
81 if (cstRemoteWriter) {
82 ORTEPublProp *pp,*pps;
83 pp=(ORTEPublProp*)cstRemoteWriter->objectEntryOID->attributes;
/* Writer selection: if a writer is already subscribed, this one replaces it
 * only when its strength is greater. */
84 if (cstReader->cstRemoteWriterSubscribed!=NULL) {
85 pps=(ORTEPublProp*)cstReader->cstRemoteWriterSubscribed->
86 objectEntryOID->attributes;
87 if (pp->strength>pps->strength) {
88 cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
/* Presumably the else branch (no writer subscribed yet): take this writer.
 * The enclosing braces/else are not visible in this listing. */
91 cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
/* When this writer is the subscribed one, the reader's persistenceTimer is
 * involved together with the CSTReaderPersistenceTimer handler. The calls
 * that take these arguments are not visible here - presumably the timer is
 * removed and re-armed; confirm against the complete file. */
93 if (cstReader->cstRemoteWriterSubscribed==cstRemoteWriter) {
95 cstReader->objectEntryOID->objectEntryAID,
96 &cstReader->persistenceTimer,
99 cstReader->objectEntryOID->objectEntryAID,
100 &cstReader->persistenceTimer,
102 "CSTReaderPersistenceTimer",
103 CSTReaderPersistenceTimer,
/* Accept the issue only if its sequence number is newer than the writer's
 * current one and no change with that number is stored for the writer. */
108 if ((SeqNumberCmp(sn,cstRemoteWriter->sn)>0) && //have to be sn>writer_sn
109 (CSChangeFromWriter_find(cstRemoteWriter,&sn)==NULL)) {
/* Build a CSChange holding a private copy of the payload.
 * NOTE(review): no check of the MALLOC results is visible here. */
110 csChange=(CSChange*)MALLOC(sizeof(CSChange));
111 csChange->cdrStream.buffer=NULL;
112 csChange->guid=writerGUID;
114 SEQUENCE_NUMBER_NONE(csChange->gapSN);
115 CSChangeAttributes_init_head(csChange);
/* Payload size is submsg_len-16: the 16 bytes at offsets 4..19 (both object
 * ids and the sequence number) precede the payload at offset 20. Presumably
 * submsg_len counts from offset 4 - TODO confirm.
 * NOTE(review): no check that submsg_len is at least 16 is visible; a
 * smaller value would make the size expression negative. */
116 csChange->cdrStream.length=submsg_len-16;
117 csChange->cdrStream.buffer=(int8_t*)MALLOC(submsg_len-16);
118 csChange->cdrStream.bufferPtr=csChange->cdrStream.buffer;
/* Mark the stream for byte swapping when the sender's e_bit differs from
 * ORTE_MY_MBO. */
119 csChange->cdrStream.needByteSwap=ORTE_FALSE;
120 if (e_bit ^ ORTE_MY_MBO)
121 csChange->cdrStream.needByteSwap=ORTE_TRUE;
122 memcpy(csChange->cdrStream.buffer,rtps_msg+20,submsg_len-16);
123 if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) { //sn>=firstSN
/* Strict reliability: queue the change only while the receive queue has
 * room and sn does not exceed the limit held in sn_tmp. Only the assignment
 * of sn_tmp.low is visible; the rest of the sn_tmp computation is missing
 * from this listing. */
124 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
125 if (sp->recvQueueSize>cstRemoteWriter->csChangesCounter) {
127 sn_tmp.low=sp->recvQueueSize;
131 if (SeqNumberCmp(sn,sn_tmp)<=0) { //sn<=(firstSN+QueueSize)
/* Stamp the change with the sender's timestamp and the local receive time,
 * then add it to the writer's changes. */
132 csChange->remoteTimePublished=mi->timestamp;
133 csChange->localTimeReceived=getActualNtpTime();
134 CSTReaderAddCSChange(cstRemoteWriter,csChange);
/* Best-effort reliability: when the queue is full, or this writer is not
 * the subscribed one, the first change held for the writer is destroyed
 * (remaining call arguments are not visible). The new change is then queued
 * if the queue has room. */
139 if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
140 if ((sp->recvQueueSize<=cstRemoteWriter->csChangesCounter) ||
141 (cstReader->cstRemoteWriterSubscribed!=cstRemoteWriter)) {
142 CSChangeFromWriter *csChangeFromWriter;
143 csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
144 CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
148 if (sp->recvQueueSize>cstRemoteWriter->csChangesCounter) {
149 csChange->remoteTimePublished=mi->timestamp;
150 csChange->localTimeReceived=getActualNtpTime();
151 CSTReaderAddCSChange(cstRemoteWriter,csChange);
/* Releases the payload copy. Presumably this runs only when the change was
 * not queued above; the guarding condition and any release of csChange
 * itself are not visible in this listing. */
159 FREE(csChange->cdrStream.buffer);
/* Let the reader process the changes held for this writer. */
162 CSTReaderProcCSChangesIssue(cstRemoteWriter,ORTE_FALSE);
164 pthread_rwlock_unlock(&cstReader->lock);
/* NOTE(review): the condition guarding this break is not visible;
 * presumably it applies when a specific reader was addressed - confirm. */
166 break; //stop traversing the cstReaders
/* Release the subscriptions lock taken before the search. */
168 pthread_rwlock_unlock(&d->subscriptions.lock);