]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTReaderProc.c
cf28d81a064f3d98876e4a3f27ea6ab6c74a28ff
[orte.git] / orte / liborte / RTPSCSTReaderProc.c
1 /*
2  *  $Id: RTPSCSTReaderProc.c,v 0.0.0.1 2003/09/13 
3  *
4  *  DEBUG:  section 54                  CSChanges processing
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte_all.h"
23
24 /*****************************************************************************/
25 void
26 CSTReaderProcCSChangesManager(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
27     CSChangeFromWriter *csChangeFromWriter) {
28   CSChange           *csChange;
29   ObjectEntryOID     *objectEntryOID;
30
31   
32   csChange=csChangeFromWriter->csChange;
33   objectEntryOID=objectEntryFind(d,&csChangeFromWriter->csChange->guid);
34   if (!objectEntryOID) return;
35   if (!csChange->alive) {
36     eventDetach(d,
37             objectEntryOID->objectEntryAID,
38             &objectEntryOID->expirationPurgeTimer,
39             0);
40     eventAdd(d,
41             objectEntryOID->objectEntryAID,
42             &objectEntryOID->expirationPurgeTimer,
43             0,
44             "ExpirationTimer",
45             objectEntryExpirationTimer,
46             NULL,
47             objectEntryOID,
48             NULL);
49    return;
50   }
51   switch (csChange->guid.aid & 0x03) {
52     case MANAGER:
53       //update parameters of object
54       parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
55       //copy csChange to writerManagers
56       CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
57                                csChangeFromWriter,
58                                ORTE_TRUE);
59       pthread_rwlock_wrlock(&d->writerManagers.lock);
60       CSTWriterAddCSChange(d,&d->writerManagers,csChange);
61       pthread_rwlock_unlock(&d->writerManagers.lock);
62     break;
63     case MANAGEDAPPLICATION: 
64       //update parameters of object
65       parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
66       //changes can make only local Apps
67       if (cstRemoteWriter->spobject->appMOM) {
68         CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
69                                  csChangeFromWriter,
70                                  ORTE_TRUE);
71         pthread_rwlock_wrlock(&d->writerApplications.lock);
72         CSTWriterAddCSChange(d,&d->writerApplications,csChange);
73         pthread_rwlock_unlock(&d->writerApplications.lock);
74       }
75     break;
76   }
77 }
78
79 /*****************************************************************************/
80 void 
81 CSTReaderProcCSChangesApp(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
82     CSChangeFromWriter *csChangeFromWriter) {
83   CSChange           *csChange;
84   ObjectEntryOID     *objectEntryOID;
85     
86   csChange=csChangeFromWriter->csChange;
87   objectEntryOID=objectEntryFind(d,&csChange->guid);
88   if (!objectEntryOID) return;
89   if (!csChange->alive) {
90     eventDetach(d,
91             objectEntryOID->objectEntryAID,
92             &objectEntryOID->expirationPurgeTimer,
93             0);
94     eventAdd(d,
95             objectEntryOID->objectEntryAID,
96             &objectEntryOID->expirationPurgeTimer,
97             0,
98             "ExpirationTimer",
99             objectEntryExpirationTimer,
100             NULL,
101             objectEntryOID,
102             NULL);
103     return;
104   }
105   switch (csChange->guid.oid & 0x07) {
106     case OID_APPLICATION:
107       break;
108     case OID_PUBLICATION:      
109       parameterUpdatePublication(csChange,
110           (ORTEPublProp*)objectEntryOID->attributes);
111       break;
112     case OID_SUBSCRIPTION: 
113       parameterUpdateSubscription(csChange,
114           (ORTESubsProp*)objectEntryOID->attributes);
115       break;
116   }
117 }
118
119 /*****************************************************************************/
120 void 
121 CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
122   CSChangeFromWriter *csChangeFromWriter;
123   SequenceNumber     snNext;
124
125   debug(54,10) ("CSTReaderProcCSChanges: start\n");
126   if (!cstRemoteWriter) return;
127   while (1) {
128     csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
129     if (!csChangeFromWriter) break;
130     if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
131                      cstRemoteWriter->firstSN)>=0) {
132       SeqNumberInc(snNext,cstRemoteWriter->sn);
133       debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,Change sn:%u\n",snNext.low,
134                                              csChangeFromWriter->csChange->sn.low);
135       if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
136           (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
137         if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
138           if ((d->guid.aid & 0x03)==MANAGER) 
139               CSTReaderProcCSChangesManager(d,cstRemoteWriter,
140                                             csChangeFromWriter);
141           if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION) 
142               CSTReaderProcCSChangesApp(d,cstRemoteWriter,
143                                         csChangeFromWriter);
144           SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
145         } else {
146           //GAP
147           SeqNumberAdd(cstRemoteWriter->sn,
148                       cstRemoteWriter->sn,
149                       csChangeFromWriter->csChange->gapSN);
150         }
151         CSTReaderDestroyCSChange(cstRemoteWriter,  //note:csChange can be coped to another CSTWriter!!!
152             &snNext,ORTE_FALSE);
153       } else
154         break;
155     } else {
156       CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
157           csChangeFromWriter,ORTE_FALSE);
158     }
159   }
160   CSTReaderSetupState(cstRemoteWriter);    
161   debug(54,10) ("CSTReaderProcCSChanges: finished\n");
162 }
163
164 /*****************************************************************************/
165 void
166 CSTReaderNewData(CSTRemoteWriter *cstRemoteWriter,
167     CSChangeFromWriter *csChangeFromWriter) {
168   CSChange             *csChange=csChangeFromWriter->csChange;
169   ORTERecvInfo         info;  
170   ORTESubsProp         *sp;
171   ObjectEntryOID       *objectEntryOID;\r
172   unsigned int         max_size;
173         
174   if (cstRemoteWriter==NULL) return;
175   objectEntryOID=cstRemoteWriter->cstReader->objectEntryOID;
176   sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
177   if (objectEntryOID->recvCallBack) {
178     //deserialization routine
179     if (cstRemoteWriter->cstReader->typeRegister->deserialize) {
180       cstRemoteWriter->cstReader->typeRegister->deserialize(
181           &csChange->cdrCodec,
182           objectEntryOID->instance);
183     } else {
184       //no deserialization -> memcpy
185       ORTEGetMaxSizeParam gms;
186
187       /* determine maximal size */
188       gms.host_endian=csChange->cdrCodec.host_endian;
189       gms.data_endian=csChange->cdrCodec.data_endian;
190       gms.data=csChange->cdrCodec.buffer;
191       gms.max_size=cstRemoteWriter->cstReader->typeRegister->maxSize;
192       gms.recv_size=csChange->cdrCodec.buf_len;
193       gms.csize=0;
194       if (cstRemoteWriter->cstReader->typeRegister->getMaxSize)
195         max_size=cstRemoteWriter->cstReader->typeRegister->getMaxSize(&gms);
196       else
197         max_size=cstRemoteWriter->cstReader->typeRegister->maxSize;
198       if (max_size>csChange->cdrCodec.buf_len)
199         max_size=csChange->cdrCodec.buf_len;
200       memcpy(objectEntryOID->instance,
201              csChange->cdrCodec.buffer,
202              max_size);
203     }
204     info.status=NEW_DATA;
205     info.topic=sp->topic;
206     info.type=sp->typeName;
207     info.senderGUID=csChange->guid;
208     info.localTimeReceived=csChange->localTimeReceived;
209     info.remoteTimePublished=csChange->remoteTimePublished;
210     info.sn=csChange->sn;
211     objectEntryOID->recvCallBack(&info,
212                             objectEntryOID->instance,
213                             objectEntryOID->callBackParam);
214     if (sp->mode==IMMEDIATE) {
215       //setup new time for deadline timer
216       eventDetach(cstRemoteWriter->cstReader->domain,
217           cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
218           &cstRemoteWriter->cstReader->deadlineTimer,
219           0);
220       eventAdd(cstRemoteWriter->cstReader->domain,
221           cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
222           &cstRemoteWriter->cstReader->deadlineTimer,
223           0,   //common timer
224           "CSTReaderDeadlineTimer",
225           CSTReaderDeadlineTimer,
226           &cstRemoteWriter->cstReader->lock,
227           cstRemoteWriter->cstReader,
228           &sp->deadline);
229     }
230     if (sp->mode==PULLED) {
231       NtpTime timeNext;
232       NtpTimeAdd(timeNext,
233                 (getActualNtpTime()),
234                 sp->deadline);
235       htimerUnicastCommon_set_expire(&cstRemoteWriter->
236                 cstReader->deadlineTimer,timeNext);
237     }
238   }
239 }
240
241 /*****************************************************************************/
242 void 
243 CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled) {
244   ORTESubsProp         *sp;
245   CSChangeFromWriter   *csChangeFromWriter;
246   SequenceNumber       snNext;
247  
248   debug(54,10) ("CSTReaderProcIssue: start\n");
249   if (cstRemoteWriter==NULL) return;
250   sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
251   if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
252     //Strict
253     if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
254     while (1) {
255       csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
256       if (!csChangeFromWriter) break;
257       if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
258                       cstRemoteWriter->firstSN)>=0) {
259         SeqNumberInc(snNext,cstRemoteWriter->sn);
260         if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
261             (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
262           if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
263             if ((cstRemoteWriter==
264                  cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) &&
265                 (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed!=NULL)) {
266               //NewData                
267               CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
268             } 
269             SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
270           } else {
271             //GAP
272             SeqNumberAdd(cstRemoteWriter->sn,
273                         cstRemoteWriter->sn,
274                         csChangeFromWriter->csChange->gapSN);
275           }
276
277           CSTReaderDestroyCSChange(cstRemoteWriter,
278               &snNext,ORTE_FALSE);
279         } else
280           break;
281       } else {
282         CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
283             csChangeFromWriter,ORTE_FALSE);
284       }
285     }
286   } else {
287     //Best Effort
288     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
289       if ((cstRemoteWriter!=
290            cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) ||
291           (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed==NULL)) 
292         return;
293       if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
294       while((csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter))) {
295         //NewData                
296         CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
297
298         cstRemoteWriter->sn=csChangeFromWriter->csChange->sn;
299
300         CSTReaderDestroyCSChangeFromWriter(
301             cstRemoteWriter,
302             csChangeFromWriter,
303             ORTE_FALSE);
304       }
305     }
306   }  
307   CSTReaderSetupState(cstRemoteWriter);    
308   debug(54,10) ("CSTReaderProcIssue: finished\n");
309 }