2 * $Id: ORTEAppRecvThread.c,v 0.0.0.1 2005/01/3
4 * DEBUG: section 22 Receiving thread
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
35 /* ThreadKind to String */
/*
 * TK2S - return a string label identifying the receive task `tp`.
 * The result is passed as the %s argument of this file's debug messages.
 *
 * tp: task descriptor; it is compared by address against the four
 *     receive-task members of `d` (unicast/multicast metatraffic and
 *     unicast/multicast user data).
 *
 * NOTE(review): the declaration of `d`, the statement executed after each
 * comparison and the final fall-through result are not present in this
 * listing; confirm the actual return values against the complete file.
 */
36 char* TK2S(TaskProp *tp)
/* Is tp the unicast metatraffic receive task? */
40 if (tp==&d->taskRecvUnicastMetatraffic)
/* Is tp the multicast metatraffic receive task? */
43 if (tp==&d->taskRecvMulticastMetatraffic)
/* Is tp the unicast user-data receive task? */
46 if (tp==&d->taskRecvUnicastUserdata)
/* Is tp the multicast user-data receive task? */
49 if (tp==&d->taskRecvMulticastUserdata)
55 /*****************************************************************************/
/*
 * ORTEAppRecvThread - receive loop of one receiving task.
 *
 * While tp->terminate is zero the loop:
 *   1. resets the read and write positions of the task's CDR codec
 *      (tp->mb.cdrCodec);
 *   2. receives one message from tp->sock into the codec buffer;
 *   3. checks the RTPS header with RTPSHeaderCheck(), passing &mi;
 *   4. walks the submessages of the message: reads id, flags and length,
 *      then switches on the id and calls one of the RTPS* handlers while
 *      holding the objRootLock and htimRootLock write locks of
 *      d->objectEntry.
 *
 * tp: task descriptor; this function reads tp->terminate, tp->sock and
 *     tp->mb.cdrCodec.
 *
 * NOTE(review): several lines of this function are absent from this listing
 * (the declarations of `mi`, `d` and `rptr`, the opening `do {`, the
 * statements following the two error checks, the endianness conditions and
 * the case labels of the switch). Comments below describe only what is
 * visible.
 */
