]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/core/fwp_vres.c
e94f1461a964f21de004124d8dda19d5fab9f8a7
[frescor/fwp.git] / fwp / lib / core / fwp_vres.c
1 #include "fwp_util.h"
2 #include "fwp_vres.h"
3
4 #include "fwp_msgq.h"
5 #include "fwp_endpoint.h"
6
7 #include <string.h>
8
9 static void* fwp_vres_tx_thread(void *_vres);
10
11 typedef enum {
12         FWP_VF_USED             = 1 ,
13         FWP_VF_ACTIVE           = 2 ,
14         FWP_VF_RESCHED          = 4 ,
15 } fwp_vres_flag_t;
16
17 /**
18  * Structure of FWP vres.
19  * Internal representation of vres
20  * 
21  */
22 struct fwp_vres{
23         struct fwp_vres_params          params;
24         /* consideration: move tx_queue to endpoint */
25         /**< queue for messages to send */
26         struct fwp_msgq                 tx_queue;   
27         int                             flags;
28         /**< endpoint bounded to this vres */
29         /*fwp_endpoint_t                *epoint; */
30         pthread_t                       tx_thread; /**< tx_thread id*/
31         pthread_attr_t                  tx_thread_attr;
32         int                             ac_sockd;  /**< ac socket descriptor */
33         fwp_sockaddr_t                  addr;   /**< dest addr,for effectivness*/
34 };
35
36 typedef
37 struct fwp_vres_table {
38         unsigned int                    max_vres; 
39         fwp_vres_t                      *entry;
40         pthread_mutex_t                 lock;
41 } fwp_vres_table_t;
42
43 /* Global variable - vres table */
44 static fwp_vres_table_t  fwp_vres_table = {
45         .max_vres = 0,
46         .entry = NULL,
47         .lock = PTHREAD_MUTEX_INITIALIZER,
48 };
49
50 /**< mapping priority to ac*/
51 static const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
52 /**< IP tos for AC_VI, AC_VO, AC_BE, AC_BK */ 
53 static const unsigned int ac_to_tos[4] = {224,160,96,64};
54
55 /**
56  * Set access category (AC) to socket
57  *
58  * \param[in] sockd Socket descriptor
59  * \param[in] ac_id AC identifier
60  * 
61  * \return On success returns zero. 
62  * On error, negative error code is returned. 
63  *
64  */
65 static inline int fwp_vres_set_ac(int sockd, fwp_ac_t ac_id) 
66 {
67         unsigned int tos;
68         
69         tos = ac_to_tos[ac_id];
70         if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
71                 FWP_ERROR("setsockopt: %s", strerror(errno));
72                 return (-1);
73         }
74         
75         return 0;
76 }
77
78 static inline int fwp_vres_set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
79 {
80         return (vres->flags | (1 << flag));
81 }
82
83 static inline int fwp_vres_clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
84 {
85         return (vres->flags & ~(1 << flag));
86 }
87
88 static inline int fwp_vres_get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
89 {
90         return !!(vres->flags & (1 << flag));
91 }
92
93 #if 0
94 /* Deprecated */
95 static int fwp_vres_ac_open(fwp_ac_t ac_id) 
96 {
97         int sockd;
98         unsigned int tos;
99         
100         if ((ac_id < 0)||(ac_id >= FWP_AC_NUM)) {
101                 errno = EINVAL;
102                 return -1;
103         }
104
105         if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
106                 FWP_ERROR("Unable to open socket for AC: %s", strerror(errno));
107                 return (-1);
108         }
109
110         tos = ac_to_tos[ac_id];
111         
112         if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
113                 int e = errno;
114                 FWP_ERROR("setsockopt(IP_TOS): %s", strerror(errno));
115                 close(sockfd);
116                 errno = e;
117                 return -1;
118         }
119         
120         return sockd;
121 }
122 #endif
123
124 static inline int __fwp_vres_send(unsigned int ac_sockd, struct fwp_msgb* msgb)
125 {
126         /*_fwp_sendto(ac_sockd, msgb->data, msgb->len, 0, 
127                         msgb->peer->addr, msgb->peer->addrlen);*/
128         return _fwp_send(ac_sockd, msgb->data, msgb->len, 0);
129 }
130
131 static inline void fwp_vres_free(fwp_vres_t *vres)
132 {
133         fwp_vres_clear_flag(vres, FWP_VF_USED);
134 }
135
136 static inline int fwp_vres_is_valid(fwp_vres_t *vres)
137 {
138         int id  = vres - fwp_vres_table.entry;
139
140         if ((id < 0) || (id > fwp_vres_table.max_vres - 1) ||
141                 (!fwp_vres_get_flag(vres, FWP_VF_USED))) 
142                 return 0;
143                 
144         return 1; 
145 }
146
147 /*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
148 {
149         3if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
150                 return -EINVAL;
151         *vres = &fwp_vres_table.entry[vres_id];
152         return 0;
153 }
154 */
155
156 int fwp_vres_table_init(unsigned int max_vres)
157 {
158         unsigned int table_size = max_vres * sizeof(fwp_vres_t);
159
160         fwp_vres_table.entry = (fwp_vres_t*) malloc(table_size);
161         if (!fwp_vres_table.entry) 
162                 return -1;      /* Errno is set by malloc */
163
164         memset((void*) fwp_vres_table.entry, 0, table_size);
165         fwp_vres_table.max_vres = max_vres;
166         return 0;
167 }
168
169 fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
170 {
171         fwp_vres_t *vres = vresd;
172         
173         return (vres - fwp_vres_table.entry);
174 }
175
176 /**
177  * Allocate vres
178  *
179  * \return On success returns vres descriptor. 
180  */
181 fwp_vres_d_t fwp_vres_alloc()
182 {
183         int i;
184         unsigned int max_vres;
185
186         /* find free vres id */
187         pthread_mutex_lock(&fwp_vres_table.lock);
188         i = 0;
189         max_vres = fwp_vres_table.max_vres;
190         while ((i < max_vres) && 
191                 (fwp_vres_get_flag(&fwp_vres_table.entry[i], FWP_VF_USED))) {
192                 i++;
193         }
194         
195         if (i == max_vres) {
196                 pthread_mutex_unlock(&fwp_vres_table.lock);
197                 errno = ENOBUFS;
198                 return NULL;
199         }
200
201         fwp_vres_set_flag(&fwp_vres_table.entry[i], FWP_VF_USED);
202         pthread_mutex_unlock(&fwp_vres_table.lock);
203         return (&fwp_vres_table.entry[i]);
204 }
205
206 inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
207 {
208         /* copy vres paramters into vres structure */
209         rv = fwp_vres_set_ac(vres->ac_sockd, params->ac_id);
210         if (!rv)
211                 return rv;
212         memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
213         fwp_vres_set_flag(vres, FWP_VF_RESCHED);
214
215         return 0;
216 }
217
218 /**
219  * Set vres params
220  *
221  * \param[in] vresdp Vres descriptor
222  * \param[in] params Vres parameters
223  *
224  * \return On success returns zero. 
225  * On error, negative error code is returned. 
226  *
227  */
228 int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
229 {
230         fwp_vres_t *vres = vresd;
231         int rv;
232         
233         if (!fwp_vres_is_valid(vres)) {
234                 errno = EINVAL;
235                 return -1;
236         }
237
238         return fwp_vres_set_params(vres, params);
239 }
240
241 /**
242  * Creates new vres
243  *
244  * \param[in] params Vres parameters
245  * \param[out] vresdp Pointer to the descriptor of newly created vres
246  *
247  * \return On success returns descriptor of vres. 
248  * On error, negative error code is returned. 
249  *
250  */
251 int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
252 {
253         int rv;
254         fwp_vres_t *vres;
255         
256         /* Check for validity of the contract */
257
258         vres = fwp_vres_alloc();
259         if (!vres) {
260                 return -1;
261         }
262         /* copy vres paramters into vres structure */
263         memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
264         /* initialize msg queue */
265         fwp_msgq_init(&vres->tx_queue);
266         
267         pthread_attr_init(&vres->tx_thread_attr);
268         if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, 
269                             fwp_vres_tx_thread, (void*) vres)) != 0){
270                 goto err;
271
272         }
273         _fwp_vres_set_params    
274         *vresdp = vres;
275         return 0;
276 err:    
277         fwp_vres_free(vres);
278         errno = rv;
279         return -1; 
280 }
281
282
283 /**
284  * Destroys vres
285  *
286  * \param[in] vresd Vres descriptor
287  *
288  * \return On success returns 0. 
289  * On error, negative error code is returned. 
290  *
291  */
292 int fwp_vres_destroy(fwp_vres_d_t vresd)
293 {       
294         fwp_vres_t *vres = vresd;
295
296         if (!fwp_vres_is_valid(vres)) {
297                 errno = EINVAL;
298                 return -1;
299         }
300         
301         pthread_cancel(vres->tx_thread);
302         
303         FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);   
304         return  0;
305 }
306
307 static void fwp_vres_cleanup(void *_vres)
308 {
309         fwp_vres_t *vres = (fwp_vres_t*)_vres;
310
311         fwp_msgq_dequeue_all(&vres->tx_queue);
312         fwp_vres_free(vres);
313 }
314
315 static inline void 
316 fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period, 
317                         fwp_budget_t  *budget)
318 {
319         if (fwp_vres_get_flag(vres, FWP_VF_RESCHED)) {  
320                 period->tv_nsec = vres->params.period_usec % SEC_TO_USEC;
321                 period->tv_sec = vres->params.period_usec / SEC_TO_USEC;
322                 *budget = vres->params.budget; 
323                 FWP_DEBUG("Vres tx thread with budget:%d period_sec=%ld "
324                                 "period_nsec=%ld.\n",vres->params.budget, 
325                                 period->tv_sec, period->tv_nsec);
326         }
327 }
328
329 /**
330  * Thread that does budgeting
331  *
332  */
333 static void* fwp_vres_tx_thread(void *_vres)
334 {/* TODO: make changes that count with changing of params */
335         struct fwp_vres *vres = (struct fwp_vres*)_vres;
336         struct fwp_msgq *msgq = &vres->tx_queue;
337         struct fwp_msgb *msgb = NULL;
338         unsigned int    ac_id = vres->params.ac_id;
339         /*unsigned int  ac_sockd = vres->ac_sockd;*/
340         int             budget = vres->params.budget;
341         int             curr_budget;
342         int             rc;
343                 
344         struct timespec  start_period, end_period, period;
345         struct timespec  current_time, interval;
346
347 /*      period.tv_nsec = vres->params.period_usec % SEC_TO_USEC;
348         period.tv_sec = vres->params.period_usec / SEC_TO_USEC;
349 */      
350         fwp_set_rt_prio(90 - ac_id);
351         
352
353         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
354         pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);       
355         pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
356         
357         clock_gettime(CLOCK_MONOTONIC, &start_period);
358
359         while (1) {
360                 /* wait for next period and then send */
361                 fwp_vres_sched_update(vres, &period, &budget);
362                 fwp_timespec_add(&end_period, &start_period, &period);
363                 clock_gettime(CLOCK_MONOTONIC, &current_time);
364                 fwp_timespec_sub(&interval, &end_period, &current_time);
365                 nanosleep(&interval, NULL);
366         
367                 sem_wait(&msgq->empty_lock);
368                 clock_gettime(CLOCK_MONOTONIC, &start_period);
369                 
370                 /*msgb = fwp_msgq_dequeue(msgq);
371                 *if (msgb){*/
372                 curr_budget = 0;
373                 while ((curr_budget < budget)&& 
374                        (msgb = fwp_msgq_dequeue(msgq))) {
375                         rc = __fwp_vres_send(vres->ac_sockd, msgb);
376                         if (!(rc < 0)) {
377                                 FWP_DEBUG("Message sent through AC%d\n",ac_id);
378                                 /* Switch to this in the future
379                                  * curr_budget+= msgb->len;
380                                  */
381                                 curr_budget++;
382                         }
383                         fwp_msgb_free(msgb);
384                 }
385
386                 /*pthread_testcancel(); */
387                 /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);     
388                 
389                 fwp_timespec_add(&end_period, &start_period, &period);
390                 clock_gettime(CLOCK_MONOTONIC, &current_time);
391                 fwp_timespec_sub(&interval, &end_period, &current_time);
392                 nanosleep(&interval, NULL);
393                 */
394         }
395         
396         /* it should normaly never come here */ 
397         pthread_cleanup_pop(0); 
398         fwp_vres_free(vres);
399         
400         return NULL;
401 }
402
403 int _fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
404 {
405         fwp_vres_t *vres = vresd;
406
407         if (fwp_vres_is_valid(vres)) {
408                 return fwp_msgq_enqueue(&vres->tx_queue, msgb);
409         } else {
410                 errno = EPERM;  /* TODO: Use more appropriate error than EPERM. */
411                 return -1;
412         }
413 }
414
415 #if 0
416 /*int _fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)*/
417 int _fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
418 {
419         fwp_vres_t *vres = vresd;
420         int rv = 0;
421
422         pthread_mutex_lock(&fwp_vres_table.lock);
423         if (!fwp_vres_is_valid(vres)) {
424                 errno = EINVAL;
425                 rv = -1;
426                 goto err;
427         }
428
429         if (vres->status != FWP_VRES_UNBOUND) { /*if already bounded */
430                 errno = EPERM;  
431                 rv = -1;
432                 goto err;
433         }
434
435         vres->ac_sockd = sockd;
436         fwp_vres_set_ac(vres->ac_sockd,vres->params.ac_id);
437         vres->status = FWP_VRES_BOUND;
438 err:
439         pthread_mutex_unlock(&fwp_vres_table.lock);
440         return rv;
441 }
442
443 int _fwp_vres_unbind(fwp_vres_d_t vresd)
444 {
445         fwp_vres_t *vres = vresd;
446         
447         if (!fwp_vres_is_valid(vresd)) {
448                 errno = EINVAL;
449                 return -1;
450         }
451         vres->status = FWP_VRES_UNBOUND;
452         /* TODO: consider what to do with pending messages */
453         /* fwp_vres_free_msgb(vres->tx_queue); */
454         
455         return 0;
456 }
457 #endif