]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/core/fwp_vres.c
Bug Fix: mngr_commit: fwp_msg_contracthdr_out( msgb->tail changed to msgb->data
[frescor/fwp.git] / fwp / lib / core / fwp_vres.c
1 #include "fwp_util.h"
2 #include "fwp_vres.h"
3
/* Forward declaration: entry point of the per-vres transmit thread. */
static void *fwp_vres_tx_thread(void *_vres);
5
6
/**
 * State of one slot in the vres table.
 *
 * FWP_VRES_FREE must stay 0: the table is zero-filled on initialisation and
 * a zeroed slot has to read as free.
 */
typedef enum {
	/* slot is unused and may be handed out by fwp_vres_alloc() */
	FWP_VRES_FREE     = 0,
	/* slot is reserved (set by fwp_vres_alloc() and fwp_vres_destroy()) */
	FWP_VRES_INACTIVE = 1,
	/* tx thread has been started, no endpoint is bound */
	FWP_VRES_UNBOUND  = 2,
	/* an endpoint has been attached by _fwp_vres_bind() */
	FWP_VRES_BOUND    = 3,
} fwp_vres_status_t;
13
14 /**
15  * Structure of FWP vres.
16  *
17  * 
18  */
19 struct fwp_vres{
20         struct fwp_vres_params          params;
21         /* consideration: move tx_queue to endpoint */
22         /**< queue for messages to send */
23         struct fwp_msgq                 tx_queue;   
24         int                             flags;
25         /**< endpoint bounded to this vres */
26         fwp_endpoint_d_t                epointd;   
27         pthread_t                       tx_thread; /**< tx_thread id*/
28         pthread_attr_t                  tx_thread_attr;
29         int                             ac_sockd;  /**< ac socket descriptor */
30         fwp_vres_status_t               status;
31 };
32
33 typedef
34 struct fwp_vres_table {
35         unsigned int                    nr_vres;
36         fwp_vres_t                      *entry;
37         pthread_mutex_t                 lock;
38 } fwp_vres_table_t;
39
40 /* Global variable - vres table */
41 static fwp_vres_table_t  fwp_vres_table = {
42         .nr_vres = 0,
43         .entry = NULL,
44         .lock = PTHREAD_MUTEX_INITIALIZER,
45 };
46
/*
 * Eight-entry lookup table of access-category indices.
 * NOTE(review): presumably indexed by an 8-level priority - confirm with
 * the users of this table.
 */
static const int prio_to_ac[8] = { 2, 3, 3, 2, 1, 1, 0, 0 };

/*
 * Value written to the IP_TOS socket option for each access category;
 * indexed by the AC id in fwp_vres_ac_open().
 */
static const unsigned int ac_to_tos[4] = { 0xE0, 0xA0, 0x60, 0x40 };
49
50 static int fwp_vres_ac_open(fwp_ac_t ac_id) 
51 {
52         int sockd;
53         unsigned int tos;
54         
55         if ((ac_id < 0)||(ac_id >= FWP_AC_NUM))
56                 return -EINVAL;
57
58         if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
59                 perror("Unable to open socket for AC");
60                 return (-errno);
61         }
62         
63         /*
64          * Not needed
65          * unisgned int yes = 1;
66          * if (setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR, &yes, 
67          *                sizeof(int)) == -1) {
68          *      perror("fwp_ac_open - Root permission needed to set AC)");
69                 close(sockfd);
70                 return (-errno);
71         }*/
72
73         tos = ac_to_tos[ac_id];
74         
75         if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
76                 perror("Root permission needed to set AC");
77                 /* let pass
78                  * close(sockfd);
79                  *return (-errno);*/
80         }
81         
82         return sockd;
83 }
84
85 static inline int __fwp_vres_send(unsigned int ac_sockd, struct fwp_msgb* msgb)
86 {
87         _fwp_sendto(ac_sockd, msgb->data, msgb->len, 0, 
88                         msgb->peer->addr, msgb->peer->addrlen);
89         return 0;
90 }
91
92 /*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
93 {
94         if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
95                 return -EINVAL;
96         *vres = &fwp_vres_table.entry[vres_id];
97         return 0;
98 }
99
100 inline int fwp_vres_getid(fwp_vres_t *vres, fwp_vres_id_t *vres_id)
101 {
102         fwp_vres_id_t id;
103
104         id = vres - fwp_vres_table.entry;
105         if ((id < 0) || (id > fwp_vres_table.nr_vres - 1))
106                 return -EINVAL;
107         *vres_id = id;
108         return 0;
109 }*/
110
111 int fwp_vres_table_init(unsigned int nr_vres)
112 {
113         unsigned int table_size = nr_vres * sizeof(fwp_vres_t);
114
115         fwp_vres_table.entry = (fwp_vres_t*) malloc(table_size);
116         if (!fwp_vres_table.entry)
117                 return -ENOMEM;
118
119         memset((void*) fwp_vres_table.entry, 0, table_size);
120         fwp_vres_table.nr_vres = nr_vres;
121         return 0;
122 }
123
124 fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
125 {
126         fwp_vres_t *vres = vresd;
127         
128         return (vres - fwp_vres_table.entry);
129 }
130
131 fwp_vres_d_t fwp_vres_alloc()
132 {
133         int i;
134         unsigned int nr_vres;
135
136         /* find free vres id */
137         pthread_mutex_lock(&fwp_vres_table.lock);
138         i = 0;
139         nr_vres = fwp_vres_table.nr_vres;
140         while ((i < nr_vres) && 
141                 (fwp_vres_table.entry[i].status != FWP_VRES_FREE)) {
142                 i++;
143         }
144         
145         if (i == nr_vres) {
146                 pthread_mutex_unlock(&fwp_vres_table.lock);
147                 return NULL;
148         }
149
150         fwp_vres_table.entry[i].status = FWP_VRES_INACTIVE;
151         pthread_mutex_unlock(&fwp_vres_table.lock);
152         return (&fwp_vres_table.entry[i]);
153 }
154
155 static inline void fwp_vres_free(fwp_vres_t *vres)
156 {
157         vres->status = FWP_VRES_FREE;
158 }
159
160 int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
161 {
162         fwp_vres_t *vres = vresd;
163         int rv;
164         
165         /* copy vres paramters into vres structure */
166         memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
167         if (vres->status != FWP_VRES_INACTIVE) {
168                 /* Consider: hwo to change parameters when vres is in running
169                  * state - restart thread, set vres_resched flag
170                  */
171                 pthread_cancel(vres->tx_thread);
172                 close(vres->ac_sockd);
173                 /* or set vres_resched flag and return */
174         } else{
175                 /* initialize msg queue */
176                 fwp_msgq_init(&vres->tx_queue);
177         }
178         /* open ac socket */
179         if ((rv = fwp_vres_ac_open(vres->params.ac_id)) < 0) {
180                 goto err;
181         }
182         vres->ac_sockd = rv;    
183
184         pthread_attr_init(&vres->tx_thread_attr);
185         if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, 
186                             fwp_vres_tx_thread, (void*) vres)) != 0){
187                 goto err;
188
189         }
190         vres->status = FWP_VRES_UNBOUND;
191         
192 /*      return vres->params.id; */
193         return 0;
194 err:    
195         fwp_vres_free(vres);
196         return rv; 
197 }
198
199 int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
200 {
201         int rv;
202         fwp_vres_t *vres;
203         
204         /* Check for validity of the contract */
205
206         vres = fwp_vres_alloc();
207         if (!vres) {
208                 return -ENOMEM;
209         }
210         /* copy vres paramters into vres structure */
211         memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
212         /* initialize msg queue */
213         fwp_msgq_init(&vres->tx_queue);
214         /* open ac socket */
215         if ((rv = fwp_vres_ac_open(vres->params.ac_id)) < 0) {
216                 goto err;
217         }
218         vres->ac_sockd = rv;    
219
220         pthread_attr_init(&vres->tx_thread_attr);
221         if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, 
222                             fwp_vres_tx_thread, (void*) vres)) != 0){
223                 goto err;
224
225         }
226         vres->status = FWP_VRES_UNBOUND;
227         
228 /*      return vres->params.id; */
229         *vresdp = vres;
230         return 0;
231 err:    
232         fwp_vres_free(vres);
233         return rv; 
234 }
235
236 int fwp_vres_destroy(fwp_vres_d_t vresd)
237 {       
238         fwp_vres_t *vres;
239
240         vres = vresd;
241         if (vres->status == FWP_VRES_FREE)
242                 return -EPERM; 
243         
244         vres->status = FWP_VRES_INACTIVE;
245
246         /* unbind endpoint */
247         fwp_send_endpoint_unbind(vres->epointd);
248
249         pthread_cancel(vres->tx_thread);
250         close(vres->ac_sockd);
251         
252         FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);   
253         return  0;
254 }
255
256 static void fwp_vres_cleanup(void *_vres)
257 {
258         fwp_vres_t *vres = (fwp_vres_t*)_vres;
259
260         fwp_msgq_dequeue_all(&vres->tx_queue);
261         fwp_vres_free(vres);
262 }
263
264 static void* fwp_vres_tx_thread(void *_vres)
265 {/* TODO: make changes that count with changing of params */
266         struct fwp_vres *vres = (struct fwp_vres*)_vres;
267         struct fwp_msgq *msgq = &vres->tx_queue;
268         struct fwp_msgb *msgb = NULL;
269         unsigned int    ac_id = vres->params.ac_id;
270         unsigned int    ac_sockd = vres->ac_sockd;
271         int             budget = vres->params.budget;
272         int             curr_budget;
273         int             rc;
274                 
275         struct timespec  start_period, end_period, period;
276         struct timespec  current_time, interval;
277
278         period.tv_nsec = vres->params.period_usec % SEC_TO_USEC;
279         period.tv_sec = vres->params.period_usec / SEC_TO_USEC;
280         
281         fwp_set_rt_prio(90 - ac_id);
282         
283         FWP_DEBUG("vres tx thread with budget:%d period_sec=%ld " 
284                         "period_nsec=%ld.\n",
285                   vres->params.budget, period.tv_sec, period.tv_nsec);
286
287         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
288         pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
289         clock_gettime(CLOCK_MONOTONIC, &start_period);
290
291         while (vres->status && (FWP_VRES_UNBOUND || FWP_VRES_BOUND)) {
292                 /* wait for next period and then send */
293                 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);       
294                 
295                 fwp_timespec_add(&end_period, &start_period, &period);
296                 clock_gettime(CLOCK_MONOTONIC, &current_time);
297                 fwp_timespec_sub(&interval, &end_period, &current_time);
298                 nanosleep(&interval, NULL);
299         
300                 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);   
301                 sem_wait(&msgq->empty_lock);
302                 clock_gettime(CLOCK_MONOTONIC, &start_period);
303                 
304                 /*msgb = fwp_msgq_dequeue(msgq);
305                 if (msgb){*/
306                 curr_budget = 0;
307                 while ((curr_budget < budget)&& 
308                        (msgb = fwp_msgq_dequeue(msgq))) {
309                         
310                         rc = __fwp_vres_send(ac_sockd, msgb);
311                         if (!(rc < 0)) {
312                                 FWP_DEBUG("Message sent through AC%d\n",ac_id);
313                                 /* Switch to this in the future
314                                  * curr_budget+= msgb->len;
315                                  */
316                                 curr_budget++;
317                         }
318                         fwp_msgb_free(msgb);
319                 }
320
321                 pthread_testcancel();   
322                 /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);     
323                 
324                 fwp_timespec_add(&end_period, &start_period, &period);
325                 clock_gettime(CLOCK_MONOTONIC, &current_time);
326                 fwp_timespec_sub(&interval, &end_period, &current_time);
327                 nanosleep(&interval, NULL);
328                 */
329         }
330         
331         /* it should normaly never come here */ 
332         pthread_cleanup_pop(0); 
333         fwp_vres_free(vres);
334         
335         return NULL;
336 }
337
338 int _fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
339 {
340         fwp_vres_t *vres = vresd;
341
342         /* test flags to check whether to send reliably*/
343         if (vres->status != FWP_VRES_INACTIVE) {
344                 return fwp_msgq_enqueue(&vres->tx_queue, msgb);
345         } else 
346                 return -EPERM;
347 }
348
349 int _fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_d_t epointd)
350 {
351         fwp_vres_t *vres = vresd;
352         int rv = 0;
353
354         pthread_mutex_lock(&fwp_vres_table.lock);
355         if (vres->status == FWP_VRES_BOUND) /*if other endpoint is assigned to vres*/
356                 rv = -EPERM;
357         else { 
358                 vres->epointd = epointd;
359                 vres->status = FWP_VRES_BOUND;
360         }
361         pthread_mutex_unlock(&fwp_vres_table.lock);
362         return rv;
363 }
364
365 int _fwp_vres_unbind(fwp_vres_d_t vresd)
366 {
367         fwp_vres_t *vres = vresd;
368         
369         vres->status = FWP_VRES_UNBOUND;
370         /* TODO: consider what to do with pending messages */
371         /* fwp_vres_free_msgb(vres->tx_queue); */
372         
373         return 0;
374 }
375