]> rtime.felk.cvut.cz Git - frescor/fna.git/blob - src_frescan/frescan_queues.c
2206cd6594b8fe101a062cb4a9c1582d5ddd4e76
[frescor/fna.git] / src_frescan / frescan_queues.c
1 /*!
2  * @file frescan_queues.c
3  *
4  * @brief FRESCAN queues to manage the packets by prio and servers
5  *
6  * @version 0.01
7  *
8  * @date 27-Feb-2008
9  *
10  * @author
11  *      Daniel Sangorrin
12  *
13  * @comments
14  *
15  * This file contains the FRESCAN queues where frescan packets are stored and
16  * managed.
17  *
18  * TODO: disable interrupts for mutual exclusion!!!
19  *
20  * @license
21  *
22  * -----------------------------------------------------------------------
23  *  Copyright (C) 2006 - 2008 FRESCOR consortium partners:
24  *
25  *    Universidad de Cantabria,              SPAIN
26  *    University of York,                    UK
27  *    Scuola Superiore Sant'Anna,            ITALY
28  *    Kaiserslautern University,             GERMANY
29  *    Univ. Politécnica  Valencia,           SPAIN
30  *    Czech Technical University in Prague,  CZECH REPUBLIC
31  *    ENEA                                   SWEDEN
32  *    Thales Communication S.A.              FRANCE
33  *    Visual Tools S.A.                      SPAIN
34  *    Rapita Systems Ltd                     UK
35  *    Evidence                               ITALY
36  *
37  *    See http://www.frescor.org for a link to partners' websites
38  *
39  *           FRESCOR project (FP6/2005/IST/5-034026) is funded
40  *        in part by the European Union Sixth Framework Programme
41  *        The European Union is not liable of any use that may be
42  *        made of this code.
43  *
44  *  This file is part of FRESCAN
45  *
46  *  FRESCAN is free software; you can  redistribute it and/or  modify
47  *  it under the terms of  the GNU General Public License as published by
48  *  the Free Software Foundation;  either  version 2, or (at  your option)
49  *  any later version.
50  *
51  *  FRESCAN  is distributed  in  the hope  that  it  will  be useful,  but
52  *  WITHOUT  ANY  WARRANTY;     without  even the   implied   warranty  of
53  *  MERCHANTABILITY  or  FITNESS FOR  A  PARTICULAR PURPOSE. See  the  GNU
54  *  General Public License for more details.
55  *
56  *  You should have  received a  copy of  the  GNU  General Public License
57  *  distributed  with  FRESCAN;  see file COPYING.   If not,  write to the
58  *  Free Software  Foundation,  59 Temple Place  -  Suite 330,  Boston, MA
59  *  02111-1307, USA.
60  *
61  * As a special exception, including FRESCAN header files in a file,
62  * instantiating FRESCAN generics or templates, or linking other files
63  * with FRESCAN objects to produce an executable application, does not
64  * by itself cause the resulting executable application to be covered
65  * by the GNU General Public License. This exception does not
66  * however invalidate any other reasons why the executable file might be
67  * covered by the GNU Public License.
68  * -----------------------------------------------------------------------
69  *
70  */
71
#include <errno.h>
#include <stdlib.h>
#include <time.h>

