rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTReader.c
0b476cd1cd5d1116c3283a6a4dc6af92a1007909
[orte.git] / orte / liborte / RTPSCSTReader.c
1 /*
2  *  $Id: RTPSCSTReader.c,v 0.0.0.1      2003/09/13 
3  *
4  *  DEBUG:  section 53                  CSTReader
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr.smolik@wo.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
/*
 * GAVL tree instantiations used by this file.
 *
 * Each invocation names, in order: the prefix of the generated functions,
 * the root (container) type, the item type, the key type, the root field,
 * the node field inside the item, the key field inside the item and the
 * key comparison function.
 * NOTE(review): the macro itself is defined outside this file; the
 * argument meaning above follows the ulut GAVL convention - confirm
 * against the header.
 *
 *  - CSTReader_*          : CSTReader items keyed by guid, rooted in
 *                           CSTSubscriptions.
 *  - CSTRemoteWriter_*    : CSTRemoteWriter items keyed by guid, rooted in
 *                           a CSTReader (used below as
 *                           CSTRemoteWriter_first/_insert/_delete/
 *                           _init_root_field).
 *  - CSChangeFromWriter_* : CSChangeFromWriter items keyed by the sequence
 *                           number of the referenced csChange, rooted in a
 *                           CSTRemoteWriter (used below as
 *                           CSChangeFromWriter_first/_find/_insert/_delete/
 *                           _init_root_field).
 */
