]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEDomainApp.c
e244362a9ec15e6c63aacadeee7569967cbc4619
[orte.git] / orte / liborte / ORTEDomainApp.c
1 /*
2  *  $Id: ORTEDomainApp.c,v 0.0.0.1      2003/08/21 
3  *
4  *  DEBUG:  section 21                  Domain application
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24
25 /*****************************************************************************/
26 ORTEDomain * 
27 ORTEDomainAppCreate(int domain, ORTEDomainProp *prop,
28                     ORTEDomainAppEvents *events,Boolean suspended) {
29   ORTEDomain        *d;
30   ObjectEntryOID    *objectEntryOID;
31   AppParams         *appParams;
32   CSTWriterParams   cstWriterParams;
33   CSTReaderParams   cstReaderParams;
34   char              iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
35   char              sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
36   char              sbuff[128];
37   int               i;
38   uint16_t          port=0;
39
40   debug(21,10) ("ORTEDomainAppCreate: start\n");
41   //Create domainApplication
42   d=MALLOC(sizeof(ORTEDomain));
43   if (!d) return NULL;  //no memory
44   //initialization local values
45   d->domain=domain;
46   d->taskRecvMetatraffic.terminate=ORTE_TRUE;
47   d->taskRecvUserdata.terminate=ORTE_TRUE;
48   d->taskSend.terminate=ORTE_TRUE;
49   d->taskRecvMetatraffic.sock.port=0;
50   d->taskRecvUserdata.sock.port=0;
51   d->taskSend.sock.port=0;
52   //init structure objectEntry
53   ObjectEntryHID_init_root_field(&d->objectEntry);
54   pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
55   htimerRoot_init_queue(&d->objectEntry);
56   pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
57   pthread_cond_init(&d->objectEntry.htimSendCond,NULL);
58   pthread_mutex_init(&d->objectEntry.htimSendMutex,NULL);
59   d->objectEntry.htimSendCondValue=0;
60   //publication,subscriptions
61   d->publications.counter=d->subscriptions.counter=0;
62   CSTWriter_init_root_field(&d->publications);
63   CSTReader_init_root_field(&d->subscriptions);
64   pthread_rwlock_init(&d->publications.lock,NULL);
65   pthread_rwlock_init(&d->subscriptions.lock,NULL);
66   //publication,subscriptions lists
67   PublicationList_init_root_field(&d->psEntry);
68   pthread_rwlock_init(&d->psEntry.publicationsLock,NULL);
69   SubscriptionList_init_root_field(&d->psEntry);
70   pthread_rwlock_init(&d->psEntry.subscriptionsLock,NULL);
71   
72   //pattern
73   pthread_rwlock_init(&d->patternEntry.lock,NULL);
74   ORTEPatternRegister(d,ORTEPatternCheckDefault,ORTEPatternMatchDefault,NULL);
75   Pattern_init_head(&d->patternEntry);
76     
77   //create domainProp 
78   if (prop!=NULL) {
79     memcpy(&d->domainProp,prop,sizeof(ORTEDomainProp));
80   } else {
81     ORTEDomainPropDefaultGet(&d->domainProp);
82   }
83   
84   //print local IP addresses
85   iflocal[0]=0;
86   if (d->domainProp.IFCount) {
87     for(i=0;i<d->domainProp.IFCount;i++)
88       strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress,sIPAddress));
89     debug(21,2) ("ORTEDomainAppCreate: localIPAddres(es) %s\n",iflocal);
90   } else{
91     debug(21,2) ("ORTEDomainAppCreate: no active interface card\n");
92   }
93
94   //DomainEvents
95   if (events!=NULL) {
96     memcpy(&d->domainEvents,events,sizeof(ORTEDomainAppEvents));
97   } else {
98     memset(&d->domainEvents,0,sizeof(ORTEDomainAppEvents));
99   }
100
101   //local buffers
102   d->mbRecvMetatraffic.cdrStream.buffer=
103       (uint8_t*)MALLOC(d->domainProp.recvBuffSize);
104   d->mbRecvUserdata.cdrStream.buffer=
105       (uint8_t*)MALLOC(d->domainProp.recvBuffSize);
106   d->mbSend.cdrStream.buffer=
107       (uint8_t*)MALLOC(d->domainProp.sendBuffSize);
108   if ((!d->mbRecvMetatraffic.cdrStream.buffer) || 
109       (!d->mbRecvUserdata.cdrStream.buffer) || 
110       (!d->mbSend.cdrStream.buffer)) {    //no memory
111     FREE(d->mbRecvMetatraffic.cdrStream.buffer);
112     FREE(d->mbRecvUserdata.cdrStream.buffer);
113     FREE(d->mbSend.cdrStream.buffer);
114     FREE(d);
115     return NULL;
116   }
117   d->mbRecvMetatraffic.cdrStream.bufferPtr=d->mbRecvMetatraffic.cdrStream.buffer;
118   d->mbRecvMetatraffic.cdrStream.length=0;
119   d->mbRecvUserdata.cdrStream.bufferPtr=d->mbRecvUserdata.cdrStream.buffer;
120   d->mbRecvUserdata.cdrStream.length=0;
121   d->mbSend.cdrStream.bufferPtr=d->mbSend.cdrStream.buffer;
122   d->mbSend.cdrStream.length=0;
123
124   //TypeRegister
125   ORTEType_init_root_field(&d->typeEntry);
126   pthread_rwlock_init(&d->typeEntry.lock,NULL);
127
128   //Sockets
129   sock_init_udp(&d->taskRecvMetatraffic.sock);
130   sock_init_udp(&d->taskRecvUserdata.sock);
131   sock_init_udp(&d->taskSend.sock);
132   if (d->domainProp.multicast.enabled) {
133     Domain2PortMulticastMetatraffic(d->domain,port);   
134   } else {
135     Domain2Port(d->domain,port);
136   }
137   sock_bind(&d->taskRecvMetatraffic.sock,0); //give me receiving port (metatraffic)
138   debug(21,2) ("ORTEDomainAppCreate: Bind on port(RecvMetatraffic):%u\n",
139                d->taskRecvMetatraffic.sock.port);
140   sock_bind(&d->taskRecvUserdata.sock,0); //give me receiving port (userdata)
141   debug(21,2) ("ORTEDomainAppCreate: Bind on port(RecvUserdata):%u\n",
142                d->taskRecvUserdata.sock.port);
143   sock_bind(&d->taskSend.sock,0);           //give me sending port
144   debug(21,2) ("ORTEDomainAppCreate: Bind on port(Send):%u\n",
145                d->taskSend.sock.port);
146   if ((d->taskRecvMetatraffic.sock.fd<0) || 
147       (d->taskRecvUserdata.sock.fd<0) ||
148       (d->taskSend.sock.fd<0)) {
149     debug(21,0) ("Error creating socket(s).\n");
150     sock_cleanup(&d->taskRecvMetatraffic.sock);
151     sock_cleanup(&d->taskRecvUserdata.sock);
152     sock_cleanup(&d->taskSend.sock);
153     FREE(d->mbRecvMetatraffic.cdrStream.buffer);
154     FREE(d->mbRecvUserdata.cdrStream.buffer);
155     FREE(d->mbSend.cdrStream.buffer);
156     FREE(d);
157     return NULL;
158   }
159   if (d->domainProp.multicast.enabled) {
160     struct ip_mreq mreq;
161     //ttl
162     if(sock_setsockopt(&d->taskSend.sock,IP_MULTICAST_TTL, 
163         &d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
164       debug(21,2) ("ORTEDomainAppCreate: ttl set on: %u\n",
165            d->domainProp.multicast.ttl);
166     } 
167     // join multicast group
168     mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
169     mreq.imr_interface.s_addr=htonl(INADDR_ANY);
170     if(sock_setsockopt(&d->taskRecvUserdata.sock,IP_ADD_MEMBERSHIP,
171         (void *) &mreq, sizeof(mreq))>=0) {
172       debug(21,2) ("ORTEDomainAppCreate: listening to mgroup %s\n",
173            IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
174     }
175   }
176
177   //Generates local GUID
178   if (d->domainProp.IFCount>0) 
179     d->guid.hid=d->domainProp.IFProp[0].ipAddress;
180   else
181     d->guid.hid=StringToIPAddress("127.0.0.1");
182   d->guid.aid=(d->taskSend.sock.port<<8)+MANAGEDAPPLICATION; 
183   d->guid.oid=OID_APP;
184   debug(29,2) ("ORTEDomainAppCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
185                d->guid.hid,d->guid.aid,d->guid.oid); 
186
187   //create HEADER of message for sending task
188   RTPSHeaderCreate(d->mbSend.cdrStream.buffer,d->guid.hid,d->guid.aid);
189   d->mbSend.cdrStream.bufferPtr=
190       d->mbSend.cdrStream.buffer+RTPS_HEADER_LENGTH;
191   d->mbSend.cdrStream.length=RTPS_HEADER_LENGTH;    
192   d->mbSend.needSend=ORTE_FALSE;
193   d->mbSend.containsInfoReply=ORTE_FALSE;  
194   
195   //Self object data & fellow managers object data
196   appParams=(AppParams*)MALLOC(sizeof(AppParams));
197   AppParamsInit(appParams);
198   appParams->expirationTime=d->domainProp.baseProp.expirationTime;
199   VENDOR_ID_OCERA(appParams->vendorId);
200   appParams->hostId=d->guid.hid;
201   appParams->appId=d->guid.aid;
202   appParams->metatrafficUnicastPort=d->taskRecvMetatraffic.sock.port;
203   appParams->userdataUnicastPort=d->taskRecvUserdata.sock.port;  
204   if (d->domainProp.multicast.enabled) {
205     //multicast
206     for(i=0;i<d->domainProp.IFCount;i++)
207       appParams->metatrafficMulticastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
208     appParams->metatrafficMulticastIPAddressCount=d->domainProp.IFCount;
209   } else {
210     //unicast
211     if (d->domainProp.IFCount) {
212       for(i=0;i<d->domainProp.IFCount;i++)
213         appParams->unicastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
214       appParams->unicastIPAddressCount=d->domainProp.IFCount;
215     } else {
216       appParams->unicastIPAddressList[0]=StringToIPAddress("127.0.0.1");
217       appParams->unicastIPAddressCount=1;
218     }
219   }
220   //ApplicatonKeyList
221   if (!d->domainProp.keys) {
222     appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
223     for(i=0;i<d->domainProp.IFCount;i++)
224       appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
225     appParams->managerKeyCount=d->domainProp.IFCount+1;
226   } else {
227     appParams->managerKeyCount=i=0;
228     while (getStringPart(d->domainProp.keys,':',&i,sbuff)) {
229       appParams->managerKeyList[appParams->managerKeyCount++]=
230           StringToIPAddress(sbuff);
231     }
232   }
233   d->appParams=appParams;
234   //insert object, doesn't need to be locked
235   d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
236   d->objectEntryOID->privateCreated=ORTE_TRUE;
237
238   //CST objects
239   //  writerApplicationSelf (WAS)
240   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
241   cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
242   cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
243   NTPTIME_ZERO(cstWriterParams.delayResponceTime);
244   cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
245   cstWriterParams.fullAcknowledge=ORTE_TRUE;
246   CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
247       OID_WRITE_APPSELF,&cstWriterParams,NULL);
248   //  add to WAS remote writer(s)
249   if (d->domainProp.appLocalManager) {
250     GUID_RTPS guid;
251     guid.hid=d->domainProp.appLocalManager;
252     guid.aid=AID_UNKNOWN;
253     guid.oid=OID_APP;
254     if (!objectEntryFind(d,&guid)) {
255       appParams=(AppParams*)MALLOC(sizeof(AppParams));
256       AppParamsInit(appParams);
257       appParams->hostId=guid.hid;
258       appParams->appId=guid.aid;
259       appParams->metatrafficUnicastPort=port;
260       appParams->userdataUnicastPort=0;  //Manager support only metatraffic
261       appParams->unicastIPAddressList[0]=d->domainProp.appLocalManager;
262       appParams->unicastIPAddressCount=1;
263       objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
264       CSTWriterAddRemoteReader(d,&d->writerApplicationSelf,objectEntryOID,
265           OID_READ_MGR);
266       debug(21,2) ("ORTEDomainAppCreate: add fellow manager (%s)\n",
267                   IPAddressToString(d->domainProp.appLocalManager,sIPAddress));
268     }
269   }
270   //  readerManagers
271   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
272   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
273   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
274   cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
275   cstReaderParams.fullAcknowledge=ORTE_TRUE;      
276   CSTReaderInit(d,&d->readerManagers,d->objectEntryOID,
277       OID_READ_MGR,&cstReaderParams,NULL);
278   //  readerApplications
279   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
280   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
281   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
282   cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
283   cstReaderParams.fullAcknowledge=ORTE_TRUE;      
284   CSTReaderInit(d,&d->readerApplications,d->objectEntryOID,
285       OID_READ_APP,&cstReaderParams,NULL);
286   //  writerPublications
287   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
288   cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
289   cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
290   NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
291   cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
292   cstWriterParams.fullAcknowledge=ORTE_TRUE;
293   CSTWriterInit(d,&d->writerPublications,d->objectEntryOID,
294       OID_WRITE_PUBL,&cstWriterParams,NULL);
295   //  writerSubscriptions
296   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
297   cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
298   cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
299   NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
300   cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
301   cstWriterParams.fullAcknowledge=ORTE_TRUE;
302   CSTWriterInit(d,&d->writerSubscriptions,d->objectEntryOID,
303       OID_WRITE_SUBS,&cstWriterParams,NULL);
304   //  readerPublications
305   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
306   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
307   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
308   cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
309   cstReaderParams.fullAcknowledge=ORTE_TRUE;      
310   CSTReaderInit(d,&d->readerPublications,d->objectEntryOID,
311       OID_READ_PUBL,&cstReaderParams,NULL);
312   //  readerSubscriptions
313   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
314   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
315   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
316   cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
317   cstReaderParams.fullAcknowledge=ORTE_TRUE;      
318   CSTReaderInit(d,&d->readerSubscriptions,d->objectEntryOID,
319       OID_READ_SUBS,&cstReaderParams,NULL);
320   
321   //add csChange for WAS
322   appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE,ORTE_TRUE);
323   
324   //Start threads
325   if (!suspended) {
326     ORTEDomainStart(d,ORTE_TRUE,ORTE_TRUE,ORTE_TRUE);
327   }
328   debug(21,10) ("ORTEDomainAppCreate: finished\n");
329   return d;
330 }
331
332 /*****************************************************************************/
333 Boolean
334 ORTEDomainAppDestroy(ORTEDomain *d) {
335   CSTWriter             *cstWriter;
336   CSTReader             *cstReader;
337
338   debug(21,10) ("ORTEDomainAppDestroy: start\n");
339   if (!d) return ORTE_FALSE;
340   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
341   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
342   appSelfParamChanged(d,ORTE_TRUE,ORTE_TRUE,ORTE_FALSE,ORTE_FALSE);    
343   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
344   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
345   //Stoping threads
346   if (!d->taskRecvMetatraffic.terminate) {
347     d->taskRecvMetatraffic.terminate=ORTE_TRUE;
348     ORTEDomainWakeUpReceivingThread(d,
349         &d->taskSend.sock,d->taskRecvMetatraffic.sock.port); 
350     pthread_join(d->taskRecvMetatraffic.thread,NULL); 
351   }
352   if (!d->taskRecvUserdata.terminate) {
353     d->taskRecvUserdata.terminate=ORTE_TRUE;
354     ORTEDomainWakeUpReceivingThread(d,
355         &d->taskSend.sock,d->taskRecvUserdata.sock.port); 
356     pthread_join(d->taskRecvUserdata.thread,NULL); 
357   }
358   if (!d->taskSend.terminate) {
359     d->taskSend.terminate=ORTE_TRUE;
360     ORTEDomainWakeUpSendingThread(&d->objectEntry); 
361     pthread_join(d->taskSend.thread,NULL); 
362   }
363   debug(21,3) ("ORTEDomainAppDestroy: threads stoped\n");
364   
365   //Sockets
366   sock_cleanup(&d->taskRecvMetatraffic.sock);
367   sock_cleanup(&d->taskRecvUserdata.sock);
368   sock_cleanup(&d->taskSend.sock);
369
370   //Signals
371   pthread_cond_destroy(&d->objectEntry.htimSendCond);
372   pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
373
374   //rwLocks
375   pthread_rwlock_destroy(&d->objectEntry.objRootLock);
376   pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
377   pthread_rwlock_destroy(&d->publications.lock);
378   pthread_rwlock_destroy(&d->subscriptions.lock);
379   pthread_rwlock_destroy(&d->psEntry.publicationsLock);
380   pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
381
382   //TypeRegister
383   ORTETypeRegisterDestroyAll(d);
384   pthread_rwlock_destroy(&d->typeEntry.lock);
385   
386   //Pattern
387   ORTEDomainAppSubscriptionPatternDestroy(d);
388   pthread_rwlock_unlock(&d->typeEntry.lock);    
389   pthread_rwlock_destroy(&d->patternEntry.lock);
390   
391   //CSTReaders and CSTWriters
392   CSTWriterDelete(d,&d->writerApplicationSelf);
393   CSTReaderDelete(d,&d->readerManagers);
394   CSTReaderDelete(d,&d->readerApplications);
395   CSTWriterDelete(d,&d->writerPublications);
396   CSTWriterDelete(d,&d->writerSubscriptions);
397   CSTReaderDelete(d,&d->readerPublications);
398   CSTReaderDelete(d,&d->readerSubscriptions);
399
400   while ((cstWriter = CSTWriter_cut_first(&d->publications))) {
401     CSTWriterDelete(d,cstWriter);
402     FREE(cstWriter);
403   }  
404   while ((cstReader = CSTReader_cut_first(&d->subscriptions))) {
405     CSTReaderDelete(d,cstReader);
406     FREE(cstReader);
407   }  
408     
409   //objects in objectsEntry
410   objectEntryDeleteAll(d,&d->objectEntry);
411   
412   FREE(d->mbRecvMetatraffic.cdrStream.buffer);
413   FREE(d->mbRecvUserdata.cdrStream.buffer);
414   FREE(d->mbSend.cdrStream.buffer);
415   FREE(d);
416   debug(21,10) ("ORTEDomainAppDestroy: finished\n");
417   return ORTE_TRUE;
418 }
419
420 /*****************************************************************************/
421 Boolean 
422 ORTEDomainAppSubscriptionPatternAdd(ORTEDomain *d,const char *topic,
423     const char *type,ORTESubscriptionPatternCallBack subscriptionCallBack, 
424     void *param) {
425   PatternNode *pnode;
426   
427   if (!d) return ORTE_FALSE;
428   pnode=(PatternNode*)MALLOC(sizeof(PatternNode));
429   strcpy(pnode->topic,topic);
430   strcpy(pnode->type,type);
431   pnode->subscriptionCallBack=subscriptionCallBack;
432   pnode->param=param;
433   pthread_rwlock_wrlock(&d->patternEntry.lock);
434   Pattern_insert(&d->patternEntry,pnode);
435   pthread_rwlock_unlock(&d->patternEntry.lock);
436   return ORTE_TRUE;
437 }
438
439 /*****************************************************************************/
440 Boolean 
441 ORTEDomainAppSubscriptionPatternRemove(ORTEDomain *d,const char *topic,
442     const char *type) {
443   PatternNode *pnode;
444   
445   if (!d) return ORTE_FALSE;
446   pthread_rwlock_wrlock(&d->patternEntry.lock);
447   ul_list_for_each(Pattern,&d->patternEntry,pnode) {
448     if ((strcmp(pnode->topic,topic)==0) &&
449         (strcmp(pnode->type,type)==0)) {
450       Pattern_delete(&d->patternEntry,pnode);
451       FREE(pnode);
452       return ORTE_TRUE;
453     }
454   }
455   pthread_rwlock_unlock(&d->patternEntry.lock);
456   return ORTE_FALSE;
457 }
458
459 /*****************************************************************************/
460 Boolean 
461 ORTEDomainAppSubscriptionPatternDestroy(ORTEDomain *d) {
462   PatternNode *pnode;
463   
464   if (!d) return ORTE_FALSE;
465   pthread_rwlock_wrlock(&d->patternEntry.lock);
466   while((pnode=Pattern_cut_first(&d->patternEntry))) {
467     FREE(pnode);
468   }
469   pthread_rwlock_unlock(&d->patternEntry.lock);
470   return ORTE_TRUE;
471 }