2 * $Id: ORTEAppRecvUserdataThread.c,v 0.0.0.1 2003/08/21
4 * DEBUG: section 23 Receiving userdata thread
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 /*****************************************************************************/
25 void ORTEAppRecvUserdataThread(ORTEDomain *d) {
26 struct sockaddr_in des;
30 debug(23,10) ("ORTEAppRecvUserdataThread: start\n");
32 while (!d->taskRecvUserdata.terminate) {
33 debug(23,7) ("ORTEAppRecvUserdataThread: receiving\n");
34 d->mbRecvUserdata.cdrStream.length = sock_recvfrom(
35 &d->taskRecvUserdata.sock, //socked handle
36 d->mbRecvUserdata.cdrStream.buffer,//buffer
37 d->domainProp.recvBuffSize, //max length of message
38 &des,sizeof(des)); //info from sending host
39 d->mbRecvUserdata.cdrStream.bufferPtr=d->mbRecvUserdata.cdrStream.buffer;
40 debug(23,7) ("ORTEAppRecvUserdataThread: fired\n");
41 //is it header of valid RTPS packet?
42 if (RTPSHeaderCheck(d->mbRecvUserdata.cdrStream.buffer,
43 d->mbSend.cdrStream.length,&mi)==0) {
44 debug(23,7) ("ORTEAppRecvUserdataThread: RTPS Heard OK\n");
45 debug(23,7) (" PV: %d.%d VID:%d.%d HID:0x%x AID:0x%x\n",
46 mi.sourceVersion.major,mi.sourceVersion.minor,
47 mi.sourceVendorId.major,mi.sourceVendorId.minor,
48 mi.sourceHostId,mi.sourceAppId);
49 d->mbRecvUserdata.cdrStream.bufferPtr+=16;
51 // check if length of submessage header is OK
52 if ((d->mbRecvUserdata.cdrStream.bufferPtr-
53 d->mbRecvUserdata.cdrStream.buffer+3)<=d->mbRecvUserdata.cdrStream.length) {
54 int8_t e_bit=d->mbRecvUserdata.cdrStream.bufferPtr[1] & 1;
55 submsg_len=(u_int16_t)d->mbRecvUserdata.cdrStream.bufferPtr[2];
56 conv_u16(&submsg_len,e_bit);
57 // check if length of submessage OK
58 if ((submsg_len+d->mbRecvUserdata.cdrStream.bufferPtr-
59 d->mbRecvUserdata.cdrStream.buffer+3)<=d->mbRecvUserdata.cdrStream.length) {
60 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
61 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
62 switch ((SubmessageId)d->mbRecvUserdata.cdrStream.bufferPtr[0]) {
64 // RTPSVar(d,d->taskRecvUserdata.bufferStart+msg_ptr,&mi,ntohl(des.sin_addr.s_addr));
67 // RTPSAck(d,d->taskRecvUserdata.bufferStart+msg_ptr,&mi,ntohl(des.sin_addr.s_addr));
70 // RTPSHeardBeat(d,d->taskRecvUserdata.bufferStart+msg_ptr,&mi);
73 // RTPSGap(d,d->taskRecvUserdata.bufferStart+msg_ptr,&mi,ntohl(des.sin_addr.s_addr));
76 RTPSInfoTS(d->mbRecvUserdata.cdrStream.bufferPtr,&mi);
79 RTPSInfoSRC(d->mbRecvUserdata.cdrStream.bufferPtr,&mi);
82 RTPSInfoREPLY(d->mbRecvUserdata.cdrStream.bufferPtr,&mi);
85 RTPSInfoDST(d->mbRecvUserdata.cdrStream.bufferPtr,&mi);
88 RTPSPad(d->mbRecvUserdata.cdrStream.bufferPtr,&mi);
91 RTPSIssue(d,d->mbRecvUserdata.cdrStream.bufferPtr,&mi,ntohl(des.sin_addr.s_addr));
94 debug(23,5) ("ORTEAppRecvUserdataThread: unknown message :%d\n",d->mbRecvUserdata.cdrStream.bufferPtr[0]);
97 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
98 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
99 d->mbRecvUserdata.cdrStream.bufferPtr+=submsg_len+4;
100 } else break; /* submessage is too big */
101 } else break; /* submessage is too big */
102 } while ((d->mbRecvUserdata.cdrStream.bufferPtr-
103 d->mbRecvUserdata.cdrStream.buffer)<d->mbRecvUserdata.cdrStream.length);
107 debug(23,10) ("ORTEAppRecvUserdataThread: finished\n");