5 #include "fwp_endpoint.h"
9 static void* fwp_vres_tx_thread(void *_vres);
17 fwp_vres_params_t fwp_vres_params_default = {
21 .period = {.tv_sec = 2 , .tv_nsec = 111111}
25 * Structure of FWP vres.
26 * Internal representation of vres
30 struct fwp_vres_params params;
31 /* consideration: move tx_queue to endpoint */
32 /**< queue for messages to send */
33 struct fwp_msgq tx_queue;
35 /**< endpoint bounded to this vres */
36 /*fwp_endpoint_t *epoint; */
37 pthread_t tx_thread; /**< tx_thread id*/
38 pthread_attr_t tx_thread_attr;
39 int ac_sockd; /**< ac socket descriptor */
40 fwp_sockaddr_t addr; /**< dest addr,for effectivness*/
44 struct fwp_vres_table {
45 unsigned int max_vres;
50 /* Global variable - vres table */
51 static fwp_vres_table_t fwp_vres_table = {
54 .lock = PTHREAD_MUTEX_INITIALIZER,
57 /**< mapping priority to ac*/
58 static const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
59 /**< IP tos for AC_VI, AC_VO, AC_BE, AC_BK */
60 static const unsigned int ac_to_tos[4] = {224,160,96,64};
63 * Set access category (AC) to socket
65 * \param[in] sockd Socket descriptor
66 * \param[in] ac_id AC identifier
68 * \return On success returns zero.
69 * On error, negative error code is returned.
72 static inline int fwp_vres_set_ac(int sockd, fwp_ac_t ac_id)
76 tos = ac_to_tos[ac_id];
77 if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
78 FWP_ERROR("setsockopt: %s", strerror(errno));
85 static inline void fwp_vres_set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
87 vres->flags |= (1 << flag);
90 static inline void fwp_vres_clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
92 vres->flags &= ~(1 << flag);
95 static inline int fwp_vres_get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
97 return !!(vres->flags & (1 << flag));
100 static inline void fwp_vres_clearall_flag(fwp_vres_t *vres)
107 static int fwp_vres_ac_open(fwp_ac_t ac_id)
112 if ((ac_id < 0)||(ac_id >= FWP_AC_NUM)) {
117 if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
118 FWP_ERROR("Unable to open socket for AC: %s", strerror(errno));
122 tos = ac_to_tos[ac_id];
124 if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
126 FWP_ERROR("setsockopt(IP_TOS): %s", strerror(errno));
136 static inline int _fwp_vres_send(unsigned int ac_sockd, struct fwp_msgb* msgb)
138 /*_fwp_sendto(ac_sockd, msgb->data, msgb->len, 0,
139 msgb->peer->addr, msgb->peer->addrlen);*/
140 return _fwp_send(ac_sockd, msgb->data, msgb->len, 0);
143 static inline void fwp_vres_free(fwp_vres_t *vres)
145 fwp_vres_clearall_flag(vres);
148 static inline int fwp_vres_is_valid(fwp_vres_t *vres)
150 int id = vres - fwp_vres_table.entry;
152 if ((id < 0) || (id > fwp_vres_table.max_vres - 1) ||
153 (!fwp_vres_get_flag(vres, FWP_VF_USED)))
159 /*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
161 3if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
163 *vres = &fwp_vres_table.entry[vres_id];
168 int fwp_vres_table_init(unsigned int max_vres)
170 unsigned int table_size = max_vres * sizeof(fwp_vres_t);
172 fwp_vres_table.entry = (fwp_vres_t*) malloc(table_size);
173 if (!fwp_vres_table.entry)
174 return -1; /* Errno is set by malloc */
176 memset((void*) fwp_vres_table.entry, 0, table_size);
177 fwp_vres_table.max_vres = max_vres;
181 fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
183 fwp_vres_t *vres = vresd;
185 return (vres - fwp_vres_table.entry);
191 * \return On success returns vres descriptor.
193 fwp_vres_d_t fwp_vres_alloc()
196 unsigned int max_vres;
198 /* find free vres id */
199 pthread_mutex_lock(&fwp_vres_table.lock);
201 max_vres = fwp_vres_table.max_vres;
202 while ((i < max_vres) &&
203 (fwp_vres_get_flag(&fwp_vres_table.entry[i], FWP_VF_USED))) {
208 pthread_mutex_unlock(&fwp_vres_table.lock);
213 FWP_DEBUG("Allocated vres id = %d\n",i);
214 fwp_vres_set_flag(&fwp_vres_table.entry[i], FWP_VF_USED);
215 pthread_mutex_unlock(&fwp_vres_table.lock);
216 return (&fwp_vres_table.entry[i]);
219 inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
223 /* copy vres paramters into vres structure */
224 rv = fwp_vres_set_ac(vres->ac_sockd, params->ac_id);
227 memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
228 fwp_vres_set_flag(vres, FWP_VF_RESCHED);
236 * \param[in] vresdp Vres descriptor
237 * \param[in] params Vres parameters
239 * \return On success returns zero.
240 * On error, negative error code is returned.
243 int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
245 fwp_vres_t *vres = vresd;
247 if (!fwp_vres_is_valid(vres)) {
252 return _fwp_vres_set_params(vres, params);
258 * \param[in] params Vres parameters
259 * \param[out] vresdp Pointer to the descriptor of newly created vres
261 * \return On success returns descriptor of vres.
262 * On error, negative error code is returned.
265 int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
270 vres = fwp_vres_alloc();
275 /* initialize msg queue */
276 fwp_msgq_init(&vres->tx_queue);
278 memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
279 fwp_vres_set_flag(vres, FWP_VF_RESCHED);
280 pthread_attr_init(&vres->tx_thread_attr);
281 if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr,
282 fwp_vres_tx_thread, (void*) vres)) != 0){
296 * \param[in] vresd Vres descriptor
298 * \return On success returns 0.
299 * On error, negative error code is returned.
302 int fwp_vres_destroy(fwp_vres_d_t vresd)
304 fwp_vres_t *vres = vresd;
306 if (!fwp_vres_is_valid(vres)) {
311 pthread_cancel(vres->tx_thread);
313 FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);
317 static void fwp_vres_cleanup(void *_vres)
319 fwp_vres_t *vres = (fwp_vres_t*)_vres;
321 fwp_msgq_dequeue_all(&vres->tx_queue);
326 fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period,
327 fwp_budget_t *budget)
329 if (fwp_vres_get_flag(vres, FWP_VF_RESCHED)) {
330 /*period->tv_nsec = vres->params.period % SEC_TO_USEC;
331 period->tv_sec = vres->params.period / SEC_TO_USEC;*/
332 *period = vres->params.period;
333 *budget = vres->params.budget;
334 FWP_DEBUG("Vres tx thread with budget=%ld period_sec=%ld "
335 "period_nsec=%ld.\n",vres->params.budget,
336 period->tv_sec, period->tv_nsec);
337 fwp_vres_clear_flag(vres, FWP_VF_RESCHED);
342 * Thread that does budgeting
345 static void* fwp_vres_tx_thread(void *_vres)
347 struct fwp_vres *vres = (struct fwp_vres*)_vres;
348 struct fwp_msgq *msgq = &vres->tx_queue;
349 struct fwp_msgb *msgb = NULL;
350 unsigned int ac_id = vres->params.ac_id;
351 fwp_budget_t budget = vres->params.budget;
352 fwp_budget_t curr_budget;
355 struct timespec start_period, end_period, period;
356 struct timespec current_time, interval;
358 fwp_set_rt_prio(90 - ac_id);
361 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
362 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
363 pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
365 clock_gettime(CLOCK_MONOTONIC, &start_period);
368 /* wait for next period and then send */
369 fwp_timespec_add(&end_period, &start_period, &period);
370 clock_gettime(CLOCK_MONOTONIC, ¤t_time);
371 fwp_timespec_sub(&interval, &end_period, ¤t_time);
372 nanosleep(&interval, NULL);
374 sem_wait(&msgq->empty_lock);
375 fwp_vres_sched_update(vres, &period, &budget);
376 clock_gettime(CLOCK_MONOTONIC, &start_period);
378 /*msgb = fwp_msgq_dequeue(msgq);
381 while ((curr_budget < budget)&&
382 (msgb = fwp_msgq_dequeue(msgq))) {
383 rc = _fwp_vres_send(vres->ac_sockd, msgb);
385 FWP_DEBUG("Message sent through AC%d\n",ac_id);
386 /* Switch to this in the future
387 * curr_budget+= msgb->len;
395 /*pthread_testcancel(); */
396 /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
398 fwp_timespec_add(&end_period, &start_period, &period);
399 clock_gettime(CLOCK_MONOTONIC, ¤t_time);
400 fwp_timespec_sub(&interval, &end_period, ¤t_time);
401 nanosleep(&interval, NULL);
405 /* it should normaly never come here */
406 pthread_cleanup_pop(0);
412 int fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
414 fwp_vres_t *vres = vresd;
416 if (fwp_vres_is_valid(vres)) {
417 return fwp_msgq_enqueue(&vres->tx_queue, msgb);
419 errno = EPERM; /* TODO: Use more appropriate error than EPERM. */
424 /*int fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)*/
425 int fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
427 fwp_vres_t *vres = vresd;
430 pthread_mutex_lock(&fwp_vres_table.lock);
431 if (!fwp_vres_is_valid(vres)) {
437 if (fwp_vres_get_flag(vres, FWP_VF_BOUND)) { /*if already bounded */
443 vres->ac_sockd = sockd;
444 rv = fwp_vres_set_ac(vres->ac_sockd,vres->params.ac_id);
447 fwp_vres_set_flag(vres, FWP_VF_BOUND);
449 pthread_mutex_unlock(&fwp_vres_table.lock);
453 int fwp_vres_unbind(fwp_vres_d_t vresd)
455 fwp_vres_t *vres = vresd;
457 if (!fwp_vres_is_valid(vresd)) {
461 fwp_vres_clear_flag(vres, FWP_VF_BOUND);
462 /* TODO: consider what to do with pending messages */
463 /* fwp_vres_free_msgb(vres->tx_queue); */