ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter,NtpTime wait,
unsigned int retries,unsigned int noSubscriptions) {
unsigned int rSubscriptions;
- u_int32_t sec,ms;
+ uint32_t sec,ms;
if (!cstWriter) return ORTE_BAD_HANDLE;
NtpTimeDisAssembToMs(sec,ms,wait);
if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
NtpTime expire,atime=getActualNtpTime();
struct timespec wtime;
- //Count max block time
+ //count max block time
NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
wtime.tv_nsec*=1000; //conver to nano seconds
pthread_rwlock_unlock(&cstWriter->lock);
- sem_timedwait(
- &cstWriter->semCSChangeDestroyed,
- &wtime);
+ //wait until a message has been processed
+ pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
+ if (cstWriter->condValueCSChangeDestroyed==0) {
+ pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
+ &cstWriter->mutexCSChangeDestroyed,
+ &wtime);
+ }
+ cstWriter->condValueCSChangeDestroyed=0;
+ pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
+
pthread_rwlock_wrlock(&cstWriter->lock);
pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
csChange->alive=ORTE_FALSE;
csChange->cdrStream.length=RTPS_HEADER_LENGTH+12+ //HEADER+INFO_TS+ISSUE
+20+cstWriter->typeRegister->getMaxSize;
- csChange->cdrStream.buffer=(u_int8_t*)MALLOC(csChange->cdrStream.length);
+ csChange->cdrStream.buffer=(uint8_t*)MALLOC(csChange->cdrStream.length);
csChange->cdrStream.bufferPtr=csChange->cdrStream.buffer+RTPS_HEADER_LENGTH+12+20;
SeqNumberInc(snNext,cstWriter->lastSN);
RTPSHeaderCreate(csChange->cdrStream.buffer,