]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTReader.c
be974bddb1d26ec28961bdd0288527e68b5eefb2
[orte.git] / orte / liborte / RTPSCSTReader.c
1 /*
2  *  $Id: RTPSCSTReader.c,v 0.0.0.1      2003/09/13 
3  *
4  *  DEBUG:  section 53                  CSTReader
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte_all.h"
23
24 GAVL_CUST_NODE_INT_IMP(CSTReader, 
25                        CSTSubscriptions, CSTReader, GUID_RTPS,
26                        cstReader, node, guid, gavl_cmp_guid);
27 GAVL_CUST_NODE_INT_IMP(CSTRemoteWriter, 
28                        CSTReader, CSTRemoteWriter, GUID_RTPS,
29                        cstRemoteWriter, node, guid, gavl_cmp_guid);
30 GAVL_CUST_NODE_INT_IMP(CSChangeFromWriter,
31                        CSTRemoteWriter, CSChangeFromWriter, SequenceNumber,
32                        csChangeFromWriter, node, csChange->sn, gavl_cmp_sn);
33
34 /*****************************************************************************/
35 void 
36 CSTReaderInit(ORTEDomain *d,CSTReader *cstReader,ObjectEntryOID *object,
37     ObjectId oid,CSTReaderParams *params,ORTETypeRegister *typeRegister) {
38
39   debug(53,10) ("CSTReaderInit: start\n");
40   //init values of cstReader
41   cstReader->guid.hid=object->objectEntryHID->hid;
42   cstReader->guid.aid=object->objectEntryAID->aid;
43   cstReader->guid.oid=oid;
44   cstReader->objectEntryOID=object;
45   memcpy(&cstReader->params,params,sizeof(CSTReaderParams));
46   cstReader->strictReliableCounter=0;
47   cstReader->bestEffortsCounter=0;
48   cstReader->cstRemoteWriterCounter=0;
49   cstReader->createdByPattern=ORTE_FALSE;
50   CSTReaderCSChange_init_head(cstReader);
51   CSTRemoteWriter_init_root_field(cstReader);
52   pthread_rwlock_init(&cstReader->lock,NULL);
53   cstReader->domain=d;
54   cstReader->typeRegister=typeRegister;
55   ul_htim_queue_init_detached(&cstReader->deadlineTimer.htim);
56   ul_htim_queue_init_detached(&cstReader->persistenceTimer.htim);
57   cstReader->cstRemoteWriterSubscribed=NULL;
58   if ((oid & 0x07) == OID_SUBSCRIPTION) {
59     ORTESubsProp *sp;
60     sp=(ORTESubsProp*)cstReader->objectEntryOID->attributes;
61     if (NtpTimeCmp(sp->deadline,zNtpTime)!=0) {
62       if (sp->mode==IMMEDIATE) {
63         eventAdd(d,
64             cstReader->objectEntryOID->objectEntryAID,
65             &cstReader->deadlineTimer,
66             0,   //common timer
67             "CSTReaderDeadlineTimer",
68             CSTReaderDeadlineTimer,
69             &cstReader->lock,
70             cstReader,
71             &sp->deadline);
72       }
73       if (sp->mode==PULLED) {
74         NtpTime timeNext;
75         NtpTimeAdd(timeNext,
76                   (getActualNtpTime()),
77                   sp->deadline);
78         htimerUnicastCommon_set_expire(&cstReader->deadlineTimer,timeNext);
79       }
80     }
81   }
82   debug(53,4) ("CSTReaderInit: 0x%x-0x%x-0x%x\n",
83                 cstReader->guid.hid,
84                 cstReader->guid.aid,
85                 cstReader->guid.oid);
86   debug(53,10) ("CSTReaderInit: finished\n");
87 }
88
89 /*****************************************************************************/
90 void 
91 CSTReaderDelete(ORTEDomain *d,CSTReader *cstReader) {
92   CSTRemoteWriter     *cstRemoteWriter;
93   
94   debug(53,10)("CSTReaderDelete: start\n");
95   debug(53,4) ("CSTReaderDelete: 0x%x-0x%x-0x%x\n",
96                 cstReader->guid.hid,
97                 cstReader->guid.aid,
98                 cstReader->guid.oid);
99   //Destroy all cstRemoteReader connected on cstWriter
100   while((cstRemoteWriter=CSTRemoteWriter_first(cstReader))) {
101     CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
102   }
103   eventDetach(d,
104       cstReader->objectEntryOID->objectEntryAID,
105       &cstReader->deadlineTimer,
106       0);
107   eventDetach(d,
108       cstReader->objectEntryOID->objectEntryAID,
109       &cstReader->persistenceTimer,
110       0);   //common timer
111   pthread_rwlock_destroy(&cstReader->lock);
112   debug(53,10) ("CSTReaderDelete: finished\n");
113 }
114
115 /*****************************************************************************/
116 CSTRemoteWriter *
117 CSTReaderAddRemoteWriter(ORTEDomain *d,CSTReader *cstReader,ObjectEntryOID *object,
118     ObjectId oid) {
119   CSTRemoteWriter     *cstRemoteWriter;
120   
121   cstReader->cstRemoteWriterCounter++;
122   cstRemoteWriter=(CSTRemoteWriter*)MALLOC(sizeof(CSTRemoteWriter));
123   cstRemoteWriter->guid.hid=object->objectEntryHID->hid;
124   cstRemoteWriter->guid.aid=object->objectEntryAID->aid;
125   cstRemoteWriter->guid.oid=oid;
126   cstRemoteWriter->spobject=object;
127   cstRemoteWriter->cstReader=cstReader;
128   cstRemoteWriter->csChangesCounter=0;
129   cstRemoteWriter->ACKRetriesCounter=0;
130   cstRemoteWriter->commStateACK=WAITING;  
131   CSChangeFromWriter_init_root_field(cstRemoteWriter);
132   SEQUENCE_NUMBER_NONE(cstRemoteWriter->sn);
133   SEQUENCE_NUMBER_NONE(cstRemoteWriter->firstSN);
134   SEQUENCE_NUMBER_NONE(cstRemoteWriter->lastSN);
135   ul_htim_queue_init_detached(&cstRemoteWriter->delayResponceTimer.htim);
136   ul_htim_queue_init_detached(&cstRemoteWriter->repeatActiveQueryTimer.htim);
137   CSTRemoteWriter_insert(cstReader,cstRemoteWriter);
138   //add event for repeatActiveTime
139   if (NtpTimeCmp(cstReader->params.repeatActiveQueryTime,iNtpTime)!=0) {
140     eventAdd(d,
141         cstRemoteWriter->spobject->objectEntryAID,
142         &cstRemoteWriter->repeatActiveQueryTimer,
143         1,   //metatraffic timer
144         "CSTReaderQueryTimer",
145         CSTReaderQueryTimer,
146         &cstRemoteWriter->cstReader->lock,
147         cstRemoteWriter,
148         NULL);               
149   }
150   if ((cstReader->guid.oid & 0x07)==OID_SUBSCRIPTION) {
151     ORTEPublProp *pp=(ORTEPublProp*)object->attributes;
152     if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_STRICT)!=0)
153       cstReader->strictReliableCounter++;
154     else {
155       if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
156         cstReader->bestEffortsCounter++;
157     }
158   }  
159   debug(53,4) ("CSTReaderAddRemoteWriter: 0x%x-0x%x-0x%x\n",
160                 cstRemoteWriter->guid.hid,
161                 cstRemoteWriter->guid.aid,
162                 cstRemoteWriter->guid.oid);
163   return cstRemoteWriter;
164 }
165
166 /*****************************************************************************/
167 void 
168 CSTReaderDestroyRemoteWriter(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
169   CSChangeFromWriter   *csChangeFromWriter;
170   
171   if (!cstRemoteWriter) return;
172   cstRemoteWriter->cstReader->cstRemoteWriterCounter--;
173   debug(53,4) ("CSTReaderDestroyRemoteWriter: 0x%x-0x%x-0x%x\n",
174                 cstRemoteWriter->guid.hid,
175                 cstRemoteWriter->guid.aid,
176                 cstRemoteWriter->guid.oid);
177   if ((cstRemoteWriter->cstReader->guid.oid & 0x07)==OID_SUBSCRIPTION) {
178     ORTEPublProp *pp;
179     pp=(ORTEPublProp*)cstRemoteWriter->spobject->attributes;
180     if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_STRICT)!=0)
181       cstRemoteWriter->cstReader->strictReliableCounter++;
182     else {
183       if ((pp->reliabilityOffered & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0)
184         cstRemoteWriter->cstReader->bestEffortsCounter++;
185     }
186   }
187   if (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed==cstRemoteWriter)
188     cstRemoteWriter->cstReader->cstRemoteWriterSubscribed=NULL;
189   while((csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter))) {
190     CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
191         csChangeFromWriter,ORTE_FALSE);
192   }
193   eventDetach(d,
194       cstRemoteWriter->spobject->objectEntryAID,
195       &cstRemoteWriter->delayResponceTimer,
196       1);   //metatraffic timer
197   eventDetach(d,
198       cstRemoteWriter->spobject->objectEntryAID,
199       &cstRemoteWriter->repeatActiveQueryTimer,
200       1);   //metatraffic timer
201   CSTRemoteWriter_delete(cstRemoteWriter->cstReader,cstRemoteWriter);
202   FREE(cstRemoteWriter);
203 }
204
205 /*****************************************************************************/
206 void
207 CSTReaderAddCSChange(CSTRemoteWriter *cstRemoteWriter,CSChange *csChange) {
208   CSChangeFromWriter   *csChangeFromWriter;
209   
210   cstRemoteWriter->csChangesCounter++;
211   cstRemoteWriter->ACKRetriesCounter=0;
212   csChangeFromWriter=(CSChangeFromWriter*)MALLOC(sizeof(CSChangeFromWriter));
213   csChangeFromWriter->csChange=csChange;
214   csChangeFromWriter->commStateChFWriter=RECEIVED;
215   CSChangeFromWriter_insert(cstRemoteWriter,csChangeFromWriter);
216   CSTReaderCSChange_insert(cstRemoteWriter->cstReader,csChange);
217 }
218
219 /*****************************************************************************/
220 void 
221 CSTReaderDestroyCSChangeFromWriter(CSTRemoteWriter *cstRemoteWriter,
222     CSChangeFromWriter *csChangeFromWriter,Boolean keepCSChange) {
223   
224   if ((!csChangeFromWriter) || (!cstRemoteWriter)) return;
225   CSTReaderCSChange_delete(cstRemoteWriter->cstReader,
226                            csChangeFromWriter->csChange);
227   if (!keepCSChange) {
228     if (csChangeFromWriter->csChange->cdrCodec.buffer)
229       FREE(csChangeFromWriter->csChange->cdrCodec.buffer);
230     parameterDelete(csChangeFromWriter->csChange);
231     FREE(csChangeFromWriter->csChange);
232   }
233   CSChangeFromWriter_delete(cstRemoteWriter,csChangeFromWriter);
234   FREE(csChangeFromWriter);
235   cstRemoteWriter->csChangesCounter--;
236 }
237
238 /*****************************************************************************/
239 void 
240 CSTReaderDestroyCSChange(CSTRemoteWriter *cstRemoteWriter,SequenceNumber *sn,
241     Boolean keepCSChange) {
242   CSChangeFromWriter   *csChangeFromWriter;
243   
244   csChangeFromWriter=CSChangeFromWriter_find(cstRemoteWriter,sn);
245   if (csChangeFromWriter) {  
246     CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
247          csChangeFromWriter,keepCSChange);
248   }    
249 }
250
251 /*****************************************************************************/
252 void 
253 CSTReaderSetupState(CSTRemoteWriter *cstRemoteWriter) {
254   
255   if (CSChangeFromWriter_first(cstRemoteWriter)==NULL) { //no csChanges
256     if (SeqNumberCmp(cstRemoteWriter->sn,cstRemoteWriter->lastSN)!=0) {
257       if (cstRemoteWriter->commStateACK!=WAITING) {
258         cstRemoteWriter->commStateACK=PULLING;
259         cstRemoteWriter->ACKRetriesCounter=0;
260         eventDetach(cstRemoteWriter->cstReader->domain,
261             cstRemoteWriter->spobject->objectEntryAID,
262             &cstRemoteWriter->repeatActiveQueryTimer,
263             1);  //metatraffic timer
264         eventDetach(cstRemoteWriter->cstReader->domain,
265             cstRemoteWriter->spobject->objectEntryAID,
266             &cstRemoteWriter->delayResponceTimer,
267             1);   //metatraffic timer
268         eventAdd(cstRemoteWriter->cstReader->domain,
269             cstRemoteWriter->spobject->objectEntryAID,
270             &cstRemoteWriter->delayResponceTimer,
271             1,   //metatraffic timer
272             "CSTReaderResponceTimer",
273             CSTReaderResponceTimer,
274             &cstRemoteWriter->cstReader->lock,
275             cstRemoteWriter,
276             &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
277       }
278     } else {
279       if (cstRemoteWriter->commStateACK==PULLING) {
280         cstRemoteWriter->commStateACK=WAITING;
281         cstRemoteWriter->ACKRetriesCounter=0;
282         eventDetach(cstRemoteWriter->cstReader->domain,
283             cstRemoteWriter->spobject->objectEntryAID,
284             &cstRemoteWriter->delayResponceTimer,
285             1);   //metatraffic timer
286         if (NtpTimeCmp(cstRemoteWriter->cstReader->params.repeatActiveQueryTime,
287                        iNtpTime)!=0) {
288           eventDetach(cstRemoteWriter->cstReader->domain,
289               cstRemoteWriter->spobject->objectEntryAID,
290               &cstRemoteWriter->repeatActiveQueryTimer,
291               1);   //metatraffic timer
292           eventAdd(cstRemoteWriter->cstReader->domain,
293               cstRemoteWriter->spobject->objectEntryAID,
294               &cstRemoteWriter->repeatActiveQueryTimer,
295               1,   //metatraffic timer
296               "CSTReaderQueryTimer",
297               CSTReaderQueryTimer,
298               &cstRemoteWriter->cstReader->lock,
299               cstRemoteWriter,
300               &cstRemoteWriter->cstReader->params.repeatActiveQueryTime);
301         }
302       }
303     }
304   } else {
305     if (cstRemoteWriter->commStateACK==WAITING) {
306       cstRemoteWriter->commStateACK=PULLING;
307       cstRemoteWriter->ACKRetriesCounter=0;
308       eventDetach(cstRemoteWriter->cstReader->domain,
309           cstRemoteWriter->spobject->objectEntryAID,
310           &cstRemoteWriter->repeatActiveQueryTimer,
311           1);   //metatraffic timer
312       eventDetach(cstRemoteWriter->cstReader->domain,
313           cstRemoteWriter->spobject->objectEntryAID,
314           &cstRemoteWriter->delayResponceTimer,
315           1);   //metatraffic timer
316       eventAdd(cstRemoteWriter->cstReader->domain,
317           cstRemoteWriter->spobject->objectEntryAID,
318           &cstRemoteWriter->delayResponceTimer,
319           1,   //metatraffic timer
320           "CSTReaderResponceTimer",
321           CSTReaderResponceTimer,
322           &cstRemoteWriter->cstReader->lock,
323           cstRemoteWriter,
324           &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
325     }
326   }
327 }