rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEDomain.c
Fix deinitialization in an error path
[orte.git] / orte / liborte / ORTEDomain.c
1 /*
2  *  $Id: ORTEDomain.c,v 0.0.0.1         2003/08/21
3  *
4  *  DEBUG:  section 30                  Domain functions
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr@smoliku.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
34 /*****************************************************************************/
35 void
36 ORTEDomainRecvThreadStart(TaskProp *tp) 
37 {
38   if (tp->terminate) {
39     tp->terminate=ORTE_FALSE;
40     pthread_create(&(tp->thread), NULL,
41                   (void*)&ORTEAppRecvThread, (void *)tp); 
42   }
43 }
44
45 /*****************************************************************************/
46 void
47 ORTEDomainSendThreadStart(TaskProp *tp) 
48 {
49   if (tp->terminate) {
50     tp->terminate=ORTE_FALSE;
51     pthread_create(&(tp->thread), NULL,
52                   (void*)&ORTEAppSendThread, (void *)tp); 
53   }
54 }
55
56 /*****************************************************************************/
57 void
58 ORTEDomainRecvThreadStop(TaskProp *tp) 
59 {
60   ORTEDomain *d=tp->d;
61
62   if (!tp->terminate) {
63     tp->terminate=ORTE_TRUE;
64     ORTEDomainWakeUpReceivingThread(d,
65         &d->taskSend.sock,tp->sock.port); 
66     pthread_join(tp->thread,NULL); 
67   }
68 }
69
70 /*****************************************************************************/
71 void
72 ORTEDomainSendThreadStop(TaskProp *tp) 
73 {
74   ORTEDomain *d=tp->d;
75
76   if (!tp->terminate) {
77     tp->terminate=ORTE_TRUE;
78     ORTEDomainWakeUpSendingThread(&d->objectEntry); 
79     pthread_join(tp->thread,NULL); 
80   }
81 }
82
83 /*****************************************************************************/
84 void
85 ORTEDomainStart(ORTEDomain *d,
86     Boolean recvUnicastMetatrafficThread,
87     Boolean recvMulticastMetatrafficThread,
88     Boolean recvUnicastUserdataThread,
89     Boolean recvMulticastUserdataThread,
90     Boolean sendThread) {
91
92   if(!d) return;
93
94   if (recvUnicastMetatrafficThread) 
95     ORTEDomainRecvThreadStart(&d->taskRecvUnicastMetatraffic);
96
97   if (recvMulticastMetatrafficThread) 
98     ORTEDomainRecvThreadStart(&d->taskRecvMulticastMetatraffic);
99
100   if (recvUnicastUserdataThread) 
101     ORTEDomainRecvThreadStart(&d->taskRecvUnicastUserdata);
102
103   if (recvMulticastUserdataThread) 
104     ORTEDomainRecvThreadStart(&d->taskRecvMulticastUserdata);
105
106   if (sendThread) 
107     ORTEDomainSendThreadStart(&d->taskSend);
108 }
109
110 /*****************************************************************************/
111 Boolean
112 ORTEDomainPropDefaultGet(ORTEDomainProp *prop) {
113   sock_t        sock;
114
115   memset(prop, 0, sizeof(*prop));
116
117   prop->multicast.enabled=ORTE_FALSE;
118   prop->multicast.ttl=1;
119   prop->multicast.loopBackEnabled=ORTE_TRUE;
120
121   //IFProp
122   sock_init_udp(&sock);
123   if (sock_bind(&sock,0,INADDR_ANY) == -1) {
124     return ORTE_FALSE;
125   }
126   sock_get_local_interfaces(&sock,prop->IFProp, (char *)&prop->IFCount);
127   sock_cleanup(&sock); 
128
129   prop->mgrs=NULL; //only from localhost
130   prop->appLocalManager=StringToIPAddress("127.0.0.1");
131   prop->listen=INADDR_ANY;
132   prop->keys=NULL; //are assign be orte
133   sprintf(prop->version,ORTE_PACKAGE_STRING\
134                         ", compiled: "\
135                         __DATE__\
136                         " "\
137                         __TIME__);
138                         
139   prop->recvBuffSize=0x4000;
140   prop->sendBuffSize=0x4000; 
141   prop->wireProp.metaBytesPerPacket=1500;
142   prop->wireProp.metaBytesPerFastPacket=1000; //not used
143   prop->wireProp.metabitsPerACKBitmap=32;     //not used
144   prop->wireProp.userBytesPerPacket=0x3000;
145   
146   //domainBaseProp
147   prop->baseProp.registrationMgrRetries=0;
148   NTPTIME_BUILD(prop->baseProp.registrationMgrPeriod,0);//0s
149   prop->baseProp.registrationAppRetries=3;
150   NtpTimeAssembFromMs(prop->baseProp.registrationAppPeriod,0,500);//500ms
151   NTPTIME_BUILD(prop->baseProp.expirationTime,180);  //180s
152   NTPTIME_BUILD(prop->baseProp.refreshPeriod,72);    //72s - refresh self parameters
153   NTPTIME_BUILD(prop->baseProp.purgeTime,60);        //60s - purge time of parameters
154   NTPTIME_BUILD(prop->baseProp.repeatAnnounceTime,72);//72s - announcement by HB
155   NTPTIME_BUILD(prop->baseProp.repeatActiveQueryTime,72);//72s - announcement by ACK
156   NtpTimeAssembFromMs(prop->baseProp.delayResponceTimeACKMin,0,10);//10ms - delay before send ACK
157   NtpTimeAssembFromMs(prop->baseProp.delayResponceTimeACKMax,1,0);//1s
158   NtpTimeAssembFromMs(prop->baseProp.maxBlockTime,20,0);//20s
159   prop->baseProp.ACKMaxRetries=10;
160   prop->baseProp.HBMaxRetries=10;
161   
162   PublParamsInit(&prop->publPropDefault);
163   SubsParamsInit(&prop->subsPropDefault);
164   
165   return ORTE_TRUE;
166 }
167
168 /*****************************************************************************/
169 Boolean
170 ORTEDomainInitEvents(ORTEDomainAppEvents *events) {
171   memset(events,0,sizeof(ORTEDomainAppEvents));
172   return ORTE_TRUE;
173 }
174
175
176 /*****************************************************************************/
177 ORTEDomain * 
178 ORTEDomainCreate(int domain, ORTEDomainProp *prop,
179                ORTEDomainAppEvents *events,Boolean manager) {
180   ORTEDomain        *d;
181   ObjectEntryOID    *objectEntryOID;
182   AppParams         *appParams;
183   CSTWriterParams   cstWriterParams;
184   CSTReaderParams   cstReaderParams;
185   char              iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
186   char              sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
187   char              sbuff[128];
188   int               i;
189   uint16_t          port=0;
190   int               errno_save = 0;
191
192   debug(30,2)  ("ORTEDomainCreate: %s compiled: %s,%s\n",
193                  ORTE_PACKAGE_STRING,__DATE__,__TIME__);
194
195   debug(30,10) ("ORTEDomainCreate: start\n");
196   //Create domainApplication
197   d=MALLOC(sizeof(ORTEDomain));
198   if (!d) return NULL;  //no memory
199   //initialization local values
200   d->domain=domain;
201   d->taskRecvUnicastMetatraffic.d=d;
202   d->taskRecvUnicastMetatraffic.terminate=ORTE_TRUE;
203   d->taskRecvMulticastMetatraffic.d=d;
204   d->taskRecvMulticastMetatraffic.terminate=ORTE_TRUE;
205   d->taskRecvUnicastUserdata.d=d;
206   d->taskRecvUnicastUserdata.terminate=ORTE_TRUE;
207   d->taskRecvMulticastUserdata.d=d;
208   d->taskRecvMulticastUserdata.terminate=ORTE_TRUE;
209   d->taskSend.d=d;
210   d->taskSend.terminate=ORTE_TRUE;
211   d->taskRecvUnicastMetatraffic.sock.port=0;
212   d->taskRecvMulticastMetatraffic.sock.port=0;
213   d->taskRecvUnicastUserdata.sock.port=0;
214   d->taskRecvMulticastUserdata.sock.port=0;
215   d->taskSend.sock.port=0;
216   //init structure objectEntry
217   ObjectEntryHID_init_root_field(&d->objectEntry);
218   pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
219   htimerRoot_init_queue(&d->objectEntry);
220   pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
221   pthread_cond_init(&d->objectEntry.htimSendCond,NULL);
222   pthread_mutex_init(&d->objectEntry.htimSendMutex,NULL);
223   d->objectEntry.htimSendCondValue=0;
224   d->objectEntry.htimNeedWakeUp=ORTE_TRUE;
225   //publication,subscriptions
226   d->publications.counter=d->subscriptions.counter=0;
227   CSTWriter_init_root_field(&d->publications);
228   CSTReader_init_root_field(&d->subscriptions);
229   pthread_rwlock_init(&d->publications.lock,NULL);
230   pthread_rwlock_init(&d->subscriptions.lock,NULL);
231   //publication,subscriptions lists
232   PublicationList_init_root_field(&d->psEntry);
233   pthread_rwlock_init(&d->psEntry.publicationsLock,NULL);
234   SubscriptionList_init_root_field(&d->psEntry);
235   pthread_rwlock_init(&d->psEntry.subscriptionsLock,NULL);
236   
237   //pattern
238   pthread_rwlock_init(&d->patternEntry.lock,NULL);
239   ORTEPatternRegister(d,ORTEPatternCheckDefault,ORTEPatternMatchDefault,NULL);
240   Pattern_init_head(&d->patternEntry);
241     
242   //create domainProp 
243   if (prop!=NULL) {
244     memcpy(&d->domainProp,prop,sizeof(ORTEDomainProp));
245   } else {
246     if (!ORTEDomainPropDefaultGet(&d->domainProp)) {
247       goto err_domainProp;
248     }
249   }
250   
251   //print local IP addresses
252   iflocal[0]=0;
253   if (d->domainProp.IFCount) {
254     for(i=0;i<d->domainProp.IFCount;i++)
255       strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress,sIPAddress));
256     debug(30,2) ("ORTEDomainCreate: localIPAddres(es) %s\n",iflocal);
257   } else{
258     debug(30,2) ("ORTEDomainCreate: no active interface card\n");
259     if (d->domainProp.multicast.enabled) {
260        debug(30,0) ("ORTEDomainCreate: for multicast have to be active an interface\n");
261        goto err_domainProp;
262     }
263   }
264
265   //DomainEvents
266   if (events!=NULL) {
267     memcpy(&d->domainEvents,events,sizeof(ORTEDomainAppEvents));
268   } else {
269     memset(&d->domainEvents,0,sizeof(ORTEDomainAppEvents));
270   }
271
272   //local buffers
273   CDR_codec_init_static(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
274   CDR_codec_init_static(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
275   CDR_codec_init_static(&d->taskRecvUnicastUserdata.mb.cdrCodec);
276   CDR_codec_init_static(&d->taskRecvMulticastUserdata.mb.cdrCodec);
277   CDR_codec_init_static(&d->taskSend.mb.cdrCodec);
278   CDR_buffer_init(&d->taskRecvUnicastMetatraffic.mb.cdrCodec,
279                   d->domainProp.recvBuffSize);
280   CDR_buffer_init(&d->taskSend.mb.cdrCodec,
281                   d->domainProp.sendBuffSize);
282   d->taskSend.mb.cdrCodec.wptr_max=d->domainProp.wireProp.metaBytesPerPacket;
283   if (!manager) {
284     CDR_buffer_init(&d->taskRecvUnicastUserdata.mb.cdrCodec,
285                     d->domainProp.recvBuffSize);
286     if (d->domainProp.multicast.enabled) {
287       CDR_buffer_init(&d->taskRecvMulticastMetatraffic.mb.cdrCodec,
288                       d->domainProp.recvBuffSize);
289       CDR_buffer_init(&d->taskRecvMulticastUserdata.mb.cdrCodec,
290                       d->domainProp.recvBuffSize);
291     }
292   }
293   d->taskSend.mb.cdrCodec.data_endian = FLAG_ENDIANNESS;
294
295   //TypeRegister
296   ORTEType_init_root_field(&d->typeEntry);
297   pthread_rwlock_init(&d->typeEntry.lock,NULL);
298
299   //Sockets
300   sock_init_udp(&d->taskRecvUnicastMetatraffic.sock);
301   sock_init_udp(&d->taskRecvMulticastMetatraffic.sock);
302   sock_init_udp(&d->taskRecvUnicastUserdata.sock);
303   sock_init_udp(&d->taskRecvMulticastUserdata.sock);
304   sock_init_udp(&d->taskSend.sock);
305
306   /************************************************************************/
307   /* UnicastMetatraffic */
308   Domain2Port(d->domain,port);
309   if (manager) {
310     if (d->domainProp.multicast.enabled) {
311       char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
312       struct ip_mreq mreq;
313       int reuse=1,loop=0;
314     
315       //reuseaddr
316       sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, SOL_SOCKET, 
317                     SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
318       debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
319                     reuse);
320
321       //multicast loop
322       sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, IPPROTO_IP, 
323                     IP_MULTICAST_LOOP, (const char *)&loop, 
324                     sizeof(loop));
325       debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
326                   loop);
327       
328       //joint to multicast group
329       memset(&mreq, 0, sizeof(mreq));
330       mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
331       mreq.imr_interface.s_addr=htonl(INADDR_ANY);
332       if(sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock,IPPROTO_IP,
333           IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq))>=0) {
334         debug(30,2) ("ORTEDomainCreate: joint to mgroup %s\n",
335                       IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
336       } else
337               goto err_sock;
338     }
339     if (sock_bind(&d->taskRecvUnicastMetatraffic.sock,port,d->domainProp.listen) == -1) {
340       goto err_sock;
341     }
342   } else {
343     /* give me receiving port (metatraffic) */
344     if (sock_bind(&d->taskRecvUnicastMetatraffic.sock,0,d->domainProp.listen) == -1) {
345       goto err_sock;
346     }
347   }
348   debug(30,2) ("ORTEDomainCreate: bind on port(RecvUnicastMetatraffic): %u\n",
349                d->taskRecvUnicastMetatraffic.sock.port);
350
351   /************************************************************************/
352   /* MulticastMetatraffic */
353   if (d->domainProp.multicast.enabled && !manager) {
354     char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
355     struct ip_mreq mreq;
356     Port mport;
357     int reuse=1;
358     
359     //reuseaddr
360     sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, SOL_SOCKET, 
361                     SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
362     debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
363                   reuse);
364
365     //multicast loop
366     sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, IPPROTO_IP, 
367                     IP_MULTICAST_LOOP, (const char *)&d->domainProp.multicast.loopBackEnabled, 
368                     sizeof(d->domainProp.multicast.loopBackEnabled));
369     debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
370                   d->domainProp.multicast.loopBackEnabled);
371     
372     //joint to multicast group
373     mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
374     mreq.imr_interface.s_addr=htonl(INADDR_ANY);
375     if(sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock,IPPROTO_IP,
376         IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq))>=0) {
377       debug(30,2) ("ORTEDomainCreate: joint to mgroup %s\n",
378                     IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
379     }
380     
381     /* receiving multicast port (metatraffic) */
382     Domain2PortMulticastMetatraffic(d->domain,mport);
383     if (sock_bind(&d->taskRecvMulticastMetatraffic.sock,(uint16_t)mport,d->domainProp.listen) == -1) {
384       goto err_sock;
385     }
386     debug(30,2) ("ORTEDomainCreate: bind on port(RecvMulticastMetatraffic): %u\n",
387                   d->taskRecvMulticastMetatraffic.sock.port);
388   }
389
390   /************************************************************************/
391   /* UserData */
392   if (!manager) {
393     /* give me receiving port (userdata) */
394     if (sock_bind(&d->taskRecvUnicastUserdata.sock,0,d->domainProp.listen) == -1) {
395       goto err_sock;
396     }
397     debug(30,2) ("ORTEDomainCreate: bind on port(RecvUnicatUserdata): %u\n",
398                   d->taskRecvUnicastUserdata.sock.port);
399
400     if (d->domainProp.multicast.enabled) {
401       Port mport;
402       int reuse=1;
403     
404       //reuseaddr
405       sock_setsockopt(&d->taskRecvMulticastUserdata.sock, SOL_SOCKET, 
406                       SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
407       debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
408                       reuse);
409
410       //multicast loop
411       sock_setsockopt(&d->taskRecvMulticastUserdata.sock, IPPROTO_IP, 
412                       IP_MULTICAST_LOOP, (const char *)&d->domainProp.multicast.loopBackEnabled, 
413                       sizeof(d->domainProp.multicast.loopBackEnabled));
414       debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
415                     d->domainProp.multicast.loopBackEnabled);
416       
417       /* receiving multicast port (userdata) */
418       Domain2PortMulticastUserdata(d->domain,mport);
419       if (sock_bind(&d->taskRecvMulticastUserdata.sock,(uint16_t)mport,d->domainProp.listen) == -1) {
420         goto err_sock;
421       }
422       debug(30,2) ("ORTEDomainCreate: bind on port(RecvMulticastUserdata): %u\n",
423                     d->taskRecvMulticastUserdata.sock.port);
424     }
425   }
426
427   /************************************************************************/
428   /* Send */
429   /* give me sending port */
430   if (sock_bind(&d->taskSend.sock,0,d->domainProp.listen) == -1) {
431     goto err_sock;
432   }
433   debug(30,2) ("ORTEDomainCreate: bind on port(Send): %u\n",
434                d->taskSend.sock.port);
435   if (d->domainProp.multicast.enabled) {
436     //ttl
437     if(sock_setsockopt(&d->taskSend.sock,IPPROTO_IP,IP_MULTICAST_TTL, (const char *)&d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
438       debug(30,2) ("ORTEDomainCreate: ttl set on: %u\n",
439            d->domainProp.multicast.ttl);
440     } 
441   }
442
443   /************************************************************************/
444   /* tests for valid resources */
445   if ((d->taskRecvUnicastMetatraffic.sock.fd<0) || 
446       (d->taskSend.sock.fd<0) ||
447       (d->domainProp.multicast.enabled &&
448        (d->taskRecvUnicastUserdata.sock.fd<0)) ||
449       (d->domainProp.multicast.enabled &&
450        (d->taskRecvMulticastUserdata.sock.fd<0)) ||
451       (d->domainProp.multicast.enabled && 
452        (d->taskRecvMulticastMetatraffic.sock.fd<0))) {
453     debug(30,0) ("ORTEDomainCreate: Error creating socket(s).\n");
454     goto err_sock;
455   }
456
457   if ((!d->taskRecvUnicastMetatraffic.mb.cdrCodec.buffer) || 
458       (!d->taskSend.mb.cdrCodec.buffer) ||
459       (d->domainProp.multicast.enabled && !manager &&
460        !d->taskRecvUnicastUserdata.mb.cdrCodec.buffer) || 
461       (d->domainProp.multicast.enabled && !manager &&
462        !d->taskRecvMulticastUserdata.mb.cdrCodec.buffer) || 
463       (d->domainProp.multicast.enabled && !manager &&
464        !d->taskRecvMulticastMetatraffic.mb.cdrCodec.buffer)) {    //no a memory
465     debug(30,0) ("ORTEDomainCreate: Error creating buffer(s).\n");
466     goto err_sock;
467   } 
468
469   /************************************************************************/
470   //Generates local GUID
471   if (d->domainProp.IFCount>0) 
472     d->guid.hid=d->domainProp.IFProp[0].ipAddress;
473   else
474     d->guid.hid=StringToIPAddress("127.0.0.1");
475   if (manager) {
476     d->guid.aid=(d->taskSend.sock.port<<8)+MANAGER; 
477   } else {
478     d->guid.aid=(d->taskSend.sock.port<<8)+MANAGEDAPPLICATION; 
479   }
480   d->guid.oid=OID_APP;
481   debug(30,2) ("ORTEDomainCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
482                GUID_PRINTF(d->guid)); 
483
484   //create HEADER of message for sending task
485   RTPSHeaderCreate(&d->taskSend.mb.cdrCodec,d->guid.hid,d->guid.aid);
486   d->taskSend.mb.needSend=ORTE_FALSE;
487   d->taskSend.mb.containsInfoReply=ORTE_FALSE;  
488   d->taskSend.mb.cdrCodecDirect=NULL;
489   
490   //Self object data & fellow managers object data
491   appParams=(AppParams*)MALLOC(sizeof(AppParams));
492   if (!appParams) {
493     goto err_sock;
494   }
495   AppParamsInit(appParams);
496   appParams->expirationTime=d->domainProp.baseProp.expirationTime;
497   VENDOR_ID_OCERA(appParams->vendorId);
498   appParams->hostId=d->guid.hid;
499   appParams->appId=d->guid.aid;
500   appParams->metatrafficUnicastPort=d->taskRecvUnicastMetatraffic.sock.port;
501   appParams->userdataUnicastPort=d->taskRecvUnicastUserdata.sock.port;  
502   //fill unicast/multicast ip addresses
503   if (d->domainProp.IFCount) {
504     for(i=0;i<d->domainProp.IFCount;i++)
505       appParams->unicastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
506     appParams->unicastIPAddressCount=d->domainProp.IFCount;
507   }
508   if (d->domainProp.multicast.enabled &&
509       IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
510     appParams->metatrafficMulticastIPAddressList[appParams->metatrafficMulticastIPAddressCount]=
511         d->domainProp.multicast.ipAddress;
512     appParams->metatrafficMulticastIPAddressCount++;
513   } else {
514     if (!d->domainProp.IFCount) {
515       appParams->unicastIPAddressList[appParams->unicastIPAddressCount]=
516             StringToIPAddress("127.0.0.1");
517       appParams->unicastIPAddressCount++;
518     }
519   }
520   //KeyList
521   if (!d->domainProp.keys) {
522     appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
523     for(i=0;i<d->domainProp.IFCount;i++)
524       appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
525     if (d->domainProp.multicast.enabled &&
526         IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
527       appParams->managerKeyList[i+1]=d->domainProp.multicast.ipAddress;
528       i++;
529     }
530     appParams->managerKeyCount=i+1;
531   } else {
532     appParams->managerKeyCount=i=0;
533     while (getStringPart(d->domainProp.keys,':',&i,sbuff)) {
534       appParams->managerKeyList[appParams->managerKeyCount++]=
535           StringToIPAddress(sbuff);
536     }    
537   }
538   d->appParams=appParams;
539   //insert object, doesn't need to be locked
540   d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
541   d->objectEntryOID->privateCreated=ORTE_TRUE;
542
543   
544   /************************************************************************/
545   //CST objects
546   //  writerApplicationSelf (WAS)
547   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
548   cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
549   cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
550   NTPTIME_ZERO(cstWriterParams.delayResponceTime);
551   cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
552   if (manager) {
553     cstWriterParams.registrationRetries=d->domainProp.baseProp.registrationMgrRetries; 
554     cstWriterParams.registrationPeriod=d->domainProp.baseProp.registrationMgrPeriod; 
555     cstWriterParams.fullAcknowledge=ORTE_FALSE;
556   } else {
557     cstWriterParams.registrationRetries=d->domainProp.baseProp.registrationAppRetries; 
558     cstWriterParams.registrationPeriod=d->domainProp.baseProp.registrationAppPeriod; 
559     cstWriterParams.fullAcknowledge=ORTE_TRUE;
560   }
561   CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
562       OID_WRITE_APPSELF,&cstWriterParams,NULL);
563   if (manager) {
564     i=0;
565     while (getStringPart(d->domainProp.mgrs,':',&i,sbuff)>0) {
566       GUID_RTPS guid;
567       IPAddress ipAddress=StringToIPAddress(sbuff);
568       guid.hid=ipAddress;
569       guid.aid=AID_UNKNOWN;
570       guid.oid=OID_APP;
571       if (!objectEntryFind(d,&guid)) {
572         CSTRemoteReader *cstRemoteReader;
573         appParams=(AppParams*)MALLOC(sizeof(AppParams));
574         AppParamsInit(appParams);
575         appParams->hostId=guid.hid;
576         appParams->appId=guid.aid;
577         appParams->metatrafficUnicastPort=d->appParams->metatrafficUnicastPort;
578         objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
579         if (d->domainProp.multicast.enabled && IN_MULTICAST(ipAddress)) {
580           appParams->metatrafficMulticastIPAddressList[0]=ipAddress;
581           appParams->metatrafficMulticastIPAddressCount=1;
582           objectEntryOID->multicastPort=port;
583         } else {
584           appParams->unicastIPAddressList[0]=ipAddress;
585           appParams->unicastIPAddressCount=1;
586           objectEntryOID->multicastPort=0;
587         }
588         appParams->userdataUnicastPort=0;  //Manager support only metatraffic
589         cstRemoteReader=CSTWriterAddRemoteReader(d,
590                                  &d->writerApplicationSelf,
591                                  objectEntryOID,
592                                  OID_READ_MGR,
593                                  objectEntryOID);
594         debug(29,2) ("ORTEDomainCreate: add fellow manager (%s)\n",
595                     IPAddressToString(ipAddress,sIPAddress));
596       }
597     }
598   } else {
599     //  add to WAS remote writer(s)
600     if (d->domainProp.appLocalManager) {
601       GUID_RTPS guid;
602       guid.hid=d->domainProp.appLocalManager;
603       guid.aid=AID_UNKNOWN;
604       guid.oid=OID_APP;
605       if (!objectEntryFind(d,&guid)) {
606         appParams=(AppParams*)MALLOC(sizeof(AppParams));
607         AppParamsInit(appParams);
608         appParams->hostId=guid.hid;
609         appParams->appId=guid.aid;
610         appParams->metatrafficUnicastPort=port;
611         appParams->userdataUnicastPort=0;  //Manager support only metatraffic
612         appParams->unicastIPAddressList[0]=d->domainProp.appLocalManager;
613         appParams->unicastIPAddressCount=1;
614         objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
615         CSTWriterAddRemoteReader(d,
616                                  &d->writerApplicationSelf,
617                                  objectEntryOID,
618                                  OID_READ_MGR,
619                                  objectEntryOID);
620         debug(30,2) ("ORTEDomainCreate: add manager (%s)\n",
621                       IPAddressToString(d->domainProp.appLocalManager,sIPAddress));
622       }
623     }
624   }
625
626   //  readerManagers
627   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
628   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
629   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
630   if (manager) {
631     cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
632     cstReaderParams.repeatActiveQueryTime=iNtpTime;  //RM cann't repeatly send ACKf
633   } else {
634     cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
635     cstReaderParams.fullAcknowledge=ORTE_TRUE;      
636   }
637   CSTReaderInit(d,&d->readerManagers,d->objectEntryOID,
638       OID_READ_MGR,&cstReaderParams,NULL);
639
640   //  readerApplications
641   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
642   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
643   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
644   cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
645   cstReaderParams.fullAcknowledge=ORTE_TRUE;      
646   CSTReaderInit(d,&d->readerApplications,d->objectEntryOID,
647       OID_READ_APP,&cstReaderParams,NULL);
648
649   if (manager) {
650     //  writerApplications
651     cstWriterParams.registrationRetries=0; 
652     NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
653     NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
654     cstWriterParams.refreshPeriod=iNtpTime;  //only WAS,WM can refresh csChange(s)
655     cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
656     NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
657     cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
658     cstWriterParams.fullAcknowledge=ORTE_FALSE;
659     CSTWriterInit(d,&d->writerApplications,d->objectEntryOID,
660         OID_WRITE_APP,&cstWriterParams,NULL);
661
662     //  writerManagers
663     cstWriterParams.registrationRetries=0; 
664     NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
665     NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
666     cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
667     cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
668     NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
669     cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
670     cstWriterParams.fullAcknowledge=ORTE_TRUE;
671     CSTWriterInit(d,&d->writerManagers,d->objectEntryOID,
672         OID_WRITE_MGR,&cstWriterParams,NULL);
673   }
674
675   if (!manager) {
676     //  writerPublications
677     cstWriterParams.registrationRetries=0; 
678     NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
679     NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
680     cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
681     cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
682     NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
683     cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
684     cstWriterParams.fullAcknowledge=ORTE_TRUE;
685     CSTWriterInit(d,&d->writerPublications,d->objectEntryOID,
686         OID_WRITE_PUBL,&cstWriterParams,NULL);
687     //  writerSubscriptions
688     cstWriterParams.registrationRetries=0; 
689     NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
690     NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
691     cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
692     cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
693     NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
694     cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
695     cstWriterParams.fullAcknowledge=ORTE_TRUE;
696     CSTWriterInit(d,&d->writerSubscriptions,d->objectEntryOID,
697         OID_WRITE_SUBS,&cstWriterParams,NULL);
698     //  readerPublications
699     cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
700     cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
701     cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
702     cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
703     cstReaderParams.fullAcknowledge=ORTE_TRUE;      
704     CSTReaderInit(d,&d->readerPublications,d->objectEntryOID,
705         OID_READ_PUBL,&cstReaderParams,NULL);
706     //  readerSubscriptions
707     cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
708     cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
709     cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
710     cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
711     cstReaderParams.fullAcknowledge=ORTE_TRUE;      
712     CSTReaderInit(d,&d->readerSubscriptions,d->objectEntryOID,
713         OID_READ_SUBS,&cstReaderParams,NULL);
714   }
715
716   //add csChange for WAS
717   appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE,ORTE_TRUE);
718
719   debug(30,10) ("ORTEDomainCreate: finished\n");
720   return d;
721
722 //err:
723   if (!errno_save) errno_save = errno;
724   /* TODO */
725   FREE(appParams);
726 err_sock:
727   if (!errno_save) errno_save = errno;
728   sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
729   sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
730   sock_cleanup(&d->taskRecvUnicastUserdata.sock);
731   sock_cleanup(&d->taskRecvMulticastUserdata.sock);
732   sock_cleanup(&d->taskSend.sock);
733   pthread_rwlock_destroy(&d->typeEntry.lock);
734   if (d->domainProp.multicast.enabled) {
735     CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
736     CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
737   }
738   if (!manager) {
739     CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
740   }
741   CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
742 err_domainProp:
743   if (!errno_save) errno_save = errno;
744   pthread_rwlock_destroy(&d->patternEntry.lock);
745   pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
746   pthread_rwlock_destroy(&d->psEntry.publicationsLock);
747   pthread_rwlock_destroy(&d->subscriptions.lock);
748   pthread_rwlock_destroy(&d->publications.lock);
749   pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
750   pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
751   pthread_rwlock_destroy(&d->objectEntry.objRootLock);
752   FREE(d);
753   errno = errno_save;
754   return NULL;
755 }
756
757 /*****************************************************************************/
758 Boolean
759 ORTEDomainDestroy(ORTEDomain *d,Boolean manager) {
760   CSTWriter             *cstWriter;
761   CSTReader             *cstReader;
762
763   debug(30,10) ("ORTEDomainDestroy: start\n");
764   if (!d) return ORTE_FALSE;
765
766   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
767   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
768   appSelfParamChanged(d,ORTE_TRUE,ORTE_TRUE,ORTE_FALSE,ORTE_FALSE);    
769   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
770   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
771
772   //Stoping threads
773   ORTEDomainRecvThreadStop(&d->taskRecvUnicastMetatraffic);
774   ORTEDomainRecvThreadStop(&d->taskRecvMulticastMetatraffic);
775   ORTEDomainRecvThreadStop(&d->taskRecvUnicastUserdata);
776   ORTEDomainRecvThreadStop(&d->taskRecvMulticastUserdata);
777   ORTEDomainSendThreadStop(&d->taskSend);
778   debug(30,3) ("ORTEDomainDestroy: threads stoped\n");
779   
780   //CSTReaders and CSTWriters
781   CSTWriterDelete(d,&d->writerApplicationSelf);
782   CSTReaderDelete(d,&d->readerManagers);
783   CSTReaderDelete(d,&d->readerApplications);
784   if (manager) {
785     CSTWriterDelete(d,&d->writerManagers);
786     CSTWriterDelete(d,&d->writerApplications);
787   } else { 
788     CSTWriterDelete(d,&d->writerPublications);
789     CSTWriterDelete(d,&d->writerSubscriptions);
790     CSTReaderDelete(d,&d->readerPublications);
791     CSTReaderDelete(d,&d->readerSubscriptions);
792
793     while ((cstWriter = CSTWriter_cut_first(&d->publications))) {
794       CSTWriterDelete(d,cstWriter);
795       FREE(cstWriter);
796     }  
797     while ((cstReader = CSTReader_cut_first(&d->subscriptions))) {
798       CSTReaderDelete(d,cstReader);
799       FREE(cstReader);
800     }
801   }  
802     
803   //objects in objectsEntry
804   objectEntryDeleteAll(d,&d->objectEntry);
805   debug(30,3) ("ORTEDomainDestroy: deleted all objects\n");
806
807   //Sockets
808   sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
809   sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
810   sock_cleanup(&d->taskRecvUnicastUserdata.sock);
811   sock_cleanup(&d->taskRecvMulticastUserdata.sock);
812   sock_cleanup(&d->taskSend.sock);
813
814
815   //Signals
816   pthread_cond_destroy(&d->objectEntry.htimSendCond);
817   pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
818
819   //rwLocks
820   pthread_rwlock_destroy(&d->objectEntry.objRootLock);
821   pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
822   pthread_rwlock_destroy(&d->publications.lock);
823   pthread_rwlock_destroy(&d->subscriptions.lock);
824   pthread_rwlock_destroy(&d->psEntry.publicationsLock);
825   pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
826
827   //TypeRegister
828   ORTETypeRegisterDestroyAll(d);
829   
830   //Pattern
831   ORTEDomainAppSubscriptionPatternDestroy(d);
832   pthread_rwlock_destroy(&d->patternEntry.lock);
833   
834   //Release buffers  
835   CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
836   CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
837   CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
838   CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
839   CDR_codec_release_buffer(&d->taskSend.mb.cdrCodec);
840   
841   //Free domain instance
842   FREE(d);
843   
844   debug(30,10) ("ORTEDomainDestroy: finished\n");
845   
846   return ORTE_TRUE;
847 }