msgq->in = 0;
msgq->out = 0;
pthread_mutex_init(&msgq->lock, NULL); /* fast mutex */
- sem_init(&msgq->empty_lock, 0, 0);
+ sem_init(&msgq->msg_sem, 0, 0);
}
/**
msgq->nr_pending++;
msgq->in = (++msgq->in) & (FWP_MSGQ_SIZE - 1);
+ sem_post(&msgq->msg_sem);
/* release queue mutex */
pthread_mutex_unlock(&msgq->lock);
- sem_post(&msgq->empty_lock);
return 0;
}
{
struct fwp_msgb* msgb;
- if (msgq->in == msgq->out)
- return NULL;
-
+ sem_wait(&msgq->msg_sem);
/* acquire queue mutex */
pthread_mutex_lock(&msgq->lock);
{
struct fwp_msgb *msgb;
- while ((msgb = fwp_msgq_dequeue(msgq))) {
+ /* acquire queue mutex */
+ pthread_mutex_lock(&msgq->lock);
+
+ while (msgq->in != msgq->out){
+ msgb = msgq->queue[msgq->out];
+ msgq->nr_pending--;
+ msgq->out = (++msgq->out) & (FWP_MSGQ_SIZE - 1);
fwp_msgb_free(msgb);
}
+
+ sem_init(&msgq->msg_sem, 0, 0);
+ /* release queue mutex */
+ pthread_mutex_unlock(&msgq->lock);
}
return ret;
}
-inline void fwp_timespec_add (struct timespec *sum, const struct timespec *left,
- const struct timespec *right);
-
+inline void fwp_timespec_add (struct timespec *sum, const struct timespec *left,
+ const struct timespec *right);
inline void fwp_timespec_sub (struct timespec *diff, const struct timespec *left,
- const struct timespec *right);
+ const struct timespec *right);
+void fwp_timespec_modulo(struct timespec *dividend, struct timespec *dividor,
+ struct timespec *remainder);
int fwp_set_rt_prio(int priority);
-
int fwp_create_unix_socket(char *path, struct sockaddr_un *addr);
-
int fwp_create_inet_socket(unsigned int port, struct sockaddr_in *addr);
#endif /* _FWP_UTILS_H */
fwp_budget_t budget = vres->params.budget;
fwp_budget_t curr_budget;
int rc;
-
- struct timespec start_period, end_period, period;
- struct timespec current_time, interval;
+ struct timespec start_period, period, interval, current_time;
fwp_set_rt_prio(90 - ac_id);
-
-
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
+ /* just to be sure */
+ fwp_vres_sched_update(vres, &period, &budget);
clock_gettime(CLOCK_MONOTONIC, &start_period);
+ curr_budget = budget;
while (1) {
/* wait for next period and then send */
- fwp_timespec_add(&end_period, &start_period, &period);
- clock_gettime(CLOCK_MONOTONIC, ¤t_time);
+ /*clock_gettime(CLOCK_MONOTONIC, ¤t_time);
fwp_timespec_sub(&interval, &end_period, ¤t_time);
- nanosleep(&interval, NULL);
+ nanosleep(&interval, NULL);*/
- sem_wait(&msgq->empty_lock);
- fwp_vres_sched_update(vres, &period, &budget);
- clock_gettime(CLOCK_MONOTONIC, &start_period);
+ /*while((rc = sem_wait(&msgq->empty_lock, &end_period))==-1
+ && errno == EINTR) {
+ continue;
+ }*/
- /*msgb = fwp_msgq_dequeue(msgq);
- *if (msgb){*/
- curr_budget = 0;
- while ((curr_budget < budget)&&
- (msgb = fwp_msgq_dequeue(msgq))) {
- rc = _fwp_vres_send(vres->ac_sockd, msgb);
- if (!(rc < 0)) {
- FWP_DEBUG("Message sent through AC%d\n",ac_id);
- curr_budget+= msgb->len;
- } else {
- FWP_DEBUG("Message sent error %d\n",rc);
- }
-
- fwp_msgb_free(msgb);
+ msgb = fwp_msgq_dequeue(msgq);
+ fwp_vres_sched_update(vres, &period, &budget);
+ if ((curr_budget + msgb->len) > budget) {
+ /* need to recharge */
+ clock_gettime(CLOCK_MONOTONIC, ¤t_time);
+ fwp_timespec_sub(&interval, ¤t_time, &start_period);
+ fwp_timespec_modulo(&interval, &interval, &period);
+ fwp_timespec_sub(&interval, &period, &interval);
+ nanosleep(&interval, NULL);
+ curr_budget = 0;
+ clock_gettime(CLOCK_MONOTONIC, &start_period);
}
+ rc = _fwp_vres_send(vres->ac_sockd, msgb);
+ if (!(rc < 0)) {
+ FWP_DEBUG("Message sent through AC%d\n",ac_id);
+ curr_budget+= msgb->len;
+ } else {
+ FWP_DEBUG("Message sent error %d\n",rc);
+ }
+
+ fwp_msgb_free(msgb);
+
/*pthread_testcancel(); */
/*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);