]> rtime.felk.cvut.cz Git - frescor/fwp.git/blobdiff - fwp/lib/fwp/fwp_vres.c
Implemented synchronous and asynchronous sending
[frescor/fwp.git] / fwp / lib / fwp / fwp_vres.c
index 42305748281ddc562242b1c11417eb3d08fa3683..1177a4eb1ffa5b931f1cbe0c983ec605ced15a1e 100644 (file)
@@ -1,24 +1,74 @@
+/**************************************************************************/
+/* ---------------------------------------------------------------------- */
+/* Copyright (C) 2006 - 2009 FRESCOR consortium partners:                */
+/*                                                                       */
+/*   Universidad de Cantabria,              SPAIN                        */
+/*   University of York,                    UK                           */
+/*   Scuola Superiore Sant'Anna,            ITALY                        */
+/*   Kaiserslautern University,             GERMANY                      */
+/*   Univ. Politécnica  Valencia,           SPAIN                       */
+/*   Czech Technical University in Prague,  CZECH REPUBLIC               */
+/*   ENEA                                   SWEDEN                       */
+/*   Thales Communication S.A.              FRANCE                       */
+/*   Visual Tools S.A.                      SPAIN                        */
+/*   Rapita Systems Ltd                     UK                           */
+/*   Evidence                               ITALY                        */
+/*                                                                       */
+/*   See http://www.frescor.org for a link to partners' websites         */
+/*                                                                       */
+/*          FRESCOR project (FP6/2005/IST/5-034026) is funded            */
+/*       in part by the European Union Sixth Framework Programme         */
+/*       The European Union is not liable of any use that may be         */
+/*       made of this code.                                              */
+/*                                                                       */
+/*                                                                       */
+/*  This file is part of FWP (Frescor WLAN Protocol)                     */
+/*                                                                       */
+/* FWP is free software; you can redistribute it and/or modify it        */
+/* under terms of the GNU General Public License as published by the     */
+/* Free Software Foundation; either version 2, or (at your option) any   */
+/* later version.  FWP is distributed in the hope that it will be        */
+/* useful, but WITHOUT ANY WARRANTY; without even the implied warranty   */
+/* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU   */
+/* General Public License for more details. You should have received a   */
+/* copy of the GNU General Public License along with FWP; see file       */
+/* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
+/* Cambridge, MA 02139, USA.                                             */
+/*                                                                       */
+/* As a special exception, including FWP header files in a file,         */
+/* instantiating FWP generics or templates, or linking other files       */
+/* with FWP objects to produce an executable application, does not       */
+/* by itself cause the resulting executable application to be covered    */
+/* by the GNU General Public License. This exception does not            */
+/* however invalidate any other reasons why the executable file might be  */
+/* covered by the GNU Public License.                                    */
+/**************************************************************************/
+
 #include "fwp_utils.h"
 #include "fwp_vres.h"
 
 #include "fwp_msgq.h"
 #include "fwp_endpoint.h"
+#include "fwp_debug.h"
 
 #include <string.h>
