]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEAppRecvUserdataThread.c
Added renamed_include_HEADERS
[orte.git] / orte / liborte / ORTEAppRecvUserdataThread.c
1 /*
2  *  $Id: ORTEAppRecvUserdataThread.c,v 0.0.0.1  2003/08/21 
3  *
4  *  DEBUG:  section 23                  Receiving userdata thread
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte_all.h"
23
24 /*****************************************************************************/
25 void ORTEAppRecvUserdataThread(ORTEDomain *d) {
26   struct sockaddr_in    des;
27   uint16_t              submsg_len;
28   MessageInterpret      mi; 
29
30   debug(23,10) ("ORTEAppRecvUserdataThread: start\n");
31
32   while (!d->taskRecvUserdata.terminate) {
33     debug(23,7) ("ORTEAppRecvUserdataThread: receiving\n");
34     d->mbRecvUserdata.cdrStream.length = sock_recvfrom(
35                   &d->taskRecvUserdata.sock,     //socked handle
36                   d->mbRecvUserdata.cdrStream.buffer,//buffer
37                   d->domainProp.recvBuffSize,    //max length of message
38                   &des,sizeof(des));              //info from sending host
39     d->mbRecvUserdata.cdrStream.bufferPtr=d->mbRecvUserdata.cdrStream.buffer;
40     debug(23,7) ("ORTEAppRecvUserdataThread: fired\n");
41     //is it header of valid RTPS packet?
42     if (RTPSHeaderCheck(d->mbRecvUserdata.cdrStream.buffer,
43                         d->mbRecvUserdata.cdrStream.length,&mi)==0) {
44       debug(23,7) ("ORTEAppRecvUserdataThread: RTPS Heard OK\n");
45       debug(23,7) ("  PV: %d.%d VID:%d.%d HID:0x%x AID:0x%x\n",
46                     mi.sourceVersion.major,mi.sourceVersion.minor,
47       mi.sourceVendorId.major,mi.sourceVendorId.minor,
48       mi.sourceHostId,mi.sourceAppId);
49       d->mbRecvUserdata.cdrStream.bufferPtr+=16;
50       do {
51         // check if length of submessage header is OK 
52         if ((d->mbRecvUserdata.cdrStream.bufferPtr-
53              d->mbRecvUserdata.cdrStream.buffer+3)<=d->mbRecvUserdata.cdrStream.length) {
54           char e_bit=d->mbRecvUserdata.cdrStream.bufferPtr[1] & 1;
55           submsg_len=*(uint16_t*)&d->mbRecvUserdata.cdrStream.bufferPtr[2];
56           conv_u16(&submsg_len,e_bit);
57           // check if length of submessage OK 
58           if ((submsg_len+d->mbRecvUserdata.cdrStream.bufferPtr-
59                d->mbRecvUserdata.cdrStream.buffer+3)<=d->mbRecvUserdata.cdrStream.length) {
60             pthread_rwlock_wrlock(&d->objectEntry.objRootLock);    
61             pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
62             switch ((SubmessageId)d->mbRecvUserdata.cdrStream.bufferPtr[0]) {
63               case VAR:
64                 //can't be
65               break;
66               case ACK:
67                 RTPSAck(d,d->mbRecvUserdata.cdrStream.bufferPtr,&mi,ntohl(des.sin_addr.s_addr));
68               break;
69               case HEARTBEAT:
70                 RTPSHeardBeat(d,d->mbRecvUserdata.cdrStream.bufferPtr,&mi);
71               break;
72               case GAP:
73                 //can't be
74               break;
75               case INFO_TS:
76                 RTPSInfoTS(d->mbRecvUserdata.cdrStream.bufferPtr,&mi);
77               break;
78               case INFO_SRC:
79                 RTPSInfoSRC(d->mbRecvUserdata.cdrStream.bufferPtr,&mi);
80               break;
81               case INFO_REPLY:
82                 RTPSInfoREPLY(d->mbRecvUserdata.cdrStream.bufferPtr,&mi);
83               break;
84               case INFO_DST:
85                 RTPSInfoDST(d->mbRecvUserdata.cdrStream.bufferPtr,&mi);
86               break;
87               case PAD:
88                 RTPSPad(d->mbRecvUserdata.cdrStream.bufferPtr,&mi);
89               break;
90               case ISSUE:
91                 RTPSIssue(d,d->mbRecvUserdata.cdrStream.bufferPtr,&mi,ntohl(des.sin_addr.s_addr));
92               break;
93               default:
94                 debug(23,5) ("ORTEAppRecvUserdataThread: unknown message :%d\n",d->mbRecvUserdata.cdrStream.bufferPtr[0]);
95               break;
96             }  
97             pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
98             pthread_rwlock_unlock(&d->objectEntry.objRootLock);    
99             d->mbRecvUserdata.cdrStream.bufferPtr+=submsg_len+4;
100           } else break;          /* submessage is too big */
101         } else break;            /* submessage is too big */
102       } while ((d->mbRecvUserdata.cdrStream.bufferPtr-
103                 d->mbRecvUserdata.cdrStream.buffer)<d->mbRecvUserdata.cdrStream.length);
104     }
105   } 
106
107   debug(23,10) ("ORTEAppRecvUserdataThread: finished\n");
108   pthread_exit(NULL);
109 }