]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEAppSendThread.c
342904a52a5b5dfaf3b2b5b2ce5f3c5b693c26f1
[orte.git] / orte / liborte / ORTEAppSendThread.c
1 /*
2  *  $Id: ORTEAppSendThread.c,v 0.0.0.1  2003/08/21 
3  *
4  *  DEBUG:  section 24                  Sending thread
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte_all.h"
23
24 /*****************************************************************************/
25 void ORTESendData(ORTEDomain *d,ObjectEntryAID *objectEntryAID,Boolean meta) {
26   struct sockaddr_in  des; 
27   ObjectEntryOID      *object;
28   AppParams           *appParams;
29   int                 i;
30   CDR_Codec           *cdrCodec=&d->taskSend.mb.cdrCodec;
31   Port                port;
32
33   object=objectEntryAID->aobject;
34   if (object) {
35     appParams=(AppParams*)object->attributes;
36     if (!ObjectEntryMulticast_is_empty(object)) {
37       for(i=0;i<appParams->metatrafficMulticastIPAddressCount;i++) {
38         des.sin_family=AF_INET; 
39         des.sin_addr.s_addr = htonl(appParams->metatrafficMulticastIPAddressList[i]);
40         des.sin_port = htons((uint16_t)object->multicastPort); 
41         if (d->taskSend.mb.cdrCodecDirect) {
42           sock_sendto (&d->taskSend.sock,
43                        d->taskSend.mb.cdrCodecDirect->buffer,
44                        d->taskSend.mb.cdrCodecDirect->wptr,
45                        &des,
46                        sizeof(des)); 
47         } else {
48           sock_sendto (&d->taskSend.sock,
49                        cdrCodec->buffer,
50                        cdrCodec->wptr,
51                        &des,
52                        sizeof(des)); 
53         }
54       }
55     } else {
56       for(i=0;i<appParams->unicastIPAddressCount;i++) {
57         des.sin_family=AF_INET; 
58         des.sin_addr.s_addr = htonl(appParams->unicastIPAddressList[i]);
59         if (meta) {
60           port=appParams->metatrafficUnicastPort;
61         } else {
62           port=appParams->userdataUnicastPort; 
63         }
64         des.sin_port = htons((uint16_t)port); 
65         if (d->taskSend.mb.cdrCodecDirect) {
66           sock_sendto (&d->taskSend.sock,
67                        d->taskSend.mb.cdrCodecDirect->buffer,
68                        d->taskSend.mb.cdrCodecDirect->wptr,
69                        &des,
70                        sizeof(des)); 
71         } else {
72           sock_sendto (&d->taskSend.sock,
73                        cdrCodec->buffer,
74                        cdrCodec->wptr,
75                        &des,
76                        sizeof(des)); 
77         }
78       }
79     }
80   } else {
81     debug(24,1) ("ORTEAppSendThread: no aobjectEntryOID connected to objectEntryAID!\n");
82   }
83   //prepare buffer for next sending
84   cdrCodec->wptr=RTPS_HEADER_LENGTH;
85   d->taskSend.mb.needSend=ORTE_FALSE;
86   d->taskSend.mb.containsInfoReply=ORTE_FALSE;
87   d->taskSend.mb.cdrCodecDirect=NULL;
88 }
89
90 /*****************************************************************************/
91 void ORTEAppSendThread(TaskProp *tp) {
92   struct timespec     wtime; 
93   NtpTime             actTime,nextExpire,whenExpire,sleepingTime;
94   int32_t             s,ms;
95   ORTEDomain          *d=tp->d;
96
97   debug(24,10) ("ORTEAppSendThread: start\n");
98   d->objectEntry.htimNeedWakeUp=ORTE_TRUE;
99
100   while (!tp->terminate) {
101     actTime=getActualNtpTime();
102
103     pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
104     if (htimerRoot_next_expire(&d->objectEntry,&nextExpire)==0) {
105       NTPTIME_BUILD(whenExpire,300); //max time for sleeping (no. events)
106       NtpTimeAdd(nextExpire,actTime,whenExpire);
107     }
108     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
109     NtpTimeDisAssembToUs(wtime.tv_sec,wtime.tv_nsec,nextExpire);
110     wtime.tv_nsec*=1000;  //conver to nano seconds
111     NtpTimeSub(sleepingTime,nextExpire,actTime);
112     NtpTimeDisAssembToMs(s,ms,sleepingTime);
113     if (s<0) s=ms=0;
114
115     debug(24,4) ("ORTEAppSendThread: sleeping for %lis %lims\n",s,ms);
116     if (!((wtime.tv_sec==0) && (wtime.tv_nsec==0))) {
117       pthread_mutex_lock(&d->objectEntry.htimSendMutex);
118       if (d->objectEntry.htimSendCondValue==0) {
119         pthread_cond_timedwait(&d->objectEntry.htimSendCond,
120                                &d->objectEntry.htimSendMutex,
121                                &wtime);
122       }
123       d->objectEntry.htimSendCondValue=0;
124       pthread_mutex_unlock(&d->objectEntry.htimSendMutex);
125     }
126
127     debug(24,7) ("ORTEAppSendThread: fired\n");
128     actTime=getActualNtpTime();
129     pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
130     pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
131     d->objectEntry.htimNeedWakeUp=ORTE_FALSE;
132
133     htimerRoot_run_expired(d,&actTime);
134
135     d->objectEntry.htimNeedWakeUp=ORTE_TRUE;
136     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
137     pthread_rwlock_unlock(&d->objectEntry.objRootLock);
138   }
139   debug(24,10) ("ORTEAppSendThread: finished\n");
140   pthread_exit(NULL);
141 }
142