]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/fwp/fwp_vres.c
Implemented synchronous and asynchronous sending
[frescor/fwp.git] / fwp / lib / fwp / fwp_vres.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2009 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FWP (Frescor WLAN Protocol)                      */
26 /*                                                                        */
27 /* FWP is free software; you can redistribute it and/or modify it         */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FWP is distributed in the hope that it will be         */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FWP; see file        */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FWP header files in a file,          */
39 /* instantiating FWP generics or templates, or linking other files        */
40 /* with FWP objects to produce an executable application, does not        */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 #include "fwp_utils.h"
48 #include "fwp_vres.h"
49
50 #include "fwp_msgq.h"
51 #include "fwp_endpoint.h"
52 #include "fwp_debug.h"
53
54 #include <string.h>
55 #include <errno.h>
56 #include <stdlib.h>
57
58 static void* fwp_vres_tx_thread(void *_vres);
59
60 typedef enum {
61         USED,
62         UNTOUCHED,
63         CHANGED,
64         QUEUED,
65 } fwp_vres_flag_t;
66
67 fwp_vres_params_t fwp_vres_params_default = {
68         .ac_id = FWP_AC_VO,
69         .budget = 100,
70         .period = {.tv_sec = 2 , .tv_nsec = 111111},
71         .src = { 0 },
72 };
73
74 /**
75  * Structure of FWP vres.
76  * Internal representation of vres
77  * 
78  */
79 struct fwp_vres{
80         struct fwp_vres_params          params;
81         fwp_vres_flag_t                 flags;
82         pthread_mutex_t                 mutex;
83         pthread_cond_t                  cond; /**< Signalizes budget replenishment */
84         fwp_budget_t                    budget; /**< Current remaining budget */
85         fwp_period_t                    period; /**< Period for this "activation" */
86         struct timespec                 replenish_at; /**< Time of next replenishment */
87         sem_t                           consumed;
88         /**< endpoint bounded to this vres */
89         struct fwp_endpoint             *epoint;
90         pthread_t                       tx_thread; /**< tx_thread id*/
91         pthread_attr_t                  tx_thread_attr;
92         /** Copy of bound enpoint's socket - used for future changes
93          * of vres parameters. */
94         int                             ac_sockd;
95         /** Queue for messages to send */
96         struct fwp_msgq                 msg_queue;   
97 };
98
99 typedef
100 struct fwp_vres_table {
101         unsigned int                    max_vres; 
102         fwp_vres_t                      *entry;
103         pthread_mutex_t                 lock;
104 } fwp_vres_table_t;
105
106 /* Global variable - vres table */
107 static fwp_vres_table_t  fwp_vres_table = {
108         .max_vres = 0,
109         .entry = NULL,
110         .lock = PTHREAD_MUTEX_INITIALIZER,
111 };
112
113 /**< mapping priority to ac*/
114 static const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
115 /**< IP tos for AC_VI, AC_VO, AC_BE, AC_BK */ 
116 static const unsigned int ac_to_tos[4] = {224,160,96,64};
117
118 /**
119  * Set access category (AC) to socket
120  *
121  * \param[in] sockd Socket descriptor
122  * \param[in] ac_id AC identifier
123  * 
124  * \return On success returns zero. 
125  * On error, negative error code is returned. 
126  *
127  */
128 static inline int fwp_vres_set_ac(int sockd, fwp_ac_t ac_id) 
129 {
130         unsigned int tos;
131         
132         tos = ac_to_tos[ac_id];
133         if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
134                 FWP_ERROR("setsockopt: %s", strerror(errno));
135                 return (-1);
136         }
137         
138         return 0;
139 }
140
141 static inline void set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
142 {
143         vres->flags |= (1 << flag);
144 }
145
146 static inline void clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
147 {
148         vres->flags &= ~(1 << flag);
149 }
150
151 static inline int get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
152 {
153         return !!(vres->flags & (1 << flag));
154 }
155
156 static inline void clear_all_flags(fwp_vres_t *vres)
157 {
158         vres->flags = 0;
159 }
160
161 static inline void fwp_vres_free(fwp_vres_t *vres)
162 {
163         /* Clear USED flag */
164         clear_all_flags(vres);
165 }
166
167 static inline int fwp_vres_is_valid(fwp_vres_t *vres)
168 {
169         int id  = vres - fwp_vres_table.entry;
170
171         if ((id < 0) || (id > fwp_vres_table.max_vres - 1) || 
172                 (!get_flag(vres, USED))) 
173                 return 0;
174         
175         return 1; 
176 }
177
178 /*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
179 {
180         if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
181                 return -EINVAL;
182         *vres = &fwp_vres_table.entry[vres_id];
183         return 0;
184 }
185 */
186
187 int fwp_vres_table_init(unsigned int max_vres)
188 {
189         unsigned int table_size = max_vres * sizeof(fwp_vres_t);
190
191         fwp_vres_table.entry = (fwp_vres_t*) malloc(table_size);
192         if (!fwp_vres_table.entry) 
193                 return -1;      /* Errno is set by malloc */
194
195         memset((void*) fwp_vres_table.entry, 0, table_size);
196         fwp_vres_table.max_vres = max_vres;
197         return 0;
198 }
199
200 fwp_vres_id_t fwp_vres_get_id(fwp_vres_t *vres)
201 {
202         return (vres - fwp_vres_table.entry);
203 }
204
205 /**
206  * Allocate vres
207  *
208  * \return On success returns vres descriptor. 
209  */
210 fwp_vres_t *fwp_vres_alloc()
211 {
212         int i;
213         unsigned int max_vres;
214
215         /* find free vres id */
216         pthread_mutex_lock(&fwp_vres_table.lock);
217         i = 0;
218         max_vres = fwp_vres_table.max_vres;
219         while ((i < max_vres) && 
220                 (get_flag(&fwp_vres_table.entry[i], USED))) {
221                 i++;
222         }
223         
224         if (i == max_vres) {
225                 pthread_mutex_unlock(&fwp_vres_table.lock);
226                 errno = ENOBUFS;
227                 return NULL;
228         }
229
230         FWP_DEBUG("Allocated vres id = %d\n",i);
231         set_flag(&fwp_vres_table.entry[i], USED);
232         pthread_mutex_unlock(&fwp_vres_table.lock);
233         return (&fwp_vres_table.entry[i]);
234 }
235
236 static int apply_params(fwp_vres_t *vres)
237 {
238         int rv;
239         vres->period = vres->params.period;
240         vres->budget = vres->params.budget;
241         set_flag(vres, UNTOUCHED);
242         if (get_flag(vres, CHANGED)) {
243                 clear_flag(vres, CHANGED);
244                 rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
245         }
246         return rv;
247 }
248
249 /**
250  * Set vres params
251  *
252  * \param[in] vresp Vres descriptor
253  * \param[in] params Vres parameters
254  *
255  * \return On success returns zero. 
256  * On error, negative error code is returned. 
257  *
258  */
259 int fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
260 {
261         int rv = 0;
262
263         if (!fwp_vres_is_valid(vres)) {
264                 errno = EINVAL;
265                 return -1;
266         }
267
268         pthread_mutex_lock(&vres->mutex);
269
270         if (vres->epoint &&
271             params->src.s_addr != vres->params.src.s_addr) {
272                 errno = EREMCHG;
273                 rv = -1;
274                 goto out;
275         }
276         vres->params = *params;
277         if (vres->epoint) {
278                 set_flag(vres, CHANGED);
279                 if (get_flag(vres, UNTOUCHED))
280                         rv = apply_params(vres);
281         }
282 out:
283         pthread_mutex_unlock(&vres->mutex);
284         return rv;
285 }
286
287 /**
288  * Creates new vres
289  *
290  * \param[in] params Vres parameters
291  * \param[out] vresp Pointer to the descriptor of newly created vres
292  *
293  * \return On success returns descriptor of vres. 
294  * On error, negative error code is returned. 
295  *
296  */
297 int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp)
298 {
299         int rv;
300         fwp_vres_t *vres;
301         
302         vres = fwp_vres_alloc();
303         if (!vres) {
304                 errno = ENOMEM;
305                 return -1;
306         }
307
308         pthread_mutexattr_t ma;
309         rv = pthread_mutexattr_init(&ma);
310         rv = pthread_mutexattr_setprotocol(&ma, PTHREAD_PRIO_INHERIT);
311         if (rv) return rv;
312         pthread_mutex_init(&vres->mutex, &ma);
313         pthread_cond_init(&vres->cond, NULL);
314         
315         vres->params = *params;
316         apply_params(vres);
317         fwp_msgq_init(&vres->msg_queue);
318
319         pthread_attr_init(&vres->tx_thread_attr);
320         if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, 
321                             fwp_vres_tx_thread, (void*) vres)) != 0){
322                 goto err;
323         }
324
325         *vresp = vres;
326         
327         return 0;
328 err:    
329         fwp_msgq_dequeue_all(&vres->msg_queue);
330         fwp_vres_free(vres);
331         return -1; 
332 }
333
334 /**
335  * Destroys vres
336  *
337  * \param[in] vres Vres descriptor
338  *
339  * \return On success returns 0. 
340  * On error, negative error code is returned. 
341  *
342  */
343 int fwp_vres_destroy(fwp_vres_t *vres)
344 {       
345         if (!fwp_vres_is_valid(vres)) {
346                 errno = EINVAL;
347                 return -1;
348         }
349         
350         pthread_cancel(vres->tx_thread);
351         pthread_cond_destroy(&vres->cond);
352         pthread_mutex_destroy(&vres->mutex);
353                 
354         fwp_msgq_dequeue_all(&vres->msg_queue);
355         fwp_vres_free(vres);
356         
357         FWP_DEBUG("Vres destroyed.\n");
358         return  0;
359 }
360
361 static void do_consume_budget(struct fwp_vres *vres, size_t size)
362 {
363         if (get_flag(vres, UNTOUCHED)) {
364                 /* Setup next replenish time */
365                 struct timespec now;
366                 clear_flag(vres, UNTOUCHED);
367                 if (get_flag(vres, QUEUED))
368                         now = vres->replenish_at;
369                 else
370                         clock_gettime(CLOCK_MONOTONIC, &now);
371                 fwp_timespec_add(&vres->replenish_at, &now, &vres->period);
372                 sem_post(&vres->consumed);
373         }
374         vres->budget -= size;
375 }
376
377 int __consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
378 {
379         int ret = 0;
380         if (vres->params.budget < size) {
381                 errno = ENOSR;
382                 return -1;
383         }
384         while (can_block && vres->budget < size) {
385                 ret = pthread_cond_wait(&vres->cond, &vres->mutex);
386                 /* The budget might have been changed while we were
387                  * waiting, so check it again. */
388                 if (vres->params.budget < size) {
389                         errno = ENOSR;
390                         return -1;
391                 }
392         }
393         if (ret == 0) {
394                 if (vres->budget >= size) {
395                         do_consume_budget(vres, size);
396                         ret = 0;
397                 } else {
398                         set_flag(vres, QUEUED);
399                         ret = 1;
400                 }
401         }
402         return ret;
403 }
404
405 /** 
406  * Tries to consume (a part of) budget
407  * 
408  * @param vres VRES whose budget is conumed.
409  * @param size How much to consume (in bytes).
410  * @param can_block True, indicates that the function can block to
411  * wait for budget replenishment. False causes no blocking and if 1 is
412  * returned, the calles must call fwp_vres_enqueue() to enqueue the
413  * packet to be sent later after replenishing.
414  * 
415  * @return Zero if budget was consumed, 1 if there is not enough
416  * budget available and blocking was not allowed, -1 in case of error.
417  */
418 int fwp_vres_consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
419 {
420         int ret = 0;
421         pthread_mutex_lock(&vres->mutex);
422         ret = __consume_budget(vres, size, can_block);
423         pthread_mutex_unlock(&vres->mutex);
424         return ret;
425 }
426
427 int fwp_vres_enqueue(struct fwp_vres *vres, struct fwp_endpoint *ep, void *msg, size_t size)
428 {
429         struct fwp_msgb *msgb;
430         int ret;
431         
432         if (!(msgb = fwp_msgb_alloc(size)))
433                 return -1;
434         memcpy(msgb->data, msg, size);
435         fwp_msgb_put(msgb, size);
436         ret = fwp_msgq_enqueue(&vres->msg_queue, msgb);
437         if (ret) {
438                 fwp_msgb_free(msgb);
439                 return ret;
440         }
441         return ret;
442 }
443
444 static void wait_for_replenish(struct fwp_vres *vres)
445 {
446         sem_wait(&vres->consumed);
447         clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
448                         &vres->replenish_at, NULL);
449 }
450
451 static void send_queue(struct fwp_vres *vres)
452 {
453         struct fwp_msgb *msgb;
454         bool can_send;
455         msgb = fwp_msgq_dequeue(&vres->msg_queue);
456         can_send = (0 == __consume_budget(vres, msgb->len, false));
457         if (!can_send) {
458                 fwp_msgb_free(msgb);
459                 FWP_ERROR("Cannot send queued packet (budget decreased?)\n");
460                 return;
461         }
462         /* If we cannot send the whole queue, the flag will be set
463          * later by __consume_budget(). */
464         clear_flag(vres, QUEUED);
465
466         while (msgb) {
467                 fwp_endpoint_do_send(vres->epoint, msgb->data, msgb->len);
468                 fwp_msgb_free(msgb);
469                 msgb = fwp_msgq_peek(&vres->msg_queue);
470                 if (msgb) {
471                         can_send = (0 == __consume_budget(vres, msgb->len, false));
472                         if (can_send) {
473                                 msgb = fwp_msgq_dequeue(&vres->msg_queue);
474                         } else {
475                                 msgb = NULL;
476                                 return;
477                         }
478                 }
479         }
480 }
481
482 static void replenish(struct fwp_vres *vres)
483 {
484         pthread_mutex_lock(&vres->mutex);
485         apply_params(vres);
486         if (get_flag(vres, QUEUED))
487                 send_queue(vres);
488         pthread_cond_broadcast(&vres->cond);
489         pthread_mutex_unlock(&vres->mutex);
490 }
491
492 /**
493  * Thread that does budgeting
494  *
495  */
496 static void* fwp_vres_tx_thread(void *_vres)
497 {
498         struct fwp_vres *vres = (struct fwp_vres*)_vres;
499         unsigned int    ac_id = vres->params.ac_id;
500
501         fwp_set_rt_prio(90 - ac_id);
502         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
503         pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);   
504         
505         while (1) {
506                 wait_for_replenish(vres);
507                 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
508                 replenish(vres);
509                 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
510         }
511
512         return NULL;
513 }
514
515 /*int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *epoint)*/
516 int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *ep, int sockd)
517 {
518         int rv = 0;
519
520         if (!fwp_vres_is_valid(vres)) {
521                 errno = EINVAL;
522                 rv = -1;
523                 goto err;
524         }
525         
526         if (vres->epoint) { /*if already bounded */
527                 errno = EBUSY;
528                 rv = -1;
529                 goto err;
530         }
531
532         vres->ac_sockd = sockd;
533         rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
534         if (rv)
535                 goto err;
536         vres->epoint = ep;
537 err:
538         return rv;
539 }
540
541 int fwp_vres_unbind(fwp_vres_t *vres)
542 {
543         if (!fwp_vres_is_valid(vres)) {
544                 errno = EINVAL;
545                 return -1;
546         }
547         pthread_mutex_lock(&vres->mutex);
548         vres->epoint = NULL;
549         pthread_mutex_unlock(&vres->mutex);
550         /* TODO: consider what to do with pending messages */
551         fwp_msgq_dequeue_all(&vres->msg_queue);
552         return 0;
553 }