]> rtime.felk.cvut.cz Git - frescor/fwp.git/blobdiff - fwp/lib/fwp/fwp_vres.c
Fixed broken budgeting algorithm
[frescor/fwp.git] / fwp / lib / fwp / fwp_vres.c
index 4fcb21891e26259f1f131ef1a32a5224e1d74c93..83f872568aedb25ea28bb8cd9fcb119cbf042a5a 100644 (file)
@@ -398,7 +398,7 @@ static void* fwp_vres_tx_thread(void *_vres)
        fwp_budget_t    budget = vres->params.budget;
        fwp_budget_t    curr_budget;
        int             rc;
-       struct timespec start_period, period, interval, current_time;
+       struct timespec start, period, interval, now;
 
        fwp_set_rt_prio(90 - ac_id);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
@@ -407,41 +407,43 @@ static void* fwp_vres_tx_thread(void *_vres)
        
        /* just for sure */
        fwp_vres_sched_update(vres, &period, &budget);
-       clock_gettime(CLOCK_MONOTONIC, &start_period);
-       curr_budget = 0;
+       curr_budget = budget;
+       clock_gettime(CLOCK_MONOTONIC, &start);
 
        while (1) {
-               /* wait for next period and then send */
-               /*clock_gettime(CLOCK_MONOTONIC, &current_time);
-               fwp_timespec_sub(&interval, &end_period, &current_time);
-               nanosleep(&interval, NULL);*/
-       
-               /*while((rc = sem_wait(&msgq->empty_lock, &end_period))==-1 
-                       && errno == EINTR) {
-                       continue;
-               }*/
-               
                msgb = fwp_msgq_dequeue(msgq);
-               fwp_vres_sched_update(vres, &period, &budget);  
-               if ((curr_budget + msgb->len) > budget) {
-                       /* need to recharge */
-                       clock_gettime(CLOCK_MONOTONIC, &current_time);
-                       fwp_timespec_sub(&interval, &current_time, &start_period);
-                       fwp_timespec_modulo(&interval, &interval, &period);
+               if (msgb->len > budget) {
+                       FWP_ERROR("Message too large: %zd -> skipping\n",
+                                 msgb->len);
+                       goto skip_message;
+               }
+               if (curr_budget < msgb->len) {
+                       /* needs to replenish */
+                       clock_gettime(CLOCK_MONOTONIC, &now);
+                       fwp_timespec_sub(&interval, &now, &start);
+                       ul_logtrash("start=%ld.%09ld,  now=%ld.%09ld  diff=%ld.%09ld\n", (long)start.tv_sec, (long)start.tv_nsec, (long)now.tv_sec, (long)now.tv_nsec, (long)interval.tv_sec, (long)interval.tv_nsec);
                        fwp_timespec_sub(&interval, &period, &interval);
-                       nanosleep(&interval, NULL);
-                       curr_budget = 0;
-                       clock_gettime(CLOCK_MONOTONIC, &start_period);
+                       if (interval.tv_sec > 0 ||
+                           (interval.tv_sec == 0 && interval.tv_nsec > 0)) {
+                               /* We have to wait to replenish */
+                               ul_logtrash("sleeping=%ld.%09ld\n", (long)interval.tv_sec, (long)interval.tv_nsec);
+                               nanosleep(&interval, NULL);
+                               fwp_timespec_add(&start, &now, &interval);
+                       } else {
+                               start = now;
+                       }
+                       fwp_vres_sched_update(vres, &period, &budget);  
+                       curr_budget = budget;
                }
 
                rc = _fwp_vres_send(vres->ac_sockd, msgb);
                if (!(rc < 0)) {
                        FWP_DEBUG("Message sent through AC%d\n",ac_id);
-                       curr_budget+= msgb->len;
+                       curr_budget -= msgb->len;
                } else {
-                       FWP_DEBUG("Message sent error %d\n",rc);
+                       FWP_ERROR("Message sent error %d\n",rc);
                }
-                       
+       skip_message:                   
                fwp_msgb_free(msgb);
 
                /*pthread_testcancel(); */