]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/fwp/fwp_vres.c
Properly initialize newly allocated VRES
[frescor/fwp.git] / fwp / lib / fwp / fwp_vres.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2009 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FWP (Frescor WLAN Protocol)                      */
26 /*                                                                        */
27 /* FWP is free software; you can redistribute it and/or modify it         */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FWP is distributed in the hope that it will be         */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FWP; see file        */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FWP header files in a file,          */
39 /* instantiating FWP generics or templates, or linking other files        */
40 /* with FWP objects to produce an executable application, does not        */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 #include "fwp_utils.h"
48 #include "fwp_vres.h"
49
50 #include "fwp_msgq.h"
51 #include "fwp_endpoint.h"
52 #include "fwp_debug.h"
53
54 #include <string.h>
55 #include <errno.h>
56 #include <stdlib.h>
57
58 static void* fwp_vres_tx_thread(void *_vres);
59
60 typedef enum {
61         UNTOUCHED,
62         CHANGED,
63         QUEUED,
64 } fwp_vres_flag_t;
65
66 fwp_vres_params_t fwp_vres_params_default = {
67         .ac_id = FWP_AC_VO,
68         .budget = 100,
69         .period = {.tv_sec = 2 , .tv_nsec = 111111},
70         .src = { 0 },
71 };
72
73 /**
74  * Structure of FWP vres.
75  * Internal representation of vres
76  * 
77  */
78 struct fwp_vres{
79         struct fwp_vres_params          params;
80         fwp_vres_flag_t                 flags;
81         pthread_mutex_t                 mutex;
82         pthread_cond_t                  cond; /**< Signalizes budget replenishment */
83         fwp_budget_t                    budget; /**< Current remaining budget */
84         fwp_period_t                    period; /**< Period for this "activation" */
85         struct timespec                 replenish_at; /**< Time of next replenishment */
86         sem_t                           consumed;
87         /**< endpoint bounded to this vres */
88         struct fwp_endpoint             *epoint;
89         pthread_t                       tx_thread; /**< tx_thread id*/
90         pthread_attr_t                  tx_thread_attr;
91         /** Copy of bound enpoint's socket - used for future changes
92          * of vres parameters. */
93         int                             ac_sockd;
94         /** Queue for messages to send */
95         struct fwp_msgq                 msg_queue;   
96 };
97
98 /**< mapping priority to ac*/
99 static const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
100 /**< IP tos for AC_VI, AC_VO, AC_BE, AC_BK */ 
101 static const unsigned int ac_to_tos[4] = {224,160,96,64};
102
103 /**
104  * Set access category (AC) to socket
105  *
106  * \param[in] sockd Socket descriptor
107  * \param[in] ac_id AC identifier
108  * 
109  * \return On success returns zero. 
110  * On error, negative error code is returned. 
111  *
112  */
113 static inline int fwp_vres_set_ac(int sockd, fwp_ac_t ac_id) 
114 {
115         unsigned int tos;
116         
117         tos = ac_to_tos[ac_id];
118         if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
119                 FWP_ERROR("setsockopt: %s", strerror(errno));
120                 return (-1);
121         }
122         
123         return 0;
124 }
125
126 static inline void set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
127 {
128         vres->flags |= (1 << flag);
129 }
130
131 static inline void clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
132 {
133         vres->flags &= ~(1 << flag);
134 }
135
136 static inline int get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
137 {
138         return !!(vres->flags & (1 << flag));
139 }
140
141 static inline void clear_all_flags(fwp_vres_t *vres)
142 {
143         vres->flags = 0;
144 }
145
146 static inline void fwp_vres_free(fwp_vres_t *vres)
147 {
148         free(vres);
149 }
150
151 /*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
152 {
153         if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
154                 return -EINVAL;
155         *vres = &fwp_vres_table.entry[vres_id];
156         return 0;
157 }
158 */
159
160 /**
161  * Allocate vres
162  *
163  * \return On success returns vres descriptor. 
164  */
165 fwp_vres_t *fwp_vres_alloc()
166 {
167         fwp_vres_t *vres = malloc(sizeof(*vres));
168         if (vres) {
169                 memset(vres, 0, sizeof(*vres));
170                 FWP_DEBUG("Allocated vres\n");
171         }
172         return vres;
173 }
174
175 static int apply_params(fwp_vres_t *vres)
176 {
177         int rv = 0;
178         vres->period = vres->params.period;
179         vres->budget = vres->params.budget;
180         set_flag(vres, UNTOUCHED);
181         if (get_flag(vres, CHANGED)) {
182                 clear_flag(vres, CHANGED);
183                 rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
184         }
185         return rv;
186 }
187
188 /**
189  * Set vres params
190  *
191  * \param[in] vresp Vres descriptor
192  * \param[in] params Vres parameters
193  *
194  * \return On success returns zero. 
195  * On error, negative error code is returned. 
196  *
197  */
198 int fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
199 {
200         int rv = 0;
201
202         if (!vres) {
203                 errno = EINVAL;
204                 return -1;
205         }
206
207         pthread_mutex_lock(&vres->mutex);
208
209         if (vres->epoint &&
210             params->src.s_addr != vres->params.src.s_addr) {
211                 errno = EREMCHG;
212                 rv = -1;
213                 goto out;
214         }
215         vres->params = *params;
216         if (vres->epoint) {
217                 set_flag(vres, CHANGED);
218                 if (get_flag(vres, UNTOUCHED))
219                         rv = apply_params(vres);
220         }
221 out:
222         pthread_mutex_unlock(&vres->mutex);
223         return rv;
224 }
225
226 /**
227  * Creates new vres
228  *
229  * \param[in] params Vres parameters
230  * \param[out] vresp Pointer to the descriptor of newly created vres
231  *
232  * \return Zero on success, -1 on error and errno is set
233  * appropriately.
234  */
235 int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp)
236 {
237         int rv;
238         fwp_vres_t *vres;
239         
240         vres = fwp_vres_alloc();
241         if (!vres) {
242                 errno = ENOMEM;
243                 return -1;
244         }
245
246         pthread_mutexattr_t ma;
247         rv = pthread_mutexattr_init(&ma);
248         rv = pthread_mutexattr_setprotocol(&ma, PTHREAD_PRIO_INHERIT);
249         if (rv) return rv;
250         pthread_mutex_init(&vres->mutex, &ma);
251         pthread_cond_init(&vres->cond, NULL);
252         
253         vres->params = *params;
254         apply_params(vres);
255         fwp_msgq_init(&vres->msg_queue);
256
257         pthread_attr_init(&vres->tx_thread_attr);
258         if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, 
259                             fwp_vres_tx_thread, (void*) vres)) != 0){
260                 goto err;
261         }
262
263         *vresp = vres;
264         
265         return 0;
266 err:    
267         fwp_msgq_dequeue_all(&vres->msg_queue);
268         fwp_vres_free(vres);
269         return -1; 
270 }
271
272 /**
273  * Destroys vres
274  *
275  * \param[in] vres Vres descriptor
276  *
277  * \return Zero on success, -1 on error and errno is set
278  * appropriately.
279  */
280 int fwp_vres_destroy(fwp_vres_t *vres)
281 {       
282         if (!vres) {
283                 errno = EINVAL;
284                 return -1;
285         }
286         
287         pthread_cancel(vres->tx_thread);
288         pthread_cond_destroy(&vres->cond);
289         pthread_mutex_destroy(&vres->mutex);
290                 
291         fwp_msgq_dequeue_all(&vres->msg_queue);
292         fwp_vres_free(vres);
293         
294         FWP_DEBUG("Vres destroyed.\n");
295         return  0;
296 }
297
298 static void do_consume_budget(struct fwp_vres *vres, size_t size)
299 {
300         if (get_flag(vres, UNTOUCHED)) {
301                 /* Setup next replenish time */
302                 struct timespec now;
303                 clear_flag(vres, UNTOUCHED);
304                 if (get_flag(vres, QUEUED))
305                         now = vres->replenish_at;
306                 else
307                         clock_gettime(CLOCK_MONOTONIC, &now);
308                 fwp_timespec_add(&vres->replenish_at, &now, &vres->period);
309                 sem_post(&vres->consumed);
310         }
311         vres->budget -= size;
312 }
313
314 int __consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
315 {
316         int ret = 0;
317         if (vres->params.budget < size) {
318                 errno = ENOSR;
319                 return -1;
320         }
321         while (can_block && vres->budget < size) {
322                 ret = pthread_cond_wait(&vres->cond, &vres->mutex);
323                 /* The budget might have been changed while we were
324                  * waiting, so check it again. */
325                 if (vres->params.budget < size) {
326                         errno = ENOSR;
327                         return -1;
328                 }
329         }
330         if (ret == 0) {
331                 if (vres->budget >= size) {
332                         do_consume_budget(vres, size);
333                         ret = 0;
334                 } else {
335                         set_flag(vres, QUEUED);
336                         ret = 1;
337                 }
338         }
339         return ret;
340 }
341
342 /** 
343  * Tries to consume (a part of) budget
344  * 
345  * @param vres VRES whose budget is conumed.
346  * @param size How much to consume (in bytes).
347  * @param can_block True, indicates that the function can block to
348  * wait for budget replenishment. False causes no blocking and if 1 is
349  * returned, the calles must call fwp_vres_enqueue() to enqueue the
350  * packet to be sent later after replenishing.
351  * 
352  * @return Zero if budget was consumed, 1 if there is not enough
353  * budget available and blocking was not allowed, -1 in case of error.
354  */
355 int fwp_vres_consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
356 {
357         int ret = 0;
358         pthread_mutex_lock(&vres->mutex);
359         ret = __consume_budget(vres, size, can_block);
360         pthread_mutex_unlock(&vres->mutex);
361         return ret;
362 }
363
364 int fwp_vres_enqueue(struct fwp_vres *vres, struct fwp_endpoint *ep,
365                      const void *msg, size_t size)
366 {
367         struct fwp_msgb *msgb;
368         int ret;
369         
370         if (!(msgb = fwp_msgb_alloc(size)))
371                 return -1;
372         memcpy(msgb->data, msg, size);
373         fwp_msgb_put(msgb, size);
374         ret = fwp_msgq_enqueue(&vres->msg_queue, msgb);
375         if (ret) {
376                 fwp_msgb_free(msgb);
377                 return ret;
378         }
379         return ret;
380 }
381
382 static void wait_for_replenish(struct fwp_vres *vres)
383 {
384         sem_wait(&vres->consumed);
385         clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
386                         &vres->replenish_at, NULL);
387 }
388
389 static void send_queue(struct fwp_vres *vres)
390 {
391         struct fwp_msgb *msgb;
392         bool can_send;
393         msgb = fwp_msgq_dequeue(&vres->msg_queue);
394         can_send = (0 == __consume_budget(vres, msgb->len, false));
395         if (!can_send) {
396                 fwp_msgb_free(msgb);
397                 FWP_ERROR("Cannot send queued packet (budget decreased?)\n");
398                 return;
399         }
400         /* If we cannot send the whole queue, the flag will be set
401          * later by __consume_budget(). */
402         clear_flag(vres, QUEUED);
403
404         while (msgb) {
405                 fwp_endpoint_do_send(vres->epoint, msgb->data, msgb->len);
406                 fwp_msgb_free(msgb);
407                 msgb = fwp_msgq_peek(&vres->msg_queue);
408                 if (msgb) {
409                         can_send = (0 == __consume_budget(vres, msgb->len, false));
410                         if (can_send) {
411                                 msgb = fwp_msgq_dequeue(&vres->msg_queue);
412                         } else {
413                                 msgb = NULL;
414                                 return;
415                         }
416                 }
417         }
418 }
419
420 static void replenish(struct fwp_vres *vres)
421 {
422         pthread_mutex_lock(&vres->mutex);
423         apply_params(vres);
424         if (get_flag(vres, QUEUED))
425                 send_queue(vres);
426         pthread_cond_broadcast(&vres->cond);
427         pthread_mutex_unlock(&vres->mutex);
428 }
429
430 /**
431  * Thread that does budgeting
432  *
433  */
434 static void* fwp_vres_tx_thread(void *_vres)
435 {
436         struct fwp_vres *vres = (struct fwp_vres*)_vres;
437         unsigned int    ac_id = vres->params.ac_id;
438
439         fwp_set_rt_prio(90 - ac_id);
440         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
441         pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);   
442         
443         while (1) {
444                 wait_for_replenish(vres);
445                 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
446                 replenish(vres);
447                 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
448         }
449
450         return NULL;
451 }
452
453 /*int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *epoint)*/
454 int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *ep, int sockd, struct in_addr  *src)
455 {
456         int rv = 0;
457
458         if (!vres) {
459                 errno = EINVAL;
460                 rv = -1;
461                 goto err;
462         }
463         
464         if (vres->epoint) { /*if already bounded */
465                 errno = EBUSY;
466                 rv = -1;
467                 goto err;
468         }
469
470         vres->ac_sockd = sockd;
471         *src = vres->params.src;
472         rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
473         if (rv)
474                 goto err;
475         vres->epoint = ep;
476 err:
477         return rv;
478 }
479
480 int fwp_vres_unbind(fwp_vres_t *vres)
481 {
482         if (!vres) {
483                 errno = EINVAL;
484                 return -1;
485         }
486         pthread_mutex_lock(&vres->mutex);
487         vres->epoint = NULL;
488         pthread_mutex_unlock(&vres->mutex);
489         /* TODO: consider what to do with pending messages */
490         fwp_msgq_dequeue_all(&vres->msg_queue);
491         return 0;
492 }