+#include <errno.h>
+#include <stdlib.h>
 
 static void* fwp_vres_tx_thread(void *_vres);
 
 typedef enum {
-       FWP_VF_USED             = 1 ,
-       FWP_VF_BOUND            = 2 ,
-       FWP_VF_RESCHED          = 4 ,
+       USED,
+       UNTOUCHED,
+       CHANGED,
+       QUEUED,
 } fwp_vres_flag_t;
 
 fwp_vres_params_t fwp_vres_params_default = {
-       .id = 0,        
        .ac_id = FWP_AC_VO,
         .budget = 100,
-       .period = {.tv_sec = 2 , .tv_nsec = 111111}
+       .period = {.tv_sec = 2 , .tv_nsec = 111111},
+       .src = { 0 },
 };
 
 /**
@@ -28,16 +78,22 @@ fwp_vres_params_t fwp_vres_params_default = {
  */
 struct fwp_vres{
        struct fwp_vres_params          params;
-       /* consideration: move tx_queue to endpoint */
-       /**< queue for messages to send */
-       struct fwp_msgq                 tx_queue;   
-       int                             flags;
+       fwp_vres_flag_t                 flags;
+       pthread_mutex_t                 mutex;
+       pthread_cond_t                  cond; /**< Signalizes budget replenishment */
+       fwp_budget_t                    budget; /**< Current remaining budget */
+       fwp_period_t                    period; /**< Period for this "activation" */
+       struct timespec                 replenish_at; /**< Time of next replenishment */
+       sem_t                           consumed;
        /**< endpoint bounded to this vres */
-       /*fwp_endpoint_t                *epoint; */
+       struct fwp_endpoint             *epoint;
        pthread_t                       tx_thread; /**< tx_thread id*/
        pthread_attr_t                  tx_thread_attr;
-       int                             ac_sockd;  /**< ac socket descriptor */
-       fwp_sockaddr_t                  addr;   /**< dest addr,for effectivness*/
+       /** Copy of bound enpoint's socket - used for future changes
+        * of vres parameters. */
+       int                             ac_sockd;
+       /** Queue for messages to send */
+       struct fwp_msgq                 msg_queue;   
 };
 
 typedef
@@ -82,67 +138,30 @@ static inline int fwp_vres_set_ac(int sockd, fwp_ac_t ac_id)
        return 0;
 }
 
-static inline void fwp_vres_set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
+static inline void set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
 {
        vres->flags |= (1 << flag);
 }
 
-static inline void fwp_vres_clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
+static inline void clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
 {
        vres->flags &= ~(1 << flag);
 }
 
-static inline int fwp_vres_get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
+static inline int get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
 {
        return !!(vres->flags & (1 << flag));
 }
 
-static inline void fwp_vres_clearall_flag(fwp_vres_t *vres)
+static inline void clear_all_flags(fwp_vres_t *vres)
 {
        vres->flags = 0;
 }
 
-#if 0
-/* Deprecated */
-static int fwp_vres_ac_open(fwp_ac_t ac_id) 
-{
-       int sockd;
-       unsigned int tos;
-       
-       if ((ac_id < 0)||(ac_id >= FWP_AC_NUM)) {
-               errno = EINVAL;
-               return -1;
-       }
-
-       if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
-               FWP_ERROR("Unable to open socket for AC: %s", strerror(errno));
-               return (-1);
-       }
-
-       tos = ac_to_tos[ac_id];
-       
-       if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
-               int e = errno;
-               FWP_ERROR("setsockopt(IP_TOS): %s", strerror(errno));
-               close(sockfd);
-               errno = e;
-               return -1;
-       }
-       
-       return sockd;
-}
-#endif
-
-static inline int _fwp_vres_send(unsigned int ac_sockd, struct fwp_msgb* msgb)
-{
-       /*_fwp_sendto(ac_sockd, msgb->data, msgb->len, 0, 
-                       msgb->peer->addr, msgb->peer->addrlen);*/
-       return _fwp_send(ac_sockd, msgb->data, msgb->len, 0);
-}
-
 static inline void fwp_vres_free(fwp_vres_t *vres)
 {
-       fwp_vres_clearall_flag(vres);
+       /* Clear USED flag */
+       clear_all_flags(vres);
 }
 
 static inline int fwp_vres_is_valid(fwp_vres_t *vres)
@@ -150,7 +169,7 @@ static inline int fwp_vres_is_valid(fwp_vres_t *vres)
        int id  = vres - fwp_vres_table.entry;
 
        if ((id < 0) || (id > fwp_vres_table.max_vres - 1) || 
-               (!fwp_vres_get_flag(vres, FWP_VF_USED))) 
+               (!get_flag(vres, USED))) 
                return 0;
        
        return 1; 
