2 * $Id: RTPSHeardBeat.c,v 0.0.0.1 2003/10/07
4 * DEBUG: section 48 RTPS message HeartBeat
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 /**********************************************************************************/
/*
 * RTPSHeartBeatCreate - serialize a HEARTBEAT submessage into cdrCodec.
 *
 * Visible write order: submessage id octet (HEARTBEAT), flags octet,
 * a 16-bit length of 24, the reader and writer object ids, then the
 * first and last sequence numbers (high word followed by low word).
 *
 * cdrCodec  codec whose buffer receives the submessage
 * fsn       first sequence number, written as high then low
 * lsn       last sequence number, written as high then low
 * roid      reader object id
 * woid      writer object id
 * f_bit     not referenced in the visible lines
 *
 * Returns -1 when the buffer-space check below fails.
 *
 * NOTE(review): this listing appears to have dropped lines (the return
 * type, the braces, the declaration of flags, any use of f_bit and the
 * success return are not visible) - confirm against the full file.
 */
26 RTPSHeartBeatCreate(CDR_Codec *cdrCodec,
27 SequenceNumber *fsn,SequenceNumber *lsn,
28 ObjectId roid,ObjectId woid,Boolean f_bit)
30 CDR_Endianness data_endian;
/* Bail out unless buf_len leaves at least 28 units past wptr; presumably
 * that is the 4 header bytes plus the 24 payload bytes - TODO confirm. */
33 if (cdrCodec->buf_len<cdrCodec->wptr+28) return -1;
/* submessage id */
36 CDR_put_octet(cdrCodec,HEARTBEAT);
/* the flags octet starts out as the codec's current endianness value */
39 flags=cdrCodec->data_endian;
41 CDR_put_octet(cdrCodec,flags);
/* length field: 24 matches the six values written below, assuming
 * CDR_put_ulong emits 4 bytes each - TODO confirm */
44 CDR_put_ushort(cdrCodec,24);
46 /* the object ids that follow are sent in big-endian byte order */
/* remember the codec's endianness so it can be put back afterwards */
47 data_endian=cdrCodec->data_endian;
48 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
/* reader object id */
51 CDR_put_ulong(cdrCodec,roid);
/* writer object id */
54 CDR_put_ulong(cdrCodec,woid);
/* back to the endianness the codec had on entry */
56 cdrCodec->data_endian=data_endian;
/* first sequence number */
59 CDR_put_ulong(cdrCodec,fsn->high);
60 CDR_put_ulong(cdrCodec,fsn->low);
/* last sequence number */
63 CDR_put_ulong(cdrCodec,lsn->high);
64 CDR_put_ulong(cdrCodec,lsn->low);
69 /**********************************************************************************/
/*
 * HeartBeatProc - apply a received heartbeat to one reader.
 *
 * Looks up the remote writer identified by writerGUID in cstReader,
 * records the announced sequence-number range, adjusts the reader's
 * current sequence number against that range, processes pending changes
 * and, when f_bit is zero and the writer is in state WAITING, switches
 * it to ACKPENDING and schedules the response timer.
 *
 * cstReader   reader to update; a null pointer makes the function return
 * writerGUID  GUID used to find the remote writer; no match -> return
 * fsn, lsn    first and last sequence number carried by the heartbeat
 * f_bit       when non-zero the ACK scheduling step is skipped
 *
 * NOTE(review): this listing appears to have dropped lines (return type,
 * closing braces, an else, the declaration and assignment of queue, and
 * some call arguments are not visible) - confirm against the full file.
 */
