]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEAppRecvThread.c
changed name to Open Real-time Ethernet, some source header arranging
[orte.git] / orte / liborte / ORTEAppRecvThread.c
1 /*
2  *  $Id: ORTEAppRecvThread.c,v 0.0.0.1  2005/01/3 
3  *
4  *  DEBUG:  section 22                  Receiving thread
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr.smolik@wo.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30
31  */ 
32
33 #include "orte_all.h"
34
35 /* ThreadKind to String */
36 char* TK2S(TaskProp *tp) 
37 {
38   ORTEDomain            *d=tp->d;
39
40   if (tp==&d->taskRecvUnicastMetatraffic) 
41     return "UM";
42
43   if (tp==&d->taskRecvMulticastMetatraffic) 
44     return "MM";
45
46   if (tp==&d->taskRecvUnicastUserdata) 
47     return "UU";
48
49   if (tp==&d->taskRecvMulticastUserdata) 
50     return "MU";
51
52   return "";
53 }
54
55 /*****************************************************************************/
56 void ORTEAppRecvThread(TaskProp *tp) {
57   struct sockaddr_in    des;
58   uint32_t              RTPS_Codec_len;
59   MessageInterpret      mi; 
60   CDR_Codec             *cdrCodec=&tp->mb.cdrCodec;
61   ORTEDomain            *d=tp->d;
62   unsigned int          rptr;
63
64   debug(22,10) ("ORTEAppRecvThread %s: start\n",TK2S(tp));
65
66   while (!tp->terminate) {
67     debug(22,7) ("ORTEAppRecvThread %s: receiving\n",TK2S(tp));
68     
69     //prepare cdrCodec
70     cdrCodec->wptr=cdrCodec->rptr=0;
71     RTPS_Codec_len = sock_recvfrom(
72                   &tp->sock,                     //socked handle
73                   cdrCodec->buffer,             //buffer
74                   cdrCodec->buf_len,            //max length of message
75                   &des,sizeof(des));             //info from sending host
76
77     debug(22,7) ("ORTEAppRecvThread %s: fired, msg_len: 0x%x\n",TK2S(tp),RTPS_Codec_len);
78
79     //is it header of valid RTPS packet?
80     if (RTPSHeaderCheck(cdrCodec,RTPS_Codec_len,&mi)<0) {
81       debug(22,2) ("ORTEAppRecvThread: not valid RTPS header!\n");
82       continue;
83     } 
84
85     debug(22,7) ("ORTEAppRecvThread: RTPS Heard OK\n");
86     debug(22,7) ("  PV: %d.%d VID:%d.%d HID:0x%x AID:0x%x\n",
87                     mi.sourceVersion.major,mi.sourceVersion.minor,
88                     mi.sourceVendorId.major,mi.sourceVendorId.minor,
89                     mi.sourceHostId,mi.sourceAppId);
90     do {
91       // check if length of submessage header is OK
92       if ((cdrCodec->rptr+3)<=RTPS_Codec_len) {
93         CORBA_char flags,sub_id;
94         CORBA_unsigned_short sub_len;
95
96         CDR_get_octet(cdrCodec,&sub_id);
97         CDR_get_octet(cdrCodec,&flags);
98         if (flags & 0x01)         
99            cdrCodec->data_endian=FLAG_LITTLE_ENDIAN;
100         else
101            cdrCodec->data_endian=FLAG_BIG_ENDIAN;
102         CDR_get_ushort(cdrCodec,&sub_len);
103
104         debug(22,7) ("ORTEAppRecvThread %s: sub_id: 0x%x, sub_len 0x%x: %u\n",
105                       TK2S(tp),sub_id,sub_len);
106
107         // check if length of submessage OK
108         if ((sub_len+cdrCodec->rptr)<=RTPS_Codec_len) {
109           pthread_rwlock_wrlock(&d->objectEntry.objRootLock);    
110           pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
111           rptr=cdrCodec->rptr;
112           switch ((SubmessageId)sub_id) {
113             case VAR:
114               RTPSVar(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
115             break;
116             case ACK:
117               RTPSAck(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
118             break;
119             case HEARTBEAT:
120               RTPSHeartBeat(d,cdrCodec,&mi);
121             break;
122             case GAP:
123               RTPSGap(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
124             break;
125             case INFO_TS:
126               RTPSInfoTS(cdrCodec,&mi);
127             break;
128             case INFO_SRC:
129               RTPSInfoSRC(cdrCodec,&mi);
130             break;
131             case INFO_REPLY:
132               RTPSInfoREPLY(cdrCodec,&mi);
133             break;
134             case INFO_DST:
135               RTPSInfoDST(cdrCodec,&mi);
136             break;
137             case PAD:
138               RTPSPad(cdrCodec,&mi);
139             break;
140             case ISSUE:
141               RTPSIssue(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
142             break;
143             default:
144               debug(22,5) ("ORTEAppRecvThread: unknown message :0x%x\n",sub_id);
145             break;
146           }  
147           cdrCodec->rptr=rptr+sub_len;
148           pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
149           pthread_rwlock_unlock(&d->objectEntry.objRootLock);    
150         } else break;          /* submessage is too big */
151       } else break;            /* submessage is too big */
152     } while (cdrCodec->rptr<RTPS_Codec_len);
153     debug(22,7) ("ORTEAppRecvThread: processing of message(s) finnished\n");
154   } 
155
156   debug(22,10) ("ORTEAppRecvThread %s: finished\n",TK2S(tp));
157   pthread_exit(NULL);
158 }