From 311f9ddd1a93bb747e6404baf49674662bfd7088 Mon Sep 17 00:00:00 2001 From: Martin Molnar Date: Sun, 14 Dec 2008 16:04:11 +0100 Subject: [PATCH] Bug fix in vres tx thread. Fixed budgeting. --- fwp/lib/fwp/fwp_msgq.c | 20 ++++++++++----- fwp/lib/fwp/fwp_msgq.h | 2 +- fwp/lib/fwp/fwp_utils.c | 12 +++++++++ fwp/lib/fwp/fwp_utils.h | 11 ++++---- fwp/lib/fwp/fwp_vres.c | 56 +++++++++++++++++++++++------------------ 5 files changed, 63 insertions(+), 38 deletions(-) diff --git a/fwp/lib/fwp/fwp_msgq.c b/fwp/lib/fwp/fwp_msgq.c index 4dddb97..0fcc73d 100644 --- a/fwp/lib/fwp/fwp_msgq.c +++ b/fwp/lib/fwp/fwp_msgq.c @@ -56,7 +56,7 @@ void fwp_msgq_init(struct fwp_msgq *msgq) msgq->in = 0; msgq->out = 0; pthread_mutex_init(&msgq->lock, NULL); /* fast mutex */ - sem_init(&msgq->empty_lock, 0, 0); + sem_init(&msgq->msg_sem, 0, 0); } /** @@ -91,9 +91,9 @@ int fwp_msgq_enqueue(struct fwp_msgq *msgq, struct fwp_msgb *msgb) msgq->nr_pending++; msgq->in = (++msgq->in) & (FWP_MSGQ_SIZE - 1); + sem_post(&msgq->msg_sem); /* release queue mutex */ pthread_mutex_unlock(&msgq->lock); - sem_post(&msgq->empty_lock); return 0; } @@ -110,9 +110,7 @@ struct fwp_msgb* fwp_msgq_dequeue(struct fwp_msgq *msgq) { struct fwp_msgb* msgb; - if (msgq->in == msgq->out) - return NULL; - + sem_wait(&msgq->msg_sem); /* acquire queue mutex */ pthread_mutex_lock(&msgq->lock); @@ -138,7 +136,17 @@ void fwp_msgq_dequeue_all(struct fwp_msgq *msgq) { struct fwp_msgb *msgb; - while ((msgb = fwp_msgq_dequeue(msgq))) { + /* acquire queue mutex */ + pthread_mutex_lock(&msgq->lock); + + while (msgq->in != msgq->out){ + msgb = msgq->queue[msgq->out]; + msgq->nr_pending--; + msgq->out = (++msgq->out) & (FWP_MSGQ_SIZE - 1); fwp_msgb_free(msgb); } + + sem_init(&msgq->msg_sem, 0, 0); + /* release queue mutex */ + pthread_mutex_unlock(&msgq->lock); } diff --git a/fwp/lib/fwp/fwp_msgq.h b/fwp/lib/fwp/fwp_msgq.h index ff8a58c..5ab0ae0 100644 --- a/fwp/lib/fwp/fwp_msgq.h +++ b/fwp/lib/fwp/fwp_msgq.h @@ -62,7 +62,7 @@ struct fwp_msgq { unsigned int in; /**< add at offset (in % size) */ unsigned int out; /**< extracted from offset (out % size) */ pthread_mutex_t lock; /**< queue lock */ - sem_t empty_lock; /**< semaphore to block on empty mqueue */ + sem_t msg_sem; /**< semaphore to block on empty mqueue */ /* queue reject policy */ /*queue_rejection_policy qr_policy;*/ diff --git a/fwp/lib/fwp/fwp_utils.c b/fwp/lib/fwp/fwp_utils.c index 773da3c..28ef3ab 100644 --- a/fwp/lib/fwp/fwp_utils.c +++ b/fwp/lib/fwp/fwp_utils.c @@ -78,6 +78,18 @@ inline void fwp_timespec_sub (struct timespec *diff, const struct timespec *left } } +void fwp_timespec_modulo(struct timespec *remainder, struct timespec *dividend, + struct timespec *dividor) +{ + long long a, b, res; + + a = dividend->tv_sec * SEC_TO_USEC + dividend->tv_nsec / USEC_TO_NSEC; + b = dividor->tv_sec * SEC_TO_USEC + dividor->tv_nsec / USEC_TO_NSEC; + res = a % b; + remainder->tv_sec = res / SEC_TO_USEC; + remainder->tv_nsec = ( res % SEC_TO_USEC ) * USEC_TO_NSEC; +} + int fwp_set_rt_prio(int priority) { int maxpri, minpri; diff --git a/fwp/lib/fwp/fwp_utils.h b/fwp/lib/fwp/fwp_utils.h index 62ef25f..c99ec65 100644 --- a/fwp/lib/fwp/fwp_utils.h +++ b/fwp/lib/fwp/fwp_utils.h @@ -128,16 +128,15 @@ _fwp_recv(int s, void *buf, size_t len, int flags) return ret; } -inline void fwp_timespec_add (struct timespec *sum, const struct timespec *left, - const struct timespec *right); - +inline void fwp_timespec_add (struct timespec *sum, const struct timespec *left, + const struct timespec *right); inline void fwp_timespec_sub (struct timespec *diff, const struct timespec *left, - const struct timespec *right); + const struct timespec *right); +void fwp_timespec_modulo(struct timespec *dividend, struct timespec *dividor, + struct timespec *remainder); int fwp_set_rt_prio(int priority); - int fwp_create_unix_socket(char *path, struct sockaddr_un *addr); - int fwp_create_inet_socket(unsigned int port, struct sockaddr_in *addr); #endif /* _FWP_UTILS_H */ diff --git a/fwp/lib/fwp/fwp_vres.c b/fwp/lib/fwp/fwp_vres.c index 5c91d23..d5dae0b 100644 --- a/fwp/lib/fwp/fwp_vres.c +++ b/fwp/lib/fwp/fwp_vres.c @@ -395,46 +395,52 @@ static void* fwp_vres_tx_thread(void *_vres) fwp_budget_t budget = vres->params.budget; fwp_budget_t curr_budget; int rc; - - struct timespec start_period, end_period, period; - struct timespec current_time, interval; + struct timespec start_period, period, interval, current_time; fwp_set_rt_prio(90 - ac_id); - - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); pthread_cleanup_push(fwp_vres_cleanup, (void*)vres); + /* just for sure */ + fwp_vres_sched_update(vres, &period, &budget); clock_gettime(CLOCK_MONOTONIC, &start_period); + curr_budget = budget; while (1) { /* wait for next period and then send */ - fwp_timespec_add(&end_period, &start_period, &period); - clock_gettime(CLOCK_MONOTONIC, ¤t_time); + /*clock_gettime(CLOCK_MONOTONIC, ¤t_time); fwp_timespec_sub(&interval, &end_period, ¤t_time); - nanosleep(&interval, NULL); + nanosleep(&interval, NULL);*/ - sem_wait(&msgq->empty_lock); - fwp_vres_sched_update(vres, &period, &budget); - clock_gettime(CLOCK_MONOTONIC, &start_period); + /*while((rc = sem_wait(&msgq->empty_lock, &end_period))==-1 + && errno == EINTR) { + continue; + }*/ - /*msgb = fwp_msgq_dequeue(msgq); - *if (msgb){*/ - curr_budget = 0; - while ((curr_budget < budget)&& - (msgb = fwp_msgq_dequeue(msgq))) { - rc = _fwp_vres_send(vres->ac_sockd, msgb); - if (!(rc < 0)) { - FWP_DEBUG("Message sent through AC%d\n",ac_id); - curr_budget+= msgb->len; - } else { - FWP_DEBUG("Message sent error %d\n",rc); - } - - fwp_msgb_free(msgb); + msgb = fwp_msgq_dequeue(msgq); + fwp_vres_sched_update(vres, &period, &budget); + if ((curr_budget + msgb->len) > budget) { + /* need to recharge */ + clock_gettime(CLOCK_MONOTONIC, ¤t_time); + fwp_timespec_sub(&interval, ¤t_time, &start_period); + fwp_timespec_modulo(&interval, &interval, &period); + fwp_timespec_sub(&interval, &period, &interval); + nanosleep(&interval, NULL); + curr_budget = 0; + clock_gettime(CLOCK_MONOTONIC, &start_period); } + rc = _fwp_vres_send(vres->ac_sockd, msgb); + if (!(rc < 0)) { + FWP_DEBUG("Message sent through AC%d\n",ac_id); + curr_budget+= msgb->len; + } else { + FWP_DEBUG("Message sent error %d\n",rc); + } + + fwp_msgb_free(msgb); + /*pthread_testcancel(); */ /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); -- 2.39.2