2 * @file frescan_queues.c
4 * @brief FRESCAN queues to manage the packets by prio and servers
15 * This file contains the FRESCAN queues where frescan packets are stored and
18 * TODO: disable interrupts for mutual exclusion!!!
22 * See MaRTE OS license
28 #include <misc/timespec_operations.h>
30 #include "frescan_queues.h"
31 #include "frescan_packets.h"
32 #include "frescan_debug.h"
33 #include "frescan_id.h"
36 * frescan_pqueue_create() - creates a priority queue
// Allocates a frescan_prio_queue_t, stores max_prio in it, initialises its
// semaphore and sets up one FIFO list head per priority level.
// NOTE(review): this listing omits some source lines (braces, conditions,
// returns, local declarations); the comments below describe only the
// statements that are actually visible here.
39 static inline frescan_prio_queue_t *frescan_pqueue_create(uint32_t max_prio,
40 frescan_network_t net)
43 frescan_prio_queue_t *pq; // priority queue
// Heap-allocate the queue descriptor itself.
45 pq = (frescan_prio_queue_t *)malloc(sizeof(frescan_prio_queue_t));
47 ERROR("could not allocate memory for prio queue\n");
51 pq->max_prio = max_prio;
// The semaphore starts at 0 (pshared = 0). It is posted by
// frescan_pqueue_enqueue/frescan_pqueue_requeue and waited on by
// frescan_pqueue_dequeue.
54 ret = sem_init (&pq->sem, 0, 0);
56 ERROR("could not init the semaphore\n");
// One frescan_packet_t per priority level; the fifo_list member of each
// element is used as the head of that priority's packet list.
// NOTE(review): no NULL check on this allocation is visible in this
// listing, and max_prio * sizeof(...) is not checked for overflow — confirm
// against the full source.
61 pq->fifo_queues = (frescan_packet_t *)
62 malloc(max_prio * sizeof(frescan_packet_t));
// Make every per-priority list head an empty list.
64 for(prio=0; prio<max_prio; prio++) {
65 INIT_LIST_HEAD(&(pq->fifo_queues[prio].fifo_list));
72 * frescan_queues_init() - initialize the queues
74 * 1.- create the transmission fixed priority queue
75 * 2.- create the rx channels and their associated priority queues
77 * TODO: free the already allocated memory when an error occurs
// Fills `queues` from `params`: creates the transmission fixed-priority queue
// and then one priority queue per receiving channel.
// NOTE(review): this listing omits some source lines (braces, returns, local
// declarations, part of the first frescan_pqueue_create call); the error
// return values are therefore not visible here.
80 int frescan_queues_init(frescan_queues_t *queues,
81 frescan_init_params_t *params)
86 // create transmission fixed priority queue
87 queues->tx_fp_queue = frescan_pqueue_create(params->tx_fp_max_prio,
90 if (queues->tx_fp_queue == NULL) {
91 ERROR("could not allocate memory for tx fp queue\n");
95 // create receiving channels
// Array of rx_num_of_channels pointers; each slot is filled in the loop below.
96 queues->rx_channel_queues = (frescan_prio_queue_t **)
97 malloc(params->rx_num_of_channels *
98 sizeof(frescan_prio_queue_t *));
100 if (queues->rx_channel_queues == NULL) {
101 ERROR("could not allocate memory for receiving channels\n");
105 queues->num_rx_channels = params->rx_num_of_channels;
107 // create a priority queue for each channel
108 for(i=0; i<params->rx_num_of_channels; i++) {
// When no per-channel table is supplied, every rx channel uses the
// transmission queue's max priority; otherwise the i-th table entry is used.
110 if (params->rx_channel_max_prio == NULL) {
111 max_prio = params->tx_fp_max_prio;
113 max_prio = params->rx_channel_max_prio[i];
116 queues->rx_channel_queues[i] = frescan_pqueue_create
117 (max_prio, params->net);
// NOTE(review): no release of the previously created queues is visible on
// this failure path (see the TODO in the function header) — confirm.
119 if (queues->rx_channel_queues[i] == NULL) {
120 ERROR("could not allocate memory for rx pq %d\n", i);
129 * frescan_pqueue_enqueue() - enqueue a packet
131 * check the packet flags and enqueue the packet in the appropriate queue
// Appends `packet` to the tail of the FIFO list for priority `prio` in
// `pqueue` and posts the queue semaphore once.
// NOTE(review): this listing omits some source lines (remaining parameters,
// braces, returns); only the visible statements are described.
134 int frescan_pqueue_enqueue(frescan_prio_queue_t *pqueue,
135 frescan_packet_t *packet,
// Valid priorities are 0 .. max_prio-1; anything else is reported as an error.
140 if (prio >= pqueue->max_prio) {
141 ERROR("priority of the packet is too high\n");
145 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
146 "enqueue packet with prio %u, pending %u\n",
147 prio, packet->buffer_pending_bytes);
// Tail insertion: the packet goes behind those already queued at this priority.
149 list_add_tail(&(packet->fifo_list),
150 &(pqueue->fifo_queues[prio].fifo_list));
// One post per queued packet; a sem_post failure is returned to the caller.
152 ret = sem_post(&pqueue->sem);
153 if (ret != 0) return ret;
159 * frescan_pqueue_requeue() - requeue a packet
// Puts `packet` back into the FIFO list for priority `prio` in `pqueue` and
// posts the queue semaphore once.
// NOTE(review): this listing omits some source lines (remaining parameters,
// braces, returns); only the visible statements are described.
162 int frescan_pqueue_requeue(frescan_prio_queue_t *pqueue,
163 frescan_packet_t *packet,
// Valid priorities are 0 .. max_prio-1; anything else is reported as an error.
168 if (prio >= pqueue->max_prio) {
169 ERROR("priority of the packet is too high\n");
// Uses list_add where frescan_pqueue_enqueue uses list_add_tail —
// presumably this inserts at the front of the list so the requeued packet is
// the next one dequeued at this priority; confirm against the list
// implementation in use.
173 list_add(&packet->fifo_list, &(pqueue->fifo_queues[prio].fifo_list));
// One post per queued packet; a sem_post failure is returned to the caller.
175 ret = sem_post(&pqueue->sem);
176 if (ret != 0) return ret;
182 * frescan_pqueue_dequeue() - dequeue the packet with highest priority
// Removes a packet from `pqueue`, scanning the per-priority FIFO lists from
// max_prio-1 downwards and unlinking an entry from the first non-empty one.
// NOTE(review): this listing omits some source lines (the last parameter(s),
// local declarations, several conditions, braces, the output assignments
// and returns); only the visible statements are described.
185 int frescan_pqueue_dequeue(frescan_prio_queue_t *pqueue,
186 frescan_packet_t **packet,
187 frescan_prio_t *packet_prio,
192 frescan_packet_t *tmp = NULL;
193 struct list_head *pos;
// Two ways of taking the semaphore are visible: sem_wait and sem_trywait.
// Presumably the choice depends on `blocking` (the selecting condition is
// not visible here).
// NOTE(review): the result of sem_wait is not used on the visible line, so
// an interrupted wait (EINTR) would go unnoticed — confirm.
198 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "calling sem_wait\n");
199 sem_wait(&pqueue->sem);
201 ret = sem_trywait (&pqueue->sem);
203 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
204 "sem_trywait was locked (no packets)\n");
209 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG,
210 "check priority fifo queues (max_prio=%u)\n", pqueue->max_prio);
212 // NOTE: we only acquire the lock if we block because non-blocking
213 // calls are made from a context with no interrupts (when updating
214 // the buffer at 'frescan_hw_buffer_update' which is always called
215 // with interrupts disabled)
216 if (blocking) FRESCAN_ACQUIRE_LOCK(&the_networks[pqueue->net].lock);
// Scan from the highest priority index down to 0.
// NOTE(review): `prio >= 0` can only become false if `prio` is a signed
// type; its declaration is not visible in this listing — confirm, otherwise
// the loop never terminates on an all-empty queue.
218 for(prio=pqueue->max_prio-1; prio >= 0; prio--) {
219 if (!list_empty(&pqueue->fifo_queues[prio].fifo_list)) {
// list_for_each + list_entry convert a list node back into its
// frescan_packet_t; presumably the loop stops at the first node (the
// statement that leaves the loop is not visible here).
221 &pqueue->fifo_queues[prio].fifo_list) {
222 tmp = list_entry(pos, frescan_packet_t,
// Unlink the selected packet from its priority list.
227 list_del(&tmp->fifo_list);
232 if (blocking) FRESCAN_RELEASE_LOCK(&the_networks[pqueue->net].lock);
234 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "dequeued prio %u\n", prio);
241 * frescan_pqueue_get_highest_prio() - returns the packet with highest prio
242 * without extracting it from the queue.
// Looks up a packet in `pqueue` by scanning the per-priority FIFO lists from
// max_prio-1 downwards; no list_del call and no semaphore operation is
// visible in this function, so the packet stays queued.
// NOTE(review): this listing omits some source lines (local declarations,
// braces, the output assignments and returns); only the visible statements
// are described.
245 int frescan_pqueue_get_highest_prio(frescan_prio_queue_t *pqueue,
246 frescan_packet_t **packet,
247 frescan_prio_t *packet_prio)
250 frescan_packet_t *tmp = NULL;
251 struct list_head *pos;
// NOTE(review): `prio >= 0` can only become false if `prio` is a signed
// type; its declaration is not visible in this listing — confirm.
255 for(prio=pqueue->max_prio-1; prio >= 0; prio--) {
256 if (!list_empty(&pqueue->fifo_queues[prio].fifo_list)) {
// list_for_each + list_entry convert a list node back into its
// frescan_packet_t; presumably the loop stops at the first node (the
// statement that leaves the loop is not visible here).
258 &pqueue->fifo_queues[prio].fifo_list) {
259 tmp = list_entry(pos, frescan_packet_t,
268 DEBUG(FRESCAN_QUEUES_ENABLE_DEBUG, "highest prio %u\n", prio);
276 * frescan_servers_enqueue() - enqueue a packet through a server
278 * @net: the network instance
279 * @id: the identifier for the server
280 * @packet: the packet being enqueued
// Timestamps `packet`, appends it to the packet list of server `id` on
// network `net`, and puts the server on the active-servers list if it had no
// pending packets before this call.
// NOTE(review): this listing omits some source lines (the `id` parameter
// line, braces, returns); only the visible statements are described. No
// range check on `net` or `id` is visible before they index
// the_servers_pool — confirm callers validate them.
284 int frescan_servers_enqueue(frescan_network_t net,
286 frescan_packet_t *packet)
288 frescan_server_data_t *server = &the_servers_pool[net][id];
// Record the enqueue instant on the monotonic clock.
290 clock_gettime (CLOCK_MONOTONIC, &packet->timestamp);
// NOTE(review): tv_sec is time_t and tv_nsec is long in struct timespec, but
// both are printed with %d — possible format/argument mismatch; confirm.
292 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG, "id:%u timestamp:(%d, %d)\n",
293 id, packet->timestamp.tv_sec, packet->timestamp.tv_nsec);
295 // add the packet to the server fifo list
296 list_add_tail(&packet->fifo_list, &server->packet_list.fifo_list);
298 // if the server was inactive (no packets) put it in the active list
299 if (server->pending_packets == 0) {
300 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
301 "ss becomes active act_time=timestamp\n");
302 list_add_tail(&server->servers_list,
303 &the_active_servers[net].servers_list);
// The activation time of the server is the timestamp of this packet.
304 server->act_time = packet->timestamp;
307 server->pending_packets++;
312 * frescan_servers_requeue() - requeue a packet through a server
314 * @net: the network instance
315 * @id: the identifier for the server
316 * @packet: the packet being requeued
// Puts `packet` back into the packet list of server `id` on network `net`
// and, if the server had no pending packets, puts the server back on the
// active-servers list.
// Unlike frescan_servers_enqueue, the visible code uses list_add (not
// list_add_tail) for both insertions and does not touch packet->timestamp or
// the server's act_time.
// NOTE(review): this listing omits some source lines (the `id` parameter
// line, braces, returns); only the visible statements are described.
320 int frescan_servers_requeue(frescan_network_t net,
322 frescan_packet_t *packet)
324 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
325 "requeue packet for id:%u\n", id);
327 // add the packet to the server fifo list
// list_add rather than list_add_tail — presumably front insertion so the
// requeued packet is served first; confirm against the list implementation.
328 list_add(&packet->fifo_list,
329 &the_servers_pool[net][id].packet_list.fifo_list);
331 // if the server was inactive (no packets to send) put it active
332 // (in the active list)
333 if (the_servers_pool[net][id].pending_packets == 0) {
334 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
335 "server was inactive, put in the active list\n");
336 list_add(&the_servers_pool[net][id].servers_list,
337 &the_active_servers[net].servers_list);
340 the_servers_pool[net][id].pending_packets++;
345 * frescan_servers_dequeue() - dequeue a packet from a server
347 * @net: the network instance
348 * @id: the identifier for the server
349 * @packet: the packet dequeued
350 * @packet_prio: the current priority of the server
// Takes a packet out of the packet list of server `id` on network `net`,
// reports the server's current priority through `packet_prio`, and removes
// the server from the active-servers list when its pending count drops to 0.
// NOTE(review): this listing omits some source lines (the `id` parameter
// line, braces, returns, and whatever leaves the list_for_each loop); only
// the visible statements are described.
354 int frescan_servers_dequeue(frescan_network_t net,
356 frescan_packet_t **packet,
357 frescan_prio_t *packet_prio)
359 struct list_head *pos;
360 frescan_server_data_t *server;
362 server = &the_servers_pool[net][id];
// An empty packet list is reported as an error.
// NOTE(review): `id` is printed with %d here but with %u in the DEBUG call
// at the end of this function — confirm which matches its type.
364 if (list_empty(&server->packet_list.fifo_list)) {
365 ERROR("no packet in server %d fifo list\n", id);
// list_for_each + list_entry convert a list node back into its
// frescan_packet_t; presumably the loop stops at the first node (the
// statement that leaves the loop is not visible here).
369 list_for_each(pos, &server->packet_list.fifo_list) {
370 *packet = list_entry(pos, frescan_packet_t, fifo_list);
// Unlink the selected packet; the priority handed back is the server's, not
// a per-packet value.
374 list_del(&((*packet)->fifo_list));
375 *packet_prio = server->current_priority;
376 server->pending_packets--;
// Last pending packet gone: the server leaves the active-servers list.
378 if (server->pending_packets == 0) {
379 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
380 "no more packets, delete from active list\n");
381 list_del(&server->servers_list);
384 DEBUG(FRESCAN_SERVERS_ENABLE_DEBUG,
385 "dequeued packet server:%u cur_prio:%u pending:%u\n",
386 id, *packet_prio, server->pending_packets);