]> rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/ORTEPublication.c
changed name to Open Real-time Ethernet, some source header arranging
[orte.git] / orte / liborte / ORTEPublication.c
index f074baf981013d4965f42b43f0de8d950acc866e..f37cc1bbf5634b1af83916f19c406f7a0ee91406 100644 (file)
@@ -2,9 +2,19 @@
  *  $Id: ORTEPublication.c,v 0.0.0.1      2003/11/21
  *
  *  DEBUG:  section 31                  Functions working over publication
- *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
  *
- *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
+ *  -------------------------------------------------------------------  
+ *                                ORTE                                 
+ *                      Open Real-Time Ethernet                       
+ *                                                                    
+ *                      Copyright (C) 2001-2006                       
+ *  Department of Control Engineering FEE CTU Prague, Czech Republic  
+ *                      http://dce.felk.cvut.cz                       
+ *                      http://www.ocera.org                          
+ *                                                                    
+ *  Author:             Petr Smolik    petr.smolik@wo.cz             
+ *  Advisor:            Pavel Pisa                                   
+ *  Project Responsible: Zdenek Hanzalek                              
  *  --------------------------------------------------------------------
  *
  *  This program is free software; you can redistribute it and/or modify
@@ -88,9 +98,11 @@ ORTEPublicationCreate(ORTEDomain *d,const char *topic,const char *typeName,
     }
   }
   //create writerPublication
+  cstWriterParams.registrationRetries=0; 
+  NTPTIME_ZERO(cstWriterParams.registrationPeriod); 
   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
   NTPTIME_ZERO(cstWriterParams.delayResponceTime);
-  cstWriterParams.refreshPeriod=iNtpTime;  //cann't refresh csChange(s)
+  cstWriterParams.refreshPeriod=iNtpTime;  //can't refresh csChange(s)
   cstWriterParams.repeatAnnounceTime=pp->HBNornalRate;
   cstWriterParams.HBMaxRetries=pp->HBMaxRetries;
   cstWriterParams.fullAcknowledge=ORTE_TRUE;
@@ -104,7 +116,7 @@ ORTEPublicationCreate(ORTEDomain *d,const char *topic,const char *typeName,
   parameterUpdateCSChangeFromPublication(csChange,pp);
   csChange->guid=guid;
   csChange->alive=ORTE_TRUE;
-  csChange->cdrStream.buffer=NULL;
+  csChange->cdrCodec.buffer=NULL;
   debug(31,10) ("ORTEPublicationCreate: add CSChange\n");
   CSTWriterAddCSChange(d,&d->writerPublications,csChange);
   pthread_rwlock_unlock(&d->writerPublications.lock);
@@ -134,7 +146,7 @@ ORTEPublicationDestroy(ORTEPublication *cstWriter) {
   }
   csChange=(CSChange*)MALLOC(sizeof(CSChange));
   CSChangeAttributes_init_head(csChange);
-  csChange->cdrStream.buffer=NULL;
+  csChange->cdrCodec.buffer=NULL;
   csChange->guid=cstWriter->guid;
   csChange->alive=ORTE_FALSE;
   CSTWriterAddCSChange(cstWriter->domain,
@@ -173,7 +185,7 @@ ORTEPublicationPropertiesSet(ORTEPublication *cstWriter,ORTEPublProp *pp) {
   parameterUpdateCSChangeFromPublication(csChange,pp);
   csChange->guid=cstWriter->guid;
   csChange->alive=ORTE_TRUE;
-  csChange->cdrStream.buffer=NULL;
+  csChange->cdrCodec.buffer=NULL;
   CSTWriterAddCSChange(cstWriter->domain,
       &cstWriter->domain->writerPublications,csChange);
   pthread_rwlock_unlock(&cstWriter->lock);
@@ -239,24 +251,24 @@ ORTEPublicationPrepareQueue(ORTEPublication *cstWriter) {
       NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
       NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
       wtime.tv_nsec*=1000;  //conver to nano seconds
-      pthread_rwlock_unlock(&cstWriter->lock);    
-      //wait till a message will be processed
-      pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
-      if (cstWriter->condValueCSChangeDestroyed==0) {
-        pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
-                              &cstWriter->mutexCSChangeDestroyed,
-                              &wtime);
-      }
-      cstWriter->condValueCSChangeDestroyed=0;
-      pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
-
-      pthread_rwlock_wrlock(&cstWriter->lock);    
-      pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
-      if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
-        debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
-                      cstWriter->csChangesCounter);
-        pthread_rwlock_unlock(&cstWriter->lock);
-        return ORTE_QUEUE_FULL;
+      while(cstWriter->csChangesCounter>=pp->sendQueueSize) {
+        pthread_rwlock_unlock(&cstWriter->lock);    
+        //wait till a message will be processed
+        pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
+        if (cstWriter->condValueCSChangeDestroyed==0) {
+          if (pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
+                                &cstWriter->mutexCSChangeDestroyed,
+                                &wtime)==ETIMEDOUT) {
+            debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
+                          cstWriter->csChangesCounter);
+            pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
+            pthread_rwlock_unlock(&cstWriter->lock);
+            return ORTE_QUEUE_FULL;
+         }
+        }
+        cstWriter->condValueCSChangeDestroyed=0;
+        pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
+        pthread_rwlock_wrlock(&cstWriter->lock);    
       }
     }
   }
