]> rtime.felk.cvut.cz Git - frescor/fna.git/blob - src_frescan/frescan_queues.c
add group negotiations to frescan and change all the requests and messages to map...
[frescor/fna.git] / src_frescan / frescan_queues.c
1 /*!
2  * @file frescan_queues.c
3  *
4  * @brief FRESCAN queues to manage the packets by prio and servers
5  *
6  * @version 0.01
7  *
8  * @date 27-Feb-2008
9  *
10  * @author
11  *      Daniel Sangorrin
12  *
13  * @comments
14  *
15  * This file contains the FRESCAN queues where frescan packets are stored and
16  * managed.
17  *
18  * TODO: disable interrupts for mutual exclusion!!!
19  *
20  * @license
21  *
22  * -----------------------------------------------------------------------
23  *  Copyright (C) 2006 - 2008 FRESCOR consortium partners:
24  *
25  *    Universidad de Cantabria,              SPAIN
26  *    University of York,                    UK
27  *    Scuola Superiore Sant'Anna,            ITALY
28  *    Kaiserslautern University,             GERMANY
29  *    Univ. Politécnica  Valencia,           SPAIN
30  *    Czech Technical University in Prague,  CZECH REPUBLIC
31  *    ENEA                                   SWEDEN
32  *    Thales Communication S.A.              FRANCE
33  *    Visual Tools S.A.                      SPAIN
34  *    Rapita Systems Ltd                     UK
35  *    Evidence                               ITALY
36  *
37  *    See http://www.frescor.org for a link to partners' websites
38  *
39  *           FRESCOR project (FP6/2005/IST/5-034026) is funded
40  *        in part by the European Union Sixth Framework Programme
41  *        The European Union is not liable of any use that may be
42  *        made of this code.
43  *
44  *  This file is part of FRESCAN
45  *
46  *  FRESCAN is free software; you can  redistribute it and/or  modify
47  *  it under the terms of  the GNU General Public License as published by
48  *  the Free Software Foundation;  either  version 2, or (at  your option)
49  *  any later version.
50  *
51  *  FRESCAN  is distributed  in  the hope  that  it  will  be useful,  but
52  *  WITHOUT  ANY  WARRANTY;     without  even the   implied   warranty  of
53  *  MERCHANTABILITY  or  FITNESS FOR  A  PARTICULAR PURPOSE. See  the  GNU
54  *  General Public License for more details.
55  *
56  *  You should have  received a  copy of  the  GNU  General Public License
57  *  distributed  with  FRESCAN;  see file COPYING.   If not,  write to the
58  *  Free Software  Foundation,  59 Temple Place  -  Suite 330,  Boston, MA
59  *  02111-1307, USA.
60  *
61  * As a special exception, including FRESCAN header files in a file,
62  * instantiating FRESCAN generics or templates, or linking other files
63  * with FRESCAN objects to produce an executable application, does not
64  * by itself cause the resulting executable application to be covered
65  * by the GNU General Public License. This exception does not
66  * however invalidate any other reasons why the executable file might be
67  * covered by the GNU Public License.
68  * -----------------------------------------------------------------------
69  *
70  */
71
72 #include <stdlib.h>
73 #include <time.h>
74
75 #include "frescan_queues.h"
76 #include "frescan_packets.h"
77 #include "frescan_debug.h"
78 #include "frescan_id.h"
79 #include "frescan_data.h"
80
81 /**
82  * frescan_pqueue_create() - creates a priority queue
83  */
84
85 static inline frescan_prio_queue_t *frescan_pqueue_create(uint32_t max_prio,
86                                                           frescan_network_t net)
87 {
88         int ret, prio;
89         frescan_prio_queue_t *pq; // priority queue
90
91         pq = (frescan_prio_queue_t *)malloc(sizeof(frescan_prio_queue_t));
92         if (pq == NULL) {
93                 FRESCAN_ERROR("could not allocate memory for prio queue\n");
94                 return NULL;
95         }
96
97         pq->max_prio = max_prio;
98         pq->net      = net;
99
100         ret = sem_init (&pq->sem, 0, 0);
101         if (ret != 0) {
102                 FRESCAN_ERROR("could not init the semaphore\n");
103                 free(pq);
104                 return NULL;
105         }
106
107         pq->fifo_queues = (frescan_packet_t *)
108                         malloc(max_prio * sizeof(frescan_packet_t));
109
110         for(prio=0; prio<max_prio; prio++) {
111                 INIT_LIST_HEAD(&(pq->fifo_queues[prio].fifo_list));
112         }
113
114         return pq;
115 }
116
117 /**
118  * frescan_queues_init() - initialize the queues
119  *
120  * 1.- create the transmission fixed priority queue
121  * 2.- create the rx channels and its associated priority queues
122  *
123  * TODO: when error free memory
124  */
125
126 int frescan_queues_init(frescan_queues_t *queues,
127                         frescan_init_params_t *params)
128 {
129         int i;
130         uint32_t max_prio;
131
132         // create transmission fixed priority queue
133         queues->tx_fp_queue = frescan_pqueue_create(params->tx_fp_max_prio,
134                                                     params->net);
135
136         if (queues->tx_fp_queue == NULL)  {
137                 FRESCAN_ERROR("could not allocate memory for tx fp queue\n");
138                 return -1;
139         }
140
141         // create receiving channels
142         queues->rx_channel_queues = (frescan_prio_queue_t **)
143                 malloc(params->rx_num_of_channels *
144                        sizeof(frescan_prio_queue_t *));
145
146         if (queues->rx_channel_queues == NULL) {
147                 FRESCAN_ERROR
148                         ("could not allocate memory for receiving channels\n");
149                 return -1;
150         }
151
152         queues->num_rx_channels = params->rx_num_of_channels;
153
154         // create a priority queue for each channel
155         for(i=0; i<params->rx_num_of_channels; i++) {
156
157                 if (params->rx_channel_max_prio == NULL) {
158                         max_prio = params->tx_fp_max_prio;
159                 } else {
160                         max_prio = params->rx_channel_max_prio[i];
161                 }
162
163                 queues->rx_channel_queues[i] = frescan_pqueue_create
164                                                        (max_prio, params->net);
165
166                 if (queues->rx_channel_queues[i] == NULL)  {
167                         FRESCAN_ERROR
168                                ("could not allocate memory for rx pq %d\n", i);
169                         return -1;
170                 }
171         }
172
173         return 0;
174 }
175
176 /**
177  * frescan_pqueue_enqueue() - enqueue a packet
178  *
179  * check the packet flags and enqueue the packet in the appropiate queue
180  */
181
182 int frescan_pqueue_enqueue(frescan_prio_queue_t *pqueue,
183                            frescan_packet_t *packet,
184                            frescan_prio_t prio)
185 {
186         int ret;
187
188         if (prio >= pqueue->max_prio) {
189                 FRESCAN_ERROR("priority of the packet is too high\n");
190                 return -1;
191         }
192
193         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
194               "enqueue packet with prio %u, pending %u\n",
195               prio, packet->buffer_pending_bytes);
196
197         list_add_tail(&(packet->fifo_list),
198                       &(pqueue->fifo_queues[prio].fifo_list));
199
200         ret = sem_post(&pqueue->sem);
201         if (ret != 0) return ret;
202
203         return 0;
204 }
205
206 /**
207  * frescan_pqueue_requeue() - requeue a packet
208  */
209
210 int frescan_pqueue_requeue(frescan_prio_queue_t *pqueue,
211                            frescan_packet_t *packet,
212                            frescan_prio_t prio)
213 {
214         int ret;
215
216         if (prio >= pqueue->max_prio) {
217                 FRESCAN_ERROR("priority of the packet is too high\n");
218                 return -1;
219         }
220
221         list_add(&packet->fifo_list, &(pqueue->fifo_queues[prio].fifo_list));
222
223         ret = sem_post(&pqueue->sem);
224         if (ret != 0) return ret;
225
226         return 0;
227 }
228
229 /**
230  * frescan_pqueue_dequeue() - dequeue the packet with highest priority
231  */
232
233 int frescan_pqueue_dequeue(frescan_prio_queue_t *pqueue,
234                            frescan_packet_t **packet,
235                            frescan_prio_t *packet_prio,
236                            bool blocking)
237 {
238         int prio;
239         int ret;
240         frescan_packet_t *tmp = NULL;
241         struct list_head *pos;
242
243         *packet = NULL;
244
245         if (blocking) {
246                 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "calling sem_wait\n");
247                 sem_wait(&pqueue->sem);
248         } else {
249                 ret = sem_trywait (&pqueue->sem);
250                 if (ret != 0) {
251                         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
252                               "sem_trywait was locked (no packets)\n");
253                         return 0;
254                 }
255         }
256
257         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
258               "check priority fifo queues (max_prio=%u)\n", pqueue->max_prio);
259
260         // NOTE: we only acquire the lock if we block because non-blocking
261         // calls are made from a context with no interrupts (when updating
262         // the buffer at 'frescan_hw_buffer_update' which is always called
263         // with interrupts disabled)
264         if (blocking) FRESCAN_ACQUIRE_LOCK(&frescan_data[pqueue->net].lock);
265
266         for(prio=pqueue->max_prio-1; prio >= 0; prio--) {
267                 if (!list_empty(&pqueue->fifo_queues[prio].fifo_list)) {
268                         list_for_each(pos,
269                                       &pqueue->fifo_queues[prio].fifo_list) {
270                                 tmp = list_entry(pos, frescan_packet_t,
271                                                  fifo_list);
272                                 break;
273                         }
274                         *packet = tmp;
275                         list_del(&tmp->fifo_list);
276                         break;
277                 }
278         }
279
280         if (blocking) FRESCAN_RELEASE_LOCK(&frescan_data[pqueue->net].lock);
281
282         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "dequeued prio %u\n", prio);
283         *packet_prio = prio;
284
285         return 0;
286 }
287
288 /**
289  * frescan_pqueue_get_highest_prio() - returns the packet with highest prio
290  * but not extracting it from the queue.
291  */
292
293 int frescan_pqueue_get_highest_prio(frescan_prio_queue_t *pqueue,
294                                     frescan_packet_t **packet,
295                                     frescan_prio_t *packet_prio)
296 {
297         int prio;
298         frescan_packet_t *tmp = NULL;
299         struct list_head *pos;
300
301         *packet = NULL;
302
303         for(prio=pqueue->max_prio-1; prio >= 0; prio--) {
304                 if (!list_empty(&pqueue->fifo_queues[prio].fifo_list)) {
305                         list_for_each(pos,
306                                       &pqueue->fifo_queues[prio].fifo_list) {
307                                 tmp = list_entry(pos, frescan_packet_t,
308                                                  fifo_list);
309                                 break;
310                         }
311                         *packet = tmp;
312                         break;
313                 }
314         }
315
316         DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "highest prio %u\n", prio);
317         *packet_prio = prio;
318
319         return 0;
320 }
321
322
323 /**
324  * frescan_servers_enqueue() - enqueue a packet through a server
325  *
326  * @net: the network instance
327  * @id: the identificator for the server
328  * @packet: the packet being enqueued
329  *
330  */
331
332 int frescan_servers_enqueue(frescan_network_t net,
333                             frescan_ss_t id,
334                             frescan_packet_t *packet)
335 {
336         frescan_ss_data_t *server = &frescan_data[net].ss_data[id];
337
338         clock_gettime (CLOCK_MONOTONIC, &packet->timestamp);
339
340         DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG, "id:%u timestamp:(%d, %d)\n",
341               id, packet->timestamp.tv_sec, packet->timestamp.tv_nsec);
342
343         // add the packet to the server fifo list
344         list_add_tail(&packet->fifo_list, &server->packet_list.fifo_list);
345
346         // if the server was inactive (no packets) put it in the active list
347         if (server->pending_packets == 0) {
348                 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
349                       "ss becomes active act_time=timestamp\n");
350                 list_add_tail(&server->servers_list,
351                                &frescan_data[net].ss_active_head.servers_list);
352                 server->act_time = packet->timestamp;
353         }
354
355         server->pending_packets++;
356         return 0;
357 }
358
359 /**
360  * frescan_servers_requeue() - requeue a packet through a server
361  *
362  * @net: the network instance
363  * @id: the identificator for the server
364  * @packet: the packet being requeued
365  *
366  */
367
368 int frescan_servers_requeue(frescan_network_t net,
369                             frescan_ss_t id,
370                             frescan_packet_t *packet)
371 {
372         DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
373               "requeue packet for id:%u\n", id);
374
375         // add the packet to the server fifo list
376         list_add(&packet->fifo_list,
377                   &frescan_data[net].ss_data[id].packet_list.fifo_list);
378
379         // if the server was inactive (no packets to send) put it active
380         // (in the active list)
381         if (frescan_data[net].ss_data[id].pending_packets == 0) {
382                 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
383                       "server was inactive, put in the active list\n");
384                 list_add(&frescan_data[net].ss_data[id].servers_list,
385                           &frescan_data[net].ss_active_head.servers_list);
386         }
387
388         frescan_data[net].ss_data[id].pending_packets++;
389         return 0;
390 }
391
392 /**
393  * frescan_servers_dequeue() - dequeue a packet from a server
394  *
395  * @net: the network instance
396  * @id: the identificator for the server
397  * @packet: the packet dequeued
398  * @packet_prio: the priority current of the server
399  *
400  */
401
402 int frescan_servers_dequeue(frescan_network_t net,
403                             frescan_ss_t id,
404                             frescan_packet_t **packet,
405                             frescan_prio_t *packet_prio)
406 {
407         struct list_head *pos;
408         frescan_ss_data_t *server;
409
410         server = &frescan_data[net].ss_data[id];
411
412         if (list_empty(&server->packet_list.fifo_list)) {
413                 FRESCAN_ERROR("no packet in server %d fifo list\n", id);
414                 return -1;
415         }
416
417         list_for_each(pos, &server->packet_list.fifo_list) {
418                 *packet = list_entry(pos, frescan_packet_t, fifo_list);
419                 break;
420         }
421
422         list_del(&((*packet)->fifo_list));
423         *packet_prio = server->current_priority;
424         server->pending_packets--;
425
426         if (server->pending_packets == 0) {
427                 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
428                       "no more packets, delete from active list\n");
429                 list_del(&server->servers_list);
430         }
431
432         DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
433               "dequeued packet server:%u cur_prio:%u pending:%u\n",
434               id, *packet_prio, server->pending_packets);
435
436         return 0;
437 }