]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/RTPSHeardBeat.c
new version 0.2.3
[orte.git] / orte / liborte / RTPSHeardBeat.c
1 /*
2  *  $Id: RTPSHeardBeat.c,v 0.0.0.1      2003/10/07 
3  *
4  *  DEBUG:  section 48                  RTPS message HeardBeat
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
#include <string.h>

#include "orte_all.h"
23
24 /**********************************************************************************/
25 int 
26 RTPSHeardBeatCreate(uint8_t *rtps_msg,uint32_t max_msg_len,
27     SequenceNumber *firstSeqNumber,SequenceNumber *lastSeqNumber,
28     ObjectId woid,ObjectId roid,Boolean f_bit) {
29   if (max_msg_len<28) return -1;
30   rtps_msg[0]=(uint8_t)HEARTBEAT;
31   rtps_msg[1]=ORTE_MY_MBO;
32   if (f_bit) rtps_msg[1]|=2;
33   *((ParameterLength*)(rtps_msg+2))=24;
34   conv_u32(&roid,0);
35   *((ObjectId*)(rtps_msg+4))=roid;
36   conv_u32(&woid,0);
37   *((ObjectId*)(rtps_msg+8))=woid;
38   *((SequenceNumber*)(rtps_msg+12))=*firstSeqNumber;
39   *((SequenceNumber*)(rtps_msg+20))=*lastSeqNumber;
40   return 28;
41 }
42
43 /**********************************************************************************/
44 void 
45 HeardBeatProc(CSTReader *cstReader,GUID_RTPS *writerGUID,
46     SequenceNumber *fsn,SequenceNumber *lsn,char f_bit) {
47   CSTRemoteWriter    *cstRemoteWriter;
48   
49   if (!cstReader) return;
50   cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
51   if (!cstRemoteWriter) return;
52   cstRemoteWriter->firstSN=*fsn;
53   cstRemoteWriter->lastSN=*lsn;
54   cstRemoteWriter->ACKRetriesCounter=0;
55   if (SeqNumberCmp(cstRemoteWriter->sn,*lsn)>0)
56     cstRemoteWriter->sn=*lsn;
57   if (SeqNumberCmp(cstRemoteWriter->sn,*fsn)<0) {
58     if (SeqNumberCmp(*fsn,noneSN)!=0) {
59       SeqNumberDec(cstRemoteWriter->sn,*fsn);
60     }
61   }
62   if ((writerGUID->oid & 0x07) == OID_PUBLICATION) {
63     CSTReaderProcCSChangesIssue(cstRemoteWriter,ORTE_FALSE);
64   } else {
65     CSTReaderProcCSChanges(cstReader->domain,cstRemoteWriter);
66   }
67   if ((!f_bit) && (cstRemoteWriter->commStateACK==WAITING)) {
68     char queue=1;
69     cstRemoteWriter->commStateACK=ACKPENDING;
70     if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION) 
71       queue=2;
72     eventDetach(cstReader->domain,
73         cstRemoteWriter->objectEntryOID->objectEntryAID,
74         &cstRemoteWriter->repeatActiveQueryTimer,
75         queue); 
76     eventDetach(cstReader->domain,
77         cstRemoteWriter->objectEntryOID->objectEntryAID,
78         &cstRemoteWriter->delayResponceTimer,
79         queue);   //metatraffic timer
80     eventAdd(cstReader->domain,
81         cstRemoteWriter->objectEntryOID->objectEntryAID,
82         &cstRemoteWriter->delayResponceTimer,
83         queue,    //metatraffic timer
84         "CSTReaderResponceTimer",
85         CSTReaderResponceTimer,
86         &cstRemoteWriter->cstReader->lock,
87         cstRemoteWriter,
88         &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
89   }
90 }
91
92 /**********************************************************************************/
93 void 
94 RTPSHeardBeat(ORTEDomain *d,uint8_t *rtps_msg,MessageInterpret *mi) {
95   GUID_RTPS          writerGUID;
96   ObjectId               roid,woid;
97   SequenceNumber     fsn,lsn;
98   char               e_bit,f_bit;
99   CSTReader          *cstReader=NULL;
100
101   e_bit=rtps_msg[1] & 0x01;
102   f_bit=(rtps_msg[1] & 0x02)>>1;
103   roid=*((ObjectId*)(rtps_msg+4));              /* readerObjectId */
104   conv_u32(&roid,0);
105   woid=*((ObjectId*)(rtps_msg+8));              /* writerObjectId */
106   conv_u32(&woid,0);
107   fsn=*((SequenceNumber*)(rtps_msg+12));        /* firstSeqNumber */
108   conv_sn(&fsn,e_bit);
109   lsn=*((SequenceNumber*)(rtps_msg+20));        /* lastSeqNumber  */
110   conv_sn(&lsn,e_bit);
111   if (SeqNumberCmp(fsn,lsn)==1) return;         // lsn<fsn -> break
112   writerGUID.hid=mi->sourceHostId;
113   writerGUID.aid=mi->sourceAppId;
114   writerGUID.oid=woid;
115
116   debug(48,3) ("recv: RTPS HB%c(0x%x) from 0x%x-0x%x\n",
117                 f_bit ? 'F':'f',woid,mi->sourceHostId,mi->sourceAppId);
118
119   if ((d->guid.aid & 0x03)==MANAGER) {
120     if ((writerGUID.oid==OID_WRITE_APPSELF) && 
121         ((writerGUID.aid & 0x03)==MANAGER)) {
122       pthread_rwlock_wrlock(&d->readerManagers.lock);
123       cstReader=&d->readerManagers;
124     }
125     if (((writerGUID.oid==OID_WRITE_APPSELF) &&
126          ((writerGUID.aid & 0x03)==MANAGEDAPPLICATION)) ||
127         ((writerGUID.oid==OID_WRITE_APP) &&
128          ((writerGUID.aid & 0x03)==MANAGER))) {
129       pthread_rwlock_wrlock(&d->readerApplications.lock);
130       cstReader=&d->readerApplications;
131     }
132   }
133   if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
134     switch (writerGUID.oid) {
135       case OID_WRITE_MGR:
136         pthread_rwlock_wrlock(&d->readerManagers.lock);
137         cstReader=&d->readerManagers;
138         break;
139       case OID_WRITE_APP:
140         pthread_rwlock_wrlock(&d->readerApplications.lock);
141         cstReader=&d->readerApplications;
142         break;
143       case OID_WRITE_PUBL:
144         pthread_rwlock_wrlock(&d->readerPublications.lock);
145         cstReader=&d->readerPublications;
146         break;
147       case OID_WRITE_SUBS:
148         pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
149         cstReader=&d->readerSubscriptions;
150         break;
151     }
152     if ((writerGUID.oid & 0x07) == OID_PUBLICATION) {
153       pthread_rwlock_rdlock(&d->subscriptions.lock);
154       gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
155         pthread_rwlock_wrlock(&cstReader->lock);
156         HeardBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
157         pthread_rwlock_unlock(&cstReader->lock);    
158       }
159       pthread_rwlock_unlock(&d->subscriptions.lock);
160       cstReader=NULL;
161     }
162   }  
163   HeardBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
164   if (cstReader)
165     pthread_rwlock_unlock(&cstReader->lock);
166