@@ -266,45 +278,73 @@ ORTEPublicationPrepareQueue(ORTEPublication *cstWriter) {
 
 /*****************************************************************************/
 int
-ORTEPublicationSendLocked(ORTEPublication *cstWriter) {
+ORTEPublicationSendLocked(ORTEPublication *cstWriter,
+    ORTEPublicationSendParam *psp) {
   CSChange              *csChange;
   SequenceNumber        snNext;
+  int                  max_size;
   
   if (!cstWriter) return ORTE_BAD_HANDLE;
   pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);    
   pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
   if (!CSTRemoteReader_is_empty(cstWriter)) {
+    ORTEGetMaxSizeParam gms;
+
     csChange=(CSChange*)MALLOC(sizeof(CSChange));
     CSChangeAttributes_init_head(csChange);
     csChange->guid=cstWriter->guid;
     csChange->alive=ORTE_FALSE;
-    csChange->cdrStream.length=RTPS_HEADER_LENGTH+12+     //HEADER+INFO_TS+ISSUE
-                              +20+cstWriter->typeRegister->getMaxSize;
-    csChange->cdrStream.buffer=(uint8_t*)MALLOC(csChange->cdrStream.length);
-    csChange->cdrStream.bufferPtr=csChange->cdrStream.buffer+RTPS_HEADER_LENGTH+12+20;
+    CDR_codec_init_static(&csChange->cdrCodec);
+    csChange->cdrCodec.data_endian = FLAG_ENDIANNESS;
+
+    if (psp) {
+      csChange->cdrCodec.data_endian = psp->data_endian;
+      cstWriter->objectEntryOID->instance=psp->instance;
+    }
+
+    /* determine maximal size */
+    gms.host_endian=csChange->cdrCodec.host_endian;
+    gms.data_endian=csChange->cdrCodec.data_endian;
+    gms.data=cstWriter->objectEntryOID->instance;
+    gms.max_size=cstWriter->typeRegister->maxSize;
+    gms.recv_size=-1;
+    gms.csize=0;
+    if (cstWriter->typeRegister->getMaxSize)
+      max_size=cstWriter->typeRegister->getMaxSize(&gms);
+    else
+      max_size=cstWriter->typeRegister->maxSize;
+    
+    /* prepare csChange */
+    CDR_buffer_init(&csChange->cdrCodec,
+                   RTPS_HEADER_LENGTH+12+20+max_size);     //HEADER+INFO_TS+ISSUE
+    csChange->cdrCodec.wptr_max=
+       cstWriter->domain->domainProp.wireProp.userBytesPerPacket;
+
+    /* SN for next issue */
     SeqNumberInc(snNext,cstWriter->lastSN);
-    RTPSHeaderCreate(csChange->cdrStream.buffer,
-                    cstWriter->domain->guid.hid,cstWriter->domain->guid.aid);
-    RTPSInfoTSCreate(csChange->cdrStream.buffer+RTPS_HEADER_LENGTH,
-                    12,getActualNtpTime());
-    RTPSIssueCreateHeader(csChange->cdrStream.buffer+
-                    RTPS_HEADER_LENGTH+12,20,16+cstWriter->typeRegister->getMaxSize,
+
+    /* prepare data */
+    RTPSHeaderCreate(&csChange->cdrCodec,
+                     cstWriter->domain->guid.hid,cstWriter->domain->guid.aid);
+    RTPSInfoTSCreate(&csChange->cdrCodec,
+                     getActualNtpTime());
+    RTPSIssueCreateHeader(&csChange->cdrCodec,16+max_size,
                     OID_UNKNOWN,cstWriter->guid.oid,snNext);
+
     //serialization routine
     if (cstWriter->typeRegister->serialize) {
       cstWriter->typeRegister->serialize(
-          &csChange->cdrStream,
+          &csChange->cdrCodec,
           cstWriter->objectEntryOID->instance);
     } else {
       //no deserialization -> memcpy
-      memcpy(csChange->cdrStream.bufferPtr,
-            cstWriter->objectEntryOID->instance,
-            cstWriter->typeRegister->getMaxSize);
-      csChange->cdrStream.bufferPtr+=cstWriter->typeRegister->getMaxSize;
+      CDR_buffer_puts(&csChange->cdrCodec,
+                     cstWriter->objectEntryOID->instance,max_size);
     }
-    csChange->cdrStream.needByteSwap=ORTE_FALSE;
-    debug(31,10) ("ORTEPublicationCreate: message length:%d\n",
-                   cstWriter->typeRegister->getMaxSize);
+
+    debug(31,10) ("ORTEPublicationCreate: message length:%d, sn(low):%u\n",
+                   max_size,snNext.low);
+                 
     CSTWriterAddCSChange(cstWriter->domain,
                         cstWriter,
                         csChange);
@@ -316,19 +356,35 @@ ORTEPublicationSendLocked(ORTEPublication *cstWriter) {
 
 /*****************************************************************************/
 int
-ORTEPublicationSend(ORTEPublication *cstWriter) {
+ORTEPublicationSendEx(ORTEPublication *cstWriter,
+    ORTEPublicationSendParam *psp) {
   int             r;
 
   if (!cstWriter) return ORTE_BAD_HANDLE;
   //prepare sending queue
-  if ((r=ORTEPublicationPrepareQueue(cstWriter))<0) return r;
+  r=ORTEPublicationPrepareQueue(cstWriter);
+  if (r<0) 
+    return r;
   //send
   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
   pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
   pthread_rwlock_wrlock(&cstWriter->lock);
-  r=ORTEPublicationSendLocked(cstWriter);
+  r=ORTEPublicationSendLocked(cstWriter,psp);
   pthread_rwlock_unlock(&cstWriter->lock);
   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
   pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
   return r;
 }
+
+/*****************************************************************************/
+inline int
+ORTEPublicationSend(ORTEPublication *cstWriter) {
+  return ORTEPublicationSendEx(cstWriter,NULL);
+}
+
+
+/*****************************************************************************/
+inline void *
+ORTEPublicationGetInstance(ORTEPublication *cstWriter) {
+  return cstWriter->objectEntryOID->instance;
+}