71 HeartBeatProc(CSTReader *cstReader,GUID_RTPS *writerGUID,
72 SequenceNumber *fsn,SequenceNumber *lsn,char f_bit) {
73 CSTRemoteWriter *cstRemoteWriter;
75 if (!cstReader) return;
76 cstRemoteWriter=CSTRemoteWriter_find(cstReader,writerGUID);
77 if (!cstRemoteWriter) return;
/* record the range the writer announced and reset the ACK retry count */
79 cstRemoteWriter->firstSN=*fsn;
80 cstRemoteWriter->lastSN=*lsn;
81 cstRemoteWriter->ACKRetriesCounter=0;
/* our position is past the writer's last sequence number: pull it back */
83 if (SeqNumberCmp(cstRemoteWriter->sn,*lsn)>0)
84 cstRemoteWriter->sn=*lsn;
/* our position is before the writer's first sequence number; only acted
 * on when fsn differs from noneSN */
85 if (SeqNumberCmp(cstRemoteWriter->sn,*fsn)<0) {
86 if (SeqNumberCmp(*fsn,noneSN)!=0) {
/* presumably sets sn to the value just before fsn - TODO confirm the
 * semantics of SeqNumberDec */
87 SeqNumberDec(cstRemoteWriter->sn,*fsn);
/* writers whose oid has OID_PUBLICATION in its low three bits go through
 * the issue path; the next call is presumably the else branch (the else
 * line is not visible here) */
91 if ((writerGUID->oid & 0x07) == OID_PUBLICATION) {
92 CSTReaderProcCSChangesIssue(cstRemoteWriter,ORTE_FALSE);
94 CSTReaderProcCSChanges(cstReader->domain,cstRemoteWriter);
/* f_bit clear and writer WAITING: mark an ACK as pending and (re)arm the
 * response timer */
97 if ((!f_bit) && (cstRemoteWriter->commStateACK==WAITING)) {
99 cstRemoteWriter->commStateACK=ACKPENDING;
/* NOTE(review): the statement controlled by this if is not visible;
 * presumably it selects the value of queue - confirm */
100 if ((cstRemoteWriter->guid.oid & 0x07) == OID_PUBLICATION)
/* detach the repeat-active-query timer */
102 eventDetach(cstReader->domain,
103 cstRemoteWriter->spobject->objectEntryAID,
104 &cstRemoteWriter->repeatActiveQueryTimer,
/* detach the delayed-response timer before adding it again below */
106 eventDetach(cstReader->domain,
107 cstRemoteWriter->spobject->objectEntryAID,
108 &cstRemoteWriter->delayResponceTimer,
109 queue); //metatraffic timer
/* add the delayed-response timer with CSTReaderResponceTimer as handler;
 * the last visible argument is the reader's delayResponceTimeMin */
110 eventAdd(cstReader->domain,
111 cstRemoteWriter->spobject->objectEntryAID,
112 &cstRemoteWriter->delayResponceTimer,
113 queue, //metatraffic timer
114 "CSTReaderResponceTimer",
115 CSTReaderResponceTimer,
116 &cstRemoteWriter->cstReader->lock,
118 &cstRemoteWriter->cstReader->params.delayResponceTimeMin);
122 /**********************************************************************************/
/*
 * RTPSHeartBeat - decode a received HEARTBEAT submessage and dispatch it.
 *
 * Reads the flags octet, the reader and writer object ids (big-endian)
 * and the first/last sequence numbers from cdrCodec, drops the message
 * when the first sequence number compares greater than the last, builds
 * the writer GUID from the message-interpreter state, picks the matching
 * reader of domain d, and hands the heartbeat to HeartBeatProc with that
 * reader's lock held for writing.
 *
 * d         domain whose readers are candidates for the heartbeat
 * cdrCodec  codec positioned on the submessage
 * mi        supplies sourceHostId and sourceAppId for the writer GUID
 *
 * NOTE(review): this listing appears to have dropped lines (return type,
 * declarations of flags/roid/woid/f_bit, the assignment of
 * writerGUID.oid, the codec repositioning statements, case labels, else
 * lines and closing braces are not visible) - confirm against the full
 * file.
 */
124 RTPSHeartBeat(ORTEDomain *d,CDR_Codec *cdrCodec,MessageInterpret *mi) {
125 GUID_RTPS writerGUID;
127 SequenceNumber fsn,lsn;
128 CSTReader *cstReader=NULL;
129 CDR_Endianness data_endian;
133 /* restore flag position in submessage */
137 CDR_get_octet(cdrCodec,&flags);
140 /* move reading position to the beginning of the submessage */
143 /* the object ids that follow are sent in big-endian byte order */
/* remember the codec's endianness so it can be put back afterwards */
144 data_endian=cdrCodec->data_endian;
145 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
/* reader object id */
148 CDR_get_ulong(cdrCodec,&roid);
/* writer object id */
151 CDR_get_ulong(cdrCodec,&woid);
/* back to the endianness the codec had before the object ids */
153 cdrCodec->data_endian=data_endian;
/* first sequence number */
156 CDR_get_ulong(cdrCodec,&fsn.high);
157 CDR_get_ulong(cdrCodec,&fsn.low);
/* last sequence number */
160 CDR_get_ulong(cdrCodec,&lsn.high);
161 CDR_get_ulong(cdrCodec,&lsn.low);
163 if (SeqNumberCmp(fsn,lsn)==1) return; // lsn<fsn -> break
/* the writer GUID takes host and application id from the message source */
164 writerGUID.hid=mi->sourceHostId;
165 writerGUID.aid=mi->sourceAppId;
168 debug(48,3) ("recv: RTPS HB%c(0x%x) from 0x%x-0x%x\n",
169 f_bit ? 'F':'f',woid,mi->sourceHostId,mi->sourceAppId);
/* local domain is a MANAGER (low two bits of its aid) */
171 if ((d->guid.aid & 0x03)==MANAGER) {
/* APPSELF writer of another manager -> readerManagers */
172 if ((writerGUID.oid==OID_WRITE_APPSELF) &&
173 ((writerGUID.aid & 0x03)==MANAGER)) {
174 pthread_rwlock_wrlock(&d->readerManagers.lock);
175 cstReader=&d->readerManagers;
/* APPSELF writer of a managed application, or APP writer of a manager
 * -> readerApplications */
177 if (((writerGUID.oid==OID_WRITE_APPSELF) &&
178 ((writerGUID.aid & 0x03)==MANAGEDAPPLICATION)) ||
179 ((writerGUID.oid==OID_WRITE_APP) &&
180 ((writerGUID.aid & 0x03)==MANAGER))) {
181 pthread_rwlock_wrlock(&d->readerApplications.lock);
182 cstReader=&d->readerApplications;
/* local domain is a MANAGEDAPPLICATION: choose the reader by writer oid.
 * NOTE(review): the case labels are not visible in this listing, so
 * which oid maps to which reader cannot be confirmed from here. */
186 if ((d->guid.aid & 3)==MANAGEDAPPLICATION) {
187 switch (writerGUID.oid) {
189 pthread_rwlock_wrlock(&d->readerManagers.lock);
190 cstReader=&d->readerManagers;
193 pthread_rwlock_wrlock(&d->readerApplications.lock);
194 cstReader=&d->readerApplications;
197 pthread_rwlock_wrlock(&d->readerPublications.lock);
198 cstReader=&d->readerPublications;
201 pthread_rwlock_wrlock(&d->readerSubscriptions.lock);
202 cstReader=&d->readerSubscriptions;
/* writer oid has OID_PUBLICATION in its low three bits: with the
 * subscriptions lock held for reading, call HeartBeatProc for the readers
 * visited by gavl_cust_for_each (presumably every reader in
 * d->subscriptions - TODO confirm), each under its own write lock */
206 if ((writerGUID.oid & 0x07) == OID_PUBLICATION) {
207 pthread_rwlock_rdlock(&d->subscriptions.lock);
208 gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
209 pthread_rwlock_wrlock(&cstReader->lock);
210 HeartBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
211 pthread_rwlock_unlock(&cstReader->lock);
213 pthread_rwlock_unlock(&d->subscriptions.lock);
/* process the heartbeat on the reader selected above, then release its
 * lock; NOTE(review): any guard around these two statements is not
 * visible in this listing */
218 HeartBeatProc(cstReader,&writerGUID,&fsn,&lsn,f_bit);
221 pthread_rwlock_unlock(&cstReader->lock);