rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/ORTEDomainMgr.c
version 0.2.2 (mac, solaris patch)
[orte.git] / orte / liborte / ORTEDomainMgr.c
index df702dd1d2d6203cff5c01cb14b69cc3c0e5cc66..26f6a0e3e7ddf39fa8114d61ea442bf1e1459f55 100644 (file)
 /*****************************************************************************/
 ORTEDomain *
 ORTEDomainMgrCreate(int domain, ORTEDomainProp *prop,
-                    ORTEDomainAppEvents *events,Boolean startSendingThread) {
+                    ORTEDomainAppEvents *events,Boolean suspended) {
   ORTEDomain        *d;
   ObjectEntryOID    *objectEntryOID;
   AppParams         *appParams;
   CSTWriterParams   cstWriterParams;
   CSTReaderParams   cstReaderParams;
   char              iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
+  char              sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
+  char              sbuff[128];
   int               i;
-  u_int16_t         port=0;
+  uint16_t          port=0;
 
   debug(29,10) ("ORTEDomainMgrCreate: start\n");
   //Create domainApplication
@@ -40,8 +42,8 @@ ORTEDomainMgrCreate(int domain, ORTEDomainProp *prop,
   if (!d) return NULL;  //no memory
   //initialization local values
   d->domain=domain;
-  d->taskRecvMetatraffic.terminate=ORTE_FALSE;
-  d->taskSend.terminate=ORTE_FALSE;
+  d->taskRecvMetatraffic.terminate=ORTE_TRUE;
+  d->taskSend.terminate=ORTE_TRUE;
   d->taskRecvMetatraffic.sock.port=0;
   d->taskSend.sock.port=0;
   //init structure objectEntry
@@ -49,7 +51,9 @@ ORTEDomainMgrCreate(int domain, ORTEDomainProp *prop,
   pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
   htimerRoot_init_queue(&d->objectEntry);
   pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
-  pthread_mutex_init(&d->objectEntry.htimSendMutex, NULL);
+  pthread_cond_init(&d->objectEntry.htimSendCond,NULL);
+  pthread_mutex_init(&d->objectEntry.htimSendMutex,NULL);
+  d->objectEntry.htimSendCondValue=0;
   
   //create domainProp 
   if (prop!=NULL) {
@@ -62,10 +66,10 @@ ORTEDomainMgrCreate(int domain, ORTEDomainProp *prop,
   iflocal[0]=0;
   if (d->domainProp.IFCount) {
     for(i=0;i<d->domainProp.IFCount;i++) 
-      strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress));
+      strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress,sIPAddress));
     debug(29,2) ("ORTEDomainMgrCreate: localIPAddres(es) %s\n",iflocal);
   } else{
-    debug(29,2) ("ORTEDomainMgrCreate: no activ interface card\n");
+    debug(29,2) ("ORTEDomainMgrCreate: no active interface card\n");
   }
 
   //DomainEvents
@@ -77,10 +81,10 @@ ORTEDomainMgrCreate(int domain, ORTEDomainProp *prop,
 
   //local buffers
   d->mbRecvMetatraffic.cdrStream.buffer=
-      (u_int8_t*)MALLOC(d->domainProp.recvBuffSize);
+      (uint8_t*)MALLOC(d->domainProp.recvBuffSize);
   d->mbRecvUserdata.cdrStream.buffer=NULL;
   d->mbSend.cdrStream.buffer=
-      (u_int8_t*)MALLOC(d->domainProp.sendBuffSize);
+      (uint8_t*)MALLOC(d->domainProp.sendBuffSize);
   if ((!d->mbRecvMetatraffic.cdrStream.buffer) || 
       (!d->mbSend.cdrStream.buffer)) {    //no memory
     FREE(d->mbRecvMetatraffic.cdrStream.buffer);
@@ -123,7 +127,7 @@ ORTEDomainMgrCreate(int domain, ORTEDomainProp *prop,
     if(sock_setsockopt(&d->taskRecvUserdata.sock,IP_ADD_MEMBERSHIP,
         (void *) &mreq, sizeof(mreq))>=0) {
       debug(29,2) ("ORTEDomainAppCreate: listening to mgroup %s\n",
-           IPAddressToString(d->domainProp.multicast.ipAddress));
+           IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
     }
   }
   if ((d->taskRecvMetatraffic.sock.fd<0) || (d->taskSend.sock.fd<0) ||
@@ -181,21 +185,25 @@ ORTEDomainMgrCreate(int domain, ORTEDomainProp *prop,
     }
   }
   //managerKeyList
-  appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
-  for(i=0;i<d->domainProp.IFCount;i++)
-    appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
-  appParams->managerKeyCount=d->domainProp.IFCount+1;
-  if (d->domainProp.mgrAddKey!=0) {
-    appParams->managerKeyList[appParams->managerKeyCount]=d->domainProp.mgrAddKey;
-    appParams->managerKeyCount++;
-    debug(29,4) ("ORTEDomainMgrCreate: additional manager key accepted (%s)\n",
-                  IPAddressToString(d->domainProp.mgrAddKey));
-
+  if (!d->domainProp.keys) {
+    appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
+    for(i=0;i<d->domainProp.IFCount;i++)
+      appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
+    appParams->managerKeyCount=d->domainProp.IFCount+1;
+  } else {
+    appParams->managerKeyCount=i=0;
+    while (getStringPart(d->domainProp.keys,':',&i,sbuff)) {
+      printf("a");
+      ORTESleepMs(100);
+      appParams->managerKeyList[appParams->managerKeyCount++]=
+          StringToIPAddress(sbuff);
+    }
+    
   }
   d->appParams=appParams;
   //insert object, doesn't need to be locked
   d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
-  d->objectEntryOID->private=ORTE_TRUE;
+  d->objectEntryOID->privateCreated=ORTE_TRUE;
 
   //CST objects
   //  writerApplicationSelf (WAS)
@@ -208,48 +216,27 @@ ORTEDomainMgrCreate(int domain, ORTEDomainProp *prop,
   CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
       OID_WRITE_APPSELF,&cstWriterParams,NULL);
   //  add to WAS remote writer(s)
-  if (d->domainProp.mgrs) {
-    int8_t *cp=d->domainProp.mgrs;
-    while (cp[0]!=0) {  //till is length>0
-#ifndef __RTL__
-      struct hostent *hostname; 
-#endif
-      int8_t         *dcp,tcp; 
-      dcp=strchr(cp,':');
-      if (!dcp) dcp=cp+strlen(cp);
-      tcp=*dcp;  //save ending value
-      *dcp=0;    //temporary end of string
-#ifdef __RTL__
-      if (1) {
-        GUID_RTPS guid;
-        IPAddress ipAddress=StringToIPAddress(cp);
-#else
-      if ((hostname=gethostbyname(cp))) {
-        GUID_RTPS guid;
-        IPAddress ipAddress=ntohl(*((long*)(hostname->h_addr_list[0])));
-#endif
-        guid.hid=ipAddress;
-        guid.aid=AID_UNKNOWN;
-        guid.oid=OID_APP;
-        if (!objectEntryFind(d,&guid)) {
-          appParams=(AppParams*)MALLOC(sizeof(AppParams));
-          AppParamsInit(appParams);
-          appParams->hostId=guid.hid;
-          appParams->appId=guid.aid;
-          appParams->metatrafficUnicastPort=d->appParams->metatrafficUnicastPort;
-          appParams->userdataUnicastPort=0;  //Manager support only metatraffic
-          appParams->unicastIPAddressList[0]=ipAddress;
-          appParams->unicastIPAddressCount=1;
-          objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
-          CSTWriterAddRemoteReader(d,&d->writerApplicationSelf,objectEntryOID,
-              OID_READ_MGR);
-          debug(29,2) ("ORTEDomainAppCreate: add fellow manager (%s)\n",
-                      IPAddressToString(ipAddress));
-        }
-      }
-      *dcp=tcp;                   //restore value
-      if (dcp[0]!=0) cp=dcp+1;    //next value
-      else cp=dcp;
+  i=0;
+  while (getStringPart(d->domainProp.mgrs,':',&i,sbuff)>0) {
+    GUID_RTPS guid;
+    IPAddress ipAddress=StringToIPAddress(sbuff);
+    guid.hid=ipAddress;
+    guid.aid=AID_UNKNOWN;
+    guid.oid=OID_APP;
+    if (!objectEntryFind(d,&guid)) {
+      appParams=(AppParams*)MALLOC(sizeof(AppParams));
+      AppParamsInit(appParams);
+      appParams->hostId=guid.hid;
+      appParams->appId=guid.aid;
+      appParams->metatrafficUnicastPort=d->appParams->metatrafficUnicastPort;
+      appParams->userdataUnicastPort=0;  //Manager support only metatraffic
+      appParams->unicastIPAddressList[0]=ipAddress;
+      appParams->unicastIPAddressCount=1;
+      objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
+      CSTWriterAddRemoteReader(d,&d->writerApplicationSelf,objectEntryOID,
+          OID_READ_MGR);
+      debug(29,2) ("ORTEDomainAppCreate: add fellow manager (%s)\n",
+                  IPAddressToString(ipAddress,sIPAddress));
     }
   }
   //  readerManagers
@@ -288,15 +275,11 @@ ORTEDomainMgrCreate(int domain, ORTEDomainProp *prop,
       OID_WRITE_MGR,&cstWriterParams,NULL);
   
   //add csChange for WAS
-  appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE);
+  appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE,ORTE_TRUE);
   
   //Start threads
-  pthread_mutex_lock(&d->objectEntry.htimSendMutex);
-  pthread_create(&d->taskRecvMetatraffic.thread, NULL,
-      (void*)&ORTEAppRecvMetatrafficThread, (void *)d); 
-  if (startSendingThread) {
-    pthread_create(&d->taskSend.thread, NULL,
-        (void*)&ORTEAppSendThread, (void *)d); 
+  if (!suspended) {
+    ORTEDomainStart(d,ORTE_TRUE,ORTE_FALSE,ORTE_TRUE);
   }
 
   debug(29,10) ("ORTEDomainMgrCreate: finished\n");
@@ -308,14 +291,23 @@ Boolean
 ORTEDomainMgrDestroy(ORTEDomain *d) {
 
   debug(29,10) ("ORTEDomainMgrDestroy: start\n");
+  pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
+  pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
+  appSelfParamChanged(d,ORTE_TRUE,ORTE_TRUE,ORTE_FALSE,ORTE_FALSE);    
+  pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
+  pthread_rwlock_unlock(&d->objectEntry.objRootLock);
   //Stoping threads
-  d->taskRecvMetatraffic.terminate=ORTE_TRUE;
-  d->taskSend.terminate=ORTE_TRUE;
-  ORTEDomainWakeUpReceivingThread(d,
-      &d->taskSend.sock,d->taskRecvMetatraffic.sock.port); 
-  pthread_join(d->taskRecvMetatraffic.thread,NULL); 
-  ORTEDomainWakeUpSendingThread(&d->objectEntry); 
-  pthread_join(d->taskSend.thread,NULL); 
+  if(!d->taskRecvMetatraffic.terminate) {
+    d->taskRecvMetatraffic.terminate=ORTE_TRUE;
+    ORTEDomainWakeUpReceivingThread(d,
+        &d->taskSend.sock,d->taskRecvMetatraffic.sock.port); 
+    pthread_join(d->taskRecvMetatraffic.thread,NULL); 
+  }
+  if (!d->taskSend.terminate) {
+    d->taskSend.terminate=ORTE_TRUE;
+    ORTEDomainWakeUpSendingThread(&d->objectEntry); 
+    pthread_join(d->taskSend.thread,NULL); 
+  }
   debug(29,8) ("ORTEDomainMgrDestroy: threads stoped and destroyed\n");
 
   objectEntryDump(&d->objectEntry);  
@@ -324,14 +316,14 @@ ORTEDomainMgrDestroy(ORTEDomain *d) {
   sock_cleanup(&d->taskRecvMetatraffic.sock);
   sock_cleanup(&d->taskSend.sock);
 
-  //Mutex(es)
-  pthread_mutex_destroy(&d->objectEntry.htimSendMutex); 
+  //Signals
+  pthread_cond_destroy(&d->objectEntry.htimSendCond);
+  pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
   
   //rwLocks
   pthread_rwlock_destroy(&d->objectEntry.objRootLock);
   pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
 
-
   //CSTReaders and CSTWriters
   CSTReaderDelete(d,&d->readerManagers);
   CSTReaderDelete(d,&d->readerApplications);