]> rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/RTPSIssue.c
updated email address - petr@smoliku.cz
[orte.git] / orte / liborte / RTPSIssue.c
index be025cf331e69a1abacd49aa6e47572c2d3c553d..39e98f1dc6e86d6ee3dba650dc171f36089eb6c4 100644 (file)
@@ -2,9 +2,19 @@
  *  $Id: RTPSIssue.c,v 0.0.0.1          2003/12/08
  *
  *  DEBUG:  section 56                  message ISSUE
- *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
  *
- *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
+ *  -------------------------------------------------------------------  
+ *                                ORTE                                 
+ *                      Open Real-Time Ethernet                       
+ *                                                                    
+ *                      Copyright (C) 2001-2006                       
+ *  Department of Control Engineering FEE CTU Prague, Czech Republic  
+ *                      http://dce.felk.cvut.cz                       
+ *                      http://www.ocera.org                          
+ *                                                                    
+ *  Author:             Petr Smolik    petr@smoliku.cz             
+ *  Advisor:            Pavel Pisa                                   
+ *  Project Responsible: Zdenek Hanzalek                              
  *  --------------------------------------------------------------------
  *
  *  This program is free software; you can redistribute it and/or modify
  *  
  */ 
 
-#include "orte.h"
+#include "orte_all.h"
+
 /**********************************************************************************/
-int32_t
-RTPSIssueCreateHeader(u_int8_t *rtps_msg,u_int32_t max_msg_len,u_int32_t length,
+int
+RTPSIssueCreateHeader(CDR_Codec *cdrCodec,uint32_t length,
     ObjectId roid,ObjectId woid,SequenceNumber sn) {
+  CDR_Endianness     data_endian;
+  CORBA_octet        flags;
+  
+  if (cdrCodec->buf_len<cdrCodec->wptr+20) return -1;
+
+  /* submessage id */
+  CDR_put_octet(cdrCodec,ISSUE);
+
+  /* flags */
+  flags=cdrCodec->data_endian;
+  CDR_put_octet(cdrCodec,flags);
+
+  /* length */
+  CDR_put_ushort(cdrCodec,(CORBA_unsigned_short)length);
+
+  data_endian=cdrCodec->data_endian;
+  cdrCodec->data_endian=FLAG_BIG_ENDIAN;
+
+  /* readerObjectId */
+  CDR_put_ulong(cdrCodec,roid);
+
+  /* writerObjectId */
+  CDR_put_ulong(cdrCodec,woid);
+
+  cdrCodec->data_endian=data_endian;
   
-  if (max_msg_len<20) return -1;
-  rtps_msg[0]=(u_int8_t)ISSUE;
-  rtps_msg[1]=ORTE_MY_MBO;
-  *((ParameterLength*)(rtps_msg+2))=(u_int16_t)length;
-  conv_u32(&roid,0);
-  *((ObjectId*)(rtps_msg+4))=roid;
-  conv_u32(&woid,0);
-  *((ObjectId*)(rtps_msg+8))=woid;
-  *((SequenceNumber*)(rtps_msg+12))=sn;
+  CDR_put_ulong(cdrCodec,sn.high);
+  CDR_put_ulong(cdrCodec,sn.low);
   return 0;
 }
 
 /**********************************************************************************/
 void 
-RTPSIssue(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
+RTPSIssue(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi,IPAddress senderIPAddress) {
   GUID_RTPS          guid,writerGUID;
   ObjectId           roid,woid;
-  SequenceNumber     sn,sn_tmp;   
-  int8_t             e_bit,p_bit;
-  u_int16_t          submsg_len;
+  SequenceNumber     sn,sn_tmp; 
+  CORBA_octet        flags;  
+  CORBA_unsigned_short submsg_len;
   CSTReader          *cstReader;
   CSTRemoteWriter    *cstRemoteWriter;
   CSChange           *csChange=NULL;
+  CDR_Endianness     data_endian;
+
+  /* restore flag possition in submessage */
+  cdrCodec->rptr-=3;
+
+  /* flags */
+  CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
+
+  /* submessage length */
+  CDR_get_ushort(cdrCodec,&submsg_len);
+
+  /* next data are sent in big endianing */
+  data_endian=cdrCodec->data_endian;
+  cdrCodec->data_endian=FLAG_BIG_ENDIAN;
+
+  /* readerObjectId */
+  CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&roid);
+
+  /* writerObjectId */
+  CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&woid);
+
+  cdrCodec->data_endian=data_endian;
+
+  /* sn */
+  CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.high);
+  CDR_get_ulong(cdrCodec, (CORBA_unsigned_long *)&sn.low);
+
+  /* at this moment is not supported p_bit */
+  if (flags & 0x02) return;                     /* p_bit */
 
