/**************************************************************************/
/* ---------------------------------------------------------------------- */
-/* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
+/* Copyright (C) 2006 - 2009 FRESCOR consortium partners: */
/* */
/* Universidad de Cantabria, SPAIN */
/* University of York, UK */
/* however invalidate any other reasons why the executable file might be */
/* covered by the GNU Public License. */
/**************************************************************************/
+
#include "fwp_utils.h"
#include "fwp_vres.h"
static void* fwp_vres_tx_thread(void *_vres);
+/**
+ * Bit positions (not masks) of the state bits kept in fwp_vres::flags;
+ * the flag helpers shift 1 by these values.
+ */
typedef enum {
- FWP_VF_USED = 0,
- FWP_VF_BOUND = 1,
+ UNTOUCHED, /**< Set by apply_params(); cleared by the first budget consumption that follows */
+ CHANGED, /**< Set by fwp_vres_set_params() on a bound vres; cleared when apply_params() re-applies the AC */
+ QUEUED, /**< Set when a non-blocking consume finds too little budget; cleared by send_queue() */
} fwp_vres_flag_t;
+/** Default parameters: access category FWP_AC_VO, budget 100, period 2 s + 111111 ns, zeroed source address. */
fwp_vres_params_t fwp_vres_params_default = {
.ac_id = FWP_AC_VO,
.budget = 100,
- .period = {.tv_sec = 2 , .tv_nsec = 111111}
+ .period = {.tv_sec = 2 , .tv_nsec = 111111},
+ .src = { 0 },
};
/**
*/
struct fwp_vres{
struct fwp_vres_params params;
- /* consideration: move tx_queue to endpoint */
- /**< queue for messages to send */
- struct fwp_msgq tx_queue;
fwp_vres_flag_t flags;
+ /** Locked by fwp_vres_set_params(), fwp_vres_consume_budget(),
+ * replenish() and fwp_vres_unbind(). */
+ pthread_mutex_t mutex;
+ pthread_cond_t cond; /**< Signals budget replenishment */
+ fwp_budget_t budget; /**< Current remaining budget */
+ fwp_period_t period; /**< Period for this "activation" */
+ struct timespec replenish_at; /**< Time of next replenishment */
+ /** Posted by the first budget consumption after a replenishment;
+ * the tx thread waits on it in wait_for_replenish(). */
+ sem_t consumed;
/**< endpoint bounded to this vres */
- /*fwp_endpoint_t *epoint; */
+ struct fwp_endpoint *epoint;
pthread_t tx_thread; /**< tx_thread id*/
pthread_attr_t tx_thread_attr;
- int ac_sockd; /**< ac socket descriptor */
-};
-
-typedef
-struct fwp_vres_table {
- unsigned int max_vres;
- fwp_vres_t *entry;
- pthread_mutex_t lock;
-} fwp_vres_table_t;
-
-/* Global variable - vres table */
-static fwp_vres_table_t fwp_vres_table = {
- .max_vres = 0,
- .entry = NULL,
- .lock = PTHREAD_MUTEX_INITIALIZER,
+ /** Copy of the bound endpoint's socket - used for later changes
+ * of vres parameters. */
+ int ac_sockd;
+ /** Queue for messages to send */
+ struct fwp_msgq msg_queue;
+ /** If true, it is always allowed to send messages through this vres. */
+ bool bypass;
};
/**< mapping priority to ac*/
return 0;
}
-static inline void fwp_vres_set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
+/** Sets bit number \a flag in vres->flags. */
+static inline void set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
{
vres->flags |= (1 << flag);
}
-static inline void fwp_vres_clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
+/** Clears bit number \a flag in vres->flags. */
+static inline void clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
{
vres->flags &= ~(1 << flag);
}
-static inline int fwp_vres_get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
+/** Returns 1 if bit number \a flag is set in vres->flags, 0 otherwise. */
+static inline int get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
{
return !!(vres->flags & (1 << flag));
}
-static inline void fwp_vres_clearall_flag(fwp_vres_t *vres)
+/** Resets vres->flags to 0, i.e. clears every flag bit at once. */
+static inline void clear_all_flags(fwp_vres_t *vres)
{
vres->flags = 0;
}
-#if 0
-/* Deprecated */
-static int fwp_vres_ac_open(fwp_ac_t ac_id)
-{
- int sockd;
- unsigned int tos;
-
- if ((ac_id < 0)||(ac_id >= FWP_AC_NUM)) {
- errno = EINVAL;
- return -1;
- }
-
- if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
- FWP_ERROR("Unable to open socket for AC: %s", strerror(errno));
- return (-1);
- }
-
- tos = ac_to_tos[ac_id];
-
- if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
- int e = errno;
- FWP_ERROR("setsockopt(IP_TOS): %s", strerror(errno));
- close(sockfd);
- errno = e;
- return -1;
- }
-
- return sockd;
-}
-#endif
-
-static inline int _fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb)
-{
- struct iovec iov;
- struct msghdr msg = {0};
- ssize_t ret;
- char cmsg_buf[CMSG_SPACE(sizeof(struct in_pktinfo))];
-
- iov.iov_base = msgb->data;
- iov.iov_len = msgb->len;
-
- msg.msg_iov = &iov;
- msg.msg_iovlen = 1;
-
-
-
- if (vres->params.src.s_addr != 0) {
- struct cmsghdr *cmsg;
- struct in_pktinfo *ipi;
-
- memset(cmsg_buf, 0, sizeof(cmsg_buf));
-
- msg.msg_control = cmsg_buf;
- msg.msg_controllen = sizeof(cmsg_buf);
-
- cmsg = CMSG_FIRSTHDR(&msg);
-
- cmsg->cmsg_level = SOL_IP;
- cmsg->cmsg_type = IP_PKTINFO;
- cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
-
- ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
- ipi->ipi_spec_dst = vres->params.src;
- }
- ret = sendmsg(vres->ac_sockd, &msg, 0);
- return ret;
-}
-
+/** Releases the memory of \a vres with free(); nothing inside the vres is torn down here. */
static inline void fwp_vres_free(fwp_vres_t *vres)
{
- fwp_vres_clearall_flag(vres);
-}
-
-static inline int fwp_vres_is_valid(fwp_vres_t *vres)
-{
- int id = vres - fwp_vres_table.entry;
-
- if ((id < 0) || (id > fwp_vres_table.max_vres - 1) ||
- (!fwp_vres_get_flag(vres, FWP_VF_USED)))
- return 0;
-
- return 1;
+ free(vres);
}
/*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
}
*/
-int fwp_vres_table_init(unsigned int max_vres)
-{
- unsigned int table_size = max_vres * sizeof(fwp_vres_t);
-
- fwp_vres_table.entry = (fwp_vres_t*) malloc(table_size);
- if (!fwp_vres_table.entry)
- return -1; /* Errno is set by malloc */
-
- memset((void*) fwp_vres_table.entry, 0, table_size);
- fwp_vres_table.max_vres = max_vres;
- return 0;
-}
-
-fwp_vres_id_t fwp_vres_get_id(fwp_vres_t *vres)
-{
- return (vres - fwp_vres_table.entry);
-}
-
/**
* Allocate vres
*
+ * \return Pointer to a zero-filled vres obtained from malloc(), or NULL
+ * when malloc() fails.
*/
fwp_vres_t *fwp_vres_alloc()
{
- int i;
- unsigned int max_vres;
-
- /* find free vres id */
- pthread_mutex_lock(&fwp_vres_table.lock);
- i = 0;
- max_vres = fwp_vres_table.max_vres;
- while ((i < max_vres) &&
- (fwp_vres_get_flag(&fwp_vres_table.entry[i], FWP_VF_USED))) {
- i++;
+ fwp_vres_t *vres = malloc(sizeof(*vres));
+ if (vres) {
+ /* Zero fill: all flags clear, no endpoint bound, bypass off. */
+ memset(vres, 0, sizeof(*vres));
+ FWP_DEBUG("Allocated vres\n");
}
-
- if (i == max_vres) {
- pthread_mutex_unlock(&fwp_vres_table.lock);
- errno = ENOBUFS;
- return NULL;
- }
-
- FWP_DEBUG("Allocated vres id = %d\n",i);
- fwp_vres_set_flag(&fwp_vres_table.entry[i], FWP_VF_USED);
- pthread_mutex_unlock(&fwp_vres_table.lock);
- return (&fwp_vres_table.entry[i]);
+ return vres;
}
-inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
+/**
+ * Starts a fresh activation from vres->params: reloads the working
+ * period and budget, marks the vres UNTOUCHED and, if CHANGED is set,
+ * clears it and re-applies the access category to vres->ac_sockd.
+ *
+ * Takes no lock itself; fwp_vres_set_params() and replenish() call it
+ * with vres->mutex held.
+ *
+ * \return 0, or the result of fwp_vres_set_ac() when it was called.
+ */
+static int apply_params(fwp_vres_t *vres)
{
- int rv;
-
- /* copy vres paramters into vres structure */
- rv = fwp_vres_set_ac(vres->ac_sockd, params->ac_id);
- if (!rv)
- return rv;
- memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
-
- return 0;
+ int rv = 0;
+ vres->period = vres->params.period;
+ vres->budget = vres->params.budget;
+ set_flag(vres, UNTOUCHED);
+ if (get_flag(vres, CHANGED)) {
+ clear_flag(vres, CHANGED);
+ rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
+ }
+ return rv;
}
/**
+ * Replaces the parameters of \a vres with \a params.
+ *
+ * While the vres is bound to an endpoint the source address cannot be
+ * changed. On a bound vres the new parameters take effect at once if
+ * the current budget has not been touched yet; otherwise CHANGED stays
+ * set and they are applied by the next replenishment.
+ *
+ * \param[in] vres Vres descriptor
+ * \param[in] params New parameters (copied)
+ *
+ * \return Zero on success, -1 on error with errno set: EINVAL if
+ * \a vres or \a params is NULL, EREMCHG on an attempt to change the
+ * source address of a bound vres. A failure of fwp_vres_set_ac() is
+ * passed through unchanged.
*/
int fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
{
- if (!fwp_vres_is_valid(vres)) {
+	int rv = 0;
+
+	/* params is dereferenced below, so reject NULL like a NULL vres. */
+	if (!vres || !params) {
errno = EINVAL;
return -1;
}
- return _fwp_vres_set_params(vres, params);
+	pthread_mutex_lock(&vres->mutex);
+
+	if (vres->epoint &&
+	    params->src.s_addr != vres->params.src.s_addr) {
+		errno = EREMCHG;
+		rv = -1;
+		goto out;
+	}
+	vres->params = *params;
+	if (vres->epoint) {
+		set_flag(vres, CHANGED);
+		/* Budget already in use: leave CHANGED for replenish(). */
+		if (get_flag(vres, UNTOUCHED))
+			rv = apply_params(vres);
+	}
+out:
+	pthread_mutex_unlock(&vres->mutex);
+	return rv;
}
/**
* \param[in] params Vres parameters
* \param[out] vresp Pointer to the descriptor of newly created vres
*
- * \return On success returns descriptor of vres.
- * On error, negative error code is returned.
- *
+ * \return Zero on success, -1 on error and errno is set
+ * appropriately.
*/
int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp)
{
errno = ENOMEM;
return -1;
}
- /* initialize msg queue */
- fwp_msgq_init(&vres->tx_queue);
+
+	/* The vres mutex uses priority inheritance. Every step is checked;
+	 * previously the result of pthread_mutexattr_init() was overwritten
+	 * and that of pthread_mutex_init() was dropped. */
+	pthread_mutexattr_t ma;
+	rv = pthread_mutexattr_init(&ma);
+	if (rv == 0) {
+		rv = pthread_mutexattr_setprotocol(&ma, PTHREAD_PRIO_INHERIT);
+		if (rv == 0)
+			rv = pthread_mutex_init(&vres->mutex, &ma);
+		/* The attribute object is not needed once the mutex exists. */
+		pthread_mutexattr_destroy(&ma);
+	}
+	if (rv) {
+		/* No queue, condition variable or thread exists yet, so
+		 * freeing the vres is all the cleanup needed. Report the
+		 * failure as -1/errno, as documented for this function. */
+		fwp_vres_free(vres);
+		errno = rv;
+		return -1;
+	}
+	pthread_cond_init(&vres->cond, NULL);
+	/* The tx thread blocks on this semaphore as soon as it starts, so
+	 * it must be initialised (count 0) before pthread_create(). */
+	sem_init(&vres->consumed, 0, 0);
- memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
+	vres->params = *params;
+	apply_params(vres);
+	fwp_msgq_init(&vres->msg_queue);
+
+	/* Setting FWP_BYPASS in the environment turns budget checks off. */
+	if (getenv("FWP_BYPASS"))
+		vres->bypass = true;
+
pthread_attr_init(&vres->tx_thread_attr);
if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr,
fwp_vres_tx_thread, (void*) vres)) != 0){
}
*vresp = vres;
+
return 0;
err:
+	fwp_msgq_dequeue_all(&vres->msg_queue);
fwp_vres_free(vres);
return -1;
}
*
* \param[in] vres Vres descriptor
*
- * \return On success returns 0.
- * On error, negative error code is returned.
- *
+ * \return Zero on success, -1 on error and errno is set
+ * appropriately.
*/
int fwp_vres_destroy(fwp_vres_t *vres)
{
- if (!fwp_vres_is_valid(vres)) {
+	if (!vres) {
errno = EINVAL;
return -1;
}
pthread_cancel(vres->tx_thread);
+	/* pthread_cancel() only requests cancellation. The tx thread runs
+	 * replenish() with cancellation disabled and vres->mutex held, so
+	 * wait until it has really terminated before destroying the mutex
+	 * and condition variable and freeing the memory it works on. */
+	pthread_join(vres->tx_thread, NULL);
+	pthread_cond_destroy(&vres->cond);
+	pthread_mutex_destroy(&vres->mutex);
+	fwp_msgq_dequeue_all(&vres->msg_queue);
+	fwp_vres_free(vres);
+
FWP_DEBUG("Vres destroyed.\n");
return 0;
}
-static void fwp_vres_cleanup(void *_vres)
+/**
+ * Charges \a size against the current budget.
+ *
+ * On the first charge after apply_params() (UNTOUCHED still set) it
+ * also fixes the next replenishment time at one period after "now" and
+ * posts vres->consumed, which wait_for_replenish() waits on.
+ *
+ * Does not check that enough budget is left and takes no lock.
+ */
+static void do_consume_budget(struct fwp_vres *vres, size_t size)
{
- fwp_vres_t *vres = (fwp_vres_t*)_vres;
+ if (get_flag(vres, UNTOUCHED)) {
+ /* Setup next replenish time */
+ struct timespec now;
+ clear_flag(vres, UNTOUCHED);
+ /* With QUEUED set the new period starts at the previous
+ * replenish time instead of at the current clock reading. */
+ if (get_flag(vres, QUEUED))
+ now = vres->replenish_at;
+ else
+ clock_gettime(CLOCK_MONOTONIC, &now);
+ fwp_timespec_add(&vres->replenish_at, &now, &vres->period);
+ sem_post(&vres->consumed);
+ }
+ vres->budget -= size;
+}
- fwp_msgq_dequeue_all(&vres->tx_queue);
- fwp_vres_free(vres);
+/**
+ * Core of fwp_vres_consume_budget(). It does not lock vres->mutex
+ * itself: fwp_vres_consume_budget() and replenish() (through
+ * send_queue()) call it with the mutex held, and the
+ * pthread_cond_wait() below relies on that.
+ *
+ * @return 0 if the budget was consumed or the vres is in bypass mode;
+ * 1 if the remaining budget is too small (QUEUED is then set);
+ * -1 with errno ENOSR if \a size exceeds the configured budget;
+ * the error number of pthread_cond_wait() if the last wait failed.
+ */
+int __consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
+{
+ int ret = 0;
+ if (vres->bypass)
+ return 0;
+ /* A message larger than a whole period's budget can never be sent. */
+ if (vres->params.budget < size) {
+ errno = ENOSR;
+ return -1;
+ }
+ /* NOTE(review): a failing pthread_cond_wait() does not leave this
+ * loop by itself; only the budget conditions end it. */
+ while (can_block && vres->budget < size) {
+ ret = pthread_cond_wait(&vres->cond, &vres->mutex);
+ /* The budget might have been changed while we were
+ * waiting, so check it again. */
+ if (vres->params.budget < size) {
+ errno = ENOSR;
+ return -1;
+ }
+ }
+ if (ret == 0) {
+ if (vres->budget >= size) {
+ do_consume_budget(vres, size);
+ ret = 0;
+ } else {
+ /* Only reachable when can_block is false. */
+ set_flag(vres, QUEUED);
+ ret = 1;
+ }
+ }
+ return ret;
+}
+
+/**
+ * Tries to consume (a part of) the budget.
+ *
+ * @param vres VRES whose budget is consumed.
+ * @param size How much to consume (in bytes).
+ * @param can_block True indicates that the function may block to
+ * wait for budget replenishment. False causes no blocking; if 1 is
+ * returned, the caller must call fwp_vres_enqueue() to enqueue the
+ * packet so that it is sent later, after replenishing.
+ *
+ * @return Zero if budget was consumed, 1 if there is not enough
+ * budget available and blocking was not allowed, -1 in case of error
+ * (errno is EINVAL for a NULL vres, ENOSR if size exceeds the
+ * configured budget).
+ */
+int fwp_vres_consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
+{
+	int ret;
+
+	/* Same NULL handling as the other public vres entry points. */
+	if (!vres) {
+		errno = EINVAL;
+		return -1;
+	}
+	pthread_mutex_lock(&vres->mutex);
+	ret = __consume_budget(vres, size, can_block);
+	pthread_mutex_unlock(&vres->mutex);
+	return ret;
+}
+
+/**
+ * Copies \a msg into a freshly allocated message buffer and appends it
+ * to the message queue of \a vres. \a ep is not used here.
+ *
+ * @return -1 if no message buffer could be allocated, otherwise the
+ * result of fwp_msgq_enqueue(); when that is nonzero the buffer is
+ * freed again.
+ */
+int fwp_vres_enqueue(struct fwp_vres *vres, struct fwp_endpoint *ep,
+		     const void *msg, size_t size)
+{
+	struct fwp_msgb *buf = fwp_msgb_alloc(size);
+	int rc;
+
+	if (!buf)
+		return -1;
+
+	memcpy(buf->data, msg, size);
+	fwp_msgb_put(buf, size);
+
+	rc = fwp_msgq_enqueue(&vres->msg_queue, buf);
+	if (rc)
+		fwp_msgb_free(buf);	/* the queue did not take the buffer */
+	return rc;
+}
+
+/**
+ * Blocks the tx thread until budget has been consumed in the current
+ * period (vres->consumed is posted) and the replenishment time set by
+ * that consumption has been reached.
+ */
+static void wait_for_replenish(struct fwp_vres *vres)
+{
+	/* A signal handler may interrupt either wait. Retry, otherwise
+	 * the budget would be replenished before the period is over.
+	 * sem_wait() reports EINTR through errno, clock_nanosleep()
+	 * returns it directly. Any other failure falls through as before. */
+	while (sem_wait(&vres->consumed) == -1 && errno == EINTR)
+		;
+	/* Absolute sleep: a restart does not extend the wait. */
+	while (clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
+			       &vres->replenish_at, NULL) == EINTR)
+		;
}
-static inline void
-fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period,
- fwp_budget_t *budget)
+/**
+ * Sends as many queued messages as the current budget allows.
+ * Called from replenish() with vres->mutex held when QUEUED is set.
+ *
+ * The first message is dequeued unconditionally and dropped with an
+ * error message if its size cannot be consumed. Every further message
+ * is only peeked and is dequeued once its budget has been consumed;
+ * the first one that does not fit stays in the queue.
+ *
+ * NOTE(review): the result of the first fwp_msgq_dequeue() is
+ * dereferenced and vres->epoint is used without a NULL check -
+ * confirm that QUEUED implies a non-empty queue and a bound endpoint.
+ */
+static void send_queue(struct fwp_vres *vres)
{
- *period = vres->params.period;
- *budget = vres->params.budget;
+ struct fwp_msgb *msgb;
+ bool can_send;
+ msgb = fwp_msgq_dequeue(&vres->msg_queue);
+ can_send = (0 == __consume_budget(vres, msgb->len, false));
+ if (!can_send) {
+ fwp_msgb_free(msgb);
+ FWP_ERROR("Cannot send queued packet (budget decreased?)\n");
+ return;
+ }
+ /* If we cannot send the whole queue, the flag will be set
+ * later by __consume_budget(). */
+ clear_flag(vres, QUEUED);
+
+ while (msgb) {
+ fwp_endpoint_do_send(vres->epoint, msgb->data, msgb->len);
+ fwp_msgb_free(msgb);
+ /* Peek first: the message must stay queued if its budget
+ * cannot be consumed in this period. */
+ msgb = fwp_msgq_peek(&vres->msg_queue);
+ if (msgb) {
+ can_send = (0 == __consume_budget(vres, msgb->len, false));
+ if (can_send) {
+ msgb = fwp_msgq_dequeue(&vres->msg_queue);
+ } else {
+ msgb = NULL;
+ return;
+ }
+ }
+ }
+}
+
+/**
+ * Starts a new period under vres->mutex: reloads budget and period
+ * with apply_params(), sends queued messages if QUEUED is set, then
+ * wakes every thread waiting on vres->cond for budget.
+ */
+static void replenish(struct fwp_vres *vres)
+{
+ pthread_mutex_lock(&vres->mutex);
+ apply_params(vres);
+ /* Queued messages get the fresh budget before the waiters do. */
+ if (get_flag(vres, QUEUED))
+ send_queue(vres);
+ pthread_cond_broadcast(&vres->cond);
+ pthread_mutex_unlock(&vres->mutex);
}
/**
static void* fwp_vres_tx_thread(void *_vres)
{
struct fwp_vres *vres = (struct fwp_vres*)_vres;
- struct fwp_msgq *msgq = &vres->tx_queue;
- struct fwp_msgb *msgb = NULL;
unsigned int ac_id = vres->params.ac_id;
- fwp_budget_t budget = vres->params.budget;
- fwp_budget_t curr_budget;
- int rc;
- struct timespec start, period, interval, now;
fwp_set_rt_prio(90 - ac_id);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
- pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
- pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
+ pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
- /* just for sure */
- fwp_vres_sched_update(vres, &period, &budget);
- curr_budget = budget;
- clock_gettime(CLOCK_MONOTONIC, &start);
-
while (1) {
- msgb = fwp_msgq_dequeue(msgq);
- if (msgb->len > budget) {
- FWP_ERROR("Message too large: %zd -> skipping\n",
- msgb->len);
- goto skip_message;
- }
- if (curr_budget < msgb->len) {
- /* needs to replenish */
- clock_gettime(CLOCK_MONOTONIC, &now);
- fwp_timespec_sub(&interval, &now, &start);
- ul_logtrash("start=%ld.%09ld, now=%ld.%09ld diff=%ld.%09ld\n", (long)start.tv_sec, (long)start.tv_nsec, (long)now.tv_sec, (long)now.tv_nsec, (long)interval.tv_sec, (long)interval.tv_nsec);
- fwp_timespec_sub(&interval, &period, &interval);
- if (interval.tv_sec > 0 ||
- (interval.tv_sec == 0 && interval.tv_nsec > 0)) {
- /* We have to wait to replenish */
- ul_logtrash("sleeping=%ld.%09ld\n", (long)interval.tv_sec, (long)interval.tv_nsec);
- nanosleep(&interval, NULL);
- fwp_timespec_add(&start, &now, &interval);
- } else {
- start = now;
- }
- fwp_vres_sched_update(vres, &period, &budget);
- curr_budget = budget;
- }
-
- rc = _fwp_vres_send(vres, msgb);
- if (!(rc < 0)) {
- FWP_DEBUG("Message sent through AC%d\n",ac_id);
- curr_budget -= msgb->len;
- } else {
- FWP_ERROR("Message sent error %d\n",rc);
- }
- skip_message:
- fwp_msgb_free(msgb);
-
- /*pthread_testcancel(); */
- /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
-
- fwp_timespec_add(&end_period, &start_period, &period);
- clock_gettime(CLOCK_MONOTONIC, &current_time);
- fwp_timespec_sub(&interval, &end_period, &current_time);
- nanosleep(&interval, NULL);
- */
+ /* Sleep until budget has been consumed and its period is over;
+ * cancellation is enabled during this wait. */
+ wait_for_replenish(vres);
+ /* Cancellation is disabled around replenish(), which takes
+ * vres->mutex. */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ replenish(vres);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
}
-
- /* it should normaly never come here */
- pthread_cleanup_pop(0);
- fwp_vres_free(vres);
-
- return NULL;
-}
-int fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb)
-{
- if (fwp_vres_is_valid(vres)) {
- return fwp_msgq_enqueue(&vres->tx_queue, msgb);
- } else {
- errno = EINVAL;
- return -1;
- }
+ return NULL;
}
-/*int fwp_vres_bind(fwp_vres_t *vres, fwp_endpoint_t *epoint)*/
-int fwp_vres_bind(fwp_vres_t *vres, int sockd)
+/*int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *epoint)*/
+/**
+ * Binds \a vres to endpoint \a ep: remembers \a sockd, stores the
+ * configured source address in \a *src, applies the access category
+ * to the socket and, only if that succeeds, records \a ep as bound.
+ *
+ * \return Zero on success; -1 with errno EINVAL (NULL vres) or EBUSY
+ * (already bound); otherwise the nonzero result of fwp_vres_set_ac().
+ *
+ * NOTE(review): unlike fwp_vres_unbind(), vres->mutex is not taken
+ * here, and \a src is dereferenced without a NULL check.
+ */
+int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *ep, int sockd, struct in_addr *src)
{
int rv = 0;
- pthread_mutex_lock(&fwp_vres_table.lock);
- if (!fwp_vres_is_valid(vres)) {
+ if (!vres) {
errno = EINVAL;
rv = -1;
goto err;
}
- if (fwp_vres_get_flag(vres, FWP_VF_BOUND)) { /*if already bounded */
+ if (vres->epoint) { /* if already bound */
errno = EBUSY;
rv = -1;
goto err;
}
vres->ac_sockd = sockd;
- rv = fwp_vres_set_ac(vres->ac_sockd,vres->params.ac_id);
- /*if (rv)
- goto err;*/
- fwp_vres_set_flag(vres, FWP_VF_BOUND);
+ *src = vres->params.src;
+ rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
+ if (rv)
+ goto err;
+ vres->epoint = ep;
err:
- pthread_mutex_unlock(&fwp_vres_table.lock);
return rv;
}
+/**
+ * Detaches \a vres from its endpoint (under vres->mutex) and then
+ * calls fwp_msgq_dequeue_all() on its message queue.
+ *
+ * \return Zero on success, -1 with errno EINVAL if \a vres is NULL.
+ *
+ * NOTE(review): the QUEUED flag is not cleared here - confirm that
+ * send_queue() cannot run afterwards with epoint == NULL.
+ */
int fwp_vres_unbind(fwp_vres_t *vres)
{
- if (!fwp_vres_is_valid(vres)) {
+ if (!vres) {
errno = EINVAL;
return -1;
}
- fwp_vres_clear_flag(vres, FWP_VF_BOUND);
+ pthread_mutex_lock(&vres->mutex);
+ vres->epoint = NULL;
+ pthread_mutex_unlock(&vres->mutex);
/* TODO: consider what to do with pending messages */
- /* fwp_vres_free_msgb(vres->tx_queue); */
+ fwp_msgq_dequeue_all(&vres->msg_queue);
return 0;
}