#include "frescan_queues.h"
#include "frescan_packets.h"
#include "frescan_debug.h"
#include "frescan_id.h"
79
80 /**
81  * frescan_pqueue_create() - creates a priority queue
82  */
83
84 static inline frescan_prio_queue_t *frescan_pqueue_create(uint32_t max_prio,
85                                                           frescan_network_t net)
86 {
87         int ret, prio;
88         frescan_prio_queue_t *pq; // priority queue
89
90         pq = (frescan_prio_queue_t *)malloc(sizeof(frescan_prio_queue_t));
91         if (pq == NULL) {
92                 FRESCAN_ERROR("could not allocate memory for prio queue\n");
93                 return NULL;
94         }
95
96         pq->max_prio = max_prio;
97         pq->net      = net;
98
99         ret = sem_init (&pq->sem, 0, 0);
100         if (ret != 0) {
101                 FRESCAN_ERROR("could not init the semaphore\n");
102                 free(pq);
103                 return NULL;
104         }
105
106         pq->fifo_queues = (frescan_packet_t *)
107                         malloc(max_prio * sizeof(frescan_packet_t));
108
109         for(prio=0; prio<max_prio; prio++) {
110                 INIT_LIST_HEAD(&(pq->fifo_queues[prio].fifo_list));
111         }
112
113         return pq;
114 }
115
116 /**
117  * frescan_queues_init() - initialize the queues
118  *
119  * 1.- create the transmission fixed priority queue
120  * 2.- create the rx channels and its associated priority queues
121  *
122  * TODO: when error free memory
123  */
124
125 int frescan_queues_init(frescan_queues_t *queues,
126                         frescan_init_params_t *params)
127 {
128         int i;
129         uint32_t max_prio;
130
131         // create transmission fixed priority queue
132         queues->tx_fp_queue = frescan_pqueue_create(params->tx_fp_max_prio,
133                                                     params->net);
134
135         if (queues->tx_fp_queue == NULL)  {
136                 FRESCAN_ERROR("could not allocate memory for tx fp queue\n");
137                 return -1;
138         }
139
140         // create receiving channels
141         queues->rx_channel_queues = (frescan_prio_queue_t **)
142                 malloc(params->rx_num_of_channels *
143                        sizeof(frescan_prio_queue_t *));
144
145         if (queues->rx_channel_queues == NULL) {
146                 FRESCAN_ERROR
147                         ("could not allocate memory for receiving channels\n");
148                 return -1;
149         }
150
151         queues->num_rx_channels = params->rx_num_of_channels;
152
153         // create a priority queue for each channel
154         for(i=0; i<params->rx_num_of_channels; i++) {
155
156                 if (params->rx_channel_max_prio == NULL) {
157                         max_prio = params->tx_fp_max_prio;
158                 } else {
159                         max_prio = params->rx_channel_max_prio[i];
160                 }
161
162                 queues->rx_channel_queues[i] = frescan_pqueue_create
163                                                        (max_prio, params->net);
164
165                 if (queues->rx_channel_queues[i] == NULL)  {
166                         FRESCAN_ERROR
167                                ("could not allocate memory for rx pq %d\n", i);
168                         return -1;
169                 }
170         }
171
172         return 0;
173 }
174
175 /**
176  * frescan_pqueue_enqueue() - enqueue a packet
177  *
178  * check the packet flags and enqueue the packet in the appropiate queue
179  */
180
181 int frescan_pqueue_enqueue(frescan_prio_queue_t *pqueue,
182                            frescan_packet_t *packet,
183                            frescan_prio_t prio)
184 {
185         int ret;
186
187         if (prio >= pqueue->max_prio) {
188                 FRESCAN_ERROR("priority of the packet is too high\n");
189                 return -1;
190         }
191
192         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
193               "enqueue packet with prio %u, pending %u\n",
194               prio, packet->buffer_pending_bytes);
195
196         list_add_tail(&(packet->fifo_list),
197                       &(pqueue->fifo_queues[prio].fifo_list));
198
199         ret = sem_post(&pqueue->sem);
200         if (ret != 0) return ret;
201
202         return 0;
203 }
204
205 /**
206  * frescan_pqueue_requeue() - requeue a packet
207  */
208
209 int frescan_pqueue_requeue(frescan_prio_queue_t *pqueue,
210                            frescan_packet_t *packet,
211                            frescan_prio_t prio)
212 {
213         int ret;
214
215         if (prio >= pqueue->max_prio) {
216                 FRESCAN_ERROR("priority of the packet is too high\n");
217                 return -1;
218         }
219
220         list_add(&packet->fifo_list, &(pqueue->fifo_queues[prio].fifo_list));
221
222         ret = sem_post(&pqueue->sem);
223         if (ret != 0) return ret;
224
225         return 0;
226 }
227
228 /**
229  * frescan_pqueue_dequeue() - dequeue the packet with highest priority
230  */
231
232 int frescan_pqueue_dequeue(frescan_prio_queue_t *pqueue,
233                            frescan_packet_t **packet,
234                            frescan_prio_t *packet_prio,
235                            bool blocking)
236 {
237         int prio;
238         int ret;
239         frescan_packet_t *tmp = NULL;
240         struct list_head *pos;
241
242         *packet = NULL;
243
244         if (blocking) {
245                 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "calling sem_wait\n");
246                 sem_wait(&pqueue->sem);
247         } else {
248                 ret = sem_trywait (&pqueue->sem);
249                 if (ret != 0) {
250                         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
251                               "sem_trywait was locked (no packets)\n");
252                         return 0;
253                 }
254         }
255
256         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
257               "check priority fifo queues (max_prio=%u)\n", pqueue->max_prio);
258
259         // NOTE: we only acquire the lock if we block because non-blocking
260         // calls are made from a context with no interrupts (when updating
261         // the buffer at 'frescan_hw_buffer_update' which is always called
262         // with interrupts disabled)
263         if (blocking) FRESCAN_ACQUIRE_LOCK(&the_networks[pqueue->net].lock);
264
265         for(prio=pqueue->max_prio-1; prio >= 0; prio--) {
266                 if (!list_empty(&pqueue->fifo_queues[prio].fifo_list)) {
267                         list_for_each(pos,
268                                       &pqueue->fifo_queues[prio].fifo_list) {
269                                 tmp = list_entry(pos, frescan_packet_t,
270                                                  fifo_list);
271                                 break;
272                         }
273                         *packet = tmp;
274                         list_del(&tmp->fifo_list);
275                         break;
276                 }
277         }
278
279         if (blocking) FRESCAN_RELEASE_LOCK(&the_networks[pqueue->net].lock);
280
281         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "dequeued prio %u\n", prio);
282         *packet_prio = prio;
283
284         return 0;
285 }
286
287 /**
288  * frescan_pqueue_get_highest_prio() - returns the packet with highest prio
289  * but not extracting it from the queue.
290  */
291
292 int frescan_pqueue_get_highest_prio(frescan_prio_queue_t *pqueue,
293                                     frescan_packet_t **packet,
294                                     frescan_prio_t *packet_prio)
295 {
296         int prio;
297         frescan_packet_t *tmp = NULL;
298         struct list_head *pos;
299
300         *packet = NULL;
301
302         for(prio=pqueue->max_prio-1; prio >= 0; prio--) {
303                 if (!list_empty(&pqueue->fifo_queues[prio].fifo_list)) {
304                         list_for_each(pos,
305                                       &pqueue->fifo_queues[prio].fifo_list) {
306                                 tmp = list_entry(pos, frescan_packet_t,
307                                                  fifo_list);
308                                 break;
309                         }
310                         *packet = tmp;
311                         break;
312                 }
313         }
314
315         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "highest prio %u\n", prio);
316         *packet_prio = prio;
317
318         return 0;
319 }
320
321
322 /**
323  * frescan_servers_enqueue() - enqueue a packet through a server
324  *
325  * @net: the network instance
326  * @id: the identificator for the server
327  * @packet: the packet being enqueued
328  *
329  */
330
331 int frescan_servers_enqueue(frescan_network_t net,
332                             frescan_ss_t id,
333                             frescan_packet_t *packet)
334 {
335         frescan_server_data_t *server = &the_servers_pool[net][id];
336
337         clock_gettime (CLOCK_MONOTONIC, &packet->timestamp);
338
339         DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG, "id:%u timestamp:(%d, %d)\n",
340               id, packet->timestamp.tv_sec, packet->timestamp.tv_nsec);
341
342         // add the packet to the server fifo list
343         list_add_tail(&packet->fifo_list, &server->packet_list.fifo_list);
344
345         // if the server was inactive (no packets) put it in the active list
346         if (server->pending_packets == 0) {
347                 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
348                       "ss becomes active act_time=timestamp\n");
349                 list_add_tail(&server->servers_list,
350                               &the_active_servers[net].servers_list);
351                 server->act_time = packet->timestamp;
352         }
353
354         server->pending_packets++;
355         return 0;
356 }
357
358 /**
359  * frescan_servers_requeue() - requeue a packet through a server
360  *
361  * @net: the network instance
362  * @id: the identificator for the server
363  * @packet: the packet being requeued
364  *
365  */
366
367 int frescan_servers_requeue(frescan_network_t net,
368                             frescan_ss_t id,
369                             frescan_packet_t *packet)
370 {
371         DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
372               "requeue packet for id:%u\n", id);
373
374         // add the packet to the server fifo list
375         list_add(&packet->fifo_list,
376                   &the_servers_pool[net][id].packet_list.fifo_list);
377
378         // if the server was inactive (no packets to send) put it active
379         // (in the active list)
380         if (the_servers_pool[net][id].pending_packets == 0) {
381                 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
382                       "server was inactive, put in the active list\n");
383                 list_add(&the_servers_pool[net][id].servers_list,
384                           &the_active_servers[net].servers_list);
385         }
386
387         the_servers_pool[net][id].pending_packets++;
388         return 0;
389 }
390
391 /**
392  * frescan_servers_dequeue() - dequeue a packet from a server
393  *
394  * @net: the network instance
395  * @id: the identificator for the server
396  * @packet: the packet dequeued
397  * @packet_prio: the priority current of the server
398  *
399  */
400
401 int frescan_servers_dequeue(frescan_network_t net,
402                             frescan_ss_t id,
403                             frescan_packet_t **packet,
404                             frescan_prio_t *packet_prio)
405 {
406         struct list_head *pos;
407         frescan_server_data_t *server;
408
409         server = &the_servers_pool[net][id];
410
411         if (list_empty(&server->packet_list.fifo_list)) {
412                 FRESCAN_ERROR("no packet in server %d fifo list\n", id);
413                 return -1;
414         }
415
416         list_for_each(pos, &server->packet_list.fifo_list) {
417                 *packet = list_entry(pos, frescan_packet_t, fifo_list);
418                 break;
419         }
420
421         list_del(&((*packet)->fifo_list));
422         *packet_prio = server->current_priority;
423         server->pending_packets--;
424
425         if (server->pending_packets == 0) {
426                 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
427                       "no more packets, delete from active list\n");
428                 list_del(&server->servers_list);
429         }
430
431         DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
432               "dequeued packet server:%u cur_prio:%u pending:%u\n",
433               id, *packet_prio, server->pending_packets);
434
435         return 0;
436 }