1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2009 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FWP (Frescor WLAN Protocol) */
27 /* FWP is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FWP is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FWP; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FWP header files in a file, */
39 /* instantiating FWP generics or templates, or linking other files */
40 /* with FWP objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
47 #include "fwp_utils.h"
51 #include "fwp_endpoint.h"
52 #include "fwp_debug.h"
/* Forward declaration: the transmit thread entry point is defined further
 * down in this file but is already needed by fwp_vres_create(), which
 * passes it to pthread_create(). */
58 static void* fwp_vres_tx_thread(void *_vres);
/* Default vres parameters. */
67 fwp_vres_params_t fwp_vres_params_default = {
/* Default period: 2 s + 111111 ns. */
70 .period = {.tv_sec = 2 , .tv_nsec = 111111},
75 * Structure of FWP vres.
76 * Internal representation of vres
/* Parameters requested for this vres; apply_params() copies the period and
 * budget from here into the live fields below. */
80 struct fwp_vres_params params;
/* Bit set of fwp_vres_flag_t values; see set_flag()/clear_flag()/get_flag(). */
81 fwp_vres_flag_t flags;
/* Held around parameter changes, budget consumption and replenishment. */
82 pthread_mutex_t mutex;
83 pthread_cond_t cond; /**< Signals budget replenishment */
84 fwp_budget_t budget; /**< Current remaining budget */
85 fwp_period_t period; /**< Period for this "activation" */
86 struct timespec replenish_at; /**< Time of next replenishment */
88 /** Endpoint bound to this vres */
89 struct fwp_endpoint *epoint;
90 pthread_t tx_thread; /**< tx_thread id */
/* Attributes passed to pthread_create() when the tx thread is started. */
91 pthread_attr_t tx_thread_attr;
92 /** Copy of bound endpoint's socket - used for future changes
93 * of vres parameters. */
95 /** Queue for messages to send */
96 struct fwp_msgq msg_queue;
/* Table holding all vres entries of the process. */
100 struct fwp_vres_table {
/* Number of entries in the table; set by fwp_vres_table_init(). */
101 unsigned int max_vres;
/* Held by fwp_vres_alloc() while it searches for and claims a free entry. */
103 pthread_mutex_t lock;
106 /* Global variable - the single, file-local vres table */
107 static fwp_vres_table_t fwp_vres_table = {
/* Static initializer: no pthread_mutex_init() call is needed for the lock. */
110 .lock = PTHREAD_MUTEX_INITIALIZER,
113 /** Maps a priority (array index 0..7) to an access category (AC) id. */
114 static const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
115 /** IP TOS value for each access category, indexed by AC id (listed as AC_VI, AC_VO, AC_BE, AC_BK); read by fwp_vres_set_ac(). NOTE(review): 224 and 160 are conventionally the voice and video TOS values, in that order - confirm the VI/VO order against fwp_ac_t. */
116 static const unsigned int ac_to_tos[4] = {224,160,96,64};
119 * Set access category (AC) to socket
121 * \param[in] sockd Socket descriptor
122 * \param[in] ac_id AC identifier
124 * \return On success returns zero.
125 * On error, negative error code is returned.
/* Sets the IP_TOS option of socket sockd to ac_to_tos[ac_id]. */
128 static inline int fwp_vres_set_ac(int sockd, fwp_ac_t ac_id)
/* NOTE(review): ac_id indexes the 4-entry ac_to_tos table and no range check
 * is visible - confirm that callers only pass valid AC identifiers. */
132 tos = ac_to_tos[ac_id];
133 if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
/* Report why setsockopt() failed. */
134 FWP_ERROR("setsockopt: %s", strerror(errno));
141 static inline void set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
143 vres->flags |= (1 << flag);
146 static inline void clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
148 vres->flags &= ~(1 << flag);
151 static inline int get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
153 return !!(vres->flags & (1 << flag));
/* Called by fwp_vres_free() when a vres entry is released.
 * NOTE(review): the body is not visible here - presumably it resets
 * vres->flags to zero; confirm. */
156 static inline void clear_all_flags(fwp_vres_t *vres)
161 static inline void fwp_vres_free(fwp_vres_t *vres)
163 /* Clear USED flag */
164 clear_all_flags(vres);
167 static inline int fwp_vres_is_valid(fwp_vres_t *vres)
169 int id = vres - fwp_vres_table.entry;
171 if ((id < 0) || (id > fwp_vres_table.max_vres - 1) ||
172 (!get_flag(vres, USED)))
178 /*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
180 if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
182 *vres = &fwp_vres_table.entry[vres_id];
187 int fwp_vres_table_init(unsigned int max_vres)
189 unsigned int table_size = max_vres * sizeof(fwp_vres_t);
191 fwp_vres_table.entry = (fwp_vres_t*) malloc(table_size);
192 if (!fwp_vres_table.entry)
193 return -1; /* Errno is set by malloc */
195 memset((void*) fwp_vres_table.entry, 0, table_size);
196 fwp_vres_table.max_vres = max_vres;
200 fwp_vres_id_t fwp_vres_get_id(fwp_vres_t *vres)
202 return (vres - fwp_vres_table.entry);
208 * \return On success returns vres descriptor.
/* Scans the vres table for an entry without the USED flag, marks it USED and
 * returns a pointer to it. The scan and the marking run with the table lock
 * held. */
210 fwp_vres_t *fwp_vres_alloc()
213 unsigned int max_vres;
215 /* find free vres id */
216 pthread_mutex_lock(&fwp_vres_table.lock);
218 max_vres = fwp_vres_table.max_vres;
/* Skip over entries that are already in use. */
219 while ((i < max_vres) &&
220 (get_flag(&fwp_vres_table.entry[i], USED))) {
/* Early-exit path releases the lock before returning.
 * NOTE(review): presumably taken when no free entry was found - confirm. */
225 pthread_mutex_unlock(&fwp_vres_table.lock);
230 FWP_DEBUG("Allocated vres id = %d\n",i);
/* Claim the entry while still holding the lock. */
231 set_flag(&fwp_vres_table.entry[i], USED);
232 pthread_mutex_unlock(&fwp_vres_table.lock);
233 return (&fwp_vres_table.entry[i]);
/* Copies the requested period and budget into the live vres state and marks
 * the vres UNTOUCHED. If the parameters were flagged CHANGED, the flag is
 * cleared and the access category is applied to the stored socket. */
236 static int apply_params(fwp_vres_t *vres)
239 vres->period = vres->params.period;
240 vres->budget = vres->params.budget;
/* UNTOUCHED is cleared again by do_consume_budget() on the next consumption. */
241 set_flag(vres, UNTOUCHED);
242 if (get_flag(vres, CHANGED)) {
243 clear_flag(vres, CHANGED);
/* Update the IP_TOS of the bound socket to match params.ac_id. */
244 rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
252 * \param[in] vres Vres descriptor
253 * \param[in] params Vres parameters
255 * \return On success returns zero.
256 * On error, negative error code is returned.
/* Stores new parameters in a valid vres under its mutex and flags them
 * CHANGED. They are applied immediately only while the vres is UNTOUCHED. */
259 int fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
263 if (!fwp_vres_is_valid(vres)) {
268 pthread_mutex_lock(&vres->mutex);
271 params->src.s_addr != vres->params.src.s_addr) {
276 vres->params = *params;
278 set_flag(vres, CHANGED);
/* UNTOUCHED means no budget has been consumed since the parameters were
 * last applied, so the new values can take effect right away. */
279 if (get_flag(vres, UNTOUCHED))
280 rv = apply_params(vres);
283 pthread_mutex_unlock(&vres->mutex);
290 * \param[in] params Vres parameters
291 * \param[out] vresp Pointer to the descriptor of newly created vres
293 * \return On success returns descriptor of vres.
294 * On error, negative error code is returned.
/* Allocates a vres entry, initializes its mutex (priority-inheritance
 * protocol), condition variable, parameters and message queue, and starts
 * the transmit thread fwp_vres_tx_thread() for it. */
297 int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp)
302 vres = fwp_vres_alloc();
308 pthread_mutexattr_t ma;
/* NOTE(review): the result of pthread_mutexattr_init() is overwritten by the
 * next call with no check visible in between, and no
 * pthread_mutexattr_destroy() is visible - confirm both are handled. */
309 rv = pthread_mutexattr_init(&ma);
310 rv = pthread_mutexattr_setprotocol(&ma, PTHREAD_PRIO_INHERIT);
312 pthread_mutex_init(&vres->mutex, &ma);
313 pthread_cond_init(&vres->cond, NULL);
315 vres->params = *params;
317 fwp_msgq_init(&vres->msg_queue);
/* The thread attributes are left at their defaults; the vres itself is the
 * thread argument. */
319 pthread_attr_init(&vres->tx_thread_attr);
320 if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr,
321 fwp_vres_tx_thread, (void*) vres)) != 0){
/* Empties the message queue.
 * NOTE(review): presumably part of the clean-up after a failed
 * pthread_create() - confirm. */
329 fwp_msgq_dequeue_all(&vres->msg_queue);
337 * \param[in] vres Vres descriptor
339 * \return On success returns 0.
340 * On error, negative error code is returned.
/* For a valid vres: cancels its transmit thread, destroys its condition
 * variable and mutex, and empties its message queue. */
343 int fwp_vres_destroy(fwp_vres_t *vres)
345 if (!fwp_vres_is_valid(vres)) {
/* NOTE(review): no pthread_join() is visible between cancelling the thread
 * and destroying the cond/mutex it uses - confirm the thread cannot still
 * be running at that point. */
350 pthread_cancel(vres->tx_thread);
351 pthread_cond_destroy(&vres->cond);
352 pthread_mutex_destroy(&vres->mutex);
354 fwp_msgq_dequeue_all(&vres->msg_queue);
357 FWP_DEBUG("Vres destroyed.\n");
/* Subtracts size from the remaining budget. On the first consumption after
 * the parameters were applied (UNTOUCHED set) it also computes the next
 * replenish time and posts the `consumed` semaphore that
 * wait_for_replenish() blocks on. */
361 static void do_consume_budget(struct fwp_vres *vres, size_t size)
363 if (get_flag(vres, UNTOUCHED)) {
364 /* Setup next replenish time */
366 clear_flag(vres, UNTOUCHED);
/* With QUEUED set the base time is the previous replenish time.
 * NOTE(review): presumably clock_gettime() below is the alternative branch
 * for the non-queued case - confirm. */
367 if (get_flag(vres, QUEUED))
368 now = vres->replenish_at;
370 clock_gettime(CLOCK_MONOTONIC, &now);
371 fwp_timespec_add(&vres->replenish_at, &now, &vres->period);
/* Let the transmit thread start waiting for the new replenish time. */
372 sem_post(&vres->consumed);
374 vres->budget -= size;
/* Core of budget consumption. Checks size against the configured budget
 * (params.budget), optionally waits on the condition variable while the
 * remaining budget is too small, and consumes the budget when enough is
 * available. vres->mutex must be held by the caller: pthread_cond_wait()
 * below releases and re-acquires it (see fwp_vres_consume_budget()). */
377 int __consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
/* A request larger than the whole configured budget can never be served. */
380 if (vres->params.budget < size) {
/* Wait for replenish() to broadcast on cond, then re-test the budget. */
384 while (can_block && vres->budget < size) {
385 ret = pthread_cond_wait(&vres->cond, &vres->mutex);
386 /* The budget might have been changed while we were
387 * waiting, so check it again. */
388 if (vres->params.budget < size) {
394 if (vres->budget >= size) {
395 do_consume_budget(vres, size);
/* QUEUED is tested by replenish() and do_consume_budget().
 * NOTE(review): presumably set only on the not-enough-budget branch -
 * confirm. */
398 set_flag(vres, QUEUED);
406 * Tries to consume (a part of) budget
408 * @param vres VRES whose budget is consumed.
409 * @param size How much to consume (in bytes).
410 * @param can_block True, indicates that the function can block to
411 * wait for budget replenishment. False causes no blocking and if 1 is
412 * returned, the caller must call fwp_vres_enqueue() to enqueue the
413 * packet to be sent later after replenishing.
415 * @return Zero if budget was consumed, 1 if there is not enough
416 * budget available and blocking was not allowed, -1 in case of error.
/* Locking wrapper: runs __consume_budget() with vres->mutex held. */
418 int fwp_vres_consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
421 pthread_mutex_lock(&vres->mutex);
422 ret = __consume_budget(vres, size, can_block);
423 pthread_mutex_unlock(&vres->mutex);
/* Copies size bytes of msg into a newly allocated message buffer and adds
 * the buffer to the message queue of the vres.
 * NOTE(review): ep is not used on any line visible here - confirm whether
 * it is needed. */
427 int fwp_vres_enqueue(struct fwp_vres *vres, struct fwp_endpoint *ep, void *msg, size_t size)
429 struct fwp_msgb *msgb;
/* Allocation of the message buffer can fail. */
432 if (!(msgb = fwp_msgb_alloc(size)))
434 memcpy(msgb->data, msg, size);
/* NOTE(review): presumably records that size bytes of the buffer are now in
 * use - confirm against fwp_msgb_put(). */
435 fwp_msgb_put(msgb, size);
436 ret = fwp_msgq_enqueue(&vres->msg_queue, msgb);
444 static void wait_for_replenish(struct fwp_vres *vres)
446 sem_wait(&vres->consumed);
447 clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
448 &vres->replenish_at, NULL);
/* Sends messages from the vres message queue through the bound endpoint,
 * paying for each one with a non-blocking __consume_budget() call. */
451 static void send_queue(struct fwp_vres *vres)
453 struct fwp_msgb *msgb;
/* First message: take it off the queue and try to pay for it.
 * NOTE(review): no NULL check on msgb is visible before msgb->len is read -
 * confirm the queue cannot be empty here. */
455 msgb = fwp_msgq_dequeue(&vres->msg_queue);
456 can_send = (0 == __consume_budget(vres, msgb->len, false));
459 FWP_ERROR("Cannot send queued packet (budget decreased?)\n");
462 /* If we cannot send the whole queue, the flag will be set
463 * later by __consume_budget(). */
464 clear_flag(vres, QUEUED);
467 fwp_endpoint_do_send(vres->epoint, msgb->data, msgb->len);
/* Look at the next message without removing it.
 * NOTE(review): presumably it is dequeued below only when its budget could
 * be consumed - confirm. */
469 msgb = fwp_msgq_peek(&vres->msg_queue);
471 can_send = (0 == __consume_budget(vres, msgb->len, false));
473 msgb = fwp_msgq_dequeue(&vres->msg_queue);
/* With vres->mutex held, wakes every thread waiting on the condition
 * variable (see the wait loop in __consume_budget()).
 * NOTE(review): the replenishment itself and the action taken when QUEUED is
 * set are on lines not visible here - presumably apply_params() and
 * send_queue(); confirm. */
482 static void replenish(struct fwp_vres *vres)
484 pthread_mutex_lock(&vres->mutex);
486 if (get_flag(vres, QUEUED))
488 pthread_cond_broadcast(&vres->cond);
489 pthread_mutex_unlock(&vres->mutex);
493 * Thread that does budgeting
/* Entry point of the per-vres thread started by fwp_vres_create(); _vres is
 * the struct fwp_vres the thread serves. */
496 static void* fwp_vres_tx_thread(void *_vres)
498 struct fwp_vres *vres = (struct fwp_vres*)_vres;
499 unsigned int ac_id = vres->params.ac_id;
/* Priority passed to fwp_set_rt_prio() depends on the access category. */
501 fwp_set_rt_prio(90 - ac_id);
/* Deferred cancellation: the pthread_cancel() in fwp_vres_destroy() takes
 * effect only at a cancellation point while cancellation is enabled. */
502 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
503 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
506 wait_for_replenish(vres);
/* Cancellation is disabled for the work done between these two calls. */
507 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
509 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
515 /*int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *epoint)*/
/* Binds a socket to a valid vres: stores sockd in the vres and applies the
 * vres access category to it. */
516 int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *ep, int sockd)
520 if (!fwp_vres_is_valid(vres)) {
526 if (vres->epoint) { /* if already bound */
/* Keep the socket so that apply_params() can update it later. */
532 vres->ac_sockd = sockd;
533 rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
/* For a valid vres: performs a step under vres->mutex and then discards all
 * messages still waiting in its queue. */
541 int fwp_vres_unbind(fwp_vres_t *vres)
543 if (!fwp_vres_is_valid(vres)) {
/* NOTE(review): the statement executed under the mutex is not visible here -
 * presumably it clears vres->epoint; confirm. */
547 pthread_mutex_lock(&vres->mutex);
549 pthread_mutex_unlock(&vres->mutex);
550 /* TODO: consider what to do with pending messages */
551 fwp_msgq_dequeue_all(&vres->msg_queue);