]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/fwp/fwp_vres.c
823b1900448021614c0b5f794e298051d9a041c1
[frescor/fwp.git] / fwp / lib / fwp / fwp_vres.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FWP (Frescor WLAN Protocol)                      */
26 /*                                                                        */
27 /* FWP is free software; you can redistribute it and/or modify it         */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FWP is distributed in the hope that it will be         */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FWP; see file        */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FWP header files in a file,          */
39 /* instantiating FWP generics or templates, or linking other files        */
40 /* with FWP objects to produce an executable application, does not        */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46 #include "fwp_utils.h"
47 #include "fwp_vres.h"
48
49 #include "fwp_msgq.h"
50 #include "fwp_endpoint.h"
51 #include "fwp_debug.h"
52
53 #include <string.h>
54 #include <errno.h>
55 #include <stdlib.h>
56
57 static void* fwp_vres_tx_thread(void *_vres);
58
59 typedef enum {
60         FWP_VF_USED             = 0,
61         FWP_VF_BOUND            = 1,
62         FWP_VF_CHANGED          = 2,
63 } fwp_vres_flag_t;
64
65 fwp_vres_params_t fwp_vres_params_default = {
66         .id = 0,        
67         .ac_id = FWP_AC_VO,
68         .budget = 100,
69         .period = {.tv_sec = 2 , .tv_nsec = 111111}
70 };
71
72 /**
73  * Structure of FWP vres.
74  * Internal representation of vres
75  * 
76  */
77 struct fwp_vres{
78         struct fwp_vres_params          params;
79         /* consideration: move tx_queue to endpoint */
80         /**< queue for messages to send */
81         struct fwp_msgq                 tx_queue;   
82         fwp_vres_flag_t                 flags;
83         /**< endpoint bounded to this vres */
84         /*fwp_endpoint_t                *epoint; */
85         pthread_t                       tx_thread; /**< tx_thread id*/
86         pthread_attr_t                  tx_thread_attr;
87         int                             ac_sockd;  /**< ac socket descriptor */
88 };
89
90 typedef
91 struct fwp_vres_table {
92         unsigned int                    max_vres; 
93         fwp_vres_t                      *entry;
94         pthread_mutex_t                 lock;
95 } fwp_vres_table_t;
96
97 /* Global variable - vres table */
98 static fwp_vres_table_t  fwp_vres_table = {
99         .max_vres = 0,
100         .entry = NULL,
101         .lock = PTHREAD_MUTEX_INITIALIZER,
102 };
103
104 /**< mapping priority to ac*/
105 static const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
106 /**< IP tos for AC_VI, AC_VO, AC_BE, AC_BK */ 
107 static const unsigned int ac_to_tos[4] = {224,160,96,64};
108
109 /**
110  * Set access category (AC) to socket
111  *
112  * \param[in] sockd Socket descriptor
113  * \param[in] ac_id AC identifier
114  * 
115  * \return On success returns zero. 
116  * On error, negative error code is returned. 
117  *
118  */
119 static inline int fwp_vres_set_ac(int sockd, fwp_ac_t ac_id) 
120 {
121         unsigned int tos;
122         
123         tos = ac_to_tos[ac_id];
124         if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
125                 FWP_ERROR("setsockopt: %s", strerror(errno));
126                 return (-1);
127         }
128         
129         return 0;
130 }
131
132 static inline void fwp_vres_set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
133 {
134         vres->flags |= (1 << flag);
135 }
136
137 static inline void fwp_vres_clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
138 {
139         vres->flags &= ~(1 << flag);
140 }
141
142 static inline int fwp_vres_get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
143 {
144         return !!(vres->flags & (1 << flag));
145 }
146
147 static inline void fwp_vres_clearall_flag(fwp_vres_t *vres)
148 {
149         vres->flags = 0;
150 }
151
152 #if 0
153 /* Deprecated */
154 static int fwp_vres_ac_open(fwp_ac_t ac_id) 
155 {
156         int sockd;
157         unsigned int tos;
158         
159         if ((ac_id < 0)||(ac_id >= FWP_AC_NUM)) {
160                 errno = EINVAL;
161                 return -1;
162         }
163
164         if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
165                 FWP_ERROR("Unable to open socket for AC: %s", strerror(errno));
166                 return (-1);
167         }
168
169         tos = ac_to_tos[ac_id];
170         
171         if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
172                 int e = errno;
173                 FWP_ERROR("setsockopt(IP_TOS): %s", strerror(errno));
174                 close(sockfd);
175                 errno = e;
176                 return -1;
177         }
178         
179         return sockd;
180 }
181 #endif
182
183 static inline int _fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb)
184 {
185         struct iovec  iov;
186         struct msghdr msg = {0};
187         ssize_t ret;
188         char cmsg_buf[CMSG_SPACE(sizeof(struct in_pktinfo))];
189
190         iov.iov_base = msgb->data;
191         iov.iov_len = msgb->len;
192
193         msg.msg_iov = &iov;
194         msg.msg_iovlen = 1;
195
196
197
198         if (vres->params.src.s_addr != 0) {
199                 struct cmsghdr *cmsg;
200                 struct in_pktinfo *ipi;
201
202                 memset(cmsg_buf, 0, sizeof(cmsg_buf));
203
204                 msg.msg_control = cmsg_buf;
205                 msg.msg_controllen = sizeof(cmsg_buf);
206
207                 cmsg = CMSG_FIRSTHDR(&msg);
208
209                 cmsg->cmsg_level = SOL_IP;
210                 cmsg->cmsg_type = IP_PKTINFO;
211                 cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
212
213                 ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
214                 ipi->ipi_spec_dst = vres->params.src;
215         }
216         ret = sendmsg(vres->ac_sockd, &msg, 0);
217         return ret;
218 }
219
220 static inline void fwp_vres_free(fwp_vres_t *vres)
221 {
222         fwp_vres_clearall_flag(vres);
223 }
224
225 static inline int fwp_vres_is_valid(fwp_vres_t *vres)
226 {
227         int id  = vres - fwp_vres_table.entry;
228
229         if ((id < 0) || (id > fwp_vres_table.max_vres - 1) || 
230                 (!fwp_vres_get_flag(vres, FWP_VF_USED))) 
231                 return 0;
232         
233         return 1; 
234 }
235
236 /*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
237 {
238         if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
239                 return -EINVAL;
240         *vres = &fwp_vres_table.entry[vres_id];
241         return 0;
242 }
243 */
244
245 int fwp_vres_table_init(unsigned int max_vres)
246 {
247         unsigned int table_size = max_vres * sizeof(fwp_vres_t);
248
249         fwp_vres_table.entry = (fwp_vres_t*) malloc(table_size);
250         if (!fwp_vres_table.entry) 
251                 return -1;      /* Errno is set by malloc */
252
253         memset((void*) fwp_vres_table.entry, 0, table_size);
254         fwp_vres_table.max_vres = max_vres;
255         return 0;
256 }
257
258 fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
259 {
260         fwp_vres_t *vres = vresd;
261         
262         return (vres - fwp_vres_table.entry);
263 }
264
265 /**
266  * Allocate vres
267  *
268  * \return On success returns vres descriptor. 
269  */
270 fwp_vres_d_t fwp_vres_alloc()
271 {
272         int i;
273         unsigned int max_vres;
274
275         /* find free vres id */
276         pthread_mutex_lock(&fwp_vres_table.lock);
277         i = 0;
278         max_vres = fwp_vres_table.max_vres;
279         while ((i < max_vres) && 
280                 (fwp_vres_get_flag(&fwp_vres_table.entry[i], FWP_VF_USED))) {
281                 i++;
282         }
283         
284         if (i == max_vres) {
285                 pthread_mutex_unlock(&fwp_vres_table.lock);
286                 errno = ENOBUFS;
287                 return NULL;
288         }
289
290         FWP_DEBUG("Allocated vres id = %d\n",i);
291         fwp_vres_set_flag(&fwp_vres_table.entry[i], FWP_VF_USED);
292         pthread_mutex_unlock(&fwp_vres_table.lock);
293         return (&fwp_vres_table.entry[i]);
294 }
295
296 inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
297 {
298         int rv;
299
300         /* copy vres paramters into vres structure */
301         rv = fwp_vres_set_ac(vres->ac_sockd, params->ac_id);
302         if (!rv)
303                 return rv;
304         memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
305         fwp_vres_set_flag(vres, FWP_VF_CHANGED);
306
307         return 0;
308 }
309
310 /**
311  * Set vres params
312  *
313  * \param[in] vresdp Vres descriptor
314  * \param[in] params Vres parameters
315  *
316  * \return On success returns zero. 
317  * On error, negative error code is returned. 
318  *
319  */
320 int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
321 {
322         fwp_vres_t *vres = vresd;
323         
324         if (!fwp_vres_is_valid(vres)) {
325                 errno = EINVAL;
326                 return -1;
327         }
328
329         return _fwp_vres_set_params(vres, params);
330 }
331
332 /**
333  * Creates new vres
334  *
335  * \param[in] params Vres parameters
336  * \param[out] vresdp Pointer to the descriptor of newly created vres
337  *
338  * \return On success returns descriptor of vres. 
339  * On error, negative error code is returned. 
340  *
341  */
342 int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
343 {
344         int rv;
345         fwp_vres_t *vres;
346         
347         vres = fwp_vres_alloc();
348         if (!vres) {
349                 errno = ENOMEM;
350                 return -1;
351         }
352         /* initialize msg queue */
353         fwp_msgq_init(&vres->tx_queue);
354         
355         memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
356         fwp_vres_set_flag(vres, FWP_VF_CHANGED);
357         pthread_attr_init(&vres->tx_thread_attr);
358         if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, 
359                             fwp_vres_tx_thread, (void*) vres)) != 0){
360                 goto err;
361         }
362
363         *vresdp = vres;
364         return 0;
365 err:    
366         fwp_vres_free(vres);
367         return -1; 
368 }
369
370 /**
371  * Destroys vres
372  *
373  * \param[in] vresd Vres descriptor
374  *
375  * \return On success returns 0. 
376  * On error, negative error code is returned. 
377  *
378  */
379 int fwp_vres_destroy(fwp_vres_d_t vresd)
380 {       
381         fwp_vres_t *vres = vresd;
382
383         if (!fwp_vres_is_valid(vres)) {
384                 errno = EINVAL;
385                 return -1;
386         }
387         
388         pthread_cancel(vres->tx_thread);
389                 
390         FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);   
391         return  0;
392 }
393
394 static void fwp_vres_cleanup(void *_vres)
395 {
396         fwp_vres_t *vres = (fwp_vres_t*)_vres;
397
398         fwp_msgq_dequeue_all(&vres->tx_queue);
399         fwp_vres_free(vres);
400 }
401
402 static inline void 
403 fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period, 
404                         fwp_budget_t  *budget)
405 {
406         if (fwp_vres_get_flag(vres, FWP_VF_CHANGED)) {  
407                 /*period->tv_nsec = vres->params.period % SEC_TO_USEC;
408                 period->tv_sec = vres->params.period / SEC_TO_USEC;*/
409                 *period = vres->params.period;
410                 *budget = vres->params.budget; 
411                 FWP_DEBUG("Vres tx thread with budget=%ld period_sec=%ld "
412                                 "period_nsec=%ld.\n",vres->params.budget, 
413                                 period->tv_sec, period->tv_nsec);
414                 fwp_vres_clear_flag(vres, FWP_VF_CHANGED);
415         }
416 }
417
418 /**
419  * Thread that does budgeting
420  *
421  */
422 static void* fwp_vres_tx_thread(void *_vres)
423 {
424         struct fwp_vres *vres = (struct fwp_vres*)_vres;
425         struct fwp_msgq *msgq = &vres->tx_queue;
426         struct fwp_msgb *msgb = NULL;
427         unsigned int    ac_id = vres->params.ac_id;
428         fwp_budget_t    budget = vres->params.budget;
429         fwp_budget_t    curr_budget;
430         int             rc;
431         struct timespec start, period, interval, now;
432
433         fwp_set_rt_prio(90 - ac_id);
434         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
435         pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);       
436         pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
437         
438         /* just for sure */
439         fwp_vres_sched_update(vres, &period, &budget);
440         curr_budget = budget;
441         clock_gettime(CLOCK_MONOTONIC, &start);
442
443         while (1) {
444                 msgb = fwp_msgq_dequeue(msgq);
445                 if (msgb->len > budget) {
446                         FWP_ERROR("Message too large: %zd -> skipping\n",
447                                   msgb->len);
448                         goto skip_message;
449                 }
450                 if (curr_budget < msgb->len) {
451                         /* needs to replenish */
452                         clock_gettime(CLOCK_MONOTONIC, &now);
453                         fwp_timespec_sub(&interval, &now, &start);
454                         ul_logtrash("start=%ld.%09ld,  now=%ld.%09ld  diff=%ld.%09ld\n", (long)start.tv_sec, (long)start.tv_nsec, (long)now.tv_sec, (long)now.tv_nsec, (long)interval.tv_sec, (long)interval.tv_nsec);
455                         fwp_timespec_sub(&interval, &period, &interval);
456                         if (interval.tv_sec > 0 ||
457                             (interval.tv_sec == 0 && interval.tv_nsec > 0)) {
458                                 /* We have to wait to replenish */
459                                 ul_logtrash("sleeping=%ld.%09ld\n", (long)interval.tv_sec, (long)interval.tv_nsec);
460                                 nanosleep(&interval, NULL);
461                                 fwp_timespec_add(&start, &now, &interval);
462                         } else {
463                                 start = now;
464                         }
465                         fwp_vres_sched_update(vres, &period, &budget);  
466                         curr_budget = budget;
467                 }
468
469                 rc = _fwp_vres_send(vres, msgb);
470                 if (!(rc < 0)) {
471                         FWP_DEBUG("Message sent through AC%d\n",ac_id);
472                         curr_budget -= msgb->len;
473                 } else {
474                         FWP_ERROR("Message sent error %d\n",rc);
475                 }
476         skip_message:                   
477                 fwp_msgb_free(msgb);
478
479                 /*pthread_testcancel(); */
480                 /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);     
481                 
482                 fwp_timespec_add(&end_period, &start_period, &period);
483                 clock_gettime(CLOCK_MONOTONIC, &current_time);
484                 fwp_timespec_sub(&interval, &end_period, &current_time);
485                 nanosleep(&interval, NULL);
486                 */
487         }
488         
489         /* it should normaly never come here */ 
490         pthread_cleanup_pop(0); 
491         fwp_vres_free(vres);
492         
493         return NULL;
494 }
495
496 int fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
497 {
498         fwp_vres_t *vres = vresd;
499
500         if (fwp_vres_is_valid(vres)) {
501                 return fwp_msgq_enqueue(&vres->tx_queue, msgb);
502         } else {
503                 errno = EINVAL;
504                 return -1;
505         }
506 }
507
508 /*int fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)*/
509 int fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
510 {
511         fwp_vres_t *vres = vresd;
512         int rv = 0;
513
514         pthread_mutex_lock(&fwp_vres_table.lock);
515         if (!fwp_vres_is_valid(vres)) {
516                 errno = EINVAL;
517                 rv = -1;
518                 goto err;
519         }
520         
521         if (fwp_vres_get_flag(vres, FWP_VF_BOUND)) { /*if already bounded */
522                 errno = EBUSY;
523                 rv = -1;
524                 goto err;
525         }
526
527         vres->ac_sockd = sockd;
528         rv = fwp_vres_set_ac(vres->ac_sockd,vres->params.ac_id);
529         /*if (rv)
530                 goto err;*/
531         fwp_vres_set_flag(vres, FWP_VF_BOUND);
532 err:
533         pthread_mutex_unlock(&fwp_vres_table.lock);
534         return rv;
535 }
536
537 int fwp_vres_unbind(fwp_vres_d_t vresd)
538 {
539         fwp_vres_t *vres = vresd;
540         
541         if (!fwp_vres_is_valid(vresd)) {
542                 errno = EINVAL;
543                 return -1;
544         }
545         fwp_vres_clear_flag(vres, FWP_VF_BOUND);
546         /* TODO: consider what to do with pending messages */
547         /* fwp_vres_free_msgb(vres->tx_queue); */
548         return 0;
549 }