rtime.felk.cvut.cz Git - frescor/fwp.git/commitdiff
Bug fix in vres tx thread. Fixed budgeting.
author Martin Molnar <molnam1@fel.cvut.cz>
Sun, 14 Dec 2008 15:04:11 +0000 (16:04 +0100)
committer Martin <molnam1@fel.cvut.cz>
Sun, 14 Dec 2008 15:04:11 +0000 (16:04 +0100)
fwp/lib/fwp/fwp_msgq.c
fwp/lib/fwp/fwp_msgq.h
fwp/lib/fwp/fwp_utils.c
fwp/lib/fwp/fwp_utils.h
fwp/lib/fwp/fwp_vres.c

index 4dddb973cd5e800a934f4ba9c91b45247f7188ca..0fcc73d0f4d4bc607026e2d6c18bfb8d97cbbaa5 100644 (file)
@@ -56,7 +56,7 @@ void fwp_msgq_init(struct fwp_msgq *msgq)
        msgq->in = 0;
        msgq->out = 0;
        pthread_mutex_init(&msgq->lock, NULL); /* fast mutex */
-       sem_init(&msgq->empty_lock, 0, 0);
+       sem_init(&msgq->msg_sem, 0, 0);
 }
 
 /**
@@ -91,9 +91,9 @@ int fwp_msgq_enqueue(struct fwp_msgq *msgq, struct fwp_msgb *msgb)
        msgq->nr_pending++;
        msgq->in = (++msgq->in) & (FWP_MSGQ_SIZE - 1);
 
+       sem_post(&msgq->msg_sem);
        /* release queue mutex */
        pthread_mutex_unlock(&msgq->lock);
-       sem_post(&msgq->empty_lock);
 
        return 0;
 }
@@ -110,9 +110,7 @@ struct fwp_msgb* fwp_msgq_dequeue(struct fwp_msgq *msgq)
 {
        struct fwp_msgb* msgb;
        
-       if (msgq->in == msgq->out)
-               return NULL;
-                 
+       sem_wait(&msgq->msg_sem);
        /* acquire queue mutex */
        pthread_mutex_lock(&msgq->lock);
        
@@ -138,7 +136,17 @@ void fwp_msgq_dequeue_all(struct fwp_msgq *msgq)
 {
        struct fwp_msgb *msgb;
 
-       while ((msgb = fwp_msgq_dequeue(msgq))) { 
+       /* acquire queue mutex */
+       pthread_mutex_lock(&msgq->lock);
+       
+       while (msgq->in != msgq->out){
+               msgb = msgq->queue[msgq->out];
+               msgq->nr_pending--;
+               msgq->out = (++msgq->out) & (FWP_MSGQ_SIZE - 1);
                fwp_msgb_free(msgb);
        }
+       
+       sem_init(&msgq->msg_sem, 0, 0);
+       /* release queue mutex */
+       pthread_mutex_unlock(&msgq->lock);
 }
index ff8a58c1be442bf303c7318d8f1bb9330835928a..5ab0ae0b9156f66b3835e7dbb3f0a08dc17c0dca 100644 (file)
@@ -62,7 +62,7 @@ struct fwp_msgq {
        unsigned int    in;   /**< add at offset (in % size) */ 
        unsigned int    out;  /**< extracted from offset (out % size) */ 
        pthread_mutex_t  lock; /**< queue lock */
-       sem_t   empty_lock;     /**< semaphore to block on empty mqueue */
+       sem_t   msg_sem;     /**< semaphore to block on empty mqueue */
        
        /* queue reject policy */
        /*queue_rejection_policy qr_policy;*/
index 773da3c0932b3403f7ae7047aa5e6fd51d1ea2f3..28ef3abf04ed0693db35db1062e8bc396136c0a1 100644 (file)
@@ -78,6 +78,18 @@ inline void fwp_timespec_sub (struct timespec *diff, const struct timespec *left
        }
 }
 
+void fwp_timespec_modulo(struct timespec *remainder, struct timespec *dividend, 
+                               struct timespec *dividor)
+{
+       long long a, b, res;
+       /* remainder = dividend % dividor; inputs are read before remainder is written */
+       a = (long long)dividend->tv_sec * SEC_TO_USEC + dividend->tv_nsec / USEC_TO_NSEC;
+       b = (long long)dividor->tv_sec * SEC_TO_USEC + dividor->tv_nsec / USEC_TO_NSEC;
+       res = b ? a % b : 0;    /* a % 0 is undefined; yield a zero remainder instead */
+       remainder->tv_sec = res / SEC_TO_USEC;
+       remainder->tv_nsec = ( res % SEC_TO_USEC ) * USEC_TO_NSEC;
+}
+
 int fwp_set_rt_prio(int priority)
 {
        int maxpri, minpri;
index 62ef25f749313ecaf9e2c0126f6501b83df6805c..c99ec65fdbc13d66f34dda5250f4424615e26926 100644 (file)
@@ -128,16 +128,15 @@ _fwp_recv(int s, void *buf, size_t len, int flags)
        return ret;
 }
 
-inline void fwp_timespec_add (struct timespec *sum, const struct timespec *left,
-             const struct timespec *right);
-
+inline void fwp_timespec_add (struct timespec *sum, const struct timespec *left, 
+                               const struct timespec *right);
 inline void fwp_timespec_sub (struct timespec *diff, const struct timespec *left,
-             const struct timespec *right);
+                               const struct timespec *right);
+void fwp_timespec_modulo(struct timespec *remainder, struct timespec *dividend,
+                               struct timespec *dividor); /* remainder = dividend % dividor */
 
 int fwp_set_rt_prio(int priority);
-
 int fwp_create_unix_socket(char *path, struct sockaddr_un *addr);
-
 int fwp_create_inet_socket(unsigned int port, struct sockaddr_in *addr);
 
 #endif /* _FWP_UTILS_H */
index 5c91d23bcd073130be3bdfd60c43a7c93004d9f3..d5dae0b42a11a14f8aa4b5cbf28c035df43c6b78 100644 (file)
@@ -395,46 +395,52 @@ static void* fwp_vres_tx_thread(void *_vres)
        fwp_budget_t    budget = vres->params.budget;
        fwp_budget_t    curr_budget;
        int             rc;
-               
-       struct timespec  start_period, end_period, period;
-       struct timespec  current_time, interval;
+       struct timespec start_period, period, interval, current_time;
 
        fwp_set_rt_prio(90 - ac_id);
-       
-
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
        pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);       
        pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
        
