]> rtime.felk.cvut.cz Git - frescor/fwp.git/blobdiff - fwp/lib/fwp/fwp_vres.c
Removed "descriptor" types
[frescor/fwp.git] / fwp / lib / fwp / fwp_vres.c
index 1bed4c9c5c94ed20b3ae8bbf00396ffdd7c70ff0..32a9eff4f0f75ef0e4d91e5b9184d62c69cd1614 100644 (file)
@@ -59,11 +59,10 @@ static void* fwp_vres_tx_thread(void *_vres);
 typedef enum {
        FWP_VF_USED             = 0,
        FWP_VF_BOUND            = 1,
-       FWP_VF_RESCHED          = 2,
+       FWP_VF_CHANGED          = 2,
 } fwp_vres_flag_t;
 
 fwp_vres_params_t fwp_vres_params_default = {
-       .id = 0,        
        .ac_id = FWP_AC_VO,
         .budget = 100,
        .period = {.tv_sec = 2 , .tv_nsec = 111111}
@@ -180,11 +179,41 @@ static int fwp_vres_ac_open(fwp_ac_t ac_id)
 }
 #endif
 
-static inline int _fwp_vres_send(unsigned int ac_sockd, struct fwp_msgb* msgb)
+static inline int _fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb)
 {
-       /*_fwp_sendto(ac_sockd, msgb->data, msgb->len, 0, 
-                       msgb->peer->addr, msgb->peer->addrlen);*/
-       return _fwp_send(ac_sockd, msgb->data, msgb->len, 0);
+       struct iovec  iov;
+       struct msghdr msg = {0};
+       ssize_t ret;
+       char cmsg_buf[CMSG_SPACE(sizeof(struct in_pktinfo))];
+
+       iov.iov_base = msgb->data;
+       iov.iov_len = msgb->len;
+
+       msg.msg_iov = &iov;
+       msg.msg_iovlen = 1;
+
+
+
+       if (vres->params.src.s_addr != 0) {
+               struct cmsghdr *cmsg;
+               struct in_pktinfo *ipi;
+
+               memset(cmsg_buf, 0, sizeof(cmsg_buf));
+
+               msg.msg_control = cmsg_buf;
+               msg.msg_controllen = sizeof(cmsg_buf);
+
+               cmsg = CMSG_FIRSTHDR(&msg);
+
+               cmsg->cmsg_level = SOL_IP;
+               cmsg->cmsg_type = IP_PKTINFO;
+               cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
+
+               ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
+               ipi->ipi_spec_dst = vres->params.src;
+       }
+       ret = sendmsg(vres->ac_sockd, &msg, 0);
+       return ret;
 }
 
 static inline void fwp_vres_free(fwp_vres_t *vres)
@@ -225,10 +254,8 @@ int fwp_vres_table_init(unsigned int max_vres)
        return 0;
 }
 
-fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
+fwp_vres_id_t fwp_vres_get_id(fwp_vres_t *vres)
 {
-       fwp_vres_t *vres = vresd;
-       
        return (vres - fwp_vres_table.entry);
 }
 
@@ -237,7 +264,7 @@ fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
  *
  * \return On success returns vres descriptor. 
  */
-fwp_vres_d_t fwp_vres_alloc()
+fwp_vres_t *fwp_vres_alloc()
 {
        int i;
        unsigned int max_vres;
@@ -272,7 +299,7 @@ inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
        if (!rv)
                return rv;
        memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
-       fwp_vres_set_flag(vres, FWP_VF_RESCHED);
+       fwp_vres_set_flag(vres, FWP_VF_CHANGED);
 
        return 0;
 }
@@ -280,17 +307,15 @@ inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
 /**
  * Set vres params
  *
- * \param[in] vresdp Vres descriptor
+ * \param[in] vresp Vres descriptor
  * \param[in] params Vres parameters
  *
  * \return On success returns zero. 
  * On error, negative error code is returned. 
  *
  */
