]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTReaderProc.c
5f8e868a0b9795c3bf171b1db6324efd7f00450b
[orte.git] / orte / liborte / RTPSCSTReaderProc.c
1 /*
2  *  $Id: RTPSCSTReaderProc.c,v 0.0.0.1 2003/09/13 
3  *
4  *  DEBUG:  section 54                  CSChanges processing
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 /*****************************************************************************/
25 void
26 CSTReaderProcCSChangesManager(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
27     CSChangeFromWriter *csChangeFromWriter) {
28   CSChange           *csChange;
29   ObjectEntryOID     *objectEntryOID;
30
31   
32   csChange=csChangeFromWriter->csChange;
33   objectEntryOID=objectEntryFind(d,&csChangeFromWriter->csChange->guid);
34   if (!objectEntryOID) return;
35   if (!csChange->alive) {
36     eventDetach(d,
37             objectEntryOID->objectEntryAID,
38             &objectEntryOID->expirationPurgeTimer,
39             0);
40     eventAdd(d,
41             objectEntryOID->objectEntryAID,
42             &objectEntryOID->expirationPurgeTimer,
43             0,
44             "ExpirationTimer",
45             objectEntryExpirationTimer,
46             NULL,
47             objectEntryOID,
48             NULL);
49    return;
50   }
51   switch (csChange->guid.aid & 0x03) {
52     case MANAGER:
53       //update parameters of object
54       parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
55       //copy csChange to writerManagers
56       CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
57                                csChangeFromWriter,
58                                ORTE_TRUE);
59       pthread_rwlock_wrlock(&d->writerManagers.lock);
60       CSTWriterAddCSChange(d,&d->writerManagers,csChange);
61       pthread_rwlock_unlock(&d->writerManagers.lock);
62     break;
63     case MANAGEDAPPLICATION: 
64       //update parameters of object
65       parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
66       //changes can make only local Apps
67       if (cstRemoteWriter->objectEntryOID->appMOM) {
68         CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
69                                  csChangeFromWriter,
70                                  ORTE_TRUE);
71         pthread_rwlock_wrlock(&d->writerApplications.lock);
72         CSTWriterAddCSChange(d,&d->writerApplications,csChange);
73         pthread_rwlock_unlock(&d->writerApplications.lock);
74       }
75     break;
76   }
77 }
78
79 /*****************************************************************************/
80 void 
81 CSTReaderProcCSChangesApp(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
82     CSChangeFromWriter *csChangeFromWriter) {
83   CSChange           *csChange;
84   ObjectEntryOID     *objectEntryOID;
85     
86   csChange=csChangeFromWriter->csChange;
87   objectEntryOID=objectEntryFind(d,&csChange->guid);
88   if (!objectEntryOID) return;
89   if (!csChange->alive) {
90     eventDetach(d,
91             objectEntryOID->objectEntryAID,
92             &objectEntryOID->expirationPurgeTimer,
93             0);
94     eventAdd(d,
95             objectEntryOID->objectEntryAID,
96             &objectEntryOID->expirationPurgeTimer,
97             0,
98             "ExpirationTimer",
99             objectEntryExpirationTimer,
100             NULL,
101             objectEntryOID,
102             NULL);
103     return;
104   }
105   switch (csChange->guid.oid & 0x07) {
106     case OID_APPLICATION:
107       break;
108     case OID_PUBLICATION:      
109       parameterUpdatePublication(csChange,
110           (ORTEPublProp*)objectEntryOID->attributes);
111       break;
112     case OID_SUBSCRIPTION: 
113       parameterUpdateSubscription(csChange,
114           (ORTESubsProp*)objectEntryOID->attributes);
115       break;
116   }
117 }
118
119 /*****************************************************************************/
120 void 
121 CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
122   CSChangeFromWriter *csChangeFromWriter;
123   SequenceNumber     snNext;
124
125   debug(54,10) ("CSTReaderProcCSChanges: start\n");
126   if (!cstRemoteWriter) return;
127   while (1) {
128     csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
129     if (!csChangeFromWriter) break;
130     if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
131                      cstRemoteWriter->firstSN)>=0) {
132       SeqNumberInc(snNext,cstRemoteWriter->sn);
133       debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,Change sn:%u\n",snNext.low,
134                                              csChangeFromWriter->csChange->sn.low);
135       if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
136           (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
137         if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
138           if ((d->guid.aid & 0x03)==MANAGER) 
139               CSTReaderProcCSChangesManager(d,cstRemoteWriter,
140                                             csChangeFromWriter);
141           if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION) 
142               CSTReaderProcCSChangesApp(d,cstRemoteWriter,
143                                         csChangeFromWriter);
144           SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
145         } else {
146           //GAP
147           SeqNumberAdd(cstRemoteWriter->sn,
148                       cstRemoteWriter->sn,
149                       csChangeFromWriter->csChange->gapSN);
150         }
151         CSTReaderDestroyCSChange(cstRemoteWriter,  //note:csChange can be coped to another CSTWriter!!!
152             &snNext,ORTE_FALSE);
153       } else
154         break;
155     } else {
156       CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
157           csChangeFromWriter,ORTE_FALSE);
158     }
159   }
160   CSTReaderSetupState(cstRemoteWriter);    
161   debug(54,10) ("CSTReaderProcCSChanges: finished\n");
162 }
163
164 /*****************************************************************************/
165 void
166 CSTReaderNewData(CSTRemoteWriter *cstRemoteWriter,
167     CSChangeFromWriter *csChangeFromWriter) {
168   ORTERecvInfo         info;  
169   ORTESubsProp         *sp;
170   ObjectEntryOID       *objectEntryOID;\r
171   unsigned int         length;
172         
173   if (cstRemoteWriter==NULL) return;
174   objectEntryOID=cstRemoteWriter->cstReader->objectEntryOID;
175   sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
176   if (objectEntryOID->recvCallBack) {
177     //deserialization routine
178     if (cstRemoteWriter->cstReader->typeRegister->deserialize) {
179       cstRemoteWriter->cstReader->typeRegister->deserialize(
180           &csChangeFromWriter->csChange->cdrStream,
181           objectEntryOID->instance);
182     } else {
183       length=csChangeFromWriter->csChange->cdrStream.length;
184       if (cstRemoteWriter->cstReader->typeRegister->getMaxSize<length)
185         length=cstRemoteWriter->cstReader->typeRegister->getMaxSize;
186       //no deserialization -> memcpy
187       memcpy(objectEntryOID->instance,
188             csChangeFromWriter->csChange->cdrStream.buffer,
189             length);
190     }
191     info.status=NEW_DATA;
192     info.topic=sp->topic;
193     info.type=sp->typeName;
194     info.senderGUID=csChangeFromWriter->csChange->guid;
195     info.localTimeReceived=csChangeFromWriter->csChange->localTimeReceived;
196     info.remoteTimePublished=csChangeFromWriter->csChange->remoteTimePublished;
197     info.sn=csChangeFromWriter->csChange->sn;
198     objectEntryOID->recvCallBack(&info,
199                             objectEntryOID->instance,
200                             objectEntryOID->callBackParam);
201     if (sp->mode==IMMEDIATE) {
202       //setup new time for deadline timer
203       eventDetach(cstRemoteWriter->cstReader->domain,
204           cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
205           &cstRemoteWriter->cstReader->deadlineTimer,
206           0);
207       eventAdd(cstRemoteWriter->cstReader->domain,
208           cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
209           &cstRemoteWriter->cstReader->deadlineTimer,
210           0,   //common timer
211           "CSTReaderDeadlineTimer",
212           CSTReaderDeadlineTimer,
213           &cstRemoteWriter->cstReader->lock,
214           cstRemoteWriter->cstReader,
215           &sp->deadline);
216     }
217     if (sp->mode==PULLED) {
218       NtpTime timeNext;
219       NtpTimeAdd(timeNext,
220                 (getActualNtpTime()),
221                 sp->deadline);
222       htimerUnicastCommon_set_expire(&cstRemoteWriter->
223                 cstReader->deadlineTimer,timeNext);
224     }
225   }
226 }
227
228 /*****************************************************************************/
229 void 
230 CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled) {
231   ORTESubsProp         *sp;
232   CSChangeFromWriter   *csChangeFromWriter;
233   SequenceNumber       snNext;
234  
235   debug(54,10) ("CSTReaderProcIssue: start\n");
236   if (cstRemoteWriter==NULL) return;
237   sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
238   if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
239     //Strict
240     if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
241     while (1) {
242       csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
243       if (!csChangeFromWriter) break;
244       if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
245                       cstRemoteWriter->firstSN)>=0) {
246         SeqNumberInc(snNext,cstRemoteWriter->sn);
247         debug(54,10) ("CSTReaderProcChangesIssue: processing sn:%u,Change sn:%u\n",snNext.low,
248                                               csChangeFromWriter->csChange->sn.low);
249         if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
250             (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
251           if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
252             if ((cstRemoteWriter==
253                  cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) &&
254                 (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed!=NULL)) {
255               //NewData                
256               CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
257             } 
258             SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
259           } else {
260             //GAP
261             SeqNumberAdd(cstRemoteWriter->sn,
262                         cstRemoteWriter->sn,
263                         csChangeFromWriter->csChange->gapSN);
264           }
265           CSTReaderDestroyCSChange(cstRemoteWriter,
266               &snNext,ORTE_FALSE);
267         } else
268           break;
269       } else {
270         CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
271             csChangeFromWriter,ORTE_FALSE);
272       }
273     }
274   } else {
275     //Best Effort
276     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
277       if ((cstRemoteWriter!=
278            cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) ||
279           (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed==NULL)) 
280         return;
281       if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
282       while((csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter))) {
283         //NewData                
284         CSTReaderNewData(cstRemoteWriter,csChangeFromWriter);
285         CSTReaderDestroyCSChangeFromWriter(
286             cstRemoteWriter,
287             csChangeFromWriter,
288             ORTE_FALSE);
289       }
290     }
291   }  
292   CSTReaderSetupState(cstRemoteWriter);    
293   debug(54,10) ("CSTReaderProcIssue: finished\n");
294 }