]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/fwp/fwp_vres.c
FWP clean-up. CONFIG_WITH_CONTNEGT changed to CONFIG_WITHOUT_CONNEGT.
[frescor/fwp.git] / fwp / lib / fwp / fwp_vres.c
1 #include "fwp_utils.h"
2 #include "fwp_vres.h"
3
4 #include "fwp_msgq.h"
5 #include "fwp_endpoint.h"
6
7 #include <string.h>
8
9 static void* fwp_vres_tx_thread(void *_vres);
10
11 typedef enum {
12         FWP_VF_USED             = 1 ,
13         FWP_VF_BOUND            = 2 ,
14         FWP_VF_RESCHED          = 4 ,
15 } fwp_vres_flag_t;
16
17 fwp_vres_params_t fwp_vres_params_default = {
18         .id = 0,        
19         .ac_id = FWP_AC_VO,
20         .budget = 100,
21         .period = {.tv_sec = 2 , .tv_nsec = 111111}
22 };
23
24 /**
25  * Structure of FWP vres.
26  * Internal representation of vres
27  * 
28  */
29 struct fwp_vres{
30         struct fwp_vres_params          params;
31         /* consideration: move tx_queue to endpoint */
32         /**< queue for messages to send */
33         struct fwp_msgq                 tx_queue;   
34         fwp_vres_flag_t                 flags;
35         /**< endpoint bounded to this vres */
36         /*fwp_endpoint_t                *epoint; */
37         pthread_t                       tx_thread; /**< tx_thread id*/
38         pthread_attr_t                  tx_thread_attr;
39         int                             ac_sockd;  /**< ac socket descriptor */
40 };
41
42 typedef
43 struct fwp_vres_table {
44         unsigned int                    max_vres; 
45         fwp_vres_t                      *entry;
46         pthread_mutex_t                 lock;
47 } fwp_vres_table_t;
48
49 /* Global variable - vres table */
50 static fwp_vres_table_t  fwp_vres_table = {
51         .max_vres = 0,
52         .entry = NULL,
53         .lock = PTHREAD_MUTEX_INITIALIZER,
54 };
55
56 /**< mapping priority to ac*/
57 static const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
58 /**< IP tos for AC_VI, AC_VO, AC_BE, AC_BK */ 
59 static const unsigned int ac_to_tos[4] = {224,160,96,64};
60
61 /**
62  * Set access category (AC) to socket
63  *
64  * \param[in] sockd Socket descriptor
65  * \param[in] ac_id AC identifier
66  * 
67  * \return On success returns zero. 
68  * On error, negative error code is returned. 
69  *
70  */
71 static inline int fwp_vres_set_ac(int sockd, fwp_ac_t ac_id) 
72 {
73         unsigned int tos;
74         
75         tos = ac_to_tos[ac_id];
76         if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
77                 FWP_ERROR("setsockopt: %s", strerror(errno));
78                 return (-1);
79         }
80         
81         return 0;
82 }
83
84 static inline void fwp_vres_set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
85 {
86         vres->flags |= (1 << flag);
87 }
88
89 static inline void fwp_vres_clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
90 {
91         vres->flags &= ~(1 << flag);
92 }
93
94 static inline int fwp_vres_get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
95 {
96         return !!(vres->flags & (1 << flag));
97 }
98
99 static inline void fwp_vres_clearall_flag(fwp_vres_t *vres)
100 {
101         vres->flags = 0;
102 }
103
104 #if 0
105 /* Deprecated */
106 static int fwp_vres_ac_open(fwp_ac_t ac_id) 
107 {
108         int sockd;
109         unsigned int tos;
110         
111         if ((ac_id < 0)||(ac_id >= FWP_AC_NUM)) {
112                 errno = EINVAL;
113                 return -1;
114         }
115
116         if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
117                 FWP_ERROR("Unable to open socket for AC: %s", strerror(errno));
118                 return (-1);
119         }
120
121         tos = ac_to_tos[ac_id];
122         
123         if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
124                 int e = errno;
125                 FWP_ERROR("setsockopt(IP_TOS): %s", strerror(errno));
126                 close(sockfd);
127                 errno = e;
128                 return -1;
129         }
130         
131         return sockd;
132 }
133 #endif
134
135 static inline int _fwp_vres_send(unsigned int ac_sockd, struct fwp_msgb* msgb)
136 {
137         /*_fwp_sendto(ac_sockd, msgb->data, msgb->len, 0, 
138                         msgb->peer->addr, msgb->peer->addrlen);*/
139         return _fwp_send(ac_sockd, msgb->data, msgb->len, 0);
140 }
141
142 static inline void fwp_vres_free(fwp_vres_t *vres)
143 {
144         fwp_vres_clearall_flag(vres);
145 }
146
147 static inline int fwp_vres_is_valid(fwp_vres_t *vres)
148 {
149         int id  = vres - fwp_vres_table.entry;
150
151         if ((id < 0) || (id > fwp_vres_table.max_vres - 1) || 
152                 (!fwp_vres_get_flag(vres, FWP_VF_USED))) 
153                 return 0;
154         
155         return 1; 
156 }
157
158 /*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
159 {
160         if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
161                 return -EINVAL;
162         *vres = &fwp_vres_table.entry[vres_id];
163         return 0;
164 }
165 */
166
167 int fwp_vres_table_init(unsigned int max_vres)
168 {
169         unsigned int table_size = max_vres * sizeof(fwp_vres_t);
170
171         fwp_vres_table.entry = (fwp_vres_t*) malloc(table_size);
172         if (!fwp_vres_table.entry) 
173                 return -1;      /* Errno is set by malloc */
174
175         memset((void*) fwp_vres_table.entry, 0, table_size);
176         fwp_vres_table.max_vres = max_vres;
177         return 0;
178 }
179
180 fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
181 {
182         fwp_vres_t *vres = vresd;
183         
184         return (vres - fwp_vres_table.entry);
185 }
186
187 /**
188  * Allocate vres
189  *
190  * \return On success returns vres descriptor. 
191  */
192 fwp_vres_d_t fwp_vres_alloc()
193 {
194         int i;
195         unsigned int max_vres;
196
197         /* find free vres id */
198         pthread_mutex_lock(&fwp_vres_table.lock);
199         i = 0;
200         max_vres = fwp_vres_table.max_vres;
201         while ((i < max_vres) && 
202                 (fwp_vres_get_flag(&fwp_vres_table.entry[i], FWP_VF_USED))) {
203                 i++;
204         }
205         
206         if (i == max_vres) {
207                 pthread_mutex_unlock(&fwp_vres_table.lock);
208                 errno = ENOBUFS;
209                 return NULL;
210         }
211
212         FWP_DEBUG("Allocated vres id = %d\n",i);
213         fwp_vres_set_flag(&fwp_vres_table.entry[i], FWP_VF_USED);
214         pthread_mutex_unlock(&fwp_vres_table.lock);
215         return (&fwp_vres_table.entry[i]);
216 }
217
218 inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
219 {
220         int rv;
221
222         /* copy vres paramters into vres structure */
223         rv = fwp_vres_set_ac(vres->ac_sockd, params->ac_id);
224         if (!rv)
225                 return rv;
226         memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
227         fwp_vres_set_flag(vres, FWP_VF_RESCHED);
228
229         return 0;
230 }
231
232 /**
233  * Set vres params
234  *
235  * \param[in] vresdp Vres descriptor
236  * \param[in] params Vres parameters
237  *
238  * \return On success returns zero. 
239  * On error, negative error code is returned. 
240  *
241  */
242 int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
243 {
244         fwp_vres_t *vres = vresd;
245         
246         if (!fwp_vres_is_valid(vres)) {
247                 errno = EINVAL;
248                 return -1;
249         }
250
251         return _fwp_vres_set_params(vres, params);
252 }
253
254 /**
255  * Creates new vres
256  *
257  * \param[in] params Vres parameters
258  * \param[out] vresdp Pointer to the descriptor of newly created vres
259  *
260  * \return On success returns descriptor of vres. 
261  * On error, negative error code is returned. 
262  *
263  */
264 int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
265 {
266         int rv;
267         fwp_vres_t *vres;
268         
269         vres = fwp_vres_alloc();
270         if (!vres) {
271                 errno = ENOMEM;
272                 return -1;
273         }
274         /* initialize msg queue */
275         fwp_msgq_init(&vres->tx_queue);
276         
277         memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
278         fwp_vres_set_flag(vres, FWP_VF_RESCHED);
279         pthread_attr_init(&vres->tx_thread_attr);
280         if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, 
281                             fwp_vres_tx_thread, (void*) vres)) != 0){
282                 goto err;
283         }
284
285         *vresdp = vres;
286         return 0;
287 err:    
288         fwp_vres_free(vres);
289         return -1; 
290 }
291
292 /**
293  * Destroys vres
294  *
295  * \param[in] vresd Vres descriptor
296  *
297  * \return On success returns 0. 
298  * On error, negative error code is returned. 
299  *
300  */
301 int fwp_vres_destroy(fwp_vres_d_t vresd)
302 {       
303         fwp_vres_t *vres = vresd;
304
305         if (!fwp_vres_is_valid(vres)) {
306                 errno = EINVAL;
307                 return -1;
308         }
309         
310         pthread_cancel(vres->tx_thread);
311                 
312         FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);   
313         return  0;
314 }
315
316 static void fwp_vres_cleanup(void *_vres)
317 {
318         fwp_vres_t *vres = (fwp_vres_t*)_vres;
319
320         fwp_msgq_dequeue_all(&vres->tx_queue);
321         fwp_vres_free(vres);
322 }
323
324 static inline void 
325 fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period, 
326                         fwp_budget_t  *budget)
327 {
328         if (fwp_vres_get_flag(vres, FWP_VF_RESCHED)) {  
329                 /*period->tv_nsec = vres->params.period % SEC_TO_USEC;
330                 period->tv_sec = vres->params.period / SEC_TO_USEC;*/
331                 *period = vres->params.period;
332                 *budget = vres->params.budget; 
333                 FWP_DEBUG("Vres tx thread with budget=%ld period_sec=%ld "
334                                 "period_nsec=%ld.\n",vres->params.budget, 
335                                 period->tv_sec, period->tv_nsec);
336                 fwp_vres_clear_flag(vres, FWP_VF_RESCHED);
337         }
338 }
339
340 /**
341  * Thread that does budgeting
342  *
343  */
344 static void* fwp_vres_tx_thread(void *_vres)
345 {
346         struct fwp_vres *vres = (struct fwp_vres*)_vres;
347         struct fwp_msgq *msgq = &vres->tx_queue;
348         struct fwp_msgb *msgb = NULL;
349         unsigned int    ac_id = vres->params.ac_id;
350         fwp_budget_t    budget = vres->params.budget;
351         fwp_budget_t    curr_budget;
352         int             rc;
353                 
354         struct timespec  start_period, end_period, period;
355         struct timespec  current_time, interval;
356
357         fwp_set_rt_prio(90 - ac_id);
358         
359
360         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
361         pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);       
362         pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
363         
364         clock_gettime(CLOCK_MONOTONIC, &start_period);
365
366         while (1) {
367                 /* wait for next period and then send */
368                 fwp_timespec_add(&end_period, &start_period, &period);
369                 clock_gettime(CLOCK_MONOTONIC, &current_time);
370                 fwp_timespec_sub(&interval, &end_period, &current_time);
371                 nanosleep(&interval, NULL);
372         
373                 sem_wait(&msgq->empty_lock);
374                 fwp_vres_sched_update(vres, &period, &budget);
375                 clock_gettime(CLOCK_MONOTONIC, &start_period);
376                 
377                 /*msgb = fwp_msgq_dequeue(msgq);
378                 *if (msgb){*/
379                 curr_budget = 0;
380                 while ((curr_budget < budget)&& 
381                        (msgb = fwp_msgq_dequeue(msgq))) {
382                         rc = _fwp_vres_send(vres->ac_sockd, msgb);
383                         if (!(rc < 0)) {
384                                 FWP_DEBUG("Message sent through AC%d\n",ac_id);
385                                 /* Switch to this in the future */
386                                  //curr_budget+= msgb->len;
387                                  curr_budget++;
388                         }
389                         
390                         fwp_msgb_free(msgb);
391                 }
392
393                 /*pthread_testcancel(); */
394                 /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);     
395                 
396                 fwp_timespec_add(&end_period, &start_period, &period);
397                 clock_gettime(CLOCK_MONOTONIC, &current_time);
398                 fwp_timespec_sub(&interval, &end_period, &current_time);
399                 nanosleep(&interval, NULL);
400                 */
401         }
402         
403         /* it should normaly never come here */ 
404         pthread_cleanup_pop(0); 
405         fwp_vres_free(vres);
406         
407         return NULL;
408 }
409
410 int fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
411 {
412         fwp_vres_t *vres = vresd;
413
414         if (fwp_vres_is_valid(vres)) {
415                 return fwp_msgq_enqueue(&vres->tx_queue, msgb);
416         } else {
417                 errno = EPERM;  /* TODO: Use more appropriate error than EPERM. */
418                 return -1;
419         }
420 }
421
422 /*int fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)*/
423 int fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
424 {
425         fwp_vres_t *vres = vresd;
426         int rv = 0;
427
428         pthread_mutex_lock(&fwp_vres_table.lock);
429         if (!fwp_vres_is_valid(vres)) {
430                 errno = EINVAL;
431                 rv = -1;
432                 goto err;
433         }
434         
435         if (fwp_vres_get_flag(vres, FWP_VF_BOUND)) { /*if already bounded */
436                 errno = EPERM;  
437                 rv = -1;
438                 goto err;
439         }
440
441         vres->ac_sockd = sockd;
442         rv = fwp_vres_set_ac(vres->ac_sockd,vres->params.ac_id);
443         /*if (rv)
444                 goto err;*/
445         fwp_vres_set_flag(vres, FWP_VF_BOUND);
446 err:
447         pthread_mutex_unlock(&fwp_vres_table.lock);
448         return rv;
449 }
450
451 int fwp_vres_unbind(fwp_vres_d_t vresd)
452 {
453         fwp_vres_t *vres = vresd;
454         
455         if (!fwp_vres_is_valid(vresd)) {
456                 errno = EINVAL;
457                 return -1;
458         }
459         fwp_vres_clear_flag(vres, FWP_VF_BOUND);
460         /* TODO: consider what to do with pending messages */
461         /* fwp_vres_free_msgb(vres->tx_queue); */
462         return 0;
463 }