*
*/
-#include "orte.h"
+#include "orte_all.h"
GAVL_CUST_NODE_INT_IMP(PublicationList,
PSEntry, ObjectEntryOID, GUID_RTPS,
CSChange *csChange;
TypeNode *typeNode;
+ debug(31,10) ("ORTEPublicationCreate: start\n");
cstWriter=(CSTWriter*)MALLOC(sizeof(CSTWriter));
if (!cstWriter) return NULL;
+ debug(31,10) ("ORTEPublicationCreate: memory OK\n");
pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
pthread_rwlock_rdlock(&d->typeEntry.lock);
guid.hid=d->guid.hid;guid.aid=d->guid.aid;
guid.oid=(d->publications.counter<<8)|OID_PUBLICATION;
pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
- memcpy(pp,&d->publPropDefault,sizeof(ORTEPublProp));
+ memcpy(pp,&d->domainProp.publPropDefault,sizeof(ORTEPublProp));
strcpy(pp->topic,topic);
strcpy(pp->typeName,typeName);
pp->persistence=*persistence;
PID_VALUE_RELIABILITY_STRICT;
//insert object to structure objectEntry
objectEntryOID=objectEntryAdd(d,&guid,(void*)pp);
- objectEntryOID->private=ORTE_TRUE;
+ objectEntryOID->privateCreated=ORTE_TRUE;
objectEntryOID->instance=instance;
objectEntryOID->sendCallBack=sendCallBack;
objectEntryOID->callBackParam=sendCallBackParam;
csChange->guid=guid;
csChange->alive=ORTE_TRUE;
csChange->cdrStream.buffer=NULL;
+ debug(31,10) ("ORTEPublicationCreate: add CSChange\n");
CSTWriterAddCSChange(d,&d->writerPublications,csChange);
pthread_rwlock_unlock(&d->writerPublications.lock);
pthread_rwlock_unlock(&d->publications.lock);
pthread_rwlock_unlock(&d->typeEntry.lock);
pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
pthread_rwlock_unlock(&d->objectEntry.objRootLock);
+ debug(31,10) ("ORTEPublicationCreate: finished\n");
return cstWriter;
}
ORTEPublicationDestroy(ORTEPublication *cstWriter) {
CSChange *csChange;
- if (!cstWriter) return -1;
+ if (!cstWriter) return ORTE_BAD_HANDLE;
//generate csChange for writerPublisher
pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
- return 0;
+ return ORTE_OK;
}
/*****************************************************************************/
+/*
+ * ORTEPublicationPropertiesGet - copy a publication's properties to the caller.
+ *
+ * cstWriter  publication handle; a NULL handle returns ORTE_BAD_HANDLE.
+ * pp         destination that receives a by-value copy of the ORTEPublProp
+ *            found at cstWriter->objectEntryOID->attributes (pp itself is
+ *            not checked for NULL).
+ *
+ * The copy is made while the domain's objRootLock and the writer's own lock
+ * are both held for reading. Returns ORTE_OK after the copy.
+ */
int
ORTEPublicationPropertiesGet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
+ if (!cstWriter) return ORTE_BAD_HANDLE;
pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
pthread_rwlock_rdlock(&cstWriter->lock);
*pp=*(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
pthread_rwlock_unlock(&cstWriter->lock);
pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
- return 0;
+ return ORTE_OK;
}
/*****************************************************************************/
ORTEPublicationPropertiesSet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
CSChange *csChange;
+ if (!cstWriter) return ORTE_BAD_HANDLE;
pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
CSTWriterAddCSChange(cstWriter->domain,
&cstWriter->domain->writerPublications,csChange);
pthread_rwlock_unlock(&cstWriter->lock);
- pthread_rwlock_unlock(&cstWriter->domain->publications.lock);
+ pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
- return 0;
+ return ORTE_OK;
}
/*****************************************************************************/
+/*
+ * ORTEPublicationWaitForSubscriptions - poll until enough remote readers exist.
+ *
+ * cstWriter        publication handle; a NULL handle returns ORTE_BAD_HANDLE.
+ * wait             interval slept after each unsuccessful poll.
+ * retries          number of extra polls after the first one; the loop body
+ *                  runs retries+1 times in total.
+ * noSubscriptions  value that cstWriter->cstRemoteReaderCounter must reach.
+ *
+ * Returns ORTE_OK as soon as a poll sees the counter >= noSubscriptions,
+ * otherwise ORTE_TIMEOUT once every poll has been used.
+ *
+ * NOTE(review): the sleep also runs after the last unsuccessful poll, so
+ * ORTE_TIMEOUT is reported one full interval after the final check -
+ * confirm this is intended.
+ */
int
ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter,NtpTime wait,
unsigned int retries,unsigned int noSubscriptions) {
- return 0;
+ unsigned int rSubscriptions;
+ uint32_t sec,ms;
+
+ if (!cstWriter) return ORTE_BAD_HANDLE;
+ //sec and ms are recombined into milliseconds for ORTESleepMs below
+ NtpTimeDisAssembToMs(sec,ms,wait);
+ do {
+ //read the counter under both read locks; the locks are released
+ //again before the comparison and the sleep
+ pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
+ pthread_rwlock_rdlock(&cstWriter->lock);
+ rSubscriptions=cstWriter->cstRemoteReaderCounter;
+ pthread_rwlock_unlock(&cstWriter->lock);
+ pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
+ if (rSubscriptions>=noSubscriptions)
+ return ORTE_OK;
+ //NOTE(review): sec*1000+ms is computed with uint32_t operands and
+ //may wrap for very long intervals - confirm the expected range
+ ORTESleepMs(sec*1000+ms);
+ } while (retries--);
+ return ORTE_TIMEOUT;
}
/*****************************************************************************/
+/*
+ * ORTEPublicationGetStatus - report the counters of a publication.
+ *
+ * cstWriter  publication handle; a NULL handle returns ORTE_BAD_HANDLE.
+ * status     output structure (not checked for NULL); on return
+ *              strict     = cstWriter->strictReliableCounter
+ *              bestEffort = cstWriter->bestEffortsCounter
+ *              issues     = number of entries visited by the
+ *                           CSTWriterCSChange list iteration
+ *
+ * All fields are filled while the domain's objRootLock and the writer's
+ * own lock are held for reading. Returns ORTE_OK afterwards.
+ */
int
ORTEPublicationGetStatus(ORTEPublication *cstWriter,ORTEPublStatus *status) {
- return 0;
+ CSChange *csChange;
+
+ if (!cstWriter) return ORTE_BAD_HANDLE;
+ pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
+ pthread_rwlock_rdlock(&cstWriter->lock);
+ status->strict=cstWriter->strictReliableCounter;
+ status->bestEffort=cstWriter->bestEffortsCounter;
+ status->issues=0;
+ //count the entries by walking the list; csChange is only the cursor
+ ul_list_for_each(CSTWriterCSChange,cstWriter,csChange)
+ status->issues++;
+ pthread_rwlock_unlock(&cstWriter->lock);
+ pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
+ return ORTE_OK;
}
/*****************************************************************************/
ORTEPublicationPrepareQueue(ORTEPublication *cstWriter) {
ORTEPublProp *pp;
- if (!cstWriter) return -1;
+ if (!cstWriter) return ORTE_BAD_HANDLE;
+ //Checks the send queue level against pp->sendQueueSize and, when the
+ //queue is full, tries to free a slot.
+ //Returns ORTE_OK when the queue is not full, when
+ //CSTWriterTryDestroyBestEffortIssue() returns non-zero, or when the
+ //queue has room after the wait below; ORTE_QUEUE_FULL when it is still
+ //full after the wait; ORTE_BAD_HANDLE for a NULL handle.
pthread_rwlock_wrlock(&cstWriter->lock);
pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
NtpTime expire,atime=getActualNtpTime();
struct timespec wtime;
- //Count max block time
+ //count max block time
NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
wtime.tv_nsec*=1000; //conver to nano seconds
+ //the writer lock is released before blocking and taken again afterwards
+ pthread_rwlock_unlock(&cstWriter->lock);
+ //wait until condCSChangeDestroyed is signalled or wtime is reached
pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
- pthread_rwlock_unlock(&cstWriter->lock);
- pthread_mutex_timedlock(
- &cstWriter->mutexCSChangeDestroyed,
- &wtime);
+ //NOTE(review): the wait is guarded by if, not by a loop, so any
+ //wakeup leads to exactly one re-check of the queue level below
+ //wtime is used as the absolute timeout; presumably now+maxBlockTime
+ //(see NtpTimeAdd above) - confirm the clock base matches
+ if (cstWriter->condValueCSChangeDestroyed==0) {
+ pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
+ &cstWriter->mutexCSChangeDestroyed,
+ &wtime);
+ }
+ //clear the flag for the next wait
+ cstWriter->condValueCSChangeDestroyed=0;
pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
+
pthread_rwlock_wrlock(&cstWriter->lock);
pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
cstWriter->csChangesCounter);
pthread_rwlock_unlock(&cstWriter->lock);
- return -2;
+ return ORTE_QUEUE_FULL;
}
}
}
pthread_rwlock_unlock(&cstWriter->lock);
- return 0;
+ return ORTE_OK;
}
/*****************************************************************************/
CSChange *csChange;
SequenceNumber snNext;
- if (!cstWriter) return -1;
+ if (!cstWriter) return ORTE_BAD_HANDLE;
pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);
pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
if (!CSTRemoteReader_is_empty(cstWriter)) {
csChange->alive=ORTE_FALSE;
csChange->cdrStream.length=RTPS_HEADER_LENGTH+12+ //HEADER+INFO_TS+ISSUE
+20+cstWriter->typeRegister->getMaxSize;
- csChange->cdrStream.buffer=(u_int8_t*)MALLOC(csChange->cdrStream.length);
+ csChange->cdrStream.buffer=(uint8_t*)MALLOC(csChange->cdrStream.length);
csChange->cdrStream.bufferPtr=csChange->cdrStream.buffer+RTPS_HEADER_LENGTH+12+20;
SeqNumberInc(snNext,cstWriter->lastSN);
RTPSHeaderCreate(csChange->cdrStream.buffer,
csChange->cdrStream.bufferPtr+=cstWriter->typeRegister->getMaxSize;
}
csChange->cdrStream.needByteSwap=ORTE_FALSE;
+ debug(31,10) ("ORTEPublicationCreate: message length:%d\n",
+ cstWriter->typeRegister->getMaxSize);
CSTWriterAddCSChange(cstWriter->domain,
cstWriter,
csChange);
}
pthread_rwlock_unlock(&cstWriter->domain->typeEntry.lock);
pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
- return 0;
+ return ORTE_OK;
}
/*****************************************************************************/
ORTEPublicationSend(ORTEPublication *cstWriter) {
int r;
- if (!cstWriter) return -1;
- //PrepareSendingQueue
+ if (!cstWriter) return ORTE_BAD_HANDLE;
+ //prepare sending queue
if ((r=ORTEPublicationPrepareQueue(cstWriter))<0) return r;
//send
pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);