]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEDomain.c
Added an option to allow ortemanager listen on a specific interface (-L).
[orte.git] / orte / liborte / ORTEDomain.c
1 /*
2  *  $Id: ORTEDomain.c,v 0.0.0.1         2003/08/21
3  *
4  *  DEBUG:  section 30                  Domain functions
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr.smolik@wo.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30  */ 
31
32 #include "orte_all.h"
33
34 /*****************************************************************************/
35 void
36 ORTEDomainRecvThreadStart(TaskProp *tp) 
37 {
38   if (tp->terminate) {
39     tp->terminate=ORTE_FALSE;
40     pthread_create(&(tp->thread), NULL,
41                   (void*)&ORTEAppRecvThread, (void *)tp); 
42   }
43 }
44
45 /*****************************************************************************/
46 void
47 ORTEDomainSendThreadStart(TaskProp *tp) 
48 {
49   if (tp->terminate) {
50     tp->terminate=ORTE_FALSE;
51     pthread_create(&(tp->thread), NULL,
52                   (void*)&ORTEAppSendThread, (void *)tp); 
53   }
54 }
55
56 /*****************************************************************************/
57 void
58 ORTEDomainRecvThreadStop(TaskProp *tp) 
59 {
60   ORTEDomain *d=tp->d;
61
62   if (!tp->terminate) {
63     tp->terminate=ORTE_TRUE;
64     ORTEDomainWakeUpReceivingThread(d,
65         &d->taskSend.sock,tp->sock.port); 
66     pthread_join(tp->thread,NULL); 
67   }
68 }
69
70 /*****************************************************************************/
71 void
72 ORTEDomainSendThreadStop(TaskProp *tp) 
73 {
74   ORTEDomain *d=tp->d;
75
76   if (!tp->terminate) {
77     tp->terminate=ORTE_TRUE;
78     ORTEDomainWakeUpSendingThread(&d->objectEntry); 
79     pthread_join(tp->thread,NULL); 
80   }
81 }
82
83 /*****************************************************************************/
84 void
85 ORTEDomainStart(ORTEDomain *d,
86     Boolean recvUnicastMetatrafficThread,
87     Boolean recvMulticastMetatrafficThread,
88     Boolean recvUnicastUserdataThread,
89     Boolean recvMulticastUserdataThread,
90     Boolean sendThread) {
91
92   if(!d) return;
93
94   if (recvUnicastMetatrafficThread) 
95     ORTEDomainRecvThreadStart(&d->taskRecvUnicastMetatraffic);
96
97   if (recvMulticastMetatrafficThread) 
98     ORTEDomainRecvThreadStart(&d->taskRecvMulticastMetatraffic);
99
100   if (recvUnicastUserdataThread) 
101     ORTEDomainRecvThreadStart(&d->taskRecvUnicastUserdata);
102
103   if (recvMulticastUserdataThread) 
104     ORTEDomainRecvThreadStart(&d->taskRecvMulticastUserdata);
105
106   if (sendThread) 
107     ORTEDomainSendThreadStart(&d->taskSend);
108 }
109
110 /*****************************************************************************/
111 Boolean
112 ORTEDomainPropDefaultGet(ORTEDomainProp *prop) {
113   sock_t        sock;
114
115   memset(prop, 0, sizeof(*prop));
116
117   prop->multicast.enabled=ORTE_FALSE;
118   prop->multicast.ttl=1;
119   prop->multicast.loopBackEnabled=ORTE_TRUE;
120
121   //IFProp
122   sock_init_udp(&sock);
123   sock_bind(&sock,0,INADDR_ANY);
124   sock_get_local_interfaces(&sock,prop->IFProp,&prop->IFCount);
125   sock_cleanup(&sock); 
126
127   prop->mgrs=NULL; //only from localhost
128   prop->appLocalManager=StringToIPAddress("127.0.0.1");
129   prop->listen=INADDR_ANY;
130   prop->keys=NULL; //are assign be orte
131   sprintf(prop->version,ORTE_PACKAGE_STRING\
132                         ", compiled: "\
133                         __DATE__\
134                         " "\
135                         __TIME__);
136                         
137   prop->recvBuffSize=0x4000;
138   prop->sendBuffSize=0x4000; 
139   prop->wireProp.metaBytesPerPacket=1500;
140   prop->wireProp.metaBytesPerFastPacket=1000; //not used
141   prop->wireProp.metabitsPerACKBitmap=32;     //not used
142   prop->wireProp.userBytesPerPacket=0x3000;
143   
144   //domainBaseProp
145   prop->baseProp.registrationMgrRetries=0;
146   NTPTIME_BUILD(prop->baseProp.registrationMgrPeriod,0);//0s
147   prop->baseProp.registrationAppRetries=3;
148   NtpTimeAssembFromMs(prop->baseProp.registrationAppPeriod,0,500);//500ms
149   NTPTIME_BUILD(prop->baseProp.expirationTime,180);  //180s
150   NTPTIME_BUILD(prop->baseProp.refreshPeriod,72);    //72s - refresh self parameters
151   NTPTIME_BUILD(prop->baseProp.purgeTime,60);        //60s - purge time of parameters
152   NTPTIME_BUILD(prop->baseProp.repeatAnnounceTime,72);//72s - announcement by HB
153   NTPTIME_BUILD(prop->baseProp.repeatActiveQueryTime,72);//72s - announcement by ACK
154   NtpTimeAssembFromMs(prop->baseProp.delayResponceTimeACKMin,0,10);//10ms - delay before send ACK
155   NtpTimeAssembFromMs(prop->baseProp.delayResponceTimeACKMax,1,0);//1s
156   NtpTimeAssembFromMs(prop->baseProp.maxBlockTime,20,0);//20s
157   prop->baseProp.ACKMaxRetries=10;
158   prop->baseProp.HBMaxRetries=10;
159   
160   PublParamsInit(&prop->publPropDefault);
161   SubsParamsInit(&prop->subsPropDefault);
162   
163   return ORTE_TRUE;
164 }
165
166 /*****************************************************************************/
167 Boolean
168 ORTEDomainInitEvents(ORTEDomainAppEvents *events) {
169   memset(events,0,sizeof(ORTEDomainAppEvents));
170   return ORTE_TRUE;
171 }
172
173
174 /*****************************************************************************/
175 ORTEDomain * 
176 ORTEDomainCreate(int domain, ORTEDomainProp *prop,
177                ORTEDomainAppEvents *events,Boolean manager) {
178   ORTEDomain        *d;
179   ObjectEntryOID    *objectEntryOID;
180   AppParams         *appParams;
181   CSTWriterParams   cstWriterParams;
182   CSTReaderParams   cstReaderParams;
183   char              iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
184   char              sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
185   char              sbuff[128];
186   int               i;
187   uint16_t          port=0;
188   Boolean           error=ORTE_FALSE;
189
190   debug(30,2)  ("ORTEDomainCreate: %s compiled: %s,%s\n",
191                  ORTE_PACKAGE_STRING,__DATE__,__TIME__);
192
193   debug(30,10) ("ORTEDomainCreate: start\n");
194   //Create domainApplication
195   d=MALLOC(sizeof(ORTEDomain));
196   if (!d) return NULL;  //no memory
197   //initialization local values
198   d->domain=domain;
199   d->taskRecvUnicastMetatraffic.d=d;
200   d->taskRecvUnicastMetatraffic.terminate=ORTE_TRUE;
201   d->taskRecvMulticastMetatraffic.d=d;
202   d->taskRecvMulticastMetatraffic.terminate=ORTE_TRUE;
203   d->taskRecvUnicastUserdata.d=d;
204   d->taskRecvUnicastUserdata.terminate=ORTE_TRUE;
205   d->taskRecvMulticastUserdata.d=d;
206   d->taskRecvMulticastUserdata.terminate=ORTE_TRUE;
207   d->taskSend.d=d;
208   d->taskSend.terminate=ORTE_TRUE;
209   d->taskRecvUnicastMetatraffic.sock.port=0;
210   d->taskRecvMulticastMetatraffic.sock.port=0;
211   d->taskRecvUnicastUserdata.sock.port=0;
212   d->taskRecvMulticastUserdata.sock.port=0;
213   d->taskSend.sock.port=0;
214   //init structure objectEntry
215   ObjectEntryHID_init_root_field(&d->objectEntry);
216   pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
217   htimerRoot_init_queue(&d->objectEntry);
218   pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
219   pthread_cond_init(&d->objectEntry.htimSendCond,NULL);
220   pthread_mutex_init(&d->objectEntry.htimSendMutex,NULL);
221   d->objectEntry.htimSendCondValue=0;
222   //publication,subscriptions
223   d->publications.counter=d->subscriptions.counter=0;
224   CSTWriter_init_root_field(&d->publications);
225   CSTReader_init_root_field(&d->subscriptions);
226   pthread_rwlock_init(&d->publications.lock,NULL);
227   pthread_rwlock_init(&d->subscriptions.lock,NULL);
228   //publication,subscriptions lists
229   PublicationList_init_root_field(&d->psEntry);
230   pthread_rwlock_init(&d->psEntry.publicationsLock,NULL);
231   SubscriptionList_init_root_field(&d->psEntry);
232   pthread_rwlock_init(&d->psEntry.subscriptionsLock,NULL);
233   
234   //pattern
235   pthread_rwlock_init(&d->patternEntry.lock,NULL);
236   ORTEPatternRegister(d,ORTEPatternCheckDefault,ORTEPatternMatchDefault,NULL);
237   Pattern_init_head(&d->patternEntry);
238     
239   //create domainProp 
240   if (prop!=NULL) {
241     memcpy(&d->domainProp,prop,sizeof(ORTEDomainProp));
242   } else {
243     ORTEDomainPropDefaultGet(&d->domainProp);
244   }
245   
246   //print local IP addresses
247   iflocal[0]=0;
248   if (d->domainProp.IFCount) {
249     for(i=0;i<d->domainProp.IFCount;i++)
250       strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress,sIPAddress));
251     debug(30,2) ("ORTEDomainCreate: localIPAddres(es) %s\n",iflocal);
252   } else{
253     debug(30,2) ("ORTEDomainCreate: no active interface card\n");
254     if (d->domainProp.multicast.enabled) {
255        debug(30,0) ("ORTEDomainCreate: for multicast have to be active an interface\n");
256        FREE(d);
257        return NULL;
258     }
259   }
260
261   //DomainEvents
262   if (events!=NULL) {
263     memcpy(&d->domainEvents,events,sizeof(ORTEDomainAppEvents));
264   } else {
265     memset(&d->domainEvents,0,sizeof(ORTEDomainAppEvents));
266   }
267
268   //local buffers
269   CDR_codec_init_static(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
270   CDR_codec_init_static(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
271   CDR_codec_init_static(&d->taskRecvUnicastUserdata.mb.cdrCodec);
272   CDR_codec_init_static(&d->taskRecvMulticastUserdata.mb.cdrCodec);
273   CDR_codec_init_static(&d->taskSend.mb.cdrCodec);
274   CDR_buffer_init(&d->taskRecvUnicastMetatraffic.mb.cdrCodec,
275                   d->domainProp.recvBuffSize);
276   CDR_buffer_init(&d->taskSend.mb.cdrCodec,
277                   d->domainProp.sendBuffSize);
278   d->taskSend.mb.cdrCodec.wptr_max=d->domainProp.wireProp.metaBytesPerPacket;
279   if (!manager) {
280     CDR_buffer_init(&d->taskRecvUnicastUserdata.mb.cdrCodec,
281                     d->domainProp.recvBuffSize);
282     if (d->domainProp.multicast.enabled) {
283       CDR_buffer_init(&d->taskRecvMulticastMetatraffic.mb.cdrCodec,
284                       d->domainProp.recvBuffSize);
285       CDR_buffer_init(&d->taskRecvMulticastUserdata.mb.cdrCodec,
286                       d->domainProp.recvBuffSize);
287     }
288   }
289   d->taskSend.mb.cdrCodec.data_endian = FLAG_ENDIANNESS;
290
291   //TypeRegister
292   ORTEType_init_root_field(&d->typeEntry);
293   pthread_rwlock_init(&d->typeEntry.lock,NULL);
294
295   //Sockets
296   sock_init_udp(&d->taskRecvUnicastMetatraffic.sock);
297   sock_init_udp(&d->taskRecvMulticastMetatraffic.sock);
298   sock_init_udp(&d->taskRecvUnicastUserdata.sock);
299   sock_init_udp(&d->taskRecvMulticastUserdata.sock);
300   sock_init_udp(&d->taskSend.sock);
301
302   /************************************************************************/
303   /* UnicastMetatraffic */
304   Domain2Port(d->domain,port);
305   if (manager) {
306     if (d->domainProp.multicast.enabled) {
307       char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
308       struct ip_mreq mreq;
309       int reuse=1,loop=0;
310     
311       //reuseaddr
312       sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, SOL_SOCKET, 
313                     SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
314       debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
315                     reuse);
316
317       //multicast loop
318       sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, IPPROTO_IP, 
319                     IP_MULTICAST_LOOP, (char*)&loop, 
320                     sizeof(loop));
321       debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
322                   loop);
323       
324       //joint to multicast group
325       mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
326       mreq.imr_interface.s_addr=htonl(INADDR_ANY);
327       if(sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock,IPPROTO_IP,
328           IP_ADD_MEMBERSHIP,(void *) &mreq, sizeof(mreq))>=0) {
329         debug(30,2) ("ORTEDomainCreate: joint to mgroup %s\n",
330                       IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
331       }
332     }
333     sock_bind(&d->taskRecvUnicastMetatraffic.sock,port,prop->listen); 
334   } else {
335     /* give me receiving port (metatraffic) */
336     sock_bind(&d->taskRecvUnicastMetatraffic.sock,0,prop->listen); 
337   }
338   debug(30,2) ("ORTEDomainCreate: bind on port(RecvUnicastMetatraffic): %u\n",
339                d->taskRecvUnicastMetatraffic.sock.port);
340
341   /************************************************************************/
342   /* MulticastMetatraffic */
343   if (d->domainProp.multicast.enabled && !manager) {
344     char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
345     struct ip_mreq mreq;
346     Port mport;
347     int reuse=1;
348     
349     //reuseaddr
350     sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, SOL_SOCKET, 
351                     SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
352     debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
353                   reuse);
354
355     //multicast loop
356     sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, IPPROTO_IP, 
357                     IP_MULTICAST_LOOP, &d->domainProp.multicast.loopBackEnabled, 
358                     sizeof(d->domainProp.multicast.loopBackEnabled));
359     debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
360                   d->domainProp.multicast.loopBackEnabled);
361     
362     //joint to multicast group
363     mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
364     mreq.imr_interface.s_addr=htonl(INADDR_ANY);
365     if(sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock,IPPROTO_IP,
366         IP_ADD_MEMBERSHIP,(void *) &mreq, sizeof(mreq))>=0) {
367       debug(30,2) ("ORTEDomainCreate: joint to mgroup %s\n",
368                     IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
369     }
370     
371     /* receiving multicast port (metatraffic) */
372     Domain2PortMulticastMetatraffic(d->domain,mport);
373     sock_bind(&d->taskRecvMulticastMetatraffic.sock,(uint16_t)mport,prop->listen); 
374     debug(30,2) ("ORTEDomainCreate: bind on port(RecvMulticastMetatraffic): %u\n",
375                   d->taskRecvMulticastMetatraffic.sock.port);
376   }
377
378   /************************************************************************/
379   /* UserData */
380   if (!manager) {
381     /* give me receiving port (userdata) */
382     sock_bind(&d->taskRecvUnicastUserdata.sock,0,prop->listen); 
383     debug(30,2) ("ORTEDomainCreate: bind on port(RecvUnicatUserdata): %u\n",
384                   d->taskRecvUnicastUserdata.sock.port);
385
386     if (d->domainProp.multicast.enabled) {
387       Port mport;
388       int reuse=1;
389     
390       //reuseaddr
391       sock_setsockopt(&d->taskRecvMulticastUserdata.sock, SOL_SOCKET, 
392                       SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
393       debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
394                       reuse);
395
396       //multicast loop
397       sock_setsockopt(&d->taskRecvMulticastUserdata.sock, IPPROTO_IP, 
398                       IP_MULTICAST_LOOP, &d->domainProp.multicast.loopBackEnabled, 
399                       sizeof(d->domainProp.multicast.loopBackEnabled));
400       debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
401                     d->domainProp.multicast.loopBackEnabled);
402       
403       /* receiving multicast port (userdata) */
404       Domain2PortMulticastUserdata(d->domain,mport);
405       sock_bind(&d->taskRecvMulticastUserdata.sock,(uint16_t)mport,prop->listen);
406       debug(30,2) ("ORTEDomainCreate: bind on port(RecvMulticastUserdata): %u\n",
407                     d->taskRecvMulticastUserdata.sock.port);
408     }
409   }
410
411   /************************************************************************/
412   /* Send */
413   /* give me sending port */
414   sock_bind(&d->taskSend.sock,0,prop->listen);
415   debug(30,2) ("ORTEDomainCreate: bind on port(Send): %u\n",
416                d->taskSend.sock.port);
417   if (d->domainProp.multicast.enabled) {
418     //ttl
419     if(sock_setsockopt(&d->taskSend.sock,IPPROTO_IP,IP_MULTICAST_TTL, 
420         &d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
421       debug(30,2) ("ORTEDomainCreate: ttl set on: %u\n",
422            d->domainProp.multicast.ttl);
423     } 
424   }
425
426   /************************************************************************/
427   /* tests for valid resources */
428   if ((d->taskRecvUnicastMetatraffic.sock.fd<0) || 
429       (d->taskSend.sock.fd<0) ||
430       (d->domainProp.multicast.enabled &&
431        (d->taskRecvUnicastUserdata.sock.fd<0)) ||
432       (d->domainProp.multicast.enabled &&
433        (d->taskRecvMulticastUserdata.sock.fd<0)) ||
434       (d->domainProp.multicast.enabled && 
435        (d->taskRecvMulticastMetatraffic.sock.fd<0))) {
436     debug(30,0) ("ORTEDomainCreate: Error creating socket(s).\n");
437     error=ORTE_TRUE;
438   }
439
440   if ((!d->taskRecvUnicastMetatraffic.mb.cdrCodec.buffer) || 
441       (!d->taskSend.mb.cdrCodec.buffer) ||
442       (d->domainProp.multicast.enabled && !manager &&
443        !d->taskRecvUnicastUserdata.mb.cdrCodec.buffer) || 
444       (d->domainProp.multicast.enabled && !manager &&
445        !d->taskRecvMulticastUserdata.mb.cdrCodec.buffer) || 
446       (d->domainProp.multicast.enabled && !manager &&
447        !d->taskRecvMulticastMetatraffic.mb.cdrCodec.buffer)) {    //no a memory
448     debug(30,0) ("ORTEDomainCreate: Error creating buffer(s).\n");
449     error=ORTE_TRUE;
450   } 
451   /* a problem occure with resources */
452   if (error) {
453     sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
454     sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
455     sock_cleanup(&d->taskRecvUnicastUserdata.sock);
456     sock_cleanup(&d->taskRecvMulticastUserdata.sock);
457     sock_cleanup(&d->taskSend.sock);
458     CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
459     CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
460     CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
461     CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
462     CDR_codec_release_buffer(&d->taskSend.mb.cdrCodec);
463     FREE(d);
464   }
465
466   /************************************************************************/
467   //Generates local GUID
468   if (d->domainProp.IFCount>0) 
469     d->guid.hid=d->domainProp.IFProp[0].ipAddress;
470   else
471     d->guid.hid=StringToIPAddress("127.0.0.1");
472   if (manager) {
473     d->guid.aid=(d->taskSend.sock.port<<8)+MANAGER; 
474   } else {
475     d->guid.aid=(d->taskSend.sock.port<<8)+MANAGEDAPPLICATION; 
476   }
477   d->guid.oid=OID_APP;
478   debug(30,2) ("ORTEDomainCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
479                GUID_PRINTF(d->guid)); 
480
481   //create HEADER of message for sending task
482   RTPSHeaderCreate(&d->taskSend.mb.cdrCodec,d->guid.hid,d->guid.aid);
483   d->taskSend.mb.needSend=ORTE_FALSE;
484   d->taskSend.mb.containsInfoReply=ORTE_FALSE;  
485   d->taskSend.mb.cdrCodecDirect=NULL;
486   
487   //Self object data & fellow managers object data
488   appParams=(AppParams*)MALLOC(sizeof(AppParams));
489   AppParamsInit(appParams);
490   appParams->expirationTime=d->domainProp.baseProp.expirationTime;
491   VENDOR_ID_OCERA(appParams->vendorId);
492   appParams->hostId=d->guid.hid;
493   appParams->appId=d->guid.aid;
494   appParams->metatrafficUnicastPort=d->taskRecvUnicastMetatraffic.sock.port;
495   appParams->userdataUnicastPort=d->taskRecvUnicastUserdata.sock.port;  
496   //fill unicast/multicast ip addresses
497   if (d->domainProp.IFCount) {
498     for(i=0;i<d->domainProp.IFCount;i++)
499       appParams->unicastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
500     appParams->unicastIPAddressCount=d->domainProp.IFCount;
501   }
502   if (d->domainProp.multicast.enabled &&
503       IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
504     appParams->metatrafficMulticastIPAddressList[appParams->metatrafficMulticastIPAddressCount]=
505         d->domainProp.multicast.ipAddress;
506     appParams->metatrafficMulticastIPAddressCount++;
507   } else {
508     if (!d->domainProp.IFCount) {
509       appParams->unicastIPAddressList[appParams->unicastIPAddressCount]=
510             StringToIPAddress("127.0.0.1");
511       appParams->unicastIPAddressCount++;
512     }
513   }
514   //KeyList
515   if (!d->domainProp.keys) {
516     appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
517     for(i=0;i<d->domainProp.IFCount;i++)
518       appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
519     if (d->domainProp.multicast.enabled &&
520         IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
521       appParams->managerKeyList[i+1]=d->domainProp.multicast.ipAddress;
522       i++;
523     }
524     appParams->managerKeyCount=i+1;
525   } else {
526     appParams->managerKeyCount=i=0;
527     while (getStringPart(d->domainProp.keys,':',&i,sbuff)) {
528       appParams->managerKeyList[appParams->managerKeyCount++]=
529           StringToIPAddress(sbuff);
530     }    
531   }
532   d->appParams=appParams;
533   //insert object, doesn't need to be locked
534   d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
535   d->objectEntryOID->privateCreated=ORTE_TRUE;
536
537   
538   /************************************************************************/
539   //CST objects
540   //  writerApplicationSelf (WAS)
541   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
542   cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
543   cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
544   NTPTIME_ZERO(cstWriterParams.delayResponceTime);
545   cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
546   if (manager) {
547     cstWriterParams.registrationRetries=d->domainProp.baseProp.registrationMgrRetries; 
548     cstWriterParams.registrationPeriod=d->domainProp.baseProp.registrationMgrPeriod; 
549     cstWriterParams.fullAcknowledge=ORTE_FALSE;
550   } else {
551     cstWriterParams.registrationRetries=d->domainProp.baseProp.registrationAppRetries; 
552     cstWriterParams.registrationPeriod=d->domainProp.baseProp.registrationAppPeriod; 
553     cstWriterParams.fullAcknowledge=ORTE_TRUE;
554   }
555   CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
556       OID_WRITE_APPSELF,&cstWriterParams,NULL);
557   if (manager) {
558     i=0;
559     while (getStringPart(d->domainProp.mgrs,':',&i,sbuff)>0) {
560       GUID_RTPS guid;
561       IPAddress ipAddress=StringToIPAddress(sbuff);
562       guid.hid=ipAddress;
563       guid.aid=AID_UNKNOWN;
564       guid.oid=OID_APP;
565       if (!objectEntryFind(d,&guid)) {
566         CSTRemoteReader *cstRemoteReader;
567         appParams=(AppParams*)MALLOC(sizeof(AppParams));
568         AppParamsInit(appParams);
569         appParams->hostId=guid.hid;
570         appParams->appId=guid.aid;
571         appParams->metatrafficUnicastPort=d->appParams->metatrafficUnicastPort;
572         objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
573         if (d->domainProp.multicast.enabled && IN_MULTICAST(ipAddress)) {
574           appParams->metatrafficMulticastIPAddressList[0]=ipAddress;
575           appParams->metatrafficMulticastIPAddressCount=1;
576           objectEntryOID->multicastPort=port;
577         } else {
578           appParams->unicastIPAddressList[0]=ipAddress;
579           appParams->unicastIPAddressCount=1;
580           objectEntryOID->multicastPort=0;
581         }
582         appParams->userdataUnicastPort=0;  //Manager support only metatraffic
583         cstRemoteReader=CSTWriterAddRemoteReader(d,
584                                  &d->writerApplicationSelf,
585                                  objectEntryOID,
586                                  OID_READ_MGR,
587                                  objectEntryOID);
588         debug(29,2) ("ORTEDomainCreate: add fellow manager (%s)\n",
589                     IPAddressToString(ipAddress,sIPAddress));
590       }
591     }
592   } else {
593     //  add to WAS remote writer(s)
594     if (d->domainProp.appLocalManager) {
595       GUID_RTPS guid;
596       guid.hid=d->domainProp.appLocalManager;
597       guid.aid=AID_UNKNOWN;
598       guid.oid=OID_APP;
599       if (!objectEntryFind(d,&guid)) {
600         appParams=(AppParams*)MALLOC(sizeof(AppParams));
601         AppParamsInit(appParams);
602         appParams->hostId=guid.hid;
603         appParams->appId=guid.aid;
604         appParams->metatrafficUnicastPort=port;
605         appParams->userdataUnicastPort=0;  //Manager support only metatraffic
606         appParams->unicastIPAddressList[0]=d->domainProp.appLocalManager;
607         appParams->unicastIPAddressCount=1;
608         objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
609         CSTWriterAddRemoteReader(d,
610                                  &d->writerApplicationSelf,
611                                  objectEntryOID,
612                                  OID_READ_MGR,
613                                  objectEntryOID);
614         debug(30,2) ("ORTEDomainCreate: add manager (%s)\n",
615                       IPAddressToString(d->domainProp.appLocalManager,sIPAddress));
616       }
617     }
618   }
619
620   //  readerManagers
621   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
622   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
623   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
624   if (manager) {
625     cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
626     cstReaderParams.repeatActiveQueryTime=iNtpTime;  //RM cann't repeatly send ACKf
627   } else {
628     cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
629     cstReaderParams.fullAcknowledge=ORTE_TRUE;      
630   }
631   CSTReaderInit(d,&d->readerManagers,d->objectEntryOID,
632       OID_READ_MGR,&cstReaderParams,NULL);
633
634   //  readerApplications
635   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
636   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
637   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
638   cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
639   cstReaderParams.fullAcknowledge=ORTE_TRUE;      
640   CSTReaderInit(d,&d->readerApplications,d->objectEntryOID,
641       OID_READ_APP,&cstReaderParams,NULL);
642
643   if (manager) {
644     //  writerApplications
645     cstWriterParams.registrationRetries=0; 
646     NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
647     NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
648     cstWriterParams.refreshPeriod=iNtpTime;  //only WAS,WM can refresh csChange(s)
649     cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
650     NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
651     cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
652     cstWriterParams.fullAcknowledge=ORTE_FALSE;
653     CSTWriterInit(d,&d->writerApplications,d->objectEntryOID,
654         OID_WRITE_APP,&cstWriterParams,NULL);
655
656     //  writerManagers
657     cstWriterParams.registrationRetries=0; 
658     NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
659     NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
660     cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
661     cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
662     NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
663     cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
664     cstWriterParams.fullAcknowledge=ORTE_TRUE;
665     CSTWriterInit(d,&d->writerManagers,d->objectEntryOID,
666         OID_WRITE_MGR,&cstWriterParams,NULL);
667   }
668
669   if (!manager) {
670     //  writerPublications
671     cstWriterParams.registrationRetries=0; 
672     NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
673     NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
674     cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
675     cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
676     NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
677     cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
678     cstWriterParams.fullAcknowledge=ORTE_TRUE;
679     CSTWriterInit(d,&d->writerPublications,d->objectEntryOID,
680         OID_WRITE_PUBL,&cstWriterParams,NULL);
681     //  writerSubscriptions
682     cstWriterParams.registrationRetries=0; 
683     NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
684     NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
685     cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
686     cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
687     NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
688     cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
689     cstWriterParams.fullAcknowledge=ORTE_TRUE;
690     CSTWriterInit(d,&d->writerSubscriptions,d->objectEntryOID,
691         OID_WRITE_SUBS,&cstWriterParams,NULL);
692     //  readerPublications
693     cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
694     cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
695     cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
696     cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
697     cstReaderParams.fullAcknowledge=ORTE_TRUE;      
698     CSTReaderInit(d,&d->readerPublications,d->objectEntryOID,
699         OID_READ_PUBL,&cstReaderParams,NULL);
700     //  readerSubscriptions
701     cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
702     cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
703     cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
704     cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
705     cstReaderParams.fullAcknowledge=ORTE_TRUE;      
706     CSTReaderInit(d,&d->readerSubscriptions,d->objectEntryOID,
707         OID_READ_SUBS,&cstReaderParams,NULL);
708   }
709
710   //add csChange for WAS
711   appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE,ORTE_TRUE);
712
713   debug(30,10) ("ORTEDomainCreate: finished\n");
714   return d;
715 }
716
717 /*****************************************************************************/
718 Boolean
719 ORTEDomainDestroy(ORTEDomain *d,Boolean manager) {
720   CSTWriter             *cstWriter;
721   CSTReader             *cstReader;
722
723   debug(30,10) ("ORTEDomainDestroy: start\n");
724   if (!d) return ORTE_FALSE;
725
726   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
727   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
728   appSelfParamChanged(d,ORTE_TRUE,ORTE_TRUE,ORTE_FALSE,ORTE_FALSE);    
729   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
730   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
731
732   //Stoping threads
733   ORTEDomainRecvThreadStop(&d->taskRecvUnicastMetatraffic);
734   ORTEDomainRecvThreadStop(&d->taskRecvMulticastMetatraffic);
735   ORTEDomainRecvThreadStop(&d->taskRecvUnicastUserdata);
736   ORTEDomainRecvThreadStop(&d->taskRecvMulticastUserdata);
737   ORTEDomainSendThreadStop(&d->taskSend);
738   debug(30,3) ("ORTEDomainDestroy: threads stoped\n");
739   
740   //CSTReaders and CSTWriters
741   CSTWriterDelete(d,&d->writerApplicationSelf);
742   CSTReaderDelete(d,&d->readerManagers);
743   CSTReaderDelete(d,&d->readerApplications);
744   if (manager) {
745     CSTWriterDelete(d,&d->writerManagers);
746     CSTWriterDelete(d,&d->writerApplications);
747   } else { 
748     CSTWriterDelete(d,&d->writerPublications);
749     CSTWriterDelete(d,&d->writerSubscriptions);
750     CSTReaderDelete(d,&d->readerPublications);
751     CSTReaderDelete(d,&d->readerSubscriptions);
752
753     while ((cstWriter = CSTWriter_cut_first(&d->publications))) {
754       CSTWriterDelete(d,cstWriter);
755       FREE(cstWriter);
756     }  
757     while ((cstReader = CSTReader_cut_first(&d->subscriptions))) {
758       CSTReaderDelete(d,cstReader);
759       FREE(cstReader);
760     }
761   }  
762     
763   //objects in objectsEntry
764   objectEntryDeleteAll(d,&d->objectEntry);
765   debug(30,3) ("ORTEDomainDestroy: deleted all objects\n");
766
767   //Sockets
768   sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
769   sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
770   sock_cleanup(&d->taskRecvUnicastUserdata.sock);
771   sock_cleanup(&d->taskRecvMulticastUserdata.sock);
772   sock_cleanup(&d->taskSend.sock);
773
774
775   //Signals
776   pthread_cond_destroy(&d->objectEntry.htimSendCond);
777   pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
778
779   //rwLocks
780   pthread_rwlock_destroy(&d->objectEntry.objRootLock);
781   pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
782   pthread_rwlock_destroy(&d->publications.lock);
783   pthread_rwlock_destroy(&d->subscriptions.lock);
784   pthread_rwlock_destroy(&d->psEntry.publicationsLock);
785   pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
786
787   //TypeRegister
788   ORTETypeRegisterDestroyAll(d);
789   
790   //Pattern
791   ORTEDomainAppSubscriptionPatternDestroy(d);
792   pthread_rwlock_destroy(&d->patternEntry.lock);
793   
794   //Release buffers  
795   CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
796   CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
797   CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
798   CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
799   CDR_codec_release_buffer(&d->taskSend.mb.cdrCodec);
800   
801   //Free domain instance
802   FREE(d);
803   
804   debug(30,10) ("ORTEDomainDestroy: finished\n");
805   
806   return ORTE_TRUE;
807 }