]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEAppSendThread.c
Reformat the sources with orte/uncrustify script
[orte.git] / orte / liborte / ORTEAppSendThread.c
1 /*
2  *  $Id: ORTEAppSendThread.c,v 0.0.0.1  2003/08/21
3  *
4  *  DEBUG:  section 24                  Sending thread
5  *
6  *  -------------------------------------------------------------------
7  *                                ORTE
8  *                      Open Real-Time Ethernet
9  *
10  *                      Copyright (C) 2001-2006
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic
12  *                      http://dce.felk.cvut.cz
13  *                      http://www.ocera.org
14  *
15  *  Author:              Petr Smolik    petr@smoliku.cz
16  *  Advisor:             Pavel Pisa
17  *  Project Responsible: Zdenek Hanzalek
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *
30  */
31
32 #include "orte_all.h"
33
34 /*****************************************************************************/
35 void
36 ORTESendData(ORTEDomain *d, ObjectEntryAID *objectEntryAID, Boolean meta)
37 {
38   struct sockaddr_in  des;
39   ObjectEntryOID      *object;
40   AppParams           *appParams;
41   int                 i;
42   CDR_Codec           *cdrCodec = &d->taskSend.mb.cdrCodec;
43   Port                port;
44
45   object = objectEntryAID->aobject;
46   if (object) {
47     appParams = (AppParams *)object->attributes;
48     if (!ObjectEntryMulticast_is_empty(object)) {
49       for (i = 0; i < appParams->metatrafficMulticastIPAddressCount; i++) {
50         des.sin_family = AF_INET;
51         des.sin_addr.s_addr = htonl(appParams->metatrafficMulticastIPAddressList[i]);
52         des.sin_port = htons((uint16_t)object->multicastPort);
53         if (d->taskSend.mb.cdrCodecDirect) {
54           sock_sendto(&d->taskSend.sock,
55                       d->taskSend.mb.cdrCodecDirect->buffer,
56                       d->taskSend.mb.cdrCodecDirect->wptr,
57                       &des,
58                       sizeof(des));
59         } else {
60           sock_sendto(&d->taskSend.sock,
61                       cdrCodec->buffer,
62                       cdrCodec->wptr,
63                       &des,
64                       sizeof(des));
65         }
66       }
67     } else {
68       for (i = 0; i < appParams->unicastIPAddressCount; i++) {
69         des.sin_family = AF_INET;
70         des.sin_addr.s_addr = htonl(appParams->unicastIPAddressList[i]);
71         if (meta) {
72           port = appParams->metatrafficUnicastPort;
73         } else {
74           port = appParams->userdataUnicastPort;
75         }
76         des.sin_port = htons((uint16_t)port);
77         if (d->taskSend.mb.cdrCodecDirect) {
78           sock_sendto(&d->taskSend.sock,
79                       d->taskSend.mb.cdrCodecDirect->buffer,
80                       d->taskSend.mb.cdrCodecDirect->wptr,
81                       &des,
82                       sizeof(des));
83         } else {
84           sock_sendto(&d->taskSend.sock,
85                       cdrCodec->buffer,
86                       cdrCodec->wptr,
87                       &des,
88                       sizeof(des));
89         }
90       }
91     }
92   } else {
93     debug(24, 1) ("ORTEAppSendThread: no aobjectEntryOID connected to objectEntryAID!\n");
94   }
95   //prepare buffer for next sending
96   cdrCodec->wptr = RTPS_HEADER_LENGTH;
97   d->taskSend.mb.needSend = ORTE_FALSE;
98   d->taskSend.mb.containsInfoReply = ORTE_FALSE;
99   d->taskSend.mb.cdrCodecDirect = NULL;
100 }
101
102 /*****************************************************************************/
103 void
104 ORTEAppSendThread(TaskProp *tp)
105 {
106   struct timespec     wtime;
107   NtpTime             actTime, nextExpire, whenExpire, sleepingTime;
108   int32_t             s, ms;
109   ORTEDomain          *d = tp->d;
110
111   debug(24, 10) ("ORTEAppSendThread: start\n");
112   d->objectEntry.htimNeedWakeUp = ORTE_TRUE;
113
114   while (!tp->terminate) {
115     actTime = getActualNtpTime();
116
117     pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
118     if (htimerRoot_next_expire(&d->objectEntry, &nextExpire) == 0) {
119       NTPTIME_BUILD(whenExpire, 300); //max time for sleeping (no. events)
120       NtpTimeAdd(nextExpire, actTime, whenExpire);
121     }
122     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
123     NtpTimeDisAssembToUs(wtime.tv_sec, wtime.tv_nsec, nextExpire);
124     wtime.tv_nsec *= 1000;  //conver to nano seconds
125     NtpTimeSub(sleepingTime, nextExpire, actTime);
126     NtpTimeDisAssembToMs(s, ms, sleepingTime);
127     if (s < 0)
128       s = ms = 0;
129
130     debug(24, 4) ("ORTEAppSendThread: sleeping for %lis %lims\n", s, ms);
131     if (!((wtime.tv_sec == 0) && (wtime.tv_nsec == 0))) {
132       pthread_mutex_lock(&d->objectEntry.htimSendMutex);
133       if (d->objectEntry.htimSendCondValue == 0) {
134         pthread_cond_timedwait(&d->objectEntry.htimSendCond,
135                                &d->objectEntry.htimSendMutex,
136                                &wtime);
137       }
138       d->objectEntry.htimSendCondValue = 0;
139       pthread_mutex_unlock(&d->objectEntry.htimSendMutex);
140     }
141
142     debug(24, 7) ("ORTEAppSendThread: fired\n");
143     actTime = getActualNtpTime();
144     pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
145     pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
146     d->objectEntry.htimNeedWakeUp = ORTE_FALSE;
147
148     htimerRoot_run_expired(d, &actTime);
149
150     d->objectEntry.htimNeedWakeUp = ORTE_TRUE;
151     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
152     pthread_rwlock_unlock(&d->objectEntry.objRootLock);
153   }
154   debug(24, 10) ("ORTEAppSendThread: finished\n");
155   pthread_exit(NULL);
156 }