]> rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/ORTEDomain.c
New ORTE version 0.3.0 committed
[orte.git] / orte / liborte / ORTEDomain.c
index 8e96bb9c5347b95698b5b91bb4ba298b928ec711..26206db9e080a8e71977c43415a09a305da576c2 100644 (file)
 
 /*****************************************************************************/
 void
-ORTEDomainStart(ORTEDomain *d,
-    Boolean recvMetatrafficThread,Boolean recvUserDataThread,Boolean sendThread) {
-  if(!d) return;
-  if ((recvMetatrafficThread) && (d->taskRecvMetatraffic.terminate)) {
-    d->taskRecvMetatraffic.terminate=ORTE_FALSE;
-    pthread_create(&d->taskRecvMetatraffic.thread, NULL,
-                  (void*)&ORTEAppRecvMetatrafficThread, (void *)d); 
+ORTEDomainRecvThreadStart(TaskProp *tp) 
+{
+  if (tp->terminate) {
+    tp->terminate=ORTE_FALSE;
+    pthread_create(&(tp->thread), NULL,
+                  (void*)&ORTEAppRecvThread, (void *)tp); 
   }
-  if ((recvUserDataThread) && (d->taskRecvUserdata.terminate)) {
-    d->taskRecvUserdata.terminate=ORTE_FALSE;
-    pthread_create(&d->taskRecvUserdata.thread, NULL,
-                  (void*)&ORTEAppRecvUserdataThread, (void *)d); 
+}
+
+/*****************************************************************************/
+void
+ORTEDomainSendThreadStart(TaskProp *tp) 
+{
+  if (tp->terminate) {
+    tp->terminate=ORTE_FALSE;
+    pthread_create(&(tp->thread), NULL,
+                  (void*)&ORTEAppSendThread, (void *)tp); 
   }
-  if ((sendThread) && (d->taskSend.terminate)) {
-    d->taskSend.terminate=ORTE_FALSE;
-    pthread_create(&d->taskSend.thread, NULL,
-                  (void*)&ORTEAppSendThread, (void *)d); 
+}
+
+/*****************************************************************************/
+void
+ORTEDomainRecvThreadStop(TaskProp *tp) 
+{
+  ORTEDomain *d=tp->d;
+
+  if (!tp->terminate) {
+    tp->terminate=ORTE_TRUE;
+    ORTEDomainWakeUpReceivingThread(d,
+        &d->taskSend.sock,tp->sock.port); 
+    pthread_join(tp->thread,NULL); 
   }
 }
 
+/*****************************************************************************/
+void
+ORTEDomainSendThreadStop(TaskProp *tp) 
+{
+  ORTEDomain *d=tp->d;
+
+  if (!tp->terminate) {
+    tp->terminate=ORTE_TRUE;
+    ORTEDomainWakeUpSendingThread(&d->objectEntry); 
+    pthread_join(tp->thread,NULL); 
+  }
+}
+
+/*****************************************************************************/
+void
+ORTEDomainStart(ORTEDomain *d,
+    Boolean recvUnicastMetatrafficThread,
+    Boolean recvMulticastMetatrafficThread,
+    Boolean recvUnicastUserdataThread,
+    Boolean recvMulticastUserdataThread,
+    Boolean sendThread) {
+
+  if(!d) return;
+
+  if (recvUnicastMetatrafficThread) 
+    ORTEDomainRecvThreadStart(&d->taskRecvUnicastMetatraffic);
+
+  if (recvMulticastMetatrafficThread) 
+    ORTEDomainRecvThreadStart(&d->taskRecvMulticastMetatraffic);
+
+  if (recvUnicastUserdataThread) 
+    ORTEDomainRecvThreadStart(&d->taskRecvUnicastUserdata);
+
+  if (recvMulticastUserdataThread) 
+    ORTEDomainRecvThreadStart(&d->taskRecvMulticastUserdata);
+
+  if (sendThread) 
+    ORTEDomainSendThreadStart(&d->taskSend);
+}
+
 /*****************************************************************************/
 Boolean
 ORTEDomainPropDefaultGet(ORTEDomainProp *prop) {
@@ -72,12 +126,16 @@ ORTEDomainPropDefaultGet(ORTEDomainProp *prop) {
   prop->recvBuffSize=0x4000;
   prop->sendBuffSize=0x4000; 
   prop->wireProp.metaBytesPerPacket=1500;
-  prop->wireProp.metaBytesPerFastPacket=1000;
-  prop->wireProp.metabitsPerACKBitmap=32;
-  prop->wireProp.userMaxSerDeserSize=0x4000;
+  prop->wireProp.metaBytesPerFastPacket=1000; //not used
+  prop->wireProp.metabitsPerACKBitmap=32;     //not used
+  prop->wireProp.userBytesPerPacket=0x3000;
   
   //domainBaseProp
-  NTPTIME_BUILD(prop->baseProp.expirationTime,180); //180s
+  prop->baseProp.registrationMgrRetries=0;
+  NTPTIME_BUILD(prop->baseProp.registrationMgrPeriod,0);//0s
+  prop->baseProp.registrationAppRetries=3;
+  NtpTimeAssembFromMs(prop->baseProp.registrationAppPeriod,0,500);//500ms
+  NTPTIME_BUILD(prop->baseProp.expirationTime,180);  //180s
   NTPTIME_BUILD(prop->baseProp.refreshPeriod,72);    //72s - refresh self parameters
   NTPTIME_BUILD(prop->baseProp.purgeTime,60);        //60s - purge time of parameters
   NTPTIME_BUILD(prop->baseProp.repeatAnnounceTime,72);//72s - announcement by HB
@@ -100,3 +158,596 @@ ORTEDomainInitEvents(ORTEDomainAppEvents *events) {
   memset(events,0,sizeof(ORTEDomainAppEvents));
   return ORTE_TRUE;
 }
+
+
+/*****************************************************************************/
+ORTEDomain * 
+ORTEDomainCreate(int domain, ORTEDomainProp *prop,
+               ORTEDomainAppEvents *events,Boolean manager) {
+  ORTEDomain        *d;
+  ObjectEntryOID    *objectEntryOID;
+  AppParams         *appParams;
+  CSTWriterParams   cstWriterParams;
+  CSTReaderParams   cstReaderParams;
+  char              iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
+  char              sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
+  char              sbuff[128];
+  int               i;
+  uint16_t          port=0;
+  Boolean           error=ORTE_FALSE;
+
+  debug(30,2)  ("ORTEDomainCreate: %s compiled: %s,%s\n",
+                ORTE_PACKAGE_STRING,__DATE__,__TIME__);
+
+  debug(30,10) ("ORTEDomainCreate: start\n");
+  //Create domainApplication
+  d=MALLOC(sizeof(ORTEDomain));
+  if (!d) return NULL;  //no memory
+  //initialization local values
+  d->domain=domain;
+  d->taskRecvUnicastMetatraffic.d=d;
+  d->taskRecvUnicastMetatraffic.terminate=ORTE_TRUE;
+  d->taskRecvMulticastMetatraffic.d=d;
+  d->taskRecvMulticastMetatraffic.terminate=ORTE_TRUE;
+  d->taskRecvUnicastUserdata.d=d;
+  d->taskRecvUnicastUserdata.terminate=ORTE_TRUE;
+  d->taskRecvMulticastUserdata.d=d;
+  d->taskRecvMulticastUserdata.terminate=ORTE_TRUE;
+  d->taskSend.d=d;
+  d->taskSend.terminate=ORTE_TRUE;
+  d->taskRecvUnicastMetatraffic.sock.port=0;
+  d->taskRecvMulticastMetatraffic.sock.port=0;
+  d->taskRecvUnicastUserdata.sock.port=0;
+  d->taskRecvMulticastUserdata.sock.port=0;
+  d->taskSend.sock.port=0;
+  //init structure objectEntry
+  ObjectEntryHID_init_root_field(&d->objectEntry);
+  pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
+  htimerRoot_init_queue(&d->objectEntry);
+  pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
+  pthread_cond_init(&d->objectEntry.htimSendCond,NULL);
+  pthread_mutex_init(&d->objectEntry.htimSendMutex,NULL);
+  d->objectEntry.htimSendCondValue=0;
+  //publication,subscriptions
+  d->publications.counter=d->subscriptions.counter=0;
+  CSTWriter_init_root_field(&d->publications);
+  CSTReader_init_root_field(&d->subscriptions);
+  pthread_rwlock_init(&d->publications.lock,NULL);
+  pthread_rwlock_init(&d->subscriptions.lock,NULL);
+  //publication,subscriptions lists
+  PublicationList_init_root_field(&d->psEntry);
+  pthread_rwlock_init(&d->psEntry.publicationsLock,NULL);
+  SubscriptionList_init_root_field(&d->psEntry);
+  pthread_rwlock_init(&d->psEntry.subscriptionsLock,NULL);
+  
+  //pattern
+  pthread_rwlock_init(&d->patternEntry.lock,NULL);
+  ORTEPatternRegister(d,ORTEPatternCheckDefault,ORTEPatternMatchDefault,NULL);
+  Pattern_init_head(&d->patternEntry);
+    
+  //create domainProp 
+  if (prop!=NULL) {
+    memcpy(&d->domainProp,prop,sizeof(ORTEDomainProp));
+  } else {
+    ORTEDomainPropDefaultGet(&d->domainProp);
+  }
+  
+  //print local IP addresses
+  iflocal[0]=0;
+  if (d->domainProp.IFCount) {
+    for(i=0;i<d->domainProp.IFCount;i++)
+      strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress,sIPAddress));
+    debug(30,2) ("ORTEDomainCreate: localIPAddres(es) %s\n",iflocal);
+  } else{
+    debug(30,2) ("ORTEDomainCreate: no active interface card\n");
+    if (d->domainProp.multicast.enabled) {
+       debug(30,0) ("ORTEDomainCreate: for multicast have to be active an interface\n");
+       FREE(d);
+       return NULL;
+    }
+  }
+
+  //DomainEvents
+  if (events!=NULL) {
+    memcpy(&d->domainEvents,events,sizeof(ORTEDomainAppEvents));
+  } else {
+    memset(&d->domainEvents,0,sizeof(ORTEDomainAppEvents));
+  }
+
+  //local buffers
+  CDR_codec_init_static(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
+  CDR_codec_init_static(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
+  CDR_codec_init_static(&d->taskRecvUnicastUserdata.mb.cdrCodec);
+  CDR_codec_init_static(&d->taskRecvMulticastUserdata.mb.cdrCodec);
+  CDR_codec_init_static(&d->taskSend.mb.cdrCodec);
+  CDR_buffer_init(&d->taskRecvUnicastMetatraffic.mb.cdrCodec,
+                 d->domainProp.recvBuffSize);
+  CDR_buffer_init(&d->taskSend.mb.cdrCodec,
+                 d->domainProp.sendBuffSize);
+  d->taskSend.mb.cdrCodec.wptr_max=d->domainProp.wireProp.metaBytesPerPacket;
+  if (!manager) {
+    CDR_buffer_init(&d->taskRecvUnicastUserdata.mb.cdrCodec,
+                   d->domainProp.recvBuffSize);
+    if (d->domainProp.multicast.enabled) {
+      CDR_buffer_init(&d->taskRecvMulticastMetatraffic.mb.cdrCodec,
+                     d->domainProp.recvBuffSize);
+      CDR_buffer_init(&d->taskRecvMulticastUserdata.mb.cdrCodec,
+                     d->domainProp.recvBuffSize);
+    }
+  }
+  d->taskSend.mb.cdrCodec.data_endian = FLAG_ENDIANNESS;
+
+  //TypeRegister
+  ORTEType_init_root_field(&d->typeEntry);
+  pthread_rwlock_init(&d->typeEntry.lock,NULL);
+
+  //Sockets
+  sock_init_udp(&d->taskRecvUnicastMetatraffic.sock);
+  sock_init_udp(&d->taskRecvMulticastMetatraffic.sock);
+  sock_init_udp(&d->taskRecvUnicastUserdata.sock);
+  sock_init_udp(&d->taskRecvMulticastUserdata.sock);
+  sock_init_udp(&d->taskSend.sock);
+
+  /************************************************************************/
+  /* UnicastMetatraffic */
+  Domain2Port(d->domain,port);
+  if (manager) {
+    sock_bind(&d->taskRecvUnicastMetatraffic.sock,port); 
+  } else {
+    /* give me receiving port (metatraffic) */
+    sock_bind(&d->taskRecvUnicastMetatraffic.sock,0); 
+  }
+  debug(30,2) ("ORTEDomainCreate: bind on port(RecvUnicastMetatraffic): %u\n",
+               d->taskRecvUnicastMetatraffic.sock.port);
+
+  /************************************************************************/
+  /* MulticastMetatraffic */
+  if (d->domainProp.multicast.enabled && !manager) {
+    Port mport;
+    int reuse=1;
+    
+    //reuseaddr
+    sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, SOL_SOCKET, 
+                   SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
+    debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
+                 reuse);
+
+    //multicast loop
+    sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, IPPROTO_IP, 
+                   IP_MULTICAST_LOOP, &d->domainProp.multicast.loopBackEnabled, 
+                   sizeof(d->domainProp.multicast.loopBackEnabled));
+    debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
+                 d->domainProp.multicast.loopBackEnabled);
+
+    /* receiving multicast port (metatraffic) */
+    Domain2PortMulticastMetatraffic(d->domain,mport);
+    sock_bind(&d->taskRecvMulticastMetatraffic.sock,mport); 
+    debug(30,2) ("ORTEDomainCreate: bind on port(RecvMulticastMetatraffic): %u\n",
+                  d->taskRecvMulticastMetatraffic.sock.port);
+  }
+
+  /************************************************************************/
+  /* UserData */
+  if (!manager) {
+    /* give me receiving port (userdata) */
+    sock_bind(&d->taskRecvUnicastUserdata.sock,0); 
+    debug(30,2) ("ORTEDomainCreate: bind on port(RecvUnicatUserdata): %u\n",
+                  d->taskRecvUnicastUserdata.sock.port);
+
+    if (d->domainProp.multicast.enabled) {
+      Port mport;
+      int reuse=1;
+    
+      //reuseaddr
+      sock_setsockopt(&d->taskRecvMulticastUserdata.sock, SOL_SOCKET, 
+                     SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
+      debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
+                     reuse);
+
+      //multicast loop
+      sock_setsockopt(&d->taskRecvMulticastUserdata.sock, IPPROTO_IP, 
+                     IP_MULTICAST_LOOP, &d->domainProp.multicast.loopBackEnabled, 
+                     sizeof(d->domainProp.multicast.loopBackEnabled));
+      debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
+                   d->domainProp.multicast.loopBackEnabled);
+      
+      /* receiving multicast port (userdata) */
+      Domain2PortMulticastUserdata(d->domain,mport);
+      sock_bind(&d->taskRecvMulticastUserdata.sock,mport); 
+      debug(30,2) ("ORTEDomainCreate: bind on port(RecvMulticastUserdata): %u\n",
+                    d->taskRecvMulticastUserdata.sock.port);
+    }
+  }
+
+  /************************************************************************/
+  /* Send */
+  /* give me sending port */
+  sock_bind(&d->taskSend.sock,0);         
+  debug(30,2) ("ORTEDomainCreate: bind on port(Send): %u\n",
+               d->taskSend.sock.port);
+  if (d->domainProp.multicast.enabled) {
+    //ttl
+    if(sock_setsockopt(&d->taskSend.sock,IPPROTO_IP,IP_MULTICAST_TTL, 
+        &d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
+      debug(30,2) ("ORTEDomainCreate: ttl set on: %u\n",
+           d->domainProp.multicast.ttl);
+    } 
+  }
+
+  /************************************************************************/
+  /* tests for valid resources */
+  if ((d->taskRecvUnicastMetatraffic.sock.fd<0) || 
+      (d->taskSend.sock.fd<0) ||
+      (d->domainProp.multicast.enabled &&
+       (d->taskRecvUnicastUserdata.sock.fd<0)) ||
+      (d->domainProp.multicast.enabled &&
+       (d->taskRecvMulticastUserdata.sock.fd<0)) ||
+      (d->domainProp.multicast.enabled && 
+       (d->taskRecvMulticastMetatraffic.sock.fd<0))) {
+    debug(30,0) ("ORTEDomainCreate: Error creating socket(s).\n");
+    error=ORTE_TRUE;
+  }
+
+  if ((!d->taskRecvUnicastMetatraffic.mb.cdrCodec.buffer) || 
+      (!d->taskSend.mb.cdrCodec.buffer) ||
+      (d->domainProp.multicast.enabled && !manager &&
+       !d->taskRecvUnicastUserdata.mb.cdrCodec.buffer) || 
+      (d->domainProp.multicast.enabled && !manager &&
+       !d->taskRecvMulticastUserdata.mb.cdrCodec.buffer) || 
+      (d->domainProp.multicast.enabled && !manager &&
+       !d->taskRecvMulticastMetatraffic.mb.cdrCodec.buffer)) {    //no a memory
+    debug(30,0) ("ORTEDomainCreate: Error creating buffer(s).\n");
+    error=ORTE_TRUE;
+  } 
+  /* a problem occure with resources */
+  if (error) {
+    sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
+    sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
+    sock_cleanup(&d->taskRecvUnicastUserdata.sock);
+    sock_cleanup(&d->taskRecvMulticastUserdata.sock);
+    sock_cleanup(&d->taskSend.sock);
+    CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
+    CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
+    CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
+    CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
+    CDR_codec_release_buffer(&d->taskSend.mb.cdrCodec);
+    FREE(d);
+  }
+
+  /************************************************************************/
+  //Generates local GUID
+  if (d->domainProp.IFCount>0) 
+    d->guid.hid=d->domainProp.IFProp[0].ipAddress;
+  else
+    d->guid.hid=StringToIPAddress("127.0.0.1");
+  if (manager) {
+    d->guid.aid=(d->taskSend.sock.port<<8)+MANAGER; 
+  } else {
+    d->guid.aid=(d->taskSend.sock.port<<8)+MANAGEDAPPLICATION; 
+  }
+  d->guid.oid=OID_APP;
+  debug(30,2) ("ORTEDomainCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
+               GUID_PRINTF(d->guid)); 
+
+  //create HEADER of message for sending task
+  RTPSHeaderCreate(&d->taskSend.mb.cdrCodec,d->guid.hid,d->guid.aid);
+  d->taskSend.mb.needSend=ORTE_FALSE;
+  d->taskSend.mb.containsInfoReply=ORTE_FALSE;  
+  d->taskSend.mb.cdrCodecDirect=NULL;
+  
+  //Self object data & fellow managers object data
+  appParams=(AppParams*)MALLOC(sizeof(AppParams));
+  AppParamsInit(appParams);
+  appParams->expirationTime=d->domainProp.baseProp.expirationTime;
+  VENDOR_ID_OCERA(appParams->vendorId);
+  appParams->hostId=d->guid.hid;
+  appParams->appId=d->guid.aid;
+  appParams->metatrafficUnicastPort=d->taskRecvUnicastMetatraffic.sock.port;
+  appParams->userdataUnicastPort=d->taskRecvUnicastUserdata.sock.port;  
+  //fill unicast/multicast ip addresses
+  if (d->domainProp.IFCount) {
+    for(i=0;i<d->domainProp.IFCount;i++)
+      appParams->unicastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
+    appParams->unicastIPAddressCount=d->domainProp.IFCount;
+  }
+  if (d->domainProp.multicast.enabled &&
+      IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
+    appParams->metatrafficMulticastIPAddressList[appParams->metatrafficMulticastIPAddressCount]=
+       d->domainProp.multicast.ipAddress;
+    appParams->metatrafficMulticastIPAddressCount++;
+  } else {
+    if (!d->domainProp.IFCount) {
+      appParams->unicastIPAddressList[appParams->unicastIPAddressCount]=
+           StringToIPAddress("127.0.0.1");
+      appParams->unicastIPAddressCount++;
+    }
+  }
+  //KeyList
+  if (!d->domainProp.keys) {
+    appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
+    for(i=0;i<d->domainProp.IFCount;i++)
+      appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
+    if (d->domainProp.multicast.enabled &&
+        IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
+      appParams->managerKeyList[i+1]=d->domainProp.multicast.ipAddress;
+      i++;
+    }
+    appParams->managerKeyCount=i+1;
+  } else {
+    appParams->managerKeyCount=i=0;
+    while (getStringPart(d->domainProp.keys,':',&i,sbuff)) {
+      appParams->managerKeyList[appParams->managerKeyCount++]=
+          StringToIPAddress(sbuff);
+    }    
+  }
+  d->appParams=appParams;
+  //insert object, doesn't need to be locked
+  d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
+  d->objectEntryOID->privateCreated=ORTE_TRUE;
+
+  
+  /************************************************************************/
+  //CST objects
+  //  writerApplicationSelf (WAS)
+  NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
+  cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
+  cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
+  NTPTIME_ZERO(cstWriterParams.delayResponceTime);
+  cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
+  if (manager) {
+    cstWriterParams.registrationRetries=d->domainProp.baseProp.registrationMgrRetries; 
+    cstWriterParams.registrationPeriod=d->domainProp.baseProp.registrationMgrPeriod; 
+    cstWriterParams.fullAcknowledge=ORTE_FALSE;
+  } else {
+    cstWriterParams.registrationRetries=d->domainProp.baseProp.registrationAppRetries; 
+    cstWriterParams.registrationPeriod=d->domainProp.baseProp.registrationAppPeriod; 
+    cstWriterParams.fullAcknowledge=ORTE_TRUE;
+  }
+  CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
+      OID_WRITE_APPSELF,&cstWriterParams,NULL);
+  if (manager) {
+    i=0;
+    while (getStringPart(d->domainProp.mgrs,':',&i,sbuff)>0) {
+      GUID_RTPS guid;
+      IPAddress ipAddress=StringToIPAddress(sbuff);
+      guid.hid=ipAddress;
+      guid.aid=AID_UNKNOWN;
+      guid.oid=OID_APP;
+      if (!objectEntryFind(d,&guid)) {
+        CSTRemoteReader *cstRemoteReader;
+        appParams=(AppParams*)MALLOC(sizeof(AppParams));
+        AppParamsInit(appParams);
+        appParams->hostId=guid.hid;
+        appParams->appId=guid.aid;
+        appParams->metatrafficUnicastPort=d->appParams->metatrafficUnicastPort;
+        objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
+        if (d->domainProp.multicast.enabled && IN_MULTICAST(ipAddress)) {
+          appParams->metatrafficMulticastIPAddressList[0]=ipAddress;
+          appParams->metatrafficMulticastIPAddressCount=1;
+          objectEntryOID->multicastPort=port;
+        } else {
+          appParams->unicastIPAddressList[0]=ipAddress;
+          appParams->unicastIPAddressCount=1;
+          objectEntryOID->multicastPort=0;
+        }
+        appParams->userdataUnicastPort=0;  //Manager support only metatraffic
+        cstRemoteReader=CSTWriterAddRemoteReader(d,
+                                &d->writerApplicationSelf,
+                                objectEntryOID,
+                                OID_READ_MGR,
+                                objectEntryOID);
+        debug(29,2) ("ORTEDomainCreate: add fellow manager (%s)\n",
+                    IPAddressToString(ipAddress,sIPAddress));
+      }
+    }
+  } else {
+    //  add to WAS remote writer(s)
+    if (d->domainProp.appLocalManager) {
+      GUID_RTPS guid;
+      guid.hid=d->domainProp.appLocalManager;
+      guid.aid=AID_UNKNOWN;
+      guid.oid=OID_APP;
+      if (!objectEntryFind(d,&guid)) {
+        appParams=(AppParams*)MALLOC(sizeof(AppParams));
+        AppParamsInit(appParams);
+        appParams->hostId=guid.hid;
+        appParams->appId=guid.aid;
+        appParams->metatrafficUnicastPort=port;
+        appParams->userdataUnicastPort=0;  //Manager support only metatraffic
+        appParams->unicastIPAddressList[0]=d->domainProp.appLocalManager;
+        appParams->unicastIPAddressCount=1;
+        objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
+        CSTWriterAddRemoteReader(d,
+                                &d->writerApplicationSelf,
+                                objectEntryOID,
+                                OID_READ_MGR,
+                                objectEntryOID);
+        debug(30,2) ("ORTEDomainCreate: add manager (%s)\n",
+                      IPAddressToString(d->domainProp.appLocalManager,sIPAddress));
+      }
+    }
+  }
+
+  //  readerManagers
+  cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
+  cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
+  cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
+  if (manager) {
+    cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
+    cstReaderParams.repeatActiveQueryTime=iNtpTime;  //RM cann't repeatly send ACKf
+  } else {
+    cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
+    cstReaderParams.fullAcknowledge=ORTE_TRUE;      
+  }
+  CSTReaderInit(d,&d->readerManagers,d->objectEntryOID,
+      OID_READ_MGR,&cstReaderParams,NULL);
+
+  //  readerApplications
+  cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
+  cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
+  cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
+  cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
+  cstReaderParams.fullAcknowledge=ORTE_TRUE;      
+  CSTReaderInit(d,&d->readerApplications,d->objectEntryOID,
+      OID_READ_APP,&cstReaderParams,NULL);
+
+  if (manager) {
+    //  writerApplications
+    cstWriterParams.registrationRetries=0; 
+    NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
+    NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
+    cstWriterParams.refreshPeriod=iNtpTime;  //only WAS,WM can refresh csChange(s)
+    cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
+    NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
+    cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
+    cstWriterParams.fullAcknowledge=ORTE_FALSE;
+    CSTWriterInit(d,&d->writerApplications,d->objectEntryOID,
+        OID_WRITE_APP,&cstWriterParams,NULL);
+
+    //  writerManagers
+    cstWriterParams.registrationRetries=0; 
+    NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
+    NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
+    cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
+    cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
+    NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
+    cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
+    cstWriterParams.fullAcknowledge=ORTE_TRUE;
+    CSTWriterInit(d,&d->writerManagers,d->objectEntryOID,
+        OID_WRITE_MGR,&cstWriterParams,NULL);
+  }
+
+  if (!manager) {
+    //  writerPublications
+    cstWriterParams.registrationRetries=0; 
+    NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
+    NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
+    cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
+    cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
+    NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
+    cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
+    cstWriterParams.fullAcknowledge=ORTE_TRUE;
+    CSTWriterInit(d,&d->writerPublications,d->objectEntryOID,
+        OID_WRITE_PUBL,&cstWriterParams,NULL);
+    //  writerSubscriptions
+    cstWriterParams.registrationRetries=0; 
+    NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
+    NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
+    cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
+    cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
+    NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
+    cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
+    cstWriterParams.fullAcknowledge=ORTE_TRUE;
+    CSTWriterInit(d,&d->writerSubscriptions,d->objectEntryOID,
+        OID_WRITE_SUBS,&cstWriterParams,NULL);
+    //  readerPublications
+    cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
+    cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
+    cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
+    cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
+    cstReaderParams.fullAcknowledge=ORTE_TRUE;      
+    CSTReaderInit(d,&d->readerPublications,d->objectEntryOID,
+        OID_READ_PUBL,&cstReaderParams,NULL);
+    //  readerSubscriptions
+    cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
+    cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
+    cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
+    cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
+    cstReaderParams.fullAcknowledge=ORTE_TRUE;      
+    CSTReaderInit(d,&d->readerSubscriptions,d->objectEntryOID,
+        OID_READ_SUBS,&cstReaderParams,NULL);
+  }
+
+  //add csChange for WAS
+  appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE,ORTE_TRUE);
+
+  debug(30,10) ("ORTEDomainCreate: finished\n");
+  return d;
+}
+
+/*****************************************************************************/
+Boolean
+ORTEDomainDestroy(ORTEDomain *d,Boolean manager) {
+  CSTWriter             *cstWriter;
+  CSTReader             *cstReader;
+
+  debug(30,10) ("ORTEDomainDestroy: start\n");
+  if (!d) return ORTE_FALSE;
+
+  pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
+  pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
+  appSelfParamChanged(d,ORTE_TRUE,ORTE_TRUE,ORTE_FALSE,ORTE_FALSE);    
+  pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
+  pthread_rwlock_unlock(&d->objectEntry.objRootLock);
+
+  //Stoping threads
+  ORTEDomainRecvThreadStop(&d->taskRecvUnicastMetatraffic);
+  ORTEDomainRecvThreadStop(&d->taskRecvMulticastMetatraffic);
+  ORTEDomainRecvThreadStop(&d->taskRecvUnicastUserdata);
+  ORTEDomainRecvThreadStop(&d->taskRecvMulticastUserdata);
+  ORTEDomainSendThreadStop(&d->taskSend);
+  debug(30,3) ("ORTEDomainDestroy: threads stoped\n");
+  
+  //Sockets
+  sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
+  sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
+  sock_cleanup(&d->taskRecvUnicastUserdata.sock);
+  sock_cleanup(&d->taskRecvMulticastUserdata.sock);
+  sock_cleanup(&d->taskSend.sock);
+
+  //Signals
+  pthread_cond_destroy(&d->objectEntry.htimSendCond);
+  pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
+
+  //rwLocks
+  pthread_rwlock_destroy(&d->objectEntry.objRootLock);
+  pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
+  pthread_rwlock_destroy(&d->publications.lock);
+  pthread_rwlock_destroy(&d->subscriptions.lock);
+  pthread_rwlock_destroy(&d->psEntry.publicationsLock);
+  pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
+
+  //TypeRegister
+  ORTETypeRegisterDestroyAll(d);
+  pthread_rwlock_destroy(&d->typeEntry.lock);
+  
+  //Pattern
+  ORTEDomainAppSubscriptionPatternDestroy(d);
+  pthread_rwlock_unlock(&d->typeEntry.lock);    
+  pthread_rwlock_destroy(&d->patternEntry.lock);
+  
+  //CSTReaders and CSTWriters
+  CSTWriterDelete(d,&d->writerApplicationSelf);
+  CSTReaderDelete(d,&d->readerManagers);
+  CSTReaderDelete(d,&d->readerApplications);
+  if (manager) {
+    CSTWriterDelete(d,&d->writerManagers);
+    CSTWriterDelete(d,&d->writerApplications);
+  } else { 
+    CSTWriterDelete(d,&d->writerPublications);
+    CSTWriterDelete(d,&d->writerSubscriptions);
+    CSTReaderDelete(d,&d->readerPublications);
+    CSTReaderDelete(d,&d->readerSubscriptions);
+
+    while ((cstWriter = CSTWriter_cut_first(&d->publications))) {
+      CSTWriterDelete(d,cstWriter);
+      FREE(cstWriter);
+    }  
+    while ((cstReader = CSTReader_cut_first(&d->subscriptions))) {
+      CSTReaderDelete(d,cstReader);
+      FREE(cstReader);
+    }
+  }  
+    
+  //objects in objectsEntry
+  objectEntryDeleteAll(d,&d->objectEntry);
+  
+  CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
+  CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
+  CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
+  CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
+  CDR_codec_release_buffer(&d->taskSend.mb.cdrCodec);
+  FREE(d);
+  debug(30,10) ("ORTEDomainDestroy: finished\n");
+  return ORTE_TRUE;
+}