]> rtime.felk.cvut.cz Git - frescor/fwp.git/blobdiff - fwp/lib/fwp/fwp_vres.c
Removed "descriptor" types
[frescor/fwp.git] / fwp / lib / fwp / fwp_vres.c
index 575cc4060a52134eccfa588be0e26232bb06521b..32a9eff4f0f75ef0e4d91e5b9184d62c69cd1614 100644 (file)
@@ -1,21 +1,68 @@
+/**************************************************************************/
+/* ---------------------------------------------------------------------- */
+/* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                */
+/*                                                                       */
+/*   Universidad de Cantabria,              SPAIN                        */
+/*   University of York,                    UK                           */
+/*   Scuola Superiore Sant'Anna,            ITALY                        */
+/*   Kaiserslautern University,             GERMANY                      */
+/*   Univ. Politécnica  Valencia,           SPAIN                       */
+/*   Czech Technical University in Prague,  CZECH REPUBLIC               */
+/*   ENEA                                   SWEDEN                       */
+/*   Thales Communication S.A.              FRANCE                       */
+/*   Visual Tools S.A.                      SPAIN                        */
+/*   Rapita Systems Ltd                     UK                           */
+/*   Evidence                               ITALY                        */
+/*                                                                       */
+/*   See http://www.frescor.org for a link to partners' websites         */
+/*                                                                       */
+/*          FRESCOR project (FP6/2005/IST/5-034026) is funded            */
+/*       in part by the European Union Sixth Framework Programme         */
+/*       The European Union is not liable of any use that may be         */
+/*       made of this code.                                              */
+/*                                                                       */
+/*                                                                       */
+/*  This file is part of FWP (Frescor WLAN Protocol)                     */
+/*                                                                       */
+/* FWP is free software; you can redistribute it and/or modify it        */
+/* under terms of the GNU General Public License as published by the     */
+/* Free Software Foundation; either version 2, or (at your option) any   */
+/* later version.  FWP is distributed in the hope that it will be        */
+/* useful, but WITHOUT ANY WARRANTY; without even the implied warranty   */
+/* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU   */
+/* General Public License for more details. You should have received a   */
+/* copy of the GNU General Public License along with FWP; see file       */
+/* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
+/* Cambridge, MA 02139, USA.                                             */
+/*                                                                       */
+/* As a special exception, including FWP header files in a file,         */
+/* instantiating FWP generics or templates, or linking other files       */
+/* with FWP objects to produce an executable application, does not       */
+/* by itself cause the resulting executable application to be covered    */
+/* by the GNU General Public License. This exception does not            */
+/* however invalidate any other reasons why the executable file might be  */
+/* covered by the GNU Public License.                                    */
+/**************************************************************************/
 #include "fwp_utils.h"
 #include "fwp_vres.h"
 
 #include "fwp_msgq.h"
 #include "fwp_endpoint.h"
+#include "fwp_debug.h"
 
 #include <string.h>
+#include <errno.h>
+#include <stdlib.h>
 
 static void* fwp_vres_tx_thread(void *_vres);
 
 typedef enum {
-       FWP_VF_USED             = ,
-       FWP_VF_BOUND            = ,
-       FWP_VF_RESCHED          = 4 ,
+       FWP_VF_USED             = 0,
+       FWP_VF_BOUND            = 1,
+       FWP_VF_CHANGED          = 2,
 } fwp_vres_flag_t;
 
 fwp_vres_params_t fwp_vres_params_default = {
-       .id = 0,        
        .ac_id = FWP_AC_VO,
         .budget = 100,
        .period = {.tv_sec = 2 , .tv_nsec = 111111}
@@ -31,13 +78,12 @@ struct fwp_vres{
        /* consideration: move tx_queue to endpoint */
        /**< queue for messages to send */
        struct fwp_msgq                 tx_queue;   
-       int                             flags;
+       fwp_vres_flag_t                 flags;
        /**< endpoint bound to this vres */
        /*fwp_endpoint_t                *epoint; */
        pthread_t                       tx_thread; /**< tx_thread id*/
        pthread_attr_t                  tx_thread_attr;
        int                             ac_sockd;  /**< ac socket descriptor */
-       fwp_sockaddr_t                  addr;   /**< dest addr,for effectivness*/
 };
 
 typedef
@@ -133,11 +179,41 @@ static int fwp_vres_ac_open(fwp_ac_t ac_id)
 }
 #endif
 
