rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/RTPSAck.c
version 0.2.2 (mac, solaris patch)
[orte.git] / orte / liborte / RTPSAck.c
index 2608f39c99be1a7e336f315584da10435b0962a9..a6d6133cc434a3a207a247684e1b12700c001e6f 100644 (file)
 
 /**********************************************************************************/
 int32_t 
-RTPSAckCreate(u_int8_t *rtps_msg,u_int32_t max_msg_len,
+RTPSAckCreate(uint8_t *rtps_msg,uint32_t max_msg_len,
     SequenceNumber *seqNumber,
     ObjectId roid,ObjectId woid,Boolean f_bit) {
   SequenceNumber        sn_tmp;                    
                     
   if (max_msg_len<28) return -1;
-  rtps_msg[0]=(u_int8_t)ACK;
+  rtps_msg[0]=(uint8_t)ACK;
   rtps_msg[1]=ORTE_MY_MBO;
   if (f_bit) rtps_msg[1]|=2;
   *((ParameterLength*)(rtps_msg+2))=24;
@@ -39,14 +39,14 @@ RTPSAckCreate(u_int8_t *rtps_msg,u_int32_t max_msg_len,
   *((ObjectId*)(rtps_msg+8))=woid;
   SeqNumberInc(sn_tmp,*seqNumber);
   *((SequenceNumber*)(rtps_msg+12))=sn_tmp;
-  *((u_int32_t*)(rtps_msg+20))=32;
-  *((u_int32_t*)(rtps_msg+24))=0;
+  *((uint32_t*)(rtps_msg+20))=32;
+  *((uint32_t*)(rtps_msg+24))=0;
   return 28;
 } 
 
 /**********************************************************************************/
 void 
-RTPSAck(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
+RTPSAck(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIPAddress) {
   GUID_RTPS          readerGUID;
   CSTWriter          *cstWriter=NULL;
   CSTRemoteReader    *cstRemoteReader;
@@ -54,7 +54,8 @@ RTPSAck(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIP
   StateMachineSend   stateMachineSendNew;
   ObjectId              roid,woid;
   SequenceNumber     sn;   
-  int8_t             e_bit,f_bit;
+  char               e_bit,f_bit;
+  char               queue=1;
 
   e_bit=rtps_msg[1] & 0x01;
   f_bit=(rtps_msg[1] & 0x02)>>1;
@@ -108,24 +109,53 @@ RTPSAck(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIP
         pthread_rwlock_wrlock(&d->writerSubscriptions.lock);
         cstWriter=&d->writerSubscriptions;
         break;
+      default:
+        if ((woid & 0x07) == OID_PUBLICATION) {
+          GUID_RTPS  guid=d->guid;
+          guid.oid=woid;
+          pthread_rwlock_rdlock(&d->publications.lock);
+          cstWriter=CSTWriter_find(&d->publications,&guid);
+          pthread_rwlock_wrlock(&cstWriter->lock);
+          queue=2;
+        }
+        break;
     }
   }
-  if (!cstWriter) return;
+  if (!cstWriter) {
+    if ((woid & 0x07) == OID_PUBLICATION) 
+      pthread_rwlock_unlock(&d->publications.lock);
+    return;
+  }
   cstRemoteReader=CSTRemoteReader_find(cstWriter,&readerGUID);
   if (!cstRemoteReader) {
     pthread_rwlock_unlock(&cstWriter->lock);
+    if ((woid & 0x07) == OID_PUBLICATION) 
+      pthread_rwlock_unlock(&d->publications.lock);
     return;
   }
   stateMachineSendNew=NOTHNIGTOSEND;
-  gavl_cust_for_each(CSChangeForReader,cstRemoteReader,csChangeForReader) {
+  csChangeForReader=CSChangeForReader_first(cstRemoteReader);
+  while(csChangeForReader) {
     if (SeqNumberCmp(csChangeForReader->csChange->sn,sn)<0)   {   //ACK
       if (csChangeForReader->commStateChFReader!=ACKNOWLEDGED) {
+        CSChangeForReader *csChangeForReaderDestroyed;
+        csChangeForReaderDestroyed=csChangeForReader;
         csChangeForReader->commStateChFReader=ACKNOWLEDGED;
-//        csChangeForReader->csChange->acknowledgedCounter++;
-      }    
+        csChangeForReader=
+          CSChangeForReader_next(cstRemoteReader,csChangeForReader);
+        if ((woid & 0x07) == OID_PUBLICATION) {
+          CSTWriterDestroyCSChangeForReader(cstRemoteReader,
+            csChangeForReaderDestroyed,ORTE_TRUE);
+        }
+      } else {
+        csChangeForReader=
+          CSChangeForReader_next(cstRemoteReader,csChangeForReader);
+      }
     } else {                                                      //NACK
       csChangeForReader->commStateChFReader=TOSEND;
       stateMachineSendNew=MUSTSENDDATA;
+      csChangeForReader=
+        CSChangeForReader_next(cstRemoteReader,csChangeForReader);
     }
   }
   if ((cstRemoteReader->commStateSend==NOTHNIGTOSEND) && 
@@ -134,41 +164,74 @@ RTPSAck(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi,IPAddress senderIP
     eventDetach(d,
         cstRemoteReader->objectEntryOID->objectEntryAID,
         &cstRemoteReader->delayResponceTimer,
-        1);
-    eventAdd(d,
-        cstRemoteReader->objectEntryOID->objectEntryAID,
-        &cstRemoteReader->delayResponceTimer,
-        1,   //metatraffic timer
-        "CSTWriterSendTimer",
-        CSTWriterSendTimer,
-        &cstRemoteReader->cstWriter->lock,
-        cstRemoteReader,
-        &cstRemoteReader->cstWriter->params.delayResponceTime);               
+        queue);
+    if (queue==1) {
+      eventAdd(d,
+          cstRemoteReader->objectEntryOID->objectEntryAID,
+          &cstRemoteReader->delayResponceTimer,
+          queue,   //metatraffic timer
+          "CSTWriterSendTimer",
+          CSTWriterSendTimer,
+          &cstRemoteReader->cstWriter->lock,
+          cstRemoteReader,
+          &cstRemoteReader->cstWriter->params.delayResponceTime);               
+    } else {
+      eventAdd(d,
+          cstRemoteReader->objectEntryOID->objectEntryAID,
+          &cstRemoteReader->delayResponceTimer,
+          queue,   //userdata timer
+          "CSTWriterSendStrictTimer",
+          CSTWriterSendStrictTimer,
+          &cstRemoteReader->cstWriter->lock,
+          cstRemoteReader,
+          &cstRemoteReader->cstWriter->params.delayResponceTime);               
+    }
   } 
-  if ((cstRemoteReader->commStateSend==MUSTSENDDATA) && 
-      (stateMachineSendNew==NOTHNIGTOSEND)) {
-    cstRemoteReader->commStateSend=stateMachineSendNew;
-    eventDetach(d,
-        cstRemoteReader->objectEntryOID->objectEntryAID,
-        &cstRemoteReader->delayResponceTimer,
-        1);
+  if (stateMachineSendNew==NOTHNIGTOSEND) {
+    cstRemoteReader->commStateSend=NOTHNIGTOSEND;
+    if (queue==1) {
+      eventDetach(d,
+          cstRemoteReader->objectEntryOID->objectEntryAID,
+          &cstRemoteReader->delayResponceTimer,
+          queue);
+    } else {
+      eventDetach(d,
+          cstRemoteReader->objectEntryOID->objectEntryAID,
+          &cstRemoteReader->repeatAnnounceTimer,
+          queue);
+    }
   }  
   if ((!f_bit) && (cstRemoteReader->commStateSend==NOTHNIGTOSEND)) {
+    cstRemoteReader->commStateHB=MUSTSENDHB;
     eventDetach(d,
         cstRemoteReader->objectEntryOID->objectEntryAID,
         &cstRemoteReader->delayResponceTimer,
-        1);
-    eventAdd(d,
-        cstRemoteReader->objectEntryOID->objectEntryAID,
-        &cstRemoteReader->delayResponceTimer,
-        1,   //metatraffic timer
-        "CSTWriterSendTimer",
-        CSTWriterSendTimer,
-        &cstRemoteReader->cstWriter->lock,
-        cstRemoteReader,
-        &cstRemoteReader->cstWriter->params.delayResponceTime);               
+        queue);
+    if (queue==1) {
+      eventAdd(d,
+          cstRemoteReader->objectEntryOID->objectEntryAID,
+          &cstRemoteReader->delayResponceTimer,
+          queue,   //metatraffic timer
+          "CSTWriterSendTimer",
+          CSTWriterSendTimer,
+          &cstRemoteReader->cstWriter->lock,
+          cstRemoteReader,
+          &cstRemoteReader->cstWriter->params.delayResponceTime);               
+    } else {
+      eventAdd(d,
+          cstRemoteReader->objectEntryOID->objectEntryAID,
+          &cstRemoteReader->delayResponceTimer,
+          queue,   //userdata timer
+          "CSTWriterSendStrictTimer",
+          CSTWriterSendStrictTimer,
+          &cstRemoteReader->cstWriter->lock,
+          cstRemoteReader,
+          &cstRemoteReader->cstWriter->params.delayResponceTime);               
+    }
   } 
   pthread_rwlock_unlock(&cstWriter->lock);
+  if ((woid & 0x07) == OID_PUBLICATION) 
+    pthread_rwlock_unlock(&d->publications.lock);
 }