4 static void* fwp_vres_tx_thread(void *_vres);
9 FWP_VRES_INACTIVE = 1 ,
10 FWP_VRES_UNBOUND = 2 ,
15 * Structure of FWP vres.
20 struct fwp_vres_params params;
21 /* consideration: move tx_queue to endpoint */
22 /**< queue for messages to send */
23 struct fwp_msgq tx_queue;
25 /**< endpoint bounded to this vres */
26 fwp_endpoint_d_t epointd;
27 pthread_t tx_thread; /**< tx_thread id*/
28 pthread_attr_t tx_thread_attr;
29 int ac_sockd; /**< ac socket descriptor */
30 fwp_vres_status_t status;
34 struct fwp_vres_table {
40 /* Global variable - vres table */
41 static fwp_vres_table_t fwp_vres_table = {
44 .lock = PTHREAD_MUTEX_INITIALIZER,
47 static const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
48 static const unsigned int ac_to_tos[4] = {224,160,96,64};
50 static int fwp_vres_ac_open(fwp_ac_t ac_id)
55 if ((ac_id < 0)||(ac_id >= FWP_AC_NUM))
58 if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
59 perror("Unable to open socket for AC");
65 * unisgned int yes = 1;
66 * if (setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR, &yes,
67 * sizeof(int)) == -1) {
68 * perror("fwp_ac_open - Root permission needed to set AC)");
73 tos = ac_to_tos[ac_id];
75 if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
76 perror("Root permission needed to set AC");
85 static inline int __fwp_vres_send(unsigned int ac_sockd, struct fwp_msgb* msgb)
87 _fwp_sendto(ac_sockd, msgb->data, msgb->len, 0,
88 msgb->peer->addr, msgb->peer->addrlen);
92 /*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
94 if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
96 *vres = &fwp_vres_table.entry[vres_id];
100 inline int fwp_vres_getid(fwp_vres_t *vres, fwp_vres_id_t *vres_id)
104 id = vres - fwp_vres_table.entry;
105 if ((id < 0) || (id > fwp_vres_table.nr_vres - 1))
111 int fwp_vres_table_init(unsigned int nr_vres)
113 unsigned int table_size = nr_vres * sizeof(fwp_vres_t);
115 fwp_vres_table.entry = (fwp_vres_t*) malloc(table_size);
116 if (!fwp_vres_table.entry)
119 memset((void*) fwp_vres_table.entry, 0, table_size);
120 fwp_vres_table.nr_vres = nr_vres;
124 fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
126 fwp_vres_t *vres = vresd;
128 return (vres - fwp_vres_table.entry);
131 fwp_vres_d_t fwp_vres_alloc()
134 unsigned int nr_vres;
136 /* find free vres id */
137 pthread_mutex_lock(&fwp_vres_table.lock);
139 nr_vres = fwp_vres_table.nr_vres;
140 while ((i < nr_vres) &&
141 (fwp_vres_table.entry[i].status != FWP_VRES_FREE)) {
146 pthread_mutex_unlock(&fwp_vres_table.lock);
150 fwp_vres_table.entry[i].status = FWP_VRES_INACTIVE;
151 pthread_mutex_unlock(&fwp_vres_table.lock);
152 return (&fwp_vres_table.entry[i]);
155 static inline void fwp_vres_free(fwp_vres_t *vres)
157 vres->status = FWP_VRES_FREE;
160 int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
162 fwp_vres_t *vres = vresd;
165 /* copy vres paramters into vres structure */
166 memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
167 if (vres->status != FWP_VRES_INACTIVE) {
168 /* Consider: hwo to change parameters when vres is in running
169 * state - restart thread, set vres_resched flag
171 pthread_cancel(vres->tx_thread);
172 close(vres->ac_sockd);
173 /* or set vres_resched flag and return */
175 /* initialize msg queue */
176 fwp_msgq_init(&vres->tx_queue);
179 if ((rv = fwp_vres_ac_open(vres->params.ac_id)) < 0) {
184 pthread_attr_init(&vres->tx_thread_attr);
185 if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr,
186 fwp_vres_tx_thread, (void*) vres)) != 0){
190 vres->status = FWP_VRES_UNBOUND;
192 /* return vres->params.id; */
199 int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
204 /* Check for validity of the contract */
206 vres = fwp_vres_alloc();
210 /* copy vres paramters into vres structure */
211 memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
212 /* initialize msg queue */
213 fwp_msgq_init(&vres->tx_queue);
215 if ((rv = fwp_vres_ac_open(vres->params.ac_id)) < 0) {
220 pthread_attr_init(&vres->tx_thread_attr);
221 if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr,
222 fwp_vres_tx_thread, (void*) vres)) != 0){
226 vres->status = FWP_VRES_UNBOUND;
228 /* return vres->params.id; */
236 int fwp_vres_destroy(fwp_vres_d_t vresd)
241 if (vres->status == FWP_VRES_FREE)
244 vres->status = FWP_VRES_INACTIVE;
246 /* unbind endpoint */
247 fwp_send_endpoint_unbind(vres->epointd);
249 pthread_cancel(vres->tx_thread);
250 close(vres->ac_sockd);
252 FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);
256 static void fwp_vres_cleanup(void *_vres)
258 fwp_vres_t *vres = (fwp_vres_t*)_vres;
260 fwp_msgq_dequeue_all(&vres->tx_queue);
264 static void* fwp_vres_tx_thread(void *_vres)
265 {/* TODO: make changes that count with changing of params */
266 struct fwp_vres *vres = (struct fwp_vres*)_vres;
267 struct fwp_msgq *msgq = &vres->tx_queue;
268 struct fwp_msgb *msgb = NULL;
269 unsigned int ac_id = vres->params.ac_id;
270 unsigned int ac_sockd = vres->ac_sockd;
271 int budget = vres->params.budget;
275 struct timespec start_period, end_period, period;
276 struct timespec current_time, interval;
278 period.tv_nsec = vres->params.period_usec % SEC_TO_USEC;
279 period.tv_sec = vres->params.period_usec / SEC_TO_USEC;
281 fwp_set_rt_prio(90 - ac_id);
283 FWP_DEBUG("vres tx thread with budget:%d period_sec=%ld "
284 "period_nsec=%ld.\n",
285 vres->params.budget, period.tv_sec, period.tv_nsec);
287 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
288 pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
289 clock_gettime(CLOCK_MONOTONIC, &start_period);
291 while (vres->status && (FWP_VRES_UNBOUND || FWP_VRES_BOUND)) {
292 /* wait for next period and then send */
293 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
295 fwp_timespec_add(&end_period, &start_period, &period);
296 clock_gettime(CLOCK_MONOTONIC, ¤t_time);
297 fwp_timespec_sub(&interval, &end_period, ¤t_time);
298 nanosleep(&interval, NULL);
300 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
301 sem_wait(&msgq->empty_lock);
302 clock_gettime(CLOCK_MONOTONIC, &start_period);
304 /*msgb = fwp_msgq_dequeue(msgq);
307 while ((curr_budget < budget)&&
308 (msgb = fwp_msgq_dequeue(msgq))) {
310 rc = __fwp_vres_send(ac_sockd, msgb);
312 FWP_DEBUG("Message sent through AC%d\n",ac_id);
313 /* Switch to this in the future
314 * curr_budget+= msgb->len;
321 pthread_testcancel();
322 /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
324 fwp_timespec_add(&end_period, &start_period, &period);
325 clock_gettime(CLOCK_MONOTONIC, ¤t_time);
326 fwp_timespec_sub(&interval, &end_period, ¤t_time);
327 nanosleep(&interval, NULL);
331 /* it should normaly never come here */
332 pthread_cleanup_pop(0);
338 int _fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
340 fwp_vres_t *vres = vresd;
342 /* test flags to check whether to send reliably*/
343 if (vres->status != FWP_VRES_INACTIVE) {
344 return fwp_msgq_enqueue(&vres->tx_queue, msgb);
349 int _fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_d_t epointd)
351 fwp_vres_t *vres = vresd;
354 pthread_mutex_lock(&fwp_vres_table.lock);
355 if (vres->status == FWP_VRES_BOUND) /*if other endpoint is assigned to vres*/
358 vres->epointd = epointd;
359 vres->status = FWP_VRES_BOUND;
361 pthread_mutex_unlock(&fwp_vres_table.lock);
365 int _fwp_vres_unbind(fwp_vres_d_t vresd)
367 fwp_vres_t *vres = vresd;
369 vres->status = FWP_VRES_UNBOUND;
370 /* TODO: consider what to do with pending messages */
371 /* fwp_vres_free_msgb(vres->tx_queue); */