msgq->in = 0;
msgq->out = 0;
pthread_mutex_init(&msgq->lock, NULL); /* fast mutex */
- sem_init(&msgq->empty_lock, 0, 0);
+ sem_init(&msgq->msg_sem, 0, 0);
}
/**
msgq->nr_pending++;
msgq->in = (++msgq->in) & (FWP_MSGQ_SIZE - 1);
+ sem_post(&msgq->msg_sem);
/* release queue mutex */
pthread_mutex_unlock(&msgq->lock);
- sem_post(&msgq->empty_lock);
return 0;
}
{
struct fwp_msgb* msgb;
- if (msgq->in == msgq->out)
- return NULL;
-
+ sem_wait(&msgq->msg_sem);
/* acquire queue mutex */
pthread_mutex_lock(&msgq->lock);
{
struct fwp_msgb *msgb;
- while ((msgb = fwp_msgq_dequeue(msgq))) {
+ /* acquire queue mutex */
+ pthread_mutex_lock(&msgq->lock);
+
+ while (msgq->in != msgq->out){
+ msgb = msgq->queue[msgq->out];
+ msgq->nr_pending--;
+ msgq->out = (++msgq->out) & (FWP_MSGQ_SIZE - 1);
fwp_msgb_free(msgb);
}
+
+ sem_init(&msgq->msg_sem, 0, 0);
+ /* release queue mutex */
+ pthread_mutex_unlock(&msgq->lock);
}