34 GAVL_CUST_NODE_INT_IMP(CSTReader, 
35                        CSTSubscriptions, CSTReader, GUID_RTPS,
36                        cstReader, node, guid, gavl_cmp_guid);
37 GAVL_CUST_NODE_INT_IMP(CSTRemoteWriter, 
38                        CSTReader, CSTRemoteWriter, GUID_RTPS,
39                        cstRemoteWriter, node, guid, gavl_cmp_guid);
40 GAVL_CUST_NODE_INT_IMP(CSChangeFromWriter,
41                        CSTRemoteWriter, CSChangeFromWriter, SequenceNumber,
42                        csChangeFromWriter, node, csChange->sn, gavl_cmp_sn);
43
44 /*****************************************************************************/
45 void 
46 CSTReaderInit(ORTEDomain *d,CSTReader *cstReader,ObjectEntryOID *object,
47     ObjectId oid,CSTReaderParams *params,ORTETypeRegister *typeRegister) {
48
49   debug(53,10) ("CSTReaderInit: start\n");
50   //init values of cstReader
51   cstReader->guid.hid=object->objectEntryHID->hid;
52   cstReader->guid.aid=object->objectEntryAID->aid;
53   cstReader->guid.oid=oid;
54   cstReader->objectEntryOID=object;
55   memcpy(&cstReader->params,params,sizeof(CSTReaderParams));
56   cstReader->strictReliableCounter=0;
57   cstReader->bestEffortsCounter=0;
58   cstReader->cstRemoteWriterCounter=0;
59   cstReader->createdByPattern=ORTE_FALSE;
60   CSTReaderCSChange_init_head(cstReader);
61   CSTRemoteWriter_init_root_field(cstReader);
62   pthread_rwlock_init(&cstReader->lock,NULL);
63   cstReader->domain=d;
64   cstReader->typeRegister=typeRegister;
65   ul_htim_queue_init_detached(&cstReader->deadlineTimer.htim);
66   ul_htim_queue_init_detached(&cstReader->persistenceTimer.htim);
67   cstReader->cstRemoteWriterSubscribed=NULL;
68   if ((oid & 0x07) == OID_SUBSCRIPTION) {
69     ORTESubsProp *sp;
70     sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
71     if (NtpTimeCmp(sp->deadline,zNtpTime)!=0) {
72       if (sp->mode==IMMEDIATE) {
73         eventAdd(d,
74             cstReader->objectEntryOID->objectEntryAID,
75             &cstReader->deadlineTimer,
76             0,   //common timer
77             "CSTReaderDeadlineTimer",
78             CSTReaderDeadlineTimer,
79             &cstReader->lock,
80             cstReader,
81             &sp->deadline);
82       }
83       if (sp->mode==PULLED) {
84         NtpTime timeNext;
85         NtpTimeAdd(timeNext,
86                   (getActualNtpTime()),
87                   sp->deadline);
88         htimerUnicastCommon_set_expire(&cstReader->deadlineTimer,timeNext);
89       }
90     }
91   }
92   debug(53,4) ("CSTReaderInit: 0x%x-0x%x-0x%x\n",
93                 cstReader->guid.hid,
94                 cstReader->guid.aid,
95                 cstReader->guid.oid);
96   debug(53,10) ("CSTReaderInit: finished\n");
97 }
98
99 /*****************************************************************************/
100 void 
101 CSTReaderDelete(ORTEDomain *d,CSTReader *cstReader) {
102   CSTRemoteWriter     *cstRemoteWriter;
103   
104   debug(53,10)("CSTReaderDelete: start\n");
105   debug(53,4) ("CSTReaderDelete: 0x%x-0x%x-0x%x\n",
106                 cstReader->guid.hid,
107                 cstReader->guid.aid,
108                 cstReader->guid.oid);
109   //Destroy all cstRemoteReader connected on cstWriter
110   while((cstRemoteWriter=CSTRemoteWriter_first(cstReader))) {
111     CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
112   }
113   eventDetach(d,
114       cstReader->objectEntryOID->objectEntryAID,
115       &cstReader->deadlineTimer,
116       0);
117   eventDetach(d,
118       cstReader->objectEntryOID->objectEntryAID,
119       &cstReader->persistenceTimer,
120       0);   //common timer
121   pthread_rwlock_destroy(&cstReader->lock);
122   debug(53,10) ("CSTReaderDelete: finished\n");
123 }
124
125 /*****************************************************************************/
126 CSTRemoteWriter *
127 CSTReaderAddRemoteWriter(ORTEDomain *d,CSTReader *cstReader,ObjectEntryOID *object,
128     ObjectId oid) {
129   CSTRemoteWriter     *cstRemoteWriter;
130   
131   cstReader->cstRemoteWriterCounter++;
132   cstRemoteWriter=(CSTRemoteWriter*)MALLOC(sizeof(CSTRemoteWriter));
133   cstRemoteWriter->guid.hid=object->objectEntryHID->hid;
134   cstRemoteWriter->guid.aid=object->objectEntryAID->aid;
135   cstRemoteWriter->guid.oid=oid;
136   cstRemoteWriter->spobject=object;
137   cstRemoteWriter->cstReader=cstReader;
138   cstRemoteWriter->csChangesCounter=0;
139   cstRemoteWriter->ACKRetriesCounter=0;
140   cstRemoteWriter->commStateACK=WAITING;  
141   CSChangeFromWriter_init_root_field(cstRemoteWriter);
142   SEQUENCE_NUMBER_NONE(cstRemoteWriter->sn);
143   SEQUENCE_NUMBER_NONE(cstRemoteWriter->firstSN);
144   SEQUENCE_NUMBER_NONE(cstRemoteWriter->lastSN);
145   ul_htim_queue_init_detached(&cstRemoteWriter->delayResponceTimer.htim);
146   ul_htim_queue_init_detached(&cstRemoteWriter->repeatActiveQueryTimer.htim);
147   CSTRemoteWriter_insert(cstReader,cstRemoteWriter);
148   //add event for repeatActiveTime
149   if (NtpTimeCmp(cstReader->params.repeatActiveQueryTime,iNtpTime)!=0) {
150     eventAdd(d,
151         cstRemoteWriter->spobject->objectEntryAID,
152         &cstRemoteWriter->repeatActiveQueryTimer,
153         1,   //metatraffic timer
154         "CSTReaderQueryTimer",
155         CSTReaderQueryTimer,
156         &cstRemoteWriter->cstReader->lock,
157         cstRemoteWriter,
158         NULL);               
159   }
160   if ((cstReader->guid.oid & 0x07)==OID_SUBSCRIPTION) {
161     ORTEPublProp *pp=(ORTEPublProp*)object->attributes;
162     if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_STRICT)!=0)
163       cstReader->strictReliableCounter++;
164     else {
165       if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
166         cstReader->bestEffortsCounter++;
167     }
168   }  
169   debug(53,4) ("CSTReaderAddRemoteWriter: 0x%x-0x%x-0x%x\n",
170                 cstRemoteWriter->guid.hid,
171                 cstRemoteWriter->guid.aid,
172                 cstRemoteWriter->guid.oid);
173   return cstRemoteWriter;
174 }
175
176 /*****************************************************************************/
177 void 
178 CSTReaderDestroyRemoteWriter(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
179   CSChangeFromWriter   *csChangeFromWriter;
180   
181   if (!cstRemoteWriter) return;
182   cstRemoteWriter->cstReader->cstRemoteWriterCounter--;
183   debug(53,4) ("CSTReaderDestroyRemoteWriter: 0x%x-0x%x-0x%x\n",
184                 cstRemoteWriter->guid.hid,
185                 cstRemoteWriter->guid.aid,
186                 cstRemoteWriter->guid.oid);
187   if ((cstRemoteWriter->cstReader->guid.oid & 0x07)==OID_SUBSCRIPTION) {
188     ORTEPublProp *pp;
189     pp=(ORTEPublProp*)cstRemoteWriter->spobject->attributes;
190     if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_STRICT)!=0)
191       cstRemoteWriter->cstReader->strictReliableCounter++;
192     else {
193       if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
194         cstRemoteWriter->cstReader->bestEffortsCounter++;
195     }
196   }
197   if (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed==cstRemoteWriter)
198     cstRemoteWriter->cstReader->cstRemoteWriterSubscribed=NULL;
199   while((csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter))) {
200     CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
201         csChangeFromWriter,ORTE_FALSE);
202   }
203   eventDetach(d,
204       cstRemoteWriter->spobject->objectEntryAID,
205       &cstRemoteWriter->delayResponceTimer,
206       1);   //metatraffic timer
207   eventDetach(d,
208       cstRemoteWriter->spobject->objectEntryAID,
209       &cstRemoteWriter->repeatActiveQueryTimer,
210       1);   //metatraffic timer
211   CSTRemoteWriter_delete(cstRemoteWriter->cstReader,cstRemoteWriter);
212   FREE(cstRemoteWriter);
213 }
214
215 /*****************************************************************************/
216 void
217 CSTReaderAddCSChange(CSTRemoteWriter *cstRemoteWriter,CSChange *csChange) {
218   CSChangeFromWriter   *csChangeFromWriter;
219   
220   cstRemoteWriter->csChangesCounter++;
221   cstRemoteWriter->ACKRetriesCounter=0;
222   csChangeFromWriter=(CSChangeFromWriter*)MALLOC(sizeof(CSChangeFromWriter));
223   csChangeFromWriter->csChange=csChange;
224   csChangeFromWriter->commStateChFWriter=RECEIVED;
225   CSChangeFromWriter_insert(cstRemoteWriter,csChangeFromWriter);
226   CSTReaderCSChange_insert(cstRemoteWriter->cstReader,csChange);
227 }
228
229 /*****************************************************************************/
230 void 
231 CSTReaderDestroyCSChangeFromWriter(CSTRemoteWriter *cstRemoteWriter,
232     CSChangeFromWriter *csChangeFromWriter,Boolean keepCSChange) {
233   
234   if ((!csChangeFromWriter) || (!cstRemoteWriter)) return;
235   CSTReaderCSChange_delete(cstRemoteWriter->cstReader,
236                            csChangeFromWriter->csChange);
237   if (!keepCSChange) {
238     if (csChangeFromWriter->csChange->cdrCodec.buffer)
239       FREE(csChangeFromWriter->csChange->cdrCodec.buffer);
240     parameterDelete(csChangeFromWriter->csChange);
241     FREE(csChangeFromWriter->csChange);
242   }
243   CSChangeFromWriter_delete(cstRemoteWriter,csChangeFromWriter);
244   FREE(csChangeFromWriter);
245   cstRemoteWriter->csChangesCounter--;
246 }
247
248 /*****************************************************************************/
249 void 
250 CSTReaderDestroyCSChange(CSTRemoteWriter *cstRemoteWriter,SequenceNumber *sn,
251     Boolean keepCSChange) {
252   CSChangeFromWriter   *csChangeFromWriter;
253   
254   csChangeFromWriter=CSChangeFromWriter_find(cstRemoteWriter,sn);
255   if (csChangeFromWriter) {  
256     CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
257          csChangeFromWriter,keepCSChange);
258   }    
259 }
260
261 /*****************************************************************************/
262 void 
263 CSTReaderSetupState(CSTRemoteWriter *cstRemoteWriter) {
264   
265   if (CSChangeFromWriter_first(cstRemoteWriter)==NULL) { //no csChanges
266     if (SeqNumberCmp(cstRemoteWriter->sn,cstRemoteWriter->lastSN)!=0) {
267       if (cstRemoteWriter->commStateACK!=WAITING) {
268         cstRemoteWriter->commStateACK=PULLING;
269         cstRemoteWriter->ACKRetriesCounter=0;
270         eventDetach(cstRemoteWriter->cstReader->domain,
271             cstRemoteWriter->spobject->objectEntryAID,
272             &cstRemoteWriter->repeatActiveQueryTimer,
273             1);  //metatraffic timer
274         eventDetach(cstRemoteWriter->cstReader->domain,
275             cstRemoteWriter->spobject->objectEntryAID,
276             &cstRemoteWriter->delayResponceTimer,
277             1);   //metatraffic timer
278         eventAdd(cstRemoteWriter->cstReader->domain,
279             cstRemoteWriter->spobject->objectEntryAID,
280             &cstRemoteWriter->delayResponceTimer,
281             1,   //metatraffic timer
282             "CSTReaderResponceTimer",
283             CSTReaderResponceTimer,
284             &cstRemoteWriter->cstReader->lock,
285             cstRemoteWriter,
286             &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
287       }
288     } else {
289       if (cstRemoteWriter->commStateACK==PULLING) {
290         cstRemoteWriter->commStateACK=WAITING;
291         cstRemoteWriter->ACKRetriesCounter=0;
292         eventDetach(cstRemoteWriter->cstReader->domain,
293             cstRemoteWriter->spobject->objectEntryAID,
294             &cstRemoteWriter->delayResponceTimer,
295             1);   //metatraffic timer
296         if (NtpTimeCmp(cstRemoteWriter->cstReader->params.repeatActiveQueryTime,
297                        iNtpTime)!=0) {
298           eventDetach(cstRemoteWriter->cstReader->domain,
299               cstRemoteWriter->spobject->objectEntryAID,
300               &cstRemoteWriter->repeatActiveQueryTimer,
301               1);   //metatraffic timer
302           eventAdd(cstRemoteWriter->cstReader->domain,
303               cstRemoteWriter->spobject->objectEntryAID,
304               &cstRemoteWriter->repeatActiveQueryTimer,
305               1,   //metatraffic timer
306               "CSTReaderQueryTimer",
307               CSTReaderQueryTimer,
308               &cstRemoteWriter->cstReader->lock,
309               cstRemoteWriter,
310               &cstRemoteWriter->cstReader->params.repeatActiveQueryTime);
311         }
312       }
313     }
314   } else {
315     if (cstRemoteWriter->commStateACK==WAITING) {
316       cstRemoteWriter->commStateACK=PULLING;
317       cstRemoteWriter->ACKRetriesCounter=0;
318       eventDetach(cstRemoteWriter->cstReader->domain,
319           cstRemoteWriter->spobject->objectEntryAID,
320           &cstRemoteWriter->repeatActiveQueryTimer,
321           1);   //metatraffic timer
322       eventDetach(cstRemoteWriter->cstReader->domain,
323           cstRemoteWriter->spobject->objectEntryAID,
324           &cstRemoteWriter->delayResponceTimer,
325           1);   //metatraffic timer
326       eventAdd(cstRemoteWriter->cstReader->domain,
327           cstRemoteWriter->spobject->objectEntryAID,
328           &cstRemoteWriter->delayResponceTimer,
329           1,   //metatraffic timer
330           "CSTReaderResponceTimer",
331           CSTReaderResponceTimer,
332           &cstRemoteWriter->cstReader->lock,
333           cstRemoteWriter,
334           &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
335     }
336   }
337 }