@@ -158,7 +177,7 @@ static inline int fwp_vres_is_valid(fwp_vres_t *vres)
 
 /*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
 {
-       3if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
+       if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
                return -EINVAL;
        *vres = &fwp_vres_table.entry[vres_id];
        return 0;
@@ -178,10 +197,8 @@ int fwp_vres_table_init(unsigned int max_vres)
        return 0;
 }
 
-fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
+fwp_vres_id_t fwp_vres_get_id(fwp_vres_t *vres)
 {
-       fwp_vres_t *vres = vresd;
-       
        return (vres - fwp_vres_table.entry);
 }
 
@@ -190,7 +207,7 @@ fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
  *
  * \return On success returns vres descriptor. 
  */
-fwp_vres_d_t fwp_vres_alloc()
+fwp_vres_t *fwp_vres_alloc()
 {
        int i;
        unsigned int max_vres;
@@ -200,7 +217,7 @@ fwp_vres_d_t fwp_vres_alloc()
        i = 0;
        max_vres = fwp_vres_table.max_vres;
        while ((i < max_vres) && 
-               (fwp_vres_get_flag(&fwp_vres_table.entry[i], FWP_VF_USED))) {
+               (get_flag(&fwp_vres_table.entry[i], USED))) {
                i++;
        }
        
@@ -211,58 +228,73 @@ fwp_vres_d_t fwp_vres_alloc()
        }
 
        FWP_DEBUG("Allocated vres id = %d\n",i);
-       fwp_vres_set_flag(&fwp_vres_table.entry[i], FWP_VF_USED);
+       set_flag(&fwp_vres_table.entry[i], USED);
        pthread_mutex_unlock(&fwp_vres_table.lock);
        return (&fwp_vres_table.entry[i]);
 }
 
-inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
+static int apply_params(fwp_vres_t *vres)
 {
        int rv;
-
-       /* copy vres paramters into vres structure */
-       rv = fwp_vres_set_ac(vres->ac_sockd, params->ac_id);
-       if (!rv)
-               return rv;
-       memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
-       fwp_vres_set_flag(vres, FWP_VF_RESCHED);
-
-       return 0;
+       vres->period = vres->params.period;
+       vres->budget = vres->params.budget;
+       set_flag(vres, UNTOUCHED);
+       if (get_flag(vres, CHANGED)) {
+               clear_flag(vres, CHANGED);
+               rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
+       }
+       return rv;
 }
 
 /**
  * Set vres params
  *
- * \param[in] vresdp Vres descriptor
+ * \param[in] vresp Vres descriptor
  * \param[in] params Vres parameters
  *
  * \return On success returns zero. 
  * On error, negative error code is returned. 
  *
  */
-int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
+int fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
 {
-       fwp_vres_t *vres = vresd;
-       
+       int rv = 0;
+
        if (!fwp_vres_is_valid(vres)) {
                errno = EINVAL;
                return -1;
        }
 
-       return _fwp_vres_set_params(vres, params);
+       pthread_mutex_lock(&vres->mutex);
+
+       if (vres->epoint &&
+           params->src.s_addr != vres->params.src.s_addr) {
+               errno = EREMCHG;
+               rv = -1;
+               goto out;
+       }
+       vres->params = *params;
+       if (vres->epoint) {
+               set_flag(vres, CHANGED);
+               if (get_flag(vres, UNTOUCHED))
+                       rv = apply_params(vres);
+       }
+out:
+       pthread_mutex_unlock(&vres->mutex);
+       return rv;
 }
 
 /**
  * Creates new vres
  *
  * \param[in] params Vres parameters
- * \param[out] vresdp Pointer to the descriptor of newly created vres
+ * \param[out] vresp Pointer to the descriptor of newly created vres
  *
  * \return On success returns descriptor of vres. 
  * On error, negative error code is returned. 
  *
  */
-int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
+int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp)
 {
        int rv;
        fwp_vres_t *vres;
@@ -272,20 +304,29 @@ int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
                errno = ENOMEM;
                return -1;
        }
