]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEAppRecvThread.c
1225ef516e9b528eecdebacbd817216b56002f75
[orte.git] / orte / liborte / ORTEAppRecvThread.c
1 /*
2  *  $Id: ORTEAppRecvThread.c,v 0.0.0.1  2005/01/3 
3  *
4  *  DEBUG:  section 22                  Receiving thread
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20
21  */ 
22
23 #include "orte_all.h"
24
25 /* ThreadKind to String */
26 char* TK2S(TaskProp *tp) 
27 {
28   ORTEDomain            *d=tp->d;
29
30   if (tp==&d->taskRecvUnicastMetatraffic) 
31     return "UM";
32
33   if (tp==&d->taskRecvMulticastMetatraffic) 
34     return "MM";
35
36   if (tp==&d->taskRecvUnicastUserdata) 
37     return "UU";
38
39   if (tp==&d->taskRecvMulticastUserdata) 
40     return "MU";
41
42   return "";
43 }
44
45 /*****************************************************************************/
46 void ORTEAppRecvThread(TaskProp *tp) {
47   struct sockaddr_in    des;
48   uint32_t              RTPS_Codec_len;
49   MessageInterpret      mi; 
50   CDR_Codec             *cdrCodec=&tp->mb.cdrCodec;
51   ORTEDomain            *d=tp->d;
52   unsigned int          rptr;
53
54   debug(22,10) ("ORTEAppRecvThread %s: start\n",TK2S(tp));
55
56   while (!tp->terminate) {
57     debug(22,7) ("ORTEAppRecvThread %s: receiving\n",TK2S(tp));
58     
59     //prepare cdrCodec
60     cdrCodec->wptr=cdrCodec->rptr=0;
61     RTPS_Codec_len = sock_recvfrom(
62                   &tp->sock,                     //socked handle
63                   cdrCodec->buffer,             //buffer
64                   cdrCodec->buf_len,            //max length of message
65                   &des,sizeof(des));             //info from sending host
66
67     debug(22,7) ("ORTEAppRecvThread %s: fired, msg_len: 0x%x\n",TK2S(tp),RTPS_Codec_len);
68
69     //is it header of valid RTPS packet?
70     if (RTPSHeaderCheck(cdrCodec,RTPS_Codec_len,&mi)<0) {
71       debug(22,2) ("ORTEAppRecvThread: not valid RTPS header!\n");
72       continue;
73     } 
74
75     debug(22,7) ("ORTEAppRecvThread: RTPS Heard OK\n");
76     debug(22,7) ("  PV: %d.%d VID:%d.%d HID:0x%x AID:0x%x\n",
77                     mi.sourceVersion.major,mi.sourceVersion.minor,
78                     mi.sourceVendorId.major,mi.sourceVendorId.minor,
79                     mi.sourceHostId,mi.sourceAppId);
80     do {
81       // check if length of submessage header is OK
82       if ((cdrCodec->rptr+3)<=RTPS_Codec_len) {
83         CORBA_char flags,sub_id;
84         CORBA_unsigned_short sub_len;
85
86         CDR_get_octet(cdrCodec,&sub_id);
87         CDR_get_octet(cdrCodec,&flags);
88         if (flags & 0x01)         
89            cdrCodec->data_endian=FLAG_LITTLE_ENDIAN;
90         else
91            cdrCodec->data_endian=FLAG_BIG_ENDIAN;
92         CDR_get_ushort(cdrCodec,&sub_len);
93
94         debug(22,7) ("ORTEAppRecvThread %s: sub_id: 0x%x, sub_len 0x%x: %u\n",
95                       TK2S(tp),sub_id,sub_len);
96
97         // check if length of submessage OK
98         if ((sub_len+cdrCodec->rptr)<=RTPS_Codec_len) {
99           pthread_rwlock_wrlock(&d->objectEntry.objRootLock);    
100           pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
101           rptr=cdrCodec->rptr;
102           switch ((SubmessageId)sub_id) {
103             case VAR:
104               RTPSVar(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
105             break;
106             case ACK:
107               RTPSAck(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
108             break;
109             case HEARTBEAT:
110               RTPSHeartBeat(d,cdrCodec,&mi);
111             break;
112             case GAP:
113               RTPSGap(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
114             break;
115             case INFO_TS:
116               RTPSInfoTS(cdrCodec,&mi);
117             break;
118             case INFO_SRC:
119               RTPSInfoSRC(cdrCodec,&mi);
120             break;
121             case INFO_REPLY:
122               RTPSInfoREPLY(cdrCodec,&mi);
123             break;
124             case INFO_DST:
125               RTPSInfoDST(cdrCodec,&mi);
126             break;
127             case PAD:
128               RTPSPad(cdrCodec,&mi);
129             break;
130             case ISSUE:
131               RTPSIssue(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
132             break;
133             default:
134               debug(22,5) ("ORTEAppRecvThread: unknown message :0x%x\n",sub_id);
135             break;
136           }  
137           cdrCodec->rptr=rptr+sub_len;
138           pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
139           pthread_rwlock_unlock(&d->objectEntry.objRootLock);    
140         } else break;          /* submessage is too big */
141       } else break;            /* submessage is too big */
142     } while (cdrCodec->rptr<RTPS_Codec_len);
143     debug(22,7) ("ORTEAppRecvThread: processing of message(s) finnished\n");
144   } 
145
146   debug(22,10) ("ORTEAppRecvThread %s: finished\n",TK2S(tp));
147   pthread_exit(NULL);
148 }