5 #include "fwp_endpoint.h"
9 static void* fwp_vres_tx_thread(void *_vres);
18 * Structure of FWP vres.
19 * Internal representation of vres
23 struct fwp_vres_params params;
24 /* consideration: move tx_queue to endpoint */
25 /**< queue for messages to send */
26 struct fwp_msgq tx_queue;
28 /**< endpoint bounded to this vres */
29 /*fwp_endpoint_t *epoint; */
30 pthread_t tx_thread; /**< tx_thread id*/
31 pthread_attr_t tx_thread_attr;
32 int ac_sockd; /**< ac socket descriptor */
33 fwp_sockaddr_t addr; /**< dest addr,for effectivness*/
37 struct fwp_vres_table {
38 unsigned int max_vres;
43 /* Global variable - vres table */
44 static fwp_vres_table_t fwp_vres_table = {
47 .lock = PTHREAD_MUTEX_INITIALIZER,
50 /**< mapping priority to ac*/
51 static const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
52 /**< IP tos for AC_VI, AC_VO, AC_BE, AC_BK */
53 static const unsigned int ac_to_tos[4] = {224,160,96,64};
56 * Set access category (AC) to socket
58 * \param[in] sockd Socket descriptor
59 * \param[in] ac_id AC identifier
61 * \return On success returns zero.
62 * On error, negative error code is returned.
65 static inline int fwp_vres_set_ac(int sockd, fwp_ac_t ac_id)
69 tos = ac_to_tos[ac_id];
70 if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
71 FWP_ERROR("setsockopt: %s", strerror(errno));
78 static inline int fwp_vres_set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
80 return (vres->flags | (1 << flag));
83 static inline int fwp_vres_clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
85 return (vres->flags & ~(1 << flag));
88 static inline int fwp_vres_get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
90 return !!(vres->flags & (1 << flag));
95 static int fwp_vres_ac_open(fwp_ac_t ac_id)
100 if ((ac_id < 0)||(ac_id >= FWP_AC_NUM)) {
105 if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
106 FWP_ERROR("Unable to open socket for AC: %s", strerror(errno));
110 tos = ac_to_tos[ac_id];
112 if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
114 FWP_ERROR("setsockopt(IP_TOS): %s", strerror(errno));
124 static inline int __fwp_vres_send(unsigned int ac_sockd, struct fwp_msgb* msgb)
126 /*_fwp_sendto(ac_sockd, msgb->data, msgb->len, 0,
127 msgb->peer->addr, msgb->peer->addrlen);*/
128 return _fwp_send(ac_sockd, msgb->data, msgb->len, 0);
131 static inline void fwp_vres_free(fwp_vres_t *vres)
133 fwp_vres_clear_flag(vres, FWP_VF_USED);
136 static inline int fwp_vres_is_valid(fwp_vres_t *vres)
138 int id = vres - fwp_vres_table.entry;
140 if ((id < 0) || (id > fwp_vres_table.max_vres - 1) ||
141 (!fwp_vres_get_flag(vres, FWP_VF_USED)))
147 /*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
149 3if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
151 *vres = &fwp_vres_table.entry[vres_id];
156 int fwp_vres_table_init(unsigned int max_vres)
158 unsigned int table_size = max_vres * sizeof(fwp_vres_t);
160 fwp_vres_table.entry = (fwp_vres_t*) malloc(table_size);
161 if (!fwp_vres_table.entry)
162 return -1; /* Errno is set by malloc */
164 memset((void*) fwp_vres_table.entry, 0, table_size);
165 fwp_vres_table.max_vres = max_vres;
169 fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
171 fwp_vres_t *vres = vresd;
173 return (vres - fwp_vres_table.entry);
179 * \return On success returns vres descriptor.
181 fwp_vres_d_t fwp_vres_alloc()
184 unsigned int max_vres;
186 /* find free vres id */
187 pthread_mutex_lock(&fwp_vres_table.lock);
189 max_vres = fwp_vres_table.max_vres;
190 while ((i < max_vres) &&
191 (fwp_vres_get_flag(&fwp_vres_table.entry[i], FWP_VF_USED))) {
196 pthread_mutex_unlock(&fwp_vres_table.lock);
201 fwp_vres_set_flag(&fwp_vres_table.entry[i], FWP_VF_USED);
202 pthread_mutex_unlock(&fwp_vres_table.lock);
203 return (&fwp_vres_table.entry[i]);
206 inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
208 /* copy vres paramters into vres structure */
209 rv = fwp_vres_set_ac(vres->ac_sockd, params->ac_id);
212 memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
213 fwp_vres_set_flag(vres, FWP_VF_RESCHED);
221 * \param[in] vresdp Vres descriptor
222 * \param[in] params Vres parameters
224 * \return On success returns zero.
225 * On error, negative error code is returned.
228 int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
230 fwp_vres_t *vres = vresd;
233 if (!fwp_vres_is_valid(vres)) {
238 return fwp_vres_set_params(vres, params);
244 * \param[in] params Vres parameters
245 * \param[out] vresdp Pointer to the descriptor of newly created vres
247 * \return On success returns descriptor of vres.
248 * On error, negative error code is returned.
251 int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
256 /* Check for validity of the contract */
258 vres = fwp_vres_alloc();
262 /* copy vres paramters into vres structure */
263 memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
264 /* initialize msg queue */
265 fwp_msgq_init(&vres->tx_queue);
267 pthread_attr_init(&vres->tx_thread_attr);
268 if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr,
269 fwp_vres_tx_thread, (void*) vres)) != 0){
286 * \param[in] vresd Vres descriptor
288 * \return On success returns 0.
289 * On error, negative error code is returned.
292 int fwp_vres_destroy(fwp_vres_d_t vresd)
294 fwp_vres_t *vres = vresd;
296 if (!fwp_vres_is_valid(vres)) {
301 pthread_cancel(vres->tx_thread);
303 FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);
307 static void fwp_vres_cleanup(void *_vres)
309 fwp_vres_t *vres = (fwp_vres_t*)_vres;
311 fwp_msgq_dequeue_all(&vres->tx_queue);
316 fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period,
317 fwp_budget_t *budget)
319 if (fwp_vres_get_flag(vres, FWP_VF_RESCHED)) {
320 period->tv_nsec = vres->params.period_usec % SEC_TO_USEC;
321 period->tv_sec = vres->params.period_usec / SEC_TO_USEC;
322 *budget = vres->params.budget;
323 FWP_DEBUG("Vres tx thread with budget:%d period_sec=%ld "
324 "period_nsec=%ld.\n",vres->params.budget,
325 period->tv_sec, period->tv_nsec);
330 * Thread that does budgeting
333 static void* fwp_vres_tx_thread(void *_vres)
334 {/* TODO: make changes that count with changing of params */
335 struct fwp_vres *vres = (struct fwp_vres*)_vres;
336 struct fwp_msgq *msgq = &vres->tx_queue;
337 struct fwp_msgb *msgb = NULL;
338 unsigned int ac_id = vres->params.ac_id;
339 /*unsigned int ac_sockd = vres->ac_sockd;*/
340 int budget = vres->params.budget;
344 struct timespec start_period, end_period, period;
345 struct timespec current_time, interval;
347 /* period.tv_nsec = vres->params.period_usec % SEC_TO_USEC;
348 period.tv_sec = vres->params.period_usec / SEC_TO_USEC;
350 fwp_set_rt_prio(90 - ac_id);
353 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
354 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
355 pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
357 clock_gettime(CLOCK_MONOTONIC, &start_period);
360 /* wait for next period and then send */
361 fwp_vres_sched_update(vres, &period, &budget);
362 fwp_timespec_add(&end_period, &start_period, &period);
363 clock_gettime(CLOCK_MONOTONIC, ¤t_time);
364 fwp_timespec_sub(&interval, &end_period, ¤t_time);
365 nanosleep(&interval, NULL);
367 sem_wait(&msgq->empty_lock);
368 clock_gettime(CLOCK_MONOTONIC, &start_period);
370 /*msgb = fwp_msgq_dequeue(msgq);
373 while ((curr_budget < budget)&&
374 (msgb = fwp_msgq_dequeue(msgq))) {
375 rc = __fwp_vres_send(vres->ac_sockd, msgb);
377 FWP_DEBUG("Message sent through AC%d\n",ac_id);
378 /* Switch to this in the future
379 * curr_budget+= msgb->len;
386 /*pthread_testcancel(); */
387 /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
389 fwp_timespec_add(&end_period, &start_period, &period);
390 clock_gettime(CLOCK_MONOTONIC, ¤t_time);
391 fwp_timespec_sub(&interval, &end_period, ¤t_time);
392 nanosleep(&interval, NULL);
396 /* it should normaly never come here */
397 pthread_cleanup_pop(0);
403 int _fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
405 fwp_vres_t *vres = vresd;
407 if (fwp_vres_is_valid(vres)) {
408 return fwp_msgq_enqueue(&vres->tx_queue, msgb);
410 errno = EPERM; /* TODO: Use more appropriate error than EPERM. */
416 /*int _fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)*/
417 int _fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
419 fwp_vres_t *vres = vresd;
422 pthread_mutex_lock(&fwp_vres_table.lock);
423 if (!fwp_vres_is_valid(vres)) {
429 if (vres->status != FWP_VRES_UNBOUND) { /*if already bounded */
435 vres->ac_sockd = sockd;
436 fwp_vres_set_ac(vres->ac_sockd,vres->params.ac_id);
437 vres->status = FWP_VRES_BOUND;
439 pthread_mutex_unlock(&fwp_vres_table.lock);
443 int _fwp_vres_unbind(fwp_vres_d_t vresd)
445 fwp_vres_t *vres = vresd;
447 if (!fwp_vres_is_valid(vresd)) {
451 vres->status = FWP_VRES_UNBOUND;
452 /* TODO: consider what to do with pending messages */
453 /* fwp_vres_free_msgb(vres->tx_queue); */