-       /* initialize msg queue */
-       fwp_msgq_init(&vres->tx_queue);
+
+       pthread_mutexattr_t ma;
+       rv = pthread_mutexattr_init(&ma);
+       rv = pthread_mutexattr_setprotocol(&ma, PTHREAD_PRIO_INHERIT);
+       if (rv) return rv;
+       pthread_mutex_init(&vres->mutex, &ma);
+       pthread_cond_init(&vres->cond, NULL);
        
-       memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
-       fwp_vres_set_flag(vres, FWP_VF_RESCHED);
+       vres->params = *params;
+       apply_params(vres);
+       fwp_msgq_init(&vres->msg_queue);
+
        pthread_attr_init(&vres->tx_thread_attr);
        if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, 
                            fwp_vres_tx_thread, (void*) vres)) != 0){
                goto err;
        }
 
-       *vresdp = vres;
+       *vresp = vres;
+       
        return 0;
 err:   
+       fwp_msgq_dequeue_all(&vres->msg_queue);
        fwp_vres_free(vres);
        return -1; 
 }
@@ -293,49 +334,159 @@ err:
 /**
  * Destroys vres
  *
- * \param[in] vresd Vres descriptor
+ * \param[in] vres Vres descriptor
  *
  * \return On success returns 0. 
  * On error, negative error code is returned. 
  *
  */
-int fwp_vres_destroy(fwp_vres_d_t vresd)
+int fwp_vres_destroy(fwp_vres_t *vres)
 {      
-       fwp_vres_t *vres = vresd;
-
        if (!fwp_vres_is_valid(vres)) {
                errno = EINVAL;
                return -1;
        }
        
        pthread_cancel(vres->tx_thread);
+       pthread_cond_destroy(&vres->cond);
+       pthread_mutex_destroy(&vres->mutex);
                
-       FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);   
+       fwp_msgq_dequeue_all(&vres->msg_queue);
+       fwp_vres_free(vres);
+       
+       FWP_DEBUG("Vres destroyed.\n");
        return  0;
 }
 
-static void fwp_vres_cleanup(void *_vres)
+static void do_consume_budget(struct fwp_vres *vres, size_t size)
 {
-       fwp_vres_t *vres = (fwp_vres_t*)_vres;
+       if (get_flag(vres, UNTOUCHED)) {
+               /* Setup next replenish time */
+               struct timespec now;
+               clear_flag(vres, UNTOUCHED);
+               if (get_flag(vres, QUEUED))
+                       now = vres->replenish_at;
+               else
+                       clock_gettime(CLOCK_MONOTONIC, &now);
+               fwp_timespec_add(&vres->replenish_at, &now, &vres->period);
+               sem_post(&vres->consumed);
+       }
+       vres->budget -= size;
+}
 
-       fwp_msgq_dequeue_all(&vres->tx_queue);
-       fwp_vres_free(vres);
+int __consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
+{
+       int ret = 0;
+       if (vres->params.budget < size) {
+               errno = ENOSR;
+               return -1;
+       }
+       while (can_block && vres->budget < size) {
+               ret = pthread_cond_wait(&vres->cond, &vres->mutex);
+               /* The budget might have been changed while we were
+                * waiting, so check it again. */
+               if (vres->params.budget < size) {
+                       errno = ENOSR;
+                       return -1;
+               }
+       }
+       if (ret == 0) {
+               if (vres->budget >= size) {
+                       do_consume_budget(vres, size);
+                       ret = 0;
+               } else {
+                       set_flag(vres, QUEUED);
+                       ret = 1;
+               }
+       }
+       return ret;
 }
 
