]> rtime.felk.cvut.cz Git - frescor/fna.git/blob - src_frescan/frescan_queues.c
6d1d1501430c93ffd0a2c74950f6a6567f7b4ff8
[frescor/fna.git] / src_frescan / frescan_queues.c
1 /*!
2  * @file frescan_queues.c
3  *
4  * @brief FRESCAN queues to manage the packets by prio and servers
5  *
6  * @version 0.01
7  *
8  * @date 27-Feb-2008
9  *
10  * @author
11  *      Daniel Sangorrin
12  *
13  * @comments
14  *
15  * This file contains the FRESCAN queues where frescan packets are stored and
16  * managed.
17  *
18  * TODO: disable interrupts for mutual exclusion!!!
19  *
20  * @license
21  *
22  * See MaRTE OS license
23  *
24  */
25
26 #include <stdlib.h>
27
28 #include "frescan_queues.h"
29 #include "frescan_packets.h"
30 #include "frescan_debug.h"
31 #include "frescan_id.h"
32
33 /**
34  * frescan_pqueue_create() - creates a priority queue
35  */
36
37 static inline frescan_prio_queue_t *frescan_pqueue_create(uint32_t max_prio)
38 {
39         int ret, prio;
40         frescan_prio_queue_t *pq; // priority queue
41
42         pq = (frescan_prio_queue_t *)malloc(sizeof(frescan_prio_queue_t));
43         if (pq == NULL) {
44                 ERROR("could not allocate memory for prio queue\n");
45                 return NULL;
46         }
47
48         pq->max_prio = max_prio;
49
50         ret = sem_init (&pq->sem, 0, 0);
51         if (ret != 0) {
52                 ERROR("could not init the semaphore\n");
53                 free(pq);
54                 return NULL;
55         }
56
57         pq->fifo_queues = (frescan_packet_t *)
58                         malloc(max_prio * sizeof(frescan_packet_t));
59
60         for(prio=0; prio<max_prio; prio++) {
61                 INIT_LIST_HEAD(&(pq->fifo_queues[prio].fifo_list));
62         }
63
64         return pq;
65 }
66
67 /**
68  * frescan_queues_init() - initialize the queues
69  *
70  * 1.- create the transmission fixed priority queue
71  * 2.- create the rx channels and its associated priority queues
72  *
73  * TODO: when error free memory
74  */
75
76 int frescan_queues_init(frescan_queues_t *queues,
77                         frescan_init_params_t *params)
78 {
79         int i;
80         uint32_t max_prio;
81
82         // create transmission fixed priority queue
83         queues->tx_fp_queue = frescan_pqueue_create(params->tx_fp_max_prio);
84
85         if (queues->tx_fp_queue == NULL)  {
86                 ERROR("could not allocate memory for tx fp queue\n");
87                 return -1;
88         }
89
90         // create receiving channels
91         queues->rx_channel_queues = (frescan_prio_queue_t **)
92                 malloc(params->rx_num_of_channels *
93                        sizeof(frescan_prio_queue_t *));
94
95         if (queues->rx_channel_queues == NULL) {
96                 ERROR("could not allocate memory for receiving channels\n");
97                 return -1;
98         }
99
100         queues->num_rx_channels = params->rx_num_of_channels;
101
102         // create a priority queue for each channel
103         for(i=0; i<params->rx_num_of_channels; i++) {
104
105                 if (params->rx_channel_max_prio == NULL) {
106                         max_prio = params->tx_fp_max_prio;
107                 } else {
108                         max_prio = params->rx_channel_max_prio[i];
109                 }
110
111                 queues->rx_channel_queues[i] = frescan_pqueue_create(max_prio);
112
113                 if (queues->rx_channel_queues[i] == NULL)  {
114                         ERROR("could not allocate memory for rx pq %d\n", i);
115                         return -1;
116                 }
117         }
118
119         return 0;
120 }
121
122 /**
123  * frescan_pqueue_enqueue() - enqueue a packet
124  *
125  * check the packet flags and enqueue the packet in the appropiate queue
126  */
127
128 int frescan_pqueue_enqueue(frescan_prio_queue_t *pqueue,
129                            frescan_packet_t *packet,
130                            frescan_prio_t prio)
131 {
132         int ret;
133
134         if (prio >= pqueue->max_prio) {
135                 ERROR("priority of the packet is too high\n");
136                 return -1;
137         }
138
139         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
140               "enqueue packet with prio %u, pending %u\n",
141               prio, packet->buffer_pending_bytes);
142
143         list_add_tail(&(packet->fifo_list),
144                       &(pqueue->fifo_queues[prio].fifo_list));
145
146         ret = sem_post(&pqueue->sem);
147         if (ret != 0) return ret;
148
149         return 0;
150 }
151
152 /**
153  * frescan_pqueue_requeue() - requeue a packet
154  */
155
156 int frescan_pqueue_requeue(frescan_prio_queue_t *pqueue,
157                            frescan_packet_t *packet,
158                            frescan_prio_t prio)
159 {
160         int ret;
161
162         if (prio >= pqueue->max_prio) {
163                 ERROR("priority of the packet is too high\n");
164                 return -1;
165         }
166
167         list_add(&packet->fifo_list, &(pqueue->fifo_queues[prio].fifo_list));
168
169         ret = sem_post(&pqueue->sem);
170         if (ret != 0) return ret;
171
172         return 0;
173 }
174
175 /**
176  * frescan_pqueue_dequeue() - dequeue the packet with highest priority
177  */
178
179 int frescan_pqueue_dequeue(frescan_prio_queue_t *pqueue,
180                            frescan_packet_t **packet,
181                            frescan_prio_t *packet_prio,
182                            bool blocking)
183 {
184         int prio;
185         int ret;
186         frescan_packet_t *tmp = NULL;
187         struct list_head *pos;
188
189         *packet = NULL;
190
191         if (blocking) {
192                 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "calling sem_wait\n");
193                 sem_wait(&pqueue->sem);
194         } else {
195                 ret = sem_trywait (&pqueue->sem);
196                 if (ret != 0) {
197                         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
198                               "sem_trywait was locked (no packets)\n");
199                         return 0;
200                 }
201         }
202
203         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
204               "check priority fifo queues (max_prio=%u)\n", pqueue->max_prio);
205
206         // TODO: the lock is currently hardwired to network 0!!!
207         FRESCAN_ACQUIRE_LOCK(&the_networks[0].lock);
208
209         for(prio=pqueue->max_prio-1; prio >= 0; prio--) {
210                 if (!list_empty(&pqueue->fifo_queues[prio].fifo_list)) {
211                         list_for_each(pos,
212                                       &pqueue->fifo_queues[prio].fifo_list) {
213                                 tmp = list_entry(pos, frescan_packet_t,
214                                                  fifo_list);
215                                 break;
216                         }
217                         *packet = tmp;
218                         list_del(&tmp->fifo_list);
219                         break;
220                 }
221         }
222
223         FRESCAN_RELEASE_LOCK(&the_networks[0].lock);
224
225         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "dequeued prio %u\n", prio);
226         *packet_prio = prio;
227
228         return 0;
229 }
230
231 /**
232  * frescan_pqueue_get_highest_prio() - returns the packet with highest prio
233  * but not extracting it from the queue.
234  */
235
236 int frescan_pqueue_get_highest_prio(frescan_prio_queue_t *pqueue,
237                                     frescan_packet_t **packet,
238                                     frescan_prio_t *packet_prio)
239 {
240         int prio;
241         frescan_packet_t *tmp = NULL;
242         struct list_head *pos;
243
244         *packet = NULL;
245
246         for(prio=pqueue->max_prio-1; prio >= 0; prio--) {
247                 if (!list_empty(&pqueue->fifo_queues[prio].fifo_list)) {
248                         list_for_each(pos,
249                                       &pqueue->fifo_queues[prio].fifo_list) {
250                                 tmp = list_entry(pos, frescan_packet_t,
251                                                  fifo_list);
252                                 break;
253                         }
254                         *packet = tmp;
255                         break;
256                 }
257         }
258
259         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "highest prio %u\n", prio);
260         *packet_prio = prio;
261
262         return 0;
263 }
264
265
266 /**
267  * frescan_servers_enqueue() - enqueue a packet through a server
268  *
269  * @net: the network instance
270  * @id: the identificator for the server
271  * @packet: the packet being enqueued
272  *
273  */
274
275 int frescan_servers_enqueue(frescan_network_t net,
276                             frescan_ss_t id,
277                             frescan_packet_t *packet)
278 {
279         DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG, "enqueue packet for id:%u\n", id);
280
281         // add the packet to the server fifo list
282         list_add_tail(&packet->fifo_list,
283                        &the_servers_pool[net][id].packet_list.fifo_list);
284
285         // if the server was inactive (no packets to send) put it active
286         // (in the active list)
287         if (the_servers_pool[net][id].pending_packets == 0) {
288                 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
289                       "server was inactive, put in the active list\n");
290                 list_add_tail(&the_servers_pool[net][id].servers_list,
291                                &the_active_servers[net].servers_list);
292         }
293
294         the_servers_pool[net][id].pending_packets++;
295         return 0;
296 }
297
298 /**
299  * frescan_servers_requeue() - requeue a packet through a server
300  *
301  * @net: the network instance
302  * @id: the identificator for the server
303  * @packet: the packet being requeued
304  *
305  */
306
307 int frescan_servers_requeue(frescan_network_t net,
308                             frescan_ss_t id,
309                             frescan_packet_t *packet)
310 {
311         DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
312               "requeue packet for id:%u\n", id);
313
314         // add the packet to the server fifo list
315         list_add(&packet->fifo_list,
316                   &the_servers_pool[net][id].packet_list.fifo_list);
317
318         // if the server was inactive (no packets to send) put it active
319         // (in the active list)
320         if (the_servers_pool[net][id].pending_packets == 0) {
321                 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
322                       "server was inactive, put in the active list\n");
323                 list_add(&the_servers_pool[net][id].servers_list,
324                           &the_active_servers[net].servers_list);
325         }
326
327         the_servers_pool[net][id].pending_packets++;
328         return 0;
329 }
330
331 /**
332  * frescan_servers_dequeue() - dequeue a packet from a server
333  *
334  * @net: the network instance
335  * @id: the identificator for the server
336  * @packet: the packet dequeued
337  * @packet_prio: the priority current of the server
338  *
339  */
340
341 int frescan_servers_dequeue(frescan_network_t net,
342                             frescan_ss_t id,
343                             frescan_packet_t **packet,
344                             frescan_prio_t *packet_prio)
345 {
346         struct list_head *pos;
347
348         if (list_empty(&the_servers_pool[net][id].packet_list.fifo_list)) {
349                 ERROR("no packet in server %d fifo list\n", id);
350                 return -1;
351         }
352
353         list_for_each(pos, &the_servers_pool[net][id].packet_list.fifo_list) {
354                 *packet = list_entry(pos, frescan_packet_t, fifo_list);
355                 break;
356         }
357
358         list_del(&((*packet)->fifo_list));
359         *packet_prio = the_servers_pool[net][id].current_priority;
360         the_servers_pool[net][id].pending_packets--;
361
362         if (the_servers_pool[net][id].pending_packets == 0) {
363                 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
364                       "no more packets, delete from active list\n");
365                 list_del(&the_servers_pool[net][id].servers_list);
366         }
367
368         DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
369               "dequeued packet server:%u cur_prio:%u pending:%u\n",
370               id, *packet_prio, the_servers_pool[net][id].pending_packets);
371
372         return 0;
373 }