-int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
+int fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
 {
-       fwp_vres_t *vres = vresd;
-       
        if (!fwp_vres_is_valid(vres)) {
                errno = EINVAL;
                return -1;
@@ -303,13 +328,13 @@ int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
  * Creates new vres
  *
  * \param[in] params Vres parameters
- * \param[out] vresdp Pointer to the descriptor of newly created vres
+ * \param[out] vresp Pointer to the descriptor of newly created vres
  *
  * \return On success returns descriptor of vres. 
  * On error, negative error code is returned. 
  *
  */
-int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
+int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp)
 {
        int rv;
        fwp_vres_t *vres;
@@ -323,14 +348,14 @@ int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
        fwp_msgq_init(&vres->tx_queue);
        
        memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
-       fwp_vres_set_flag(vres, FWP_VF_RESCHED);
+       fwp_vres_set_flag(vres, FWP_VF_CHANGED);
        pthread_attr_init(&vres->tx_thread_attr);
        if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, 
                            fwp_vres_tx_thread, (void*) vres)) != 0){
                goto err;
        }
 
-       *vresdp = vres;
+       *vresp = vres;
        return 0;
 err:   
        fwp_vres_free(vres);
@@ -340,16 +365,14 @@ err:
 /**
  * Destroys vres
  *
- * \param[in] vresd Vres descriptor
+ * \param[in] vres Vres descriptor
  *
  * \return On success returns 0. 
  * On error, negative error code is returned. 
  *
  */
-int fwp_vres_destroy(fwp_vres_d_t vresd)
+int fwp_vres_destroy(fwp_vres_t *vres)
 {      
-       fwp_vres_t *vres = vresd;
-
        if (!fwp_vres_is_valid(vres)) {
                errno = EINVAL;
                return -1;
@@ -357,7 +380,7 @@ int fwp_vres_destroy(fwp_vres_d_t vresd)
        
        pthread_cancel(vres->tx_thread);
                
-       FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);   
+       FWP_DEBUG("Vres destroyed.\n");
        return  0;
 }
 
