]> rtime.felk.cvut.cz Git - frescor/fna.git/blob - src_frescan/frescan_queues.c
b6ae28a2b08d017264db76ca989c9231d05c9f77
[frescor/fna.git] / src_frescan / frescan_queues.c
1 /*!
2  * @file frescan_queues.c
3  *
4  * @brief FRESCAN queues to manage the packets by prio and servers
5  *
6  * @version 0.01
7  *
8  * @date 27-Feb-2008
9  *
10  * @author
11  *      Daniel Sangorrin
12  *
13  * @comments
14  *
15  * This file contains the FRESCAN queues where frescan packets are stored and
16  * managed.
17  *
18  * TODO: disable interrupts for mutual exclusion!!!
19  *
20  * @license
21  *
22  * See MaRTE OS license
23  *
24  */
25
26 #include <stdlib.h>
27 #include <time.h>
28 #include <misc/timespec_operations.h>
29
30 #include "frescan_queues.h"
31 #include "frescan_packets.h"
32 #include "frescan_debug.h"
33 #include "frescan_id.h"
34
35 /**
36  * frescan_pqueue_create() - creates a priority queue
37  */
38
39 static inline frescan_prio_queue_t *frescan_pqueue_create(uint32_t max_prio,
40                                                           frescan_network_t net)
41 {
42         int ret, prio;
43         frescan_prio_queue_t *pq; // priority queue
44
45         pq = (frescan_prio_queue_t *)malloc(sizeof(frescan_prio_queue_t));
46         if (pq == NULL) {
47                 ERROR("could not allocate memory for prio queue\n");
48                 return NULL;
49         }
50
51         pq->max_prio = max_prio;
52         pq->net      = net;
53
54         ret = sem_init (&pq->sem, 0, 0);
55         if (ret != 0) {
56                 ERROR("could not init the semaphore\n");
57                 free(pq);
58                 return NULL;
59         }
60
61         pq->fifo_queues = (frescan_packet_t *)
62                         malloc(max_prio * sizeof(frescan_packet_t));
63
64         for(prio=0; prio<max_prio; prio++) {
65                 INIT_LIST_HEAD(&(pq->fifo_queues[prio].fifo_list));
66         }
67
68         return pq;
69 }
70
71 /**
72  * frescan_queues_init() - initialize the queues
73  *
74  * 1.- create the transmission fixed priority queue
75  * 2.- create the rx channels and its associated priority queues
76  *
77  * TODO: when error free memory
78  */
79
80 int frescan_queues_init(frescan_queues_t *queues,
81                         frescan_init_params_t *params)
82 {
83         int i;
84         uint32_t max_prio;
85
86         // create transmission fixed priority queue
87         queues->tx_fp_queue = frescan_pqueue_create(params->tx_fp_max_prio,
88                                                     params->net);
89
90         if (queues->tx_fp_queue == NULL)  {
91                 ERROR("could not allocate memory for tx fp queue\n");
92                 return -1;
93         }
94
95         // create receiving channels
96         queues->rx_channel_queues = (frescan_prio_queue_t **)
97                 malloc(params->rx_num_of_channels *
98                        sizeof(frescan_prio_queue_t *));
99
100         if (queues->rx_channel_queues == NULL) {
101                 ERROR("could not allocate memory for receiving channels\n");
102                 return -1;
103         }
104
105         queues->num_rx_channels = params->rx_num_of_channels;
106
107         // create a priority queue for each channel
108         for(i=0; i<params->rx_num_of_channels; i++) {
109
110                 if (params->rx_channel_max_prio == NULL) {
111                         max_prio = params->tx_fp_max_prio;
112                 } else {
113                         max_prio = params->rx_channel_max_prio[i];
114                 }
115
116                 queues->rx_channel_queues[i] = frescan_pqueue_create
117                                                        (max_prio, params->net);
118
119                 if (queues->rx_channel_queues[i] == NULL)  {
120                         ERROR("could not allocate memory for rx pq %d\n", i);
121                         return -1;
122                 }
123         }
124
125         return 0;
126 }
127
128 /**
129  * frescan_pqueue_enqueue() - enqueue a packet
130  *
131  * check the packet flags and enqueue the packet in the appropiate queue
132  */
133
134 int frescan_pqueue_enqueue(frescan_prio_queue_t *pqueue,
135                            frescan_packet_t *packet,
136                            frescan_prio_t prio)
137 {
138         int ret;
139
140         if (prio >= pqueue->max_prio) {
141                 ERROR("priority of the packet is too high\n");
142                 return -1;
143         }
144
145         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
146               "enqueue packet with prio %u, pending %u\n",
147               prio, packet->buffer_pending_bytes);
148
149         list_add_tail(&(packet->fifo_list),
150                       &(pqueue->fifo_queues[prio].fifo_list));
151
152         ret = sem_post(&pqueue->sem);
153         if (ret != 0) return ret;
154
155         return 0;
156 }
157
158 /**
159  * frescan_pqueue_requeue() - requeue a packet
160  */
161
162 int frescan_pqueue_requeue(frescan_prio_queue_t *pqueue,
163                            frescan_packet_t *packet,
164                            frescan_prio_t prio)
165 {
166         int ret;
167
168         if (prio >= pqueue->max_prio) {
169                 ERROR("priority of the packet is too high\n");
170                 return -1;
171         }
172
173         list_add(&packet->fifo_list, &(pqueue->fifo_queues[prio].fifo_list));
174
175         ret = sem_post(&pqueue->sem);
176         if (ret != 0) return ret;
177
178         return 0;
179 }
180
181 /**
182  * frescan_pqueue_dequeue() - dequeue the packet with highest priority
183  */
184
185 int frescan_pqueue_dequeue(frescan_prio_queue_t *pqueue,
186                            frescan_packet_t **packet,
187                            frescan_prio_t *packet_prio,
188                            bool blocking)
189 {
190         int prio;
191         int ret;
192         frescan_packet_t *tmp = NULL;
193         struct list_head *pos;
194
195         *packet = NULL;
196
197         if (blocking) {
198                 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "calling sem_wait\n");
199                 sem_wait(&pqueue->sem);
200         } else {
201                 ret = sem_trywait (&pqueue->sem);
202                 if (ret != 0) {
203                         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
204                               "sem_trywait was locked (no packets)\n");
205                         return 0;
206                 }
207         }
208
209         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
210               "check priority fifo queues (max_prio=%u)\n", pqueue->max_prio);
211
212         // NOTE: we only acquire the lock if we block because non-blocking
213         // calls are made from a context with no interrupts (when updating
214         // the buffer at 'frescan_hw_buffer_update' which is always called
215         // with interrupts disabled)
216         if (blocking) FRESCAN_ACQUIRE_LOCK(&the_networks[pqueue->net].lock);
217
218         for(prio=pqueue->max_prio-1; prio >= 0; prio--) {
219                 if (!list_empty(&pqueue->fifo_queues[prio].fifo_list)) {
220                         list_for_each(pos,
221                                       &pqueue->fifo_queues[prio].fifo_list) {
222                                 tmp = list_entry(pos, frescan_packet_t,
223                                                  fifo_list);
224                                 break;
225                         }
226                         *packet = tmp;
227                         list_del(&tmp->fifo_list);
228                         break;
229                 }
230         }
231
232         if (blocking) FRESCAN_RELEASE_LOCK(&the_networks[pqueue->net].lock);
233
234         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "dequeued prio %u\n", prio);
235         *packet_prio = prio;
236
237         return 0;
238 }
239
240 /**
241  * frescan_pqueue_get_highest_prio() - returns the packet with highest prio
242  * but not extracting it from the queue.
243  */
244
245 int frescan_pqueue_get_highest_prio(frescan_prio_queue_t *pqueue,
246                                     frescan_packet_t **packet,
247                                     frescan_prio_t *packet_prio)
248 {
249         int prio;
250         frescan_packet_t *tmp = NULL;
251         struct list_head *pos;
252
253         *packet = NULL;
254
255         for(prio=pqueue->max_prio-1; prio >= 0; prio--) {
256                 if (!list_empty(&pqueue->fifo_queues[prio].fifo_list)) {
257                         list_for_each(pos,
258                                       &pqueue->fifo_queues[prio].fifo_list) {
259                                 tmp = list_entry(pos, frescan_packet_t,
260                                                  fifo_list);
261                                 break;
262                         }
263                         *packet = tmp;
264                         break;
265                 }
266         }
267
268         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "highest prio %u\n", prio);
269         *packet_prio = prio;
270
271         return 0;
272 }
273
274
275 /**
276  * frescan_servers_enqueue() - enqueue a packet through a server
277  *
278  * @net: the network instance
279  * @id: the identificator for the server
280  * @packet: the packet being enqueued
281  *
282  */
283
284 int frescan_servers_enqueue(frescan_network_t net,
285                             frescan_ss_t id,
286                             frescan_packet_t *packet)
287 {
288         frescan_server_data_t *server = &the_servers_pool[net][id];
289
290         clock_gettime (CLOCK_MONOTONIC, &packet->timestamp);
291
292         DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG, "id:%u timestamp:(%d, %d)\n",
293               id, packet->timestamp.tv_sec, packet->timestamp.tv_nsec);
294
295         // add the packet to the server fifo list
296         list_add_tail(&packet->fifo_list, &server->packet_list.fifo_list);
297
298         // if the server was inactive (no packets) put it in the active list
299         if (server->pending_packets == 0) {
300                 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
301                       "ss becomes active act_time=timestamp\n");
302                 list_add_tail(&server->servers_list,
303                               &the_active_servers[net].servers_list);
304                 server->act_time = packet->timestamp;
305         }
306
307         server->pending_packets++;
308         return 0;
309 }
310
311 /**
312  * frescan_servers_requeue() - requeue a packet through a server
313  *
314  * @net: the network instance
315  * @id: the identificator for the server
316  * @packet: the packet being requeued
317  *
318  */
319
320 int frescan_servers_requeue(frescan_network_t net,
321                             frescan_ss_t id,
322                             frescan_packet_t *packet)
323 {
324         DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
325               "requeue packet for id:%u\n", id);
326
327         // add the packet to the server fifo list
328         list_add(&packet->fifo_list,
329                   &the_servers_pool[net][id].packet_list.fifo_list);
330
331         // if the server was inactive (no packets to send) put it active
332         // (in the active list)
333         if (the_servers_pool[net][id].pending_packets == 0) {
334                 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
335                       "server was inactive, put in the active list\n");
336                 list_add(&the_servers_pool[net][id].servers_list,
337                           &the_active_servers[net].servers_list);
338         }
339
340         the_servers_pool[net][id].pending_packets++;
341         return 0;
342 }
343
344 /**
345  * frescan_servers_dequeue() - dequeue a packet from a server
346  *
347  * @net: the network instance
348  * @id: the identificator for the server
349  * @packet: the packet dequeued
350  * @packet_prio: the priority current of the server
351  *
352  */
353
354 int frescan_servers_dequeue(frescan_network_t net,
355                             frescan_ss_t id,
356                             frescan_packet_t **packet,
357                             frescan_prio_t *packet_prio)
358 {
359         struct list_head *pos;
360         frescan_server_data_t *server;
361
362         server = &the_servers_pool[net][id];
363
364         if (list_empty(&server->packet_list.fifo_list)) {
365                 ERROR("no packet in server %d fifo list\n", id);
366                 return -1;
367         }
368
369         list_for_each(pos, &server->packet_list.fifo_list) {
370                 *packet = list_entry(pos, frescan_packet_t, fifo_list);
371                 break;
372         }
373
374         list_del(&((*packet)->fifo_list));
375         *packet_prio = server->current_priority;
376         server->pending_packets--;
377
378         if (server->pending_packets == 0) {
379                 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
380                       "no more packets, delete from active list\n");
381                 list_del(&server->servers_list);
382         }
383
384         DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
385               "dequeued packet server:%u cur_prio:%u pending:%u\n",
386               id, *packet_prio, server->pending_packets);
387
388         return 0;
389 }