-  e_bit=rtps_msg[1] & 0x01;
-  p_bit=(rtps_msg[1] & 0x02)>>1;
-  submsg_len=*((u_int16_t*)(rtps_msg+2));
-  conv_u16(&submsg_len,e_bit);
-  roid=*((ObjectId*)(rtps_msg+4));              /* readerObjectId */
-  conv_u32(&roid,0);
-  woid=*((ObjectId*)(rtps_msg+8));              /* writerObjectId */
-  conv_u32(&woid,0);
-  sn=*((SequenceNumber*)(rtps_msg+12));         /* sn             */
-  conv_sn(&sn,e_bit);
-  if (p_bit) return;       /* at this moment is not supported p_bit */
   writerGUID.hid=mi->sourceHostId;
   writerGUID.aid=mi->sourceAppId;
   writerGUID.oid=woid;
@@ -70,6 +117,7 @@ RTPSIssue(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress sender
   pthread_rwlock_rdlock(&d->subscriptions.lock);
   guid=d->guid;
   guid.oid=roid;
+
   gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
     if (roid!=OID_UNKNOWN)
       cstReader=CSTReader_find(&d->subscriptions,&guid);
@@ -80,11 +128,11 @@ RTPSIssue(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress sender
       cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
       if (cstRemoteWriter) {
         ORTEPublProp *pp,*pps;
-        pp=(ORTEPublProp*)cstRemoteWriter->objectEntryOID->attributes;
+        pp=(ORTEPublProp*)cstRemoteWriter->spobject->attributes;
         if (cstReader->cstRemoteWriterSubscribed!=NULL) {
           pps=(ORTEPublProp*)cstReader->cstRemoteWriterSubscribed->
-                            objectEntryOID->attributes;
-          if (pp->strength>pps->strength) {
+                            spobject->attributes;
+          if ((pp->strength>pps->strength) || (NtpTimeCmp(pps->persistence,zNtpTime)==0)) {
             cstReader->cstRemoteWriterSubscribed=cstRemoteWriter;
           }
         } else {
@@ -105,21 +153,24 @@ RTPSIssue(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress sender
               cstReader,
               &pp->persistence);
         }
+
         if ((SeqNumberCmp(sn,cstRemoteWriter->sn)>0) &&   //have to be sn>writer_sn
             (CSChangeFromWriter_find(cstRemoteWriter,&sn)==NULL)) {
+
           csChange=(CSChange*)MALLOC(sizeof(CSChange));
-          csChange->cdrStream.buffer=NULL;
           csChange->guid=writerGUID;
           csChange->sn=sn;
           SEQUENCE_NUMBER_NONE(csChange->gapSN);
           CSChangeAttributes_init_head(csChange);
-          csChange->cdrStream.length=submsg_len-16;
-          csChange->cdrStream.buffer=(int8_t*)MALLOC(submsg_len-16);
-          csChange->cdrStream.bufferPtr=csChange->cdrStream.buffer;
-          csChange->cdrStream.needByteSwap=ORTE_FALSE;
-          if (e_bit ^ ORTE_MY_MBO)        
-            csChange->cdrStream.needByteSwap=ORTE_TRUE;
-          memcpy(csChange->cdrStream.buffer,rtps_msg+20,submsg_len-16);
+
+          CDR_codec_init_static(&csChange->cdrCodec);
+          CDR_buffer_init(&csChange->cdrCodec,
+                         submsg_len-16);
+         csChange->cdrCodec.data_endian=cdrCodec->data_endian;
+
+          memcpy(csChange->cdrCodec.buffer,
+                &cdrCodec->buffer[cdrCodec->rptr],submsg_len-16);
+
           if (SeqNumberCmp(sn,cstRemoteWriter->firstSN)>=0) { //sn>=firstSN
             if ((sp->reliabilityRequested & PID_VALUE_RELIABILITY_STRICT)!=0) {
               if (sp->recvQueueSize>cstRemoteWriter->csChangesCounter) {
@@ -156,7 +207,7 @@ RTPSIssue(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress sender
           } 
         }
         if (csChange) {
-          FREE(csChange->cdrStream.buffer);
+          FREE(csChange->cdrCodec.buffer);
           FREE(csChange);
         }
         CSTReaderProcCSChangesIssue(cstRemoteWriter,ORTE_FALSE);