@@ -373,7 +396,7 @@ static inline void
 fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period, 
                        fwp_budget_t  *budget)
 {
-       if (fwp_vres_get_flag(vres, FWP_VF_RESCHED)) {  
+       if (fwp_vres_get_flag(vres, FWP_VF_CHANGED)) {  
                /*period->tv_nsec = vres->params.period % SEC_TO_USEC;
                period->tv_sec = vres->params.period / SEC_TO_USEC;*/
                *period = vres->params.period;
@@ -381,7 +404,7 @@ fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period,
                FWP_DEBUG("Vres tx thread with budget=%ld period_sec=%ld "
                                "period_nsec=%ld.\n",vres->params.budget, 
                                period->tv_sec, period->tv_nsec);
-               fwp_vres_clear_flag(vres, FWP_VF_RESCHED);
+               fwp_vres_clear_flag(vres, FWP_VF_CHANGED);
        }
 }
 
@@ -398,7 +421,7 @@ static void* fwp_vres_tx_thread(void *_vres)
        fwp_budget_t    budget = vres->params.budget;
        fwp_budget_t    curr_budget;
        int             rc;
-       struct timespec start_period, period, interval, current_time;
+       struct timespec start, period, interval, now;
 
        fwp_set_rt_prio(90 - ac_id);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
@@ -407,41 +430,43 @@ static void* fwp_vres_tx_thread(void *_vres)
        
        /* just for sure */
        fwp_vres_sched_update(vres, &period, &budget);
-       clock_gettime(CLOCK_MONOTONIC, &start_period);
-       curr_budget = 0;
+       curr_budget = budget;
+       clock_gettime(CLOCK_MONOTONIC, &start);
 
        while (1) {
-               /* wait for next period and then send */
-               /*clock_gettime(CLOCK_MONOTONIC, &current_time);
-               fwp_timespec_sub(&interval, &end_period, &current_time);
-               nanosleep(&interval, NULL);*/
-       
-               /*while((rc = sem_wait(&msgq->empty_lock, &end_period))==-1 
-                       && errno == EINTR) {
-                       continue;
-               }*/
-               
                msgb = fwp_msgq_dequeue(msgq);
-               fwp_vres_sched_update(vres, &period, &budget);  
-               if ((curr_budget + msgb->len) > budget) {
-                       /* need to recharge */
-                       clock_gettime(CLOCK_MONOTONIC, &current_time);
-                       fwp_timespec_sub(&interval, &current_time, &start_period);
-                       fwp_timespec_modulo(&interval, &interval, &period);
+               if (msgb->len > budget) {
+                       FWP_ERROR("Message too large: %zd -> skipping\n",
+                                 msgb->len);
+                       goto skip_message;
+               }
+               if (curr_budget < msgb->len) {
+                       /* needs to replenish */
+                       clock_gettime(CLOCK_MONOTONIC, &now);
+                       fwp_timespec_sub(&interval, &now, &start);
+                       ul_logtrash("start=%ld.%09ld,  now=%ld.%09ld  diff=%ld.%09ld\n", (long)start.tv_sec, (long)start.tv_nsec, (long)now.tv_sec, (long)now.tv_nsec, (long)interval.tv_sec, (long)interval.tv_nsec);
                        fwp_timespec_sub(&interval, &period, &interval);
-                       nanosleep(&interval, NULL);
-                       curr_budget = 0;
-                       clock_gettime(CLOCK_MONOTONIC, &start_period);
+                       if (interval.tv_sec > 0 ||
+                           (interval.tv_sec == 0 && interval.tv_nsec > 0)) {
+                               /* We have to wait to replenish */
+                               ul_logtrash("sleeping=%ld.%09ld\n", (long)interval.tv_sec, (long)interval.tv_nsec);
+                               nanosleep(&interval, NULL);
+                               fwp_timespec_add(&start, &now, &interval);
+                       } else {
+                               start = now;
+                       }
+                       fwp_vres_sched_update(vres, &period, &budget);  
+                       curr_budget = budget;
                }
 
-               rc = _fwp_vres_send(vres->ac_sockd, msgb);
+               rc = _fwp_vres_send(vres, msgb);
                if (!(rc < 0)) {
                        FWP_DEBUG("Message sent through AC%d\n",ac_id);
-                       curr_budget+= msgb->len;
+                       curr_budget -= msgb->len;
                } else {
-                       FWP_DEBUG("Message sent error %d\n",rc);
+                       FWP_ERROR("Message sent error %d\n",rc);
                }
-                       
+       skip_message:                   
                fwp_msgb_free(msgb);
 
                /*pthread_testcancel(); */
@@ -461,22 +486,19 @@ static void* fwp_vres_tx_thread(void *_vres)
        return NULL;
 }
 
-int fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
+int fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb)
 {
-       fwp_vres_t *vres = vresd;
-
        if (fwp_vres_is_valid(vres)) {
                return fwp_msgq_enqueue(&vres->tx_queue, msgb);
        } else {
-               errno = EPERM;  /* TODO: Use more appropriate error than EPERM. */
+               errno = EINVAL;
                return -1;
        }
 }
 
-/*int fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)*/
-int fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
+/*int fwp_vres_bind(fwp_vres_t *vres, fwp_endpoint_t *epoint)*/
+int fwp_vres_bind(fwp_vres_t *vres, int sockd)
 {
-       fwp_vres_t *vres = vresd;
        int rv = 0;
 
        pthread_mutex_lock(&fwp_vres_table.lock);
@@ -487,7 +509,7 @@ int fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
        }
        
        if (fwp_vres_get_flag(vres, FWP_VF_BOUND)) { /*if already bounded */
-               errno = EPERM;  
+               errno = EBUSY;
                rv = -1;
                goto err;
        }
@@ -502,11 +524,9 @@ err:
        return rv;
 }
 
-int fwp_vres_unbind(fwp_vres_d_t vresd)
+int fwp_vres_unbind(fwp_vres_t *vres)
 {
-       fwp_vres_t *vres = vresd;
-       
-       if (!fwp_vres_is_valid(vresd)) {
+       if (!fwp_vres_is_valid(vres)) {
                errno = EINVAL;
                return -1;
        }