* $Id: RTPSIssue.c,v 0.0.0.1 2003/12/08
*
* DEBUG: section 56 message ISSUE
- * AUTHOR: Petr Smolik petr.smolik@wo.cz
*
- * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
+ * -------------------------------------------------------------------
+ * ORTE
+ * Open Real-Time Ethernet
+ *
+ * Copyright (C) 2001-2006
+ * Department of Control Engineering FEE CTU Prague, Czech Republic
+ * http://dce.felk.cvut.cz
+ * http://www.ocera.org
+ *
+ * Author: Petr Smolik petr@smoliku.cz
+ * Advisor: Pavel Pisa
+ * Project Responsible: Zdenek Hanzalek
* --------------------------------------------------------------------
*
* This program is free software; you can redistribute it and/or modify
*
*/
-#include "orte.h"
+#include "orte_all.h"
+
/**********************************************************************************/
/*
 * RTPSIssueCreateHeader - write the header of an ISSUE submessage.
 *
 * The lines marked '+' are the CDR codec based implementation; the lines
 * marked '-' are the earlier implementation that stored the same fields
 * directly into a raw byte buffer (rtps_msg) at offsets 0, 1, 2, 4, 8 and 12.
 *
 * Fields written through cdrCodec, in order:
 *   - submessage id octet (ISSUE)
 *   - flags octet, taken from cdrCodec->data_endian
 *   - length, truncated to CORBA_unsigned_short
 *   - roid and woid, written while the codec is switched to FLAG_BIG_ENDIAN
 *   - sn.high and sn.low, written with the codec's original endianness
 *
 * Parameters:
 *   cdrCodec - codec whose buffer receives the header (wptr/buf_len are
 *              used for the space check)
 *   length   - value stored in the submessage length field
 *   roid     - reader object id
 *   woid     - writer object id
 *   sn       - sequence number (high and low parts)
 *
 * Returns 0 on success, -1 when buf_len is smaller than wptr+20.
 * NOTE(review): the return values of the CDR_put_* calls are not checked.
 */
