]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/core/fwp_msgq.c
Added support for contract negotiation
[frescor/fwp.git] / fwp / lib / core / fwp_msgq.c
1 #include "fwp_msgq.h"
2 #include <errno.h>
3
4 /**
5  *Initialize message queue 
6  *@param[in] msgq Message queue
7  **/
8 void fwp_msgq_init(struct fwp_msgq *msgq)
9 {
10         msgq->nr_pending = 0;
11         msgq->in = 0;
12         msgq->out = 0;
13         pthread_mutex_init(&msgq->lock, NULL); /* fast mutex */
14         sem_init(&msgq->empty_lock, 0, 0);
15 }
16
17 /**
18  * Enqueue message in message queue
19  *
20  * @param[in] msgq Message queue
21  *`@param[in] msgb Message buffer which stores a message
22  * @return
23  *   Zero on success, -1 or error.
24  **/
25 int fwp_msgq_enqueue(struct fwp_msgq *msgq, struct fwp_msgb *msgb)
26 {
27         /* acquire queue mutex */
28         pthread_mutex_lock(&msgq->lock);
29         
30         if (!(msgq->nr_pending < FWP_MSGQ_SIZE)){
31                 /*if (msgq->qr_policy == NEWCOMER) {*/  
32                 
33                 /*release queue mutex*/
34                 pthread_mutex_unlock(&msgq->lock);
35                 errno = ENOBUFS;
36                 return -1;
37                 /* }
38                  * if (msgq->qr_policy == OLDEST)
39                         msgq->first = (msgq->first++) % FWP_MSGQ_SIZE;
40                         msgq->nr_pending--;
41                 */
42         }
43
44         /* depends on queuing policy specifies in endpoint */
45         msgq->queue[msgq->in] = msgb;
46         msgq->nr_pending++;
47         msgq->in = (++msgq->in) & (FWP_MSGQ_SIZE - 1);
48
49         /* release queue mutex */
50         pthread_mutex_unlock(&msgq->lock);
51         sem_post(&msgq->empty_lock);
52
53         return 0;
54 }
55
56 /**
57  * Dequeue message from message queue
58  *
59  * @param[in] msgq Message queue
60  * @return
61  *   NULL if message queue in empty 
62  *   else returns pointer to message buffer(msgb) 
63  **/
64 struct fwp_msgb* fwp_msgq_dequeue(struct fwp_msgq *msgq)
65 {
66         struct fwp_msgb* msgb;
67         
68         if (msgq->in == msgq->out)
69                 return NULL;
70                   
71         /* acquire queue mutex */
72         pthread_mutex_lock(&msgq->lock);
73         
74         msgb = msgq->queue[msgq->out];
75         msgq->nr_pending--;
76         msgq->out = (++msgq->out) & (FWP_MSGQ_SIZE - 1);
77         
78         /* release queue mutex */
79         pthread_mutex_unlock(&msgq->lock);
80
81         return msgb;
82
83
84 /*
85  * Dequeue all messages from message queue
86  *
87  * @param[in] msgq Message queue
88  * @return
89  *   NULL if message queue is empty
90  *   else returns pointer to message buffer(msgb) 
91  **/
92 void fwp_msgq_dequeue_all(struct fwp_msgq *msgq)
93 {
94         struct fwp_msgb *msgb;
95
96         while ((msgb = fwp_msgq_dequeue(msgq))) { 
97                 fwp_msgb_free(msgb);
98         }
99 }