]> rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/ORTEPublication.c
new version 0.2.3
[orte.git] / orte / liborte / ORTEPublication.c
index 577b48c1cfa62a1602d53ca4c57f4cae5e284177..f074baf981013d4965f42b43f0de8d950acc866e 100644 (file)
@@ -19,7 +19,7 @@
  *  
  */ 
 
-#include "orte.h"
+#include "orte_all.h"
 
 GAVL_CUST_NODE_INT_IMP(PublicationList, 
                        PSEntry, ObjectEntryOID, GUID_RTPS,
@@ -39,8 +39,10 @@ ORTEPublicationCreate(ORTEDomain *d,const char *topic,const char *typeName,
   CSChange              *csChange;
   TypeNode              *typeNode;
   
+  debug(31,10) ("ORTEPublicationCreate: start\n");
   cstWriter=(CSTWriter*)MALLOC(sizeof(CSTWriter));
   if (!cstWriter) return NULL;
+  debug(31,10) ("ORTEPublicationCreate: memory OK\n");
   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
   pthread_rwlock_rdlock(&d->typeEntry.lock);    
@@ -58,7 +60,7 @@ ORTEPublicationCreate(ORTEDomain *d,const char *topic,const char *typeName,
   guid.hid=d->guid.hid;guid.aid=d->guid.aid;
   guid.oid=(d->publications.counter<<8)|OID_PUBLICATION;
   pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
-  memcpy(pp,&d->publPropDefault,sizeof(ORTEPublProp));
+  memcpy(pp,&d->domainProp.publPropDefault,sizeof(ORTEPublProp));
   strcpy(pp->topic,topic);
   strcpy(pp->typeName,typeName);
   pp->persistence=*persistence;
@@ -67,7 +69,7 @@ ORTEPublicationCreate(ORTEDomain *d,const char *topic,const char *typeName,
                          PID_VALUE_RELIABILITY_STRICT;
   //insert object to structure objectEntry
   objectEntryOID=objectEntryAdd(d,&guid,(void*)pp);
-  objectEntryOID->private=ORTE_TRUE;
+  objectEntryOID->privateCreated=ORTE_TRUE;
   objectEntryOID->instance=instance;
   objectEntryOID->sendCallBack=sendCallBack;
   objectEntryOID->callBackParam=sendCallBackParam;
@@ -103,12 +105,14 @@ ORTEPublicationCreate(ORTEDomain *d,const char *topic,const char *typeName,
   csChange->guid=guid;
   csChange->alive=ORTE_TRUE;
   csChange->cdrStream.buffer=NULL;
+  debug(31,10) ("ORTEPublicationCreate: add CSChange\n");
   CSTWriterAddCSChange(d,&d->writerPublications,csChange);
   pthread_rwlock_unlock(&d->writerPublications.lock);
   pthread_rwlock_unlock(&d->publications.lock);
   pthread_rwlock_unlock(&d->typeEntry.lock);    
   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
+  debug(31,10) ("ORTEPublicationCreate: finished\n");
   return cstWriter;
 }
 
@@ -117,7 +121,7 @@ int
 ORTEPublicationDestroy(ORTEPublication *cstWriter) {
   CSChange              *csChange;
 
-  if (!cstWriter) return -1;
+  if (!cstWriter) return ORTE_BAD_HANDLE;
   //generate csChange for writerPublisher
   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
@@ -139,19 +143,20 @@ ORTEPublicationDestroy(ORTEPublication *cstWriter) {
   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
-  return 0;
+  return ORTE_OK;
 }
 
 
 /*****************************************************************************/
 int
 ORTEPublicationPropertiesGet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
+  if (!cstWriter) return ORTE_BAD_HANDLE;
   pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
   pthread_rwlock_rdlock(&cstWriter->lock);
   *pp=*(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
   pthread_rwlock_unlock(&cstWriter->lock);
   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
-  return 0;
+  return ORTE_OK;
 }
 
 /*****************************************************************************/
@@ -159,6 +164,7 @@ int
 ORTEPublicationPropertiesSet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
   CSChange              *csChange;
 
+  if (!cstWriter) return ORTE_BAD_HANDLE;
   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
@@ -171,23 +177,50 @@ ORTEPublicationPropertiesSet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
   CSTWriterAddCSChange(cstWriter->domain,
       &cstWriter->domain->writerPublications,csChange);
   pthread_rwlock_unlock(&cstWriter->lock);
-  pthread_rwlock_unlock(&cstWriter->domain->publications.lock);
+  pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
-  return 0;
+  return ORTE_OK;
 }
 
 /*****************************************************************************/
 int
 ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter,NtpTime wait,
     unsigned int retries,unsigned int noSubscriptions) {
-  return 0;
+  unsigned int rSubscriptions;
+  uint32_t sec,ms;
+
+  if (!cstWriter) return ORTE_BAD_HANDLE;
+  NtpTimeDisAssembToMs(sec,ms,wait);
+  do {
+    pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
+    pthread_rwlock_rdlock(&cstWriter->lock);
+    rSubscriptions=cstWriter->cstRemoteReaderCounter;
+    pthread_rwlock_unlock(&cstWriter->lock);
+    pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
+    if (rSubscriptions>=noSubscriptions)
+      return ORTE_OK;
+    ORTESleepMs(sec*1000+ms);
+  } while (retries--);
+  return ORTE_TIMEOUT;  
 }
 
 /*****************************************************************************/
 int
 ORTEPublicationGetStatus(ORTEPublication *cstWriter,ORTEPublStatus *status) {
-  return 0;
+  CSChange *csChange;
+
+  if (!cstWriter) return ORTE_BAD_HANDLE;
+  pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
+  pthread_rwlock_rdlock(&cstWriter->lock);
+  status->strict=cstWriter->strictReliableCounter;
+  status->bestEffort=cstWriter->bestEffortsCounter;
+  status->issues=0;
+  ul_list_for_each(CSTWriterCSChange,cstWriter,csChange)
+    status->issues++;
+  pthread_rwlock_unlock(&cstWriter->lock);
+  pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
+  return ORTE_OK;
 }
 
 /*****************************************************************************/
@@ -195,35 +228,40 @@ int
 ORTEPublicationPrepareQueue(ORTEPublication *cstWriter) {
   ORTEPublProp          *pp;
   
-  if (!cstWriter) return -1;
+  if (!cstWriter) return ORTE_BAD_HANDLE;
   pthread_rwlock_wrlock(&cstWriter->lock);
   pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
   if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
     if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
       NtpTime             expire,atime=getActualNtpTime();
       struct timespec     wtime; 
-      //Count max block time
+      //count max block time
       NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
       NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
       wtime.tv_nsec*=1000;  //conver to nano seconds
+      pthread_rwlock_unlock(&cstWriter->lock);    
+      //wait till a message will be processed
       pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
-      pthread_rwlock_unlock(&cstWriter->lock);
-      pthread_mutex_timedlock(
-          &cstWriter->mutexCSChangeDestroyed,
-          &wtime);
+      if (cstWriter->condValueCSChangeDestroyed==0) {
+        pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
+                              &cstWriter->mutexCSChangeDestroyed,
+                              &wtime);
+      }
+      cstWriter->condValueCSChangeDestroyed=0;
       pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
+
       pthread_rwlock_wrlock(&cstWriter->lock);    
       pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
       if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
         debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
                       cstWriter->csChangesCounter);
         pthread_rwlock_unlock(&cstWriter->lock);
-        return -2;
+        return ORTE_QUEUE_FULL;
       }
     }
   }
   pthread_rwlock_unlock(&cstWriter->lock);
