]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTReader.c
Reformat the sources with orte/uncrustify script
[orte.git] / orte / liborte / RTPSCSTReader.c
1 /*
2  *  $Id: RTPSCSTReader.c,v 0.0.0.1      2003/09/13
3  *
4  *  DEBUG:  section 53                  CSTReader
5  *
6  *  -------------------------------------------------------------------
7  *                                ORTE
8  *                      Open Real-Time Ethernet
9  *
10  *                      Copyright (C) 2001-2006
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic
12  *                      http://dce.felk.cvut.cz
13  *                      http://www.ocera.org
14  *
15  *  Author:              Petr Smolik    petr@smoliku.cz
16  *  Advisor:             Pavel Pisa
17  *  Project Responsible: Zdenek Hanzalek
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *
30  */
31
32 #include "orte_all.h"
33
34 GAVL_CUST_NODE_INT_IMP(CSTReader,
35                        CSTSubscriptions, CSTReader, GUID_RTPS,
36                        cstReader, node, guid, gavl_cmp_guid);
37 GAVL_CUST_NODE_INT_IMP(CSTRemoteWriter,
38                        CSTReader, CSTRemoteWriter, GUID_RTPS,
39                        cstRemoteWriter, node, guid, gavl_cmp_guid);
40 GAVL_CUST_NODE_INT_IMP(CSChangeFromWriter,
41                        CSTRemoteWriter, CSChangeFromWriter, SequenceNumber,
42                        csChangeFromWriter, node, csChange->sn, gavl_cmp_sn);
43
44 /*****************************************************************************/
45 void
46 CSTReaderInit(ORTEDomain *d, CSTReader *cstReader, ObjectEntryOID *object,
47               ObjectId oid, CSTReaderParams *params, ORTETypeRegister *typeRegister)
48 {
49
50   debug(53, 10) ("CSTReaderInit: start\n");
51   //init values of cstReader
52   cstReader->guid.hid = object->objectEntryHID->hid;
53   cstReader->guid.aid = object->objectEntryAID->aid;
54   cstReader->guid.oid = oid;
55   cstReader->objectEntryOID = object;
56   memcpy(&cstReader->params, params, sizeof(CSTReaderParams));
57   cstReader->strictReliableCounter = 0;
58   cstReader->bestEffortsCounter = 0;
59   cstReader->cstRemoteWriterCounter = 0;
60   cstReader->createdByPattern = ORTE_FALSE;
61   CSTReaderCSChange_init_head(cstReader);
62   CSTRemoteWriter_init_root_field(cstReader);
63   pthread_rwlock_init(&cstReader->lock, NULL);
64   cstReader->domain = d;
65   cstReader->typeRegister = typeRegister;
66   ul_htim_queue_init_detached(&cstReader->deadlineTimer.htim);
67   ul_htim_queue_init_detached(&cstReader->persistenceTimer.htim);
68   cstReader->cstRemoteWriterSubscribed = NULL;
69   if ((oid & 0x07) == OID_SUBSCRIPTION) {
70     ORTESubsProp *sp;
71     sp = (ORTESubsProp *)cstReader->objectEntryOID->attributes;
72     if (NtpTimeCmp(sp->deadline, zNtpTime) != 0) {
73       if (sp->mode == IMMEDIATE) {
74         eventAdd(d,
75                  cstReader->objectEntryOID->objectEntryAID,
76                  &cstReader->deadlineTimer,
77                  0, //common timer
78                  "CSTReaderDeadlineTimer",
79                  CSTReaderDeadlineTimer,
80                  &cstReader->lock,
81                  cstReader,
82                  &sp->deadline);
83       }
84       if (sp->mode == PULLED) {
85         NtpTime timeNext;
86         NtpTimeAdd(timeNext,
87                    (getActualNtpTime()),
88                    sp->deadline);
89         htimerUnicastCommon_set_expire(&cstReader->deadlineTimer, timeNext);
90       }
91     }
92   }
93   debug(53, 4) ("CSTReaderInit: 0x%x-0x%x-0x%x\n",
94                 cstReader->guid.hid,
95                 cstReader->guid.aid,
96                 cstReader->guid.oid);
97   debug(53, 10) ("CSTReaderInit: finished\n");
98 }
99
100 /*****************************************************************************/
101 void
102 CSTReaderDelete(ORTEDomain *d, CSTReader *cstReader)
103 {
104   CSTRemoteWriter     *cstRemoteWriter;
105
106   debug(53, 10) ("CSTReaderDelete: start\n");
107   debug(53, 4) ("CSTReaderDelete: 0x%x-0x%x-0x%x\n",
108                 cstReader->guid.hid,
109                 cstReader->guid.aid,
110                 cstReader->guid.oid);
111   //Destroy all cstRemoteReader connected on cstWriter
112   while ((cstRemoteWriter = CSTRemoteWriter_first(cstReader)))
113     CSTReaderDestroyRemoteWriter(d, cstRemoteWriter);
114   eventDetach(d,
115               cstReader->objectEntryOID->objectEntryAID,
116               &cstReader->deadlineTimer,
117               0);
118   eventDetach(d,
119               cstReader->objectEntryOID->objectEntryAID,
120               &cstReader->persistenceTimer,
121               0); //common timer
122   pthread_rwlock_destroy(&cstReader->lock);
123   debug(53, 10) ("CSTReaderDelete: finished\n");
124 }
125
126 /*****************************************************************************/
127 CSTRemoteWriter *
128 CSTReaderAddRemoteWriter(ORTEDomain *d, CSTReader *cstReader, ObjectEntryOID *object,
129                          ObjectId oid)
130 {
131   CSTRemoteWriter     *cstRemoteWriter;
132
133   cstReader->cstRemoteWriterCounter++;
134   cstRemoteWriter = (CSTRemoteWriter *)MALLOC(sizeof(CSTRemoteWriter));
135   cstRemoteWriter->guid.hid = object->objectEntryHID->hid;
136   cstRemoteWriter->guid.aid = object->objectEntryAID->aid;
137   cstRemoteWriter->guid.oid = oid;
138   cstRemoteWriter->spobject = object;
139   cstRemoteWriter->cstReader = cstReader;
140   cstRemoteWriter->csChangesCounter = 0;
141   cstRemoteWriter->ACKRetriesCounter = 0;
142   cstRemoteWriter->commStateACK = WAITING;
143   CSChangeFromWriter_init_root_field(cstRemoteWriter);
144   SEQUENCE_NUMBER_NONE(cstRemoteWriter->sn);
145   SEQUENCE_NUMBER_NONE(cstRemoteWriter->firstSN);
146   SEQUENCE_NUMBER_NONE(cstRemoteWriter->lastSN);
147   ul_htim_queue_init_detached(&cstRemoteWriter->delayResponceTimer.htim);
148   ul_htim_queue_init_detached(&cstRemoteWriter->repeatActiveQueryTimer.htim);
149   CSTRemoteWriter_insert(cstReader, cstRemoteWriter);
150   //add event for repeatActiveTime
151   if (NtpTimeCmp(cstReader->params.repeatActiveQueryTime, iNtpTime) != 0) {
152     eventAdd(d,
153              cstRemoteWriter->spobject->objectEntryAID,
154              &cstRemoteWriter->repeatActiveQueryTimer,
155              1, //metatraffic timer
156              "CSTReaderQueryTimer",
157              CSTReaderQueryTimer,
158              &cstRemoteWriter->cstReader->lock,
159              cstRemoteWriter,
160              NULL);
161   }
162   if ((cstReader->guid.oid & 0x07) == OID_SUBSCRIPTION) {
163     ORTEPublProp *pp = (ORTEPublProp *)object->attributes;
164     if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_STRICT) != 0)
165       cstReader->strictReliableCounter++;
166     else {
167       if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_BEST_EFFORTS) != 0)
168         cstReader->bestEffortsCounter++;
169     }
170   }
171   debug(53, 4) ("CSTReaderAddRemoteWriter: 0x%x-0x%x-0x%x\n",
172                 cstRemoteWriter->guid.hid,
173                 cstRemoteWriter->guid.aid,
174                 cstRemoteWriter->guid.oid);
175   return cstRemoteWriter;
176 }
177
178 /*****************************************************************************/
179 void
180 CSTReaderDestroyRemoteWriter(ORTEDomain *d, CSTRemoteWriter *cstRemoteWriter)
181 {
182   CSChangeFromWriter   *csChangeFromWriter;
183
184   if (!cstRemoteWriter)
185     return;
186   cstRemoteWriter->cstReader->cstRemoteWriterCounter--;
187   debug(53, 4) ("CSTReaderDestroyRemoteWriter: 0x%x-0x%x-0x%x\n",
188                 cstRemoteWriter->guid.hid,
189                 cstRemoteWriter->guid.aid,
190                 cstRemoteWriter->guid.oid);
191   if ((cstRemoteWriter->cstReader->guid.oid & 0x07) == OID_SUBSCRIPTION) {
192     ORTEPublProp *pp;
193     pp = (ORTEPublProp *)cstRemoteWriter->spobject->attributes;
194     if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_STRICT) != 0)
195       cstRemoteWriter->cstReader->strictReliableCounter++;
196     else {
197       if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_BEST_EFFORTS) != 0)
198         cstRemoteWriter->cstReader->bestEffortsCounter++;
199     }
200   }
201   if (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed == cstRemoteWriter)
202     cstRemoteWriter->cstReader->cstRemoteWriterSubscribed = NULL;
203   while ((csChangeFromWriter = CSChangeFromWriter_first(cstRemoteWriter)))
204     CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
205                                        csChangeFromWriter, ORTE_FALSE);
206   eventDetach(d,
207               cstRemoteWriter->spobject->objectEntryAID,
208               &cstRemoteWriter->delayResponceTimer,
209               1); //metatraffic timer
210   eventDetach(d,
211               cstRemoteWriter->spobject->objectEntryAID,
212               &cstRemoteWriter->repeatActiveQueryTimer,
213               1); //metatraffic timer
214   CSTRemoteWriter_delete(cstRemoteWriter->cstReader, cstRemoteWriter);
215   FREE(cstRemoteWriter);
216 }
217
218 /*****************************************************************************/
219 void
220 CSTReaderAddCSChange(CSTRemoteWriter *cstRemoteWriter, CSChange *csChange)
221 {
222   CSChangeFromWriter   *csChangeFromWriter;
223
224   cstRemoteWriter->csChangesCounter++;
225   cstRemoteWriter->ACKRetriesCounter = 0;
226   csChangeFromWriter = (CSChangeFromWriter *)MALLOC(sizeof(CSChangeFromWriter));
227   csChangeFromWriter->csChange = csChange;
228   csChangeFromWriter->commStateChFWriter = RECEIVED;
229   CSChangeFromWriter_insert(cstRemoteWriter, csChangeFromWriter);
230   CSTReaderCSChange_insert(cstRemoteWriter->cstReader, csChange);
231 }
232
233 /*****************************************************************************/
234 void
235 CSTReaderDestroyCSChangeFromWriter(CSTRemoteWriter *cstRemoteWriter,
236                                    CSChangeFromWriter *csChangeFromWriter, Boolean keepCSChange)
237 {
238
239   if ((!csChangeFromWriter) || (!cstRemoteWriter))
240     return;
241   CSTReaderCSChange_delete(cstRemoteWriter->cstReader,
242                            csChangeFromWriter->csChange);
243   if (!keepCSChange) {
244     if (csChangeFromWriter->csChange->cdrCodec.buffer)
245       FREE(csChangeFromWriter->csChange->cdrCodec.buffer);
246     parameterDelete(csChangeFromWriter->csChange);
247     FREE(csChangeFromWriter->csChange);
248   }
249   CSChangeFromWriter_delete(cstRemoteWriter, csChangeFromWriter);
250   FREE(csChangeFromWriter);
251   cstRemoteWriter->csChangesCounter--;
252 }
253
254 /*****************************************************************************/
255 void
256 CSTReaderDestroyCSChange(CSTRemoteWriter *cstRemoteWriter, SequenceNumber *sn,
257                          Boolean keepCSChange)
258 {
259   CSChangeFromWriter   *csChangeFromWriter;
260
261   csChangeFromWriter = CSChangeFromWriter_find(cstRemoteWriter, sn);
262   if (csChangeFromWriter) {
263     CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
264                                        csChangeFromWriter, keepCSChange);
265   }
266 }
267
268 /*****************************************************************************/
269 void
270 CSTReaderSetupState(CSTRemoteWriter *cstRemoteWriter)
271 {
272
273   if (CSChangeFromWriter_first(cstRemoteWriter) == NULL) { //no csChanges
274     if (SeqNumberCmp(cstRemoteWriter->sn, cstRemoteWriter->lastSN) != 0) {
275       if (cstRemoteWriter->commStateACK != WAITING) {
276         cstRemoteWriter->commStateACK = PULLING;
277         cstRemoteWriter->ACKRetriesCounter = 0;
278         eventDetach(cstRemoteWriter->cstReader->domain,
279                     cstRemoteWriter->spobject->objectEntryAID,
280                     &cstRemoteWriter->repeatActiveQueryTimer,
281                     1); //metatraffic timer
282         eventDetach(cstRemoteWriter->cstReader->domain,
283                     cstRemoteWriter->spobject->objectEntryAID,
284                     &cstRemoteWriter->delayResponceTimer,
285                     1); //metatraffic timer
286         eventAdd(cstRemoteWriter->cstReader->domain,
287                  cstRemoteWriter->spobject->objectEntryAID,
288                  &cstRemoteWriter->delayResponceTimer,
289                  1, //metatraffic timer
290                  "CSTReaderResponceTimer",
291                  CSTReaderResponceTimer,
292                  &cstRemoteWriter->cstReader->lock,
293                  cstRemoteWriter,
294                  &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
295       }
296     } else {
297       if (cstRemoteWriter->commStateACK == PULLING) {
298         cstRemoteWriter->commStateACK = WAITING;
299         cstRemoteWriter->ACKRetriesCounter = 0;
300         eventDetach(cstRemoteWriter->cstReader->domain,
301                     cstRemoteWriter->spobject->objectEntryAID,
302                     &cstRemoteWriter->delayResponceTimer,
303                     1); //metatraffic timer
304         if (NtpTimeCmp(cstRemoteWriter->cstReader->params.repeatActiveQueryTime,
305                        iNtpTime) != 0) {
306           eventDetach(cstRemoteWriter->cstReader->domain,
307                       cstRemoteWriter->spobject->objectEntryAID,
308                       &cstRemoteWriter->repeatActiveQueryTimer,
309                       1); //metatraffic timer
310           eventAdd(cstRemoteWriter->cstReader->domain,
311                    cstRemoteWriter->spobject->objectEntryAID,
312                    &cstRemoteWriter->repeatActiveQueryTimer,
313                    1, //metatraffic timer
314                    "CSTReaderQueryTimer",
315                    CSTReaderQueryTimer,
316                    &cstRemoteWriter->cstReader->lock,
317                    cstRemoteWriter,
318                    &cstRemoteWriter->cstReader->params.repeatActiveQueryTime);
319         }
320       }
321     }
322   } else {
323     if (cstRemoteWriter->commStateACK == WAITING) {
324       cstRemoteWriter->commStateACK = PULLING;
325       cstRemoteWriter->ACKRetriesCounter = 0;
326       eventDetach(cstRemoteWriter->cstReader->domain,
327                   cstRemoteWriter->spobject->objectEntryAID,
328                   &cstRemoteWriter->repeatActiveQueryTimer,
329                   1); //metatraffic timer
330       eventDetach(cstRemoteWriter->cstReader->domain,
331                   cstRemoteWriter->spobject->objectEntryAID,
332                   &cstRemoteWriter->delayResponceTimer,
333                   1); //metatraffic timer
334       eventAdd(cstRemoteWriter->cstReader->domain,
335                cstRemoteWriter->spobject->objectEntryAID,
336                &cstRemoteWriter->delayResponceTimer,
337                1, //metatraffic timer
338                "CSTReaderResponceTimer",
339                CSTReaderResponceTimer,
340                &cstRemoteWriter->cstReader->lock,
341                cstRemoteWriter,
342                &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
343     }
344   }
345 }