rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEAppRecvThread.c
Reformat the sources with orte/uncrustify script
[orte.git] / orte / liborte / ORTEAppRecvThread.c
1 /*
2  *  $Id: ORTEAppRecvThread.c,v 0.0.0.1  2005/01/3
3  *
4  *  DEBUG:  section 22                  Receiving thread
5  *
6  *  -------------------------------------------------------------------
7  *                                ORTE
8  *                      Open Real-Time Ethernet
9  *
10  *                      Copyright (C) 2001-2006
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic
12  *                      http://dce.felk.cvut.cz
13  *                      http://www.ocera.org
14  *
15  *  Author:              Petr Smolik    petr@smoliku.cz
16  *  Advisor:             Pavel Pisa
17  *  Project Responsible: Zdenek Hanzalek
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *
30
31  */
32
33 #include "orte_all.h"
34
35 /* ThreadKind to String */
36 char *
37 TK2S(TaskProp *tp)
38 {
39   ORTEDomain            *d = tp->d;
40
41   if (tp == &d->taskRecvUnicastMetatraffic)
42     return "UM";
43
44   if (tp == &d->taskRecvMulticastMetatraffic)
45     return "MM";
46
47   if (tp == &d->taskRecvUnicastUserdata)
48     return "UU";
49
50   if (tp == &d->taskRecvMulticastUserdata)
51     return "MU";
52
53   return "";
54 }
55
56 /*****************************************************************************/
57 void
58 ORTEAppRecvThread(TaskProp *tp)
59 {
60   struct sockaddr_in    des;
61   int32_t               RTPS_Codec_len;
62   MessageInterpret      mi;
63   CDR_Codec             *cdrCodec = &tp->mb.cdrCodec;
64   ORTEDomain            *d = tp->d;
65   unsigned int          rptr;
66
67   debug(22, 10) ("ORTEAppRecvThread %s: start\n", TK2S(tp));
68
69   while (!tp->terminate) {
70     debug(22, 7) ("ORTEAppRecvThread %s: receiving\n", TK2S(tp));
71
72     //prepare cdrCodec
73     cdrCodec->wptr = cdrCodec->rptr = 0;
74     RTPS_Codec_len = sock_recvfrom(
75       &tp->sock,                                 //socked handle
76       cdrCodec->buffer,                         //buffer
77       cdrCodec->buf_len,                        //max length of message
78       &des, sizeof(des));                        //info from sending host
79
80     if (RTPS_Codec_len == -1) {
81       perror("ORTEAppRecvThread: sock_recvfrom");
82       return;
83     }
84
85     debug(22, 7) ("ORTEAppRecvThread %s: fired, msg_len: 0x%x\n", TK2S(tp), RTPS_Codec_len);
86
87     //is it header of valid RTPS packet?
88     if (RTPSHeaderCheck(cdrCodec, RTPS_Codec_len, &mi) < 0) {
89       debug(22, 2) ("ORTEAppRecvThread: not valid RTPS header!\n");
90       continue;
91     }
92
93     debug(22, 7) ("ORTEAppRecvThread: RTPS Heard OK\n");
94     debug(22, 7) ("  PV: %d.%d VID:%d.%d HID:0x%x AID:0x%x\n",
95                   mi.sourceVersion.major, mi.sourceVersion.minor,
96                   mi.sourceVendorId.major, mi.sourceVendorId.minor,
97                   mi.sourceHostId, mi.sourceAppId);
98     do {
99       // check if length of submessage header is OK
100       if ((cdrCodec->rptr+3) <= RTPS_Codec_len) {
101         CORBA_char flags, sub_id;
102         CORBA_unsigned_short sub_len;
103
104         CDR_get_octet(cdrCodec, (CORBA_octet *)&sub_id);
105         CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
106         if (flags & 0x01)
107           cdrCodec->data_endian = FLAG_LITTLE_ENDIAN;
108         else
109           cdrCodec->data_endian = FLAG_BIG_ENDIAN;
110         CDR_get_ushort(cdrCodec, &sub_len);
111
112         debug(22, 7) ("ORTEAppRecvThread %s: sub_id: 0x%x, sub_len 0x%x: %u\n",
113                       TK2S(tp), sub_id, sub_len);
114
115         // check if length of submessage OK
116         if ((sub_len+cdrCodec->rptr) <= RTPS_Codec_len) {
117           pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
118           pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
119           rptr = cdrCodec->rptr;
120           switch ((SubmessageId)sub_id) {
121             case VAR:
122               RTPSVar(d, cdrCodec, &mi, ntohl(des.sin_addr.s_addr));
123               break;
124             case ACK:
125               RTPSAck(d, cdrCodec, &mi, ntohl(des.sin_addr.s_addr));
126               break;
127             case HEARTBEAT:
128               RTPSHeartBeat(d, cdrCodec, &mi);
129               break;
130             case GAP:
131               RTPSGap(d, cdrCodec, &mi, ntohl(des.sin_addr.s_addr));
132               break;
133             case INFO_TS:
134               RTPSInfoTS(cdrCodec, &mi);
135               break;
136             case INFO_SRC:
137               RTPSInfoSRC(cdrCodec, &mi);
138               break;
139             case INFO_REPLY:
140               RTPSInfoREPLY(cdrCodec, &mi);
141               break;
142             case INFO_DST:
143               RTPSInfoDST(cdrCodec, &mi);
144               break;
145             case PAD:
146               RTPSPad(cdrCodec, &mi);
147               break;
148             case ISSUE:
149               RTPSIssue(d, cdrCodec, &mi, ntohl(des.sin_addr.s_addr));
150               break;
151             default:
152               debug(22, 5) ("ORTEAppRecvThread: unknown message :0x%x\n", sub_id);
153               break;
154           }
155           cdrCodec->rptr = rptr+sub_len;
156           pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
157           pthread_rwlock_unlock(&d->objectEntry.objRootLock);
158         } else
159           break;               /* submessage is too big */
160       } else
161         break;                 /* submessage is too big */
162     } while (cdrCodec->rptr < RTPS_Codec_len);
163     debug(22, 7) ("ORTEAppRecvThread: processing of message(s) finnished\n");
164   }
165
166   debug(22, 10) ("ORTEAppRecvThread %s: finished\n", TK2S(tp));
167   pthread_exit(NULL);
168 }