-static inline int _fwp_vres_send(unsigned int ac_sockd, struct fwp_msgb* msgb)
+static inline int _fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb)
 {
-       /*_fwp_sendto(ac_sockd, msgb->data, msgb->len, 0, 
-                       msgb->peer->addr, msgb->peer->addrlen);*/
-       return _fwp_send(ac_sockd, msgb->data, msgb->len, 0);
+       struct iovec  iov;
+       struct msghdr msg = {0};
+       ssize_t ret;
+       char cmsg_buf[CMSG_SPACE(sizeof(struct in_pktinfo))];
+
+       iov.iov_base = msgb->data;
+       iov.iov_len = msgb->len;
+
+       msg.msg_iov = &iov;
+       msg.msg_iovlen = 1;
+
+
+
+       if (vres->params.src.s_addr != 0) {
+               struct cmsghdr *cmsg;
+               struct in_pktinfo *ipi;
+
+               memset(cmsg_buf, 0, sizeof(cmsg_buf));
+
+               msg.msg_control = cmsg_buf;
+               msg.msg_controllen = sizeof(cmsg_buf);
+
+               cmsg = CMSG_FIRSTHDR(&msg);
+
+               cmsg->cmsg_level = SOL_IP;
+               cmsg->cmsg_type = IP_PKTINFO;
+               cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
+
+               ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
+               ipi->ipi_spec_dst = vres->params.src;
+       }
+       ret = sendmsg(vres->ac_sockd, &msg, 0);
+       return ret;
 }
 
 static inline void fwp_vres_free(fwp_vres_t *vres)
@@ -158,7 +234,7 @@ static inline int fwp_vres_is_valid(fwp_vres_t *vres)
 
 /*inline int fwp_vres_get(fwp_vres_id_t vres_id, fwp_vres_t **vres )
 {
-       3if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
+       if ((vres_id < 0) || (vres_id > fwp_vres_table.nr_vres - 1))
                return -EINVAL;
        *vres = &fwp_vres_table.entry[vres_id];
        return 0;
@@ -178,10 +254,8 @@ int fwp_vres_table_init(unsigned int max_vres)
        return 0;
 }
 
-fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
+fwp_vres_id_t fwp_vres_get_id(fwp_vres_t *vres)
 {
-       fwp_vres_t *vres = vresd;
-       
        return (vres - fwp_vres_table.entry);
 }
 
@@ -190,7 +264,7 @@ fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
  *
  * \return On success returns a pointer to the allocated vres.
  */
-fwp_vres_d_t fwp_vres_alloc()
+fwp_vres_t *fwp_vres_alloc()
 {
        int i;
        unsigned int max_vres;
@@ -225,7 +299,7 @@ inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
        if (!rv)
                return rv;
        memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
-       fwp_vres_set_flag(vres, FWP_VF_RESCHED);
+       fwp_vres_set_flag(vres, FWP_VF_CHANGED);
 
        return 0;
 }
@@ -233,17 +307,15 @@ inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
 /**
  * Set vres params
  *
- * \param[in] vresdp Vres descriptor
+ * \param[in] vres Vres descriptor
  * \param[in] params Vres parameters
  *
  * \return On success returns zero. 
  * On error, negative error code is returned. 
  *
  */
