]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEDomain.c
upgrade to new version 0.3.1
[orte.git] / orte / liborte / ORTEDomain.c
1 /*
2  *  $Id: ORTEDomain.c,v 0.0.0.1         2003/08/21
3  *
4  *  DEBUG:  section 30                  Domain functions
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte_all.h"
23
24 /*****************************************************************************/
25 void
26 ORTEDomainRecvThreadStart(TaskProp *tp) 
27 {
28   if (tp->terminate) {
29     tp->terminate=ORTE_FALSE;
30     pthread_create(&(tp->thread), NULL,
31                   (void*)&ORTEAppRecvThread, (void *)tp); 
32   }
33 }
34
35 /*****************************************************************************/
36 void
37 ORTEDomainSendThreadStart(TaskProp *tp) 
38 {
39   if (tp->terminate) {
40     tp->terminate=ORTE_FALSE;
41     pthread_create(&(tp->thread), NULL,
42                   (void*)&ORTEAppSendThread, (void *)tp); 
43   }
44 }
45
46 /*****************************************************************************/
47 void
48 ORTEDomainRecvThreadStop(TaskProp *tp) 
49 {
50   ORTEDomain *d=tp->d;
51
52   if (!tp->terminate) {
53     tp->terminate=ORTE_TRUE;
54     ORTEDomainWakeUpReceivingThread(d,
55         &d->taskSend.sock,tp->sock.port); 
56     pthread_join(tp->thread,NULL); 
57   }
58 }
59
60 /*****************************************************************************/
61 void
62 ORTEDomainSendThreadStop(TaskProp *tp) 
63 {
64   ORTEDomain *d=tp->d;
65
66   if (!tp->terminate) {
67     tp->terminate=ORTE_TRUE;
68     ORTEDomainWakeUpSendingThread(&d->objectEntry); 
69     pthread_join(tp->thread,NULL); 
70   }
71 }
72
73 /*****************************************************************************/
74 void
75 ORTEDomainStart(ORTEDomain *d,
76     Boolean recvUnicastMetatrafficThread,
77     Boolean recvMulticastMetatrafficThread,
78     Boolean recvUnicastUserdataThread,
79     Boolean recvMulticastUserdataThread,
80     Boolean sendThread) {
81
82   if(!d) return;
83
84   if (recvUnicastMetatrafficThread) 
85     ORTEDomainRecvThreadStart(&d->taskRecvUnicastMetatraffic);
86
87   if (recvMulticastMetatrafficThread) 
88     ORTEDomainRecvThreadStart(&d->taskRecvMulticastMetatraffic);
89
90   if (recvUnicastUserdataThread) 
91     ORTEDomainRecvThreadStart(&d->taskRecvUnicastUserdata);
92
93   if (recvMulticastUserdataThread) 
94     ORTEDomainRecvThreadStart(&d->taskRecvMulticastUserdata);
95
96   if (sendThread) 
97     ORTEDomainSendThreadStart(&d->taskSend);
98 }
99
100 /*****************************************************************************/
101 Boolean
102 ORTEDomainPropDefaultGet(ORTEDomainProp *prop) {
103   sock_t        sock;
104
105   memset(prop, 0, sizeof(*prop));
106
107   prop->multicast.enabled=ORTE_FALSE;
108   prop->multicast.ttl=1;
109   prop->multicast.loopBackEnabled=ORTE_TRUE;
110
111   //IFProp
112   sock_init_udp(&sock);
113   sock_bind(&sock,0);
114   sock_get_local_interfaces(&sock,prop->IFProp,&prop->IFCount);
115   sock_cleanup(&sock); 
116
117   prop->mgrs=NULL; //only from localhost
118   prop->appLocalManager=StringToIPAddress("127.0.0.1");
119   prop->keys=NULL; //are assign be orte
120   sprintf(prop->version,ORTE_PACKAGE_STRING\
121                         ", compiled: "\
122                         __DATE__\
123                         " "\
124                         __TIME__);
125                         
126   prop->recvBuffSize=0x4000;
127   prop->sendBuffSize=0x4000; 
128   prop->wireProp.metaBytesPerPacket=1500;
129   prop->wireProp.metaBytesPerFastPacket=1000; //not used
130   prop->wireProp.metabitsPerACKBitmap=32;     //not used
131   prop->wireProp.userBytesPerPacket=0x3000;
132   
133   //domainBaseProp
134   prop->baseProp.registrationMgrRetries=0;
135   NTPTIME_BUILD(prop->baseProp.registrationMgrPeriod,0);//0s
136   prop->baseProp.registrationAppRetries=3;
137   NtpTimeAssembFromMs(prop->baseProp.registrationAppPeriod,0,500);//500ms
138   NTPTIME_BUILD(prop->baseProp.expirationTime,180);  //180s
139   NTPTIME_BUILD(prop->baseProp.refreshPeriod,72);    //72s - refresh self parameters
140   NTPTIME_BUILD(prop->baseProp.purgeTime,60);        //60s - purge time of parameters
141   NTPTIME_BUILD(prop->baseProp.repeatAnnounceTime,72);//72s - announcement by HB
142   NTPTIME_BUILD(prop->baseProp.repeatActiveQueryTime,72);//72s - announcement by ACK
143   NtpTimeAssembFromMs(prop->baseProp.delayResponceTimeACKMin,0,10);//10ms - delay before send ACK
144   NtpTimeAssembFromMs(prop->baseProp.delayResponceTimeACKMax,1,0);//1s
145   NtpTimeAssembFromMs(prop->baseProp.maxBlockTime,20,0);//20s
146   prop->baseProp.ACKMaxRetries=10;
147   prop->baseProp.HBMaxRetries=10;
148   
149   PublParamsInit(&prop->publPropDefault);
150   SubsParamsInit(&prop->subsPropDefault);
151   
152   return ORTE_TRUE;
153 }
154
155 /*****************************************************************************/
156 Boolean
157 ORTEDomainInitEvents(ORTEDomainAppEvents *events) {
158   memset(events,0,sizeof(ORTEDomainAppEvents));
159   return ORTE_TRUE;
160 }
161
162
163 /*****************************************************************************/
164 ORTEDomain * 
165 ORTEDomainCreate(int domain, ORTEDomainProp *prop,
166                ORTEDomainAppEvents *events,Boolean manager) {
167   ORTEDomain        *d;
168   ObjectEntryOID    *objectEntryOID;
169   AppParams         *appParams;
170   CSTWriterParams   cstWriterParams;
171   CSTReaderParams   cstReaderParams;
172   char              iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
173   char              sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
174   char              sbuff[128];
175   int               i;
176   uint16_t          port=0;
177   Boolean           error=ORTE_FALSE;
178
179   debug(30,2)  ("ORTEDomainCreate: %s compiled: %s,%s\n",
180                  ORTE_PACKAGE_STRING,__DATE__,__TIME__);
181
182   debug(30,10) ("ORTEDomainCreate: start\n");
183   //Create domainApplication
184   d=MALLOC(sizeof(ORTEDomain));
185   if (!d) return NULL;  //no memory
186   //initialization local values
187   d->domain=domain;
188   d->taskRecvUnicastMetatraffic.d=d;
189   d->taskRecvUnicastMetatraffic.terminate=ORTE_TRUE;
190   d->taskRecvMulticastMetatraffic.d=d;
191   d->taskRecvMulticastMetatraffic.terminate=ORTE_TRUE;
192   d->taskRecvUnicastUserdata.d=d;
193   d->taskRecvUnicastUserdata.terminate=ORTE_TRUE;
194   d->taskRecvMulticastUserdata.d=d;
195   d->taskRecvMulticastUserdata.terminate=ORTE_TRUE;
196   d->taskSend.d=d;
197   d->taskSend.terminate=ORTE_TRUE;
198   d->taskRecvUnicastMetatraffic.sock.port=0;
199   d->taskRecvMulticastMetatraffic.sock.port=0;
200   d->taskRecvUnicastUserdata.sock.port=0;
201   d->taskRecvMulticastUserdata.sock.port=0;
202   d->taskSend.sock.port=0;
203   //init structure objectEntry
204   ObjectEntryHID_init_root_field(&d->objectEntry);
205   pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
206   htimerRoot_init_queue(&d->objectEntry);
207   pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
208   pthread_cond_init(&d->objectEntry.htimSendCond,NULL);
209   pthread_mutex_init(&d->objectEntry.htimSendMutex,NULL);
210   d->objectEntry.htimSendCondValue=0;
211   //publication,subscriptions
212   d->publications.counter=d->subscriptions.counter=0;
213   CSTWriter_init_root_field(&d->publications);
214   CSTReader_init_root_field(&d->subscriptions);
215   pthread_rwlock_init(&d->publications.lock,NULL);
216   pthread_rwlock_init(&d->subscriptions.lock,NULL);
217   //publication,subscriptions lists
218   PublicationList_init_root_field(&d->psEntry);
219   pthread_rwlock_init(&d->psEntry.publicationsLock,NULL);
220   SubscriptionList_init_root_field(&d->psEntry);
221   pthread_rwlock_init(&d->psEntry.subscriptionsLock,NULL);
222   
223   //pattern
224   pthread_rwlock_init(&d->patternEntry.lock,NULL);
225   ORTEPatternRegister(d,ORTEPatternCheckDefault,ORTEPatternMatchDefault,NULL);
226   Pattern_init_head(&d->patternEntry);
227     
228   //create domainProp 
229   if (prop!=NULL) {
230     memcpy(&d->domainProp,prop,sizeof(ORTEDomainProp));
231   } else {
232     ORTEDomainPropDefaultGet(&d->domainProp);
233   }
234   
235   //print local IP addresses
236   iflocal[0]=0;
237   if (d->domainProp.IFCount) {
238     for(i=0;i<d->domainProp.IFCount;i++)
239       strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress,sIPAddress));
240     debug(30,2) ("ORTEDomainCreate: localIPAddres(es) %s\n",iflocal);
241   } else{
242     debug(30,2) ("ORTEDomainCreate: no active interface card\n");
243     if (d->domainProp.multicast.enabled) {
244        debug(30,0) ("ORTEDomainCreate: for multicast have to be active an interface\n");
245        FREE(d);
246        return NULL;
247     }
248   }
249
250   //DomainEvents
251   if (events!=NULL) {
252     memcpy(&d->domainEvents,events,sizeof(ORTEDomainAppEvents));
253   } else {
254     memset(&d->domainEvents,0,sizeof(ORTEDomainAppEvents));
255   }
256
257   //local buffers
258   CDR_codec_init_static(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
259   CDR_codec_init_static(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
260   CDR_codec_init_static(&d->taskRecvUnicastUserdata.mb.cdrCodec);
261   CDR_codec_init_static(&d->taskRecvMulticastUserdata.mb.cdrCodec);
262   CDR_codec_init_static(&d->taskSend.mb.cdrCodec);
263   CDR_buffer_init(&d->taskRecvUnicastMetatraffic.mb.cdrCodec,
264                   d->domainProp.recvBuffSize);
265   CDR_buffer_init(&d->taskSend.mb.cdrCodec,
266                   d->domainProp.sendBuffSize);
267   d->taskSend.mb.cdrCodec.wptr_max=d->domainProp.wireProp.metaBytesPerPacket;
268   if (!manager) {
269     CDR_buffer_init(&d->taskRecvUnicastUserdata.mb.cdrCodec,
270                     d->domainProp.recvBuffSize);
271     if (d->domainProp.multicast.enabled) {
272       CDR_buffer_init(&d->taskRecvMulticastMetatraffic.mb.cdrCodec,
273                       d->domainProp.recvBuffSize);
274       CDR_buffer_init(&d->taskRecvMulticastUserdata.mb.cdrCodec,
275                       d->domainProp.recvBuffSize);
276     }
277   }
278   d->taskSend.mb.cdrCodec.data_endian = FLAG_ENDIANNESS;
279
280   //TypeRegister
281   ORTEType_init_root_field(&d->typeEntry);
282   pthread_rwlock_init(&d->typeEntry.lock,NULL);
283
284   //Sockets
285   sock_init_udp(&d->taskRecvUnicastMetatraffic.sock);
286   sock_init_udp(&d->taskRecvMulticastMetatraffic.sock);
287   sock_init_udp(&d->taskRecvUnicastUserdata.sock);
288   sock_init_udp(&d->taskRecvMulticastUserdata.sock);
289   sock_init_udp(&d->taskSend.sock);
290
291   /************************************************************************/
292   /* UnicastMetatraffic */
293   Domain2Port(d->domain,port);
294   if (manager) {
295     if (d->domainProp.multicast.enabled) {
296       char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
297       struct ip_mreq mreq;
298       int reuse=1,loop=0;
299     
300       //reuseaddr
301       sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, SOL_SOCKET, 
302                     SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
303       debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
304                     reuse);
305
306       //multicast loop
307       sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, IPPROTO_IP, 
308                     IP_MULTICAST_LOOP, (char*)&loop, 
309                     sizeof(loop));
310       debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
311                   loop);
312       
313       //joint to multicast group
314       mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
315       mreq.imr_interface.s_addr=htonl(INADDR_ANY);
316       if(sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock,IPPROTO_IP,
317           IP_ADD_MEMBERSHIP,(void *) &mreq, sizeof(mreq))>=0) {
318         debug(30,2) ("ORTEDomainCreate: joint to mgroup %s\n",
319                       IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
320       }
321     }
322     sock_bind(&d->taskRecvUnicastMetatraffic.sock,port); 
323   } else {
324     /* give me receiving port (metatraffic) */
325     sock_bind(&d->taskRecvUnicastMetatraffic.sock,0); 
326   }
327   debug(30,2) ("ORTEDomainCreate: bind on port(RecvUnicastMetatraffic): %u\n",
328                d->taskRecvUnicastMetatraffic.sock.port);
329
330   /************************************************************************/
331   /* MulticastMetatraffic */
332   if (d->domainProp.multicast.enabled && !manager) {
333     char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
334     struct ip_mreq mreq;
335     Port mport;
336     int reuse=1;
337     
338     //reuseaddr
339     sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, SOL_SOCKET, 
340                     SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
341     debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
342                   reuse);
343
344     //multicast loop
345     sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, IPPROTO_IP, 
346                     IP_MULTICAST_LOOP, &d->domainProp.multicast.loopBackEnabled, 
347                     sizeof(d->domainProp.multicast.loopBackEnabled));
348     debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
349                   d->domainProp.multicast.loopBackEnabled);
350     
351     //joint to multicast group
352     mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
353     mreq.imr_interface.s_addr=htonl(INADDR_ANY);
354     if(sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock,IPPROTO_IP,
355         IP_ADD_MEMBERSHIP,(void *) &mreq, sizeof(mreq))>=0) {
356       debug(30,2) ("ORTEDomainCreate: joint to mgroup %s\n",
357                     IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
358     }
359     
360     /* receiving multicast port (metatraffic) */
361     Domain2PortMulticastMetatraffic(d->domain,mport);
362     sock_bind(&d->taskRecvMulticastMetatraffic.sock,(uint16_t)mport); 
363     debug(30,2) ("ORTEDomainCreate: bind on port(RecvMulticastMetatraffic): %u\n",
364                   d->taskRecvMulticastMetatraffic.sock.port);
365   }
366
367   /************************************************************************/
368   /* UserData */
369   if (!manager) {
370     /* give me receiving port (userdata) */
371     sock_bind(&d->taskRecvUnicastUserdata.sock,0); 
372     debug(30,2) ("ORTEDomainCreate: bind on port(RecvUnicatUserdata): %u\n",
373                   d->taskRecvUnicastUserdata.sock.port);
374
375     if (d->domainProp.multicast.enabled) {
376       Port mport;
377       int reuse=1;
378     
379       //reuseaddr
380       sock_setsockopt(&d->taskRecvMulticastUserdata.sock, SOL_SOCKET, 
381                       SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
382       debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
383                       reuse);
384
385       //multicast loop
386       sock_setsockopt(&d->taskRecvMulticastUserdata.sock, IPPROTO_IP, 
387                       IP_MULTICAST_LOOP, &d->domainProp.multicast.loopBackEnabled, 
388                       sizeof(d->domainProp.multicast.loopBackEnabled));
389       debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
390                     d->domainProp.multicast.loopBackEnabled);
391       
392       /* receiving multicast port (userdata) */
393       Domain2PortMulticastUserdata(d->domain,mport);
394       sock_bind(&d->taskRecvMulticastUserdata.sock,(uint16_t)mport); 
395       debug(30,2) ("ORTEDomainCreate: bind on port(RecvMulticastUserdata): %u\n",
396                     d->taskRecvMulticastUserdata.sock.port);
397     }
398   }
399
400   /************************************************************************/
401   /* Send */
402   /* give me sending port */
403   sock_bind(&d->taskSend.sock,0);         
404   debug(30,2) ("ORTEDomainCreate: bind on port(Send): %u\n",
405                d->taskSend.sock.port);
406   if (d->domainProp.multicast.enabled) {
407     //ttl
408     if(sock_setsockopt(&d->taskSend.sock,IPPROTO_IP,IP_MULTICAST_TTL, 
409         &d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
410       debug(30,2) ("ORTEDomainCreate: ttl set on: %u\n",
411            d->domainProp.multicast.ttl);
412     } 
413   }
414
415   /************************************************************************/
416   /* tests for valid resources */
417   if ((d->taskRecvUnicastMetatraffic.sock.fd<0) || 
418       (d->taskSend.sock.fd<0) ||
419       (d->domainProp.multicast.enabled &&
420        (d->taskRecvUnicastUserdata.sock.fd<0)) ||
421       (d->domainProp.multicast.enabled &&
422        (d->taskRecvMulticastUserdata.sock.fd<0)) ||
423       (d->domainProp.multicast.enabled && 
424        (d->taskRecvMulticastMetatraffic.sock.fd<0))) {
425     debug(30,0) ("ORTEDomainCreate: Error creating socket(s).\n");
426     error=ORTE_TRUE;
427   }
428
429   if ((!d->taskRecvUnicastMetatraffic.mb.cdrCodec.buffer) || 
430       (!d->taskSend.mb.cdrCodec.buffer) ||
431       (d->domainProp.multicast.enabled && !manager &&
432        !d->taskRecvUnicastUserdata.mb.cdrCodec.buffer) || 
433       (d->domainProp.multicast.enabled && !manager &&
434        !d->taskRecvMulticastUserdata.mb.cdrCodec.buffer) || 
435       (d->domainProp.multicast.enabled && !manager &&
436        !d->taskRecvMulticastMetatraffic.mb.cdrCodec.buffer)) {    //no a memory
437     debug(30,0) ("ORTEDomainCreate: Error creating buffer(s).\n");
438     error=ORTE_TRUE;
439   } 
440   /* a problem occure with resources */
441   if (error) {
442     sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
443     sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
444     sock_cleanup(&d->taskRecvUnicastUserdata.sock);
445     sock_cleanup(&d->taskRecvMulticastUserdata.sock);
446     sock_cleanup(&d->taskSend.sock);
447     CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
448     CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
449     CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
450     CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
451     CDR_codec_release_buffer(&d->taskSend.mb.cdrCodec);
452     FREE(d);
453   }
454
455   /************************************************************************/
456   //Generates local GUID
457   if (d->domainProp.IFCount>0) 
458     d->guid.hid=d->domainProp.IFProp[0].ipAddress;
459   else
460     d->guid.hid=StringToIPAddress("127.0.0.1");
461   if (manager) {
462     d->guid.aid=(d->taskSend.sock.port<<8)+MANAGER; 
463   } else {
464     d->guid.aid=(d->taskSend.sock.port<<8)+MANAGEDAPPLICATION; 
465   }
466   d->guid.oid=OID_APP;
467   debug(30,2) ("ORTEDomainCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
468                GUID_PRINTF(d->guid)); 
469
470   //create HEADER of message for sending task
471   RTPSHeaderCreate(&d->taskSend.mb.cdrCodec,d->guid.hid,d->guid.aid);
472   d->taskSend.mb.needSend=ORTE_FALSE;
473   d->taskSend.mb.containsInfoReply=ORTE_FALSE;  
474   d->taskSend.mb.cdrCodecDirect=NULL;
475   
476   //Self object data & fellow managers object data
477   appParams=(AppParams*)MALLOC(sizeof(AppParams));
478   AppParamsInit(appParams);
479   appParams->expirationTime=d->domainProp.baseProp.expirationTime;
480   VENDOR_ID_OCERA(appParams->vendorId);
481   appParams->hostId=d->guid.hid;
482   appParams->appId=d->guid.aid;
483   appParams->metatrafficUnicastPort=d->taskRecvUnicastMetatraffic.sock.port;
484   appParams->userdataUnicastPort=d->taskRecvUnicastUserdata.sock.port;  
485   //fill unicast/multicast ip addresses
486   if (d->domainProp.IFCount) {
487     for(i=0;i<d->domainProp.IFCount;i++)
488       appParams->unicastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
489     appParams->unicastIPAddressCount=d->domainProp.IFCount;
490   }
491   if (d->domainProp.multicast.enabled &&
492       IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
493     appParams->metatrafficMulticastIPAddressList[appParams->metatrafficMulticastIPAddressCount]=
494         d->domainProp.multicast.ipAddress;
495     appParams->metatrafficMulticastIPAddressCount++;
496   } else {
497     if (!d->domainProp.IFCount) {
498       appParams->unicastIPAddressList[appParams->unicastIPAddressCount]=
499             StringToIPAddress("127.0.0.1");
500       appParams->unicastIPAddressCount++;
501     }
502   }
503   //KeyList
504   if (!d->domainProp.keys) {
505     appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
506     for(i=0;i<d->domainProp.IFCount;i++)
507       appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
508     if (d->domainProp.multicast.enabled &&
509         IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
510       appParams->managerKeyList[i+1]=d->domainProp.multicast.ipAddress;
511       i++;
512     }
513     appParams->managerKeyCount=i+1;
514   } else {
515     appParams->managerKeyCount=i=0;
516     while (getStringPart(d->domainProp.keys,':',&i,sbuff)) {
517       appParams->managerKeyList[appParams->managerKeyCount++]=
518           StringToIPAddress(sbuff);
519     }    
520   }
521   d->appParams=appParams;
522   //insert object, doesn't need to be locked
523   d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
524   d->objectEntryOID->privateCreated=ORTE_TRUE;
525
526   
527   /************************************************************************/
528   //CST objects
529   //  writerApplicationSelf (WAS)
530   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
531   cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
532   cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
533   NTPTIME_ZERO(cstWriterParams.delayResponceTime);
534   cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
535   if (manager) {
536     cstWriterParams.registrationRetries=d->domainProp.baseProp.registrationMgrRetries; 
537     cstWriterParams.registrationPeriod=d->domainProp.baseProp.registrationMgrPeriod; 
538     cstWriterParams.fullAcknowledge=ORTE_FALSE;
539   } else {
540     cstWriterParams.registrationRetries=d->domainProp.baseProp.registrationAppRetries; 
541     cstWriterParams.registrationPeriod=d->domainProp.baseProp.registrationAppPeriod; 
542     cstWriterParams.fullAcknowledge=ORTE_TRUE;
543   }
544   CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
545       OID_WRITE_APPSELF,&cstWriterParams,NULL);
546   if (manager) {
547     i=0;
548     while (getStringPart(d->domainProp.mgrs,':',&i,sbuff)>0) {
549       GUID_RTPS guid;
550       IPAddress ipAddress=StringToIPAddress(sbuff);
551       guid.hid=ipAddress;
552       guid.aid=AID_UNKNOWN;
553       guid.oid=OID_APP;
554       if (!objectEntryFind(d,&guid)) {
555         CSTRemoteReader *cstRemoteReader;
556         appParams=(AppParams*)MALLOC(sizeof(AppParams));
557         AppParamsInit(appParams);
558         appParams->hostId=guid.hid;
559         appParams->appId=guid.aid;
560         appParams->metatrafficUnicastPort=d->appParams->metatrafficUnicastPort;
561         objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
562         if (d->domainProp.multicast.enabled && IN_MULTICAST(ipAddress)) {
563           appParams->metatrafficMulticastIPAddressList[0]=ipAddress;
564           appParams->metatrafficMulticastIPAddressCount=1;
565           objectEntryOID->multicastPort=port;
566         } else {
567           appParams->unicastIPAddressList[0]=ipAddress;
568           appParams->unicastIPAddressCount=1;
569           objectEntryOID->multicastPort=0;
570         }
571         appParams->userdataUnicastPort=0;  //Manager support only metatraffic
572         cstRemoteReader=CSTWriterAddRemoteReader(d,
573                                  &d->writerApplicationSelf,
574                                  objectEntryOID,
575                                  OID_READ_MGR,
576                                  objectEntryOID);
577         debug(29,2) ("ORTEDomainCreate: add fellow manager (%s)\n",
578                     IPAddressToString(ipAddress,sIPAddress));
579       }
580     }
581   } else {
582     //  add to WAS remote writer(s)
583     if (d->domainProp.appLocalManager) {
584       GUID_RTPS guid;
585       guid.hid=d->domainProp.appLocalManager;
586       guid.aid=AID_UNKNOWN;
587       guid.oid=OID_APP;
588       if (!objectEntryFind(d,&guid)) {
589         appParams=(AppParams*)MALLOC(sizeof(AppParams));
590         AppParamsInit(appParams);
591         appParams->hostId=guid.hid;
592         appParams->appId=guid.aid;
593         appParams->metatrafficUnicastPort=port;
594         appParams->userdataUnicastPort=0;  //Manager support only metatraffic
595         appParams->unicastIPAddressList[0]=d->domainProp.appLocalManager;
596         appParams->unicastIPAddressCount=1;
597         objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
598         CSTWriterAddRemoteReader(d,
599                                  &d->writerApplicationSelf,
600                                  objectEntryOID,
601                                  OID_READ_MGR,
602                                  objectEntryOID);
603         debug(30,2) ("ORTEDomainCreate: add manager (%s)\n",
604                       IPAddressToString(d->domainProp.appLocalManager,sIPAddress));
605       }
606     }
607   }
608
609   //  readerManagers
610   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
611   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
612   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
613   if (manager) {
614     cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
615     cstReaderParams.repeatActiveQueryTime=iNtpTime;  //RM cann't repeatly send ACKf
616   } else {
617     cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
618     cstReaderParams.fullAcknowledge=ORTE_TRUE;      
619   }
620   CSTReaderInit(d,&d->readerManagers,d->objectEntryOID,
621       OID_READ_MGR,&cstReaderParams,NULL);
622
623   //  readerApplications
624   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
625   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
626   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
627   cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
628   cstReaderParams.fullAcknowledge=ORTE_TRUE;      
629   CSTReaderInit(d,&d->readerApplications,d->objectEntryOID,
630       OID_READ_APP,&cstReaderParams,NULL);
631
632   if (manager) {
633     //  writerApplications
634     cstWriterParams.registrationRetries=0; 
635     NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
636     NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
637     cstWriterParams.refreshPeriod=iNtpTime;  //only WAS,WM can refresh csChange(s)
638     cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
639     NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
640     cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
641     cstWriterParams.fullAcknowledge=ORTE_FALSE;
642     CSTWriterInit(d,&d->writerApplications,d->objectEntryOID,
643         OID_WRITE_APP,&cstWriterParams,NULL);
644
645     //  writerManagers
646     cstWriterParams.registrationRetries=0; 
647     NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
648     NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
649     cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
650     cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
651     NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
652     cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
653     cstWriterParams.fullAcknowledge=ORTE_TRUE;
654     CSTWriterInit(d,&d->writerManagers,d->objectEntryOID,
655         OID_WRITE_MGR,&cstWriterParams,NULL);
656   }
657
658   if (!manager) {
659     //  writerPublications
660     cstWriterParams.registrationRetries=0; 
661     NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
662     NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
663     cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
664     cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
665     NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
666     cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
667     cstWriterParams.fullAcknowledge=ORTE_TRUE;
668     CSTWriterInit(d,&d->writerPublications,d->objectEntryOID,
669         OID_WRITE_PUBL,&cstWriterParams,NULL);
670     //  writerSubscriptions
671     cstWriterParams.registrationRetries=0; 
672     NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
673     NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
674     cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
675     cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
676     NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
677     cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
678     cstWriterParams.fullAcknowledge=ORTE_TRUE;
679     CSTWriterInit(d,&d->writerSubscriptions,d->objectEntryOID,
680         OID_WRITE_SUBS,&cstWriterParams,NULL);
681     //  readerPublications
682     cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
683     cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
684     cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
685     cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
686     cstReaderParams.fullAcknowledge=ORTE_TRUE;      
687     CSTReaderInit(d,&d->readerPublications,d->objectEntryOID,
688         OID_READ_PUBL,&cstReaderParams,NULL);
689     //  readerSubscriptions
690     cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
691     cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
692     cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
693     cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
694     cstReaderParams.fullAcknowledge=ORTE_TRUE;      
695     CSTReaderInit(d,&d->readerSubscriptions,d->objectEntryOID,
696         OID_READ_SUBS,&cstReaderParams,NULL);
697   }
698
699   //add csChange for WAS
700   appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE,ORTE_TRUE);
701
702   debug(30,10) ("ORTEDomainCreate: finished\n");
703   return d;
704 }
705
706 /*****************************************************************************/
707 Boolean
708 ORTEDomainDestroy(ORTEDomain *d,Boolean manager) {
709   CSTWriter             *cstWriter;
710   CSTReader             *cstReader;
711
712   debug(30,10) ("ORTEDomainDestroy: start\n");
713   if (!d) return ORTE_FALSE;
714
715   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
716   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
717   appSelfParamChanged(d,ORTE_TRUE,ORTE_TRUE,ORTE_FALSE,ORTE_FALSE);    
718   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
719   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
720
721   //Stoping threads
722   ORTEDomainRecvThreadStop(&d->taskRecvUnicastMetatraffic);
723   ORTEDomainRecvThreadStop(&d->taskRecvMulticastMetatraffic);
724   ORTEDomainRecvThreadStop(&d->taskRecvUnicastUserdata);
725   ORTEDomainRecvThreadStop(&d->taskRecvMulticastUserdata);
726   ORTEDomainSendThreadStop(&d->taskSend);
727   debug(30,3) ("ORTEDomainDestroy: threads stoped\n");
728   
729   //CSTReaders and CSTWriters
730   CSTWriterDelete(d,&d->writerApplicationSelf);
731   CSTReaderDelete(d,&d->readerManagers);
732   CSTReaderDelete(d,&d->readerApplications);
733   if (manager) {
734     CSTWriterDelete(d,&d->writerManagers);
735     CSTWriterDelete(d,&d->writerApplications);
736   } else { 
737     CSTWriterDelete(d,&d->writerPublications);
738     CSTWriterDelete(d,&d->writerSubscriptions);
739     CSTReaderDelete(d,&d->readerPublications);
740     CSTReaderDelete(d,&d->readerSubscriptions);
741
742     while ((cstWriter = CSTWriter_cut_first(&d->publications))) {
743       CSTWriterDelete(d,cstWriter);
744       FREE(cstWriter);
745     }  
746     while ((cstReader = CSTReader_cut_first(&d->subscriptions))) {
747       CSTReaderDelete(d,cstReader);
748       FREE(cstReader);
749     }
750   }  
751     
752   //objects in objectsEntry
753   objectEntryDeleteAll(d,&d->objectEntry);
754   debug(30,3) ("ORTEDomainDestroy: deleted all objects\n");
755
756   //Sockets
757   sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
758   sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
759   sock_cleanup(&d->taskRecvUnicastUserdata.sock);
760   sock_cleanup(&d->taskRecvMulticastUserdata.sock);
761   sock_cleanup(&d->taskSend.sock);
762
763
764   //Signals
765   pthread_cond_destroy(&d->objectEntry.htimSendCond);
766   pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
767
768   //rwLocks
769   pthread_rwlock_destroy(&d->objectEntry.objRootLock);
770   pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
771   pthread_rwlock_destroy(&d->publications.lock);
772   pthread_rwlock_destroy(&d->subscriptions.lock);
773   pthread_rwlock_destroy(&d->psEntry.publicationsLock);
774   pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
775
776   //TypeRegister
777   ORTETypeRegisterDestroyAll(d);
778   
779   //Pattern
780   ORTEDomainAppSubscriptionPatternDestroy(d);
781   pthread_rwlock_destroy(&d->patternEntry.lock);
782   
783   //Release buffers  
784   CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
785   CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
786   CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
787   CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
788   CDR_codec_release_buffer(&d->taskSend.mb.cdrCodec);
789   
790   //Free domain instance
791   FREE(d);
792   
793   debug(30,10) ("ORTEDomainDestroy: finished\n");
794   
795   return ORTE_TRUE;
796 }