]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSIssue.c
Reformat the sources with orte/uncrustify script
[orte.git] / orte / liborte / RTPSIssue.c
1 /*
2  *  $Id: RTPSIssue.c,v 0.0.0.1          2003/12/08
3  *
4  *  DEBUG:  section 56                  message ISSUE
5  *
6  *  -------------------------------------------------------------------
7  *                                ORTE
8  *                      Open Real-Time Ethernet
9  *
10  *                      Copyright (C) 2001-2006
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic
12  *                      http://dce.felk.cvut.cz
13  *                      http://www.ocera.org
14  *
15  *  Author:              Petr Smolik    petr@smoliku.cz
16  *  Advisor:             Pavel Pisa
17  *  Project Responsible: Zdenek Hanzalek
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *
30  */
31
32 #include "orte_all.h"
33
34 /**********************************************************************************/
35 int
36 RTPSIssueCreateHeader(CDR_Codec *cdrCodec, uint32_t length,
37                       ObjectId roid, ObjectId woid, SequenceNumber sn)
38 {
39   CDR_Endianness     data_endian;
40   CORBA_octet        flags;
41
42   if (cdrCodec->buf_len < cdrCodec->wptr+20)
43     return -1;
44
45   /* submessage id */
46   CDR_put_octet(cdrCodec, ISSUE);
47
48   /* flags */
49   flags = cdrCodec->data_endian;
50   CDR_put_octet(cdrCodec, flags);
51
52   /* length */
53   CDR_put_ushort(cdrCodec, (CORBA_unsigned_short)length);
54
55   data_endian = cdrCodec->data_endian;
56   cdrCodec->data_endian = FLAG_BIG_ENDIAN;
57
58   /* readerObjectId */
59   CDR_put_ulong(cdrCodec, roid);
60
61   /* writerObjectId */
62   CDR_put_ulong(cdrCodec, woid);
63
64   cdrCodec->data_endian = data_endian;
65
66   CDR_put_ulong(cdrCodec, sn.high);
67   CDR_put_ulong(cdrCodec, sn.low);
68   return 0;
69 }
70
71 /**********************************************************************************/
72 void
73 RTPSIssue(ORTEDomain *d, CDR_Codec *cdrCodec, MessageInterpret *mi, IPAddress senderIPAddress)
74 {
75   GUID_RTPS          guid, writerGUID;
76   ObjectId           roid, woid;
77   SequenceNumber     sn, sn_tmp;
78   CORBA_octet        flags;
79   CORBA_unsigned_short submsg_len;
80   CSTReader          *cstReader;
81   CSTRemoteWriter    *cstRemoteWriter;
82   CSChange           *csChange = NULL;
83   CDR_Endianness     data_endian;
84
85   /* restore flag possition in submessage */
86   cdrCodec->rptr -= 3;
87
88   /* flags */
89   CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
90
91   /* submessage length */
92   CDR_get_ushort(cdrCodec, &submsg_len);
93
94   /* next data are sent in big endianing */
95   data_endian = cdrCodec->data_endian;
96   cdrCodec->data_endian = FLAG_BIG_ENDIAN;
97
98   /* readerObjectId */
99   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
100
101   /* writerObjectId */
102   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
103
104   cdrCodec->data_endian = data_endian;
105
106   /* sn */
107   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.high);
108   CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.low);
109
110   /* at this moment is not supported p_bit */
111   if (flags & 0x02)
112     return;                                     /* p_bit */
113
114   writerGUID.hid = mi->sourceHostId;
115   writerGUID.aid = mi->sourceAppId;
116   writerGUID.oid = woid;
117
118   debug(56, 3) ("recv: RTPS_ISSUE(0x%x) from 0x%x-0x%x\n",
119                 woid, mi->sourceHostId, mi->sourceAppId);
120
121   pthread_rwlock_rdlock(&d->subscriptions.lock);
122   guid = d->guid;
123   guid.oid = roid;
124
125   gavl_cust_for_each(CSTReader, &d->subscriptions, cstReader) {
126     if (roid != OID_UNKNOWN)
127       cstReader = CSTReader_find(&d->subscriptions, &guid);
128     if (cstReader) {
129       ORTESubsProp *sp;
130       pthread_rwlock_wrlock(&cstReader->lock);
131       sp = (ORTESubsProp *)cstReader->objectEntryOID->attributes;
132       cstRemoteWriter = CSTRemoteWriter_find(cstReader, &writerGUID);
133       if (cstRemoteWriter) {
134         ORTEPublProp *pp, *pps;
135         pp = (ORTEPublProp *)cstRemoteWriter->spobject->attributes;
136         if (cstReader->cstRemoteWriterSubscribed != NULL) {
137           pps = (ORTEPublProp *)cstReader->cstRemoteWriterSubscribed->
138                 spobject->attributes;
139           if ((pp->strength > pps->strength) || (NtpTimeCmp(pps->persistence, zNtpTime) == 0)) {
140             cstReader->cstRemoteWriterSubscribed = cstRemoteWriter;
141           }
142         } else {
143           cstReader->cstRemoteWriterSubscribed = cstRemoteWriter;
144         }
145         if (cstReader->cstRemoteWriterSubscribed == cstRemoteWriter) {
146           eventDetach(d,
147                       cstReader->objectEntryOID->objectEntryAID,
148                       &cstReader->persistenceTimer,
149                       0); //common timer
150           eventAdd(d,
151                    cstReader->objectEntryOID->objectEntryAID,
152                    &cstReader->persistenceTimer,
153                    0, //common timer
154                    "CSTReaderPersistenceTimer",
155                    CSTReaderPersistenceTimer,
156                    &cstReader->lock,
157                    cstReader,
158                    &pp->persistence);
159         }
160
161         if ((SeqNumberCmp(sn, cstRemoteWriter->sn) > 0) &&   //have to be sn>writer_sn
162             (CSChangeFromWriter_find(cstRemoteWriter, &sn) == NULL)) {
163
164           csChange = (CSChange *)MALLOC(sizeof(CSChange));
165           csChange->guid = writerGUID;
166           csChange->sn = sn;
167           SEQUENCE_NUMBER_NONE(csChange->gapSN);
168           CSChangeAttributes_init_head(csChange);
169
170           CDR_codec_init_static(&csChange->cdrCodec);
171           CDR_buffer_init(&csChange->cdrCodec,
172                           submsg_len-16);
173           csChange->cdrCodec.data_endian = cdrCodec->data_endian;
174
175           memcpy(csChange->cdrCodec.buffer,
176                  &cdrCodec->buffer[cdrCodec->rptr], submsg_len-16);
177
178           if (SeqNumberCmp(sn, cstRemoteWriter->firstSN) >= 0) { //sn>=firstSN
179             if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT) != 0) {
180               if (sp->recvQueueSize > cstRemoteWriter->csChangesCounter) {
181                 sn_tmp.high = 0;
182                 sn_tmp.low = sp->recvQueueSize;
183                 SeqNumberAdd(sn_tmp,
184                              cstRemoteWriter->sn,
185                              sn_tmp);
186                 if (SeqNumberCmp(sn, sn_tmp) <= 0) {         //sn<=(firstSN+QueueSize)
187                   csChange->remoteTimePublished = mi->timestamp;
188                   csChange->localTimeReceived = getActualNtpTime();
189                   CSTReaderAddCSChange(cstRemoteWriter, csChange);
190                   csChange = NULL;
191                 }
192               }
193             } else {
194               if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS) != 0) {
195                 if ((sp->recvQueueSize <= cstRemoteWriter->csChangesCounter) ||
196                     (cstReader->cstRemoteWriterSubscribed != cstRemoteWriter)) {
197                   CSChangeFromWriter *csChangeFromWriter;
198                   csChangeFromWriter = CSChangeFromWriter_first(cstRemoteWriter);
199                   CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
200                                                      csChangeFromWriter,
201                                                      ORTE_FALSE);
202                 }
203                 if (sp->recvQueueSize > cstRemoteWriter->csChangesCounter) {
204                   csChange->remoteTimePublished = mi->timestamp;
205                   csChange->localTimeReceived = getActualNtpTime();
206                   CSTReaderAddCSChange(cstRemoteWriter, csChange);
207                   csChange = NULL;
208                 }
209               }
210             }
211           }
212         }
213         if (csChange) {
214           FREE(csChange->cdrCodec.buffer);
215           FREE(csChange);
216         }
217         CSTReaderProcCSChangesIssue(cstRemoteWriter, ORTE_FALSE);
218       }
219       pthread_rwlock_unlock(&cstReader->lock);
220     } else
221       break;  //break traceing all cstReaders
222   }
223   pthread_rwlock_unlock(&d->subscriptions.lock);
224 }