-int32_t
-RTPSIssueCreateHeader(u_int8_t *rtps_msg,u_int32_t max_msg_len,u_int32_t length,
+int
+RTPSIssueCreateHeader(CDR_Codec *cdrCodec,uint32_t length,
ObjectId roid,ObjectId woid,SequenceNumber sn) {
+ CDR_Endianness data_endian;
+ CORBA_octet flags;
+
+ if (cdrCodec->buf_len<cdrCodec->wptr+20) return -1; /* need room for 20 more bytes; presumably the full header size (1+1+2+4+4+8) - TODO confirm */
+
+ /* submessage id: the ISSUE constant */
+ CDR_put_octet(cdrCodec,ISSUE);
+
+ /* flags: the codec's current data_endian value is written as the flags octet */
+ flags=cdrCodec->data_endian;
+ CDR_put_octet(cdrCodec,flags);
+
+ /* length: caller supplied value, narrowed to an unsigned short */
+ CDR_put_ushort(cdrCodec,(CORBA_unsigned_short)length);
+
+ data_endian=cdrCodec->data_endian; /* remember the codec's endianness */
+ cdrCodec->data_endian=FLAG_BIG_ENDIAN; /* both object ids are written with the codec set to FLAG_BIG_ENDIAN */
+
+ /* readerObjectId */
+ CDR_put_ulong(cdrCodec,roid);
+
+ /* writerObjectId */
+ CDR_put_ulong(cdrCodec,woid);
+
+ cdrCodec->data_endian=data_endian; /* restore the saved endianness for the sequence number */
- if (max_msg_len<20) return -1;
- rtps_msg[0]=(u_int8_t)ISSUE;
- rtps_msg[1]=ORTE_MY_MBO;
- *((ParameterLength*)(rtps_msg+2))=(u_int16_t)length;
- conv_u32(&roid,0);
- *((ObjectId*)(rtps_msg+4))=roid;
- conv_u32(&woid,0);
- *((ObjectId*)(rtps_msg+8))=woid;
- *((SequenceNumber*)(rtps_msg+12))=sn;
+ CDR_put_ulong(cdrCodec,sn.high); /* sequence number: high part first */
+ CDR_put_ulong(cdrCodec,sn.low); /* then the low part */
return 0;
}
/**********************************************************************************/
void
-RTPSIssue(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
+RTPSIssue(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
GUID_RTPS guid,writerGUID;
ObjectId roid,woid;
- SequenceNumber sn,sn_tmp;
- int8_t e_bit,p_bit;
- u_int16_t submsg_len;
+ SequenceNumber sn,sn_tmp;
+ CORBA_octet flags;
+ CORBA_unsigned_short submsg_len;
CSTReader *cstReader;
CSTRemoteWriter *cstRemoteWriter;
CSChange *csChange=NULL;
+ CDR_Endianness data_endian;
+
+ /* restore flag position in submessage */
+ cdrCodec->rptr-=3;
+
+ /* flags */
+ CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
+
+ /* submessage length */
+ CDR_get_ushort(cdrCodec,&submsg_len);
+
+ /* the following data are sent in big-endian byte order */
+ data_endian=cdrCodec->data_endian;
+ cdrCodec->data_endian=FLAG_BIG_ENDIAN;
+
+ /* readerObjectId */
+ CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
+
+ /* writerObjectId */
+ CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
+
+ cdrCodec->data_endian=data_endian;
+
+ /* sn */
+ CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.high);
+ CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.low);
+
+ /* p_bit is not supported at this moment */
+ if (flags & 0x02) return; /* p_bit */
- e_bit=rtps_msg[1] & 0x01;
- p_bit=(rtps_msg[1] & 0x02)>>1;
- submsg_len=*((u_int16_t*)(rtps_msg+2));
- conv_u16(&submsg_len,e_bit);
- roid=*((ObjectId*)(rtps_msg+4)); /* readerObjectId */
- conv_u32(&roid,0);
- woid=*((ObjectId*)(rtps_msg+8)); /* writerObjectId */
- conv_u32(&woid,0);
- sn=*((SequenceNumber*)(rtps_msg+12)); /* sn */
- conv_sn(&sn,e_bit);
- if (p_bit) return; /* at this moment is not supported p_bit */
writerGUID.hid=mi->sourceHostId;
writerGUID.aid=mi->sourceAppId;
writerGUID.oid=woid;
pthread_rwlock_rdlock(&d->subscriptions.lock);
guid=d->guid;
guid.oid=roid;
+
gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
if (roid!=OID_UNKNOWN)
cstReader=CSTReader_find(&d->subscriptions,&guid);
cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
if (cstRemoteWriter) {
ORTEPublProp *pp,*pps;
- pp=(ORTEPublProp*)cstRemoteWriter->objectEntryOID->attributes;
+ pp=(ORTEPublProp*)cstRemoteWriter->spobject->attributes;
if (cstReader->cstRemoteWriterSubscribed!=NULL) {
pps=(ORTEPublProp*)cstReader->cstRemoteWriterSubscribed->
- objectEntryOID->attributes;
- if (pp->strength>pps->strength) {
+ spobject->attributes;
+ if ((pp->strength>pps->strength) || (NtpTimeCmp(pps->persistence,zNtpTime)==0)) {
cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
}
} else {
cstReader,
&pp->persistence);
}
+
if ((SeqNumberCmp(sn,cstRemoteWriter->sn)>0) && //have to be sn>writer_sn
(CSChangeFromWriter_find(cstRemoteWriter,&sn)==NULL)) {
+
csChange=(CSChange*)MALLOC(sizeof(CSChange));
- csChange->cdrStream.buffer=NULL;
csChange->guid=writerGUID;
csChange->sn=sn;
SEQUENCE_NUMBER_NONE(csChange->gapSN);
CSChangeAttributes_init_head(csChange);
- csChange->cdrStream.length=submsg_len-16;
- csChange->cdrStream.buffer=(int8_t*)MALLOC(submsg_len-16);
- csChange->cdrStream.bufferPtr=csChange->cdrStream.buffer;
- csChange->cdrStream.needByteSwap=ORTE_FALSE;
- if (e_bit ^ ORTE_MY_MBO)
- csChange->cdrStream.needByteSwap=ORTE_TRUE;
- memcpy(csChange->cdrStream.buffer,rtps_msg+20,submsg_len-16);
+
+ CDR_codec_init_static(&csChange->cdrCodec);
+ CDR_buffer_init(&csChange->cdrCodec,
+ submsg_len-16);
+ csChange->cdrCodec.data_endian=cdrCodec->data_endian;
+
+ memcpy(csChange->cdrCodec.buffer,
+ &cdrCodec->buffer[cdrCodec->rptr],submsg_len-16);
+
if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) { //sn>=firstSN
if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
if (sp->recvQueueSize>cstRemoteWriter->csChangesCounter) {
}
}
if (csChange) {
- FREE(csChange->cdrStream.buffer);
+ FREE(csChange->cdrCodec.buffer);
FREE(csChange);
}
CSTReaderProcCSChangesIssue(cstRemoteWriter,ORTE_FALSE);