-int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
+int fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
 {
-       fwp_vres_t *vres = vresd;
-       
        if (!fwp_vres_is_valid(vres)) {
                errno = EINVAL;
                return -1;
@@ -256,13 +328,13 @@ int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
  * Creates new vres
  *
  * \param[in] params Vres parameters
- * \param[out] vresdp Pointer to the descriptor of newly created vres
+ * \param[out] vresp Pointer to the descriptor of newly created vres
  *
  * \return On success returns zero and stores the new vres in *vresp.
  * On error, negative error code is returned. 
  *
  */
-int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
+int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp)
 {
        int rv;
        fwp_vres_t *vres;
@@ -276,14 +348,14 @@ int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
        fwp_msgq_init(&vres->tx_queue);
        
        memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
-       fwp_vres_set_flag(vres, FWP_VF_RESCHED);
+       fwp_vres_set_flag(vres, FWP_VF_CHANGED);
        pthread_attr_init(&vres->tx_thread_attr);
        if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, 
                            fwp_vres_tx_thread, (void*) vres)) != 0){
                goto err;
        }
 
-       *vresdp = vres;
+       *vresp = vres;
        return 0;
 err:   
        fwp_vres_free(vres);
@@ -293,16 +365,14 @@ err:
 /**
  * Destroys vres
  *
- * \param[in] vresd Vres descriptor
+ * \param[in] vres Vres descriptor
  *
  * \return On success returns 0. 
  * On error, returns -1 and sets errno (EINVAL if the vres is not valid).
  *
  */
-int fwp_vres_destroy(fwp_vres_d_t vresd)
+int fwp_vres_destroy(fwp_vres_t *vres)
 {      
-       fwp_vres_t *vres = vresd;
-
        if (!fwp_vres_is_valid(vres)) {
                errno = EINVAL;
                return -1;
@@ -310,7 +380,7 @@ int fwp_vres_destroy(fwp_vres_d_t vresd)
        
        pthread_cancel(vres->tx_thread);
                
-       FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);   
+       FWP_DEBUG("Vres destroyed.\n");
        return  0;
 }
 
@@ -326,7 +396,7 @@ static inline void
 fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period, 
                        fwp_budget_t  *budget)
 {
-       if (fwp_vres_get_flag(vres, FWP_VF_RESCHED)) {  
+       if (fwp_vres_get_flag(vres, FWP_VF_CHANGED)) {  
                /*period->tv_nsec = vres->params.period % SEC_TO_USEC;
                period->tv_sec = vres->params.period / SEC_TO_USEC;*/
                *period = vres->params.period;
@@ -334,7 +404,7 @@ fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period,
                FWP_DEBUG("Vres tx thread with budget=%ld period_sec=%ld "
                                "period_nsec=%ld.\n",vres->params.budget, 
                                period->tv_sec, period->tv_nsec);
-               fwp_vres_clear_flag(vres, FWP_VF_RESCHED);
+               fwp_vres_clear_flag(vres, FWP_VF_CHANGED);
        }
 }
 
@@ -351,46 +421,53 @@ static void* fwp_vres_tx_thread(void *_vres)
        fwp_budget_t    budget = vres->params.budget;
        fwp_budget_t    curr_budget;
        int             rc;
-               
-       struct timespec  start_period, end_period, period;
-       struct timespec  current_time, interval;
+       struct timespec start, period, interval, now;
 
        fwp_set_rt_prio(90 - ac_id);
-       
-
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
        pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);       
        pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
        