-static inline void 
-fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period, 
-                       fwp_budget_t  *budget)
+/** 
+ * Tries to consume (a part of) budget
+ * 
+ * @param vres VRES whose budget is conumed.
+ * @param size How much to consume (in bytes).
+ * @param can_block True, indicates that the function can block to
+ * wait for budget replenishment. False causes no blocking and if 1 is
+ * returned, the calles must call fwp_vres_enqueue() to enqueue the
+ * packet to be sent later after replenishing.
+ * 
+ * @return Zero if budget was consumed, 1 if there is not enough
+ * budget available and blocking was not allowed, -1 in case of error.
+ */
+int fwp_vres_consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
 {
-       if (fwp_vres_get_flag(vres, FWP_VF_RESCHED)) {  
-               /*period->tv_nsec = vres->params.period % SEC_TO_USEC;
-               period->tv_sec = vres->params.period / SEC_TO_USEC;*/
-               *period = vres->params.period;
-               *budget = vres->params.budget; 
-               FWP_DEBUG("Vres tx thread with budget=%ld period_sec=%ld "
-                               "period_nsec=%ld.\n",vres->params.budget, 
-                               period->tv_sec, period->tv_nsec);
-               fwp_vres_clear_flag(vres, FWP_VF_RESCHED);
+       int ret = 0;
+       pthread_mutex_lock(&vres->mutex);
+       ret = __consume_budget(vres, size, can_block);
+       pthread_mutex_unlock(&vres->mutex);
+       return ret;
+}
+
+int fwp_vres_enqueue(struct fwp_vres *vres, struct fwp_endpoint *ep, void *msg, size_t size)
+{
+       struct fwp_msgb *msgb;
+       int ret;
+       
+       if (!(msgb = fwp_msgb_alloc(size)))
+               return -1;
+       memcpy(msgb->data, msg, size);
+       fwp_msgb_put(msgb, size);
+       ret = fwp_msgq_enqueue(&vres->msg_queue, msgb);
+       if (ret) {
+               fwp_msgb_free(msgb);
+               return ret;
        }
+       return ret;
+}
+
+static void wait_for_replenish(struct fwp_vres *vres)
+{
+       sem_wait(&vres->consumed);
+       clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
+                       &vres->replenish_at, NULL);
+}
+
+static void send_queue(struct fwp_vres *vres)
+{
+       struct fwp_msgb *msgb;
+       bool can_send;
+       msgb = fwp_msgq_dequeue(&vres->msg_queue);
+       can_send = (0 == __consume_budget(vres, msgb->len, false));
+       if (!can_send) {
+               fwp_msgb_free(msgb);
+               FWP_ERROR("Cannot send queued packet (budget decreased?)\n");
+               return;
+       }
+       /* If we cannot send the whole queue, the flag will be set
+        * later by __consume_budget(). */
+       clear_flag(vres, QUEUED);
+
+       while (msgb) {
+               fwp_endpoint_do_send(vres->epoint, msgb->data, msgb->len);
+               fwp_msgb_free(msgb);
+               msgb = fwp_msgq_peek(&vres->msg_queue);
+               if (msgb) {
+                       can_send = (0 == __consume_budget(vres, msgb->len, false));
+                       if (can_send) {
+                               msgb = fwp_msgq_dequeue(&vres->msg_queue);
+                       } else {
+                               msgb = NULL;
+                               return;
+                       }
+               }
+       }
+}
+
+static void replenish(struct fwp_vres *vres)
+{
+       pthread_mutex_lock(&vres->mutex);
+       apply_params(vres);
+       if (get_flag(vres, QUEUED))
+               send_queue(vres);
+       pthread_cond_broadcast(&vres->cond);
+       pthread_mutex_unlock(&vres->mutex);
 }
 
 /**
@@ -345,120 +496,58 @@ fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period,
 static void* fwp_vres_tx_thread(void *_vres)
 {
        struct fwp_vres *vres = (struct fwp_vres*)_vres;
-       struct fwp_msgq *msgq = &vres->tx_queue;
-       struct fwp_msgb *msgb = NULL;
        unsigned int    ac_id = vres->params.ac_id;
-       fwp_budget_t    budget = vres->params.budget;
-       fwp_budget_t    curr_budget;
-       int             rc;
-               
-       struct timespec  start_period, end_period, period;
-       struct timespec  current_time, interval;
 
        fwp_set_rt_prio(90 - ac_id);
-       
-
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
-       pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);       
-       pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
+       pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);   
        
-       clock_gettime(CLOCK_MONOTONIC, &start_period);
-
        while (1) {
-               /* wait for next period and then send */
-               fwp_timespec_add(&end_period, &start_period, &period);
-               clock_gettime(CLOCK_MONOTONIC, &current_time);
-               fwp_timespec_sub(&interval, &end_period, &current_time);
-               nanosleep(&interval, NULL);
-       
-               sem_wait(&msgq->empty_lock);
-               fwp_vres_sched_update(vres, &period, &budget);
-               clock_gettime(CLOCK_MONOTONIC, &start_period);
-               
-               /*msgb = fwp_msgq_dequeue(msgq);
-               *if (msgb){*/
-               curr_budget = 0;
-               while ((curr_budget < budget)&& 
-                      (msgb = fwp_msgq_dequeue(msgq))) {
-                       rc = _fwp_vres_send(vres->ac_sockd, msgb);
-                       if (!(rc < 0)) {
-                               FWP_DEBUG("Message sent through AC%d\n",ac_id);
-                               /* Switch to this in the future */
-                                //curr_budget+= msgb->len;
-                                curr_budget++;
-                       }
-                       
-                       fwp_msgb_free(msgb);
-               }
-
-               /*pthread_testcancel(); */
-               /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);     
-               
-               fwp_timespec_add(&end_period, &start_period, &period);
-               clock_gettime(CLOCK_MONOTONIC, &current_time);
-               fwp_timespec_sub(&interval, &end_period, &current_time);
-               nanosleep(&interval, NULL);
-               */
+               wait_for_replenish(vres);
+               pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+               replenish(vres);
+               pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
        }
