rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/ORTEPublication.c
Changed name to Open Real-Time Ethernet; rearranged some source headers
[orte.git] / orte / liborte / ORTEPublication.c
index fab4ca543bc743a32975a1b29ae824c62eee3d65..f37cc1bbf5634b1af83916f19c406f7a0ee91406 100644 (file)
@@ -2,9 +2,19 @@
  *  $Id: ORTEPublication.c,v 0.0.0.1      2003/11/21
  *
  *  DEBUG:  section 31                  Functions working over publication
- *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
  *
- *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
+ *  -------------------------------------------------------------------  
+ *                                ORTE                                 
+ *                      Open Real-Time Ethernet                       
+ *                                                                    
+ *                      Copyright (C) 2001-2006                       
+ *  Department of Control Engineering FEE CTU Prague, Czech Republic  
+ *                      http://dce.felk.cvut.cz                       
+ *                      http://www.ocera.org                          
+ *                                                                    
+ *  Author:             Petr Smolik    petr.smolik@wo.cz             
+ *  Advisor:            Pavel Pisa                                   
+ *  Project Responsible: Zdenek Hanzalek                              
  *  --------------------------------------------------------------------
  *
  *  This program is free software; you can redistribute it and/or modify
@@ -19,7 +29,7 @@
  *  
  */ 
 
