/*****************************************************************************/
ORTEDomain *
ORTEDomainMgrCreate(int domain, ORTEDomainProp *prop,
- ORTEDomainAppEvents *events,Boolean startSendingThread) {
+ ORTEDomainAppEvents *events,Boolean suspended) {
ORTEDomain *d;
ObjectEntryOID *objectEntryOID;
AppParams *appParams;
CSTWriterParams cstWriterParams;
CSTReaderParams cstReaderParams;
char iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
+ char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
+ char sbuff[128];
int i;
- u_int16_t port=0;
+ uint16_t port=0;
debug(29,10) ("ORTEDomainMgrCreate: start\n");
//Create domainApplication
if (!d) return NULL; //no memory
//initialization local values
d->domain=domain;
- d->taskRecvMetatraffic.terminate=ORTE_FALSE;
- d->taskSend.terminate=ORTE_FALSE;
+ d->taskRecvMetatraffic.terminate=ORTE_TRUE;
+ d->taskSend.terminate=ORTE_TRUE;
d->taskRecvMetatraffic.sock.port=0;
d->taskSend.sock.port=0;
//init structure objectEntry
pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
htimerRoot_init_queue(&d->objectEntry);
pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
- pthread_mutex_init(&d->objectEntry.htimSendMutex, NULL);
+ pthread_cond_init(&d->objectEntry.htimSendCond,NULL);
+ pthread_mutex_init(&d->objectEntry.htimSendMutex,NULL);
+ d->objectEntry.htimSendCondValue=0;
//create domainProp
if (prop!=NULL) {
iflocal[0]=0;
if (d->domainProp.IFCount) {
for(i=0;i<d->domainProp.IFCount;i++)
- strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress));
+ strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress,sIPAddress));
debug(29,2) ("ORTEDomainMgrCreate: localIPAddres(es) %s\n",iflocal);
} else{
- debug(29,2) ("ORTEDomainMgrCreate: no activ interface card\n");
+ debug(29,2) ("ORTEDomainMgrCreate: no active interface card\n");
}
//DomainEvents
//local buffers
d->mbRecvMetatraffic.cdrStream.buffer=
- (u_int8_t*)MALLOC(d->domainProp.recvBuffSize);
+ (uint8_t*)MALLOC(d->domainProp.recvBuffSize);
d->mbRecvUserdata.cdrStream.buffer=NULL;
d->mbSend.cdrStream.buffer=
- (u_int8_t*)MALLOC(d->domainProp.sendBuffSize);
+ (uint8_t*)MALLOC(d->domainProp.sendBuffSize);
if ((!d->mbRecvMetatraffic.cdrStream.buffer) ||
(!d->mbSend.cdrStream.buffer)) { //no memory
FREE(d->mbRecvMetatraffic.cdrStream.buffer);
if(sock_setsockopt(&d->taskRecvUserdata.sock,IP_ADD_MEMBERSHIP,
(void *) &mreq, sizeof(mreq))>=0) {
debug(29,2) ("ORTEDomainAppCreate: listening to mgroup %s\n",
- IPAddressToString(d->domainProp.multicast.ipAddress));
+ IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
}
}
if ((d->taskRecvMetatraffic.sock.fd<0) || (d->taskSend.sock.fd<0) ||
}
}
//managerKeyList
- appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
- for(i=0;i<d->domainProp.IFCount;i++)
- appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
- appParams->managerKeyCount=d->domainProp.IFCount+1;
- if (d->domainProp.mgrAddKey!=0) {
- appParams->managerKeyList[appParams->managerKeyCount]=d->domainProp.mgrAddKey;
- appParams->managerKeyCount++;
- debug(29,4) ("ORTEDomainMgrCreate: additional manager key accepted (%s)\n",
- IPAddressToString(d->domainProp.mgrAddKey));
-
+ if (!d->domainProp.keys) {
+ appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
+ for(i=0;i<d->domainProp.IFCount;i++)
+ appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
+ appParams->managerKeyCount=d->domainProp.IFCount+1;
+ } else {
+ appParams->managerKeyCount=i=0;
+ while (getStringPart(d->domainProp.keys,':',&i,sbuff)) {
+ printf("a");
+ ORTESleepMs(100);
+ appParams->managerKeyList[appParams->managerKeyCount++]=
+ StringToIPAddress(sbuff);
+ }
+
}
d->appParams=appParams;
//insert object, doesn't need to be locked
d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
- d->objectEntryOID->private=ORTE_TRUE;
+ d->objectEntryOID->privateCreated=ORTE_TRUE;
//CST objects
// writerApplicationSelf (WAS)
CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
OID_WRITE_APPSELF,&cstWriterParams,NULL);
// add to WAS remote writer(s)
- if (d->domainProp.mgrs) {
- int8_t *cp=d->domainProp.mgrs;
- while (cp[0]!=0) { //till is length>0
-#ifndef __RTL__
- struct hostent *hostname;
-#endif
- int8_t *dcp,tcp;
- dcp=strchr(cp,':');
- if (!dcp) dcp=cp+strlen(cp);
- tcp=*dcp; //save ending value
- *dcp=0; //temporary end of string
-#ifdef __RTL__
- if (1) {
- GUID_RTPS guid;
- IPAddress ipAddress=StringToIPAddress(cp);
-#else
- if ((hostname=gethostbyname(cp))) {
- GUID_RTPS guid;
- IPAddress ipAddress=ntohl(*((long*)(hostname->h_addr_list[0])));
-#endif
- guid.hid=ipAddress;
- guid.aid=AID_UNKNOWN;
- guid.oid=OID_APP;
- if (!objectEntryFind(d,&guid)) {
- appParams=(AppParams*)MALLOC(sizeof(AppParams));
- AppParamsInit(appParams);
- appParams->hostId=guid.hid;
- appParams->appId=guid.aid;
- appParams->metatrafficUnicastPort=d->appParams->metatrafficUnicastPort;
- appParams->userdataUnicastPort=0; //Manager support only metatraffic
- appParams->unicastIPAddressList[0]=ipAddress;
- appParams->unicastIPAddressCount=1;
- objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
- CSTWriterAddRemoteReader(d,&d->writerApplicationSelf,objectEntryOID,
- OID_READ_MGR);
- debug(29,2) ("ORTEDomainAppCreate: add fellow manager (%s)\n",
- IPAddressToString(ipAddress));
- }
- }
- *dcp=tcp; //restore value
- if (dcp[0]!=0) cp=dcp+1; //next value
- else cp=dcp;
+ i=0;
+ while (getStringPart(d->domainProp.mgrs,':',&i,sbuff)>0) {
+ GUID_RTPS guid;
+ IPAddress ipAddress=StringToIPAddress(sbuff);
+ guid.hid=ipAddress;
+ guid.aid=AID_UNKNOWN;
+ guid.oid=OID_APP;
+ if (!objectEntryFind(d,&guid)) {
+ appParams=(AppParams*)MALLOC(sizeof(AppParams));
+ AppParamsInit(appParams);
+ appParams->hostId=guid.hid;
+ appParams->appId=guid.aid;
+ appParams->metatrafficUnicastPort=d->appParams->metatrafficUnicastPort;
+ appParams->userdataUnicastPort=0; //Manager support only metatraffic
+ appParams->unicastIPAddressList[0]=ipAddress;
+ appParams->unicastIPAddressCount=1;
+ objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
+ CSTWriterAddRemoteReader(d,&d->writerApplicationSelf,objectEntryOID,
+ OID_READ_MGR);
+ debug(29,2) ("ORTEDomainAppCreate: add fellow manager (%s)\n",
+ IPAddressToString(ipAddress,sIPAddress));
}
}
// readerManagers
OID_WRITE_MGR,&cstWriterParams,NULL);
//add csChange for WAS
- appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE);
+ appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE,ORTE_TRUE);
//Start threads
- pthread_mutex_lock(&d->objectEntry.htimSendMutex);
- pthread_create(&d->taskRecvMetatraffic.thread, NULL,
- (void*)&ORTEAppRecvMetatrafficThread, (void *)d);
- if (startSendingThread) {
- pthread_create(&d->taskSend.thread, NULL,
- (void*)&ORTEAppSendThread, (void *)d);
+ if (!suspended) {
+ ORTEDomainStart(d,ORTE_TRUE,ORTE_FALSE,ORTE_TRUE);
}
debug(29,10) ("ORTEDomainMgrCreate: finished\n");
ORTEDomainMgrDestroy(ORTEDomain *d) {
debug(29,10) ("ORTEDomainMgrDestroy: start\n");
+ pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
+ pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
+ appSelfParamChanged(d,ORTE_TRUE,ORTE_TRUE,ORTE_FALSE,ORTE_FALSE);
+ pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
+ pthread_rwlock_unlock(&d->objectEntry.objRootLock);
//Stoping threads
- d->taskRecvMetatraffic.terminate=ORTE_TRUE;
- d->taskSend.terminate=ORTE_TRUE;
- ORTEDomainWakeUpReceivingThread(d,
- &d->taskSend.sock,d->taskRecvMetatraffic.sock.port);
- pthread_join(d->taskRecvMetatraffic.thread,NULL);
- ORTEDomainWakeUpSendingThread(&d->objectEntry);
- pthread_join(d->taskSend.thread,NULL);
+ if(!d->taskRecvMetatraffic.terminate) {
+ d->taskRecvMetatraffic.terminate=ORTE_TRUE;
+ ORTEDomainWakeUpReceivingThread(d,
+ &d->taskSend.sock,d->taskRecvMetatraffic.sock.port);
+ pthread_join(d->taskRecvMetatraffic.thread,NULL);
+ }
+ if (!d->taskSend.terminate) {
+ d->taskSend.terminate=ORTE_TRUE;
+ ORTEDomainWakeUpSendingThread(&d->objectEntry);
+ pthread_join(d->taskSend.thread,NULL);
+ }
debug(29,8) ("ORTEDomainMgrDestroy: threads stoped and destroyed\n");
objectEntryDump(&d->objectEntry);
sock_cleanup(&d->taskRecvMetatraffic.sock);
sock_cleanup(&d->taskSend.sock);
- //Mutex(es)
- pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
+ //Signals
+ pthread_cond_destroy(&d->objectEntry.htimSendCond);
+ pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
//rwLocks
pthread_rwlock_destroy(&d->objectEntry.objRootLock);
pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
-
//CSTReaders and CSTWriters
CSTReaderDelete(d,&d->readerManagers);
CSTReaderDelete(d,&d->readerApplications);