-       
-       /* it should normaly never come here */ 
-       pthread_cleanup_pop(0); 
-       fwp_vres_free(vres);
-       
-       return NULL;
-}
-
-int fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
-{
-       fwp_vres_t *vres = vresd;
 
-       if (fwp_vres_is_valid(vres)) {
-               return fwp_msgq_enqueue(&vres->tx_queue, msgb);
-       } else {
-               errno = EPERM;  /* TODO: Use more appropriate error than EPERM. */
-               return -1;
-       }
+       return NULL;
 }
 
-/*int fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)*/
-int fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
+/*int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *epoint)*/
+int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *ep, int sockd)
 {
-       fwp_vres_t *vres = vresd;
        int rv = 0;
 
-       pthread_mutex_lock(&fwp_vres_table.lock);
        if (!fwp_vres_is_valid(vres)) {
                errno = EINVAL;
                rv = -1;
                goto err;
        }
        
-       if (fwp_vres_get_flag(vres, FWP_VF_BOUND)) { /*if already bounded */
-               errno = EPERM;  
+       if (vres->epoint) { /*if already bounded */
+               errno = EBUSY;
                rv = -1;
                goto err;
        }
 
        vres->ac_sockd = sockd;
-       rv = fwp_vres_set_ac(vres->ac_sockd,vres->params.ac_id);
-       /*if (rv)
-               goto err;*/
-       fwp_vres_set_flag(vres, FWP_VF_BOUND);
+       rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
+       if (rv)
+               goto err;
+       vres->epoint = ep;
 err:
-       pthread_mutex_unlock(&fwp_vres_table.lock);
        return rv;
 }
 
-int fwp_vres_unbind(fwp_vres_d_t vresd)
+int fwp_vres_unbind(fwp_vres_t *vres)
 {
-       fwp_vres_t *vres = vresd;
-       
-       if (!fwp_vres_is_valid(vresd)) {
+       if (!fwp_vres_is_valid(vres)) {
                errno = EINVAL;
                return -1;
        }
-       fwp_vres_clear_flag(vres, FWP_VF_BOUND);
+       pthread_mutex_lock(&vres->mutex);
+       vres->epoint = NULL;
+       pthread_mutex_unlock(&vres->mutex);
        /* TODO: consider what to do with pending messages */
-       /* fwp_vres_free_msgb(vres->tx_queue); */
+       fwp_msgq_dequeue_all(&vres->msg_queue);
        return 0;
 }