rtime.felk.cvut.cz Git - frescor/fwp.git/commitdiff
tx_thread cancellation modified by using pthread_cancel call
author: Martin Molnar <molnar@sum.(none)>
Fri, 25 Jan 2008 16:37:00 +0000 (17:37 +0100)
committer: Martin Molnar <molnar@sum.(none)>
Fri, 25 Jan 2008 16:37:00 +0000 (17:37 +0100)
fwp/libfwp/include/fwp_msg.h
fwp/libfwp/include/fwp_msgq.h
fwp/libfwp/src/fwp_msgq.c
fwp/libfwp/src/fwp_vres.c

index d3968af63e9be97cd7aa5623ce2ff016532455b4..b390730a552a63b218f205a5e6b5a12694edca35 100644 (file)
@@ -9,15 +9,15 @@
 #define FWP_NEGT_RES  0x03  /* negotiation response */
 
 struct fwp_msg_header{
-       uint8_t code;
        uint16_t hid;           /**< handshake ID */
+       uint8_t code;
 }__attribute__((packed));
 
 struct fwp_msg_contract{
        uint16_t id;            /**< global contract_id */ 
-       uint8_t  ac_id;         /**< AC id ~ priority of vres */ 
        uint16_t budget;        /**< bits per second */
        uint32_t period_usec;   /**< all time units are in microseconds */
+       uint8_t  ac_id;         /**< AC id ~ priority of vres */ 
        uint8_t  status;
 }__attribute__((packed));
 
index 6ca34e32ce3a43a15c0c00abc58b64727900e5d7..2f49496d45720d1db71cc9d5d6fc1f10c3bfe28c 100644 (file)
@@ -5,19 +5,18 @@
 #include "fwp_msgb.h"
 
 #include <pthread.h>
+#include <semaphore.h>
 
 struct fwp_msgq {
        unsigned int nr_pending;  /**< number of messages in the queue */
-
-       /* queue reject policy */
-       /*queue_rejection_policy qr_policy;*/
-       /*struct sockaddr_in dest_addr;*/
-       
-       /* For simplicity now */
        struct fwp_msgb* queue[FWP_MSGQ_SIZE];
        unsigned int in;   /**< add at offset (in % size) */ 
        unsigned int out;  /**< extracted from offset (out % size) */ 
-       pthread_mutex_t lock;
+       pthread_mutex_t lock; /**< queue lock */
+       sem_t empty_lock;     /**< semaphore to block on empty mqueue */
+       
+       /* queue reject policy */
+       /*queue_rejection_policy qr_policy;*/
 };
 
 void fwp_msgq_init(struct fwp_msgq *msgq);
index 658c28fcf583dde7ce256b52159874de6dad595a..510aecd673be2a3e36dfa46567f5d4188a085462 100644 (file)
@@ -10,6 +10,7 @@ void fwp_msgq_init(struct fwp_msgq *msgq)
        msgq->in = 0;
        msgq->out = 0;
        pthread_mutex_init(&msgq->lock, NULL); /* fast mutex */
+       sem_init(&msgq->empty_lock, 0, 0);
 }
 
 /**
@@ -46,6 +47,7 @@ int fwp_msgq_enqueue(struct fwp_msgq *msgq, struct fwp_msgb *msgb)
 
        /* release queue mutex */
        pthread_mutex_unlock(&msgq->lock);
+       sem_post(&msgq->empty_lock);
 
        return 0;
 }
index 36557f40d77cf5e2bd211cd9b3b8247c511e3f87..cf320c7fe1cd63989634c0ae41a75d4f2dae3a69 100644 (file)
@@ -12,7 +12,8 @@ static void* fwp_vres_tx_thread(void *_vres);
 
 enum fwp_vres_status_t {
        FWP_VRES_CLOSED = 0 ,
-       FWP_VRES_OPENED = 1 ,
+       FWP_VRES_CLOSING = 1 ,
+       FWP_VRES_OPENED = 2 ,
 };
 
 /**
@@ -95,26 +96,20 @@ int fwp_vres_open(struct fwp_contract *cnt)
 int fwp_vres_close(unsigned int id)
 {      
        struct fwp_vres *vres = &fwp_vres_table[id];
-       struct fwp_msgb *msgb;
        int rc;
 
        if ((id < 0) || (id >= FWP_VRES_MAX))
                return -EINVAL;
        if ((vres->status != FWP_VRES_OPENED))
-               return -EPERM;
-                        
+               return -EPERM; 
+       
+       vres->status = FWP_VRES_CLOSING;
+       pthread_cancel(vres->tx_thread);
+
        rc = fwp_ac_close(vres->contract.ac_id);
        if (rc < 0)
                return rc;
        
-       vres->status = FWP_VRES_CLOSED;
-       
-       /* locking not needed*/
-       
-       while ((msgb = fwp_msgq_dequeue(&vres->tx_queue))) { 
-               fwp_msgb_free(msgb);
-       }
-
        FWP_DEBUG("Vres id %d closed.\n", id);  
        return  0;
 }
@@ -127,6 +122,19 @@ inline int fwp_vres_send(unsigned int id, struct fwp_msgb* msgb)
                return -EPERM;
 }
 
+static void fwp_vres_cleanup(void *_vres)
+{
+       struct fwp_vres *vres = (struct fwp_vres*)_vres;
+       struct fwp_msgq *msgq = &vres->tx_queue;
+       struct fwp_msgb *msgb;
+
+       while ((msgb = fwp_msgq_dequeue(msgq))) { 
+               fwp_msgb_free(msgb);
+       }
+       
+       vres->status = FWP_VRES_CLOSED;
+}
+
 static void* fwp_vres_tx_thread(void *_vres)
 {
        struct fwp_vres *vres = (struct fwp_vres*)_vres;
@@ -142,14 +150,17 @@ static void* fwp_vres_tx_thread(void *_vres)
        period.tv_sec = 0;
        
        fwp_set_rt_prio(90 - vres->contract.ac_id);
-       /*
-        * block_signals();
-        */
+       
        FWP_DEBUG("vres tx thread with budget:%d period_usec = %d started.\n", 
                vres->contract.budget, vres->contract.period_usec);
 
-       while (vres->status != FWP_VRES_CLOSED) {
-               /* TODO-consider: block on semaphore if there is no msg */
+       pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+       pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
+
+       while (vres->status == FWP_VRES_OPENED) {
+               
+               pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);   
+               sem_wait(&msgq->empty_lock);
                clock_gettime(CLOCK_MONOTONIC, &start_period);
                
                msgb = fwp_msgq_dequeue(msgq);
@@ -161,11 +172,18 @@ static void* fwp_vres_tx_thread(void *_vres)
                        fwp_msgb_free(msgb);
                }
 
+               pthread_testcancel();   
+               pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);       
+               
                fwp_timespec_add(&end_period, &start_period, &period);
                clock_gettime(CLOCK_MONOTONIC, &current_time);
                fwp_timespec_sub(&interval, &end_period, &current_time);
                nanosleep(&interval, NULL);
        }
        
+       /* it should normally never come here */ 
+       pthread_cleanup_pop(0); 
+       vres->status = FWP_VRES_CLOSED;
+       
        return NULL;
 }