2 * $Id: ORTEAppRecvThread.c,v 0.0.0.1 2005/01/3
4 * DEBUG: section 22 Receiving thread
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
35 /* ThreadKind to String */
/*
 * Classifies the receive task `tp` by comparing its address against the four
 * receive-task members of its owning domain (tp->d): unicast metatraffic,
 * multicast metatraffic, unicast userdata and multicast userdata.
 *
 * NOTE(review): the function signature and the statement guarded by each
 * `if` are not visible in this view. Callers pass the result of TK2S(tp) to
 * a "%s" conversion, so each branch presumably yields a short label string
 * -- confirm against the full file.
 */
39 ORTEDomain *d = tp->d;
41 if (tp == &d->taskRecvUnicastMetatraffic)
44 if (tp == &d->taskRecvMulticastMetatraffic)
47 if (tp == &d->taskRecvUnicastUserdata)
50 if (tp == &d->taskRecvMulticastUserdata)
56 /*****************************************************************************/
/*
 * ORTEAppRecvThread - receive loop of one receive task.
 *
 * Runs until tp->terminate becomes non-zero. Each iteration receives one
 * message from tp->sock into the buffer of the task's CDR codec
 * (tp->mb.cdrCodec), checks the RTPS header, and then processes the
 * submessages one after another, handing each to the matching RTPS* handler
 * while holding the domain's objRootLock and htimRootLock write locks.
 *
 * tp: task properties; provides the socket, the CDR codec, the terminate
 *     flag and the owning domain (tp->d).
 *
 * NOTE(review): several original lines (braces, `case` labels, the `do`
 * keyword, some declarations such as `mi` and `rptr`, and the statements
 * following the error checks) are not visible in this view. The comments
 * below describe only what the visible statements do.
 */
58 ORTEAppRecvThread(TaskProp *tp)
// sender address, filled in by sock_recvfrom()
60 struct sockaddr_in des;
// value returned by sock_recvfrom(); compared against -1 for failure and used
// as the message length in the bounds checks below
61 int32_t RTPS_Codec_len;
63 CDR_Codec *cdrCodec = &tp->mb.cdrCodec;
64 ORTEDomain *d = tp->d;
67 debug(22, 10) ("ORTEAppRecvThread %s: start\n", TK2S(tp));
69 while (!tp->terminate) {
70 debug(22, 7) ("ORTEAppRecvThread %s: receiving\n", TK2S(tp));
// rewind both codec positions before receiving a new message
73 cdrCodec->wptr = cdrCodec->rptr = 0;
74 RTPS_Codec_len = sock_recvfrom(
75 &tp->sock, //socket handle
76 cdrCodec->buffer, //buffer
77 cdrCodec->buf_len, //max length of message
78 &des, sizeof(des)); //info from sending host
// receive failure is reported via perror(); the statement that follows the
// report is not visible in this view
80 if (RTPS_Codec_len == -1) {
81 perror("ORTEAppRecvThread: sock_recvfrom");
85 debug(22, 7) ("ORTEAppRecvThread %s: fired, msg_len: 0x%x\n", TK2S(tp), RTPS_Codec_len);
87 //is it header of valid RTPS packet?
// a negative result from RTPSHeaderCheck() marks the message as invalid;
// the check also receives &mi, whose fields are printed below
88 if (RTPSHeaderCheck(cdrCodec, RTPS_Codec_len, &mi) < 0) {
89 debug(22, 2) ("ORTEAppRecvThread: not valid RTPS header!\n");
93 debug(22, 7) ("ORTEAppRecvThread: RTPS Heard OK\n");
94 debug(22, 7) (" PV: %d.%d VID:%d.%d HID:0x%x AID:0x%x\n",
95 mi.sourceVersion.major, mi.sourceVersion.minor,
96 mi.sourceVendorId.major, mi.sourceVendorId.minor,
97 mi.sourceHostId, mi.sourceAppId);
99 // check if length of submessage header is OK
// NOTE(review): the header read below is two octets plus one ushort
// (presumably 4 bytes). `rptr+3 <= RTPS_Codec_len` is also true when only 3
// bytes remain, so the last header byte may lie beyond the received data
// -- confirm whether `rptr+4` was intended.
100 if ((cdrCodec->rptr+3) <= RTPS_Codec_len) {
101 CORBA_char flags, sub_id;
102 CORBA_unsigned_short sub_len;
// submessage header: id octet, flags octet, then the submessage length
104 CDR_get_octet(cdrCodec, (CORBA_octet *)&sub_id);
105 CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
// the codec's data endianness is set before the length is decoded; the
// condition choosing between the two assignments is not visible in this view
// (presumably a bit in `flags` -- confirm)
107 cdrCodec->data_endian = FLAG_LITTLE_ENDIAN;
109 cdrCodec->data_endian = FLAG_BIG_ENDIAN;
110 CDR_get_ushort(cdrCodec, &sub_len);
// NOTE(review): the format string below contains four conversions
// (%s, 0x%x, 0x%x, %u) but only three arguments are supplied
// (TK2S(tp), sub_id, sub_len) -- the trailing %u has no matching argument.
112 debug(22, 7) ("ORTEAppRecvThread %s: sub_id: 0x%x, sub_len 0x%x: %u\n",
113 TK2S(tp), sub_id, sub_len);
115 // check if length of submessage OK
116 if ((sub_len+cdrCodec->rptr) <= RTPS_Codec_len) {
// both write locks are taken here (objRootLock first, then htimRootLock) and
// released in the reverse order once the submessage has been handled
117 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
118 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
// remember where the submessage body starts; after the handler returns the
// read position is set to this value plus sub_len
119 rptr = cdrCodec->rptr;
// dispatch on the submessage id; the `case` labels are not visible in this
// view, only the handler calls
120 switch ((SubmessageId)sub_id) {
122 RTPSVar(d, cdrCodec, &mi, ntohl(des.sin_addr.s_addr));
125 RTPSAck(d, cdrCodec, &mi, ntohl(des.sin_addr.s_addr));
128 RTPSHeartBeat(d, cdrCodec, &mi);
131 RTPSGap(d, cdrCodec, &mi, ntohl(des.sin_addr.s_addr));
134 RTPSInfoTS(cdrCodec, &mi);
137 RTPSInfoSRC(cdrCodec, &mi);
140 RTPSInfoREPLY(cdrCodec, &mi);
143 RTPSInfoDST(cdrCodec, &mi);
146 RTPSPad(cdrCodec, &mi);
149 RTPSIssue(d, cdrCodec, &mi, ntohl(des.sin_addr.s_addr));
152 debug(22, 5) ("ORTEAppRecvThread: unknown message :0x%x\n", sub_id);
// move the read position to the start of the next submessage
155 cdrCodec->rptr = rptr+sub_len;
156 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
157 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
159 break; /* submessage is too big */
161 break; /* submessage is too big */
// keep processing submessages while unread bytes of the message remain
162 } while (cdrCodec->rptr < RTPS_Codec_len);
163 debug(22, 7) ("ORTEAppRecvThread: processing of message(s) finnished\n");
166 debug(22, 10) ("ORTEAppRecvThread %s: finished\n", TK2S(tp));