-       clock_gettime(CLOCK_MONOTONIC, &start_period);
+       /* just to be sure period and budget are up to date before the loop */
+       fwp_vres_sched_update(vres, &period, &budget);
+       curr_budget = budget;
+       clock_gettime(CLOCK_MONOTONIC, &start);
 
        while (1) {
-               /* wait for next period and then send */
-               fwp_timespec_add(&end_period, &start_period, &period);
-               clock_gettime(CLOCK_MONOTONIC, &current_time);
-               fwp_timespec_sub(&interval, &end_period, &current_time);
-               nanosleep(&interval, NULL);
-       
-               sem_wait(&msgq->empty_lock);
-               fwp_vres_sched_update(vres, &period, &budget);
-               clock_gettime(CLOCK_MONOTONIC, &start_period);
-               
-               /*msgb = fwp_msgq_dequeue(msgq);
-               *if (msgb){*/
-               curr_budget = 0;
-               while ((curr_budget < budget)&& 
-                      (msgb = fwp_msgq_dequeue(msgq))) {
-                       rc = _fwp_vres_send(vres->ac_sockd, msgb);
-                       if (!(rc < 0)) {
-                               FWP_DEBUG("Message sent through AC%d\n",ac_id);
-                               /* Switch to this in the future
-                                * curr_budget+= msgb->len;
-                                */
-                               curr_budget++;
+               msgb = fwp_msgq_dequeue(msgq);
+               if (msgb->len > budget) {
+                       FWP_ERROR("Message too large: %zd -> skipping\n",
+                                 msgb->len);
+                       goto skip_message;
+               }
+               if (curr_budget < msgb->len) {
+                       /* needs to replenish */
+                       clock_gettime(CLOCK_MONOTONIC, &now);
+                       fwp_timespec_sub(&interval, &now, &start);
+                       ul_logtrash("start=%ld.%09ld,  now=%ld.%09ld  diff=%ld.%09ld\n", (long)start.tv_sec, (long)start.tv_nsec, (long)now.tv_sec, (long)now.tv_nsec, (long)interval.tv_sec, (long)interval.tv_nsec);
+                       fwp_timespec_sub(&interval, &period, &interval);
+                       if (interval.tv_sec > 0 ||
+                           (interval.tv_sec == 0 && interval.tv_nsec > 0)) {
+                               /* We have to wait to replenish */
+                               ul_logtrash("sleeping=%ld.%09ld\n", (long)interval.tv_sec, (long)interval.tv_nsec);
+                               nanosleep(&interval, NULL);
+                               fwp_timespec_add(&start, &now, &interval);
+                       } else {
+                               start = now;
                        }
-                       
-                       fwp_msgb_free(msgb);
+                       fwp_vres_sched_update(vres, &period, &budget);  
+                       curr_budget = budget;
+               }
+
+               rc = _fwp_vres_send(vres, msgb);
+               if (!(rc < 0)) {
+                       FWP_DEBUG("Message sent through AC%d\n",ac_id);
+                       curr_budget -= msgb->len;
+               } else {
+                       FWP_ERROR("Message sent error %d\n",rc);
                }
+       skip_message:                   
+               fwp_msgb_free(msgb);
 
                /*pthread_testcancel(); */
                /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);     
@@ -409,22 +486,19 @@ static void* fwp_vres_tx_thread(void *_vres)
        return NULL;
 }
 
-int fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
+int fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb)
 {
-       fwp_vres_t *vres = vresd;
-
        if (fwp_vres_is_valid(vres)) {
                return fwp_msgq_enqueue(&vres->tx_queue, msgb);
        } else {
-               errno = EPERM;  /* TODO: Use more appropriate error than EPERM. */
+               errno = EINVAL;
                return -1;
        }
 }
 
-/*int fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)*/
-int fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
+/*int fwp_vres_bind(fwp_vres_t *vres, fwp_endpoint_t *epoint)*/
+int fwp_vres_bind(fwp_vres_t *vres, int sockd)
 {
-       fwp_vres_t *vres = vresd;
        int rv = 0;
 
        pthread_mutex_lock(&fwp_vres_table.lock);
@@ -435,7 +509,7 @@ int fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
        }
        
        if (fwp_vres_get_flag(vres, FWP_VF_BOUND)) { /*if already bounded */
-               errno = EPERM;  
+               errno = EBUSY;
                rv = -1;
                goto err;
        }
@@ -450,11 +524,9 @@ err:
        return rv;
 }
 
-int fwp_vres_unbind(fwp_vres_d_t vresd)
+int fwp_vres_unbind(fwp_vres_t *vres)
 {
-       fwp_vres_t *vres = vresd;
-       
-       if (!fwp_vres_is_valid(vresd)) {
+       if (!fwp_vres_is_valid(vres)) {
                errno = EINVAL;
                return -1;
        }