]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEAppSendThread.c
5ce3b388b4a7c00fe21af1a5b64735c2ecafe2c9
[orte.git] / orte / liborte / ORTEAppSendThread.c
1 /*
2  *  $Id: ORTEAppSendThread.c,v 0.0.0.1  2003/08/21 
3  *
4  *  DEBUG:  section 24                  Sending thread
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 /*****************************************************************************/
25 void ORTESendData(ORTEDomain *d,ObjectEntryAID *objectEntryAID,Boolean meta) {
26   struct sockaddr_in  des; 
27   ObjectEntryOID      *objectEntryOID;
28   AppParams           *appParams;
29   ObjectId            oid=OID_APP;
30   int                 i;
31
32   objectEntryOID=ObjectEntryOID_find(objectEntryAID,&oid);
33   if (objectEntryOID) {
34     appParams=(AppParams*)objectEntryOID->attributes;
35     for(i=0;i<appParams->unicastIPAddressCount;i++) {
36       des.sin_family=AF_INET; 
37       des.sin_addr.s_addr = htonl(appParams->unicastIPAddressList[i]);
38       if (meta) {
39         des.sin_port = htons((uint16_t)appParams->metatrafficUnicastPort); 
40         sock_sendto (
41             &d->taskSend.sock,
42             d->mbSend.cdrStream.buffer,
43             d->mbSend.cdrStream.length,
44             &des,
45             sizeof(des)); 
46       } else {
47         des.sin_port = htons((uint16_t)appParams->userdataUnicastPort); 
48         if (d->mbSend.cdrStreamDirect)
49           sock_sendto (
50               &d->taskSend.sock,
51               d->mbSend.cdrStreamDirect->buffer,
52               d->mbSend.cdrStreamDirect->length,
53               &des,
54               sizeof(des)); 
55         else
56           sock_sendto (
57               &d->taskSend.sock,
58               d->mbSend.cdrStream.buffer,
59               d->mbSend.cdrStream.length,
60               &des,
61               sizeof(des)); 
62       }
63     }
64   }
65   //prepare buffer for next sending
66   d->mbSend.cdrStream.length=RTPS_HEADER_LENGTH;
67   d->mbSend.cdrStream.bufferPtr=d->mbSend.cdrStream.buffer+RTPS_HEADER_LENGTH;
68   d->mbSend.needSend=ORTE_FALSE;
69   d->mbSend.containsInfoReply=ORTE_FALSE;
70 }
71
72 /*****************************************************************************/
73 void ORTEAppSendThread(ORTEDomain *d) {
74   struct timespec     wtime; 
75   NtpTime             actTime,nextExpire,whenExpire,sleepingTime;
76   int32_t             s,ms;
77
78   debug(24,10) ("ORTEAppSendThread: start\n");
79   d->objectEntry.htimNeedWakeUp=ORTE_TRUE;
80
81   while (!d->taskSend.terminate) {
82     actTime=getActualNtpTime();
83     pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
84     if (htimerRoot_next_expire(&d->objectEntry,&nextExpire)==0) {
85       NTPTIME_BUILD(whenExpire,300); //max time for sleeping (no. events)
86       NtpTimeAdd(nextExpire,actTime,whenExpire);
87     }
88     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
89     NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,nextExpire);
90     wtime.tv_nsec*=1000;  //conver to nano seconds
91     NtpTimeSub(sleepingTime,nextExpire,actTime);
92     NtpTimeDisAssembToMs(s,ms,sleepingTime);
93     if (s<0) s=ms=0;
94     debug(24,4) ("ORTEAppSendThread: sleeping for %lis %lims\n",s,ms);
95     if (!((wtime.tv_sec==0) && (wtime.tv_nsec==0))) {
96       pthread_mutex_lock(&d->objectEntry.htimSendMutex);
97       if (d->objectEntry.htimSendCondValue==0) {
98         pthread_cond_timedwait(&d->objectEntry.htimSendCond,
99                                &d->objectEntry.htimSendMutex,
100                                &wtime);
101       }
102       d->objectEntry.htimSendCondValue=0;
103       pthread_mutex_unlock(&d->objectEntry.htimSendMutex);
104     }
105     debug(24,7) ("ORTEAppSendThread: fired\n");
106     actTime=getActualNtpTime();
107     pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
108     pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
109     d->objectEntry.htimNeedWakeUp=ORTE_FALSE;
110     htimerRoot_run_expired(d,&actTime);
111     d->objectEntry.htimNeedWakeUp=ORTE_TRUE;
112     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
113     pthread_rwlock_unlock(&d->objectEntry.objRootLock);
114   }
115   debug(24,10) ("ORTEAppSendThread: finished\n");
116   pthread_exit(NULL);
117 }
118