]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTReaderProc.c
ROBOT_DEMO: incorporate changes in ORTETypeRegisterAdd()
[orte.git] / orte / liborte / RTPSCSTReaderProc.c
1 /*
2  *  $Id: RTPSCSTReaderProc.c,v 0.0.0.1 2003/09/13 
3  *
4  *  DEBUG:  section 54                  CSChanges processing
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr.smolik@wo.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
34 /*****************************************************************************/
35 void
36 CSTReaderProcCSChangesManager(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
37     CSChangeFromWriter *csChangeFromWriter) {
38   CSChange           *csChange;
39   ObjectEntryOID     *objectEntryOID;
40
41   
42   csChange=csChangeFromWriter->csChange;
43   objectEntryOID=objectEntryFind(d,&csChangeFromWriter->csChange->guid);
44   if (!objectEntryOID) return;
45   if (!csChange->alive) {
46     eventDetach(d,
47             objectEntryOID->objectEntryAID,
48             &objectEntryOID->expirationPurgeTimer,
49             0);
50     eventAdd(d,
51             objectEntryOID->objectEntryAID,
52             &objectEntryOID->expirationPurgeTimer,
53             0,
54             "ExpirationTimer",
55             objectEntryExpirationTimer,
56             NULL,
57             objectEntryOID,
58             NULL);
59    return;
60   }
61   switch (csChange->guid.aid & 0x03) {
62     case MANAGER:
63       //update parameters of object
64       parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
65       //copy csChange to writerManagers
66       CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
67                                csChangeFromWriter,
68                                ORTE_TRUE);
69       pthread_rwlock_wrlock(&d->writerManagers.lock);
70       CSTWriterAddCSChange(d,&d->writerManagers,csChange);
71       pthread_rwlock_unlock(&d->writerManagers.lock);
72     break;
73     case MANAGEDAPPLICATION: 
74       //update parameters of object
75       parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
76       //changes can make only local Apps
77       if (cstRemoteWriter->spobject->appMOM) {
78         CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
79                                  csChangeFromWriter,
80                                  ORTE_TRUE);
81         pthread_rwlock_wrlock(&d->writerApplications.lock);
82         CSTWriterAddCSChange(d,&d->writerApplications,csChange);
83         pthread_rwlock_unlock(&d->writerApplications.lock);
84       }
85     break;
86   }
87 }
88
89 /*****************************************************************************/
90 void 
91 CSTReaderProcCSChangesApp(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
92     CSChangeFromWriter *csChangeFromWriter) {
93   CSChange           *csChange;
94   ObjectEntryOID     *objectEntryOID;
95     
96   csChange=csChangeFromWriter->csChange;
97   objectEntryOID=objectEntryFind(d,&csChange->guid);
98   if (!objectEntryOID) return;
99   if (!csChange->alive) {
100     eventDetach(d,
101             objectEntryOID->objectEntryAID,
102             &objectEntryOID->expirationPurgeTimer,
103             0);
104     eventAdd(d,
105             objectEntryOID->objectEntryAID,
106             &objectEntryOID->expirationPurgeTimer,
107             0,
108             "ExpirationTimer",
109             objectEntryExpirationTimer,
110             NULL,
111             objectEntryOID,
112             NULL);
113     return;
114   }
115   switch (csChange->guid.oid & 0x07) {
116     case OID_APPLICATION:
117       break;
118     case OID_PUBLICATION:      
119       parameterUpdatePublication(csChange,
120           (ORTEPublProp*)objectEntryOID->attributes);
121       break;
122     case OID_SUBSCRIPTION: 
123       parameterUpdateSubscription(csChange,
124           (ORTESubsProp*)objectEntryOID->attributes);
125       break;
126   }
127 }
128
129 /*****************************************************************************/
130 void 
131 CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
132   CSChangeFromWriter *csChangeFromWriter;
133   SequenceNumber     sn,snNext,lastGapSN;
134
135   debug(54,10) ("CSTReaderProcCSChanges: start\n");
136   if (!cstRemoteWriter) return;
137   while (1) {
138     csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
139     if (!csChangeFromWriter) break;
140     sn=csChangeFromWriter->csChange->sn;
141     if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) {
142       SeqNumberInc(snNext,cstRemoteWriter->sn);
143       debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,change sn:%u, gapsn:%u\n",snNext.low,
144                                              csChangeFromWriter->csChange->sn.low,
145                                              csChangeFromWriter->csChange->gapSN.low);
146       if ((SeqNumberCmp(sn,snNext)==0) &&
147           (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
148         if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
149           if ((d->guid.aid & 0x03)==MANAGER) 
150               CSTReaderProcCSChangesManager(d,cstRemoteWriter,
151                                             csChangeFromWriter);
152           if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION) 
153               CSTReaderProcCSChangesApp(d,cstRemoteWriter,
154                                         csChangeFromWriter);
155           SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
156         } else {
157           //GAP
158           SeqNumberAdd(cstRemoteWriter->sn,
159                       cstRemoteWriter->sn,
160                       csChangeFromWriter->csChange->gapSN);
161         }
162         CSTReaderDestroyCSChange(cstRemoteWriter,  //note:csChange can be coped to another CSTWriter!!!
163             &snNext,ORTE_FALSE);
164       } else {
165         if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)>0) {
166           //GAP
167           SeqNumberAdd(lastGapSN,sn,csChangeFromWriter->csChange->gapSN);
168           SeqNumberDec(lastGapSN,lastGapSN);
169           CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
170           //is first gapped sn lower then cstRemoteWrite sn and last gapped sn higher then cstRemoteWrite sn?
171           if ((SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) &&
172               (SeqNumberCmp(lastGapSN,cstRemoteWriter->sn)>=0)) {
173             cstRemoteWriter->sn=lastGapSN;
174           } 
175         } else {
176           if (SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) {
177             CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
178           } else
179             /* stop processing of csChanges */
180             break;
181         }
182       }
183     } else {
184       CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
185           csChangeFromWriter,ORTE_FALSE);
186     }
187   }
188   CSTReaderSetupState(cstRemoteWriter);    
189   debug(54,10) ("CSTReaderProcCSChanges: finished\n");
190 }
191
192 /*****************************************************************************/
193 void
194 CSTReaderNewData(CSTRemoteWriter *cstRemoteWriter,
195     CSChangeFromWriter *csChangeFromWriter) {
196   CSChange             *csChange=csChangeFromWriter->csChange;
197   ORTERecvInfo         info;  
198   ORTESubsProp         *sp;
199   ObjectEntryOID       *objectEntryOID;
200   unsigned int         max_size;
201         
202   if (cstRemoteWriter==NULL) return;
203   objectEntryOID=cstRemoteWriter->cstReader->objectEntryOID;
204   sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
205   if (objectEntryOID->recvCallBack) {
206     //deserialization routine
207     if (cstRemoteWriter->cstReader->typeRegister->deserialize) {
208       cstRemoteWriter->cstReader->typeRegister->deserialize(
209           &csChange->cdrCodec,
210           objectEntryOID->instance);
211     } else {
212       //no deserialization -> memcpy
213       ORTEGetMaxSizeParam gms;
214
215       /* determine maximal size */
216       gms.host_endian=csChange->cdrCodec.host_endian;
217       gms.data_endian=csChange->cdrCodec.data_endian;
218       gms.data=csChange->cdrCodec.buffer;
219       gms.max_size=cstRemoteWriter->cstReader->typeRegister->maxSize;
220       gms.recv_size=csChange->cdrCodec.buf_len;
221       gms.csize=0;
222       if (cstRemoteWriter->cstReader->typeRegister->getMaxSize)
223         max_size=cstRemoteWriter->cstReader->typeRegister->getMaxSize(&gms,1);
224       else
225         max_size=cstRemoteWriter->cstReader->typeRegister->maxSize;
226       if (max_size>csChange->cdrCodec.buf_len)
227         max_size=csChange->cdrCodec.buf_len;
228       memcpy(objectEntryOID->instance,
229              csChange->cdrCodec.buffer,
230              max_size);
231       if(cstRemoteWriter->cstReader->typeRegister->processEndianness) {
232         cstRemoteWriter->cstReader->typeRegister->processEndianness(
233             &csChange->cdrCodec,
234             cstRemoteWriter->cstReader->typeRegister->processParam);
235       }
236     }
237     info.status=NEW_DATA;
238     info.topic=(char*)sp->topic;
239     info.type=(char*)sp->typeName;
240     info.senderGUID=csChange->guid;
241     info.localTimeReceived=csChange->localTimeReceived;
242     info.remoteTimePublished=csChange->remoteTimePublished;
243     info.sn=csChange->sn;
244     objectEntryOID->recvCallBack(&info,
245                             objectEntryOID->instance,
246                             objectEntryOID->callBackParam);
247     if (sp->mode==IMMEDIATE) {
248       //setup new time for deadline timer
249       eventDetach(cstRemoteWriter->cstReader->domain,
250           cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
251           &cstRemoteWriter->cstReader->deadlineTimer,
252           0);
253       eventAdd(cstRemoteWriter->cstReader->domain,
254           cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
255           &cstRemoteWriter->cstReader->deadlineTimer,
256           0,   //common timer
257           "CSTReaderDeadlineTimer",
258           CSTReaderDeadlineTimer,
259           &cstRemoteWriter->cstReader->lock,
260           cstRemoteWriter->cstReader,
261           &sp->deadline);
262     }
263     if (sp->mode==PULLED) {
264       NtpTime timeNext;
265       NtpTimeAdd(timeNext,
266                 (getActualNtpTime()),
267                 sp->deadline);
268       htimerUnicastCommon_set_expire(&cstRemoteWriter->
269                 cstReader->deadlineTimer,timeNext);
270     }
271   }
272 }
273
274 /*****************************************************************************/
275 void 
276 CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled) {
277   ORTESubsProp         *sp;
278   CSChangeFromWriter   *csChangeFromWriter;
279   SequenceNumber       sn,snNext,lastGapSN;
280  
281   debug(54,10) ("CSTReaderProcIssue: start\n");
282   if (cstRemoteWriter==NULL) return;
283   sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
284   if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
285     //Strict
286     if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
287     while (1) {
288       csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
289       if (!csChangeFromWriter) break;
290       sn=csChangeFromWriter->csChange->sn;
291       if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) {
292         SeqNumberInc(snNext,cstRemoteWriter->sn);
293         if ((SeqNumberCmp(sn,snNext)==0) &&
294             (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
295           if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
296             if ((cstRemoteWriter==
297                  cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) &&
298                 (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed!=NULL)) {
299               //NewData                
300               CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
301             } 
302             SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
303           } else {
304             //GAP
305             SeqNumberAdd(cstRemoteWriter->sn,
306                         cstRemoteWriter->sn,
307                         csChangeFromWriter->csChange->gapSN);
308           }
309           CSTReaderDestroyCSChange(cstRemoteWriter,
310               &snNext,ORTE_FALSE);
311         } else {
312           if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)>0) {
313             //GAP
314             SeqNumberAdd(lastGapSN,sn,csChangeFromWriter->csChange->gapSN);
315             SeqNumberDec(lastGapSN,lastGapSN);
316             CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
317             //is first gapped sn lower then cstRemoteWrite sn and last gapped sn higher then cstRemoteWrite sn?
318             if ((SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) &&
319                 (SeqNumberCmp(lastGapSN,cstRemoteWriter->sn)>=0)) {
320               cstRemoteWriter->sn=lastGapSN;
321             }
322           } else {
323             if (SeqNumberCmp(sn,cstRemoteWriter->sn)<=0) {
324               CSTReaderDestroyCSChange(cstRemoteWriter,&sn,ORTE_FALSE);
325             } else
326               /* stop processing of csChanges */
327               break;
328           }
329         }
330       } else {
331         CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
332             csChangeFromWriter,ORTE_FALSE);
333       }
334     }
335   } else {
336     //Best Effort
337     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
338       if ((cstRemoteWriter!=
339            cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) ||
340           (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed==NULL)) 
341         return;
342       if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
343       while((csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter))) {
344         //NewData                
345         CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
346
347         cstRemoteWriter->sn=csChangeFromWriter->csChange->sn;
348
349         CSTReaderDestroyCSChangeFromWriter(
350             cstRemoteWriter,
351             csChangeFromWriter,
352             ORTE_FALSE);
353       }
354     }
355   }  
356   CSTReaderSetupState(cstRemoteWriter);    
357   debug(54,10) ("CSTReaderProcIssue: finished\n");
358 }