rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSIssue.c
New ORTE version 0.3.0 committed
[orte.git] / orte / liborte / RTPSIssue.c
1 /*
2  *  $Id: RTPSIssue.c,v 0.0.0.1          2003/12/08
3  *
4  *  DEBUG:  section 56                  message ISSUE
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte_all.h"
23
24 /**********************************************************************************/
25 int
26 RTPSIssueCreateHeader(CDR_Codec *cdrCodec,uint32_t length,
27     ObjectId roid,ObjectId woid,SequenceNumber sn) {
28   CDR_Endianness     data_endian;
29   CORBA_octet        flags;
30   
31   if (cdrCodec->buf_len<cdrCodec->wptr+20) return -1;
32
33   /* submessage id */
34   CDR_put_octet(cdrCodec,ISSUE);
35
36   /* flags */
37   flags=cdrCodec->data_endian;
38   CDR_put_octet(cdrCodec,flags);
39
40   /* length */
41   CDR_put_ushort(cdrCodec,length);
42
43   data_endian=cdrCodec->data_endian;
44   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
45
46   /* readerObjectId */
47   CDR_put_ulong(cdrCodec,roid);
48
49   /* writerObjectId */
50   CDR_put_ulong(cdrCodec,woid);
51
52   cdrCodec->data_endian=data_endian;
53   
54   CDR_put_ulong(cdrCodec,sn.high);
55   CDR_put_ulong(cdrCodec,sn.low);
56   return 0;
57 }
58
59 /**********************************************************************************/
60 void 
61 RTPSIssue(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
62   GUID_RTPS          guid,writerGUID;
63   ObjectId           roid,woid;
64   SequenceNumber     sn,sn_tmp; 
65   CORBA_octet        flags;  
66   CORBA_unsigned_short submsg_len;
67   CSTReader          *cstReader;
68   CSTRemoteWriter    *cstRemoteWriter;
69   CSChange           *csChange=NULL;
70   CDR_Endianness     data_endian;
71
72   /* restore flag possition in submessage */
73   cdrCodec->rptr-=3;
74
75   /* flags */
76   CDR_get_octet(cdrCodec,&flags);
77
78   /* submessage length */
79   CDR_get_ushort(cdrCodec,&submsg_len);
80
81   /* next data are sent in big endianing */
82   data_endian=cdrCodec->data_endian;
83   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
84
85   /* readerObjectId */
86   CDR_get_ulong(cdrCodec,&roid);
87
88   /* writerObjectId */
89   CDR_get_ulong(cdrCodec,&woid);
90
91   cdrCodec->data_endian=data_endian;
92
93   /* sn */
94   CDR_get_ulong(cdrCodec,&sn.high);
95   CDR_get_ulong(cdrCodec,&sn.low);
96
97   /* at this moment is not supported p_bit */
98   if (flags & 0x02) return;                     /* p_bit */
99
100   writerGUID.hid=mi->sourceHostId;
101   writerGUID.aid=mi->sourceAppId;
102   writerGUID.oid=woid;
103   
104   debug(56,3) ("recv: RTPS_ISSUE(0x%x) from 0x%x-0x%x\n",
105                 woid,mi->sourceHostId,mi->sourceAppId);
106
107   pthread_rwlock_rdlock(&d->subscriptions.lock);
108   guid=d->guid;
109   guid.oid=roid;
110
111   gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
112     if (roid!=OID_UNKNOWN)
113       cstReader=CSTReader_find(&d->subscriptions,&guid);
114     if (cstReader) {
115       ORTESubsProp *sp;
116       pthread_rwlock_wrlock(&cstReader->lock);
117       sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
118       cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
119       if (cstRemoteWriter) {
120         ORTEPublProp *pp,*pps;
121         pp=(ORTEPublProp*)cstRemoteWriter->spobject->attributes;
122         if (cstReader->cstRemoteWriterSubscribed!=NULL) {
123           pps=(ORTEPublProp*)cstReader->cstRemoteWriterSubscribed->
124                             spobject->attributes;
125           if (pp->strength>pps->strength) {
126             cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
127           }
128         } else {
129           cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
130         }
131         if (cstReader->cstRemoteWriterSubscribed==cstRemoteWriter) {
132           eventDetach(d,
133               cstReader->objectEntryOID->objectEntryAID,
134               &cstReader->persistenceTimer,
135               0);   //common timer
136           eventAdd(d,
137               cstReader->objectEntryOID->objectEntryAID,
138               &cstReader->persistenceTimer,
139               0,   //common timer
140               "CSTReaderPersistenceTimer",
141               CSTReaderPersistenceTimer,
142               &cstReader->lock,
143               cstReader,
144               &pp->persistence);
145         }
146
147         if ((SeqNumberCmp(sn,cstRemoteWriter->sn)>0) &&   //have to be sn>writer_sn
148             (CSChangeFromWriter_find(cstRemoteWriter,&sn)==NULL)) {
149
150           csChange=(CSChange*)MALLOC(sizeof(CSChange));
151           csChange->guid=writerGUID;
152           csChange->sn=sn;
153           SEQUENCE_NUMBER_NONE(csChange->gapSN);
154           CSChangeAttributes_init_head(csChange);
155
156           CDR_codec_init_static(&csChange->cdrCodec);
157           CDR_buffer_init(&csChange->cdrCodec,
158                           submsg_len-16);
159           csChange->cdrCodec.data_endian=cdrCodec->data_endian;
160
161           memcpy(csChange->cdrCodec.buffer,
162                  &cdrCodec->buffer[cdrCodec->rptr],submsg_len-16);
163
164           if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) { //sn>=firstSN
165             if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
166               if (sp->recvQueueSize>cstRemoteWriter->csChangesCounter) {
167                 sn_tmp.high=0;
168                 sn_tmp.low=sp->recvQueueSize;
169                 SeqNumberAdd(sn_tmp,
170                              cstRemoteWriter->sn,
171                              sn_tmp);
172                 if (SeqNumberCmp(sn,sn_tmp)<=0) {         //sn<=(firstSN+QueueSize)
173                   csChange->remoteTimePublished=mi->timestamp;
174                   csChange->localTimeReceived=getActualNtpTime();
175                   CSTReaderAddCSChange(cstRemoteWriter,csChange);
176                   csChange=NULL;
177                 }
178               }
179             } else {
180               if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
181                 if ((sp->recvQueueSize<=cstRemoteWriter->csChangesCounter) ||
182                     (cstReader->cstRemoteWriterSubscribed!=cstRemoteWriter)) {
183                   CSChangeFromWriter *csChangeFromWriter;
184                   csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
185                   CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
186                         csChangeFromWriter,
187                         ORTE_FALSE);
188                 }
189                 if (sp->recvQueueSize>cstRemoteWriter->csChangesCounter) {
190                   csChange->remoteTimePublished=mi->timestamp;
191                   csChange->localTimeReceived=getActualNtpTime();
192                   CSTReaderAddCSChange(cstRemoteWriter,csChange);
193                   csChange=NULL;
194                 }
195               }
196             }
197           } 
198         }
199         if (csChange) {
200           FREE(csChange->cdrCodec.buffer);
201           FREE(csChange);
202         }
203         CSTReaderProcCSChangesIssue(cstRemoteWriter,ORTE_FALSE);
204       }
205       pthread_rwlock_unlock(&cstReader->lock);  
206     } else
207       break;  //break traceing all cstReaders
208   }
209   pthread_rwlock_unlock(&d->subscriptions.lock);
210 }