]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/core/fwp_vres.c
Added service contract
[frescor/fwp.git] / fwp / lib / core / fwp_vres.c
1 #include "fwp_util.h"
2 #include "fwp_vres.h"
3
4 #include "fwp_msgq.h"
5 #include "fwp_endpoint.h"
6
7 #include <string.h>
8
9 static void* fwp_vres_tx_thread(void *_vres);
10
11 typedef enum {
12         FWP_VRES_FREE           = 0 ,
13         FWP_VRES_INACTIVE       = 1 ,
14         FWP_VRES_UNBOUND        = 2 ,
15         FWP_VRES_BOUND          = 3 ,
16 } fwp_vres_status_t;
17
18 /**
19  * Structure of FWP vres.
20  * Internal representation of vres
21  * 
22  */
23 struct fwp_vres{
24         struct fwp_vres_params          params;
25         /* consideration: move tx_queue to endpoint */
26         /**< queue for messages to send */
27         struct fwp_msgq                 tx_queue;   
28         int                             flags;
29         /**< endpoint bounded to this vres */
30         /*fwp_endpoint_t                *epoint; */
31         pthread_t                       tx_thread; /**< tx_thread id*/
32         pthread_attr_t                  tx_thread_attr;
33         int                             ac_sockd;  /**< ac socket descriptor */
34         fwp_vres_status_t               status;
35 };
36
37 typedef
38 struct fwp_vres_table {
39         unsigned int                    max_vres; 
40         fwp_vres_t                      *entry;
41         pthread_mutex_t                 lock;
42 } fwp_vres_table_t;
43
44 /* Global variable - vres table */
45 static fwp_vres_table_t  fwp_vres_table = {
46         .max_vres = 0,
47         .entry = NULL,
48         .lock = PTHREAD_MUTEX_INITIALIZER,
49 };
50
51 /**< mapping priority to ac*/
52 static const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
53 /**< IP tos for AC_VI, AC_VO, AC_BE, AC_BK */ 
54 static const unsigned int ac_to_tos[4] = {224,160,96,64};
55
56 static inline int fwp_vres_set_ac(int sockd, fwp_ac_t ac_id) 
57 {
58         unsigned int tos;
59         
60         tos = ac_to_tos[ac_id];
61         if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
62                 perror("Root permission needed to set AC");
63                 //return (-errno);
64         }
65         
66         return 0;
67 }
68
69 /* Deprecated */
70 static int fwp_vres_ac_open(fwp_ac_t ac_id) 
71 {
72         int sockd;
73         unsigned int tos;
74         
75         if ((ac_id < 0)||(ac_id >= FWP_AC_NUM))
76                 return -EINVAL;
77
78         if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
79                 perror("Unable to open socket for AC");
80                 return (-errno);
81         }
82
83         tos = ac_to_tos[ac_id];
84         
85         if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
86                 perror("Root permission needed to set AC");
87                 /* let pass
88                  * close(sockfd);
89                  *return (-errno);*/
90         }
91         
92         return sockd;
93 }
94
95 static inline int __fwp_vres_send(unsigned int ac_sockd, struct fwp_msgb* msgb)
96 {
97         /*_fwp_sendto(ac_sockd, msgb->data, msgb->len, 0, 
98                         msgb->peer->addr, msgb->peer->addrlen);*/
99         _fwp_send(ac_sockd, msgb->data, msgb->len, 0);
100         return 0;
101 }
102
103 static inline void fwp_vres_free(fwp_vres_t *vres)
104 {
105         vres->status = FWP_VRES_FREE;
106 }
107
108 static inline int fwp_vres_is_valid(fwp_vres_d_t vresd)
109 {
110         int id  = vresd - fwp_vres_table.entry;
111
112         if ((id < 0) || (id > fwp_vres_table.max_vres - 1) ||
113                 (vresd->status == FWP_VRES_FREE)) 
114                 return 0;
115                 
116         return 1; 
117 }
118
119 /*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
120 {
121         3if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
122                 return -EINVAL;
123         *vres = &fwp_vres_table.entry[vres_id];
124         return 0;
125 }
126 */
127
128 int fwp_vres_table_init(unsigned int max_vres)
129 {
130         unsigned int table_size = max_vres * sizeof(fwp_vres_t);
131
132         fwp_vres_table.entry = (fwp_vres_t*) malloc(table_size);
133         if (!fwp_vres_table.entry)
134                 return -ENOMEM;
135
136         memset((void*) fwp_vres_table.entry, 0, table_size);
137         fwp_vres_table.max_vres = max_vres;
138         return 0;
139 }
140
141 fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
142 {
143         fwp_vres_t *vres = vresd;
144         
145         return (vres - fwp_vres_table.entry);
146 }
147
148 fwp_vres_d_t fwp_vres_alloc()
149 {
150         int i;
151         unsigned int max_vres;
152
153         /* find free vres id */
154         pthread_mutex_lock(&fwp_vres_table.lock);
155         i = 0;
156         max_vres = fwp_vres_table.max_vres;
157         while ((i < max_vres) && 
158                 (fwp_vres_table.entry[i].status != FWP_VRES_FREE)) {
159                 i++;
160         }
161         
162         if (i == max_vres) {
163                 pthread_mutex_unlock(&fwp_vres_table.lock);
164                 return NULL;
165         }
166
167         fwp_vres_table.entry[i].status = FWP_VRES_INACTIVE;
168         pthread_mutex_unlock(&fwp_vres_table.lock);
169         return (&fwp_vres_table.entry[i]);
170 }
171
172 int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
173 {
174         fwp_vres_t *vres = vresd;
175         int rv;
176         
177         if (!fwp_vres_is_valid(vresd)) {
178                 return -EINVAL;
179         }
180         /* copy vres paramters into vres structure */
181         memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
182         
183         if (vres->status != FWP_VRES_INACTIVE) {
184                 /* Consider: how to change parameters when vres is in running
185                  * state - restart thread, set vres_resched flag
186                  */
187                 pthread_cancel(vres->tx_thread);
188                 /* or set vres_resched flag and return */
189         } else{
190                 /* initialize msg queue */
191                 fwp_msgq_init(&vres->tx_queue);
192         }
193         
194         if ((vres->status == FWP_VRES_BOUND) && 
195                 (rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id)) < 0) {
196                 goto err;
197         }
198
199         pthread_attr_init(&vres->tx_thread_attr);
200         if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, 
201                             fwp_vres_tx_thread, (void*) vres)) != 0){
202                 goto err;
203
204         }
205         vres->status = FWP_VRES_UNBOUND;
206         
207         return 0;
208 err:    
209         fwp_vres_free(vres);
210         return rv; 
211 }
212
213 /**
214  * Creates new vres
215  *
216  * \param[in] params Vres parameters
217  * \param[out] vresdp Pointer to the descriptor of newly created vres
218  *
219  * \return On success returns descriptor of vres. 
220  * On error, negative error code is returned. 
221  *
222  */
223 int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
224 {
225         int rv;
226         fwp_vres_t *vres;
227         
228         /* Check for validity of the contract */
229
230         vres = fwp_vres_alloc();
231         if (!vres) {
232                 return -ENOMEM;
233         }
234         /* copy vres paramters into vres structure */
235         memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
236         /* initialize msg queue */
237         fwp_msgq_init(&vres->tx_queue);
238         
239         pthread_attr_init(&vres->tx_thread_attr);
240         if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, 
241                             fwp_vres_tx_thread, (void*) vres)) != 0){
242                 goto err;
243
244         }
245         vres->status = FWP_VRES_UNBOUND;
246         
247         *vresdp = vres;
248         return 0;
249 err:    
250         fwp_vres_free(vres);
251         return rv; 
252 }
253
254 /**
255  * Destroys vres
256  *
257  * \param[in] vresd Vres descriptor
258  *
259  * \return On success returns 0. 
260  * On error, negative error code is returned. 
261  *
262  */
263 int fwp_vres_destroy(fwp_vres_d_t vresd)
264 {       
265         fwp_vres_t *vres = vresd;
266
267         if (!fwp_vres_is_valid(vresd))
268                 return -EINVAL;
269         
270         vres->status = FWP_VRES_FREE;
271         pthread_cancel(vres->tx_thread);
272         
273         FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);   
274         return  0;
275 }
276
277 static void fwp_vres_cleanup(void *_vres)
278 {
279         fwp_vres_t *vres = (fwp_vres_t*)_vres;
280
281         fwp_msgq_dequeue_all(&vres->tx_queue);
282         fwp_vres_free(vres);
283 }
284
285 static void* fwp_vres_tx_thread(void *_vres)
286 {/* TODO: make changes that count with changing of params */
287         struct fwp_vres *vres = (struct fwp_vres*)_vres;
288         struct fwp_msgq *msgq = &vres->tx_queue;
289         struct fwp_msgb *msgb = NULL;
290         unsigned int    ac_id = vres->params.ac_id;
291         /*unsigned int  ac_sockd = vres->ac_sockd;*/
292         int             budget = vres->params.budget;
293         int             curr_budget;
294         int             rc;
295                 
296         struct timespec  start_period, end_period, period;
297         struct timespec  current_time, interval;
298
299         period.tv_nsec = vres->params.period_usec % SEC_TO_USEC;
300         period.tv_sec = vres->params.period_usec / SEC_TO_USEC;
301         
302         fwp_set_rt_prio(90 - ac_id);
303         
304         FWP_DEBUG("Started vres tx thread with budget:%d period_sec=%ld " 
305                         "period_nsec=%ld.\n",
306                   vres->params.budget, period.tv_sec, period.tv_nsec);
307
308         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
309         pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
310         clock_gettime(CLOCK_MONOTONIC, &start_period);
311
312         while (vres->status && (FWP_VRES_UNBOUND || FWP_VRES_BOUND)) {
313                 /* wait for next period and then send */
314                 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);       
315                 
316                 fwp_timespec_add(&end_period, &start_period, &period);
317                 clock_gettime(CLOCK_MONOTONIC, &current_time);
318                 fwp_timespec_sub(&interval, &end_period, &current_time);
319                 nanosleep(&interval, NULL);
320         
321                 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);   
322                 sem_wait(&msgq->empty_lock);
323                 clock_gettime(CLOCK_MONOTONIC, &start_period);
324                 
325                 /*msgb = fwp_msgq_dequeue(msgq);
326                 *if (msgb){*/
327                 curr_budget = 0;
328                 while ((curr_budget < budget)&& 
329                        (msgb = fwp_msgq_dequeue(msgq))) {
330                         rc = __fwp_vres_send(vres->ac_sockd, msgb);
331                         if (!(rc < 0)) {
332                                 FWP_DEBUG("Message sent through AC%d\n",ac_id);
333                                 /* Switch to this in the future
334                                  * curr_budget+= msgb->len;
335                                  */
336                                 curr_budget++;
337                         }
338                         fwp_msgb_free(msgb);
339                 }
340
341                 pthread_testcancel();   
342                 /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);     
343                 
344                 fwp_timespec_add(&end_period, &start_period, &period);
345                 clock_gettime(CLOCK_MONOTONIC, &current_time);
346                 fwp_timespec_sub(&interval, &end_period, &current_time);
347                 nanosleep(&interval, NULL);
348                 */
349         }
350         
351         /* it should normaly never come here */ 
352         pthread_cleanup_pop(0); 
353         fwp_vres_free(vres);
354         
355         return NULL;
356 }
357
358 int _fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
359 {
360         fwp_vres_t *vres = vresd;
361
362         if (vres->status == FWP_VRES_BOUND) {
363                 return fwp_msgq_enqueue(&vres->tx_queue, msgb);
364         } else 
365                 return -EPERM;
366 }
367
368 /*int _fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)*/
369 int _fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
370 {
371         fwp_vres_t *vres = vresd;
372         int rv = 0;
373
374         pthread_mutex_lock(&fwp_vres_table.lock);
375         if (!fwp_vres_is_valid(vresd)) {
376                 rv = -EINVAL;
377                 goto err;
378         }
379
380         if (vres->status != FWP_VRES_UNBOUND) { /*if already bounded */
381                 rv = -EPERM;
382                 goto err;
383         }
384
385         vres->ac_sockd = sockd;
386         fwp_vres_set_ac(vres->ac_sockd,vres->params.ac_id);
387         vres->status = FWP_VRES_BOUND;
388 err:
389         pthread_mutex_unlock(&fwp_vres_table.lock);
390         return rv;
391 }
392
393 int _fwp_vres_unbind(fwp_vres_d_t vresd)
394 {
395         fwp_vres_t *vres = vresd;
396         
397         if (!fwp_vres_is_valid(vresd)) {
398                 return -EINVAL;
399         }
400         vres->status = FWP_VRES_UNBOUND;
401         /* TODO: consider what to do with pending messages */
402         /* fwp_vres_free_msgb(vres->tx_queue); */
403         
404         return 0;
405 }
406