1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FWP (Frescor WLAN Protocol) */
27 /* FWP is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FWP is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FWP; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FWP header files in a file, */
39 /* instantiating FWP generics or templates, or linking other files */
40 /* with FWP objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
46 #include "fwp_utils.h"
50 #include "fwp_endpoint.h"
51 #include "fwp_debug.h"
/* Forward declaration of the per-vres transmit thread entry point. It is
 * passed to pthread_create() in fwp_vres_create() and defined further down. */
57 static void* fwp_vres_tx_thread(void *_vres);
/* Default vres parameters. Only the period initializer (2 s + 111111 ns) is
 * visible in this listing; the remaining fields are not shown here. */
65 fwp_vres_params_t fwp_vres_params_default = {
69 .period = {.tv_sec = 2 , .tv_nsec = 111111}
/* Internal per-vres state; the vres table holds an array of these entries. */
73 * Structure of FWP vres.
74 * Internal representation of vres
/* Parameters in force; written by fwp_vres_create() and _fwp_vres_set_params(). */
78 struct fwp_vres_params params;
79 /* consideration: move tx_queue to endpoint */
80 /**< queue of messages waiting to be sent by the tx thread */
81 struct fwp_msgq tx_queue;
/* Bitmask of (1 << flag) bits, e.g. FWP_VF_USED, FWP_VF_CHANGED, FWP_VF_BOUND. */
82 fwp_vres_flag_t flags;
83 /**< endpoint bound to this vres (member currently disabled) */
84 /*fwp_endpoint_t *epoint; */
85 pthread_t tx_thread; /**< id of the tx thread started in fwp_vres_create() */
86 pthread_attr_t tx_thread_attr; /**< attributes handed to pthread_create() */
87 int ac_sockd; /**< socket used for sendmsg(); assigned in fwp_vres_bind() */
/* Table of vres slots. max_vres is the slot count stored by
 * fwp_vres_table_init(); the other members are not visible in this listing. */
91 struct fwp_vres_table {
92 unsigned int max_vres;
97 /* Global vres table; its mutex is statically initialised and is taken by fwp_vres_alloc() and fwp_vres_bind() */
98 static fwp_vres_table_t fwp_vres_table = {
101 .lock = PTHREAD_MUTEX_INITIALIZER,
104 /** Maps a priority (0..7) to an access category index; no user is visible in this listing */
105 static const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
106 /** IP TOS values for AC_VI, AC_VO, AC_BE, AC_BK; indexed by fwp_ac_t in fwp_vres_set_ac() and fwp_vres_ac_open() */
107 static const unsigned int ac_to_tos[4] = {224,160,96,64};
/* Applies the IP TOS value that corresponds to ac_id to an already open socket. */
/* NOTE(review): ac_id indexes ac_to_tos[] directly; no range check is visible here. */
110 * Set access category (AC) to socket
112 * \param[in] sockd Socket descriptor
113 * \param[in] ac_id AC identifier
115 * \return On success returns zero.
116 * On error, negative error code is returned.
119 static inline int fwp_vres_set_ac(int sockd, fwp_ac_t ac_id)
/* Look up the TOS value for this access category. */
123 tos = ac_to_tos[ac_id];
/* A setsockopt() failure is logged together with the errno text. */
124 if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
125 FWP_ERROR("setsockopt: %s", strerror(errno));
132 static inline void fwp_vres_set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
134 vres->flags |= (1 << flag);
137 static inline void fwp_vres_clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
139 vres->flags &= ~(1 << flag);
142 static inline int fwp_vres_get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
144 return !!(vres->flags & (1 << flag));
/* Helper operating on the vres flag word; its body is not visible in this
 * listing. Called from fwp_vres_free(). */
147 static inline void fwp_vres_clearall_flag(fwp_vres_t *vres)
/*
 * Opens a UDP socket and sets its IP TOS to the value mapped to ac_id.
 * ac_id is range-checked against FWP_AC_NUM before it indexes ac_to_tos[].
 * No caller of this function is visible in this listing.
 */
154 static int fwp_vres_ac_open(fwp_ac_t ac_id)
159 if ((ac_id < 0)||(ac_id >= FWP_AC_NUM)) {
164 if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
165 FWP_ERROR("Unable to open socket for AC: %s", strerror(errno));
169 tos = ac_to_tos[ac_id];
171 if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
173 FWP_ERROR("setsockopt(IP_TOS): %s", strerror(errno));
/*
 * Sends one message buffer on the vres socket with sendmsg(). When the vres
 * has a non-zero params.src address, an IP_PKTINFO control message carrying
 * that address in ipi_spec_dst is attached to the datagram.
 */
183 static inline int _fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb)
186 struct msghdr msg = {0};
/* Room for exactly one in_pktinfo control message. */
188 char cmsg_buf[CMSG_SPACE(sizeof(struct in_pktinfo))];
/* The payload is described by a single iovec covering the message data. */
190 iov.iov_base = msgb->data;
191 iov.iov_len = msgb->len;
/* The control data is only built when a source address is configured. */
198 if (vres->params.src.s_addr != 0) {
199 struct cmsghdr *cmsg;
200 struct in_pktinfo *ipi;
202 memset(cmsg_buf, 0, sizeof(cmsg_buf));
204 msg.msg_control = cmsg_buf;
205 msg.msg_controllen = sizeof(cmsg_buf);
207 cmsg = CMSG_FIRSTHDR(&msg);
209 cmsg->cmsg_level = SOL_IP;
210 cmsg->cmsg_type = IP_PKTINFO;
211 cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
213 ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
214 ipi->ipi_spec_dst = vres->params.src;
216 ret = sendmsg(vres->ac_sockd, &msg, 0);
220 static inline void fwp_vres_free(fwp_vres_t *vres)
222 fwp_vres_clearall_flag(vres);
225 static inline int fwp_vres_is_valid(fwp_vres_t *vres)
227 int id = vres - fwp_vres_table.entry;
229 if ((id < 0) || (id > fwp_vres_table.max_vres - 1) ||
230 (!fwp_vres_get_flag(vres, FWP_VF_USED)))
/* Disabled accessor kept for reference (only part of its body appears here):
 * inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
 *     if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
 *     *vres = &fwp_vres_table.entry[vres_id];
 */
245 int fwp_vres_table_init(unsigned int max_vres)
247 unsigned int table_size = max_vres * sizeof(fwp_vres_t);
249 fwp_vres_table.entry = (fwp_vres_t*) malloc(table_size);
250 if (!fwp_vres_table.entry)
251 return -1; /* Errno is set by malloc */
253 memset((void*) fwp_vres_table.entry, 0, table_size);
254 fwp_vres_table.max_vres = max_vres;
258 fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
260 fwp_vres_t *vres = vresd;
262 return (vres - fwp_vres_table.entry);
/*
 * Reserves a table slot: while holding the table lock, scans the entries in
 * index order for the first one without FWP_VF_USED, marks it used and
 * returns its address as the descriptor.
 */
268 * \return On success returns vres descriptor.
270 fwp_vres_d_t fwp_vres_alloc()
273 unsigned int max_vres;
275 /* find free vres id */
276 pthread_mutex_lock(&fwp_vres_table.lock);
278 max_vres = fwp_vres_table.max_vres;
/* Stop at the first unused entry or at the end of the table. */
279 while ((i < max_vres) &&
280 (fwp_vres_get_flag(&fwp_vres_table.entry[i], FWP_VF_USED))) {
/* NOTE(review): presumably the unlock on the table-full path; its condition is not visible here. */
285 pthread_mutex_unlock(&fwp_vres_table.lock);
290 FWP_DEBUG("Allocated vres id = %d\n",i);
/* The slot is claimed before the lock is dropped. */
291 fwp_vres_set_flag(&fwp_vres_table.entry[i], FWP_VF_USED);
292 pthread_mutex_unlock(&fwp_vres_table.lock);
293 return (&fwp_vres_table.entry[i]);
/*
 * Applies new parameters to a vres without validating the descriptor: sets
 * the access category on the vres socket, copies the parameter block and
 * raises FWP_VF_CHANGED, which fwp_vres_sched_update() checks.
 * The handling of rv between these steps is not visible in this listing.
 */
296 inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
300 /* copy vres parameters into vres structure */
301 rv = fwp_vres_set_ac(vres->ac_sockd, params->ac_id);
304 memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
305 fwp_vres_set_flag(vres, FWP_VF_CHANGED);
/*
 * Public entry point for changing vres parameters: validates the descriptor
 * with fwp_vres_is_valid() and then delegates to _fwp_vres_set_params().
 * NOTE(review): the text below names the parameter "vresdp", the signature
 * calls it "vresd".
 */
313 * \param[in] vresdp Vres descriptor
314 * \param[in] params Vres parameters
316 * \return On success returns zero.
317 * On error, negative error code is returned.
320 int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
322 fwp_vres_t *vres = vresd;
324 if (!fwp_vres_is_valid(vres)) {
329 return _fwp_vres_set_params(vres, params);
/*
 * Creates a vres: allocates a table slot, initialises its tx queue, stores a
 * copy of params and starts the tx thread for it.
 * NOTE(review): the return text below mentions a descriptor, but the function
 * returns int and has the vresdp out-parameter for the descriptor - confirm.
 */
335 * \param[in] params Vres parameters
336 * \param[out] vresdp Pointer to the descriptor of newly created vres
338 * \return On success returns descriptor of vres.
339 * On error, negative error code is returned.
342 int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
347 vres = fwp_vres_alloc();
352 /* initialize msg queue */
353 fwp_msgq_init(&vres->tx_queue);
/* Keep a private copy and flag it so the tx thread loads period and budget. */
355 memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
356 fwp_vres_set_flag(vres, FWP_VF_CHANGED);
357 pthread_attr_init(&vres->tx_thread_attr);
/* The vres itself is handed to the thread as its argument. */
358 if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr,
359 fwp_vres_tx_thread, (void*) vres)) != 0){
/*
 * Destroys a vres: after validating the descriptor, cancels its tx thread.
 * The tx thread registers fwp_vres_cleanup() as its cancellation cleanup
 * handler.
 */
373 * \param[in] vresd Vres descriptor
375 * \return On success returns 0.
376 * On error, negative error code is returned.
379 int fwp_vres_destroy(fwp_vres_d_t vresd)
381 fwp_vres_t *vres = vresd;
383 if (!fwp_vres_is_valid(vres)) {
388 pthread_cancel(vres->tx_thread);
390 FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);
/*
 * Cleanup handler installed by the tx thread with pthread_cleanup_push().
 * It empties the vres tx queue via fwp_msgq_dequeue_all().
 */
394 static void fwp_vres_cleanup(void *_vres)
396 fwp_vres_t *vres = (fwp_vres_t*)_vres;
398 fwp_msgq_dequeue_all(&vres->tx_queue);
/*
 * Reloads the caller's cached period and budget from vres->params, but only
 * when FWP_VF_CHANGED is set; the flag is cleared afterwards.
 * (The line carrying the return type is not part of this listing.)
 */
403 fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period,
404 fwp_budget_t *budget)
406 if (fwp_vres_get_flag(vres, FWP_VF_CHANGED)) {
407 /*period->tv_nsec = vres->params.period % SEC_TO_USEC;
408 period->tv_sec = vres->params.period / SEC_TO_USEC;*/
409 *period = vres->params.period;
410 *budget = vres->params.budget;
411 FWP_DEBUG("Vres tx thread with budget=%ld period_sec=%ld "
412 "period_nsec=%ld.\n",vres->params.budget,
413 period->tv_sec, period->tv_nsec);
414 fwp_vres_clear_flag(vres, FWP_VF_CHANGED);
/*
 * Per-vres transmit thread. It dequeues messages from the vres tx queue and
 * sends them, charging each message's len against curr_budget. When the
 * remaining budget is too small for the next message, the thread waits out
 * the rest of the current period (if any) and then refills the budget.
 * The loop construct and some branches are not visible in this listing.
 */
419 * Thread that does budgeting
422 static void* fwp_vres_tx_thread(void *_vres)
424 struct fwp_vres *vres = (struct fwp_vres*)_vres;
425 struct fwp_msgq *msgq = &vres->tx_queue;
426 struct fwp_msgb *msgb = NULL;
427 unsigned int ac_id = vres->params.ac_id;
428 fwp_budget_t budget = vres->params.budget;
429 fwp_budget_t curr_budget;
431 struct timespec start, period, interval, now;
/* The priority value is 90 minus the access category id. */
433 fwp_set_rt_prio(90 - ac_id);
/* Asynchronous cancellation is enabled; fwp_vres_destroy() stops this thread with pthread_cancel(). */
434 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
435 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
436 pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
/* Load the initial period and budget and start the first period now. */
439 fwp_vres_sched_update(vres, &period, &budget);
440 curr_budget = budget;
441 clock_gettime(CLOCK_MONOTONIC, &start);
444 msgb = fwp_msgq_dequeue(msgq);
/* A message longer than the whole per-period budget is reported as too large. */
445 if (msgb->len > budget) {
446 FWP_ERROR("Message too large: %zd -> skipping\n",
450 if (curr_budget < msgb->len) {
451 /* needs to replenish */
452 clock_gettime(CLOCK_MONOTONIC, &now);
/* interval = time elapsed since the period started ... */
453 fwp_timespec_sub(&interval, &now, &start);
454 ul_logtrash("start=%ld.%09ld, now=%ld.%09ld diff=%ld.%09ld\n", (long)start.tv_sec, (long)start.tv_nsec, (long)now.tv_sec, (long)now.tv_nsec, (long)interval.tv_sec, (long)interval.tv_nsec);
/* ... then interval = time left until the period ends. */
455 fwp_timespec_sub(&interval, &period, &interval);
456 if (interval.tv_sec > 0 ||
457 (interval.tv_sec == 0 && interval.tv_nsec > 0)) {
458 /* We have to wait to replenish */
459 ul_logtrash("sleeping=%ld.%09ld\n", (long)interval.tv_sec, (long)interval.tv_nsec);
460 nanosleep(&interval, NULL);
/* The next period is taken to start at now + the time just slept. */
461 fwp_timespec_add(&start, &now, &interval);
/* Pick up any parameter change, then refill the budget. */
465 fwp_vres_sched_update(vres, &period, &budget);
466 curr_budget = budget;
469 rc = _fwp_vres_send(vres, msgb);
471 FWP_DEBUG("Message sent through AC%d\n",ac_id);
/* The message length is charged against the remaining budget. */
472 curr_budget -= msgb->len;
474 FWP_ERROR("Message sent error %d\n",rc);
479 /*pthread_testcancel(); */
480 /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
482 fwp_timespec_add(&end_period, &start_period, &period);
483 clock_gettime(CLOCK_MONOTONIC, &current_time);
484 fwp_timespec_sub(&interval, &end_period, &current_time);
485 nanosleep(&interval, NULL);
489 /* it should normally never come here */
490 pthread_cleanup_pop(0);
/*
 * Queues a message for transmission: when the descriptor is valid, the
 * message is appended to the vres tx queue and the result of
 * fwp_msgq_enqueue() is returned. The message is sent later by the tx thread.
 * The invalid-descriptor path is not visible in this listing.
 */
496 int fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
498 fwp_vres_t *vres = vresd;
500 if (fwp_vres_is_valid(vres)) {
501 return fwp_msgq_enqueue(&vres->tx_queue, msgb);
/*
 * Binds a socket to a vres while holding the vres table lock: the descriptor
 * is validated and checked for an existing FWP_VF_BOUND flag, then the socket
 * is stored, the access category of the vres is applied to it and the vres is
 * marked bound. The error branches are not visible in this listing.
 */
508 /*int fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)*/
509 int fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
511 fwp_vres_t *vres = vresd;
514 pthread_mutex_lock(&fwp_vres_table.lock);
515 if (!fwp_vres_is_valid(vres)) {
521 if (fwp_vres_get_flag(vres, FWP_VF_BOUND)) { /* already bound */
527 vres->ac_sockd = sockd;
528 rv = fwp_vres_set_ac(vres->ac_sockd,vres->params.ac_id);
531 fwp_vres_set_flag(vres, FWP_VF_BOUND);
533 pthread_mutex_unlock(&fwp_vres_table.lock);
/*
 * Unbinds a vres: validates the descriptor and clears FWP_VF_BOUND.
 * Messages still queued are left untouched (see the TODO below).
 * NOTE(review): unlike fwp_vres_bind(), no table locking is visible here.
 */
537 int fwp_vres_unbind(fwp_vres_d_t vresd)
539 fwp_vres_t *vres = vresd;
541 if (!fwp_vres_is_valid(vresd)) {
545 fwp_vres_clear_flag(vres, FWP_VF_BOUND);
546 /* TODO: consider what to do with pending messages */
547 /* fwp_vres_free_msgb(vres->tx_queue); */