]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSCSTReaderProc.c
Added prerelease of ORTE-0.2 (Real Time Publisher Subscriber communication protocol...
[orte.git] / orte / liborte / RTPSCSTReaderProc.c
1 /*
2  *  $Id: RTPSCSTReaderProc.c,v 0.0.0.1 2003/09/13 
3  *
4  *  DEBUG:  section 54                  CSChanges processing
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 /*****************************************************************************/
25 void
26 CSTReaderProcCSChangesManager(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
27     CSChangeFromWriter *csChangeFromWriter) {
28   CSChange           *csChange;
29   ObjectEntryOID     *objectEntryOID;
30
31   
32   csChange=csChangeFromWriter->csChange;
33   objectEntryOID=objectEntryFind(d,&csChangeFromWriter->csChange->guid);
34   if (!objectEntryOID) return;
35   if (!csChange->alive) {
36     eventDetach(d,
37             objectEntryOID->objectEntryAID,
38             &objectEntryOID->expirationPurgeTimer,
39             0);
40     eventAdd(d,
41             objectEntryOID->objectEntryAID,
42             &objectEntryOID->expirationPurgeTimer,
43             0,
44             "ExpirationTimer",
45             objectEntryExpirationTimer,
46             NULL,
47             objectEntryOID,
48             NULL);
49    return;
50   }
51   switch (csChange->guid.aid & 0x03) {
52     case MANAGER:
53       //update parameters of object
54       parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
55       //copy csChange to writerManagers
56       CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
57                                csChangeFromWriter,
58                                ORTE_TRUE);
59       pthread_rwlock_wrlock(&d->writerManagers.lock);
60       CSTWriterAddCSChange(d,&d->writerManagers,csChange);
61       pthread_rwlock_unlock(&d->writerManagers.lock);
62     break;
63     case MANAGEDAPPLICATION: 
64       //update parameters of object
65       parameterUpdateApplication(csChange,(AppParams*)objectEntryOID->attributes);
66       //changes can make only local Apps
67       if (cstRemoteWriter->objectEntryOID->appMOM) {
68         CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
69                                  csChangeFromWriter,
70                                  ORTE_TRUE);
71         pthread_rwlock_wrlock(&d->writerApplications.lock);
72         CSTWriterAddCSChange(d,&d->writerApplications,csChange);
73         pthread_rwlock_unlock(&d->writerApplications.lock);
74       }
75     break;
76   }
77 }
78
79 /*****************************************************************************/
80 void 
81 CSTReaderProcCSChangesApp(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter,
82     CSChangeFromWriter *csChangeFromWriter) {
83   CSChange           *csChange;
84   ObjectEntryOID     *objectEntryOID;
85     
86   csChange=csChangeFromWriter->csChange;
87   objectEntryOID=objectEntryFind(d,&csChangeFromWriter->csChange->guid);
88   if (!objectEntryOID) return;
89   if (!csChange->alive) {
90     eventDetach(d,
91             objectEntryOID->objectEntryAID,
92             &objectEntryOID->expirationPurgeTimer,
93             0);
94     eventAdd(d,
95             objectEntryOID->objectEntryAID,
96             &objectEntryOID->expirationPurgeTimer,
97             0,
98             "ExpirationTimer",
99             objectEntryExpirationTimer,
100             NULL,
101             objectEntryOID,
102             NULL);
103     return;
104   }
105   switch (csChangeFromWriter->csChange->guid.oid & 0x07) {
106     case OID_APPLICATION:
107       break;
108     case OID_PUBLICATION:      
109       break;
110     case OID_SUBSCRIPTION: 
111       break;
112   }
113 }
114
115 /*****************************************************************************/
116 void 
117 CSTReaderProcCSChanges(ORTEDomain *d,CSTRemoteWriter *cstRemoteWriter) {
118   CSChangeFromWriter *csChangeFromWriter;
119   SequenceNumber     snNext;
120
121   debug(54,10) ("CSTReaderProcCSChanges: start\n");
122   if (!cstRemoteWriter) return;
123   while (1) {
124     csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter);
125     if (!csChangeFromWriter) break;
126     if (SeqNumberCmp(csChangeFromWriter->csChange->sn,
127                      cstRemoteWriter->firstSN)>=0) {
128       SeqNumberInc(snNext,cstRemoteWriter->sn);
129       debug(54,10) ("CSTReaderProcCSChanges: processing sn:%u,Change sn:%u\n",snNext.low,
130                                              csChangeFromWriter->csChange->sn.low);
131       if ((SeqNumberCmp(csChangeFromWriter->csChange->sn,snNext)==0) &&
132           (csChangeFromWriter->commStateChFWriter==RECEIVED)) {
133         if (SeqNumberCmp(csChangeFromWriter->csChange->gapSN,noneSN)==0) {
134           if ((d->guid.aid & 0x03)==MANAGER) 
135               CSTReaderProcCSChangesManager(d,cstRemoteWriter,
136                                             csChangeFromWriter);
137           if ((d->guid.aid & 0x03)==MANAGEDAPPLICATION) 
138               CSTReaderProcCSChangesApp(d,cstRemoteWriter,
139                                         csChangeFromWriter);
140           SeqNumberInc(cstRemoteWriter->sn,cstRemoteWriter->sn);
141         } else {
142           //GAP
143           SeqNumberAdd(cstRemoteWriter->sn,
144                       cstRemoteWriter->sn,
145                       csChangeFromWriter->csChange->gapSN);
146         }
147         CSTReaderDestroyCSChange(cstRemoteWriter,  //note:csChange can be coped to another CSTWriter!!!
148             &snNext,ORTE_FALSE);
149       } else
150         break;
151     } else {
152       CSTReaderDestroyCSChangeFromWriter(cstRemoteWriter,
153           csChangeFromWriter,ORTE_FALSE);
154     }
155   }
156   CSTReaderSetupState(cstRemoteWriter);    
157   debug(54,10) ("CSTReaderProcCSChanges: finished\n");
158 }
159
160 /*****************************************************************************/
161 void 
162 CSTReaderProcCSChangesIssue(CSTRemoteWriter *cstRemoteWriter,Boolean pullCalled) {
163   ORTESubsProp         *sp;
164   CSChangeFromWriter   *csChangeFromWriter;
165   ORTERecvInfo         info;  
166  
167   debug(54,10) ("CSTReaderProcIssue: start\n");
168   if (cstRemoteWriter==NULL) return;
169   sp=(ORTESubsProp*)cstRemoteWriter->cstReader->objectEntryOID->attributes;
170   //Strict
171   if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
172   } else {
173     //Best Effort
174     if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_BEST_EFFORTS)!=0) {
175       if ((cstRemoteWriter!=
176            cstRemoteWriter->cstReader->cstRemoteWriterSubscribed) ||
177           (cstRemoteWriter->cstReader->cstRemoteWriterSubscribed==NULL)) 
178         return;
179       if ((sp->mode==PULLED) && (pullCalled==ORTE_FALSE)) return;
180       while((csChangeFromWriter=CSChangeFromWriter_first(cstRemoteWriter))) {
181         ObjectEntryOID *objectEntryOID;
182         objectEntryOID=cstRemoteWriter->cstReader->objectEntryOID;
183         if (objectEntryOID->recvCallBack) {
184           //deserialization routine
185           if (cstRemoteWriter->cstReader->typeRegister->deserialize) {
186             cstRemoteWriter->cstReader->typeRegister->deserialize(
187                 &csChangeFromWriter->csChange->cdrStream,
188                 objectEntryOID->instance);
189           } else {
190             int length=csChangeFromWriter->csChange->cdrStream.length;
191             if (cstRemoteWriter->cstReader->typeRegister->getMaxSize<length)
192               length=cstRemoteWriter->cstReader->typeRegister->getMaxSize;
193             //no deserialization -> memcpy
194             memcpy(objectEntryOID->instance,
195                    csChangeFromWriter->csChange->cdrStream.buffer,
196                    length);
197           }
198           info.status=NEW_DATA;
199           info.topic=sp->topic;
200           info.type=sp->typeName;
201           info.senderGUID=csChangeFromWriter->csChange->guid;
202           info.localTimeReceived=csChangeFromWriter->csChange->localTimeReceived;
203           info.remoteTimePublished=csChangeFromWriter->csChange->remoteTimePublished;
204           info.sn=csChangeFromWriter->csChange->sn;
205           objectEntryOID->recvCallBack(&info,
206                                    objectEntryOID->instance,
207                                    objectEntryOID->callBackParam);
208           if (sp->mode==IMMEDIATE) {
209             //setup new time for deadline timer
210             eventDetach(cstRemoteWriter->cstReader->domain,
211                 cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
212                 &cstRemoteWriter->cstReader->deadlineTimer,
213                 0);
214             eventAdd(cstRemoteWriter->cstReader->domain,
215                 cstRemoteWriter->cstReader->objectEntryOID->objectEntryAID,
216                 &cstRemoteWriter->cstReader->deadlineTimer,
217                 0,   //common timer
218                 "CSTReaderDeadlineTimer",
219                 CSTReaderDeadlineTimer,
220                 &cstRemoteWriter->cstReader->lock,
221                 cstRemoteWriter->cstReader,
222                 &sp->deadline);
223           }
224           if (sp->mode==PULLED) {
225             NtpTime timeNext;
226             NtpTimeAdd(timeNext,
227                       (getActualNtpTime()),
228                       sp->deadline);
229             htimerUnicastCommon_set_expire(&cstRemoteWriter->
230                       cstReader->deadlineTimer,timeNext);
231           }
232         }
233         CSTReaderDestroyCSChangeFromWriter(
234             cstRemoteWriter,
235             csChangeFromWriter,
236             ORTE_FALSE);
237       }
238     }
239   }  
240   CSTReaderSetupState(cstRemoteWriter);    
241   debug(54,10) ("CSTReaderProcIssue: finished\n");
242 }