/**************************************************************************/
/* ---------------------------------------------------------------------- */
-/* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
+/* Copyright (C) 2006 - 2009 FRESCOR consortium partners: */
/* */
/* Universidad de Cantabria, SPAIN */
/* University of York, UK */
/* however invalidate any other reasons why the executable file might be */
/* covered by the GNU Public License. */
/**************************************************************************/
+
#include "fwp_utils.h"
#include "fwp_vres.h"
static void* fwp_vres_tx_thread(void *_vres);
+/* Bit indices into fwp_vres.flags; always used as (1 << flag) masks
+ * via set_flag()/clear_flag()/get_flag(). */
typedef enum {
- FWP_VF_USED = 0,
- FWP_VF_BOUND = 1,
- FWP_VF_RESCHED = 2,
+ USED,      /* table slot is allocated (set in fwp_vres_alloc()) */
+ UNTOUCHED, /* no budget consumed since the last replenishment */
+ CHANGED,   /* params changed; AC re-applied by next apply_params() */
+ QUEUED,    /* messages wait in msg_queue for the next replenishment */
} fwp_vres_flag_t;
+/* Default vres parameters: voice AC, budget of 100 per 2 s period,
+ * wildcard source address. NOTE(review): the budget unit appears to
+ * be bytes (it is compared against msgb->len) - confirm. */
fwp_vres_params_t fwp_vres_params_default = {
- .id = 0,
.ac_id = FWP_AC_VO,
.budget = 100,
- .period = {.tv_sec = 2 , .tv_nsec = 111111}
+ .period = {.tv_sec = 2 , .tv_nsec = 111111},
+ .src = { 0 },
};
/**
*/
+/* Virtual resource: a periodically replenished transmission budget
+ * bound to at most one endpoint, with a per-vres tx thread. */
struct fwp_vres{
struct fwp_vres_params params;
- /* consideration: move tx_queue to endpoint */
- /**< queue for messages to send */
- struct fwp_msgq tx_queue;
fwp_vres_flag_t flags;
+ pthread_mutex_t mutex; /**< Protects flags, params, budget and queue state */
+ pthread_cond_t cond; /**< Signals budget replenishment */
+ fwp_budget_t budget; /**< Current remaining budget */
+ fwp_period_t period; /**< Period for this "activation" */
+ struct timespec replenish_at; /**< Time of next replenishment */
+ sem_t consumed; /**< Posted on first consumption in a period; wakes the tx thread */
/**< endpoint bounded to this vres */
- /*fwp_endpoint_t *epoint; */
+ struct fwp_endpoint *epoint;
pthread_t tx_thread; /**< tx_thread id*/
pthread_attr_t tx_thread_attr;
- int ac_sockd; /**< ac socket descriptor */
+ /** Copy of bound endpoint's socket - used for future changes
+ * of vres parameters. */
+ int ac_sockd;
+ /** Queue for messages to send */
+ struct fwp_msgq msg_queue;
};
typedef
return 0;
}
+/* Set one flag bit in vres->flags. */
-static inline void fwp_vres_set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
+static inline void set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
{
vres->flags |= (1 << flag);
}
+/* Clear one flag bit in vres->flags. */
-static inline void fwp_vres_clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
+static inline void clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
{
vres->flags &= ~(1 << flag);
}
+/* Test one flag bit; returns 0 or 1. */
-static inline int fwp_vres_get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
+static inline int get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
{
return !!(vres->flags & (1 << flag));
}
+/* Clear every flag bit (including USED, which frees the slot). */
-static inline void fwp_vres_clearall_flag(fwp_vres_t *vres)
+static inline void clear_all_flags(fwp_vres_t *vres)
{
vres->flags = 0;
}
-#if 0
-/* Deprecated */
-static int fwp_vres_ac_open(fwp_ac_t ac_id)
-{
- int sockd;
- unsigned int tos;
-
- if ((ac_id < 0)||(ac_id >= FWP_AC_NUM)) {
- errno = EINVAL;
- return -1;
- }
-
- if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
- FWP_ERROR("Unable to open socket for AC: %s", strerror(errno));
- return (-1);
- }
-
- tos = ac_to_tos[ac_id];
-
- if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
- int e = errno;
- FWP_ERROR("setsockopt(IP_TOS): %s", strerror(errno));
- close(sockfd);
- errno = e;
- return -1;
- }
-
- return sockd;
-}
-#endif
-
-static inline int _fwp_vres_send(unsigned int ac_sockd, struct fwp_msgb* msgb)
-{
- /*_fwp_sendto(ac_sockd, msgb->data, msgb->len, 0,
- msgb->peer->addr, msgb->peer->addrlen);*/
- return _fwp_send(ac_sockd, msgb->data, msgb->len, 0);
-}
-
+/* Return a vres slot to the table: clearing all flags drops USED, so
+ * fwp_vres_alloc() may hand the entry out again. */
static inline void fwp_vres_free(fwp_vres_t *vres)
{
- fwp_vres_clearall_flag(vres);
+ /* Clear USED flag */
+ clear_all_flags(vres);
}
+/* Validate that vres points into fwp_vres_table and its slot is in
+ * use; returns 1 when valid, 0 otherwise. (Some lines of this
+ * function are elided from this hunk.) */
static inline int fwp_vres_is_valid(fwp_vres_t *vres)
int id = vres - fwp_vres_table.entry;
if ((id < 0) || (id > fwp_vres_table.max_vres - 1) ||
- (!fwp_vres_get_flag(vres, FWP_VF_USED)))
+ (!get_flag(vres, USED)))
return 0;
return 1;
return 0;
}
+/* Map a vres pointer back to its index (id) within fwp_vres_table. */
-fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
+fwp_vres_id_t fwp_vres_get_id(fwp_vres_t *vres)
{
- fwp_vres_t *vres = vresd;
-
return (vres - fwp_vres_table.entry);
}
*
* \return On success returns vres descriptor.
*/
+/* Find the first unused slot in fwp_vres_table, mark it USED and
+ * return it. Scanning and marking happen under fwp_vres_table.lock
+ * (the lock acquisition is in lines elided from this hunk). */
-fwp_vres_d_t fwp_vres_alloc()
+fwp_vres_t *fwp_vres_alloc()
{
int i;
unsigned int max_vres;
i = 0;
max_vres = fwp_vres_table.max_vres;
while ((i < max_vres) &&
- (fwp_vres_get_flag(&fwp_vres_table.entry[i], FWP_VF_USED))) {
+ (get_flag(&fwp_vres_table.entry[i], USED))) {
i++;
}
}
+ /* NOTE(review): the i == max_vres (table full) case is presumably
+ * handled in the elided lines above - confirm before relying on i. */
FWP_DEBUG("Allocated vres id = %d\n",i);
- fwp_vres_set_flag(&fwp_vres_table.entry[i], FWP_VF_USED);
+ set_flag(&fwp_vres_table.entry[i], USED);
pthread_mutex_unlock(&fwp_vres_table.lock);
return (&fwp_vres_table.entry[i]);
}
+/* Begin a new activation: reload the working period/budget from
+ * params, mark the period UNTOUCHED, and - if the parameters changed
+ * while bound - re-apply the socket's access category. Returns the
+ * fwp_vres_set_ac() result (0 when nothing changed). Callers hold
+ * vres->mutex, except during fwp_vres_create() before the tx thread
+ * is started. */
-inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
+static int apply_params(fwp_vres_t *vres)
{
- int rv;
-
- /* copy vres paramters into vres structure */
- rv = fwp_vres_set_ac(vres->ac_sockd, params->ac_id);
- if (!rv)
- return rv;
- memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
- fwp_vres_set_flag(vres, FWP_VF_RESCHED);
-
- return 0;
+ int rv = 0;
+ vres->period = vres->params.period;
+ vres->budget = vres->params.budget;
+ set_flag(vres, UNTOUCHED);
+ if (get_flag(vres, CHANGED)) {
+ clear_flag(vres, CHANGED);
+ rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
+ }
+ return rv;
}
/**
* Set vres params
*
- * \param[in] vresdp Vres descriptor
+ * \param[in] vresp Vres descriptor
* \param[in] params Vres parameters
*
* \return On success returns zero.
* On error, negative error code is returned.
*
*/
-int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
+int fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
{
- fwp_vres_t *vres = vresd;
-
+ int rv = 0;
+
if (!fwp_vres_is_valid(vres)) {
errno = EINVAL;
return -1;
}
- return _fwp_vres_set_params(vres, params);
+ pthread_mutex_lock(&vres->mutex);
+
+ /* Changing the source address of an already bound vres is not
+ * supported. NOTE(review): EREMCHG is Linux-specific. */
+ if (vres->epoint &&
+ params->src.s_addr != vres->params.src.s_addr) {
+ errno = EREMCHG;
+ rv = -1;
+ goto out;
+ }
+ vres->params = *params;
+ if (vres->epoint) {
+ set_flag(vres, CHANGED);
+ /* If nothing was consumed in this period yet, apply at once;
+ * otherwise the tx thread applies at the next replenishment. */
+ if (get_flag(vres, UNTOUCHED))
+ rv = apply_params(vres);
+ }
+out:
+ pthread_mutex_unlock(&vres->mutex);
+ return rv;
}
/**
* Creates new vres
*
* \param[in] params Vres parameters
- * \param[out] vresdp Pointer to the descriptor of newly created vres
+ * \param[out] vresp Pointer to the descriptor of newly created vres
*
* \return On success returns descriptor of vres.
* On error, negative error code is returned.
*
*/
-int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
+int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp)
{
int rv;
fwp_vres_t *vres;
errno = ENOMEM;
return -1;
}
- /* initialize msg queue */
- fwp_msgq_init(&vres->tx_queue);
+
+ /* Mutex uses priority inheritance: it is shared with the
+ * real-time tx thread (fwp_set_rt_prio in fwp_vres_tx_thread). */
+ pthread_mutexattr_t ma;
+ rv = pthread_mutexattr_init(&ma);
+ /* NOTE(review): the rv from pthread_mutexattr_init() is
+ * overwritten by the next call before being checked. */
+ rv = pthread_mutexattr_setprotocol(&ma, PTHREAD_PRIO_INHERIT);
+ if (rv) return rv;
+ pthread_mutex_init(&vres->mutex, &ma);
+ pthread_cond_init(&vres->cond, NULL);
+ /* NOTE(review): no sem_init(&vres->consumed, ...) is visible in
+ * this hunk - confirm the semaphore is initialized elsewhere.
+ * apply_params() return value is ignored here (no socket bound
+ * yet, so CHANGED is clear and it cannot fail). */
- memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
- fwp_vres_set_flag(vres, FWP_VF_RESCHED);
+ vres->params = *params;
+ apply_params(vres);
+ fwp_msgq_init(&vres->msg_queue);
+
pthread_attr_init(&vres->tx_thread_attr);
if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr,
fwp_vres_tx_thread, (void*) vres)) != 0){
goto err;
}
- *vresdp = vres;
+ *vresp = vres;
+
return 0;
err:
+ fwp_msgq_dequeue_all(&vres->msg_queue);
fwp_vres_free(vres);
return -1;
}
/**
* Destroys vres
*
- * \param[in] vresd Vres descriptor
+ * \param[in] vres Vres descriptor
*
* \return On success returns 0.
* On error, negative error code is returned.
*
*/
-int fwp_vres_destroy(fwp_vres_d_t vresd)
+int fwp_vres_destroy(fwp_vres_t *vres)
{
- fwp_vres_t *vres = vresd;
-
if (!fwp_vres_is_valid(vres)) {
errno = EINVAL;
return -1;
}
pthread_cancel(vres->tx_thread);
+ /* NOTE(review): the tx thread is cancelled but not joined before
+ * the cond/mutex are destroyed and the slot is freed - if the
+ * thread is still inside replenish(), this races; consider
+ * pthread_join(vres->tx_thread, NULL) here. */
+ pthread_cond_destroy(&vres->cond);
+ pthread_mutex_destroy(&vres->mutex);
- FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);
+ fwp_msgq_dequeue_all(&vres->msg_queue);
+ fwp_vres_free(vres);
+
+ FWP_DEBUG("Vres destroyed.\n");
return 0;
}
+/* Consume size bytes of the current budget. On the first consumption
+ * within a period, compute the next absolute replenishment time and
+ * wake the tx thread through the consumed semaphore. Caller holds
+ * vres->mutex. No underflow check: callers guarantee budget >= size. */
-static void fwp_vres_cleanup(void *_vres)
+static void do_consume_budget(struct fwp_vres *vres, size_t size)
{
- fwp_vres_t *vres = (fwp_vres_t*)_vres;
+ if (get_flag(vres, UNTOUCHED)) {
+ /* Setup next replenish time */
+ struct timespec now;
+ clear_flag(vres, UNTOUCHED);
+ /* While draining the queue the new period starts back-to-back
+ * at the previous replenish time, not at the current time. */
+ if (get_flag(vres, QUEUED))
+ now = vres->replenish_at;
+ else
+ clock_gettime(CLOCK_MONOTONIC, &now);
+ fwp_timespec_add(&vres->replenish_at, &now, &vres->period);
+ sem_post(&vres->consumed);
+ }
+ vres->budget -= size;
+}
- fwp_msgq_dequeue_all(&vres->tx_queue);
- fwp_vres_free(vres);
+/* Core of fwp_vres_consume_budget(); caller holds vres->mutex.
+ * Returns 0 when the budget was consumed, 1 when there is not enough
+ * budget and blocking was not allowed (QUEUED is set), -1 on error
+ * with errno set. */
+int __consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
+{
+	int ret;
+	/* A request larger than the whole per-period budget can never
+	 * succeed, no matter how long we wait. */
+	if (vres->params.budget < size) {
+		errno = ENOSR;
+		return -1;
+	}
+	while (can_block && vres->budget < size) {
+		ret = pthread_cond_wait(&vres->cond, &vres->mutex);
+		if (ret) {
+			/* Do not spin on a failing condition variable and do
+			 * not leak the raw error code as a "1" result. */
+			errno = ret;
+			return -1;
+		}
+		/* The budget might have been changed while we were
+		 * waiting, so check it again. */
+		if (vres->params.budget < size) {
+			errno = ENOSR;
+			return -1;
+		}
+	}
+	if (vres->budget >= size) {
+		do_consume_budget(vres, size);
+		ret = 0;
+	} else {
+		set_flag(vres, QUEUED);
+		ret = 1;
+	}
+	return ret;
+}
+
+/**
+ * Tries to consume (a part of) budget
+ *
+ * @param vres VRES whose budget is consumed.
+ * @param size How much to consume (in bytes).
+ * @param can_block True indicates that the function can block to
+ * wait for budget replenishment. False causes no blocking and if 1 is
+ * returned, the caller must call fwp_vres_enqueue() to enqueue the
+ * packet to be sent later after replenishing.
+ *
+ * @return Zero if budget was consumed, 1 if there is not enough
+ * budget available and blocking was not allowed, -1 in case of error.
+ */
+int fwp_vres_consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
+{
+ int ret = 0;
+ pthread_mutex_lock(&vres->mutex);
+ ret = __consume_budget(vres, size, can_block);
+ pthread_mutex_unlock(&vres->mutex);
+ return ret;
+}
+
+/* Copy msg into a freshly allocated message buffer and append it to
+ * the vres queue; the tx thread sends it after the next replenishment
+ * (see send_queue()). Returns 0 on success, non-zero on failure.
+ * NOTE(review): the ep argument is unused in this body - confirm it
+ * is intentional (the bound endpoint is taken from vres->epoint). */
+int fwp_vres_enqueue(struct fwp_vres *vres, struct fwp_endpoint *ep,
+ const void *msg, size_t size)
+{
+ struct fwp_msgb *msgb;
+ int ret;
+
+ if (!(msgb = fwp_msgb_alloc(size)))
+ return -1;
+ memcpy(msgb->data, msg, size);
+ fwp_msgb_put(msgb, size);
+ ret = fwp_msgq_enqueue(&vres->msg_queue, msgb);
+ if (ret) {
+ fwp_msgb_free(msgb);
+ return ret;
+ }
+ return ret;
}
+/* Block until the next replenishment is due: first wait until some
+ * budget has actually been consumed in this period (consumed
+ * semaphore, posted by do_consume_budget()), then sleep until the
+ * absolute replenish_at time. NOTE(review): sem_wait() and
+ * clock_nanosleep() returns (e.g. EINTR) are not checked. */
-static inline void
-fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period,
- fwp_budget_t *budget)
+static void wait_for_replenish(struct fwp_vres *vres)
{
- if (fwp_vres_get_flag(vres, FWP_VF_RESCHED)) {
- /*period->tv_nsec = vres->params.period % SEC_TO_USEC;
- period->tv_sec = vres->params.period / SEC_TO_USEC;*/
- *period = vres->params.period;
- *budget = vres->params.budget;
- FWP_DEBUG("Vres tx thread with budget=%ld period_sec=%ld "
- "period_nsec=%ld.\n",vres->params.budget,
- period->tv_sec, period->tv_nsec);
- fwp_vres_clear_flag(vres, FWP_VF_RESCHED);
+ sem_wait(&vres->consumed);
+ clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
+ &vres->replenish_at, NULL);
+}
+
+/* Send as many queued messages as the freshly replenished budget
+ * allows. Called from replenish() with vres->mutex held and QUEUED
+ * set (so the queue is non-empty). Each message's budget is consumed
+ * non-blocking; if the budget runs out mid-queue, __consume_budget()
+ * re-sets QUEUED and the rest waits for the next period. */
+static void send_queue(struct fwp_vres *vres)
+{
+ struct fwp_msgb *msgb;
+ bool can_send;
+ msgb = fwp_msgq_dequeue(&vres->msg_queue);
+ can_send = (0 == __consume_budget(vres, msgb->len, false));
+ if (!can_send) {
+ fwp_msgb_free(msgb);
+ FWP_ERROR("Cannot send queued packet (budget decreased?)\n");
+ return;
}
+ /* If we cannot send the whole queue, the flag will be set
+ * later by __consume_budget(). */
+ clear_flag(vres, QUEUED);
+
+ while (msgb) {
+ fwp_endpoint_do_send(vres->epoint, msgb->data, msgb->len);
+ fwp_msgb_free(msgb);
+ msgb = fwp_msgq_peek(&vres->msg_queue);
+ if (msgb) {
+ can_send = (0 == __consume_budget(vres, msgb->len, false));
+ if (can_send) {
+ msgb = fwp_msgq_dequeue(&vres->msg_queue);
+ } else {
+ /* NOTE(review): dead store - the return makes this
+ * msgb = NULL assignment unreachable in effect. */
+ msgb = NULL;
+ return;
+ }
+ }
+ }
+}
+
+/* One replenishment: reload budget/period (applying any pending
+ * parameter change), flush queued messages if any, and wake writers
+ * blocked in fwp_vres_consume_budget(). Runs in the tx thread. */
+static void replenish(struct fwp_vres *vres)
+{
+ pthread_mutex_lock(&vres->mutex);
+ apply_params(vres);
+ if (get_flag(vres, QUEUED))
+ send_queue(vres);
+ pthread_cond_broadcast(&vres->cond);
+ pthread_mutex_unlock(&vres->mutex);
}
/**
+/* Per-vres transmission thread: runs at RT priority derived from the
+ * access category and loops forever doing wait_for_replenish() +
+ * replenish(). Terminated by pthread_cancel() from
+ * fwp_vres_destroy(); cancellation is DEFERRED so it can only fire at
+ * cancellation points (sem_wait/clock_nanosleep inside
+ * wait_for_replenish), and is disabled around replenish() so the
+ * thread is never cancelled while holding vres->mutex. */
static void* fwp_vres_tx_thread(void *_vres)
{
struct fwp_vres *vres = (struct fwp_vres*)_vres;
- struct fwp_msgq *msgq = &vres->tx_queue;
- struct fwp_msgb *msgb = NULL;
unsigned int ac_id = vres->params.ac_id;
- fwp_budget_t budget = vres->params.budget;
- fwp_budget_t curr_budget;
- int rc;
- struct timespec start_period, period, interval, current_time;
fwp_set_rt_prio(90 - ac_id);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
- pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
- pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
+ pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
- /* just for sure */
- fwp_vres_sched_update(vres, &period, &budget);
- clock_gettime(CLOCK_MONOTONIC, &start_period);
- curr_budget = 0;
-
while (1) {
- /* wait for next period and then send */
- /*clock_gettime(CLOCK_MONOTONIC, ¤t_time);
- fwp_timespec_sub(&interval, &end_period, ¤t_time);
- nanosleep(&interval, NULL);*/
-
- /*while((rc = sem_wait(&msgq->empty_lock, &end_period))==-1
- && errno == EINTR) {
- continue;
- }*/
-
- msgb = fwp_msgq_dequeue(msgq);
- fwp_vres_sched_update(vres, &period, &budget);
- if ((curr_budget + msgb->len) > budget) {
- /* need to recharge */
- clock_gettime(CLOCK_MONOTONIC, ¤t_time);
- fwp_timespec_sub(&interval, ¤t_time, &start_period);
- fwp_timespec_modulo(&interval, &interval, &period);
- fwp_timespec_sub(&interval, &period, &interval);
- nanosleep(&interval, NULL);
- curr_budget = 0;
- clock_gettime(CLOCK_MONOTONIC, &start_period);
- }
-
- rc = _fwp_vres_send(vres->ac_sockd, msgb);
- if (!(rc < 0)) {
- FWP_DEBUG("Message sent through AC%d\n",ac_id);
- curr_budget+= msgb->len;
- } else {
- FWP_DEBUG("Message sent error %d\n",rc);
- }
-
- fwp_msgb_free(msgb);
-
- /*pthread_testcancel(); */
- /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
-
- fwp_timespec_add(&end_period, &start_period, &period);
- clock_gettime(CLOCK_MONOTONIC, ¤t_time);
- fwp_timespec_sub(&interval, &end_period, ¤t_time);
- nanosleep(&interval, NULL);
- */
+ wait_for_replenish(vres);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ replenish(vres);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
}
-
- /* it should normaly never come here */
- pthread_cleanup_pop(0);
- fwp_vres_free(vres);
-
- return NULL;
-}
-int fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
-{
- fwp_vres_t *vres = vresd;
-
- if (fwp_vres_is_valid(vres)) {
- return fwp_msgq_enqueue(&vres->tx_queue, msgb);
- } else {
- errno = EPERM; /* TODO: Use more appropriate error than EPERM. */
- return -1;
- }
+ /* Not reached - the loop above never exits. */
+ return NULL;
}
+/* Bind endpoint ep (with socket sockd) to this vres: keep a copy of
+ * the socket for later parameter changes, report the vres source
+ * address back through *src, and set the socket's access category.
+ * Fails with EINVAL (bad vres) or EBUSY (already bound).
+ * NOTE(review): the old fwp_vres_table.lock serialization was
+ * removed - confirm concurrent bind/unbind cannot race here. */
-/*int fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)*/
-int fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
+/*int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *epoint)*/
+int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *ep, int sockd, struct in_addr *src)
{
- fwp_vres_t *vres = vresd;
int rv = 0;
- pthread_mutex_lock(&fwp_vres_table.lock);
if (!fwp_vres_is_valid(vres)) {
errno = EINVAL;
rv = -1;
goto err;
}
- if (fwp_vres_get_flag(vres, FWP_VF_BOUND)) { /*if already bounded */
- errno = EPERM;
+ if (vres->epoint) { /*if already bound */
+ errno = EBUSY;
rv = -1;
goto err;
}
vres->ac_sockd = sockd;
- rv = fwp_vres_set_ac(vres->ac_sockd,vres->params.ac_id);
- /*if (rv)
- goto err;*/
- fwp_vres_set_flag(vres, FWP_VF_BOUND);
+ *src = vres->params.src;
+ rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
+ if (rv)
+ goto err;
+ vres->epoint = ep;
err:
- pthread_mutex_unlock(&fwp_vres_table.lock);
return rv;
}
+/* Detach the bound endpoint from this vres and drop all messages
+ * still pending in the queue. Returns 0, or -1/EINVAL for a bad
+ * vres. */
-int fwp_vres_unbind(fwp_vres_d_t vresd)
+int fwp_vres_unbind(fwp_vres_t *vres)
{
- fwp_vres_t *vres = vresd;
-
- if (!fwp_vres_is_valid(vresd)) {
+ if (!fwp_vres_is_valid(vres)) {
errno = EINVAL;
return -1;
}
- fwp_vres_clear_flag(vres, FWP_VF_BOUND);
+ pthread_mutex_lock(&vres->mutex);
+ vres->epoint = NULL;
+ pthread_mutex_unlock(&vres->mutex);
/* TODO: consider what to do with pending messages */
- /* fwp_vres_free_msgb(vres->tx_queue); */
+ fwp_msgq_dequeue_all(&vres->msg_queue);
return 0;
}