-#include "orte.h"
+#include "orte_all.h"
 
 GAVL_CUST_NODE_INT_IMP(PublicationList, 
                        PSEntry, ObjectEntryOID, GUID_RTPS,
@@ -27,7 +37,7 @@ GAVL_CUST_NODE_INT_IMP(PublicationList,
 
 /*****************************************************************************/
 ORTEPublication * 
-ORTEPublicationCreate(ORTEDomain *d,char *topic,char *typeName,
+ORTEPublicationCreate(ORTEDomain *d,const char *topic,const char *typeName,
     void *instance,NtpTime *persistence,int strength,
     ORTESendCallBack sendCallBack,void *sendCallBackParam,
     NtpTime *sendCallBackDelay) {
@@ -39,8 +49,10 @@ ORTEPublicationCreate(ORTEDomain *d,char *topic,char *typeName,
   CSChange              *csChange;
   TypeNode              *typeNode;
   
+  debug(31,10) ("ORTEPublicationCreate: start\n");
   cstWriter=(CSTWriter*)MALLOC(sizeof(CSTWriter));
   if (!cstWriter) return NULL;
+  debug(31,10) ("ORTEPublicationCreate: memory OK\n");
   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
   pthread_rwlock_rdlock(&d->typeEntry.lock);    
@@ -58,7 +70,7 @@ ORTEPublicationCreate(ORTEDomain *d,char *topic,char *typeName,
   guid.hid=d->guid.hid;guid.aid=d->guid.aid;
   guid.oid=(d->publications.counter<<8)|OID_PUBLICATION;
   pp=(ORTEPublProp*)MALLOC(sizeof(ORTEPublProp));
-  memcpy(pp,&d->publPropDefault,sizeof(ORTEPublProp));
+  memcpy(pp,&d->domainProp.publPropDefault,sizeof(ORTEPublProp));
   strcpy(pp->topic,topic);
   strcpy(pp->typeName,typeName);
   pp->persistence=*persistence;
@@ -67,7 +79,7 @@ ORTEPublicationCreate(ORTEDomain *d,char *topic,char *typeName,
                          PID_VALUE_RELIABILITY_STRICT;
   //insert object to structure objectEntry
   objectEntryOID=objectEntryAdd(d,&guid,(void*)pp);
-  objectEntryOID->private=ORTE_TRUE;
+  objectEntryOID->privateCreated=ORTE_TRUE;
   objectEntryOID->instance=instance;
   objectEntryOID->sendCallBack=sendCallBack;
   objectEntryOID->callBackParam=sendCallBackParam;
@@ -86,9 +98,11 @@ ORTEPublicationCreate(ORTEDomain *d,char *topic,char *typeName,
     }
   }
   //create writerPublication
+  cstWriterParams.registrationRetries=0; 
+  NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
   NTPTIME_ZERO(cstWriterParams.delayResponceTime);
-  cstWriterParams.refreshPeriod=iNtpTime;  //cann't refresh csChange(s)
+  cstWriterParams.refreshPeriod=iNtpTime;  //can't refresh csChange(s)
   cstWriterParams.repeatAnnounceTime=pp->HBNornalRate;
   cstWriterParams.HBMaxRetries=pp->HBMaxRetries;
   cstWriterParams.fullAcknowledge=ORTE_TRUE;
@@ -102,13 +116,15 @@ ORTEPublicationCreate(ORTEDomain *d,char *topic,char *typeName,
   parameterUpdateCSChangeFromPublication(csChange,pp);
   csChange->guid=guid;
   csChange->alive=ORTE_TRUE;
-  csChange->cdrStream.buffer=NULL;
+  csChange->cdrCodec.buffer=NULL;
+  debug(31,10) ("ORTEPublicationCreate: add CSChange\n");
   CSTWriterAddCSChange(d,&d->writerPublications,csChange);
   pthread_rwlock_unlock(&d->writerPublications.lock);
   pthread_rwlock_unlock(&d->publications.lock);
   pthread_rwlock_unlock(&d->typeEntry.lock);    
   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
+  debug(31,10) ("ORTEPublicationCreate: finished\n");
   return cstWriter;
 }
 
@@ -117,7 +133,7 @@ int
 ORTEPublicationDestroy(ORTEPublication *cstWriter) {
   CSChange              *csChange;
 
-  if (!cstWriter) return -1;
+  if (!cstWriter) return ORTE_BAD_HANDLE;
   //generate csChange for writerPublisher
   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
@@ -130,7 +146,7 @@ ORTEPublicationDestroy(ORTEPublication *cstWriter) {
   }
   csChange=(CSChange*)MALLOC(sizeof(CSChange));
   CSChangeAttributes_init_head(csChange);
-  csChange->cdrStream.buffer=NULL;
+  csChange->cdrCodec.buffer=NULL;
   csChange->guid=cstWriter->guid;
   csChange->alive=ORTE_FALSE;
   CSTWriterAddCSChange(cstWriter->domain,
@@ -139,19 +155,20 @@ ORTEPublicationDestroy(ORTEPublication *cstWriter) {
   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
-  return 0;
+  return ORTE_OK;
 }
 
 
 /*****************************************************************************/
 int
 ORTEPublicationPropertiesGet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
+  if (!cstWriter) return ORTE_BAD_HANDLE;
   pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
   pthread_rwlock_rdlock(&cstWriter->lock);
   *pp=*(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
   pthread_rwlock_unlock(&cstWriter->lock);
   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
-  return 0;
+  return ORTE_OK;
 }
 
 /*****************************************************************************/
@@ -159,6 +176,7 @@ int
 ORTEPublicationPropertiesSet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
   CSChange              *csChange;
 
+  if (!cstWriter) return ORTE_BAD_HANDLE;
   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
@@ -167,27 +185,54 @@ ORTEPublicationPropertiesSet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
   parameterUpdateCSChangeFromPublication(csChange,pp);
   csChange->guid=cstWriter->guid;
   csChange->alive=ORTE_TRUE;
-  csChange->cdrStream.buffer=NULL;
+  csChange->cdrCodec.buffer=NULL;
   CSTWriterAddCSChange(cstWriter->domain,
       &cstWriter->domain->writerPublications,csChange);
   pthread_rwlock_unlock(&cstWriter->lock);
-  pthread_rwlock_unlock(&cstWriter->domain->publications.lock);
+  pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
-  return 0;
+  return ORTE_OK;
 }
 
 /*****************************************************************************/
 int
 ORTEPublicationWaitForSubscriptions(ORTEPublication *cstWriter,NtpTime wait,
     unsigned int retries,unsigned int noSubscriptions) {
-  return 0;
+  unsigned int rSubscriptions;
+  uint32_t sec,ms;
+
+  if (!cstWriter) return ORTE_BAD_HANDLE;
+  NtpTimeDisAssembToMs(sec,ms,wait);
+  do {
+    pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
+    pthread_rwlock_rdlock(&cstWriter->lock);
+    rSubscriptions=cstWriter->cstRemoteReaderCounter;
+    pthread_rwlock_unlock(&cstWriter->lock);
+    pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
+    if (rSubscriptions>=noSubscriptions)
+      return ORTE_OK;
+    ORTESleepMs(sec*1000+ms);
+  } while (retries--);
+  return ORTE_TIMEOUT;  
 }
 
 /*****************************************************************************/
 int
 ORTEPublicationGetStatus(ORTEPublication *cstWriter,ORTEPublStatus *status) {
-  return 0;
+  CSChange *csChange;
+
+  if (!cstWriter) return ORTE_BAD_HANDLE;
+  pthread_rwlock_rdlock(&cstWriter->domain->objectEntry.objRootLock);
+  pthread_rwlock_rdlock(&cstWriter->lock);
+  status->strict=cstWriter->strictReliableCounter;
+  status->bestEffort=cstWriter->bestEffortsCounter;
+  status->issues=0;
+  ul_list_for_each(CSTWriterCSChange,cstWriter,csChange)
+    status->issues++;
+  pthread_rwlock_unlock(&cstWriter->lock);
+  pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
+  return ORTE_OK;
 }
 
 /*****************************************************************************/
@@ -195,100 +240,151 @@ int
 ORTEPublicationPrepareQueue(ORTEPublication *cstWriter) {
   ORTEPublProp          *pp;
   
-  if (!cstWriter) return -1;
+  if (!cstWriter) return ORTE_BAD_HANDLE;
   pthread_rwlock_wrlock(&cstWriter->lock);
   pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
   if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
     if (!CSTWriterTryDestroyBestEffortIssue(cstWriter)) {
       NtpTime             expire,atime=getActualNtpTime();
       struct timespec     wtime; 
-      //Count max block time
+      //count max block time
       NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
       NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
       wtime.tv_nsec*=1000;  //convert to nanoseconds
-      pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
-      pthread_rwlock_unlock(&cstWriter->lock);
-      pthread_mutex_timedlock(
-          &cstWriter->mutexCSChangeDestroyed,
-          &wtime);
-      pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
-      pthread_rwlock_wrlock(&cstWriter->lock);    
-      pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
-      if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
-        debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
-                      cstWriter->csChangesCounter);
-        pthread_rwlock_unlock(&cstWriter->lock);
-        return -2;
+      while(cstWriter->csChangesCounter>=pp->sendQueueSize) {
+        pthread_rwlock_unlock(&cstWriter->lock);    
+        //wait till a message will be processed
+        pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
+        if (cstWriter->condValueCSChangeDestroyed==0) {
+          if (pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
+                                &cstWriter->mutexCSChangeDestroyed,
+                                &wtime)==ETIMEDOUT) {
+            debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
+                          cstWriter->csChangesCounter);
+            pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
+            pthread_rwlock_unlock(&cstWriter->lock);
+            return ORTE_QUEUE_FULL;
+         }
+        }
+        cstWriter->condValueCSChangeDestroyed=0;
+        pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
+        pthread_rwlock_wrlock(&cstWriter->lock);    
       }
     }
   }
   pthread_rwlock_unlock(&cstWriter->lock);
-  return 0;
+  return ORTE_OK;
 }
 
 /*****************************************************************************/
 int
-ORTEPublicationSendLocked(ORTEPublication *cstWriter) {
+ORTEPublicationSendLocked(ORTEPublication *cstWriter,
+    ORTEPublicationSendParam *psp) {
   CSChange              *csChange;
   SequenceNumber        snNext;
+  int                  max_size;
   
-  if (!cstWriter) return -1;
+  if (!cstWriter) return ORTE_BAD_HANDLE;
   pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);    
   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
   if (!CSTRemoteReader_is_empty(cstWriter)) {
+    ORTEGetMaxSizeParam gms;
+
     csChange=(CSChange*)MALLOC(sizeof(CSChange));
     CSChangeAttributes_init_head(csChange);
     csChange->guid=cstWriter->guid;
     csChange->alive=ORTE_FALSE;
-    csChange->cdrStream.length=RTPS_HEADER_LENGTH+12+     //HEADER+INFO_TS+ISSUE
-                              +20+cstWriter->typeRegister->getMaxSize;
-    csChange->cdrStream.buffer=(u_int8_t*)MALLOC(csChange->cdrStream.length);
-    csChange->cdrStream.bufferPtr=csChange->cdrStream.buffer+RTPS_HEADER_LENGTH+12+20;
+    CDR_codec_init_static(&csChange->cdrCodec);
+    csChange->cdrCodec.data_endian = FLAG_ENDIANNESS;
+
+    if (psp) {
+      csChange->cdrCodec.data_endian = psp->data_endian;
+      cstWriter->objectEntryOID->instance=psp->instance;
+    }
+
+    /* determine maximal size */
+    gms.host_endian=csChange->cdrCodec.host_endian;
+    gms.data_endian=csChange->cdrCodec.data_endian;
+    gms.data=cstWriter->objectEntryOID->instance;
+    gms.max_size=cstWriter->typeRegister->maxSize;
+    gms.recv_size=-1;
+    gms.csize=0;
+    if (cstWriter->typeRegister->getMaxSize)
+      max_size=cstWriter->typeRegister->getMaxSize(&gms);
+    else
+      max_size=cstWriter->typeRegister->maxSize;
+    
+    /* prepare csChange */
+    CDR_buffer_init(&csChange->cdrCodec,
+                   RTPS_HEADER_LENGTH+12+20+max_size);     //HEADER+INFO_TS+ISSUE
+    csChange->cdrCodec.wptr_max=
+       cstWriter->domain->domainProp.wireProp.userBytesPerPacket;
+
+    /* SN for next issue */
     SeqNumberInc(snNext,cstWriter->lastSN);
-    RTPSHeaderCreate(csChange->cdrStream.buffer,
-                    cstWriter->domain->guid.hid,cstWriter->domain->guid.aid);
-    RTPSInfoTSCreate(csChange->cdrStream.buffer+RTPS_HEADER_LENGTH,
-                    12,getActualNtpTime());
-    RTPSIssueCreateHeader(csChange->cdrStream.buffer+
-                    RTPS_HEADER_LENGTH+12,20,16+cstWriter->typeRegister->getMaxSize,
+
+    /* prepare data */
+    RTPSHeaderCreate(&csChange->cdrCodec,
+                     cstWriter->domain->guid.hid,cstWriter->domain->guid.aid);
+    RTPSInfoTSCreate(&csChange->cdrCodec,
+                     getActualNtpTime());
+    RTPSIssueCreateHeader(&csChange->cdrCodec,16+max_size,
                     OID_UNKNOWN,cstWriter->guid.oid,snNext);
+
     //serialization routine
     if (cstWriter->typeRegister->serialize) {
       cstWriter->typeRegister->serialize(
-          &csChange->cdrStream,
+          &csChange->cdrCodec,
           cstWriter->objectEntryOID->instance);
     } else {
       //no serialization routine registered -> raw memcpy of the instance
-      memcpy(csChange->cdrStream.bufferPtr,
-            cstWriter->objectEntryOID->instance,
-            cstWriter->typeRegister->getMaxSize);
-      csChange->cdrStream.bufferPtr+=cstWriter->typeRegister->getMaxSize;
+      CDR_buffer_puts(&csChange->cdrCodec,
+                     cstWriter->objectEntryOID->instance,max_size);
     }
-    csChange->cdrStream.needByteSwap=ORTE_FALSE;
+
+    debug(31,10) ("ORTEPublicationCreate: message length:%d, sn(low):%u\n",
+                   max_size,snNext.low);
+                 
     CSTWriterAddCSChange(cstWriter->domain,
                         cstWriter,
                         csChange);
   }
   pthread_rwlock_unlock(&cstWriter->domain->typeEntry.lock);    
   pthread_rwlock_unlock(&cstWriter->domain->writerPublications.lock);
-  return 0;
+  return ORTE_OK;
 }
 
 /*****************************************************************************/
 int
-ORTEPublicationSend(ORTEPublication *cstWriter) {
+ORTEPublicationSendEx(ORTEPublication *cstWriter,
+    ORTEPublicationSendParam *psp) {
   int             r;
 
-  if (!cstWriter) return -1;
-  //PrepareSendingQueue
-  if ((r=ORTEPublicationPrepareQueue(cstWriter))<0) return r;
+  if (!cstWriter) return ORTE_BAD_HANDLE;
+  //prepare sending queue
+  r=ORTEPublicationPrepareQueue(cstWriter);
+  if (r<0) 
+    return r;
   //send
   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
   pthread_rwlock_wrlock(&cstWriter->lock);
-  r=ORTEPublicationSendLocked(cstWriter);
+  r=ORTEPublicationSendLocked(cstWriter,psp);
   pthread_rwlock_unlock(&cstWriter->lock);
   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
   return r;
 }
+
+/*****************************************************************************/
+inline int
+ORTEPublicationSend(ORTEPublication *cstWriter) {
+  return ORTEPublicationSendEx(cstWriter,NULL);
+}
+
+
+/*****************************************************************************/
+inline void *
+ORTEPublicationGetInstance(ORTEPublication *cstWriter) {
+  return cstWriter->objectEntryOID->instance;
+}