rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/objectEntryTimer.c
2396e11b54cbbc19df0fad21966ea8a33efa8826
[orte.git] / orte / liborte / objectEntryTimer.c
1     /*
2  *  $Id: objectEntryTimer.c,v 0.0.0.1   2003/09/10
3  *
4  *  DEBUG:  section 12                  Timer function on object from eventEntry
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 /*****************************************************************************/
25 int
26 objectEntryPurgeTimer(ORTEDomain *d,void *vobjectEntryOID) {
27   ObjectEntryOID   *objectEntryOID=(ObjectEntryOID*)vobjectEntryOID;
28   GUID_RTPS        guid;
29   
30   guid=objectEntryOID->guid;
31   if ((d->guid.aid & 0x03) == MANAGER) {
32     if ((guid.aid & 0x03) == MANAGER) {
33       pthread_rwlock_wrlock(&d->writerManagers.lock);
34       CSTWriterMakeGAP(d,&d->writerManagers,&guid);
35       pthread_rwlock_unlock(&d->writerManagers.lock);
36     }
37     if (((guid.aid & 0x03) == MANAGEDAPPLICATION) &&
38         (objectEntryOID->appMOM)) {
39       pthread_rwlock_wrlock(&d->writerApplications.lock);
40       CSTWriterMakeGAP(d,&d->writerApplications,&guid);
41       pthread_rwlock_unlock(&d->writerApplications.lock);
42     }
43   }
44   if ((d->guid.aid & 0x03) == MANAGEDAPPLICATION) {
45     switch (guid.oid & 0x07) {
46       case OID_APPLICATION:
47         break;
48       case OID_PUBLICATION:
49         pthread_rwlock_wrlock(&d->writerPublications.lock);
50         CSTWriterMakeGAP(d,&d->writerPublications,&guid);
51         pthread_rwlock_unlock(&d->writerPublications.lock);
52         break;
53       case OID_SUBSCRIPTION: 
54         pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
55         CSTWriterMakeGAP(d,&d->writerSubscriptions,&guid);
56         pthread_rwlock_unlock(&d->writerSubscriptions.lock);
57         break;
58     }  
59   }
60   debug(12,3) ("purged: 0x%x-0x%x-0x%x object removed\n",
61                objectEntryOID->objectEntryHID->hid,
62                objectEntryOID->objectEntryAID->aid,
63                objectEntryOID->oid);
64   objectEntryDelete(d,objectEntryOID);
65   objectEntryDump(&d->objectEntry);
66   
67   debug(12,10) ("objectEntryPurgeTimer: finished\n");
68   return 2;
69 }
70
71 /*****************************************************************************/
72 void
73 removeSubscriptionsOnLocalPublications(ORTEDomain *d,ObjectEntryOID *robjectEntryOID) {
74   CSTWriter       *cstWriter;
75   CSTRemoteReader *cstRemoteReader;
76   ObjectEntryOID  *objectEntryOID;
77     
78   if ((!d) || (!robjectEntryOID)) return;
79   pthread_rwlock_wrlock(&d->publications.lock);
80   gavl_cust_for_each(CSTWriter,
81                      &d->publications,cstWriter) {
82     pthread_rwlock_wrlock(&cstWriter->lock);
83     gavl_cust_for_each(ObjectEntryOID,
84                        robjectEntryOID->objectEntryAID,
85                        objectEntryOID) {
86       cstRemoteReader=CSTRemoteReader_find(cstWriter,&objectEntryOID->guid);
87       CSTWriterDestroyRemoteReader(d,cstRemoteReader);
88     }
89     pthread_rwlock_unlock(&cstWriter->lock);
90   }
91   pthread_rwlock_unlock(&d->publications.lock);
92 }
93
94 /*****************************************************************************/
95 void
96 removePublicationsOnLocalSubscriptions(ORTEDomain *d,ObjectEntryOID *robjectEntryOID) {
97   CSTReader       *cstReader;
98   CSTRemoteWriter *cstRemoteWriter;
99   ObjectEntryOID  *objectEntryOID;
100   
101   if ((!d) || (!robjectEntryOID)) return;
102   pthread_rwlock_wrlock(&d->subscriptions.lock);
103   gavl_cust_for_each(CSTReader,
104                      &d->subscriptions,cstReader) {
105     pthread_rwlock_wrlock(&cstReader->lock);
106     gavl_cust_for_each(ObjectEntryOID,
107                        robjectEntryOID->objectEntryAID,
108                        objectEntryOID) {
109       cstRemoteWriter=CSTRemoteWriter_find(cstReader,&objectEntryOID->guid);
110       if (cstRemoteWriter) {
111         CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
112         if ((cstReader->cstRemoteWriterCounter==0) && (cstReader->createdByPattern)) {
113           debug(12,2) ("scheduled: 0x%x-0x%x-0x%x for remove (patternSubscription)\n",
114                       cstReader->guid.hid,cstReader->guid.aid,cstReader->guid.oid);               
115           ORTESubscriptionDestroyLocked(cstReader);
116         }
117       }
118     }
119     pthread_rwlock_unlock(&cstReader->lock);
120   }
121   pthread_rwlock_unlock(&d->subscriptions.lock);
122 }
123
124 /*****************************************************************************/
125 void
126 removeApplication(ORTEDomain *d,ObjectEntryOID *robjectEntryOID) {
127   GUID_RTPS        guid;
128   ObjectEntryOID   *objectEntryOID;
129   CSTRemoteWriter  *cstRemoteWriter;
130   CSTRemoteReader  *cstRemoteReader;
131   
132   if (!robjectEntryOID) return;
133   if (!gavl_cmp_guid(&robjectEntryOID->guid,&d->guid)) return;
134   debug(12,3) ("application removed\n");
135   
136   guid=robjectEntryOID->guid;
137   //publication, subsription and application
138   pthread_rwlock_wrlock(&d->writerPublications.lock);
139   guid.oid=OID_READ_PUBL;
140   cstRemoteReader=CSTRemoteReader_find(&d->writerPublications,&guid);
141   CSTWriterDestroyRemoteReader(d,cstRemoteReader);
142   pthread_rwlock_unlock(&d->writerPublications.lock);
143   pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
144   guid.oid=OID_READ_SUBS;
145   cstRemoteReader=CSTRemoteReader_find(&d->writerSubscriptions,&guid);
146   CSTWriterDestroyRemoteReader(d,cstRemoteReader);
147   pthread_rwlock_unlock(&d->writerSubscriptions.lock);
148   pthread_rwlock_wrlock(&d->readerPublications.lock);
149   guid.oid=OID_WRITE_PUBL;
150   cstRemoteWriter=CSTRemoteWriter_find(&d->readerPublications,&guid);
151   CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
152   pthread_rwlock_unlock(&d->readerPublications.lock);
153   pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
154   guid.oid=OID_WRITE_SUBS;
155   cstRemoteWriter=CSTRemoteWriter_find(&d->readerSubscriptions,&guid);
156   CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
157   pthread_rwlock_unlock(&d->readerSubscriptions.lock);
158   //destroy all services
159   removePublicationsOnLocalSubscriptions(d,robjectEntryOID);
160   removeSubscriptionsOnLocalPublications(d,robjectEntryOID);
161   //destroy all object - the object will be disconneced in objectEntryDelete
162   while((objectEntryOID=ObjectEntryOID_first(robjectEntryOID->objectEntryAID))) {
163     switch (objectEntryOID->oid & 0x07) {
164       case OID_PUBLICATION:
165         pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
166         PublicationList_delete(&d->psEntry,objectEntryOID);
167         pthread_rwlock_unlock(&d->psEntry.publicationsLock);
168         break;
169       case OID_SUBSCRIPTION: 
170         pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
171         SubscriptionList_delete(&d->psEntry,objectEntryOID);
172         pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
173         break;
174     }
175     if (objectEntryDelete(d,objectEntryOID)>1) //AID was deleted
176       break;
177   }
178 }
179
180 /*****************************************************************************/
181 //Remove manager
182 void
183 removeManager(ORTEDomain *d,ObjectEntryOID *robjectEntryOID) {
184   CSTRemoteWriter  *cstRemoteWriter;
185   ObjectEntryAID   *objectEntryAID;
186   GUID_RTPS        guid;
187
188   if (!robjectEntryOID) return;
189   debug(12,3) ("manager removed\n");
190   
191   guid=robjectEntryOID->guid;
192   //exists another live Manager on going down node
193   gavl_cust_for_each(ObjectEntryAID,
194                      robjectEntryOID->objectEntryHID,objectEntryAID) {
195     if (((objectEntryAID->aid & 0x03) == MANAGER) &&
196         (objectEntryAID->aid!=robjectEntryOID->guid.aid))
197       break;  //yes
198   }
199   if (!objectEntryAID) {  //not exists 
200     gavl_cust_for_each(ObjectEntryAID,
201                        robjectEntryOID->objectEntryHID,objectEntryAID) {
202       if ((objectEntryAID->aid & 0x03) == MANAGEDAPPLICATION) {
203         ObjectEntryOID   *objectEntryOID;
204         objectEntryOID=ObjectEntryOID_find(objectEntryAID,&guid.oid);
205         if (gavl_cmp_guid(&objectEntryOID->guid,&d->guid)) { //!=
206           removeApplication(d,objectEntryOID);
207           objectEntryAID=  //start
208             ObjectEntryAID_first(robjectEntryOID->objectEntryHID);
209         }
210       }
211     }
212   } 
213   pthread_rwlock_wrlock(&d->readerApplications.lock);
214   pthread_rwlock_wrlock(&d->readerManagers.lock);        
215   guid.oid=OID_WRITE_APP;      
216   cstRemoteWriter=CSTRemoteWriter_find(&d->readerApplications,&guid);
217   CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
218   guid.oid=OID_WRITE_MGR;      
219   cstRemoteWriter=CSTRemoteWriter_find(&d->readerManagers,&guid);
220   CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
221   pthread_rwlock_unlock(&d->readerApplications.lock);
222   pthread_rwlock_unlock(&d->readerManagers.lock);        
223   objectEntryDelete(d,robjectEntryOID);
224 }
225
226        
227 /*****************************************************************************/
228 int
229 objectEntryExpirationTimer(ORTEDomain *d,void *vobjectEntryOID) {
230   ObjectEntryOID   *objectEntryOID=(ObjectEntryOID*)vobjectEntryOID;
231   ObjectEntryOID   *objectEntryOID1;
232   ObjectEntryAID   *objectEntryAID;
233   CSTWriter        *cstWriter;
234   CSTReader        *cstReader;
235   CSTRemoteWriter  *cstRemoteWriter;
236   CSTRemoteReader  *cstRemoteReader;
237   CSChange         *csChange;
238   GUID_RTPS        guid;
239   
240   //Manager, Manager expired
241   guid=objectEntryOID->guid;
242   //Event
243   generateEvent(d,&guid,objectEntryOID->attributes,ORTE_FALSE);
244   debug(12,3) ("expired: 0x%x-0x%x-0x%x removed\n",
245                objectEntryOID->guid.hid,
246                objectEntryOID->guid.aid,
247                objectEntryOID->guid.oid);               
248   if (((d->guid.aid & 3) == MANAGER) && 
249       ((guid.aid & 0x03) == MANAGER)) {
250     pthread_rwlock_wrlock(&d->readerManagers.lock);
251     pthread_rwlock_wrlock(&d->writerApplications.lock);
252     pthread_rwlock_wrlock(&d->readerApplications.lock);
253     guid.oid=OID_WRITE_APPSELF;  
254     cstRemoteWriter=CSTRemoteWriter_find(&d->readerManagers,&guid);
255     CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
256     guid.oid=OID_WRITE_APP;      
257     cstRemoteWriter=CSTRemoteWriter_find(&d->readerApplications,&guid);
258     CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
259     guid.oid=OID_READ_APP;  
260     cstRemoteReader=CSTRemoteReader_find(&d->writerApplications,&guid);
261     CSTWriterDestroyRemoteReader(d,cstRemoteReader);
262     guid.oid=objectEntryOID->oid;  //restore oid
263     //generate csChange for writerManager with alive=FALSE
264     csChange=(CSChange*)MALLOC(sizeof(CSChange));
265     CSChangeAttributes_init_head(csChange);
266     csChange->guid=guid;
267     csChange->alive=ORTE_FALSE;
268     csChange->cdrStream.buffer=NULL;
269     CSTWriterAddCSChange(d,&d->writerManagers,csChange);
270     gavl_cust_for_each(ObjectEntryAID,
271                        objectEntryOID->objectEntryHID,objectEntryAID) {
272       if (((objectEntryAID->aid & 0x03) == MANAGER) &&
273           (objectEntryAID->aid!=objectEntryOID->guid.aid))
274         break;  //yes
275     }
276     //if there is no another manager from expired node -> remove all app.
277     if (!objectEntryAID) {
278       gavl_cust_for_each(ObjectEntryAID,
279                          objectEntryOID->objectEntryHID,objectEntryAID) {
280         if ((objectEntryAID->aid & 0x03) == MANAGEDAPPLICATION) {
281           if ((objectEntryOID1=ObjectEntryOID_find(objectEntryAID,&guid.oid))) { 
282             eventDetach(d,
283                 objectEntryOID1->objectEntryAID,
284                 &objectEntryOID1->expirationPurgeTimer,
285                 0);
286             eventAdd(d,
287                 objectEntryOID1->objectEntryAID,
288                 &objectEntryOID1->expirationPurgeTimer,
289                 0,
290                 "ExpirationTimer",
291                 objectEntryExpirationTimer,
292                 NULL,
293                 objectEntryOID1,
294                 NULL);
295           }
296         }
297       }
298     }
299     pthread_rwlock_unlock(&d->readerApplications.lock);
300     pthread_rwlock_unlock(&d->writerApplications.lock);
301     pthread_rwlock_unlock(&d->readerManagers.lock);
302   }
303   //Manager, Application expired
304   if (((d->guid.aid & 3) == MANAGER) && 
305       ((guid.aid & 0x03) == MANAGEDAPPLICATION)) {
306      pthread_rwlock_wrlock(&d->writerApplicationSelf.lock);
307      pthread_rwlock_wrlock(&d->writerManagers.lock);
308      pthread_rwlock_wrlock(&d->writerApplications.lock);
309      pthread_rwlock_wrlock(&d->readerApplications.lock);
310      guid.oid=OID_WRITE_APPSELF;  /* local app */
311      cstRemoteWriter=CSTRemoteWriter_find(&d->readerApplications,&guid);
312      CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
313      guid.oid=OID_WRITE_APP;      /* remote app */
314      cstRemoteWriter=CSTRemoteWriter_find(&d->readerApplications,&guid);
315      CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
316      guid.oid=OID_READ_APP;  
317      cstRemoteReader=CSTRemoteReader_find(&d->writerApplications,&guid);
318      CSTWriterDestroyRemoteReader(d,cstRemoteReader);
319      guid.oid=OID_READ_MGR;  
320      cstRemoteReader=CSTRemoteReader_find(&d->writerManagers,&guid);
321      CSTWriterDestroyRemoteReader(d,cstRemoteReader);
322      if (objectEntryOID->appMOM) {
323         guid.oid=objectEntryOID->oid;  //restore oid
324         //generate csChange for writerApplication with alive=FALSE
325         csChange=(CSChange*)MALLOC(sizeof(CSChange));
326         parameterUpdateCSChange(csChange,d->appParams,ORTE_TRUE);
327         csChange->guid=guid;
328         csChange->alive=ORTE_FALSE;
329         csChange->cdrStream.buffer=NULL;
330         CSTWriterAddCSChange(d,&d->writerApplications,csChange);
331         //increment vargAppsSequenceNumber and make csChange
332         SeqNumberInc(d->appParams->vargAppsSequenceNumber,
333                      d->appParams->vargAppsSequenceNumber);
334         appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_TRUE,ORTE_TRUE);
335      } else {
336        objectEntryDelete(d,objectEntryOID);
337        objectEntryOID=NULL;  
338      }
339      pthread_rwlock_unlock(&d->writerApplicationSelf.lock);
340      pthread_rwlock_unlock(&d->writerManagers.lock);
341      pthread_rwlock_unlock(&d->writerApplications.lock);
342      pthread_rwlock_unlock(&d->readerApplications.lock);
343   }
344   //Application 
345   if ((d->guid.aid & 0x03) == MANAGEDAPPLICATION) {
346     switch (guid.oid & 0x07) {
347       case OID_APPLICATION:
348         if ((guid.aid & 0x03) == MANAGER) {                  //Manager
349           removeManager(d,objectEntryOID);
350           objectEntryOID=NULL;
351         }
352         if ((guid.aid & 0x03) == MANAGEDAPPLICATION) {       //Application
353           removeApplication(d,objectEntryOID);
354           objectEntryOID=NULL;
355          }
356         break;
357       case OID_PUBLICATION:
358         pthread_rwlock_wrlock(&d->subscriptions.lock);
359         gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
360           pthread_rwlock_wrlock(&cstReader->lock);
361           cstRemoteWriter=CSTRemoteWriter_find(cstReader,&guid);
362           if (cstRemoteWriter) {
363             CSTReaderDestroyRemoteWriter(d,cstRemoteWriter);
364             if ((cstReader->cstRemoteWriterCounter==0) && (cstReader->createdByPattern)) {
365               debug(12,2) ("scheduled: 0x%x-0x%x-0x%x for remove (patternSubscription)\n",
366                           cstReader->guid.hid,cstReader->guid.aid,cstReader->guid.oid);               
367               ORTESubscriptionDestroyLocked(cstReader);
368             }
369           }
370           pthread_rwlock_unlock(&cstReader->lock);
371         }
372         pthread_rwlock_unlock(&d->subscriptions.lock);
373         pthread_rwlock_wrlock(&d->publications.lock);
374         cstWriter=CSTWriter_find(&d->publications,&guid);
375         if (cstWriter) {
376           CSTWriterDelete(d,cstWriter);
377           CSTWriter_delete(&d->publications,cstWriter);
378           FREE(cstWriter);
379         }
380         pthread_rwlock_unlock(&d->publications.lock);
381         pthread_rwlock_wrlock(&d->psEntry.publicationsLock);
382         PublicationList_delete(&d->psEntry,objectEntryOID);
383         pthread_rwlock_unlock(&d->psEntry.publicationsLock);
384         if (!objectEntryOID->privateCreated) { //not local object cann't be purged
385           objectEntryDelete(d,objectEntryOID);
386           objectEntryOID=NULL;
387         }
388         break;
389       case OID_SUBSCRIPTION: 
390         pthread_rwlock_wrlock(&d->publications.lock);
391         gavl_cust_for_each(CSTWriter,&d->publications,cstWriter) {
392           cstRemoteReader=CSTRemoteReader_find(cstWriter,&guid);
393           CSTWriterDestroyRemoteReader(d,cstRemoteReader);
394         }
395         pthread_rwlock_unlock(&d->publications.lock);
396         pthread_rwlock_wrlock(&d->subscriptions.lock);
397         cstReader=CSTReader_find(&d->subscriptions,&guid);
398         if (cstReader) {
399           CSTReaderDelete(d,cstReader);
400           CSTReader_delete(&d->subscriptions,cstReader);
401           FREE(cstReader);
402         }
403         pthread_rwlock_unlock(&d->subscriptions.lock);
404         pthread_rwlock_wrlock(&d->psEntry.subscriptionsLock);
405         SubscriptionList_delete(&d->psEntry,objectEntryOID);
406         pthread_rwlock_unlock(&d->psEntry.subscriptionsLock);
407         if (!objectEntryOID->privateCreated) { //local object cann't be purged immediately
408           objectEntryDelete(d,objectEntryOID);
409           objectEntryOID=NULL;
410         }
411         break;
412     }
413   }      
414   if (objectEntryOID) {
415     eventDetach(d,
416             objectEntryOID->objectEntryAID,
417             &objectEntryOID->expirationPurgeTimer,
418             0);
419     eventAdd(d,
420             objectEntryOID->objectEntryAID,
421             &objectEntryOID->expirationPurgeTimer,
422             0,
423             "PurgeTimer",
424             objectEntryPurgeTimer,
425             NULL,
426             objectEntryOID,
427             &d->domainProp.baseProp.purgeTime);
428     debug(12,3) ("expired: 0x%x-0x%x-0x%x marked for remove\n",
429                  objectEntryOID->objectEntryHID->hid,
430                  objectEntryOID->objectEntryAID->aid,
431                  objectEntryOID->oid);
432   }
433   objectEntryDump(&d->objectEntry);
434   if (!objectEntryOID) return 2;
435   return 0;  
436 }