#define FWP_NEGT_RES 0x03 /* negotiation response */
/*
 * Header of an FWP message: a 16-bit handshake ID and a one-byte message
 * code. The struct is declared packed.
 *
 * This patch moves `code` from before `hid` to after it, so in the packed
 * layout `hid` sits at offset 0 instead of offset 1.
 * NOTE(review): if this header is exchanged between hosts, the reorder
 * changes the byte layout - confirm that all peers are rebuilt together.
 */
struct fwp_msg_header{
- uint8_t code;
uint16_t hid; /**< handshake ID */
+ uint8_t code; /**< message code; presumably one of the FWP_NEGT_* values - confirm */
}__attribute__((packed));
/*
 * Contract description used in FWP messages. The struct is declared packed.
 *
 * This patch moves `ac_id` from directly after `id` to after `period_usec`.
 * With the new order `budget` and `period_usec` land at offsets 2 and 4 of
 * the packed layout; before the change they were at offsets 3 and 5.
 * NOTE(review): if this struct is exchanged between hosts, the reorder
 * changes the byte layout - confirm that all peers are rebuilt together.
 */
struct fwp_msg_contract{
uint16_t id; /**< global contract_id */
- uint8_t ac_id; /**< AC id ~ priority of vres */
uint16_t budget; /**< bits per second */
uint32_t period_usec; /**< all time units are in microseconds */
+ uint8_t ac_id; /**< AC id ~ priority of vres */
uint8_t status; /* NOTE(review): the meaning of the status values is not visible here */
}__attribute__((packed));
#include "fwp_msgb.h"
#include <pthread.h>
+#include <semaphore.h>
/*
 * Message queue backed by a fixed array of FWP_MSGQ_SIZE message-buffer
 * pointers. `in` and `out` are running offsets that, per their field
 * comments, are reduced modulo the queue size when used.
 *
 * This patch documents `lock`, adds the `empty_lock` semaphore and moves the
 * commented-out reject-policy placeholder to the end of the struct (dropping
 * the commented-out dest_addr member and the "For simplicity now" remark).
 * NOTE(review): `empty_lock` has to be initialised (sem_init) before the
 * first sem_wait on it; presumably fwp_msgq_init does that - confirm.
 */
struct fwp_msgq {
unsigned int nr_pending; /**< number of messages in the queue */
-
- /* queue reject policy */
- /*queue_rejection_policy qr_policy;*/
- /*struct sockaddr_in dest_addr;*/
-
- /* For simplicity now */
struct fwp_msgb* queue[FWP_MSGQ_SIZE];
unsigned int in; /**< add at offset (in % size) */
unsigned int out; /**< extracted from offset (out % size) */
- pthread_mutex_t lock;
+ pthread_mutex_t lock; /**< queue lock */
+ sem_t empty_lock; /**< semaphore to block on empty mqueue */
+
+ /* queue reject policy */
+ /*queue_rejection_policy qr_policy;*/
};
void fwp_msgq_init(struct fwp_msgq *msgq);
/*
 * Lifecycle state of a vres.
 *
 * This patch inserts FWP_VRES_CLOSING between CLOSED and OPENED, which
 * renumbers FWP_VRES_OPENED from 1 to 2.
 * NOTE(review): anything that relies on the numeric value of
 * FWP_VRES_OPENED rather than the enumerator name would be affected by the
 * renumbering - confirm there is no such use.
 */
enum fwp_vres_status_t {
FWP_VRES_CLOSED = 0 ,
- FWP_VRES_OPENED = 1 ,
+ FWP_VRES_CLOSING = 1 , /* set by fwp_vres_close just before it cancels the tx thread */
+ FWP_VRES_OPENED = 2 ,
};
/**
/*
 * Close the vres stored at index `id` of fwp_vres_table.
 *
 * Returns -EINVAL when `id` is not below FWP_VRES_MAX, -EPERM when the vres
 * is not in state FWP_VRES_OPENED, the negative result of fwp_ac_close when
 * that call fails, and 0 otherwise.
 *
 * After this patch the function no longer drains the tx queue or sets
 * FWP_VRES_CLOSED itself. It marks the vres FWP_VRES_CLOSING and cancels the
 * tx thread; the drain and the final state change are done by
 * fwp_vres_cleanup, the cancellation cleanup handler of that thread.
 */
