* $Id: ORTEPublication.c,v 0.0.0.1 2003/11/21
*
* DEBUG: section 31 Functions working over publication
- * AUTHOR: Petr Smolik petr.smolik@wo.cz
*
- * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
+ * -------------------------------------------------------------------
+ * ORTE
+ * Open Real-Time Ethernet
+ *
+ * Copyright (C) 2001-2006
+ * Department of Control Engineering FEE CTU Prague, Czech Republic
+ * http://dce.felk.cvut.cz
+ * http://www.ocera.org
+ *
+ * Author: Petr Smolik petr.smolik@wo.cz
+ * Advisor: Pavel Pisa
+ * Project Responsible: Zdenek Hanzalek
* --------------------------------------------------------------------
*
* This program is free software; you can redistribute it and/or modify
}
}
//create writerPublication
+ cstWriterParams.registrationRetries=0;
+ NTPTIME_ZERO(cstWriterParams.registrationPeriod);
NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
NTPTIME_ZERO(cstWriterParams.delayResponceTime);
- cstWriterParams.refreshPeriod=iNtpTime; //cann't refresh csChange(s)
+ cstWriterParams.refreshPeriod=iNtpTime; //can't refresh csChange(s)
cstWriterParams.repeatAnnounceTime=pp->HBNornalRate;
cstWriterParams.HBMaxRetries=pp->HBMaxRetries;
cstWriterParams.fullAcknowledge=ORTE_TRUE;
parameterUpdateCSChangeFromPublication(csChange,pp);
csChange->guid=guid;
csChange->alive=ORTE_TRUE;
- csChange->cdrStream.buffer=NULL;
+ csChange->cdrCodec.buffer=NULL;
debug(31,10) ("ORTEPublicationCreate: add CSChange\n");
CSTWriterAddCSChange(d,&d->writerPublications,csChange);
pthread_rwlock_unlock(&d->writerPublications.lock);
}
csChange=(CSChange*)MALLOC(sizeof(CSChange));
CSChangeAttributes_init_head(csChange);
- csChange->cdrStream.buffer=NULL;
+ csChange->cdrCodec.buffer=NULL;
csChange->guid=cstWriter->guid;
csChange->alive=ORTE_FALSE;
CSTWriterAddCSChange(cstWriter->domain,
parameterUpdateCSChangeFromPublication(csChange,pp);
csChange->guid=cstWriter->guid;
csChange->alive=ORTE_TRUE;
- csChange->cdrStream.buffer=NULL;
+ csChange->cdrCodec.buffer=NULL;
CSTWriterAddCSChange(cstWriter->domain,
&cstWriter->domain->writerPublications,csChange);
pthread_rwlock_unlock(&cstWriter->lock);
NtpTimeAdd(expire,atime,cstWriter->domain->domainProp.baseProp.maxBlockTime);
NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,expire);
wtime.tv_nsec*=1000; //conver to nano seconds
- pthread_rwlock_unlock(&cstWriter->lock);
- //wait till a message will be processed
- pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
- if (cstWriter->condValueCSChangeDestroyed==0) {
- pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
- &cstWriter->mutexCSChangeDestroyed,
- &wtime);
- }
- cstWriter->condValueCSChangeDestroyed=0;
- pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
-
- pthread_rwlock_wrlock(&cstWriter->lock);
- pp=(ORTEPublProp*)cstWriter->objectEntryOID->attributes;
- if (cstWriter->csChangesCounter>=pp->sendQueueSize) {
- debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
- cstWriter->csChangesCounter);
- pthread_rwlock_unlock(&cstWriter->lock);
- return ORTE_QUEUE_FULL;
+ while(cstWriter->csChangesCounter>=pp->sendQueueSize) {
+ pthread_rwlock_unlock(&cstWriter->lock);
+ //wait till a message will be processed
+ pthread_mutex_lock(&cstWriter->mutexCSChangeDestroyed);
+ if (cstWriter->condValueCSChangeDestroyed==0) {
+ if (pthread_cond_timedwait(&cstWriter->condCSChangeDestroyed,
+ &cstWriter->mutexCSChangeDestroyed,
+ &wtime)==ETIMEDOUT) {
+ debug(31,5) ("Publication: queue level (%d), queue full!!!\n",
+ cstWriter->csChangesCounter);
+ pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
+ pthread_rwlock_unlock(&cstWriter->lock);
+ return ORTE_QUEUE_FULL;
+ }
+ }
+ cstWriter->condValueCSChangeDestroyed=0;
+ pthread_mutex_unlock(&cstWriter->mutexCSChangeDestroyed);
+ pthread_rwlock_wrlock(&cstWriter->lock);
}
}
}
/*****************************************************************************/
int
-ORTEPublicationSendLocked(ORTEPublication *cstWriter) {
+ORTEPublicationSendLocked(ORTEPublication *cstWriter,
+ ORTEPublicationSendParam *psp) {
CSChange *csChange;
SequenceNumber snNext;
+ int max_size;
if (!cstWriter) return ORTE_BAD_HANDLE;
pthread_rwlock_rdlock(&cstWriter->domain->typeEntry.lock);
pthread_rwlock_wrlock(&cstWriter->domain->writerPublications.lock);
if (!CSTRemoteReader_is_empty(cstWriter)) {
+ ORTEGetMaxSizeParam gms;
+
csChange=(CSChange*)MALLOC(sizeof(CSChange));
CSChangeAttributes_init_head(csChange);
csChange->guid=cstWriter->guid;
csChange->alive=ORTE_FALSE;
- csChange->cdrStream.length=RTPS_HEADER_LENGTH+12+ //HEADER+INFO_TS+ISSUE
- +20+cstWriter->typeRegister->getMaxSize;
- csChange->cdrStream.buffer=(uint8_t*)MALLOC(csChange->cdrStream.length);
- csChange->cdrStream.bufferPtr=csChange->cdrStream.buffer+RTPS_HEADER_LENGTH+12+20;
+ CDR_codec_init_static(&csChange->cdrCodec);
+ csChange->cdrCodec.data_endian = FLAG_ENDIANNESS;
+
+ if (psp) {
+ csChange->cdrCodec.data_endian = psp->data_endian;
+ cstWriter->objectEntryOID->instance=psp->instance;
+ }
+
+ /* determine maximal size */
+ gms.host_endian=csChange->cdrCodec.host_endian;
+ gms.data_endian=csChange->cdrCodec.data_endian;
+ gms.data=cstWriter->objectEntryOID->instance;
+ gms.max_size=cstWriter->typeRegister->maxSize;
+ gms.recv_size=-1;
+ gms.csize=0;
+ if (cstWriter->typeRegister->getMaxSize)
+ max_size=cstWriter->typeRegister->getMaxSize(&gms);
+ else
+ max_size=cstWriter->typeRegister->maxSize;
+
+ /* prepare csChange */
+ CDR_buffer_init(&csChange->cdrCodec,
+ RTPS_HEADER_LENGTH+12+20+max_size); //HEADER+INFO_TS+ISSUE
+ csChange->cdrCodec.wptr_max=
+ cstWriter->domain->domainProp.wireProp.userBytesPerPacket;
+
+ /* SN for next issue */
SeqNumberInc(snNext,cstWriter->lastSN);
- RTPSHeaderCreate(csChange->cdrStream.buffer,
- cstWriter->domain->guid.hid,cstWriter->domain->guid.aid);
- RTPSInfoTSCreate(csChange->cdrStream.buffer+RTPS_HEADER_LENGTH,
- 12,getActualNtpTime());
- RTPSIssueCreateHeader(csChange->cdrStream.buffer+
- RTPS_HEADER_LENGTH+12,20,16+cstWriter->typeRegister->getMaxSize,
+
+ /* prepare data */
+ RTPSHeaderCreate(&csChange->cdrCodec,
+ cstWriter->domain->guid.hid,cstWriter->domain->guid.aid);
+ RTPSInfoTSCreate(&csChange->cdrCodec,
+ getActualNtpTime());
+ RTPSIssueCreateHeader(&csChange->cdrCodec,16+max_size,
OID_UNKNOWN,cstWriter->guid.oid,snNext);
+
//serialization routine
if (cstWriter->typeRegister->serialize) {
cstWriter->typeRegister->serialize(
- &csChange->cdrStream,
+ &csChange->cdrCodec,
cstWriter->objectEntryOID->instance);
} else {
//no serialization routine -> raw copy of the instance
- memcpy(csChange->cdrStream.bufferPtr,
- cstWriter->objectEntryOID->instance,
- cstWriter->typeRegister->getMaxSize);
- csChange->cdrStream.bufferPtr+=cstWriter->typeRegister->getMaxSize;
+ CDR_buffer_puts(&csChange->cdrCodec,
+ cstWriter->objectEntryOID->instance,max_size);
}
- csChange->cdrStream.needByteSwap=ORTE_FALSE;
- debug(31,10) ("ORTEPublicationCreate: message length:%d\n",
- cstWriter->typeRegister->getMaxSize);
+
+ debug(31,10) ("ORTEPublicationCreate: message length:%d, sn(low):%u\n",
+ max_size,snNext.low);
+
CSTWriterAddCSChange(cstWriter->domain,
cstWriter,
csChange);
/*****************************************************************************/
+/**
+ * ORTEPublicationSendEx - prepare the send queue and send one issue
+ * @cstWriter: publication handle; NULL yields ORTE_BAD_HANDLE
+ * @psp: optional send parameters, handed unchanged to
+ *       ORTEPublicationSendLocked(); may be NULL
+ *
+ * Calls ORTEPublicationPrepareQueue() first and returns its result at once
+ * when that result is negative. Otherwise the object-entry locks
+ * (objRootLock, htimRootLock) and the publication lock are taken for
+ * writing, ORTEPublicationSendLocked() is called, and the locks are
+ * released in the reverse order of acquisition.
+ *
+ * Returns ORTE_BAD_HANDLE, a negative ORTEPublicationPrepareQueue() result,
+ * or the value returned by ORTEPublicationSendLocked().
+ */
int
-ORTEPublicationSend(ORTEPublication *cstWriter) {
+ORTEPublicationSendEx(ORTEPublication *cstWriter,
+ ORTEPublicationSendParam *psp) {
int r;
if (!cstWriter) return ORTE_BAD_HANDLE;
//prepare sending queue; a negative result aborts the send
- if ((r=ORTEPublicationPrepareQueue(cstWriter))<0) return r;
+ r=ORTEPublicationPrepareQueue(cstWriter);
+ if (r<0)
+ return r;
//send with objRootLock, htimRootLock and the publication lock write-held
pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.objRootLock);
pthread_rwlock_wrlock(&cstWriter->domain->objectEntry.htimRootLock);
pthread_rwlock_wrlock(&cstWriter->lock);
- r=ORTEPublicationSendLocked(cstWriter);
+ r=ORTEPublicationSendLocked(cstWriter,psp);
pthread_rwlock_unlock(&cstWriter->lock);
pthread_rwlock_unlock(&cstWriter->domain->objectEntry.htimRootLock);
pthread_rwlock_unlock(&cstWriter->domain->objectEntry.objRootLock);
return r;
}
+
+/*****************************************************************************/
+/**
+ * ORTEPublicationSend - send one issue without extra send parameters
+ * @cstWriter: publication handle
+ *
+ * Forwards to ORTEPublicationSendEx() with a NULL parameter block and
+ * returns its result unchanged.
+ */
+inline int
+ORTEPublicationSend(ORTEPublication *cstWriter) {
+  ORTEPublicationSendParam *noParams=NULL;  //no per-send parameters
+
+  return ORTEPublicationSendEx(cstWriter,noParams);
+}
+
+
+/*****************************************************************************/
+/**
+ * ORTEPublicationGetInstance - return the publication's instance pointer
+ * @cstWriter: publication handle; may be NULL
+ *
+ * Returns the instance pointer stored in the publication's objectEntryOID,
+ * or NULL when @cstWriter is NULL.
+ */
+inline void *
+ORTEPublicationGetInstance(ORTEPublication *cstWriter) {
+  //the other ORTEPublication* entry points reject a NULL handle with
+  //ORTE_BAD_HANDLE; a pointer-returning function reports it as NULL
+  if (!cstWriter) return NULL;
+  return cstWriter->objectEntryOID->instance;
+}