]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSIssue.c
be025cf331e69a1abacd49aa6e47572c2d3c553d
[orte.git] / orte / liborte / RTPSIssue.c
1 /*
2  *  $Id: RTPSIssue.c,v 0.0.0.1          2003/12/08
3  *
4  *  DEBUG:  section 56                  message ISSUE
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23 /**********************************************************************************/
24 int32_t
25 RTPSIssueCreateHeader(u_int8_t *rtps_msg,u_int32_t max_msg_len,u_int32_t length,
26     ObjectId roid,ObjectId woid,SequenceNumber sn) {
27   
28   if (max_msg_len<20) return -1;
29   rtps_msg[0]=(u_int8_t)ISSUE;
30   rtps_msg[1]=ORTE_MY_MBO;
31   *((ParameterLength*)(rtps_msg+2))=(u_int16_t)length;
32   conv_u32(&roid,0);
33   *((ObjectId*)(rtps_msg+4))=roid;
34   conv_u32(&woid,0);
35   *((ObjectId*)(rtps_msg+8))=woid;
36   *((SequenceNumber*)(rtps_msg+12))=sn;
37   return 0;
38 }
39
40 /**********************************************************************************/
41 void 
42 RTPSIssue(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
43   GUID_RTPS          guid,writerGUID;
44   ObjectId           roid,woid;
45   SequenceNumber     sn,sn_tmp;   
46   int8_t             e_bit,p_bit;
47   u_int16_t          submsg_len;
48   CSTReader          *cstReader;
49   CSTRemoteWriter    *cstRemoteWriter;
50   CSChange           *csChange=NULL;
51
52   e_bit=rtps_msg[1] & 0x01;
53   p_bit=(rtps_msg[1] & 0x02)>>1;
54   submsg_len=*((u_int16_t*)(rtps_msg+2));
55   conv_u16(&submsg_len,e_bit);
56   roid=*((ObjectId*)(rtps_msg+4));              /* readerObjectId */
57   conv_u32(&roid,0);
58   woid=*((ObjectId*)(rtps_msg+8));              /* writerObjectId */
59   conv_u32(&woid,0);
60   sn=*((SequenceNumber*)(rtps_msg+12));         /* sn             */
61   conv_sn(&sn,e_bit);
62   if (p_bit) return;       /* at this moment is not supported p_bit */
63   writerGUID.hid=mi->sourceHostId;
64   writerGUID.aid=mi->sourceAppId;
65   writerGUID.oid=woid;
66   
67   debug(56,3) ("recv: RTPS_ISSUE(0x%x) from 0x%x-0x%x\n",
68                 woid,mi->sourceHostId,mi->sourceAppId);
69
70   pthread_rwlock_rdlock(&d->subscriptions.lock);
71   guid=d->guid;
72   guid.oid=roid;
73   gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
74     if (roid!=OID_UNKNOWN)
75       cstReader=CSTReader_find(&d->subscriptions,&guid);
76     if (cstReader) {
77       ORTESubsProp *sp;
78       pthread_rwlock_wrlock(&cstReader->lock);
79       sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
80       cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
81       if (cstRemoteWriter) {
82         ORTEPublProp *pp,*pps;
83         pp=(ORTEPublProp*)cstRemoteWriter->objectEntryOID->attributes;
84         if (cstReader->cstRemoteWriterSubscribed!=NULL) {
85           pps=(ORTEPublProp*)cstReader->cstRemoteWriterSubscribed->
86                             objectEntryOID->attributes;
87           if (pp->strength>pps->strength) {
88             cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
89           }
90         } else {
91           cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
92         }
93         if (cstReader->cstRemoteWriterSubscribed==cstRemoteWriter) {
94           eventDetach(d,
95               cstReader->objectEntryOID->objectEntryAID,
96               &cstReader->persistenceTimer,
97               0);   //common timer
98           eventAdd(d,
99               cstReader->objectEntryOID->objectEntryAID,
100               &cstReader->persistenceTimer,
101               0,   //common timer
102               "CSTReaderPersistenceTimer",
103               CSTReaderPersistenceTimer,
104               &cstReader->lock,
105               cstReader,
106               &pp->persistence);
107         }
108         if ((SeqNumberCmp(sn,cstRemoteWriter->sn)>0) &&   //have to be sn>writer_sn
109             (CSChangeFromWriter_find(cstRemoteWriter,&sn)==NULL)) {
110           csChange=(CSChange*)MALLOC(sizeof(CSChange));
111           csChange->cdrStream.buffer=NULL;
112           csChange->guid=writerGUID;
113           csChange->sn=sn;
114           SEQUENCE_NUMBER_NONE(csChange->gapSN);
115           CSChangeAttributes_init_head(csChange);
116           csChange->cdrStream.length=submsg_len-16;
117           csChange->cdrStream.buffer=(int8_t*)MALLOC(submsg_len-16);
118           csChange->cdrStream.bufferPtr=csChange->cdrStream.buffer;
119           csChange->cdrStream.needByteSwap=ORTE_FALSE;
120           if (e_bit ^ ORTE_MY_MBO)        
121             csChange->cdrStream.needByteSwap=ORTE_TRUE;
122           memcpy(csChange->cdrStream.buffer,rtps_msg+20,submsg_len-16);
123           if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) { //sn>=firstSN
124             if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
125               if (sp->recvQueueSize>cstRemoteWriter->csChangesCounter) {
126                 sn_tmp.high=0;
127                 sn_tmp.low=sp->recvQueueSize;
128                 SeqNumberAdd(sn_tmp,
129                              cstRemoteWriter->sn,
130                              sn_tmp);
131                 if (SeqNumberCmp(sn,sn_tmp)<=0) {         //sn<=(firstSN+QueueSize)
132                   csChange->remoteTimePublished=mi->timestamp;
133                   csChange->localTimeReceived=getActualNtpTime();
134                   CSTReaderAddCSChange(cstRemoteWriter,csChange);
135                   csChange=NULL;
136                 }
137               }
138             } else {
139               if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
140                 if ((sp->recvQueueSize<=cstRemoteWriter->csChangesCounter) ||
141                     (cstReader->cstRemoteWriterSubscribed!=cstRemoteWriter)) {
142                   CSChangeFromWriter *csChangeFromWriter;
143                   csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
144                   CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
145                         csChangeFromWriter,
146                         ORTE_FALSE);
147                 }
148                 if (sp->recvQueueSize>cstRemoteWriter->csChangesCounter) {
149                   csChange->remoteTimePublished=mi->timestamp;
150                   csChange->localTimeReceived=getActualNtpTime();
151                   CSTReaderAddCSChange(cstRemoteWriter,csChange);
152                   csChange=NULL;
153                 }
154               }
155             }
156           } 
157         }
158         if (csChange) {
159           FREE(csChange->cdrStream.buffer);
160           FREE(csChange);
161         }
162         CSTReaderProcCSChangesIssue(cstRemoteWriter,ORTE_FALSE);
163       }
164       pthread_rwlock_unlock(&cstReader->lock);  
165     } else
166       break;  //break traceing all cstReaders
167   }
168   pthread_rwlock_unlock(&d->subscriptions.lock);
169 }