]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/core/fwp_vres.c
Endpoint and vres validity checking
[frescor/fwp.git] / fwp / lib / core / fwp_vres.c
1 #include "fwp_util.h"
2 #include "fwp_vres.h"
3
4 static void* fwp_vres_tx_thread(void *_vres);
5
6
7 typedef enum {
8         FWP_VRES_FREE           = 0 ,
9         FWP_VRES_INACTIVE       = 1 ,
10         FWP_VRES_UNBOUND        = 2 ,
11         FWP_VRES_BOUND          = 3 ,
12 } fwp_vres_status_t;
13
14 /**
15  * Structure of FWP vres.
16  * Internal representation of vres
17  * 
18  */
19 struct fwp_vres{
20         struct fwp_vres_params          params;
21         /* consideration: move tx_queue to endpoint */
22         /**< queue for messages to send */
23         struct fwp_msgq                 tx_queue;   
24         int                             flags;
25         /**< endpoint bounded to this vres */
26         /*fwp_endpoint_t                *epoint; */
27         pthread_t                       tx_thread; /**< tx_thread id*/
28         pthread_attr_t                  tx_thread_attr;
29         int                             ac_sockd;  /**< ac socket descriptor */
30         fwp_vres_status_t               status;
31 };
32
33 typedef
34 struct fwp_vres_table {
35         unsigned int                    max_vres; 
36         fwp_vres_t                      *entry;
37         pthread_mutex_t                 lock;
38 } fwp_vres_table_t;
39
40 /* Global variable - vres table */
41 static fwp_vres_table_t  fwp_vres_table = {
42         .max_vres = 0,
43         .entry = NULL,
44         .lock = PTHREAD_MUTEX_INITIALIZER,
45 };
46
47 /**< mapping priority to ac*/
48 static const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
49 /**< IP tos for AC_VI, AC_VO, AC_BE, AC_BK */ 
50 static const unsigned int ac_to_tos[4] = {224,160,96,64};
51
52 static inline int fwp_vres_set_ac(int sockd, fwp_ac_t ac_id) 
53 {
54         unsigned int tos;
55         
56         tos = ac_to_tos[ac_id];
57         if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
58                 perror("Root permission needed to set AC");
59                 //return (-errno);
60         }
61         
62         return 0;
63 }
64
65 /* Deprecated */
66 static int fwp_vres_ac_open(fwp_ac_t ac_id) 
67 {
68         int sockd;
69         unsigned int tos;
70         
71         if ((ac_id < 0)||(ac_id >= FWP_AC_NUM))
72                 return -EINVAL;
73
74         if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
75                 perror("Unable to open socket for AC");
76                 return (-errno);
77         }
78
79         tos = ac_to_tos[ac_id];
80         
81         if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
82                 perror("Root permission needed to set AC");
83                 /* let pass
84                  * close(sockfd);
85                  *return (-errno);*/
86         }
87         
88         return sockd;
89 }
90
91 static inline int __fwp_vres_send(unsigned int ac_sockd, struct fwp_msgb* msgb)
92 {
93         /*_fwp_sendto(ac_sockd, msgb->data, msgb->len, 0, 
94                         msgb->peer->addr, msgb->peer->addrlen);*/
95         _fwp_send(ac_sockd, msgb->data, msgb->len, 0);
96         return 0;
97 }
98
99 static inline void fwp_vres_free(fwp_vres_t *vres)
100 {
101         vres->status = FWP_VRES_FREE;
102 }
103
104 static inline int fwp_vres_is_valid(fwp_vres_d_t vresd)
105 {
106         int id  = vresd - fwp_vres_table.entry;
107
108         if ((id < 0) || (id > fwp_vres_table.max_vres - 1) ||
109                 (vresd->status == FWP_VRES_FREE)) 
110                 return 0;
111                 
112         return 1; 
113 }
114
115 /*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
116 {
117         3if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
118                 return -EINVAL;
119         *vres = &fwp_vres_table.entry[vres_id];
120         return 0;
121 }
122 */
123
124 int fwp_vres_table_init(unsigned int max_vres)
125 {
126         unsigned int table_size = max_vres * sizeof(fwp_vres_t);
127
128         fwp_vres_table.entry = (fwp_vres_t*) malloc(table_size);
129         if (!fwp_vres_table.entry)
130                 return -ENOMEM;
131
132         memset((void*) fwp_vres_table.entry, 0, table_size);
133         fwp_vres_table.max_vres = max_vres;
134         return 0;
135 }
136
137 fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
138 {
139         fwp_vres_t *vres = vresd;
140         
141         return (vres - fwp_vres_table.entry);
142 }
143
144 fwp_vres_d_t fwp_vres_alloc()
145 {
146         int i;
147         unsigned int max_vres;
148
149         /* find free vres id */
150         pthread_mutex_lock(&fwp_vres_table.lock);
151         i = 0;
152         max_vres = fwp_vres_table.max_vres;
153         while ((i < max_vres) && 
154                 (fwp_vres_table.entry[i].status != FWP_VRES_FREE)) {
155                 i++;
156         }
157         
158         if (i == max_vres) {
159                 pthread_mutex_unlock(&fwp_vres_table.lock);
160                 return NULL;
161         }
162
163         fwp_vres_table.entry[i].status = FWP_VRES_INACTIVE;
164         pthread_mutex_unlock(&fwp_vres_table.lock);
165         return (&fwp_vres_table.entry[i]);
166 }
167
168
169 int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
170 {
171         fwp_vres_t *vres = vresd;
172         int rv;
173         
174         if (!fwp_vres_is_valid(vresd)) {
175                 return -EINVAL;
176         }
177         /* copy vres paramters into vres structure */
178         memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
179         
180         if (vres->status != FWP_VRES_INACTIVE) {
181                 /* Consider: how to change parameters when vres is in running
182                  * state - restart thread, set vres_resched flag
183                  */
184                 pthread_cancel(vres->tx_thread);
185                 /* or set vres_resched flag and return */
186         } else{
187                 /* initialize msg queue */
188                 fwp_msgq_init(&vres->tx_queue);
189         }
190         
191         if ((vres->status == FWP_VRES_BOUND) && 
192                 (rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id)) < 0) {
193                 goto err;
194         }
195
196         pthread_attr_init(&vres->tx_thread_attr);
197         if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, 
198                             fwp_vres_tx_thread, (void*) vres)) != 0){
199                 goto err;
200
201         }
202         vres->status = FWP_VRES_UNBOUND;
203         
204         return 0;
205 err:    
206         fwp_vres_free(vres);
207         return rv; 
208 }
209
210 /**
211  * Creates new vres
212  *
213  * \param[in] params Vres parameters
214  * \param[out] vresdp Pointer to the descriptor of newly created vres
215  *
216  * \return On success returns descriptor of vres. 
217  * On error, negative error code is returned. 
218  *
219  */
220 int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
221 {
222         int rv;
223         fwp_vres_t *vres;
224         
225         /* Check for validity of the contract */
226
227         vres = fwp_vres_alloc();
228         if (!vres) {
229                 return -ENOMEM;
230         }
231         /* copy vres paramters into vres structure */
232         memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
233         /* initialize msg queue */
234         fwp_msgq_init(&vres->tx_queue);
235         
236         pthread_attr_init(&vres->tx_thread_attr);
237         if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, 
238                             fwp_vres_tx_thread, (void*) vres)) != 0){
239                 goto err;
240
241         }
242         vres->status = FWP_VRES_UNBOUND;
243         
244         *vresdp = vres;
245         return 0;
246 err:    
247         fwp_vres_free(vres);
248         return rv; 
249 }
250
251 /**
252  * Destroys vres
253  *
254  * \param[in] vresd Vres descriptor
255  *
256  * \return On success returns 0. 
257  * On error, negative error code is returned. 
258  *
259  */
260 int fwp_vres_destroy(fwp_vres_d_t vresd)
261 {       
262         fwp_vres_t *vres = vresd;
263
264         if (!fwp_vres_is_valid(vresd))
265                 return -EINVAL;
266         
267         vres->status = FWP_VRES_FREE;
268         pthread_cancel(vres->tx_thread);
269         
270         FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);   
271         return  0;
272 }
273
274 static void fwp_vres_cleanup(void *_vres)
275 {
276         fwp_vres_t *vres = (fwp_vres_t*)_vres;
277
278         fwp_msgq_dequeue_all(&vres->tx_queue);
279         fwp_vres_free(vres);
280 }
281
282 static void* fwp_vres_tx_thread(void *_vres)
283 {/* TODO: make changes that count with changing of params */
284         struct fwp_vres *vres = (struct fwp_vres*)_vres;
285         struct fwp_msgq *msgq = &vres->tx_queue;
286         struct fwp_msgb *msgb = NULL;
287         unsigned int    ac_id = vres->params.ac_id;
288         /*unsigned int  ac_sockd = vres->ac_sockd;*/
289         int             budget = vres->params.budget;
290         int             curr_budget;
291         int             rc;
292                 
293         struct timespec  start_period, end_period, period;
294         struct timespec  current_time, interval;
295
296         period.tv_nsec = vres->params.period_usec % SEC_TO_USEC;
297         period.tv_sec = vres->params.period_usec / SEC_TO_USEC;
298         
299         fwp_set_rt_prio(90 - ac_id);
300         
301         FWP_DEBUG("Started vres tx thread with budget:%d period_sec=%ld " 
302                         "period_nsec=%ld.\n",
303                   vres->params.budget, period.tv_sec, period.tv_nsec);
304
305         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
306         pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
307         clock_gettime(CLOCK_MONOTONIC, &start_period);
308
309         while (vres->status && (FWP_VRES_UNBOUND || FWP_VRES_BOUND)) {
310                 /* wait for next period and then send */
311                 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);       
312                 
313                 fwp_timespec_add(&end_period, &start_period, &period);
314                 clock_gettime(CLOCK_MONOTONIC, &current_time);
315                 fwp_timespec_sub(&interval, &end_period, &current_time);
316                 nanosleep(&interval, NULL);
317         
318                 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);   
319                 sem_wait(&msgq->empty_lock);
320                 clock_gettime(CLOCK_MONOTONIC, &start_period);
321                 
322                 /*msgb = fwp_msgq_dequeue(msgq);
323                 *if (msgb){*/
324                 curr_budget = 0;
325                 while ((curr_budget < budget)&& 
326                        (msgb = fwp_msgq_dequeue(msgq))) {
327                         rc = __fwp_vres_send(vres->ac_sockd, msgb);
328                         if (!(rc < 0)) {
329                                 FWP_DEBUG("Message sent through AC%d\n",ac_id);
330                                 /* Switch to this in the future
331                                  * curr_budget+= msgb->len;
332                                  */
333                                 curr_budget++;
334                         }
335                         fwp_msgb_free(msgb);
336                 }
337
338                 pthread_testcancel();   
339                 /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);     
340                 
341                 fwp_timespec_add(&end_period, &start_period, &period);
342                 clock_gettime(CLOCK_MONOTONIC, &current_time);
343                 fwp_timespec_sub(&interval, &end_period, &current_time);
344                 nanosleep(&interval, NULL);
345                 */
346         }
347         
348         /* it should normaly never come here */ 
349         pthread_cleanup_pop(0); 
350         fwp_vres_free(vres);
351         
352         return NULL;
353 }
354
355 int _fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
356 {
357         fwp_vres_t *vres = vresd;
358
359         if (vres->status == FWP_VRES_BOUND) {
360                 return fwp_msgq_enqueue(&vres->tx_queue, msgb);
361         } else 
362                 return -EPERM;
363 }
364
365 /*int _fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)*/
366 int _fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
367 {
368         fwp_vres_t *vres = vresd;
369         int rv = 0;
370
371         pthread_mutex_lock(&fwp_vres_table.lock);
372         if (!fwp_vres_is_valid(vresd)) {
373                 rv = -EINVAL;
374                 goto err;
375         }
376
377         if (vres->status != FWP_VRES_UNBOUND) { /*if already bounded */
378                 rv = -EPERM;
379                 goto err;
380         }
381
382         vres->ac_sockd = sockd;
383         fwp_vres_set_ac(vres->ac_sockd,vres->params.ac_id);
384         vres->status = FWP_VRES_BOUND;
385 err:
386         pthread_mutex_unlock(&fwp_vres_table.lock);
387         return rv;
388 }
389
390 int _fwp_vres_unbind(fwp_vres_d_t vresd)
391 {
392         fwp_vres_t *vres = vresd;
393         
394         if (!fwp_vres_is_valid(vresd)) {
395                 return -EINVAL;
396         }
397         vres->status = FWP_VRES_UNBOUND;
398         /* TODO: consider what to do with pending messages */
399         /* fwp_vres_free_msgb(vres->tx_queue); */
400         
401         return 0;
402 }
403