]> rtime.felk.cvut.cz Git - frescor/fna.git/blob - src_frescan/frescan_queues.c
5abe9f1ace8bd43cc7e8a585a832790e101bb386
[frescor/fna.git] / src_frescan / frescan_queues.c
1 /*!
2  * @file frescan_queues.c
3  *
4  * @brief FRESCAN queues to manage the packets by prio and servers
5  *
6  * @version 0.01
7  *
8  * @date 27-Feb-2008
9  *
10  * @author
11  *      Daniel Sangorrin
12  *
13  * @comments
14  *
15  * This file contains the FRESCAN queues where frescan packets are stored and
16  * managed.
17  *
18  * TODO: disable interrupts for mutual exclusion!!!
19  *
20  * @license
21  *
22  * See MaRTE OS license
23  *
24  */
25
26 #include <stdlib.h>
27
28 #include "frescan_queues.h"
29 #include "frescan_packets.h"
30 #include "frescan_debug.h"
31 #include "frescan_id.h"
32
33 /**
34  * frescan_pqueue_create() - creates a priority queue
35  */
36
37 static inline frescan_prio_queue_t *frescan_pqueue_create(uint32_t max_prio)
38 {
39         int ret, prio;
40         frescan_prio_queue_t *pq; // priority queue
41
42         pq = (frescan_prio_queue_t *)malloc(sizeof(frescan_prio_queue_t));
43         if (pq == NULL) {
44                 ERROR("could not allocate memory for prio queue\n");
45                 return NULL;
46         }
47
48         pq->max_prio = max_prio;
49
50         ret = sem_init (&pq->sem, 0, 0);
51         if (ret != 0) {
52                 ERROR("could not init the semaphore\n");
53                 free(pq);
54                 return NULL;
55         }
56
57         pq->fifo_queues = (frescan_packet_t *)
58                         malloc(max_prio * sizeof(frescan_packet_t));
59
60         for(prio=0; prio<max_prio; prio++) {
61                 INIT_LIST_HEAD(&(pq->fifo_queues[prio].fifo_list));
62         }
63
64         return pq;
65 }
66
67 /**
68  * frescan_queues_init() - initialize the queues
69  *
70  * 1.- create the transmission fixed priority queue
71  * 2.- create the rx channels and its associated priority queues
72  *
73  * TODO: when error free memory
74  */
75
76 int frescan_queues_init(frescan_queues_t *queues,
77                         frescan_init_params_t *params)
78 {
79         int i;
80         uint32_t max_prio;
81
82         // create transmission fixed priority queue
83         queues->tx_fp_queue = frescan_pqueue_create(params->tx_fp_max_prio);
84
85         if (queues->tx_fp_queue == NULL)  {
86                 ERROR("could not allocate memory for tx fp queue\n");
87                 return -1;
88         }
89
90         // create receiving channels
91         queues->rx_channel_queues = (frescan_prio_queue_t **)
92                 malloc(params->rx_num_of_channels *
93                        sizeof(frescan_prio_queue_t *));
94
95         if (queues->rx_channel_queues == NULL) {
96                 ERROR("could not allocate memory for receiving channels\n");
97                 return -1;
98         }
99
100         queues->num_rx_channels = params->rx_num_of_channels;
101
102         // create a priority queue for each channel
103         for(i=0; i<params->rx_num_of_channels; i++) {
104
105                 if (params->rx_channel_max_prio == NULL) {
106                         max_prio = params->tx_fp_max_prio;
107                 } else {
108                         max_prio = params->rx_channel_max_prio[i];
109                 }
110
111                 queues->rx_channel_queues[i] = frescan_pqueue_create(max_prio);
112
113                 if (queues->rx_channel_queues[i] == NULL)  {
114                         ERROR("could not allocate memory for rx pq %d\n", i);
115                         return -1;
116                 }
117         }
118
119         return 0;
120 }
121
122 /**
123  * frescan_pqueue_enqueue() - enqueue a packet
124  *
125  * check the packet flags and enqueue the packet in the appropiate queue
126  */
127
128 int frescan_pqueue_enqueue(frescan_prio_queue_t *pqueue,
129                            frescan_packet_t *packet,
130                            frescan_prio_t prio)
131 {
132         int ret;
133
134         if (prio >= pqueue->max_prio) {
135                 ERROR("priority of the packet is too high\n");
136                 return -1;
137         }
138
139         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
140               "enqueue packet with prio %u, pending %u\n",
141               prio, packet->buffer_pending_bytes);
142
143         list_add_tail(&(packet->fifo_list),
144                       &(pqueue->fifo_queues[prio].fifo_list));
145
146         ret = sem_post(&pqueue->sem);
147         if (ret != 0) return ret;
148
149         return 0;
150 }
151
152 /**
153  * frescan_pqueue_requeue() - requeue a packet
154  */
155
156 int frescan_pqueue_requeue(frescan_prio_queue_t *pqueue,
157                            frescan_packet_t *packet,
158                            frescan_prio_t prio)
159 {
160         int ret;
161
162         if (prio >= pqueue->max_prio) {
163                 ERROR("priority of the packet is too high\n");
164                 return -1;
165         }
166
167         list_add(&packet->fifo_list, &(pqueue->fifo_queues[prio].fifo_list));
168
169         ret = sem_post(&pqueue->sem);
170         if (ret != 0) return ret;
171
172         return 0;
173 }
174
175 /**
176  * frescan_pqueue_dequeue() - dequeue the packet with highest priority
177  */
178
179 int frescan_pqueue_dequeue(frescan_prio_queue_t *pqueue,
180                            frescan_packet_t **packet,
181                            frescan_prio_t *packet_prio,
182                            bool blocking)
183 {
184         int prio;
185         int ret;
186         frescan_packet_t *tmp = NULL;
187         struct list_head *pos;
188
189         *packet = NULL;
190
191         if (blocking) {
192                 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "calling sem_wait\n");
193                 sem_wait(&pqueue->sem);
194         } else {
195                 ret = sem_trywait (&pqueue->sem);
196                 if (ret != 0) {
197                         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
198                               "sem_trywait was locked (no packets)\n");
199                         return 0;
200                 }
201         }
202
203         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
204               "check priority fifo queues (max_prio=%u)\n", pqueue->max_prio);
205
206         for(prio=pqueue->max_prio-1; prio >= 0; prio--) {
207                 if (!list_empty(&pqueue->fifo_queues[prio].fifo_list)) {
208                         list_for_each(pos,
209                                       &pqueue->fifo_queues[prio].fifo_list) {
210                                 tmp = list_entry(pos, frescan_packet_t,
211                                                  fifo_list);
212                                 break;
213                         }
214                         *packet = tmp;
215                         list_del(&tmp->fifo_list);
216                         break;
217                 }
218         }
219
220         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "dequeued prio %u\n", prio);
221         *packet_prio = prio;
222
223         return 0;
224 }
225
226 /**
227  * frescan_pqueue_get_highest_prio() - returns the packet with highest prio
228  * but not extracting it from the queue.
229  */
230
231 int frescan_pqueue_get_highest_prio(frescan_prio_queue_t *pqueue,
232                                     frescan_packet_t **packet,
233                                     frescan_prio_t *packet_prio)
234 {
235         int prio;
236         frescan_packet_t *tmp = NULL;
237         struct list_head *pos;
238
239         *packet = NULL;
240
241         for(prio=pqueue->max_prio-1; prio >= 0; prio--) {
242                 if (!list_empty(&pqueue->fifo_queues[prio].fifo_list)) {
243                         list_for_each(pos,
244                                       &pqueue->fifo_queues[prio].fifo_list) {
245                                 tmp = list_entry(pos, frescan_packet_t,
246                                                  fifo_list);
247                                 break;
248                         }
249                         *packet = tmp;
250                         break;
251                 }
252         }
253
254         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "highest prio %u\n", prio);
255         *packet_prio = prio;
256
257         return 0;
258 }
259
260
261 /**
262  * frescan_servers_enqueue() - enqueue a packet through a server
263  *
264  * @net: the network instance
265  * @id: the identificator for the server
266  * @packet: the packet being enqueued
267  *
268  */
269
270 int frescan_servers_enqueue(frescan_network_t net,
271                             frescan_ss_t id,
272                             frescan_packet_t *packet)
273 {
274         DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG, "enqueue packet for id:%u\n", id);
275
276         // add the packet to the server fifo list
277         list_add_tail(&packet->fifo_list,
278                        &the_servers_pool[net][id].packet_list.fifo_list);
279
280         // if the server was inactive (no packets to send) put it active
281         // (in the active list)
282         if (the_servers_pool[net][id].pending_packets == 0) {
283                 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
284                       "server was inactive, put in the active list\n");
285                 list_add_tail(&the_servers_pool[net][id].servers_list,
286                                &the_active_servers[net].servers_list);
287         }
288
289         the_servers_pool[net][id].pending_packets++;
290         return 0;
291 }
292
293 /**
294  * frescan_servers_requeue() - requeue a packet through a server
295  *
296  * @net: the network instance
297  * @id: the identificator for the server
298  * @packet: the packet being requeued
299  *
300  */
301
302 int frescan_servers_requeue(frescan_network_t net,
303                             frescan_ss_t id,
304                             frescan_packet_t *packet)
305 {
306         DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
307               "requeue packet for id:%u\n", id);
308
309         // add the packet to the server fifo list
310         list_add(&packet->fifo_list,
311                   &the_servers_pool[net][id].packet_list.fifo_list);
312
313         // if the server was inactive (no packets to send) put it active
314         // (in the active list)
315         if (the_servers_pool[net][id].pending_packets == 0) {
316                 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
317                       "server was inactive, put in the active list\n");
318                 list_add(&the_servers_pool[net][id].servers_list,
319                           &the_active_servers[net].servers_list);
320         }
321
322         the_servers_pool[net][id].pending_packets++;
323         return 0;
324 }
325
326 /**
327  * frescan_servers_dequeue() - dequeue a packet from a server
328  *
329  * @net: the network instance
330  * @id: the identificator for the server
331  * @packet: the packet dequeued
332  * @packet_prio: the priority current of the server
333  *
334  */
335
336 int frescan_servers_dequeue(frescan_network_t net,
337                             frescan_ss_t id,
338                             frescan_packet_t **packet,
339                             frescan_prio_t *packet_prio)
340 {
341         struct list_head *pos;
342
343         if (list_empty(&the_servers_pool[net][id].packet_list.fifo_list)) {
344                 ERROR("no packet in server %d fifo list\n", id);
345                 return -1;
346         }
347
348         list_for_each(pos, &the_servers_pool[net][id].packet_list.fifo_list) {
349                 *packet = list_entry(pos, frescan_packet_t, fifo_list);
350                 break;
351         }
352
353         list_del(&((*packet)->fifo_list));
354         *packet_prio = the_servers_pool[net][id].current_priority;
355         the_servers_pool[net][id].pending_packets--;
356
357         if (the_servers_pool[net][id].pending_packets == 0) {
358                 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
359                       "no more packets, delete from active list\n");
360                 list_del(&the_servers_pool[net][id].servers_list);
361         }
362
363         DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
364               "dequeued packet server:%u cur_prio:%u pending:%u\n",
365               id, *packet_prio, the_servers_pool[net][id].pending_packets);
366
367         return 0;
368 }