2 * $Id: ORTEAppRecvThread.c,v 0.0.0.1 2005/01/3
4 * DEBUG: section 22 Receiving thread
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
25 /* ThreadKind to String
 *
 * Identifies which receiving task `tp` is by comparing its address with the
 * four receive-task slots reached through `d` (unicast metatraffic,
 * multicast metatraffic, unicast user data, multicast user data).
 * ORTEAppRecvThread passes the result as the %s argument of its debug
 * messages.
 *
 * NOTE(review): the declaration of `d` and the value returned for each
 * match (and for no match) are on lines not visible in this excerpt --
 * confirm them there.
 * NOTE(review): if the returned values are string literals, `const char*`
 * would be the safer return type -- confirm against callers before changing.
 */
26 char* TK2S(TaskProp *tp)
/* receiver of unicast metatraffic? */
30 if (tp==&d->taskRecvUnicastMetatraffic)
/* receiver of multicast metatraffic? */
33 if (tp==&d->taskRecvMulticastMetatraffic)
/* receiver of unicast user data? */
36 if (tp==&d->taskRecvUnicastUserdata)
/* receiver of multicast user data? */
39 if (tp==&d->taskRecvMulticastUserdata)
45 /*****************************************************************************/
/*
 * ORTEAppRecvThread - receive loop of one receiving task.
 *
 * tp: task descriptor. The visible code uses tp->sock (socket to read from),
 *     tp->mb.cdrCodec (receive buffer plus read/write cursors) and
 *     tp->terminate (loop exit flag).
 *
 * Until tp->terminate is set, each pass resets the codec cursors, receives
 * one message into the codec buffer, validates the RTPS header and then
 * processes submessages for as long as the read cursor is below the
 * received length. Each submessage is handed to an RTPS* handler while the
 * objRootLock and htimRootLock write locks are held.
 *
 * NOTE(review): the declarations of `d`, `mi` and `rptr`, the `case` labels
 * of the switch, the opening of the inner loop and the statement that
 * follows a failed header check are on lines not visible in this excerpt.
 */
46 void ORTEAppRecvThread(TaskProp *tp) {
// address of the sending host, filled in by sock_recvfrom()
47 struct sockaddr_in des;
// value returned by sock_recvfrom(); used below as the received length.
// NOTE(review): the type is unsigned -- if sock_recvfrom() can return a
// negative error code it would wrap to a huge length here; confirm.
48 uint32_t RTPS_Codec_len;
// codec embedded in the task's message buffer; the receive buffer lives here
50 CDR_Codec *cdrCodec=&tp->mb.cdrCodec;
54 debug(22,10) ("ORTEAppRecvThread %s: start\n",TK2S(tp));
// main loop: runs until the terminate flag of this task is set
56 while (!tp->terminate) {
57 debug(22,7) ("ORTEAppRecvThread %s: receiving\n",TK2S(tp));
// rewind both cursors so the new message is parsed from the buffer start
60 cdrCodec->wptr=cdrCodec->rptr=0;
61 RTPS_Codec_len = sock_recvfrom(
62 &tp->sock, //socket handle
63 cdrCodec->buffer, //buffer
64 cdrCodec->buf_len, //max length of message
65 &des,sizeof(des)); //info from sending host
67 debug(22,7) ("ORTEAppRecvThread %s: fired, msg_len: 0x%x\n",TK2S(tp),RTPS_Codec_len);
69 //is it header of valid RTPS packet?
// a negative result from RTPSHeaderCheck() is treated as an invalid header;
// `mi` receives the header fields that are printed below
70 if (RTPSHeaderCheck(cdrCodec,RTPS_Codec_len,&mi)<0) {
71 debug(22,2) ("ORTEAppRecvThread: not valid RTPS header!\n");
75 debug(22,7) ("ORTEAppRecvThread: RTPS Heard OK\n");
76 debug(22,7) (" PV: %d.%d VID:%d.%d HID:0x%x AID:0x%x\n",
77 mi.sourceVersion.major,mi.sourceVersion.minor,
78 mi.sourceVendorId.major,mi.sourceVendorId.minor,
79 mi.sourceHostId,mi.sourceAppId);
81 // check if length of submessage header is OK
// NOTE(review): the reads below look like two octets plus one ushort
// (4 bytes) while this test only requires rptr+3 <= length -- confirm
// whether the bound is intentional.
82 if ((cdrCodec->rptr+3)<=RTPS_Codec_len) {
83 CORBA_char flags,sub_id;
84 CORBA_unsigned_short sub_len;
// submessage header: id octet, flags octet, then the length as ushort
86 CDR_get_octet(cdrCodec,&sub_id);
87 CDR_get_octet(cdrCodec,&flags);
// byte order for the rest of the submessage; the condition selecting
// between the two assignments is on lines not visible here
89 cdrCodec->data_endian=FLAG_LITTLE_ENDIAN;
91 cdrCodec->data_endian=FLAG_BIG_ENDIAN;
// the length field is decoded with the byte order chosen above
92 CDR_get_ushort(cdrCodec,&sub_len);
// NOTE(review): the format string has four conversions (%s, 0x%x, 0x%x, %u)
// but only three arguments are passed -- the last %u has no argument.
94 debug(22,7) ("ORTEAppRecvThread %s: sub_id: 0x%x, sub_len 0x%x: %u\n",
95 TK2S(tp),sub_id,sub_len);
97 // check if length of submessage OK
98 if ((sub_len+cdrCodec->rptr)<=RTPS_Codec_len) {
// both write locks are taken before a handler runs (objRootLock first,
// then htimRootLock) and released in reverse order after the switch
99 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
100 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
// dispatch on the submessage id; the case labels are not visible here
102 switch ((SubmessageId)sub_id) {
// handlers given `des` receive the sender address converted to host order
104 RTPSVar(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
107 RTPSAck(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
110 RTPSHeartBeat(d,cdrCodec,&mi);
113 RTPSGap(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
116 RTPSInfoTS(cdrCodec,&mi);
119 RTPSInfoSRC(cdrCodec,&mi);
122 RTPSInfoREPLY(cdrCodec,&mi);
125 RTPSInfoDST(cdrCodec,&mi);
128 RTPSPad(cdrCodec,&mi);
131 RTPSIssue(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
134 debug(22,5) ("ORTEAppRecvThread: unknown message :0x%x\n",sub_id);
// move the read cursor to `rptr + sub_len` regardless of how much the
// handler consumed; presumably `rptr` was saved right after the header
// was read -- its assignment is not visible here, confirm
137 cdrCodec->rptr=rptr+sub_len;
138 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
139 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
140 } else break; /* submessage body would run past the received data */
141 } else break; /* too few bytes left for a submessage header */
// keep going while unread bytes remain in the received message
142 } while (cdrCodec->rptr<RTPS_Codec_len);
143 debug(22,7) ("ORTEAppRecvThread: processing of message(s) finnished\n");
146 debug(22,10) ("ORTEAppRecvThread %s: finished\n",TK2S(tp));