rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/ORTEPublication.c
version 0.2.2 (mac, solaris patch)
[orte.git] / orte / liborte / ORTEPublication.c
index 98918e4968a712d55439dbf5f3fd745177c69d2e..236037f535db03b0bc819c09815994f4fbf42917 100644 (file)
@@ -188,7 +188,7 @@ int
 ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter,NtpTime wait,
     unsigned int retries,unsigned int noSubscriptions) {
   unsigned int rSubscriptions;
-  u_int32_t sec,ms;
+  uint32_t sec,ms;
 
   if (!cstWriter) return ORTE_BAD_HANDLE;
   NtpTimeDisAssembToMs(sec,ms,wait);
@@ -235,14 +235,21 @@ ORTEPublicationPrepareQueue(ORTEPublication *cstWriter) {
     if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
       NtpTime             expire,atime=getActualNtpTime();
       struct timespec     wtime; 
-      //Count max block time
+      //count max block time
       NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
       NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
       wtime.tv_nsec*=1000;  //conver to nano seconds
       pthread_rwlock_unlock(&cstWriter->lock);    
-      sem_timedwait(
-          &cstWriter->semCSChangeDestroyed,
-          &wtime);
+      //wait till a message will be processed
+      pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
+      if (cstWriter->condValueCSChangeDestroyed==0) {
+        pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
+                              &cstWriter->mutexCSChangeDestroyed,
+                              &wtime);
+      }
+      cstWriter->condValueCSChangeDestroyed=0;
+      pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
+
       pthread_rwlock_wrlock(&cstWriter->lock);    
       pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
       if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
@@ -273,7 +280,7 @@ ORTEPublicationSendLocked(ORTEPublication *cstWriter) {
     csChange->alive=ORTE_FALSE;
     csChange->cdrStream.length=RTPS_HEADER_LENGTH+12+     //HEADER+INFO_TS+ISSUE
                               +20+cstWriter->typeRegister->getMaxSize;
-    csChange->cdrStream.buffer=(u_int8_t*)MALLOC(csChange->cdrStream.length);
+    csChange->cdrStream.buffer=(uint8_t*)MALLOC(csChange->cdrStream.length);
     csChange->cdrStream.bufferPtr=csChange->cdrStream.buffer+RTPS_HEADER_LENGTH+12+20;
     SeqNumberInc(snNext,cstWriter->lastSN);
     RTPSHeaderCreate(csChange->cdrStream.buffer,