]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSHeardBeat.c
31098016ffa889d494d444e2226e02b7e93cda1c
[orte.git] / orte / liborte / RTPSHeardBeat.c
1 /*
2  *  $Id: RTPSHeardBeat.c,v 0.0.0.1      2003/10/07 
3  *
4  *  DEBUG:  section 48                  RTPS message HeartBeat
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 /**********************************************************************************/
25 int 
26 RTPSHeardBeatCreate(u_int8_t *rtps_msg,u_int32_t max_msg_len,
27     SequenceNumber *firstSeqNumber,SequenceNumber *lastSeqNumber,
28     ObjectId woid,ObjectId roid,Boolean f_bit) {
29   if (max_msg_len<28) return -1;
30   rtps_msg[0]=(u_int8_t)HEARTBEAT;
31   rtps_msg[1]=ORTE_MY_MBO;
32   if (f_bit) rtps_msg[1]|=2;
33   *((ParameterLength*)(rtps_msg+2))=24;
34   conv_u32(&roid,0);
35   *((ObjectId*)(rtps_msg+4))=roid;
36   conv_u32(&woid,0);
37   *((ObjectId*)(rtps_msg+8))=woid;
38   *((SequenceNumber*)(rtps_msg+12))=*firstSeqNumber;
39   *((SequenceNumber*)(rtps_msg+20))=*lastSeqNumber;
40   return 28;
41 }
42
43 /**********************************************************************************/
44 void 
45 RTPSHeardBeat(ORTEDomain *d,u_int8_t *rtps_msg,MessageInterpret *mi) {
46   GUID_RTPS          writerGUID;
47   ObjectId               roid,woid;
48   SequenceNumber     fsn,lsn;
49   int8_t             e_bit,f_bit;
50   CSTReader          *cstReader=NULL;
51   CSTRemoteWriter    *cstRemoteWriter=NULL;
52
53   e_bit=rtps_msg[1] & 0x01;
54   f_bit=(rtps_msg[1] & 0x02)>>1;
55   roid=*((ObjectId*)(rtps_msg+4));              /* readerObjectId */
56   conv_u32(&roid,0);
57   woid=*((ObjectId*)(rtps_msg+8));              /* writerObjectId */
58   conv_u32(&woid,0);
59   fsn=*((SequenceNumber*)(rtps_msg+12));        /* firstSeqNumber */
60   conv_sn(&fsn,e_bit);
61   lsn=*((SequenceNumber*)(rtps_msg+20));        /* lastSeqNumber  */
62   conv_sn(&lsn,e_bit);
63   if (SeqNumberCmp(fsn,lsn)==1) return;         // lsn<fsn -> break
64   writerGUID.hid=mi->sourceHostId;
65   writerGUID.aid=mi->sourceAppId;
66   writerGUID.oid=woid;
67
68   debug(48,3) ("recv: RTPS HB%c(0x%x) from 0x%x-0x%x\n",
69                 f_bit ? 'F':'f',woid,mi->sourceHostId,mi->sourceAppId);
70
71   if ((d->guid.aid & 0x03)==MANAGER) {
72     if ((writerGUID.oid==OID_WRITE_APPSELF) && 
73         ((writerGUID.aid & 0x03)==MANAGER)) {
74       pthread_rwlock_wrlock(&d->readerManagers.lock);
75       cstReader=&d->readerManagers;
76     }
77     if (((writerGUID.oid==OID_WRITE_APPSELF) &&
78          ((writerGUID.aid & 0x03)==MANAGEDAPPLICATION)) ||
79         ((writerGUID.oid==OID_WRITE_APP) &&
80          ((writerGUID.aid & 0x03)==MANAGER))) {
81       pthread_rwlock_wrlock(&d->readerApplications.lock);
82       cstReader=&d->readerApplications;
83     }
84   }
85   if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
86     switch (writerGUID.oid) {
87       case OID_WRITE_MGR:
88         pthread_rwlock_wrlock(&d->readerManagers.lock);
89         cstReader=&d->readerManagers;
90         break;
91       case OID_WRITE_APP:
92         pthread_rwlock_wrlock(&d->readerApplications.lock);
93         cstReader=&d->readerApplications;
94         break;
95       case OID_WRITE_PUBL:
96         pthread_rwlock_wrlock(&d->readerPublications.lock);
97         cstReader=&d->readerPublications;
98         break;
99       case OID_WRITE_SUBS:
100         pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
101         cstReader=&d->readerSubscriptions;
102         break;
103     }
104   }  
105   if (!cstReader) return;
106   cstRemoteWriter=CSTRemoteWriter_find(cstReader,&writerGUID);
107   if (!cstRemoteWriter) {
108     pthread_rwlock_unlock(&cstReader->lock);
109     return;
110   }
111   cstRemoteWriter->firstSN=fsn;
112   cstRemoteWriter->lastSN=lsn;
113   cstRemoteWriter->ACKRetriesCounter=0;
114   if (SeqNumberCmp(cstRemoteWriter->sn,lsn)>0)
115     cstRemoteWriter->sn=lsn;
116   if (SeqNumberCmp(cstRemoteWriter->sn,fsn)<0) {
117     if (SeqNumberCmp(fsn,noneSN)!=0) {
118       SeqNumberDec(cstRemoteWriter->sn,fsn);
119     }
120   }
121   CSTReaderProcCSChanges(d,cstRemoteWriter);
122   if ((!f_bit) && (cstRemoteWriter->commStateACK==WAITING)) {
123     cstRemoteWriter->commStateACK=ACKPENDING;
124     eventDetach(d,
125         cstRemoteWriter->objectEntryOID->objectEntryAID,
126         &cstRemoteWriter->repeatActiveQueryTimer,
127         1); 
128     eventDetach(d,
129         cstRemoteWriter->objectEntryOID->objectEntryAID,
130         &cstRemoteWriter->delayResponceTimer,
131         1);   //metatraffic timer
132     eventAdd(d,
133         cstRemoteWriter->objectEntryOID->objectEntryAID,
134         &cstRemoteWriter->delayResponceTimer,
135         1,   //metatraffic timer
136         "CSTReaderResponceTimer",
137         CSTReaderResponceTimer,
138         &cstRemoteWriter->cstReader->lock,
139         cstRemoteWriter,
140         &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
141   }
142   pthread_rwlock_unlock(&cstReader->lock);
143