+/**************************************************************************/
+/* ---------------------------------------------------------------------- */
+/* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
+/* */
+/* Universidad de Cantabria, SPAIN */
+/* University of York, UK */
+/* Scuola Superiore Sant'Anna, ITALY */
+/* Kaiserslautern University, GERMANY */
+/* Univ. Politécnica Valencia, SPAIN */
+/* Czech Technical University in Prague, CZECH REPUBLIC */
+/* ENEA SWEDEN */
+/* Thales Communication S.A. FRANCE */
+/* Visual Tools S.A. SPAIN */
+/* Rapita Systems Ltd UK */
+/* Evidence ITALY */
+/* */
+/* See http://www.frescor.org for a link to partners' websites */
+/* */
+/* FRESCOR project (FP6/2005/IST/5-034026) is funded */
+/* in part by the European Union Sixth Framework Programme */
+/* The European Union is not liable of any use that may be */
+/* made of this code. */
+/* */
+/* */
+/* This file is part of FWP (Frescor WLAN Protocol) */
+/* */
+/* FWP is free software; you can redistribute it and/or modify it */
+/* under terms of the GNU General Public License as published by the */
+/* Free Software Foundation; either version 2, or (at your option) any */
+/* later version. FWP is distributed in the hope that it will be */
+/* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
+/* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
+/* General Public License for more details. You should have received a */
+/* copy of the GNU General Public License along with FWP; see file */
+/* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
+/* Cambridge, MA 02139, USA. */
+/* */
+/* As a special exception, including FWP header files in a file, */
+/* instantiating FWP generics or templates, or linking other files */
+/* with FWP objects to produce an executable application, does not */
+/* by itself cause the resulting executable application to be covered */
+/* by the GNU General Public License. This exception does not */
+/* however invalidate any other reasons why the executable file might be */
+/* covered by the GNU Public License. */
+/**************************************************************************/
#include "fwp_utils.h"
#include "fwp_vres.h"
#include "fwp_msgq.h"
#include "fwp_endpoint.h"
+#include "fwp_debug.h"
#include <string.h>
+#include <errno.h>
+#include <stdlib.h>
static void* fwp_vres_tx_thread(void *_vres);
typedef enum {
- FWP_VF_USED = 1 ,
- FWP_VF_BOUND = 2 ,
- FWP_VF_RESCHED = 4 ,
+ FWP_VF_USED = 0,
+ FWP_VF_BOUND = 1,
+ FWP_VF_CHANGED = 2,
} fwp_vres_flag_t;
fwp_vres_params_t fwp_vres_params_default = {
- .id = 0,
.ac_id = FWP_AC_VO,
.budget = 100,
.period = {.tv_sec = 2 , .tv_nsec = 111111}
/* consideration: move tx_queue to endpoint */
/**< queue for messages to send */
struct fwp_msgq tx_queue;
- int flags;
+ fwp_vres_flag_t flags;
/**< endpoint bounded to this vres */
/*fwp_endpoint_t *epoint; */
pthread_t tx_thread; /**< tx_thread id*/
pthread_attr_t tx_thread_attr;
int ac_sockd; /**< ac socket descriptor */
- fwp_sockaddr_t addr; /**< dest addr,for effectivness*/
};
typedef
}
#endif
-static inline int _fwp_vres_send(unsigned int ac_sockd, struct fwp_msgb* msgb)
+static inline int _fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb)
{
- /*_fwp_sendto(ac_sockd, msgb->data, msgb->len, 0,
- msgb->peer->addr, msgb->peer->addrlen);*/
- return _fwp_send(ac_sockd, msgb->data, msgb->len, 0);
+ struct iovec iov;
+ struct msghdr msg = {0};
+ ssize_t ret;
+ char cmsg_buf[CMSG_SPACE(sizeof(struct in_pktinfo))];
+
+ iov.iov_base = msgb->data;
+ iov.iov_len = msgb->len;
+
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+
+
+
+ if (vres->params.src.s_addr != 0) {
+ struct cmsghdr *cmsg;
+ struct in_pktinfo *ipi;
+
+ memset(cmsg_buf, 0, sizeof(cmsg_buf));
+
+ msg.msg_control = cmsg_buf;
+ msg.msg_controllen = sizeof(cmsg_buf);
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+
+ cmsg->cmsg_level = SOL_IP;
+ cmsg->cmsg_type = IP_PKTINFO;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
+
+ ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
+ ipi->ipi_spec_dst = vres->params.src;
+ }
+ ret = sendmsg(vres->ac_sockd, &msg, 0);
+ return ret;
}
static inline void fwp_vres_free(fwp_vres_t *vres)
/*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
{
- 3if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
+ if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
return -EINVAL;
*vres = &fwp_vres_table.entry[vres_id];
return 0;
return 0;
}
-fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
+fwp_vres_id_t fwp_vres_get_id(fwp_vres_t *vres)
{
- fwp_vres_t *vres = vresd;
-
return (vres - fwp_vres_table.entry);
}
*
* \return On success returns vres descriptor.
*/
-fwp_vres_d_t fwp_vres_alloc()
+fwp_vres_t *fwp_vres_alloc()
{
int i;
unsigned int max_vres;
if (!rv)
return rv;
memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
- fwp_vres_set_flag(vres, FWP_VF_RESCHED);
+ fwp_vres_set_flag(vres, FWP_VF_CHANGED);
return 0;
}
/**
* Set vres params
*
- * \param[in] vresdp Vres descriptor
+ * \param[in] vresp Vres descriptor
* \param[in] params Vres parameters
*
* \return On success returns zero.
* On error, negative error code is returned.
*
*/
-int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
+int fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
{
- fwp_vres_t *vres = vresd;
-
if (!fwp_vres_is_valid(vres)) {
errno = EINVAL;
return -1;
* Creates new vres
*
* \param[in] params Vres parameters
- * \param[out] vresdp Pointer to the descriptor of newly created vres
+ * \param[out] vresp Pointer to the descriptor of newly created vres
*
* \return On success returns descriptor of vres.
* On error, negative error code is returned.
*
*/
-int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
+int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp)
{
int rv;
fwp_vres_t *vres;
fwp_msgq_init(&vres->tx_queue);
memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
- fwp_vres_set_flag(vres, FWP_VF_RESCHED);
+ fwp_vres_set_flag(vres, FWP_VF_CHANGED);
pthread_attr_init(&vres->tx_thread_attr);
if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr,
fwp_vres_tx_thread, (void*) vres)) != 0){
goto err;
}
- *vresdp = vres;
+ *vresp = vres;
return 0;
err:
fwp_vres_free(vres);
/**
* Destroys vres
*
- * \param[in] vresd Vres descriptor
+ * \param[in] vres Vres descriptor
*
* \return On success returns 0.
* On error, negative error code is returned.
*
*/
-int fwp_vres_destroy(fwp_vres_d_t vresd)
+int fwp_vres_destroy(fwp_vres_t *vres)
{
- fwp_vres_t *vres = vresd;
-
if (!fwp_vres_is_valid(vres)) {
errno = EINVAL;
return -1;
pthread_cancel(vres->tx_thread);
- FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);
+ FWP_DEBUG("Vres destroyed.\n");
return 0;
}
fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period,
fwp_budget_t *budget)
{
- if (fwp_vres_get_flag(vres, FWP_VF_RESCHED)) {
+ if (fwp_vres_get_flag(vres, FWP_VF_CHANGED)) {
/*period->tv_nsec = vres->params.period % SEC_TO_USEC;
period->tv_sec = vres->params.period / SEC_TO_USEC;*/
*period = vres->params.period;
FWP_DEBUG("Vres tx thread with budget=%ld period_sec=%ld "
"period_nsec=%ld.\n",vres->params.budget,
period->tv_sec, period->tv_nsec);
- fwp_vres_clear_flag(vres, FWP_VF_RESCHED);
+ fwp_vres_clear_flag(vres, FWP_VF_CHANGED);
}
}
fwp_budget_t budget = vres->params.budget;
fwp_budget_t curr_budget;
int rc;
-
- struct timespec start_period, end_period, period;
- struct timespec current_time, interval;
+ struct timespec start, period, interval, now;
fwp_set_rt_prio(90 - ac_id);
-
-
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
- clock_gettime(CLOCK_MONOTONIC, &start_period);
+ /* just for sure */
+ fwp_vres_sched_update(vres, &period, &budget);
+ curr_budget = budget;
+ clock_gettime(CLOCK_MONOTONIC, &start);
while (1) {
- /* wait for next period and then send */
- fwp_timespec_add(&end_period, &start_period, &period);
- clock_gettime(CLOCK_MONOTONIC, ¤t_time);
- fwp_timespec_sub(&interval, &end_period, ¤t_time);
- nanosleep(&interval, NULL);
-
- sem_wait(&msgq->empty_lock);
- fwp_vres_sched_update(vres, &period, &budget);
- clock_gettime(CLOCK_MONOTONIC, &start_period);
-
- /*msgb = fwp_msgq_dequeue(msgq);
- *if (msgb){*/
- curr_budget = 0;
- while ((curr_budget < budget)&&
- (msgb = fwp_msgq_dequeue(msgq))) {
- rc = _fwp_vres_send(vres->ac_sockd, msgb);
- if (!(rc < 0)) {
- FWP_DEBUG("Message sent through AC%d\n",ac_id);
- /* Switch to this in the future
- * curr_budget+= msgb->len;
- */
- curr_budget++;
+ msgb = fwp_msgq_dequeue(msgq);
+ if (msgb->len > budget) {
+ FWP_ERROR("Message too large: %zd -> skipping\n",
+ msgb->len);
+ goto skip_message;
+ }
+ if (curr_budget < msgb->len) {
+ /* needs to replenish */
+ clock_gettime(CLOCK_MONOTONIC, &now);
+ fwp_timespec_sub(&interval, &now, &start);
+ ul_logtrash("start=%ld.%09ld, now=%ld.%09ld diff=%ld.%09ld\n", (long)start.tv_sec, (long)start.tv_nsec, (long)now.tv_sec, (long)now.tv_nsec, (long)interval.tv_sec, (long)interval.tv_nsec);
+ fwp_timespec_sub(&interval, &period, &interval);
+ if (interval.tv_sec > 0 ||
+ (interval.tv_sec == 0 && interval.tv_nsec > 0)) {
+ /* We have to wait to replenish */
+ ul_logtrash("sleeping=%ld.%09ld\n", (long)interval.tv_sec, (long)interval.tv_nsec);
+ nanosleep(&interval, NULL);
+ fwp_timespec_add(&start, &now, &interval);
+ } else {
+ start = now;
}
-
- fwp_msgb_free(msgb);
+ fwp_vres_sched_update(vres, &period, &budget);
+ curr_budget = budget;
+ }
+
+ rc = _fwp_vres_send(vres, msgb);
+ if (!(rc < 0)) {
+ FWP_DEBUG("Message sent through AC%d\n",ac_id);
+ curr_budget -= msgb->len;
+ } else {
+ FWP_ERROR("Message sent error %d\n",rc);
}
+ skip_message:
+ fwp_msgb_free(msgb);
/*pthread_testcancel(); */
/*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
return NULL;
}
-int fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
+int fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb)
{
- fwp_vres_t *vres = vresd;
-
if (fwp_vres_is_valid(vres)) {
return fwp_msgq_enqueue(&vres->tx_queue, msgb);
} else {
- errno = EPERM; /* TODO: Use more appropriate error than EPERM. */
+ errno = EINVAL;
return -1;
}
}
-/*int fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)*/
-int fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
+/*int fwp_vres_bind(fwp_vres_t *vres, fwp_endpoint_t *epoint)*/
+int fwp_vres_bind(fwp_vres_t *vres, int sockd)
{
- fwp_vres_t *vres = vresd;
int rv = 0;
pthread_mutex_lock(&fwp_vres_table.lock);
}
if (fwp_vres_get_flag(vres, FWP_VF_BOUND)) { /*if already bounded */
- errno = EPERM;
+ errno = EBUSY;
rv = -1;
goto err;
}
return rv;
}
-int fwp_vres_unbind(fwp_vres_d_t vresd)
+int fwp_vres_unbind(fwp_vres_t *vres)
{
- fwp_vres_t *vres = vresd;
-
- if (!fwp_vres_is_valid(vresd)) {
+ if (!fwp_vres_is_valid(vres)) {
errno = EINVAL;
return -1;
}