56 void ORTEAppRecvThread(TaskProp *tp) {
/* Address of the sender, filled in by sock_recvfrom(). */
57 struct sockaddr_in des;
/* Result of sock_recvfrom(); compared with -1 and used as the message length. */
58 int32_t RTPS_Codec_len;
/* The codec embedded in the task's message buffer is reused for every message. */
60 CDR_Codec *cdrCodec=&tp->mb.cdrCodec;
64 debug(22,10) ("ORTEAppRecvThread %s: start\n",TK2S(tp));
/* Main loop: runs until tp->terminate is set. */
66 while (!tp->terminate) {
67 debug(22,7) ("ORTEAppRecvThread %s: receiving\n",TK2S(tp));
/* Rewind both codec positions before receiving a new message. */
70 cdrCodec->wptr=cdrCodec->rptr=0;
71 RTPS_Codec_len = sock_recvfrom(
72 &tp->sock, //socket handle
73 cdrCodec->buffer, //buffer
74 cdrCodec->buf_len, //max length of message
75 &des,sizeof(des)); //info from sending host
/*
 * A result of -1 is reported with perror().
 * NOTE(review): the statement that follows perror() is not visible in this
 * listing; presumably the message is skipped - confirm.
 */
77 if (RTPS_Codec_len == -1) {
78 perror("ORTEAppRecvThread: sock_recvfrom");
82 debug(22,7) ("ORTEAppRecvThread %s: fired, msg_len: 0x%x\n",TK2S(tp),RTPS_Codec_len);
84 //is it header of valid RTPS packet?
/* A negative result from RTPSHeaderCheck() is logged as an invalid header. */
85 if (RTPSHeaderCheck(cdrCodec,RTPS_Codec_len,&mi)<0) {
86 debug(22,2) ("ORTEAppRecvThread: not valid RTPS header!\n");
/* Header accepted: log the source version, vendor id, host id and app id from mi. */
90 debug(22,7) ("ORTEAppRecvThread: RTPS Heard OK\n");
91 debug(22,7) (" PV: %d.%d VID:%d.%d HID:0x%x AID:0x%x\n",
92 mi.sourceVersion.major,mi.sourceVersion.minor,
93 mi.sourceVendorId.major,mi.sourceVendorId.minor,
94 mi.sourceHostId,mi.sourceAppId);
/*
 * Submessage loop (closed by the `} while` near the end of the function).
 * NOTE(review): this check requires rptr+3 <= message length, while the
 * reads below consume two octets and one unsigned short. If
 * CORBA_unsigned_short is 2 bytes the header is 4 bytes long - confirm
 * whether +3 is intended.
 */
96 // check if length of submessage header is OK
97 if ((cdrCodec->rptr+3)<=RTPS_Codec_len) {
98 CORBA_char flags,sub_id;
99 CORBA_unsigned_short sub_len;
/* Read the submessage id, then the flags octet. */
101 CDR_get_octet(cdrCodec, (CORBA_octet *)&sub_id);
102 CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
/*
 * The codec's byte order is set before the length is read.
 * NOTE(review): the conditions selecting between the next two assignments
 * are not visible in this listing; presumably they test `flags` - confirm.
 */
104 cdrCodec->data_endian=FLAG_LITTLE_ENDIAN;
106 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
107 CDR_get_ushort(cdrCodec,&sub_len);
/*
 * NOTE(review): the format string below contains four conversions
 * (%s, 0x%x, 0x%x, %u) but only three arguments follow it. If debug()
 * forwards to a printf-style function this is an argument mismatch.
 */
109 debug(22,7) ("ORTEAppRecvThread %s: sub_id: 0x%x, sub_len 0x%x: %u\n",
110 TK2S(tp),sub_id,sub_len);
112 // check if length of submessage OK
113 if ((sub_len+cdrCodec->rptr)<=RTPS_Codec_len) {
/* Take both write locks (objRootLock first, then htimRootLock) around the handler call. */
114 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
115 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
/*
 * Dispatch on the submessage id.
 * NOTE(review): the case labels and break statements are not visible in
 * this listing; each call below presumably belongs to its own case.
 * Handlers that take a fourth argument receive the sender's IPv4 address
 * converted with ntohl().
 */
117 switch ((SubmessageId)sub_id) {
119 RTPSVar(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
122 RTPSAck(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
125 RTPSHeartBeat(d,cdrCodec,&mi);
128 RTPSGap(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
131 RTPSInfoTS(cdrCodec,&mi);
134 RTPSInfoSRC(cdrCodec,&mi);
137 RTPSInfoREPLY(cdrCodec,&mi);
140 RTPSInfoDST(cdrCodec,&mi);
143 RTPSPad(cdrCodec,&mi);
146 RTPSIssue(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
149 debug(22,5) ("ORTEAppRecvThread: unknown message :0x%x\n",sub_id);
/*
 * Set the read position to rptr + sub_len.
 * NOTE(review): the assignment to `rptr` is not visible in this listing;
 * presumably it holds the read position saved before the switch - confirm.
 */
152 cdrCodec->rptr=rptr+sub_len;
/* Release the locks in the reverse order of acquisition. */
153 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
154 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
155 } else break; /* submessage is too big */
/* NOTE(review): this else appears to pair with the header-length check above - confirm. */
156 } else break; /* submessage is too big */
/* Continue while unread bytes remain in the received message. */
157 } while (cdrCodec->rptr<RTPS_Codec_len);
158 debug(22,7) ("ORTEAppRecvThread: processing of message(s) finnished\n");
161 debug(22,10) ("ORTEAppRecvThread %s: finished\n",TK2S(tp));