2 * @file frescan_queues.c
4 * @brief FRESCAN queues to manage the packets by prio and servers
15 * This file contains the FRESCAN queues where frescan packets are stored and
18 * TODO: disable interrupts for mutual exclusion!!!
22 * See MaRTE OS license
29 #include "frescan_queues.h"
30 #include "frescan_packets.h"
31 #include "frescan_debug.h"
32 #include "frescan_id.h"
35 * frescan_pqueue_create() - creates a priority queue
// Allocates a priority queue descriptor, initialises its semaphore and
// prepares one FIFO list head per priority level in [0, max_prio).
//
// max_prio: number of priority levels (also stored in pq->max_prio)
// net:      network the queue belongs to (its use is not visible here)
//
// NOTE(review): this listing skips several original lines (the embedded
// numbers jump, e.g. 44 -> 46 and 53 -> 55). The conditions guarding the
// ERROR() calls, the declarations of 'ret' and 'prio', and every return
// statement are therefore not visible -- confirm against the complete file.
38 static inline frescan_prio_queue_t *frescan_pqueue_create(uint32_t max_prio,
39 frescan_network_t net)
42 frescan_prio_queue_t *pq; // priority queue
// heap-allocate the queue descriptor itself
44 pq = (frescan_prio_queue_t *)malloc(sizeof(frescan_prio_queue_t));
46 ERROR("could not allocate memory for prio queue\n");
50 pq->max_prio = max_prio;
// pshared = 0 and initial value = 0: the semaphore starts with no units;
// frescan_pqueue_enqueue/requeue post it once per queued packet
53 ret = sem_init (&pq->sem, 0, 0);
55 ERROR("could not init the semaphore\n");
// one frescan_packet_t per priority level; in this function only its
// 'fifo_list' member is touched, as the head of that level's list
60 pq->fifo_queues = (frescan_packet_t *)
61 malloc(max_prio * sizeof(frescan_packet_t));
// NOTE(review): no NULL check of the malloc above is visible before the
// loop below dereferences pq->fifo_queues -- verify in the full source
63 for(prio=0; prio<max_prio; prio++) {
64 INIT_LIST_HEAD(&(pq->fifo_queues[prio].fifo_list));
71 * frescan_queues_init() - initialize the queues
73 * 1.- create the transmission fixed priority queue
74 * 2.- create the rx channels and their associated priority queues
76 * TODO: free the already allocated memory when an error occurs
// Fills in 'queues' from 'params': creates the transmission fixed-priority
// queue and then an array of receive priority queues, one per rx channel.
//
// queues: receives tx_fp_queue, rx_channel_queues and num_rx_channels
// params: supplies tx_fp_max_prio, rx_num_of_channels, rx_channel_max_prio
//         and net
//
// NOTE(review): the listing elides lines (embedded numbers jump, e.g.
// 86 -> 89, 110 -> 112), so the return statements, the 'else' of the
// max_prio selection and the declarations of 'i' and 'max_prio' are not
// visible. No release of earlier allocations is visible on the error paths
// either, which matches the TODO in the comment preceding this function.
79 int frescan_queues_init(frescan_queues_t *queues,
80 frescan_init_params_t *params)
85 // create transmission fixed priority queue
86 queues->tx_fp_queue = frescan_pqueue_create(params->tx_fp_max_prio,
89 if (queues->tx_fp_queue == NULL) {
90 ERROR("could not allocate memory for tx fp queue\n");
94 // create receiving channels
// array of rx_num_of_channels pointers; each slot is filled by the loop below
95 queues->rx_channel_queues = (frescan_prio_queue_t **)
96 malloc(params->rx_num_of_channels *
97 sizeof(frescan_prio_queue_t *));
99 if (queues->rx_channel_queues == NULL) {
100 ERROR("could not allocate memory for receiving channels\n");
104 queues->num_rx_channels = params->rx_num_of_channels;
106 // create a priority queue for each channel
107 for(i=0; i<params->rx_num_of_channels; i++) {
// when no per-channel priority table is supplied, every rx channel reuses
// the tx maximum priority; otherwise (the 'else' line is presumably among
// the elided ones) the channel's own entry of the table is taken
109 if (params->rx_channel_max_prio == NULL) {
110 max_prio = params->tx_fp_max_prio;
112 max_prio = params->rx_channel_max_prio[i];
115 queues->rx_channel_queues[i] = frescan_pqueue_create
116 (max_prio, params->net);
118 if (queues->rx_channel_queues[i] == NULL) {
119 ERROR("could not allocate memory for rx pq %d\n", i);
128 * frescan_pqueue_enqueue() - enqueue a packet
130 * check the packet flags and enqueue the packet in the appropriate queue
// Appends 'packet' to the FIFO list of priority 'prio' inside 'pqueue' and
// posts the queue semaphore once.
//
// NOTE(review): the remaining parameter lines of the signature, the
// declaration of 'ret' and the return statements are not visible in this
// listing (embedded numbers jump 134 -> 139 and 152 -> 158).
133 int frescan_pqueue_enqueue(frescan_prio_queue_t *pqueue,
134 frescan_packet_t *packet,
// valid priorities index fifo_queues[], so anything >= max_prio is reported
139 if (prio >= pqueue->max_prio) {
140 ERROR("priority of the packet is too high\n");
144 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
145 "enqueue packet with prio %u, pending %u\n",
146 prio, packet->buffer_pending_bytes);
// list_add_tail: the packet goes to the end of that priority's list
148 list_add_tail(&(packet->fifo_list),
149 &(pqueue->fifo_queues[prio].fifo_list));
// one post per queued packet; frescan_pqueue_dequeue waits on this semaphore
151 ret = sem_post(&pqueue->sem);
152 if (ret != 0) return ret;
158 * frescan_pqueue_requeue() - requeue a packet
// Puts 'packet' back into the FIFO list of priority 'prio' inside 'pqueue'
// and posts the queue semaphore once. Unlike frescan_pqueue_enqueue, which
// uses list_add_tail, this uses list_add, i.e. the packet is inserted right
// after the list head (front of the list).
//
// NOTE(review): the remaining parameter lines of the signature, the
// declaration of 'ret' and the return statements are not visible in this
// listing (embedded numbers jump 162 -> 167 and 175 -> 181).
161 int frescan_pqueue_requeue(frescan_prio_queue_t *pqueue,
162 frescan_packet_t *packet,
// valid priorities index fifo_queues[], so anything >= max_prio is reported
167 if (prio >= pqueue->max_prio) {
168 ERROR("priority of the packet is too high\n");
172 list_add(&packet->fifo_list, &(pqueue->fifo_queues[prio].fifo_list));
// one post per queued packet; frescan_pqueue_dequeue waits on this semaphore
174 ret = sem_post(&pqueue->sem);
175 if (ret != 0) return ret;
181 * frescan_pqueue_dequeue() - dequeue the packet with highest priority
// Takes one unit from the queue semaphore, then scans the per-priority FIFO
// lists from max_prio-1 downwards and unlinks a packet from the first
// non-empty list found.
//
// NOTE(review): many lines are elided in this listing (embedded numbers
// jump, e.g. 186 -> 191, 198 -> 200, 221 -> 226, 233 -> 240). Not visible:
// the last parameter (presumably 'blocking', which the body reads), the
// declarations of 'prio' and 'ret', the if/else choosing between sem_wait
// and sem_trywait, the stores to *packet and *packet_prio, and all returns.
184 int frescan_pqueue_dequeue(frescan_prio_queue_t *pqueue,
185 frescan_packet_t **packet,
186 frescan_prio_t *packet_prio,
191 frescan_packet_t *tmp = NULL;
192 struct list_head *pos;
// presumably the blocking path: wait until a packet has been posted.
// NOTE(review): the result of sem_wait is not checked on this line
197 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "calling sem_wait\n");
198 sem_wait(&pqueue->sem);
// presumably the non-blocking path: try to take a unit without waiting
200 ret = sem_trywait (&pqueue->sem);
202 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
203 "sem_trywait was locked (no packets)\n");
208 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
209 "check priority fifo queues (max_prio=%u)\n", pqueue->max_prio);
211 // NOTE: we only acquire the lock if we block because non-blocking
212 // calls are made from a context with no interrupts (when updating
213 // the buffer at 'frescan_hw_buffer_update' which is always called
214 // with interrupts disabled)
215 if (blocking) FRESCAN_ACQUIRE_LOCK(&the_networks[pqueue->net].lock);
// highest priority index first.
// NOTE(review): 'prio >= 0' only terminates if 'prio' is a signed type; its
// declaration is not visible here -- confirm
217 for(prio=pqueue->max_prio-1; prio >= 0; prio--) {
218 if (!list_empty(&pqueue->fifo_queues[prio].fifo_list)) {
220 &pqueue->fifo_queues[prio].fifo_list) {
221 tmp = list_entry(pos, frescan_packet_t,
// unlink the selected packet from its priority list
226 list_del(&tmp->fifo_list);
231 if (blocking) FRESCAN_RELEASE_LOCK(&the_networks[pqueue->net].lock);
// NOTE(review): 'prio' is printed with %u; if it is signed and no list was
// non-empty it would be negative at this point -- confirm
233 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "dequeued prio %u\n", prio);
240 * frescan_pqueue_get_highest_prio() - returns the packet with highest prio
241 * but not extracting it from the queue.
// Scans the per-priority FIFO lists of 'pqueue' from max_prio-1 downwards
// and looks up an entry of the first non-empty list. No list_del, semaphore
// or lock operation appears in the visible lines of this function.
//
// NOTE(review): lines are elided in this listing (embedded numbers jump,
// e.g. 246 -> 249, 258 -> 267). Not visible: the declaration of 'prio', the
// stores to *packet and *packet_prio, and the return statements.
244 int frescan_pqueue_get_highest_prio(frescan_prio_queue_t *pqueue,
245 frescan_packet_t **packet,
246 frescan_prio_t *packet_prio)
249 frescan_packet_t *tmp = NULL;
250 struct list_head *pos;
// highest priority index first.
// NOTE(review): 'prio >= 0' only terminates if 'prio' is a signed type; its
// declaration is not visible here -- confirm
254 for(prio=pqueue->max_prio-1; prio >= 0; prio--) {
255 if (!list_empty(&pqueue->fifo_queues[prio].fifo_list)) {
257 &pqueue->fifo_queues[prio].fifo_list) {
258 tmp = list_entry(pos, frescan_packet_t,
267 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "highest prio %u\n", prio);
275 * frescan_servers_enqueue() - enqueue a packet through a server
277 * @net: the network instance
278 * @id: the identifier for the server
279 * @packet: the packet being enqueued
// Timestamps 'packet', appends it to the packet list of server 'id' on
// network 'net', and links the server into the active-servers list when it
// had no pending packets before this call.
//
// NOTE(review): the 'id' parameter line (embedded number 284) and the return
// statement are not visible in this listing.
283 int frescan_servers_enqueue(frescan_network_t net,
285 frescan_packet_t *packet)
287 frescan_server_data_t *server = &the_servers_pool[net][id];
// NOTE(review): the result of clock_gettime is not checked on this line
289 clock_gettime (CLOCK_MONOTONIC, &packet->timestamp);
// NOTE(review): tv_sec / tv_nsec are printed with %d; whether that matches
// their types depends on the platform -- confirm
291 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG, "id:%u timestamp:(%d, %d)\n",
292 id, packet->timestamp.tv_sec, packet->timestamp.tv_nsec);
294 // add the packet to the server fifo list
295 list_add_tail(&packet->fifo_list, &server->packet_list.fifo_list);
297 // if the server was inactive (no packets) put it in the active list
298 if (server->pending_packets == 0) {
299 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
300 "ss becomes active act_time=timestamp\n");
301 list_add_tail(&server->servers_list,
302 &the_active_servers[net].servers_list);
// the activation time is the timestamp of the packet that activated the server
303 server->act_time = packet->timestamp;
// counted after the check above, so the check sees the count before this packet
306 server->pending_packets++;
311 * frescan_servers_requeue() - requeue a packet through a server
313 * @net: the network instance
314 * @id: the identifier for the server
315 * @packet: the packet being requeued
// Puts 'packet' back at the front (list_add) of the packet list of server
// 'id' on network 'net', and links the server at the front of the
// active-servers list when it had no pending packets before this call.
// In the visible lines neither the packet timestamp nor the server's
// act_time is written, in contrast to frescan_servers_enqueue.
//
// NOTE(review): the 'id' parameter line (embedded number 320) and the return
// statement are not visible in this listing.
319 int frescan_servers_requeue(frescan_network_t net,
321 frescan_packet_t *packet)
323 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
324 "requeue packet for id:%u\n", id);
326 // add the packet to the server fifo list
327 list_add(&packet->fifo_list,
328 &the_servers_pool[net][id].packet_list.fifo_list);
330 // if the server was inactive (no packets to send) put it active
331 // (in the active list)
332 if (the_servers_pool[net][id].pending_packets == 0) {
333 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
334 "server was inactive, put in the active list\n");
335 list_add(&the_servers_pool[net][id].servers_list,
336 &the_active_servers[net].servers_list);
// counted after the check above, so the check sees the count before this packet
339 the_servers_pool[net][id].pending_packets++;
344 * frescan_servers_dequeue() - dequeue a packet from a server
346 * @net: the network instance
347 * @id: the identifier for the server
348 * @packet: the packet dequeued
349 * @packet_prio: the current priority of the server
// Removes a packet from the packet list of server 'id' on network 'net',
// reports the server's current priority through 'packet_prio', and unlinks
// the server from the active-servers list once it has no pending packets.
//
// NOTE(review): the 'id' parameter line (embedded number 354), the exit from
// the list_for_each loop (embedded numbers jump 369 -> 373) and all return
// statements, including the end of the function, are not visible in this
// listing -- confirm against the complete file.
353 int frescan_servers_dequeue(frescan_network_t net,
355 frescan_packet_t **packet,
356 frescan_prio_t *packet_prio)
358 struct list_head *pos;
359 frescan_server_data_t *server;
361 server = &the_servers_pool[net][id];
// an empty packet list is reported as an error
363 if (list_empty(&server->packet_list.fifo_list)) {
364 ERROR("no packet in server %d fifo list\n", id);
// presumably the loop is left after its first iteration (the line doing so
// is not visible), which would make *packet the entry at the list front
368 list_for_each(pos, &server->packet_list.fifo_list) {
369 *packet = list_entry(pos, frescan_packet_t, fifo_list);
373 list_del(&((*packet)->fifo_list));
374 *packet_prio = server->current_priority;
375 server->pending_packets--;
// last pending packet gone: the server leaves the active list
377 if (server->pending_packets == 0) {
378 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
379 "no more packets, delete from active list\n");
380 list_del(&server->servers_list);
383 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
384 "dequeued packet server:%u cur_prio:%u pending:%u\n",
385 id, *packet_prio, server->pending_packets);