5 #include "fwp_endpoint.h"
9 static void* fwp_vres_tx_thread(void *_vres);
13 FWP_VRES_INACTIVE = 1 ,
14 FWP_VRES_UNBOUND = 2 ,
19 * Structure of FWP vres.
20 * Internal representation of vres
24 struct fwp_vres_params params;
25 /* consideration: move tx_queue to endpoint */
26 /**< queue for messages to send */
27 struct fwp_msgq tx_queue;
29 /**< endpoint bounded to this vres */
30 /*fwp_endpoint_t *epoint; */
31 pthread_t tx_thread; /**< tx_thread id*/
32 pthread_attr_t tx_thread_attr;
33 int ac_sockd; /**< ac socket descriptor */
34 fwp_vres_status_t status;
38 struct fwp_vres_table {
39 unsigned int max_vres;
44 /* Global variable - vres table */
45 static fwp_vres_table_t fwp_vres_table = {
48 .lock = PTHREAD_MUTEX_INITIALIZER,
51 /**< mapping priority to ac*/
52 static const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
53 /**< IP tos for AC_VI, AC_VO, AC_BE, AC_BK */
54 static const unsigned int ac_to_tos[4] = {224,160,96,64};
56 static inline int fwp_vres_set_ac(int sockd, fwp_ac_t ac_id)
60 tos = ac_to_tos[ac_id];
61 if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
62 perror("Root permission needed to set AC");
70 static int fwp_vres_ac_open(fwp_ac_t ac_id)
75 if ((ac_id < 0)||(ac_id >= FWP_AC_NUM))
78 if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
79 perror("Unable to open socket for AC");
83 tos = ac_to_tos[ac_id];
85 if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
86 perror("Root permission needed to set AC");
95 static inline int __fwp_vres_send(unsigned int ac_sockd, struct fwp_msgb* msgb)
97 /*_fwp_sendto(ac_sockd, msgb->data, msgb->len, 0,
98 msgb->peer->addr, msgb->peer->addrlen);*/
99 _fwp_send(ac_sockd, msgb->data, msgb->len, 0);
103 static inline void fwp_vres_free(fwp_vres_t *vres)
105 vres->status = FWP_VRES_FREE;
108 static inline int fwp_vres_is_valid(fwp_vres_d_t vresd)
110 int id = vresd - fwp_vres_table.entry;
112 if ((id < 0) || (id > fwp_vres_table.max_vres - 1) ||
113 (vresd->status == FWP_VRES_FREE))
119 /*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
121 3if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
123 *vres = &fwp_vres_table.entry[vres_id];
128 int fwp_vres_table_init(unsigned int max_vres)
130 unsigned int table_size = max_vres * sizeof(fwp_vres_t);
132 fwp_vres_table.entry = (fwp_vres_t*) malloc(table_size);
133 if (!fwp_vres_table.entry)
136 memset((void*) fwp_vres_table.entry, 0, table_size);
137 fwp_vres_table.max_vres = max_vres;
141 fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
143 fwp_vres_t *vres = vresd;
145 return (vres - fwp_vres_table.entry);
148 fwp_vres_d_t fwp_vres_alloc()
151 unsigned int max_vres;
153 /* find free vres id */
154 pthread_mutex_lock(&fwp_vres_table.lock);
156 max_vres = fwp_vres_table.max_vres;
157 while ((i < max_vres) &&
158 (fwp_vres_table.entry[i].status != FWP_VRES_FREE)) {
163 pthread_mutex_unlock(&fwp_vres_table.lock);
167 fwp_vres_table.entry[i].status = FWP_VRES_INACTIVE;
168 pthread_mutex_unlock(&fwp_vres_table.lock);
169 return (&fwp_vres_table.entry[i]);
172 int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
174 fwp_vres_t *vres = vresd;
177 if (!fwp_vres_is_valid(vresd)) {
180 /* copy vres paramters into vres structure */
181 memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
183 if (vres->status != FWP_VRES_INACTIVE) {
184 /* Consider: how to change parameters when vres is in running
185 * state - restart thread, set vres_resched flag
187 pthread_cancel(vres->tx_thread);
188 /* or set vres_resched flag and return */
190 /* initialize msg queue */
191 fwp_msgq_init(&vres->tx_queue);
194 if ((vres->status == FWP_VRES_BOUND) &&
195 (rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id)) < 0) {
199 pthread_attr_init(&vres->tx_thread_attr);
200 if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr,
201 fwp_vres_tx_thread, (void*) vres)) != 0){
205 vres->status = FWP_VRES_UNBOUND;
216 * \param[in] params Vres parameters
217 * \param[out] vresdp Pointer to the descriptor of newly created vres
219 * \return On success returns descriptor of vres.
220 * On error, negative error code is returned.
223 int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
228 /* Check for validity of the contract */
230 vres = fwp_vres_alloc();
234 /* copy vres paramters into vres structure */
235 memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
236 /* initialize msg queue */
237 fwp_msgq_init(&vres->tx_queue);
239 pthread_attr_init(&vres->tx_thread_attr);
240 if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr,
241 fwp_vres_tx_thread, (void*) vres)) != 0){
245 vres->status = FWP_VRES_UNBOUND;
257 * \param[in] vresd Vres descriptor
259 * \return On success returns 0.
260 * On error, negative error code is returned.
263 int fwp_vres_destroy(fwp_vres_d_t vresd)
265 fwp_vres_t *vres = vresd;
267 if (!fwp_vres_is_valid(vresd))
270 vres->status = FWP_VRES_FREE;
271 pthread_cancel(vres->tx_thread);
273 FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);
277 static void fwp_vres_cleanup(void *_vres)
279 fwp_vres_t *vres = (fwp_vres_t*)_vres;
281 fwp_msgq_dequeue_all(&vres->tx_queue);
285 static void* fwp_vres_tx_thread(void *_vres)
286 {/* TODO: make changes that count with changing of params */
287 struct fwp_vres *vres = (struct fwp_vres*)_vres;
288 struct fwp_msgq *msgq = &vres->tx_queue;
289 struct fwp_msgb *msgb = NULL;
290 unsigned int ac_id = vres->params.ac_id;
291 /*unsigned int ac_sockd = vres->ac_sockd;*/
292 int budget = vres->params.budget;
296 struct timespec start_period, end_period, period;
297 struct timespec current_time, interval;
299 period.tv_nsec = vres->params.period_usec % SEC_TO_USEC;
300 period.tv_sec = vres->params.period_usec / SEC_TO_USEC;
302 fwp_set_rt_prio(90 - ac_id);
304 FWP_DEBUG("Started vres tx thread with budget:%d period_sec=%ld "
305 "period_nsec=%ld.\n",
306 vres->params.budget, period.tv_sec, period.tv_nsec);
308 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
309 pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
310 clock_gettime(CLOCK_MONOTONIC, &start_period);
312 while (vres->status && (FWP_VRES_UNBOUND || FWP_VRES_BOUND)) {
313 /* wait for next period and then send */
314 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
316 fwp_timespec_add(&end_period, &start_period, &period);
317 clock_gettime(CLOCK_MONOTONIC, ¤t_time);
318 fwp_timespec_sub(&interval, &end_period, ¤t_time);
319 nanosleep(&interval, NULL);
321 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
322 sem_wait(&msgq->empty_lock);
323 clock_gettime(CLOCK_MONOTONIC, &start_period);
325 /*msgb = fwp_msgq_dequeue(msgq);
328 while ((curr_budget < budget)&&
329 (msgb = fwp_msgq_dequeue(msgq))) {
330 rc = __fwp_vres_send(vres->ac_sockd, msgb);
332 FWP_DEBUG("Message sent through AC%d\n",ac_id);
333 /* Switch to this in the future
334 * curr_budget+= msgb->len;
341 pthread_testcancel();
342 /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
344 fwp_timespec_add(&end_period, &start_period, &period);
345 clock_gettime(CLOCK_MONOTONIC, ¤t_time);
346 fwp_timespec_sub(&interval, &end_period, ¤t_time);
347 nanosleep(&interval, NULL);
351 /* it should normaly never come here */
352 pthread_cleanup_pop(0);
358 int _fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
360 fwp_vres_t *vres = vresd;
362 if (vres->status == FWP_VRES_BOUND) {
363 return fwp_msgq_enqueue(&vres->tx_queue, msgb);
368 /*int _fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)*/
369 int _fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
371 fwp_vres_t *vres = vresd;
374 pthread_mutex_lock(&fwp_vres_table.lock);
375 if (!fwp_vres_is_valid(vresd)) {
380 if (vres->status != FWP_VRES_UNBOUND) { /*if already bounded */
385 vres->ac_sockd = sockd;
386 fwp_vres_set_ac(vres->ac_sockd,vres->params.ac_id);
387 vres->status = FWP_VRES_BOUND;
389 pthread_mutex_unlock(&fwp_vres_table.lock);
393 int _fwp_vres_unbind(fwp_vres_d_t vresd)
395 fwp_vres_t *vres = vresd;
397 if (!fwp_vres_is_valid(vresd)) {
400 vres->status = FWP_VRES_UNBOUND;
401 /* TODO: consider what to do with pending messages */
402 /* fwp_vres_free_msgb(vres->tx_queue); */