fwp_budget_t budget = vres->params.budget;
fwp_budget_t curr_budget;
int rc;
- struct timespec start_period, period, interval, current_time;
+ struct timespec start, period, interval, now;
fwp_set_rt_prio(90 - ac_id);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
/* just for sure */
fwp_vres_sched_update(vres, &period, &budget);
- clock_gettime(CLOCK_MONOTONIC, &start_period);
- curr_budget = 0;
+ curr_budget = budget;
+ clock_gettime(CLOCK_MONOTONIC, &start);
while (1) {
- /* wait for next period and then send */
- /*clock_gettime(CLOCK_MONOTONIC, ¤t_time);
- fwp_timespec_sub(&interval, &end_period, ¤t_time);
- nanosleep(&interval, NULL);*/
-
- /*while((rc = sem_wait(&msgq->empty_lock, &end_period))==-1
- && errno == EINTR) {
- continue;
- }*/
-
msgb = fwp_msgq_dequeue(msgq);
- fwp_vres_sched_update(vres, &period, &budget);
- if ((curr_budget + msgb->len) > budget) {
- /* need to recharge */
- clock_gettime(CLOCK_MONOTONIC, ¤t_time);
- fwp_timespec_sub(&interval, ¤t_time, &start_period);
- fwp_timespec_modulo(&interval, &interval, &period);
+ if (msgb->len > budget) {
+ FWP_ERROR("Message too large: %zd -> skipping\n",
+ msgb->len);
+ goto skip_message;
+ }
+ if (curr_budget < msgb->len) {
+ /* needs to replenish */
+ clock_gettime(CLOCK_MONOTONIC, &now);
+ fwp_timespec_sub(&interval, &now, &start);
+ ul_logtrash("start=%ld.%09ld, now=%ld.%09ld diff=%ld.%09ld\n", (long)start.tv_sec, (long)start.tv_nsec, (long)now.tv_sec, (long)now.tv_nsec, (long)interval.tv_sec, (long)interval.tv_nsec);
fwp_timespec_sub(&interval, &period, &interval);
- nanosleep(&interval, NULL);
- curr_budget = 0;
- clock_gettime(CLOCK_MONOTONIC, &start_period);
+ if (interval.tv_sec > 0 ||
+ (interval.tv_sec == 0 && interval.tv_nsec > 0)) {
+ /* We have to wait to replenish */
+ ul_logtrash("sleeping=%ld.%09ld\n", (long)interval.tv_sec, (long)interval.tv_nsec);
+ nanosleep(&interval, NULL);
+ fwp_timespec_add(&start, &now, &interval);
+ } else {
+ start = now;
+ }
+ fwp_vres_sched_update(vres, &period, &budget);
+ curr_budget = budget;
}
rc = _fwp_vres_send(vres->ac_sockd, msgb);
if (!(rc < 0)) {
FWP_DEBUG("Message sent through AC%d\n",ac_id);
- curr_budget+= msgb->len;
+ curr_budget -= msgb->len;
} else {
- FWP_DEBUG("Message sent error %d\n",rc);
+ FWP_ERROR("Message sent error %d\n",rc);
}
-
+ skip_message:
fwp_msgb_free(msgb);
/*pthread_testcancel(); */