int fwp_vres_close(unsigned int id)
{
/* NOTE(review): the slot address is formed before `id` is range-checked;
 * it is not dereferenced until after the check below. */
struct fwp_vres *vres = &fwp_vres_table[id];
- struct fwp_msgb *msgb;
int rc;
/* `id` is unsigned, so (id < 0) can never be true; only the upper bound
 * is an effective check. */
if ((id < 0) || (id >= FWP_VRES_MAX))
return -EINVAL;
if ((vres->status != FWP_VRES_OPENED))
- return -EPERM;
-
+ return -EPERM;
+
/* NOTE(review): the thread is cancelled but not joined here, so this
 * function can return (and log "closed") before the cleanup handler has
 * run - confirm callers do not depend on the state being CLOSED on return. */
+ vres->status = FWP_VRES_CLOSING;
+ pthread_cancel(vres->tx_thread);
+
/* NOTE(review): when fwp_ac_close fails the error is returned, but the tx
 * thread has already been asked to cancel - confirm this is intended. */
rc = fwp_ac_close(vres->contract.ac_id);
if (rc < 0)
return rc;
- vres->status = FWP_VRES_CLOSED;
-
- /* locking not needed*/
-
- while ((msgb = fwp_msgq_dequeue(&vres->tx_queue))) {
- fwp_msgb_free(msgb);
- }
-
FWP_DEBUG("Vres id %d closed.\n", id);
return 0;
}
return -EPERM;
}
/*
 * Cancellation cleanup handler for the vres tx thread (registered with
 * pthread_cleanup_push in fwp_vres_tx_thread).
 *
 * `_vres` is the struct fwp_vres the thread was serving. The handler
 * dequeues and frees every message buffer still in the vres tx queue and
 * then marks the vres FWP_VRES_CLOSED.
 */
+static void fwp_vres_cleanup(void *_vres)
+{
+ struct fwp_vres *vres = (struct fwp_vres*)_vres;
+ struct fwp_msgq *msgq = &vres->tx_queue;
+ struct fwp_msgb *msgb;
+
/* drain: the loop ends when fwp_msgq_dequeue returns a null pointer */
+ while ((msgb = fwp_msgq_dequeue(msgq))) {
+ fwp_msgb_free(msgb);
+ }
+
/* status is written last, after the queue has been emptied */
+ vres->status = FWP_VRES_CLOSED;
+}
+
+
/*
 * Start routine of the transmit thread of one vres; `_vres` is the
 * struct fwp_vres it serves. Always returns NULL.
 *
 * Only parts of the function are visible in this patch (several hunks are
 * joined here and the lines between them are elided), so the notes below
 * cover the visible statements only.
 *
 * After this patch the thread blocks on the queue semaphore instead of
 * polling, runs while the vres is FWP_VRES_OPENED, and is stopped by
 * pthread_cancel from fwp_vres_close; fwp_vres_cleanup is installed as its
 * cancellation cleanup handler.
 */
static void* fwp_vres_tx_thread(void *_vres)
{
struct fwp_vres *vres = (struct fwp_vres*)_vres;
period.tv_sec = 0;
fwp_set_rt_prio(90 - vres->contract.ac_id);
- /*
- * block_signals();
- */
+
FWP_DEBUG("vres tx thread with budget:%d period_usec = %d started.\n",
vres->contract.budget, vres->contract.period_usec);
- while (vres->status != FWP_VRES_CLOSED) {
- /* TODO-consider: block on semaphore if there is no msg */
/* enable cancellation and register the handler that drains the queue */
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
+
+ while (vres->status == FWP_VRES_OPENED) {
+
/* Deferred cancellation while waiting for and handling a message.
 * NOTE(review): the result of sem_wait is not checked, so an interrupted
 * wait (EINTR) falls through to the dequeue - confirm that is handled in
 * the elided lines. */
+ pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
+ sem_wait(&msgq->empty_lock);
clock_gettime(CLOCK_MONOTONIC, &start_period);
msgb = fwp_msgq_dequeue(msgq);
fwp_msgb_free(msgb);
}
/* Act on a pending cancel request, then switch to asynchronous
 * cancellation for the rest of the period.
 * NOTE(review): POSIX only guarantees pthread_cancel,
 * pthread_setcancelstate and pthread_setcanceltype to be
 * async-cancel-safe; the calls below run with asynchronous cancellation
 * enabled. nanosleep is itself a cancellation point, so deferred mode may
 * be sufficient - confirm. */
+ pthread_testcancel();
+ pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
+
/* sleep until the end of the current period */
fwp_timespec_add(&end_period, &start_period, &period);
clock_gettime(CLOCK_MONOTONIC, &current_time);
fwp_timespec_sub(&interval, &end_period, &current_time);
nanosleep(&interval, NULL);
}
/* pthread_cleanup_pop(0) removes the handler without running it, so on
 * this path the queue is not drained; only the status is set. */
+ /* it should normally never come here */
+ pthread_cleanup_pop(0);
+ vres->status = FWP_VRES_CLOSED;
+
return NULL;
}