]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/fwp/fwp_vres.c
Allow bypassing FWP by setting FWP_BYPASS environment variable
[frescor/fwp.git] / fwp / lib / fwp / fwp_vres.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2009 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FWP (Frescor WLAN Protocol)                      */
26 /*                                                                        */
27 /* FWP is free software; you can redistribute it and/or modify it         */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FWP is distributed in the hope that it will be         */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FWP; see file        */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FWP header files in a file,          */
39 /* instantiating FWP generics or templates, or linking other files        */
40 /* with FWP objects to produce an executable application, does not        */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 #include "fwp_utils.h"
48 #include "fwp_vres.h"
49
50 #include "fwp_msgq.h"
51 #include "fwp_endpoint.h"
52 #include "fwp_debug.h"
53
54 #include <string.h>
55 #include <errno.h>
56 #include <stdlib.h>
57
58 static void* fwp_vres_tx_thread(void *_vres);
59
60 typedef enum {
61         UNTOUCHED,
62         CHANGED,
63         QUEUED,
64 } fwp_vres_flag_t;
65
66 fwp_vres_params_t fwp_vres_params_default = {
67         .ac_id = FWP_AC_VO,
68         .budget = 100,
69         .period = {.tv_sec = 2 , .tv_nsec = 111111},
70         .src = { 0 },
71 };
72
73 /**
74  * Structure of FWP vres.
75  * Internal representation of vres
76  * 
77  */
78 struct fwp_vres{
79         struct fwp_vres_params          params;
80         fwp_vres_flag_t                 flags;
81         pthread_mutex_t                 mutex;
82         pthread_cond_t                  cond; /**< Signalizes budget replenishment */
83         fwp_budget_t                    budget; /**< Current remaining budget */
84         fwp_period_t                    period; /**< Period for this "activation" */
85         struct timespec                 replenish_at; /**< Time of next replenishment */
86         sem_t                           consumed;
87         /**< endpoint bounded to this vres */
88         struct fwp_endpoint             *epoint;
89         pthread_t                       tx_thread; /**< tx_thread id*/
90         pthread_attr_t                  tx_thread_attr;
91         /** Copy of bound enpoint's socket - used for future changes
92          * of vres parameters. */
93         int                             ac_sockd;
94         /** Queue for messages to send */
95         struct fwp_msgq                 msg_queue;
96         /** If true, it is always allowed to send messages through this vres. */
97         bool                            bypass;
98 };
99
100 /**< mapping priority to ac*/
101 static const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
102 /**< IP tos for AC_VI, AC_VO, AC_BE, AC_BK */ 
103 static const unsigned int ac_to_tos[4] = {224,160,96,64};
104
105 /**
106  * Set access category (AC) to socket
107  *
108  * \param[in] sockd Socket descriptor
109  * \param[in] ac_id AC identifier
110  * 
111  * \return On success returns zero. 
112  * On error, negative error code is returned. 
113  *
114  */
115 static inline int fwp_vres_set_ac(int sockd, fwp_ac_t ac_id) 
116 {
117         unsigned int tos;
118         
119         tos = ac_to_tos[ac_id];
120         if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
121                 FWP_ERROR("setsockopt: %s", strerror(errno));
122                 return (-1);
123         }
124         
125         return 0;
126 }
127
128 static inline void set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
129 {
130         vres->flags |= (1 << flag);
131 }
132
133 static inline void clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
134 {
135         vres->flags &= ~(1 << flag);
136 }
137
138 static inline int get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
139 {
140         return !!(vres->flags & (1 << flag));
141 }
142
143 static inline void clear_all_flags(fwp_vres_t *vres)
144 {
145         vres->flags = 0;
146 }
147
148 static inline void fwp_vres_free(fwp_vres_t *vres)
149 {
150         free(vres);
151 }
152
153 /*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
154 {
155         if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
156                 return -EINVAL;
157         *vres = &fwp_vres_table.entry[vres_id];
158         return 0;
159 }
160 */
161
162 /**
163  * Allocate vres
164  *
165  * \return On success returns vres descriptor. 
166  */
167 fwp_vres_t *fwp_vres_alloc()
168 {
169         fwp_vres_t *vres = malloc(sizeof(*vres));
170         if (vres) {
171                 memset(vres, 0, sizeof(*vres));
172                 FWP_DEBUG("Allocated vres\n");
173         }
174         return vres;
175 }
176
177 static int apply_params(fwp_vres_t *vres)
178 {
179         int rv = 0;
180         vres->period = vres->params.period;
181         vres->budget = vres->params.budget;
182         set_flag(vres, UNTOUCHED);
183         if (get_flag(vres, CHANGED)) {
184                 clear_flag(vres, CHANGED);
185                 rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
186         }
187         return rv;
188 }
189
190 /**
191  * Set vres params
192  *
193  * \param[in] vresp Vres descriptor
194  * \param[in] params Vres parameters
195  *
196  * \return On success returns zero. 
197  * On error, negative error code is returned. 
198  *
199  */
200 int fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
201 {
202         int rv = 0;
203
204         if (!vres) {
205                 errno = EINVAL;
206                 return -1;
207         }
208
209         pthread_mutex_lock(&vres->mutex);
210
211         if (vres->epoint &&
212             params->src.s_addr != vres->params.src.s_addr) {
213                 errno = EREMCHG;
214                 rv = -1;
215                 goto out;
216         }
217         vres->params = *params;
218         if (vres->epoint) {
219                 set_flag(vres, CHANGED);
220                 if (get_flag(vres, UNTOUCHED))
221                         rv = apply_params(vres);
222         }
223 out:
224         pthread_mutex_unlock(&vres->mutex);
225         return rv;
226 }
227
228 /**
229  * Creates new vres
230  *
231  * \param[in] params Vres parameters
232  * \param[out] vresp Pointer to the descriptor of newly created vres
233  *
234  * \return Zero on success, -1 on error and errno is set
235  * appropriately.
236  */
237 int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp)
238 {
239         int rv;
240         fwp_vres_t *vres;
241         
242         vres = fwp_vres_alloc();
243         if (!vres) {
244                 errno = ENOMEM;
245                 return -1;
246         }
247
248         pthread_mutexattr_t ma;
249         rv = pthread_mutexattr_init(&ma);
250         rv = pthread_mutexattr_setprotocol(&ma, PTHREAD_PRIO_INHERIT);
251         if (rv) return rv;
252         pthread_mutex_init(&vres->mutex, &ma);
253         pthread_cond_init(&vres->cond, NULL);
254         
255         vres->params = *params;
256         apply_params(vres);
257         fwp_msgq_init(&vres->msg_queue);
258
259         if (getenv("FWP_BYPASS"))
260                 vres->bypass = true;
261
262         pthread_attr_init(&vres->tx_thread_attr);
263         if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, 
264                             fwp_vres_tx_thread, (void*) vres)) != 0){
265                 goto err;
266         }
267
268         *vresp = vres;
269         
270         return 0;
271 err:    
272         fwp_msgq_dequeue_all(&vres->msg_queue);
273         fwp_vres_free(vres);
274         return -1; 
275 }
276
277 /**
278  * Destroys vres
279  *
280  * \param[in] vres Vres descriptor
281  *
282  * \return Zero on success, -1 on error and errno is set
283  * appropriately.
284  */
285 int fwp_vres_destroy(fwp_vres_t *vres)
286 {       
287         if (!vres) {
288                 errno = EINVAL;
289                 return -1;
290         }
291         
292         pthread_cancel(vres->tx_thread);
293         pthread_cond_destroy(&vres->cond);
294         pthread_mutex_destroy(&vres->mutex);
295                 
296         fwp_msgq_dequeue_all(&vres->msg_queue);
297         fwp_vres_free(vres);
298         
299         FWP_DEBUG("Vres destroyed.\n");
300         return  0;
301 }
302
303 static void do_consume_budget(struct fwp_vres *vres, size_t size)
304 {
305         if (get_flag(vres, UNTOUCHED)) {
306                 /* Setup next replenish time */
307                 struct timespec now;
308                 clear_flag(vres, UNTOUCHED);
309                 if (get_flag(vres, QUEUED))
310                         now = vres->replenish_at;
311                 else
312                         clock_gettime(CLOCK_MONOTONIC, &now);
313                 fwp_timespec_add(&vres->replenish_at, &now, &vres->period);
314                 sem_post(&vres->consumed);
315         }
316         vres->budget -= size;
317 }
318
319 int __consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
320 {
321         int ret = 0;
322         if (vres->bypass)
323                 return 0;
324         if (vres->params.budget < size) {
325                 errno = ENOSR;
326                 return -1;
327         }
328         while (can_block && vres->budget < size) {
329                 ret = pthread_cond_wait(&vres->cond, &vres->mutex);
330                 /* The budget might have been changed while we were
331                  * waiting, so check it again. */
332                 if (vres->params.budget < size) {
333                         errno = ENOSR;
334                         return -1;
335                 }
336         }
337         if (ret == 0) {
338                 if (vres->budget >= size) {
339                         do_consume_budget(vres, size);
340                         ret = 0;
341                 } else {
342                         set_flag(vres, QUEUED);
343                         ret = 1;
344                 }
345         }
346         return ret;
347 }
348
349 /** 
350  * Tries to consume (a part of) budget
351  * 
352  * @param vres VRES whose budget is conumed.
353  * @param size How much to consume (in bytes).
354  * @param can_block True, indicates that the function can block to
355  * wait for budget replenishment. False causes no blocking and if 1 is
356  * returned, the calles must call fwp_vres_enqueue() to enqueue the
357  * packet to be sent later after replenishing.
358  * 
359  * @return Zero if budget was consumed, 1 if there is not enough
360  * budget available and blocking was not allowed, -1 in case of error.
361  */
362 int fwp_vres_consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
363 {
364         int ret = 0;
365         pthread_mutex_lock(&vres->mutex);
366         ret = __consume_budget(vres, size, can_block);
367         pthread_mutex_unlock(&vres->mutex);
368         return ret;
369 }
370
371 int fwp_vres_enqueue(struct fwp_vres *vres, struct fwp_endpoint *ep,
372                      const void *msg, size_t size)
373 {
374         struct fwp_msgb *msgb;
375         int ret;
376         
377         if (!(msgb = fwp_msgb_alloc(size)))
378                 return -1;
379         memcpy(msgb->data, msg, size);
380         fwp_msgb_put(msgb, size);
381         ret = fwp_msgq_enqueue(&vres->msg_queue, msgb);
382         if (ret) {
383                 fwp_msgb_free(msgb);
384                 return ret;
385         }
386         return ret;
387 }
388
389 static void wait_for_replenish(struct fwp_vres *vres)
390 {
391         sem_wait(&vres->consumed);
392         clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
393                         &vres->replenish_at, NULL);
394 }
395
396 static void send_queue(struct fwp_vres *vres)
397 {
398         struct fwp_msgb *msgb;
399         bool can_send;
400         msgb = fwp_msgq_dequeue(&vres->msg_queue);
401         can_send = (0 == __consume_budget(vres, msgb->len, false));
402         if (!can_send) {
403                 fwp_msgb_free(msgb);
404                 FWP_ERROR("Cannot send queued packet (budget decreased?)\n");
405                 return;
406         }
407         /* If we cannot send the whole queue, the flag will be set
408          * later by __consume_budget(). */
409         clear_flag(vres, QUEUED);
410
411         while (msgb) {
412                 fwp_endpoint_do_send(vres->epoint, msgb->data, msgb->len);
413                 fwp_msgb_free(msgb);
414                 msgb = fwp_msgq_peek(&vres->msg_queue);
415                 if (msgb) {
416                         can_send = (0 == __consume_budget(vres, msgb->len, false));
417                         if (can_send) {
418                                 msgb = fwp_msgq_dequeue(&vres->msg_queue);
419                         } else {
420                                 msgb = NULL;
421                                 return;
422                         }
423                 }
424         }
425 }
426
427 static void replenish(struct fwp_vres *vres)
428 {
429         pthread_mutex_lock(&vres->mutex);
430         apply_params(vres);
431         if (get_flag(vres, QUEUED))
432                 send_queue(vres);
433         pthread_cond_broadcast(&vres->cond);
434         pthread_mutex_unlock(&vres->mutex);
435 }
436
437 /**
438  * Thread that does budgeting
439  *
440  */
441 static void* fwp_vres_tx_thread(void *_vres)
442 {
443         struct fwp_vres *vres = (struct fwp_vres*)_vres;
444         unsigned int    ac_id = vres->params.ac_id;
445
446         fwp_set_rt_prio(90 - ac_id);
447         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
448         pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);   
449         
450         while (1) {
451                 wait_for_replenish(vres);
452                 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
453                 replenish(vres);
454                 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
455         }
456
457         return NULL;
458 }
459
460 /*int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *epoint)*/
461 int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *ep, int sockd, struct in_addr  *src)
462 {
463         int rv = 0;
464
465         if (!vres) {
466                 errno = EINVAL;
467                 rv = -1;
468                 goto err;
469         }
470         
471         if (vres->epoint) { /*if already bounded */
472                 errno = EBUSY;
473                 rv = -1;
474                 goto err;
475         }
476
477         vres->ac_sockd = sockd;
478         *src = vres->params.src;
479         rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
480         if (rv)
481                 goto err;
482         vres->epoint = ep;
483 err:
484         return rv;
485 }
486
487 int fwp_vres_unbind(fwp_vres_t *vres)
488 {
489         if (!vres) {
490                 errno = EINVAL;
491                 return -1;
492         }
493         pthread_mutex_lock(&vres->mutex);
494         vres->epoint = NULL;
495         pthread_mutex_unlock(&vres->mutex);
496         /* TODO: consider what to do with pending messages */
497         fwp_msgq_dequeue_all(&vres->msg_queue);
498         return 0;
499 }