]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSIssue.c
updated email address - petr@smoliku.cz
[orte.git] / orte / liborte / RTPSIssue.c
1 /*
2  *  $Id: RTPSIssue.c,v 0.0.0.1          2003/12/08
3  *
4  *  DEBUG:  section 56                  message ISSUE
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr@smoliku.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
34 /**********************************************************************************/
35 int
36 RTPSIssueCreateHeader(CDR_Codec *cdrCodec,uint32_t length,
37     ObjectId roid,ObjectId woid,SequenceNumber sn) {
38   CDR_Endianness     data_endian;
39   CORBA_octet        flags;
40   
41   if (cdrCodec->buf_len<cdrCodec->wptr+20) return -1;
42
43   /* submessage id */
44   CDR_put_octet(cdrCodec,ISSUE);
45
46   /* flags */
47   flags=cdrCodec->data_endian;
48   CDR_put_octet(cdrCodec,flags);
49
50   /* length */
51   CDR_put_ushort(cdrCodec,(CORBA_unsigned_short)length);
52
53   data_endian=cdrCodec->data_endian;
54   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
55
56   /* readerObjectId */
57   CDR_put_ulong(cdrCodec,roid);
58
59   /* writerObjectId */
60   CDR_put_ulong(cdrCodec,woid);
61
62   cdrCodec->data_endian=data_endian;
63   
64   CDR_put_ulong(cdrCodec,sn.high);
65   CDR_put_ulong(cdrCodec,sn.low);
66   return 0;
67 }
68
69 /**********************************************************************************/
70 void 
71 RTPSIssue(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
72   GUID_RTPS          guid,writerGUID;
73   ObjectId           roid,woid;
74   SequenceNumber     sn,sn_tmp; 
75   CORBA_octet        flags;  
76   CORBA_unsigned_short submsg_len;
77   CSTReader          *cstReader;
78   CSTRemoteWriter    *cstRemoteWriter;
79   CSChange           *csChange=NULL;
80   CDR_Endianness     data_endian;
81
82   /* restore flag possition in submessage */
83   cdrCodec->rptr-=3;
84
85   /* flags */
86   CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
87
88   /* submessage length */
89   CDR_get_ushort(cdrCodec,&submsg_len);
90
91   /* next data are sent in big endianing */
92   data_endian=cdrCodec->data_endian;
93   cdrCodec->data_endian=FLAG_BIG_ENDIAN;
94
95   /* readerObjectId */
96   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
97
98   /* writerObjectId */
99   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
100
101   cdrCodec->data_endian=data_endian;
102
103   /* sn */
104   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.high);
105   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.low);
106
107   /* at this moment is not supported p_bit */
108   if (flags & 0x02) return;                     /* p_bit */
109
110   writerGUID.hid=mi->sourceHostId;
111   writerGUID.aid=mi->sourceAppId;
112   writerGUID.oid=woid;
113   
114   debug(56,3) ("recv: RTPS_ISSUE(0x%x) from 0x%x-0x%x\n",
115                 woid,mi->sourceHostId,mi->sourceAppId);
116
117   pthread_rwlock_rdlock(&d->subscriptions.lock);
118   guid=d->guid;
119   guid.oid=roid;
120
121   gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
122     if (roid!=OID_UNKNOWN)
123       cstReader=CSTReader_find(&d->subscriptions,&guid);
124     if (cstReader) {
125       ORTESubsProp *sp;
126       pthread_rwlock_wrlock(&cstReader->lock);
127       sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
128       cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
129       if (cstRemoteWriter) {
130         ORTEPublProp *pp,*pps;
131         pp=(ORTEPublProp*)cstRemoteWriter->spobject->attributes;
132         if (cstReader->cstRemoteWriterSubscribed!=NULL) {
133           pps=(ORTEPublProp*)cstReader->cstRemoteWriterSubscribed->
134                             spobject->attributes;
135           if ((pp->strength>pps->strength) || (NtpTimeCmp(pps->persistence,zNtpTime)==0)) {
136             cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
137           }
138         } else {
139           cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
140         }
141         if (cstReader->cstRemoteWriterSubscribed==cstRemoteWriter) {
142           eventDetach(d,
143               cstReader->objectEntryOID->objectEntryAID,
144               &cstReader->persistenceTimer,
145               0);   //common timer
146           eventAdd(d,
147               cstReader->objectEntryOID->objectEntryAID,
148               &cstReader->persistenceTimer,
149               0,   //common timer
150               "CSTReaderPersistenceTimer",
151               CSTReaderPersistenceTimer,
152               &cstReader->lock,
153               cstReader,
154               &pp->persistence);
155         }
156
157         if ((SeqNumberCmp(sn,cstRemoteWriter->sn)>0) &&   //have to be sn>writer_sn
158             (CSChangeFromWriter_find(cstRemoteWriter,&sn)==NULL)) {
159
160           csChange=(CSChange*)MALLOC(sizeof(CSChange));
161           csChange->guid=writerGUID;
162           csChange->sn=sn;
163           SEQUENCE_NUMBER_NONE(csChange->gapSN);
164           CSChangeAttributes_init_head(csChange);
165
166           CDR_codec_init_static(&csChange->cdrCodec);
167           CDR_buffer_init(&csChange->cdrCodec,
168                           submsg_len-16);
169           csChange->cdrCodec.data_endian=cdrCodec->data_endian;
170
171           memcpy(csChange->cdrCodec.buffer,
172                  &cdrCodec->buffer[cdrCodec->rptr],submsg_len-16);
173
174           if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) { //sn>=firstSN
175             if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
176               if (sp->recvQueueSize>cstRemoteWriter->csChangesCounter) {
177                 sn_tmp.high=0;
178                 sn_tmp.low=sp->recvQueueSize;
179                 SeqNumberAdd(sn_tmp,
180                              cstRemoteWriter->sn,
181                              sn_tmp);
182                 if (SeqNumberCmp(sn,sn_tmp)<=0) {         //sn<=(firstSN+QueueSize)
183                   csChange->remoteTimePublished=mi->timestamp;
184                   csChange->localTimeReceived=getActualNtpTime();
185                   CSTReaderAddCSChange(cstRemoteWriter,csChange);
186                   csChange=NULL;
187                 }
188               }
189             } else {
190               if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
191                 if ((sp->recvQueueSize<=cstRemoteWriter->csChangesCounter) ||
192                     (cstReader->cstRemoteWriterSubscribed!=cstRemoteWriter)) {
193                   CSChangeFromWriter *csChangeFromWriter;
194                   csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
195                   CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
196                         csChangeFromWriter,
197                         ORTE_FALSE);
198                 }
199                 if (sp->recvQueueSize>cstRemoteWriter->csChangesCounter) {
200                   csChange->remoteTimePublished=mi->timestamp;
201                   csChange->localTimeReceived=getActualNtpTime();
202                   CSTReaderAddCSChange(cstRemoteWriter,csChange);
203                   csChange=NULL;
204                 }
205               }
206             }
207           } 
208         }
209         if (csChange) {
210           FREE(csChange->cdrCodec.buffer);
211           FREE(csChange);
212         }
213         CSTReaderProcCSChangesIssue(cstRemoteWriter,ORTE_FALSE);
214       }
215       pthread_rwlock_unlock(&cstReader->lock);  
216     } else
217       break;  //break traceing all cstReaders
218   }
219   pthread_rwlock_unlock(&d->subscriptions.lock);
220 }