2 * $Id: ORTEAppSendThread.c,v 0.0.0.1 2003/08/21
4 * DEBUG: section 24 Sending thread
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /*****************************************************************************/
36 ORTESendData(ORTEDomain *d, ObjectEntryAID *objectEntryAID, Boolean meta)
38 struct sockaddr_in des;
39 ObjectEntryOID *object;
42 CDR_Codec *cdrCodec = &d->taskSend.mb.cdrCodec;
45 object = objectEntryAID->aobject;
47 appParams = (AppParams *)object->attributes;
48 if (!ObjectEntryMulticast_is_empty(object)) {
49 for (i = 0; i < appParams->metatrafficMulticastIPAddressCount; i++) {
50 des.sin_family = AF_INET;
51 des.sin_addr.s_addr = htonl(appParams->metatrafficMulticastIPAddressList[i]);
52 des.sin_port = htons((uint16_t)object->multicastPort);
53 if (d->taskSend.mb.cdrCodecDirect) {
54 sock_sendto(&d->taskSend.sock,
55 d->taskSend.mb.cdrCodecDirect->buffer,
56 d->taskSend.mb.cdrCodecDirect->wptr,
60 sock_sendto(&d->taskSend.sock,
68 for (i = 0; i < appParams->unicastIPAddressCount; i++) {
69 des.sin_family = AF_INET;
70 des.sin_addr.s_addr = htonl(appParams->unicastIPAddressList[i]);
72 port = appParams->metatrafficUnicastPort;
74 port = appParams->userdataUnicastPort;
76 des.sin_port = htons((uint16_t)port);
77 if (d->taskSend.mb.cdrCodecDirect) {
78 sock_sendto(&d->taskSend.sock,
79 d->taskSend.mb.cdrCodecDirect->buffer,
80 d->taskSend.mb.cdrCodecDirect->wptr,
84 sock_sendto(&d->taskSend.sock,
93 debug(24, 1) ("ORTEAppSendThread: no aobjectEntryOID connected to objectEntryAID!\n");
95 //prepare buffer for next sending
96 cdrCodec->wptr = RTPS_HEADER_LENGTH;
97 d->taskSend.mb.needSend = ORTE_FALSE;
98 d->taskSend.mb.containsInfoReply = ORTE_FALSE;
99 d->taskSend.mb.cdrCodecDirect = NULL;
102 /*****************************************************************************/
104 ORTEAppSendThread(TaskProp *tp)
106 struct timespec wtime;
107 NtpTime actTime, nextExpire, whenExpire, sleepingTime;
109 ORTEDomain *d = tp->d;
111 debug(24, 10) ("ORTEAppSendThread: start\n");
112 d->objectEntry.htimNeedWakeUp = ORTE_TRUE;
114 while (!tp->terminate) {
115 actTime = getActualNtpTime();
117 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
118 if (htimerRoot_next_expire(&d->objectEntry, &nextExpire) == 0) {
119 NTPTIME_BUILD(whenExpire, 300); //max time for sleeping (no. events)
120 NtpTimeAdd(nextExpire, actTime, whenExpire);
122 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
123 NtpTimeDisAssembToUs(wtime.tv_sec, wtime.tv_nsec, nextExpire);
124 wtime.tv_nsec *= 1000; //conver to nano seconds
125 NtpTimeSub(sleepingTime, nextExpire, actTime);
126 NtpTimeDisAssembToMs(s, ms, sleepingTime);
130 debug(24, 4) ("ORTEAppSendThread: sleeping for %lis %lims\n", s, ms);
131 if (!((wtime.tv_sec == 0) && (wtime.tv_nsec == 0))) {
132 pthread_mutex_lock(&d->objectEntry.htimSendMutex);
133 if (d->objectEntry.htimSendCondValue == 0) {
134 pthread_cond_timedwait(&d->objectEntry.htimSendCond,
135 &d->objectEntry.htimSendMutex,
138 d->objectEntry.htimSendCondValue = 0;
139 pthread_mutex_unlock(&d->objectEntry.htimSendMutex);
142 debug(24, 7) ("ORTEAppSendThread: fired\n");
143 actTime = getActualNtpTime();
144 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
145 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
146 d->objectEntry.htimNeedWakeUp = ORTE_FALSE;
148 htimerRoot_run_expired(d, &actTime);
150 d->objectEntry.htimNeedWakeUp = ORTE_TRUE;
151 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
152 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
154 debug(24, 10) ("ORTEAppSendThread: finished\n");