+       /* just for sure */
+       fwp_vres_sched_update(vres, &period, &budget);
        clock_gettime(CLOCK_MONOTONIC, &start_period);
+       curr_budget = budget;
 
        while (1) {
                /* wait for next period and then send */
-               fwp_timespec_add(&end_period, &start_period, &period);
-               clock_gettime(CLOCK_MONOTONIC, &current_time);
+               /*clock_gettime(CLOCK_MONOTONIC, &current_time);
                fwp_timespec_sub(&interval, &end_period, &current_time);
-               nanosleep(&interval, NULL);
+               nanosleep(&interval, NULL);*/
        
-               sem_wait(&msgq->empty_lock);
-               fwp_vres_sched_update(vres, &period, &budget);
-               clock_gettime(CLOCK_MONOTONIC, &start_period);
+               /*while((rc = sem_wait(&msgq->empty_lock, &end_period))==-1 
+                       && errno == EINTR) {
+                       continue;
+               }*/
                
-               /*msgb = fwp_msgq_dequeue(msgq);
-               *if (msgb){*/
-               curr_budget = 0;
-               while ((curr_budget < budget)&& 
-                      (msgb = fwp_msgq_dequeue(msgq))) {
-                       rc = _fwp_vres_send(vres->ac_sockd, msgb);
-                       if (!(rc < 0)) {
-                               FWP_DEBUG("Message sent through AC%d\n",ac_id);
-                               curr_budget+= msgb->len;
-                       } else {
-                               FWP_DEBUG("Message sent error %d\n",rc);
-                       }
-                       
-                       fwp_msgb_free(msgb);
+               msgb = fwp_msgq_dequeue(msgq);
+               fwp_vres_sched_update(vres, &period, &budget);  
+               if ((curr_budget + msgb->len) > budget) {
+                       /* need to recharge */
+                       clock_gettime(CLOCK_MONOTONIC, &current_time);
+                       fwp_timespec_sub(&interval, &current_time, &start_period);
+                       fwp_timespec_modulo(&interval, &interval, &period);
+                       fwp_timespec_sub(&interval, &period, &interval);
+                       nanosleep(&interval, NULL);
+                       curr_budget = 0;
+                       clock_gettime(CLOCK_MONOTONIC, &start_period);
                }
 
+               rc = _fwp_vres_send(vres->ac_sockd, msgb);
+               if (!(rc < 0)) {
+                       FWP_DEBUG("Message sent through AC%d\n",ac_id);
+                       curr_budget+= msgb->len;
+               } else {
+                       FWP_DEBUG("Message sent error %d\n",rc);
+               }
+                       
+               fwp_msgb_free(msgb);
+
                /*pthread_testcancel(); */
                /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);