-  return 0;
+  return ORTE_OK;
 }
 
 /*****************************************************************************/
@@ -232,7 +270,7 @@ ORTEPublicationSendLocked(ORTEPublication *cstWriter) {
   CSChange              *csChange;
   SequenceNumber        snNext;
   
-  if (!cstWriter) return -1;
+  if (!cstWriter) return ORTE_BAD_HANDLE;
   pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);    
   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
   if (!CSTRemoteReader_is_empty(cstWriter)) {
@@ -242,7 +280,7 @@ ORTEPublicationSendLocked(ORTEPublication *cstWriter) {
     csChange->alive=ORTE_FALSE;
     csChange->cdrStream.length=RTPS_HEADER_LENGTH+12+     //HEADER+INFO_TS+ISSUE
                               +20+cstWriter->typeRegister->getMaxSize;
-    csChange->cdrStream.buffer=(u_int8_t*)MALLOC(csChange->cdrStream.length);
+    csChange->cdrStream.buffer=(uint8_t*)MALLOC(csChange->cdrStream.length);
     csChange->cdrStream.bufferPtr=csChange->cdrStream.buffer+RTPS_HEADER_LENGTH+12+20;
     SeqNumberInc(snNext,cstWriter->lastSN);
     RTPSHeaderCreate(csChange->cdrStream.buffer,
@@ -265,13 +303,15 @@ ORTEPublicationSendLocked(ORTEPublication *cstWriter) {
       csChange->cdrStream.bufferPtr+=cstWriter->typeRegister->getMaxSize;
     }
     csChange->cdrStream.needByteSwap=ORTE_FALSE;
+    debug(31,10) ("ORTEPublicationCreate: message length:%d\n",
+                   cstWriter->typeRegister->getMaxSize);
     CSTWriterAddCSChange(cstWriter->domain,
                         cstWriter,
                         csChange);
   }
   pthread_rwlock_unlock(&cstWriter->domain->typeEntry.lock);    
   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
-  return 0;
+  return ORTE_OK;
 }
 
 /*****************************************************************************/
@@ -279,8 +319,8 @@ int
 ORTEPublicationSend(ORTEPublication *cstWriter) {
   int             r;
 
-  if (!cstWriter) return -1;
-  //PrepareSendingQueue
+  if (!cstWriter) return ORTE_BAD_HANDLE;
+  //prepare sending queue
   if ((r=ORTEPublicationPrepareQueue(cstWriter))<0) return r;
   //send
   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);