]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/objectEntryTimer.c
Added prerelease of ORTE-0.2 (Real Time Publisher Subscriber communication protocol...
[orte.git] / orte / liborte / objectEntryTimer.c
1     /*
2  *  $Id: objectEntryTimer.c,v 0.0.0.1   2003/09/10
3  *
4  *  DEBUG:  section 12                  Timer function on object from eventEntry
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 /*****************************************************************************/
25 int
26 objectEntryPurgeTimer(ORTEDomain *d,void *vobjectEntryOID) {
27   ObjectEntryOID   *objectEntryOID=(ObjectEntryOID*)vobjectEntryOID;
28   GUID_RTPS        guid;
29   
30   guid=objectEntryOID->guid;
31   if ((d->guid.aid & 0x03) == MANAGER) {
32     if ((guid.aid & 0x03) == MANAGER) {
33       pthread_rwlock_wrlock(&d->writerManagers.lock);
34       CSTWriterMakeGAP(d,&d->writerManagers,&guid);
35       pthread_rwlock_unlock(&d->writerManagers.lock);
36     }
37     if (((guid.aid & 0x03) == MANAGEDAPPLICATION) &&
38         (objectEntryOID->appMOM)) {
39       pthread_rwlock_wrlock(&d->writerApplications.lock);
40       CSTWriterMakeGAP(d,&d->writerApplications,&guid);
41       pthread_rwlock_unlock(&d->writerApplications.lock);
42     }
43   }
44   if ((d->guid.aid & 0x03) == MANAGEDAPPLICATION) {
45     switch (guid.oid & 0x07) {
46       case OID_APPLICATION:
47         break;
48       case OID_PUBLICATION:
49         pthread_rwlock_wrlock(&d->writerPublications.lock);
50         CSTWriterMakeGAP(d,&d->writerPublications,&guid);
51         pthread_rwlock_unlock(&d->writerPublications.lock);
52         break;
53       case OID_SUBSCRIPTION: 
54         pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
55         CSTWriterMakeGAP(d,&d->writerSubscriptions,&guid);
56         pthread_rwlock_unlock(&d->writerSubscriptions.lock);
57         break;
58     }  
59   }
60   debug(12,3) ("purged: 0x%x-0x%x-0x%x object removed\n",
61                objectEntryOID->objectEntryHID->hid,
62                objectEntryOID->objectEntryAID->aid,
63                objectEntryOID->oid);
64   objectEntryDelete(d,objectEntryOID);
65   objectEntryDump(&d->objectEntry);
66   
67   debug(12,10) ("objectEntryPurgeTimer: finished\n");
68   return 2;
69 }
70
71 /*****************************************************************************/
72 void
73 removeApplication(ORTEDomain *d,ObjectEntryOID *robjectEntryOID) {
74   GUID_RTPS        guid;
75   ObjectEntryOID   *objectEntryOID;
76   CSTWriter        *cstWriter;
77   CSTReader        *cstReader;
78   CSTRemoteWriter  *cstRemoteWriter;
79   CSTRemoteReader  *cstRemoteReader;
80   
81   if (!robjectEntryOID) return;
82   if (!gavl_cmp_guid(&robjectEntryOID->guid,&d->guid)) return;
83   debug(12,3) ("application removed\n");
84   
85   guid=robjectEntryOID->guid;
86   //publication, subsription and application
87   pthread_rwlock_wrlock(&d->writerPublications.lock);
88   guid.oid=OID_READ_PUBL;
89   cstRemoteReader=CSTRemoteReader_find(&d->writerPublications,&guid);
90   CSTWriterDestroyRemoteReader(d,cstRemoteReader);
91   pthread_rwlock_unlock(&d->writerPublications.lock);
92   pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
93   guid.oid=OID_READ_SUBS;
94   cstRemoteReader=CSTRemoteReader_find(&d->writerSubscriptions,&guid);
95   CSTWriterDestroyRemoteReader(d,cstRemoteReader);
96   pthread_rwlock_unlock(&d->writerSubscriptions.lock);
97   pthread_rwlock_wrlock(&d->readerPublications.lock);
98   guid.oid=OID_WRITE_PUBL;
99   cstRemoteWriter=CSTRemoteWriter_find(&d->readerPublications,&guid);
100   CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
101   pthread_rwlock_unlock(&d->readerPublications.lock);
102   pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
103   guid.oid=OID_WRITE_SUBS;
104   cstRemoteWriter=CSTRemoteWriter_find(&d->readerSubscriptions,&guid);
105   CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
106   pthread_rwlock_unlock(&d->readerSubscriptions.lock);
107   //destroy all services
108   //from publisheres
109   pthread_rwlock_wrlock(&d->publications.lock);
110   gavl_cust_for_each(CSTWriter,
111                      &d->publications,cstWriter) {
112     pthread_rwlock_wrlock(&cstWriter->lock);
113     gavl_cust_for_each(ObjectEntryOID,
114                        robjectEntryOID->objectEntryAID,
115                        objectEntryOID) {
116       cstRemoteReader=CSTRemoteReader_find(cstWriter,&objectEntryOID->guid);
117       CSTWriterDestroyRemoteReader(d,cstRemoteReader);
118     }
119     pthread_rwlock_unlock(&cstWriter->lock);
120   }
121   pthread_rwlock_unlock(&d->publications.lock);
122   //from subscriberes
123   pthread_rwlock_wrlock(&d->subscriptions.lock);
124   gavl_cust_for_each(CSTReader,
125                      &d->subscriptions,cstReader) {
126     pthread_rwlock_wrlock(&cstReader->lock);
127     gavl_cust_for_each(ObjectEntryOID,
128                        robjectEntryOID->objectEntryAID,
129                        objectEntryOID) {
130       cstRemoteWriter=CSTRemoteWriter_find(cstReader,&objectEntryOID->guid);
131       CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
132     }
133     pthread_rwlock_unlock(&cstReader->lock);
134   }
135   pthread_rwlock_unlock(&d->subscriptions.lock);
136   //destroy all object - the object will be disconneced in objectEntryDelete
137   while((objectEntryOID=ObjectEntryOID_first(robjectEntryOID->objectEntryAID))) {
138     switch (objectEntryOID->oid & 0x07) {
139       case OID_PUBLICATION:
140         pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
141         PublicationList_delete(&d->psEntry,objectEntryOID);
142         pthread_rwlock_unlock(&d->psEntry.publicationsLock);
143         break;
144       case OID_SUBSCRIPTION: 
145         pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
146         SubscriptionList_delete(&d->psEntry,objectEntryOID);
147         pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
148         break;
149     }
150     if (objectEntryDelete(d,objectEntryOID)>1) //AID was deleted
151       break;
152   }
153 }
154
155 /*****************************************************************************/
156 //Remove manager
157 void
158 removeManager(ORTEDomain *d,ObjectEntryOID *robjectEntryOID) {
159   CSTRemoteWriter  *cstRemoteWriter;
160   ObjectEntryAID   *objectEntryAID;
161   GUID_RTPS        guid;
162
163   if (!robjectEntryOID) return;
164   debug(12,3) ("manager removed\n");
165   
166   guid=robjectEntryOID->guid;
167   //exists another live Manager on going down node
168   gavl_cust_for_each(ObjectEntryAID,
169                      robjectEntryOID->objectEntryHID,objectEntryAID) {
170     if (((objectEntryAID->aid & 0x03) == MANAGER) &&
171         (objectEntryAID->aid!=robjectEntryOID->guid.aid))
172       break;  //yes
173   }
174   if (!objectEntryAID) {  //not exists 
175     gavl_cust_for_each(ObjectEntryAID,
176                        robjectEntryOID->objectEntryHID,objectEntryAID) {
177       if ((objectEntryAID->aid & 0x03) == MANAGEDAPPLICATION) {
178         ObjectEntryOID   *objectEntryOID;
179         objectEntryOID=ObjectEntryOID_find(objectEntryAID,&guid.oid);
180         if (gavl_cmp_guid(&objectEntryOID->guid,&d->guid)) { //!=
181           removeApplication(d,objectEntryOID);
182           objectEntryAID=  //start
183             ObjectEntryAID_first(robjectEntryOID->objectEntryHID);
184         }
185       }
186     }
187   } 
188   pthread_rwlock_wrlock(&d->readerApplications.lock);
189   pthread_rwlock_wrlock(&d->readerManagers.lock);        
190   guid.oid=OID_WRITE_APP;      
191   cstRemoteWriter=CSTRemoteWriter_find(&d->readerApplications,&guid);
192   CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
193   guid.oid=OID_WRITE_MGR;      
194   cstRemoteWriter=CSTRemoteWriter_find(&d->readerManagers,&guid);
195   CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
196   pthread_rwlock_unlock(&d->readerApplications.lock);
197   pthread_rwlock_unlock(&d->readerManagers.lock);        
198   objectEntryDelete(d,robjectEntryOID);
199 }
200
201        
202 /*****************************************************************************/
203 int
204 objectEntryExpirationTimer(ORTEDomain *d,void *vobjectEntryOID) {
205   ObjectEntryOID   *objectEntryOID=(ObjectEntryOID*)vobjectEntryOID;
206   ObjectEntryOID   *objectEntryOID1;
207   ObjectEntryAID   *objectEntryAID;
208   CSTWriter        *cstWriter;
209   CSTReader        *cstReader;
210   CSTRemoteWriter  *cstRemoteWriter;
211   CSTRemoteReader  *cstRemoteReader;
212   CSChange         *csChange;
213   GUID_RTPS        guid;
214   
215   //Manager, Manager expired
216   guid=objectEntryOID->guid;
217   //Event
218   generateEvent(d,&guid,objectEntryOID->attributes,ORTE_FALSE);
219   debug(12,3) ("expired: 0x%x-0x%x removed\n",
220                objectEntryOID->objectEntryHID->hid,
221                objectEntryOID->objectEntryAID->aid);               
222   if (((d->guid.aid & 3) == MANAGER) && 
223       ((guid.aid & 0x03) == MANAGER)) {
224     pthread_rwlock_wrlock(&d->readerManagers.lock);
225     pthread_rwlock_wrlock(&d->writerApplications.lock);
226     pthread_rwlock_wrlock(&d->readerApplications.lock);
227     guid.oid=OID_WRITE_APPSELF;  
228     cstRemoteWriter=CSTRemoteWriter_find(&d->readerManagers,&guid);
229     CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
230     guid.oid=OID_WRITE_APP;      
231     cstRemoteWriter=CSTRemoteWriter_find(&d->readerApplications,&guid);
232     CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
233     guid.oid=OID_READ_APP;  
234     cstRemoteReader=CSTRemoteReader_find(&d->writerApplications,&guid);
235     CSTWriterDestroyRemoteReader(d,cstRemoteReader);
236     guid.oid=objectEntryOID->oid;  //restore oid
237     //generate csChange for writerManager with alive=FALSE
238     csChange=(CSChange*)MALLOC(sizeof(CSChange));
239     CSChangeAttributes_init_head(csChange);
240     csChange->guid=guid;
241     csChange->alive=ORTE_FALSE;
242     csChange->cdrStream.buffer=NULL;
243     CSTWriterAddCSChange(d,&d->writerManagers,csChange);
244     gavl_cust_for_each(ObjectEntryAID,
245                        objectEntryOID->objectEntryHID,objectEntryAID) {
246       if (((objectEntryAID->aid & 0x03) == MANAGER) &&
247           (objectEntryAID->aid!=objectEntryOID->guid.aid))
248         break;  //yes
249     }
250     //if there is no another manager from expired node -> remove all app.
251     if (!objectEntryAID) {
252       gavl_cust_for_each(ObjectEntryAID,
253                          objectEntryOID->objectEntryHID,objectEntryAID) {
254         if ((objectEntryAID->aid & 0x03) == MANAGEDAPPLICATION) {
255           if ((objectEntryOID1=ObjectEntryOID_find(objectEntryAID,&guid.oid))) { 
256             eventAdd(d,
257                 objectEntryOID1->objectEntryAID,
258                 &objectEntryOID1->expirationPurgeTimer,
259                 0,
260                 "ExpirationTimer",
261                 objectEntryExpirationTimer,
262                 NULL,
263                 objectEntryOID1,
264                 NULL);
265           }
266         }
267       }
268     }
269     pthread_rwlock_unlock(&d->readerApplications.lock);
270     pthread_rwlock_unlock(&d->writerApplications.lock);
271     pthread_rwlock_unlock(&d->readerManagers.lock);
272   }
273   //Manager, Application expired
274   if (((d->guid.aid & 3) == MANAGER) && 
275       ((guid.aid & 0x03) == MANAGEDAPPLICATION)) {
276      pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
277      pthread_rwlock_wrlock(&d->writerManagers.lock);
278      pthread_rwlock_wrlock(&d->writerApplications.lock);
279      pthread_rwlock_wrlock(&d->readerApplications.lock);
280      guid.oid=OID_WRITE_APPSELF;  /* local app */
281      cstRemoteWriter=CSTRemoteWriter_find(&d->readerApplications,&guid);
282      CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
283      guid.oid=OID_WRITE_APP;      /* remote app */
284      cstRemoteWriter=CSTRemoteWriter_find(&d->readerApplications,&guid);
285      CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
286      guid.oid=OID_READ_APP;  
287      cstRemoteReader=CSTRemoteReader_find(&d->writerApplications,&guid);
288      CSTWriterDestroyRemoteReader(d,cstRemoteReader);
289      guid.oid=OID_READ_MGR;  
290      cstRemoteReader=CSTRemoteReader_find(&d->writerManagers,&guid);
291      CSTWriterDestroyRemoteReader(d,cstRemoteReader);
292      if (objectEntryOID->appMOM) {
293         guid.oid=objectEntryOID->oid;  //restore oid
294         //generate csChange for writerApplication with alive=FALSE
295         csChange=(CSChange*)MALLOC(sizeof(CSChange));
296         parameterUpdateCSChange(csChange,d->appParams,ORTE_TRUE);
297         csChange->guid=guid;
298         csChange->alive=ORTE_FALSE;
299         csChange->cdrStream.buffer=NULL;
300         CSTWriterAddCSChange(d,&d->writerApplications,csChange);
301         //increment vargAppsSequenceNumber and make csChange
302         SeqNumberInc(d->appParams->vargAppsSequenceNumber,
303                      d->appParams->vargAppsSequenceNumber);
304         appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_TRUE);
305      } else {
306        objectEntryDelete(d,objectEntryOID);
307        objectEntryOID=NULL;  
308      }
309      pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
310      pthread_rwlock_unlock(&d->writerManagers.lock);
311      pthread_rwlock_unlock(&d->writerApplications.lock);
312      pthread_rwlock_unlock(&d->readerApplications.lock);
313   }
314   //Application 
315   if ((d->guid.aid & 0x03) == MANAGEDAPPLICATION) {
316     switch (guid.oid & 0x07) {
317       case OID_APPLICATION:
318         if ((guid.aid & 0x03) == MANAGER) {                  //Manager
319           removeManager(d,objectEntryOID);
320           objectEntryOID=NULL;
321         }
322         if ((guid.aid & 0x03) == MANAGEDAPPLICATION) {       //Application
323           removeApplication(d,objectEntryOID);
324           objectEntryOID=NULL;
325          }
326         break;
327       case OID_PUBLICATION:
328         pthread_rwlock_wrlock(&d->subscriptions.lock);
329         gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
330           cstRemoteWriter=CSTRemoteWriter_find(cstReader,&guid);
331           CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
332         }
333         pthread_rwlock_unlock(&d->subscriptions.lock);
334         pthread_rwlock_wrlock(&d->publications.lock);
335         cstWriter=CSTWriter_find(&d->publications,&guid);
336         if (cstWriter) {
337           CSTWriterDelete(d,cstWriter);
338           CSTWriter_delete(&d->publications,cstWriter);
339           FREE(cstWriter);
340         }
341         pthread_rwlock_unlock(&d->publications.lock);
342         pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
343         PublicationList_delete(&d->psEntry,objectEntryOID);
344         pthread_rwlock_unlock(&d->psEntry.publicationsLock);
345         if (!objectEntryOID->private) { //not local object cann't be purged
346           objectEntryDelete(d,objectEntryOID);
347           objectEntryOID=NULL;
348         }
349         break;
350       case OID_SUBSCRIPTION: 
351         pthread_rwlock_wrlock(&d->publications.lock);
352         gavl_cust_for_each(CSTWriter,&d->publications,cstWriter) {
353           cstRemoteReader=CSTRemoteReader_find(cstWriter,&guid);
354           CSTWriterDestroyRemoteReader(d,cstRemoteReader);
355         }
356         pthread_rwlock_unlock(&d->publications.lock);
357         pthread_rwlock_wrlock(&d->subscriptions.lock);
358         cstReader=CSTReader_find(&d->subscriptions,&guid);
359         if (cstReader) {
360           CSTReaderDelete(d,cstReader);
361           CSTReader_delete(&d->subscriptions,cstReader);
362           FREE(cstReader);
363         }
364         pthread_rwlock_unlock(&d->subscriptions.lock);
365         pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
366         SubscriptionList_delete(&d->psEntry,objectEntryOID);
367         pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
368         if (!objectEntryOID->private) { //local object cann't be purged
369           objectEntryDelete(d,objectEntryOID);
370           objectEntryOID=NULL;
371         }
372         break;
373     }
374   }      
375   if (objectEntryOID) {
376     eventDetach(d,
377             objectEntryOID->objectEntryAID,
378             &objectEntryOID->expirationPurgeTimer,
379             0);
380     eventAdd(d,
381             objectEntryOID->objectEntryAID,
382             &objectEntryOID->expirationPurgeTimer,
383             0,
384             "PurgeTimer",
385             objectEntryPurgeTimer,
386             NULL,
387             objectEntryOID,
388             &d->domainProp.baseProp.purgeTime);
389     debug(12,3) ("expired: 0x%x-0x%x marked for remove\n",
390                  objectEntryOID->objectEntryHID->hid,
391                  objectEntryOID->objectEntryAID->aid);
392   }
393   objectEntryDump(&d->objectEntry);
394   if (!objectEntryOID) return 2;
395   return 0;  
396 }