]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEAppRecvThread.c
eb0b2cf6eb520e41e8a9e9f956e7dcd3949f47f5
[orte.git] / orte / liborte / ORTEAppRecvThread.c
1 /*
2  *  $Id: ORTEAppRecvThread.c,v 0.0.0.1  2005/01/3 
3  *
4  *  DEBUG:  section 22                  Receiving thread
5  *
6  *  -------------------------------------------------------------------  
7  *                                ORTE                                 
8  *                      Open Real-Time Ethernet                       
9  *                                                                    
10  *                      Copyright (C) 2001-2006                       
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic  
12  *                      http://dce.felk.cvut.cz                       
13  *                      http://www.ocera.org                          
14  *                                                                    
15  *  Author:              Petr Smolik    petr.smolik@wo.cz             
16  *  Advisor:             Pavel Pisa                                   
17  *  Project Responsible: Zdenek Hanzalek                              
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *  
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *  
30
31  */ 
32
33 #include "orte_all.h"
34
35 /* ThreadKind to String */
36 char* TK2S(TaskProp *tp) 
37 {
38   ORTEDomain            *d=tp->d;
39
40   if (tp==&d->taskRecvUnicastMetatraffic) 
41     return "UM";
42
43   if (tp==&d->taskRecvMulticastMetatraffic) 
44     return "MM";
45
46   if (tp==&d->taskRecvUnicastUserdata) 
47     return "UU";
48
49   if (tp==&d->taskRecvMulticastUserdata) 
50     return "MU";
51
52   return "";
53 }
54
55 /*****************************************************************************/
56 void ORTEAppRecvThread(TaskProp *tp) {
57   struct sockaddr_in    des;
58   int32_t               RTPS_Codec_len;
59   MessageInterpret      mi; 
60   CDR_Codec             *cdrCodec=&tp->mb.cdrCodec;
61   ORTEDomain            *d=tp->d;
62   unsigned int          rptr;
63
64   debug(22,10) ("ORTEAppRecvThread %s: start\n",TK2S(tp));
65
66   while (!tp->terminate) {
67     debug(22,7) ("ORTEAppRecvThread %s: receiving\n",TK2S(tp));
68     
69     //prepare cdrCodec
70     cdrCodec->wptr=cdrCodec->rptr=0;
71     RTPS_Codec_len = sock_recvfrom(
72                   &tp->sock,                     //socked handle
73                   cdrCodec->buffer,             //buffer
74                   cdrCodec->buf_len,            //max length of message
75                   &des,sizeof(des));             //info from sending host
76
77     if (RTPS_Codec_len == -1) {
78             perror("ORTEAppRecvThread: sock_recvfrom");
79             return;
80     }
81
82     debug(22,7) ("ORTEAppRecvThread %s: fired, msg_len: 0x%x\n",TK2S(tp),RTPS_Codec_len);
83
84     //is it header of valid RTPS packet?
85     if (RTPSHeaderCheck(cdrCodec,RTPS_Codec_len,&mi)<0) {
86       debug(22,2) ("ORTEAppRecvThread: not valid RTPS header!\n");
87       continue;
88     } 
89
90     debug(22,7) ("ORTEAppRecvThread: RTPS Heard OK\n");
91     debug(22,7) ("  PV: %d.%d VID:%d.%d HID:0x%x AID:0x%x\n",
92                     mi.sourceVersion.major,mi.sourceVersion.minor,
93                     mi.sourceVendorId.major,mi.sourceVendorId.minor,
94                     mi.sourceHostId,mi.sourceAppId);
95     do {
96       // check if length of submessage header is OK
97       if ((cdrCodec->rptr+3)<=RTPS_Codec_len) {
98         CORBA_char flags,sub_id;
99         CORBA_unsigned_short sub_len;
100
101         CDR_get_octet(cdrCodec, (CORBA_octet *)&sub_id);
102         CDR_get_octet(cdrCodec, (CORBA_octet *)&flags);
103         if (flags & 0x01)         
104            cdrCodec->data_endian=FLAG_LITTLE_ENDIAN;
105         else
106            cdrCodec->data_endian=FLAG_BIG_ENDIAN;
107         CDR_get_ushort(cdrCodec,&sub_len);
108
109         debug(22,7) ("ORTEAppRecvThread %s: sub_id: 0x%x, sub_len 0x%x: %u\n",
110                       TK2S(tp),sub_id,sub_len);
111
112         // check if length of submessage OK
113         if ((sub_len+cdrCodec->rptr)<=RTPS_Codec_len) {
114           pthread_rwlock_wrlock(&d->objectEntry.objRootLock);    
115           pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
116           rptr=cdrCodec->rptr;
117           switch ((SubmessageId)sub_id) {
118             case VAR:
119               RTPSVar(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
120             break;
121             case ACK:
122               RTPSAck(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
123             break;
124             case HEARTBEAT:
125               RTPSHeartBeat(d,cdrCodec,&mi);
126             break;
127             case GAP:
128               RTPSGap(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
129             break;
130             case INFO_TS:
131               RTPSInfoTS(cdrCodec,&mi);
132             break;
133             case INFO_SRC:
134               RTPSInfoSRC(cdrCodec,&mi);
135             break;
136             case INFO_REPLY:
137               RTPSInfoREPLY(cdrCodec,&mi);
138             break;
139             case INFO_DST:
140               RTPSInfoDST(cdrCodec,&mi);
141             break;
142             case PAD:
143               RTPSPad(cdrCodec,&mi);
144             break;
145             case ISSUE:
146               RTPSIssue(d,cdrCodec,&mi,ntohl(des.sin_addr.s_addr));
147             break;
148             default:
149               debug(22,5) ("ORTEAppRecvThread: unknown message :0x%x\n",sub_id);
150             break;
151           }  
152           cdrCodec->rptr=rptr+sub_len;
153           pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
154           pthread_rwlock_unlock(&d->objectEntry.objRootLock);    
155         } else break;          /* submessage is too big */
156       } else break;            /* submessage is too big */
157     } while (cdrCodec->rptr<RTPS_Codec_len);
158     debug(22,7) ("ORTEAppRecvThread: processing of message(s) finnished\n");
159   } 
160
161   debug(22,10) ("ORTEAppRecvThread %s: finished\n",TK2S(tp));
162   pthread_exit(NULL);
163 }