]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTReaderProc.c
3651c144b9a2a125b37f96262b7051e6bd581a5f
[orte.git] / orte / liborte / RTPSCSTReaderProc.c
1 /*
2  *  $Id: RTPSCSTReaderProc.c,v 0.0.0.1 2003/09/13 
3  *
4  *  DEBUG:  section 54                  CSChanges processing
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr@smoliku.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
34 /*****************************************************************************/
35 void
36 CSTReaderProcCSChangesManager(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
37     CSChangeFromWriter *csChangeFromWriter) {
38   CSChange           *csChange;
39   ObjectEntryOID     *objectEntryOID;
40
41   
42   csChange=csChangeFromWriter->csChange;
43   objectEntryOID=objectEntryFind(d,&csChangeFromWriter->csChange->guid);
44   if (!objectEntryOID) return;
45   if (!csChange->alive) {
46     eventDetach(d,
47             objectEntryOID->objectEntryAID,
48             &objectEntryOID->expirationPurgeTimer,
49             0);
50     eventAdd(d,
51             objectEntryOID->objectEntryAID,
52             &objectEntryOID->expirationPurgeTimer,
53             0,
54             "ExpirationTimer",
55             objectEntryExpirationTimer,
56             NULL,
57             objectEntryOID,
58             NULL);
59    return;
60   }
61   switch (csChange->guid.aid & 0x03) {
62     case MANAGER:
63       //update parameters of object
64       parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
65       //copy csChange to writerManagers
66       CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
67                                csChangeFromWriter,
68                                ORTE_TRUE);
69       pthread_rwlock_wrlock(&d->writerManagers.lock);
70       CSTWriterAddCSChange(d,&d->writerManagers,csChange);
71       pthread_rwlock_unlock(&d->writerManagers.lock);
72     break;
73     case MANAGEDAPPLICATION: 
74       //update parameters of object
75       parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
76       //changes can make only local Apps
77       if (cstRemoteWriter->spobject->appMOM) {
78         CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
79                                  csChangeFromWriter,
80                                  ORTE_TRUE);
81         pthread_rwlock_wrlock(&d->writerApplications.lock);
82         CSTWriterAddCSChange(d,&d->writerApplications,csChange);
83         pthread_rwlock_unlock(&d->writerApplications.lock);
84       }
85     break;
86   }
87 }
88
89 /*****************************************************************************/
90 void 
91 CSTReaderProcCSChangesApp(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
92     CSChangeFromWriter *csChangeFromWriter) {
93   CSChange           *csChange;
94   ObjectEntryOID     *objectEntryOID;
95     
96   csChange=csChangeFromWriter->csChange;
97   objectEntryOID=objectEntryFind(d,&csChange->guid);
98   if (!objectEntryOID) return;
99   if (!csChange->alive) {
100     eventDetach(d,
101             objectEntryOID->objectEntryAID,
102             &objectEntryOID->expirationPurgeTimer,
103             0);
104     eventAdd(d,
105             objectEntryOID->objectEntryAID,
106             &objectEntryOID->expirationPurgeTimer,
107             0,
108             "ExpirationTimer",
109             objectEntryExpirationTimer,
110             NULL,
111             objectEntryOID,
112             NULL);
113     return;
114   }
115   switch (csChange->guid.oid & 0x07) {
116     case OID_APPLICATION:
117       break;
118     case OID_PUBLICATION:      
119       parameterUpdatePublication(csChange,
120           (ORTEPublProp*)objectEntryOID->attributes);
121       break;
122     case OID_SUBSCRIPTION: 
123       parameterUpdateSubscription(csChange,
124           (ORTESubsProp*)objectEntryOID->attributes);
125       break;
126   }
127 }
128
129 /*****************************************************************************/
130 void 
131 CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
132   CSChangeFromWriter *csChangeFromWriter;
133   SequenceNumber     sn,snNext,lastGapSN;
134
135   debug(54,10) ("CSTReaderProcCSChanges: start\n");
136   if (!cstRemoteWriter) return;
137   while (1) {
138     csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
139     if (!csChangeFromWriter) break;
140     sn=csChangeFromWriter->csChange->sn;
141     if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) {
142       SeqNumberInc(snNext,cstRemoteWriter->sn);
143       debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,change sn:%u, gapsn:%u\n",snNext.low,
144                                              csChangeFromWriter->csChange->sn.low,
145                                              csChangeFromWriter->csChange->gapSN.low);
146       if ((SeqNumberCmp(sn,snNext)==0) &&
147           (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
148         if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
149           if ((d->guid.aid & 0x03)==MANAGER) 
150               CSTReaderProcCSChangesManager(d,cstRemoteWriter,
151                                             csChangeFromWriter);
152           if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION) 
153               CSTReaderProcCSChangesApp(d,cstRemoteWriter,
154                                         csChangeFromWriter);
155           SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
156         } else {
157           //GAP
158           SeqNumberAdd(cstRemoteWriter->sn,
159                       cstRemoteWriter->sn,
160                       csChangeFromWriter->csChange->gapSN);
161         }
162         CSTReaderDestroyCSChange(cstRemoteWriter,  //note:csChange can be coped to another CSTWriter!!!
163             &snNext,ORTE_FALSE);
164       } else {
165         if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)>0) {
166           //GAP
167           SeqNumberAdd(lastGapSN,sn,csChangeFromWriter->csChange->gapSN);
168           SeqNumberDec(lastGapSN,lastGapSN);
169           CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
170           //is first gapped sn lower then cstRemoteWrite sn and last gapped sn higher then cstRemoteWrite sn?
171           if ((SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) &&
172               (SeqNumberCmp(lastGapSN,cstRemoteWriter->sn)>=0)) {
173             cstRemoteWriter->sn=lastGapSN;
174           } 
175         } else {
176           if (SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) {
177             CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
178           } else
179             /* stop processing of csChanges */
180             break;
181         }
182       }
183     } else {
184       CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
185           csChangeFromWriter,ORTE_FALSE);
186     }
187   }
188   CSTReaderSetupState(cstRemoteWriter);    
189   debug(54,10) ("CSTReaderProcCSChanges: finished\n");
190 }
191
192 /*****************************************************************************/
193 void
194 CSTReaderNewData(CSTRemoteWriter *cstRemoteWriter,
195     CSChangeFromWriter *csChangeFromWriter) {
196   CSChange             *csChange=csChangeFromWriter->csChange;
197   ORTERecvInfo         info;  
198   ORTESubsProp         *sp;
199   ObjectEntryOID       *objectEntryOID;
200   unsigned int         max_size;
201         
202   if (cstRemoteWriter==NULL) return;
203   objectEntryOID=cstRemoteWriter->cstReader->objectEntryOID;
204   sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
205   if (objectEntryOID->recvCallBack) {
206     //deserialization routine
207     if (cstRemoteWriter->cstReader->typeRegister->deserialize) {
208       cstRemoteWriter->cstReader->typeRegister->deserialize(
209           &csChange->cdrCodec,
210           objectEntryOID->instance);
211     } else {
212       //no deserialization -> memcpy
213       ORTEGetMaxSizeParam gms;
214
215       /* determine maximal size */
216       gms.host_endian=csChange->cdrCodec.host_endian;
217       gms.data_endian=csChange->cdrCodec.data_endian;
218       gms.data=csChange->cdrCodec.buffer;
219       gms.max_size=cstRemoteWriter->cstReader->typeRegister->maxSize;
220       gms.recv_size=csChange->cdrCodec.buf_len;
221       gms.csize=0;
222       if (cstRemoteWriter->cstReader->typeRegister->getMaxSize)
223         max_size=cstRemoteWriter->cstReader->typeRegister->getMaxSize(&gms,1);
224       else
225         max_size=cstRemoteWriter->cstReader->typeRegister->maxSize;
226       if (max_size>csChange->cdrCodec.buf_len)
227         max_size=csChange->cdrCodec.buf_len;
228       memcpy(objectEntryOID->instance,
229              csChange->cdrCodec.buffer,
230              max_size);
231     }
232     info.status=NEW_DATA;
233     info.topic=(char*)sp->topic;
234     info.type=(char*)sp->typeName;
235     info.senderGUID=csChange->guid;
236     info.localTimeReceived=csChange->localTimeReceived;
237     info.remoteTimePublished=csChange->remoteTimePublished;
238     info.sn=csChange->sn;
239     info.data_endian=csChange->cdrCodec.data_endian;
240     objectEntryOID->recvCallBack(&info,
241                             objectEntryOID->instance,
242                             objectEntryOID->callBackParam);
243     if (sp->mode==IMMEDIATE) {
244       //setup new time for deadline timer
245       eventDetach(cstRemoteWriter->cstReader->domain,
246           cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
247           &cstRemoteWriter->cstReader->deadlineTimer,
248           0);
249       eventAdd(cstRemoteWriter->cstReader->domain,
250           cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
251           &cstRemoteWriter->cstReader->deadlineTimer,
252           0,   //common timer
253           "CSTReaderDeadlineTimer",
254           CSTReaderDeadlineTimer,
255           &cstRemoteWriter->cstReader->lock,
256           cstRemoteWriter->cstReader,
257           &sp->deadline);
258     }
259     if (sp->mode==PULLED) {
260       NtpTime timeNext;
261       NtpTimeAdd(timeNext,
262                 (getActualNtpTime()),
263                 sp->deadline);
264       htimerUnicastCommon_set_expire(&cstRemoteWriter->
265                 cstReader->deadlineTimer,timeNext);
266     }
267   }
268 }
269
270 /*****************************************************************************/
271 void 
272 CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled) {
273   ORTESubsProp         *sp;
274   CSChangeFromWriter   *csChangeFromWriter;
275   SequenceNumber       sn,snNext,lastGapSN;
276  
277   debug(54,10) ("CSTReaderProcIssue: start\n");
278   if (cstRemoteWriter==NULL) return;
279   sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
280   if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
281     //Strict
282     if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
283     while (1) {
284       csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
285       if (!csChangeFromWriter) break;
286       sn=csChangeFromWriter->csChange->sn;
287       if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) {
288         SeqNumberInc(snNext,cstRemoteWriter->sn);
289         if ((SeqNumberCmp(sn,snNext)==0) &&
290             (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
291           if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
292             if ((cstRemoteWriter==
293                  cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) &&
294                 (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed!=NULL)) {
295               //NewData                
296               CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
297             } 
298             SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
299           } else {
300             //GAP
301             SeqNumberAdd(cstRemoteWriter->sn,
302                         cstRemoteWriter->sn,
303                         csChangeFromWriter->csChange->gapSN);
304           }
305           CSTReaderDestroyCSChange(cstRemoteWriter,
306               &snNext,ORTE_FALSE);
307         } else {
308           if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)>0) {
309             //GAP
310             SeqNumberAdd(lastGapSN,sn,csChangeFromWriter->csChange->gapSN);
311             SeqNumberDec(lastGapSN,lastGapSN);
312             CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
313             //is first gapped sn lower then cstRemoteWrite sn and last gapped sn higher then cstRemoteWrite sn?
314             if ((SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) &&
315                 (SeqNumberCmp(lastGapSN,cstRemoteWriter->sn)>=0)) {
316               cstRemoteWriter->sn=lastGapSN;
317             }
318           } else {
319             if (SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) {
320               CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
321             } else
322               /* stop processing of csChanges */
323               break;
324           }
325         }
326       } else {
327         CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
328             csChangeFromWriter,ORTE_FALSE);
329       }
330     }
331   } else {
332     //Best Effort
333     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
334       if ((cstRemoteWriter!=
335            cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) ||
336           (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed==NULL)) 
337         return;
338       if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
339       while((csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter))) {
340         //NewData                
341         CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
342
343         cstRemoteWriter->sn=csChangeFromWriter->csChange->sn;
344
345         CSTReaderDestroyCSChangeFromWriter(
346             cstRemoteWriter,
347             csChangeFromWriter,
348             ORTE_FALSE);
349       }
350     }
351   }  
352   CSTReaderSetupState(cstRemoteWriter);    
353   debug(54,10) ("CSTReaderProcIssue: finished\n");
354 }