rtime.felk.cvut.cz Git - frescor/fwp.git/commitdiff
Implemented synchronous and asynchronous sending
author Michal Sojka <sojkam1@fel.cvut.cz>
Fri, 6 Nov 2009 14:02:15 +0000 (15:02 +0100)
committer Michal Sojka <sojkam1@fel.cvut.cz>
Fri, 6 Nov 2009 14:02:15 +0000 (15:02 +0100)
The main goal of this big change is to avoid delays caused by the CPU
scheduler when rescheduling from the application thread to the VRES TX
thread. According to our fwp-timing.c experiment, these delays can be
even several milliseconds long.

Now, whenever the VRES capacity allows, the send operation is invoked
directly from the application thread. Only if the budget is insufficient
is the message queued for sending later by the VRES thread, provided that
an asynchronous send was requested. In the case of a synchronous send, the
application thread is blocked until the budget is replenished.

Besides the above change, the code was cleaned up a lot.

Signed-off-by: Michal Sojka <sojkam1@fel.cvut.cz>
17 files changed:
.topmsg
fwp/lib/frsh_fwp/fwp_fna.c
fwp/lib/frsh_fwp/fwp_fra.c
fwp/lib/fwp/fwp_conf.h
fwp/lib/fwp/fwp_endpoint.c
fwp/lib/fwp/fwp_endpoint.h
fwp/lib/fwp/fwp_msgb.c
fwp/lib/fwp/fwp_msgb.h
fwp/lib/fwp/fwp_msgq.c
fwp/lib/fwp/fwp_msgq.h
fwp/lib/fwp/fwp_vres.c
fwp/lib/fwp/fwp_vres.h
fwp/lib/fwp/tests/fwp_prototest/fwp_sendrecv_test1.c
fwp/lib/fwp/tests/fwp_prototest/fwp_sendrecv_test2.c
fwp/lib/fwp/tests/fwp_vrestest/fwp_vrestest1.c
fwp/lib/fwp/tests/fwp_vrestest/fwp_vrestest2.c
fwp/tests/timing/fwp-timing.c

diff --git a/.topmsg b/.topmsg
index b268c6a4387b60536014948d26f7142eb3568621..5973efdd1483f5de22102a501b6bdb9d92ac40cf 100644 (file)
--- a/.topmsg
+++ b/.topmsg
@@ -1,10 +1,17 @@
 From: Michal Sojka <sojkam1@fel.cvut.cz>
-Subject: [PATCH] t/fwp-timing
+Subject: Implemented synchronous and asynchronous sending
 
-Application for measure FWP timing properites
+The main goal of this big change is to avoid delays caused by CPU
+scheduler when rescheduling from application thread to VRES TX thread.
+According to our fwp-timing.c experiment, these delays can be even
+several milliseconds long.
 
-TODO: It is necessary to implement synchronous sending in FWP. Without
-this it is possible that FWP vres slows down due to scheduling latencies
-and we then measure constantly higher delays.
+Now, whenever VRES capacity allows, send operation is invoked directly
+from application thread. Only if the budget is insufficient, the
+message can be queued for sending later by VRES thread, provided that
+asynchronous send was requested. In case of synchronous send, the
+application thread is blocked until the budget is replenished.
+
+Besides the above change, the code was cleaned up a lot.
 
 Signed-off-by: Michal Sojka <sojkam1@fel.cvut.cz>
index e26dd26504e43a026f20ae661e6427ff944be0bb..a51e55847d8ace225e8e6e047413aaedc54d7c3e 100644 (file)
@@ -82,7 +82,7 @@ int fwp_fna_send_endpoint_created(fna_endpoint_data_t  *endpoint)
 {
        unsigned int node, port;
        fwp_endpoint_attr_t *attr;
-       fwp_endpoint_t *fwp_epoint;
+       struct fwp_endpoint *fwp_epoint;
        int rv;
        frsh_send_endpoint_protocol_info_t *spi;
        
@@ -102,7 +102,7 @@ int fwp_fna_receive_endpoint_created(fna_endpoint_data_t  *endpoint)
 {
        unsigned int node,port;
        fwp_endpoint_attr_t *attr;
-       fwp_endpoint_t *fwp_epoint;
+       struct fwp_endpoint *fwp_epoint;
        int rv;
 
        node = (unsigned int) endpoint->destination;
@@ -123,7 +123,7 @@ int fwp_fna_receive_endpoint_created(fna_endpoint_data_t  *endpoint)
 int fwp_fna_send_endpoint_bind(fna_endpoint_data_t *endpoint, fna_vres_id_t vres)
 {
        return fwp_send_endpoint_bind(endpoint->protocol_info.body, 
-                                       (fwp_vres_d_t) vres->priv);
+                                       (struct fwp_vres *) vres->priv);
 }
 
 int fwp_fna_send_endpoint_unbind(fna_endpoint_data_t *endpoint)
@@ -137,31 +137,22 @@ int fwp_fna_endpoint_destroy(fna_endpoint_data_t  *endpoint)
 }
 
 /** FNA send routine */
-int fwp_fna_send(const fna_endpoint_data_t *endpoint, const void *msg, 
-                       const size_t size)
-{
-       fwp_endpoint_t *fwp_epoint;
-
-       fwp_epoint = endpoint->protocol_info.body;
-       return fwp_send(fwp_epoint, msg, size);
-}
-
 int fwp_fna_send_sync(const fna_endpoint_data_t *endpoint, const void *msg, 
                        const size_t size)
 {
-       fwp_endpoint_t *fwp_epoint;
+       struct fwp_endpoint *fwp_epoint;
 
        fwp_epoint = endpoint->protocol_info.body;
-       return fwp_send(fwp_epoint, msg, size);
+       return fwp_send_sync(fwp_epoint, msg, size);
 }
 
 int fwp_fna_send_async(const fna_endpoint_data_t *endpoint,const void *msg,
                   const size_t size)
 {
-       fwp_endpoint_t *fwp_epoint;
+       struct fwp_endpoint *fwp_epoint;
 
-       fwp_epoint = (fwp_endpoint_t*) endpoint->protocol_info.body;    
-       return fwp_send(fwp_epoint, msg, size); /* FIXME */
+       fwp_epoint = (struct fwp_endpoint*) endpoint->protocol_info.body;       
+       return fwp_send_async(fwp_epoint, msg, size);
 }
 
 /** FNA receive routines */
@@ -171,10 +162,10 @@ int fwp_fna_receive(const fna_endpoint_data_t *endpoint,
 {
        unsigned int from_addr;
        ssize_t len;
-       fwp_endpoint_t *fwp_epoint;
+       struct fwp_endpoint *fwp_epoint;
        int flags = 0;
        
-       fwp_epoint = (fwp_endpoint_t*) endpoint->protocol_info.body;
+       fwp_epoint = (struct fwp_endpoint*) endpoint->protocol_info.body;
        len = fwp_recv(fwp_epoint, buffer, buffer_size, &from_addr, flags);
        if (len < 0) 
                return len;
@@ -191,10 +182,10 @@ int fwp_fna_receive_sync(const fna_endpoint_data_t *endpoint, void *buffer,
 {
        unsigned int from_addr;
        ssize_t len;
-       fwp_endpoint_t *fwp_epoint;
+       struct fwp_endpoint *fwp_epoint;
        int flags = 0;
        
-       fwp_epoint = (fwp_endpoint_t*) endpoint->protocol_info.body;
+       fwp_epoint = (struct fwp_endpoint*) endpoint->protocol_info.body;
        len = fwp_recv(fwp_epoint, buffer, buffer_size, &from_addr, flags);
        if (len < 0) 
                return len;
@@ -213,10 +204,10 @@ int fwp_fna_receive_async(const fna_endpoint_data_t *endpoint, void *buffer,
 {
        unsigned int from_addr;
        ssize_t len;
-       fwp_endpoint_t *fwp_epoint;
+       struct fwp_endpoint *fwp_epoint;
        int flags = 0;
        
-       fwp_epoint = (fwp_endpoint_t*) endpoint->protocol_info.body;
+       fwp_epoint = (struct fwp_endpoint*) endpoint->protocol_info.body;
        len = fwp_recv(fwp_epoint, buffer, buffer_size, &from_addr, flags);
        if (len < 0) 
                return len;
@@ -250,7 +241,7 @@ fna_operations_t fwp_fna_operations = {
     .fna_resource_get_capacity = NULL,
     .fna_resource_get_total_weight = NULL,
     .fna_vres_decrease_capacity = NULL,
-    .fna_send_sync = NULL,
+    .fna_send_sync = fwp_fna_send_sync,
     .fna_send_async = fwp_fna_send_async,
     .fna_receive_sync = fwp_fna_receive_sync,
     .fna_receive_async = fwp_fna_receive_async,
index 813cd5e5bc9a8852adc1a8cdf98e4296dc4028b2..f6e15c4bdb7070c60195e5113d48a6f71ccedb1b 100644 (file)
@@ -66,7 +66,7 @@ static int create_vres(fres_vres_t *vres, void *priv)
        fres_block_fwp_sched *fwp_sched;
        fres_block_fwp *fwp;
        fwp_vres_params_t vparams = {0};
-       fwp_vres_d_t      fwp_vresd = {0};
+       struct fwp_vres  *fwp_vres = NULL;
        int rv;
        size_t bytes;
        char src[21] = "N/A";
@@ -88,10 +88,10 @@ static int create_vres(fres_vres_t *vres, void *priv)
                snprintf(src, sizeof(src), "%s", inet_ntoa(vparams.src));
        }
        /* Create vres */
-       if ((rv = fwp_vres_create(&vparams, &fwp_vresd))) {
+       if ((rv = fwp_vres_create(&vparams, &fwp_vres))) {
                return  rv;
        }
-       vres->priv = fwp_vresd;
+       vres->priv = fwp_vres;
 
        fres_contract_id_to_string(id, &vres->id, sizeof(id));
        ul_logmsg("Creating FWP VRes (id=%s, period=%ld ms, budget=%ld bytes AC=%d src=%s)\n",
@@ -103,12 +103,12 @@ static int create_vres(fres_vres_t *vres, void *priv)
 
 static int cancel_vres(fres_vres_t *vres, void *priv)
 {
-       fwp_vres_d_t    fwp_vresd;
+       struct fwp_vres *       fwp_vres;
        fres_block_basic *basic;
        char id[40];
 
-       fwp_vresd = (fwp_vres_d_t) vres->priv;
-       fwp_vres_destroy(fwp_vresd);
+       fwp_vres = (struct fwp_vres *) vres->priv;
+       fwp_vres_destroy(fwp_vres);
 
        basic = fres_contract_get_basic(vres->allocated);
        fres_contract_id_to_string(id, &vres->id, sizeof(id));
@@ -124,7 +124,7 @@ int change_vres(fres_vres_t *vres, void *priv)
        fres_block_basic *basic;
        fres_block_fwp_sched *fwp_sched;
        fwp_vres_params_t vparams;
-       fwp_vres_d_t      fwp_vresd;
+       struct fwp_vres *fwp_vres;
        int rv;
        size_t bytes;
        
@@ -136,10 +136,10 @@ int change_vres(fres_vres_t *vres, void *priv)
        vparams.budget = bytes;
        vparams.period = basic->period;
        vparams.ac_id = fwp_sched->ac_id;
-       fwp_vresd = vres->priv;
+       fwp_vres = vres->priv;
 
        /* Changing vres */
-       if ((rv = fwp_vres_set_params(fwp_vresd, &vparams))) {
+       if ((rv = fwp_vres_set_params(fwp_vres, &vparams))) {
                return  rv;
        }
 
index ba1331356ef5d0741d28f7a21e7facb4aa630eea..bc2a9ed4511cfba8742b4ef4b2e0a2b4ee4d1bfa 100644 (file)
@@ -47,7 +47,7 @@
 #define _FWP_CONF_H
 
 #define FWP_AC_NUM     4
-#define FWP_MSGQ_SIZE 2048 
+#define FWP_MSGQ_SIZE (1<<11)  /* Must be power of 2 */
 
 #define FWP_MY_ADDR_DEFAULT    "127.0.0.1"
 #define FWP_MNGR_ADDR_DEFAULT  "127.0.0.1"
index 0531284cbfe763c0e39f39b5a40ac4beb8848195..56964aadb8745ac0edb7247d858de500324c6acc 100644 (file)
 #include <unistd.h>
 #include <netinet/in.h>
 #include "fwp_utils.h"
+#include "fwp_vres.h"
+#include <frsh_error.h>
 
 #include <pthread.h>
 #include "fwp_debug.h"
+#include "fwp_msgq.h"
 
 typedef unsigned int fwp_endpoint_id_t;
 
@@ -95,6 +98,9 @@ struct fwp_endpoint{
        fd_set                  fdset;
        /** specific operation options*/
        int                     flags;  
+       /** Forced source address. If non-zero, packets are sent over
+        * the specified interface. */
+       struct in_addr  src;
 };
 
 /**
@@ -104,9 +110,9 @@ struct fwp_endpoint{
  * On error, NULL is returned. 
  *
  */
-static fwp_endpoint_t* fwp_endpoint_alloc()
+static struct fwp_endpoint* fwp_endpoint_alloc()
 {
-       return (fwp_endpoint_t*) calloc(1,sizeof(fwp_endpoint_t));
+       return (struct fwp_endpoint*) calloc(1,sizeof(struct fwp_endpoint));
 }
 
 /**
@@ -116,7 +122,7 @@ static fwp_endpoint_t* fwp_endpoint_alloc()
  * On error, NULL is returned. 
  *
  */
-static inline void fwp_endpoint_free(fwp_endpoint_t *endpoint)
+static inline void fwp_endpoint_free(struct fwp_endpoint *endpoint)
 {
        free(endpoint);
 }
@@ -128,11 +134,11 @@ static inline void fwp_endpoint_free(fwp_endpoint_t *endpoint)
  * \return On success 0 is returned. 
  * On error, negative error value is returned and errno is set appropriately. 
  */
-int fwp_endpoint_destroy(fwp_endpoint_t *ep)
+int fwp_endpoint_destroy(struct fwp_endpoint *ep)
 {
        if (ep->sockd > 0) 
                close(ep->sockd);
-       
+
        fwp_endpoint_free(ep);  
        return 0;
 }
@@ -147,7 +153,7 @@ int fwp_endpoint_destroy(fwp_endpoint_t *ep)
  * \return On success 0 is returned. 
  * On error, negative error value is returned. 
  */
-int fwp_endpoint_get_params(fwp_endpoint_t *ep, unsigned int *node, 
+int fwp_endpoint_get_params(struct fwp_endpoint *ep, unsigned int *node, 
                                unsigned int *port, fwp_endpoint_attr_t *attr)
 {
        if (node) *node = ep->node;
@@ -179,10 +185,10 @@ int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
 int fwp_send_endpoint_create(unsigned int node,
                                unsigned int port, 
                                fwp_endpoint_attr_t *attr,
-                               fwp_endpoint_t **epoint)
+                               struct fwp_endpoint **epoint)
 {      
        struct sockaddr_in *addr;
-       fwp_endpoint_t *fwp_epoint;
+       struct fwp_endpoint *fwp_epoint;
 
        fwp_epoint = fwp_endpoint_alloc();      
         if (!fwp_epoint) {
@@ -272,10 +278,10 @@ err:
  */
 int fwp_receive_endpoint_create(unsigned int port,
                                fwp_endpoint_attr_t *attr,
-                               fwp_endpoint_t **epp)
+                               struct fwp_endpoint **epp)
 {
        struct sockaddr_in *addr;
-       fwp_endpoint_t *fwp_epoint;
+       struct fwp_endpoint *fwp_epoint;
 
        fwp_epoint = fwp_endpoint_alloc();      
         if (!fwp_epoint) {
@@ -383,18 +389,16 @@ err:
  *
  * \return On success returns 0. On error, -1 and errno is set appropriately.
  */
-int fwp_send_endpoint_bind(fwp_endpoint_t *ep, fwp_vres_t *vres)
+int fwp_send_endpoint_bind(struct fwp_endpoint *ep, fwp_vres_t *vres)
 {
        int rv = 0;
-#ifndef FWP_WITHOUT_CONTNEGT
+
+       if (ep->vres)
+               return FRSH_ERR_ALREADY_BOUND;
        
        ep->vres = vres;
-       rv = fwp_vres_bind(vres, ep->sockd);
-#endif
-       /* if send endpoint is already bound 
-       if (epoint->type == FWP_EPOINT_BOUND) {  
-               fwp_send_endpoint_unbind(epoint);
-       }*/
+       rv = fwp_vres_bind(vres, ep, ep->sockd);
+
        return rv;
 }
 
@@ -405,7 +409,7 @@ int fwp_send_endpoint_bind(fwp_endpoint_t *ep, fwp_vres_t *vres)
  * \return On success returns 0. On error, -1 is returned and errno is set appropriately.
  *
  */
-int fwp_send_endpoint_unbind(fwp_endpoint_t *ep)
+int fwp_send_endpoint_unbind(struct fwp_endpoint *ep)
 {
        int rv = 0;
 
@@ -424,10 +428,10 @@ int fwp_send_endpoint_unbind(fwp_endpoint_t *ep)
  * On success, it returns zero.  
  *
  */
-static int fwp_receive_endpoint_accept(fwp_endpoint_t *fwp_epoint)
+static int fwp_receive_endpoint_accept(struct fwp_endpoint *fwp_epoint)
 {
        int csockd;
-//     fwp_endpoint_t *fwp_epoint = epointd;
+//     struct fwp_endpoint *fwp_epoint = epointd;
        fwp_sockaddr_t  peer;
        int i;
 
@@ -467,7 +471,7 @@ static int fwp_receive_endpoint_accept(fwp_endpoint_t *fwp_epoint)
  * On error, -1 is returned and errno is set appropriately.
  *
  */
-int fwp_recv_conn(fwp_endpoint_t *ep, void *buffer, 
+int fwp_recv_conn(struct fwp_endpoint *ep, void *buffer, 
                        size_t buffer_size)
 {
        fwp_sockaddr_t *peer = &ep->peer;
@@ -516,7 +520,7 @@ int fwp_recv_conn(fwp_endpoint_t *ep, void *buffer,
  * On error, -1 is returned and errno is set appropriately.
  *
  */
-ssize_t fwp_recv(fwp_endpoint_t *ep,
+ssize_t fwp_recv(struct fwp_endpoint *ep,
                        void *buffer, const size_t buffer_size,
                        unsigned int *from, int flags)
 {
@@ -561,6 +565,53 @@ ssize_t fwp_recv(fwp_endpoint_t *ep,
                }
        }
 }
+/** 
+ * Physically send the message.
+ *
+ * This function should be called either by fwp_send_sync()/async() or
+ * by VRES to send delayed messages.
+ * 
+ * @param ep 
+ * @param data 
+ * @param size 
+ * 
+ * @return 
+ */
+ssize_t fwp_endpoint_do_send(struct fwp_endpoint *ep,
+                            void *data, const size_t size)
+{
+       struct iovec  iov;
+       struct msghdr msg = {0};
+       ssize_t ret;
+       char cmsg_buf[CMSG_SPACE(sizeof(struct in_pktinfo))];
+
+       iov.iov_base = data;
+       iov.iov_len = size;
+
+       msg.msg_iov = &iov;
+       msg.msg_iovlen = 1;
+
+       if (ep->src.s_addr != 0) {
+               struct cmsghdr *cmsg;
+               struct in_pktinfo *ipi;
+
+               memset(cmsg_buf, 0, sizeof(cmsg_buf));
+
+               msg.msg_control = cmsg_buf;
+               msg.msg_controllen = sizeof(cmsg_buf);
+
+               cmsg = CMSG_FIRSTHDR(&msg);
+
+               cmsg->cmsg_level = SOL_IP;
+               cmsg->cmsg_type = IP_PKTINFO;
+               cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
+
+               ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
+               ipi->ipi_spec_dst = ep->src;
+       }
+       ret = sendmsg(ep->sockd, &msg, 0);
+       return ret;
+}
 
 /**
  * Sends message through vres
@@ -574,42 +625,31 @@ ssize_t fwp_recv(fwp_endpoint_t *ep,
  * On error, -1 is returned and errno is set appropriately.
  *
  */
-int fwp_send(fwp_endpoint_t *ep,const void *msg, const size_t size)
+int fwp_send_async(struct fwp_endpoint *ep, void *msg, size_t size)
 {
-       struct fwp_msgb *msgb;
-       
-/*     if (!fwp_endpoint_is_valid(epointd)){
-               errno = EINVAL;
-               return -1;
-       }
-       if (!fwp_endpoint_is_bound(epointd)){
-               errno = EPERM;
-               return -1;
-       }*/
+       int ret;
 
-       /*if (flags && MSG_DONTWAIT) 
-               msgb = fwp_msgb_alloc(buffer_size);
-       else {*/
-               if (!(msgb = fwp_msgb_alloc(size))) {
-                       errno = ENOMEM;
-                       return -1;
-               }
+       if (!ep->vres)
+               return FRSH_ERR_NOT_BOUND;
 
-               /*msgb->peer = &ep->peer;*/
-               /*msgb->data = msg;*/
-               /*msgb->flags = epoint->flags;*/
-               
-               /* data must be copied since msg may change while
-                * message is waiting in transmission queue 
-                * */
-               memcpy(msgb->data, msg, size);
-               fwp_msgb_put(msgb, size);
-               /*msgb->tail = msgb->data + size;
-               msgb->len = size;*/
-
-       /*}*/
-
-       /* TODO: test whether _fwp_vres_send is successful */
-       return fwp_vres_send(ep->vres, msgb);
+       if (fwp_vres_consume_budget(ep->vres, size, false) == 0)
+               ret = fwp_endpoint_do_send(ep, msg, size);
+       else
+               ret = fwp_vres_enqueue(ep->vres, ep, msg, size);
+       return ret;
+}
+
+int fwp_send_sync(struct fwp_endpoint *ep, void *msg, size_t size)
+{
+       int ret;
+
+       if (!ep->vres)
+               return FRSH_ERR_NOT_BOUND;
+
+       ret = fwp_vres_consume_budget(ep->vres, size, true);
+       if (ret)
+               return ret;
+       ret = fwp_endpoint_do_send(ep, msg, size);
+       return ret;
 }
 
index 18358952bc79b8a65719f54cf238dae826826ecd..8ddee25b81add6a89d04b603aa67c80fdb0293a3 100644 (file)
@@ -54,7 +54,6 @@ typedef enum {
 } fwp_endpoint_reliability_t;
 
 struct fwp_endpoint;
-typedef struct fwp_endpoint fwp_endpoint_t;
 
 typedef unsigned int fwp_addr_t;
 
@@ -71,23 +70,26 @@ struct fwp_endpoint_attr {
 
 #include "fwp_vres.h"
 
-int fwp_endpoint_get_params(fwp_endpoint_t *ep, unsigned int *node, 
+int fwp_endpoint_get_params(struct fwp_endpoint *ep, unsigned int *node, 
                                unsigned int *port, fwp_endpoint_attr_t *attr);
 int fwp_send_endpoint_create(unsigned int node, unsigned int port,
                                fwp_endpoint_attr_t *attr, 
-                               fwp_endpoint_t **epoint);
+                               struct fwp_endpoint **epoint);
 int fwp_receive_endpoint_create(/*unsigned int node,*/ unsigned int port,
                                fwp_endpoint_attr_t *attr, 
-                               fwp_endpoint_t **epoint);
-int fwp_endpoint_destroy(fwp_endpoint_t *ep);
+                               struct fwp_endpoint **epoint);
+int fwp_endpoint_destroy(struct fwp_endpoint *ep);
 
-int fwp_send_endpoint_bind(fwp_endpoint_t *ep, fwp_vres_t *vres);
-int fwp_send_endpoint_unbind(fwp_endpoint_t *ep);
+int fwp_send_endpoint_bind(struct fwp_endpoint *ep, fwp_vres_t *vres);
+int fwp_send_endpoint_unbind(struct fwp_endpoint *ep);
 
-ssize_t fwp_recv(fwp_endpoint_t *ep,
+ssize_t fwp_recv(struct fwp_endpoint *ep,
                        void *buffer, const size_t buffer_size,
                        unsigned int *from, int flags);
-int fwp_send(fwp_endpoint_t *ep, const void *msg, const size_t size);
+int fwp_send_sync(struct fwp_endpoint *ep, void *msg, const size_t size);
+int fwp_send_async(struct fwp_endpoint *ep, void *msg, const size_t size);
+ssize_t fwp_endpoint_do_send(struct fwp_endpoint *ep,
+                            void *data, const size_t size);
 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr);
 
 static inline int fwp_endpoint_attr_setreliability(fwp_endpoint_attr_t *attr, 
index c5193c76f75df22b9193ee9ffb70db113ac10be3..5aa9bb6f943229ea73ecbf66ce4feab9ce3a4eea 100644 (file)
@@ -77,7 +77,6 @@ struct fwp_msgb* fwp_msgb_alloc(size_t buf_size)
        msgb->len = 0;
        msgb->data = (unsigned char*) msgb + sizeof(struct fwp_msgb);
        msgb->tail = msgb->data;
-       msgb->peer = NULL;
        
        return msgb;
 }
index 95b683d4ae4ae82770084e2bf06bba05939047d6..f57a4b77667546339dfa8b0c7736ae0ca6f11a45 100644 (file)
@@ -67,8 +67,6 @@ struct fwp_msgb {
        size_t                  len;            /**< msg data length*/
        unsigned char           *data;          /**< msg data */
        unsigned char           *tail;          /**< msg data end*/
-       struct fwp_sockaddr     *peer;          /**< peer address*/
-       /*int flags;  MSG_DONTWAIT for async*/
 } fwp_msgb_t;
 
 struct fwp_msgb* fwp_msgb_alloc(size_t buf_size);
index 0fcc73d0f4d4bc607026e2d6c18bfb8d97cbbaa5..1af89bdfe2e5363059f51308d96851629d570594 100644 (file)
@@ -124,6 +124,21 @@ struct fwp_msgb* fwp_msgq_dequeue(struct fwp_msgq *msgq)
        return msgb;
 } 
 
+struct fwp_msgb* fwp_msgq_peek(struct fwp_msgq *msgq)
+{
+       struct fwp_msgb* msgb;
+       
+       pthread_mutex_lock(&msgq->lock);
+
+       if (msgq->nr_pending > 0)
+               msgb = msgq->queue[msgq->out];
+       else
+               msgb = NULL;
+       pthread_mutex_unlock(&msgq->lock);
+
+       return msgb;
+} 
+
 /*
  * Dequeue all messages from message queue
  *
index 5ab0ae0b9156f66b3835e7dbb3f0a08dc17c0dca..d30a048962bac83cc97d743623c2178082d6fb40 100644 (file)
@@ -73,6 +73,7 @@ void fwp_msgq_init(struct fwp_msgq *msgq);
 int fwp_msgq_enqueue(struct fwp_msgq *msgq, struct fwp_msgb* msgb);
 
 struct fwp_msgb* fwp_msgq_dequeue(struct fwp_msgq *msgq);
+struct fwp_msgb* fwp_msgq_peek(struct fwp_msgq *msgq);
 void fwp_msgq_dequeue_all(struct fwp_msgq *msgq);
 
 /* void fwp_msgq_setpolicy */
index 9a203e224f7d6d71bb8826260af1e09f842296ca..1177a4eb1ffa5b931f1cbe0c983ec605ced15a1e 100644 (file)
@@ -1,6 +1,6 @@
 /**************************************************************************/
 /* ---------------------------------------------------------------------- */
-/* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                */
+/* Copyright (C) 2006 - 2009 FRESCOR consortium partners:                */
 /*                                                                       */
 /*   Universidad de Cantabria,              SPAIN                        */
 /*   University of York,                    UK                           */
@@ -43,6 +43,7 @@
 /* however invalidate any other reasons why the executable file might be  */
 /* covered by the GNU Public License.                                    */
 /**************************************************************************/
+
 #include "fwp_utils.h"
 #include "fwp_vres.h"
 
 static void* fwp_vres_tx_thread(void *_vres);
 
 typedef enum {
-       FWP_VF_USED             = 0,
-       FWP_VF_BOUND            = 1,
+       USED,
+       UNTOUCHED,
+       CHANGED,
+       QUEUED,
 } fwp_vres_flag_t;
 
 fwp_vres_params_t fwp_vres_params_default = {
        .ac_id = FWP_AC_VO,
         .budget = 100,
-       .period = {.tv_sec = 2 , .tv_nsec = 111111}
+       .period = {.tv_sec = 2 , .tv_nsec = 111111},
+       .src = { 0 },
 };
 
 /**
@@ -74,15 +78,22 @@ fwp_vres_params_t fwp_vres_params_default = {
  */
 struct fwp_vres{
        struct fwp_vres_params          params;
-       /* consideration: move tx_queue to endpoint */
-       /**< queue for messages to send */
-       struct fwp_msgq                 tx_queue;   
        fwp_vres_flag_t                 flags;
+       pthread_mutex_t                 mutex;
+       pthread_cond_t                  cond; /**< Signalizes budget replenishment */
+       fwp_budget_t                    budget; /**< Current remaining budget */
+       fwp_period_t                    period; /**< Period for this "activation" */
+       struct timespec                 replenish_at; /**< Time of next replenishment */
+       sem_t                           consumed;
        /**< endpoint bounded to this vres */
-       /*fwp_endpoint_t                *epoint; */
+       struct fwp_endpoint             *epoint;
        pthread_t                       tx_thread; /**< tx_thread id*/
        pthread_attr_t                  tx_thread_attr;
-       int                             ac_sockd;  /**< ac socket descriptor */
+       /** Copy of bound endpoint's socket - used for future changes
+        * of vres parameters. */
+       int                             ac_sockd;
+       /** Queue for messages to send */
+       struct fwp_msgq                 msg_queue;   
 };
 
 typedef
@@ -127,97 +138,30 @@ static inline int fwp_vres_set_ac(int sockd, fwp_ac_t ac_id)
        return 0;
 }
 
-static inline void fwp_vres_set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
+static inline void set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
 {
        vres->flags |= (1 << flag);
 }
 
-static inline void fwp_vres_clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
+static inline void clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
 {
        vres->flags &= ~(1 << flag);
 }
 
-static inline int fwp_vres_get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
+static inline int get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
 {
        return !!(vres->flags & (1 << flag));
 }
 
-static inline void fwp_vres_clearall_flag(fwp_vres_t *vres)
+static inline void clear_all_flags(fwp_vres_t *vres)
 {
        vres->flags = 0;
 }
 
-#if 0
-/* Deprecated */
-static int fwp_vres_ac_open(fwp_ac_t ac_id) 
-{
-       int sockd;
-       unsigned int tos;
-       
-       if ((ac_id < 0)||(ac_id >= FWP_AC_NUM)) {
-               errno = EINVAL;
-               return -1;
-       }
-
-       if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
-               FWP_ERROR("Unable to open socket for AC: %s", strerror(errno));
-               return (-1);
-       }
-
-       tos = ac_to_tos[ac_id];
-       
-       if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
-               int e = errno;
-               FWP_ERROR("setsockopt(IP_TOS): %s", strerror(errno));
-               close(sockfd);
-               errno = e;
-               return -1;
-       }
-       
-       return sockd;
-}
-#endif
-
-static inline int _fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb)
-{
-       struct iovec  iov;
-       struct msghdr msg = {0};
-       ssize_t ret;
-       char cmsg_buf[CMSG_SPACE(sizeof(struct in_pktinfo))];
-
-       iov.iov_base = msgb->data;
-       iov.iov_len = msgb->len;
-
-       msg.msg_iov = &iov;
-       msg.msg_iovlen = 1;
-
-
-
-       if (vres->params.src.s_addr != 0) {
-               struct cmsghdr *cmsg;
-               struct in_pktinfo *ipi;
-
-               memset(cmsg_buf, 0, sizeof(cmsg_buf));
-
-               msg.msg_control = cmsg_buf;
-               msg.msg_controllen = sizeof(cmsg_buf);
-
-               cmsg = CMSG_FIRSTHDR(&msg);
-
-               cmsg->cmsg_level = SOL_IP;
-               cmsg->cmsg_type = IP_PKTINFO;
-               cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
-
-               ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
-               ipi->ipi_spec_dst = vres->params.src;
-       }
-       ret = sendmsg(vres->ac_sockd, &msg, 0);
-       return ret;
-}
-
 static inline void fwp_vres_free(fwp_vres_t *vres)
 {
-       fwp_vres_clearall_flag(vres);
+       /* Clear USED flag */
+       clear_all_flags(vres);
 }
 
 static inline int fwp_vres_is_valid(fwp_vres_t *vres)
@@ -225,7 +169,7 @@ static inline int fwp_vres_is_valid(fwp_vres_t *vres)
        int id  = vres - fwp_vres_table.entry;
 
        if ((id < 0) || (id > fwp_vres_table.max_vres - 1) || 
-               (!fwp_vres_get_flag(vres, FWP_VF_USED))) 
+               (!get_flag(vres, USED))) 
                return 0;
        
        return 1; 
@@ -273,7 +217,7 @@ fwp_vres_t *fwp_vres_alloc()
        i = 0;
        max_vres = fwp_vres_table.max_vres;
        while ((i < max_vres) && 
-               (fwp_vres_get_flag(&fwp_vres_table.entry[i], FWP_VF_USED))) {
+               (get_flag(&fwp_vres_table.entry[i], USED))) {
                i++;
        }
        
@@ -284,22 +228,22 @@ fwp_vres_t *fwp_vres_alloc()
        }
 
        FWP_DEBUG("Allocated vres id = %d\n",i);
-       fwp_vres_set_flag(&fwp_vres_table.entry[i], FWP_VF_USED);
+       set_flag(&fwp_vres_table.entry[i], USED);
        pthread_mutex_unlock(&fwp_vres_table.lock);
        return (&fwp_vres_table.entry[i]);
 }
 
-inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
+static int apply_params(fwp_vres_t *vres)
 {
        int rv;
-
-       /* copy vres paramters into vres structure */
-       rv = fwp_vres_set_ac(vres->ac_sockd, params->ac_id);
-       if (!rv)
-               return rv;
-       memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
-
-       return 0;
+       vres->period = vres->params.period;
+       vres->budget = vres->params.budget;
+       set_flag(vres, UNTOUCHED);
+       if (get_flag(vres, CHANGED)) {
+               clear_flag(vres, CHANGED);
+               rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
+       }
+       return rv;
 }
 
 /**
@@ -314,12 +258,30 @@ inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
  */
 int fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
 {
+       int rv = 0;
+
        if (!fwp_vres_is_valid(vres)) {
                errno = EINVAL;
                return -1;
        }
 
-       return _fwp_vres_set_params(vres, params);
+       pthread_mutex_lock(&vres->mutex);
+
+       if (vres->epoint &&
+           params->src.s_addr != vres->params.src.s_addr) {
+               errno = EREMCHG;
+               rv = -1;
+               goto out;
+       }
+       vres->params = *params;
+       if (vres->epoint) {
+               set_flag(vres, CHANGED);
+               if (get_flag(vres, UNTOUCHED))
+                       rv = apply_params(vres);
+       }
+out:
+       pthread_mutex_unlock(&vres->mutex);
+       return rv;
 }
 
 /**
@@ -342,10 +304,18 @@ int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp)
                errno = ENOMEM;
                return -1;
        }
-       /* initialize msg queue */
-       fwp_msgq_init(&vres->tx_queue);
+
+       pthread_mutexattr_t ma;
+       rv = pthread_mutexattr_init(&ma);
+       rv = pthread_mutexattr_setprotocol(&ma, PTHREAD_PRIO_INHERIT);
+       if (rv) return rv;
+       pthread_mutex_init(&vres->mutex, &ma);
+       pthread_cond_init(&vres->cond, NULL);
        
-       memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
+       vres->params = *params;
+       apply_params(vres);
+       fwp_msgq_init(&vres->msg_queue);
+
        pthread_attr_init(&vres->tx_thread_attr);
        if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, 
                            fwp_vres_tx_thread, (void*) vres)) != 0){
@@ -353,8 +323,10 @@ int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp)
        }
 
        *vresp = vres;
+       
        return 0;
 err:   
+       fwp_msgq_dequeue_all(&vres->msg_queue);
        fwp_vres_free(vres);
        return -1; 
 }
@@ -376,25 +348,145 @@ int fwp_vres_destroy(fwp_vres_t *vres)
        }
        
        pthread_cancel(vres->tx_thread);
+       pthread_cond_destroy(&vres->cond);
+       pthread_mutex_destroy(&vres->mutex);
                
+       fwp_msgq_dequeue_all(&vres->msg_queue);
+       fwp_vres_free(vres);
+       
        FWP_DEBUG("Vres destroyed.\n");
        return  0;
 }
 
-static void fwp_vres_cleanup(void *_vres)
+static void do_consume_budget(struct fwp_vres *vres, size_t size)
 {
-       fwp_vres_t *vres = (fwp_vres_t*)_vres;
+       if (get_flag(vres, UNTOUCHED)) {
+               /* Setup next replenish time */
+               struct timespec now;
+               clear_flag(vres, UNTOUCHED);
+               if (get_flag(vres, QUEUED))
+                       now = vres->replenish_at;
+               else
+                       clock_gettime(CLOCK_MONOTONIC, &now);
+               fwp_timespec_add(&vres->replenish_at, &now, &vres->period);
+               sem_post(&vres->consumed);
+       }
+       vres->budget -= size;
+}
 
-       fwp_msgq_dequeue_all(&vres->tx_queue);
-       fwp_vres_free(vres);
+int __consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
+{
+       int ret = 0;
+       if (vres->params.budget < size) {
+               errno = ENOSR;
+               return -1;
+       }
+       while (can_block && vres->budget < size) {
+               ret = pthread_cond_wait(&vres->cond, &vres->mutex);
+               /* The budget might have been changed while we were
+                * waiting, so check it again. */
+               if (vres->params.budget < size) {
+                       errno = ENOSR;
+                       return -1;
+               }
+       }
+       if (ret == 0) {
+               if (vres->budget >= size) {
+                       do_consume_budget(vres, size);
+                       ret = 0;
+               } else {
+                       set_flag(vres, QUEUED);
+                       ret = 1;
+               }
+       }
+       return ret;
+}
+
+/** 
+ * Tries to consume (a part of) budget
+ * 
+ * @param vres VRES whose budget is consumed.
+ * @param size How much to consume (in bytes).
+ * @param can_block True, indicates that the function can block to
+ * wait for budget replenishment. False causes no blocking and if 1 is
+ * returned, the caller must call fwp_vres_enqueue() to enqueue the
+ * packet to be sent later after replenishing.
+ * 
+ * @return Zero if budget was consumed, 1 if there is not enough
+ * budget available and blocking was not allowed, -1 in case of error.
+ */
+int fwp_vres_consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
+{
+       int ret = 0;
+       pthread_mutex_lock(&vres->mutex);
+       ret = __consume_budget(vres, size, can_block);
+       pthread_mutex_unlock(&vres->mutex);
+       return ret;
+}
+
+int fwp_vres_enqueue(struct fwp_vres *vres, struct fwp_endpoint *ep, void *msg, size_t size)
+{
+       struct fwp_msgb *msgb;
+       int ret;
+       
+       if (!(msgb = fwp_msgb_alloc(size)))
+               return -1;
+       memcpy(msgb->data, msg, size);
+       fwp_msgb_put(msgb, size);
+       ret = fwp_msgq_enqueue(&vres->msg_queue, msgb);
+       if (ret) {
+               fwp_msgb_free(msgb);
+               return ret;
+       }
+       return ret;
 }
 
-static inline void 
-fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period, 
-                       fwp_budget_t  *budget)
+static void wait_for_replenish(struct fwp_vres *vres)
 {
-       *period = vres->params.period;
-       *budget = vres->params.budget; 
+       sem_wait(&vres->consumed);
+       clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
+                       &vres->replenish_at, NULL);
+}
+
+static void send_queue(struct fwp_vres *vres)
+{
+       struct fwp_msgb *msgb;
+       bool can_send;
+       msgb = fwp_msgq_dequeue(&vres->msg_queue);
+       can_send = (0 == __consume_budget(vres, msgb->len, false));
+       if (!can_send) {
+               fwp_msgb_free(msgb);
+               FWP_ERROR("Cannot send queued packet (budget decreased?)\n");
+               return;
+       }
+       /* If we cannot send the whole queue, the flag will be set
+        * later by __consume_budget(). */
+       clear_flag(vres, QUEUED);
+
+       while (msgb) {
+               fwp_endpoint_do_send(vres->epoint, msgb->data, msgb->len);
+               fwp_msgb_free(msgb);
+               msgb = fwp_msgq_peek(&vres->msg_queue);
+               if (msgb) {
+                       can_send = (0 == __consume_budget(vres, msgb->len, false));
+                       if (can_send) {
+                               msgb = fwp_msgq_dequeue(&vres->msg_queue);
+                       } else {
+                               msgb = NULL;
+                               return;
+                       }
+               }
+       }
+}
+
+static void replenish(struct fwp_vres *vres)
+{
+       pthread_mutex_lock(&vres->mutex);
+       apply_params(vres);
+       if (get_flag(vres, QUEUED))
+               send_queue(vres);
+       pthread_cond_broadcast(&vres->cond);
+       pthread_mutex_unlock(&vres->mutex);
 }
 
 /**
@@ -404,112 +496,45 @@ fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period,
 static void* fwp_vres_tx_thread(void *_vres)
 {
        struct fwp_vres *vres = (struct fwp_vres*)_vres;
-       struct fwp_msgq *msgq = &vres->tx_queue;
-       struct fwp_msgb *msgb = NULL;
        unsigned int    ac_id = vres->params.ac_id;
-       fwp_budget_t    budget = vres->params.budget;
-       fwp_budget_t    curr_budget;
-       int             rc;
-       struct timespec start, period, interval, now;
 
        fwp_set_rt_prio(90 - ac_id);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
-       pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);       
-       pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
+       pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);   
        
-       /* just for sure */
-       fwp_vres_sched_update(vres, &period, &budget);
-       curr_budget = budget;
-       clock_gettime(CLOCK_MONOTONIC, &start);
-
        while (1) {
-               msgb = fwp_msgq_dequeue(msgq);
-               if (msgb->len > budget) {
-                       FWP_ERROR("Message too large: %zd -> skipping\n",
-                                 msgb->len);
-                       goto skip_message;
-               }
-               if (curr_budget < msgb->len) {
-                       /* needs to replenish */
-                       clock_gettime(CLOCK_MONOTONIC, &now);
-                       fwp_timespec_sub(&interval, &now, &start);
-                       ul_logtrash("start=%ld.%09ld,  now=%ld.%09ld  diff=%ld.%09ld\n", (long)start.tv_sec, (long)start.tv_nsec, (long)now.tv_sec, (long)now.tv_nsec, (long)interval.tv_sec, (long)interval.tv_nsec);
-                       fwp_timespec_sub(&interval, &period, &interval);
-                       if (interval.tv_sec > 0 ||
-                           (interval.tv_sec == 0 && interval.tv_nsec > 0)) {
-                               /* We have to wait to replenish */
-                               ul_logtrash("sleeping=%ld.%09ld\n", (long)interval.tv_sec, (long)interval.tv_nsec);
-                               nanosleep(&interval, NULL);
-                               fwp_timespec_add(&start, &now, &interval);
-                       } else {
-                               start = now;
-                       }
-                       fwp_vres_sched_update(vres, &period, &budget);  
-                       curr_budget = budget;
-               }
-
-               rc = _fwp_vres_send(vres, msgb);
-               if (!(rc < 0)) {
-                       FWP_DEBUG("Message sent through AC%d\n",ac_id);
-                       curr_budget -= msgb->len;
-               } else {
-                       FWP_ERROR("Message sent error %d\n",rc);
-               }
-       skip_message:                   
-               fwp_msgb_free(msgb);
-
-               /*pthread_testcancel(); */
-               /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);     
-               
-               fwp_timespec_add(&end_period, &start_period, &period);
-               clock_gettime(CLOCK_MONOTONIC, &current_time);
-               fwp_timespec_sub(&interval, &end_period, &current_time);
-               nanosleep(&interval, NULL);
-               */
+               wait_for_replenish(vres);
+               pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+               replenish(vres);
+               pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
        }
-       
-       /* it should normaly never come here */ 
-       pthread_cleanup_pop(0); 
-       fwp_vres_free(vres);
-       
-       return NULL;
-}
 
-int fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb)
-{
-       if (fwp_vres_is_valid(vres)) {
-               return fwp_msgq_enqueue(&vres->tx_queue, msgb);
-       } else {
-               errno = EINVAL;
-               return -1;
-       }
+       return NULL;
 }
 
-/*int fwp_vres_bind(fwp_vres_t *vres, fwp_endpoint_t *epoint)*/
-int fwp_vres_bind(fwp_vres_t *vres, int sockd)
+/*int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *epoint)*/
+int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *ep, int sockd)
 {
        int rv = 0;
 
-       pthread_mutex_lock(&fwp_vres_table.lock);
        if (!fwp_vres_is_valid(vres)) {
                errno = EINVAL;
                rv = -1;
                goto err;
        }
        
-       if (fwp_vres_get_flag(vres, FWP_VF_BOUND)) { /*if already bounded */
+       if (vres->epoint) { /*if already bounded */
                errno = EBUSY;
                rv = -1;
                goto err;
        }
 
        vres->ac_sockd = sockd;
-       rv = fwp_vres_set_ac(vres->ac_sockd,vres->params.ac_id);
-       /*if (rv)
-               goto err;*/
-       fwp_vres_set_flag(vres, FWP_VF_BOUND);
+       rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
+       if (rv)
+               goto err;
+       vres->epoint = ep;
 err:
-       pthread_mutex_unlock(&fwp_vres_table.lock);
        return rv;
 }
 
@@ -519,8 +544,10 @@ int fwp_vres_unbind(fwp_vres_t *vres)
                errno = EINVAL;
                return -1;
        }
-       fwp_vres_clear_flag(vres, FWP_VF_BOUND);
+       pthread_mutex_lock(&vres->mutex);
+       vres->epoint = NULL;
+       pthread_mutex_unlock(&vres->mutex);
        /* TODO: consider what to do with pending messages */
-       /* fwp_vres_free_msgb(vres->tx_queue); */
+       fwp_msgq_dequeue_all(&vres->msg_queue);
        return 0;
 }
index 65bfd9177feae7802707ca0a5b7cfa6b8d76259a..4d36ffb1b4c351ed223bfe4e4b9cbaff50a2b0fd 100644 (file)
@@ -47,6 +47,7 @@
 #define _FWP_VRES_H
 
 #include <netinet/in.h>
+#include <stdbool.h>
 
 struct fwp_vres;
 typedef struct fwp_vres fwp_vres_t;
@@ -81,8 +82,6 @@ struct fwp_vres_params {
        /** all time units are in microseconds */       
        fwp_period_t    period; 
        fwp_ac_t        ac_id;          /**< AC id ~ priority of vres */
-       /** Forced source address. If non-zero, packets are sent over
-        * the specified interface. */
        struct in_addr  src;
 } fwp_vres_params_t;
 
@@ -94,8 +93,10 @@ int fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params);
 int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp);
 int fwp_vres_destroy(fwp_vres_t *vres);
 
-int fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb);
-int fwp_vres_bind(fwp_vres_t *vres, int sockd);
+int fwp_vres_consume_budget(struct fwp_vres *vres, size_t size, bool can_block);
+struct fwp_endpoint;
+int fwp_vres_enqueue(struct fwp_vres *vres, struct fwp_endpoint *ep, void *msg, size_t size);
+int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *ep, int sockd);
 int fwp_vres_unbind(fwp_vres_t *vres);
 
 extern fwp_vres_params_t fwp_vres_params_default;
index d32bd338aaa2ffca50843691f884babe34a7222e..ff7f062467a63adb31b96b09cfe70c494f0902bb 100644 (file)
@@ -28,7 +28,7 @@ int main()
        char msg1[] = "Hello1";
        char msg2[] = "Hello2";
        char buffer[30];
-       fwp_endpoint_t *sepoint1, *sepoint2, *repoint1, *repoint2;
+       struct fwp_endpoint *sepoint1, *sepoint2, *repoint1, *repoint2;
        fwp_endpoint_attr_t attr;
        fwp_addr_t from;
        
@@ -85,8 +85,8 @@ int main()
        printf("Send endpoint 2 created\n");
        fwp_send_endpoint_bind(sepoint2, vres2);
        
-       fwp_send(sepoint1, msg1, sizeof(msg1));
-       fwp_send(sepoint1, msg2, sizeof(msg2));
+       fwp_send_sync(sepoint1, msg1, sizeof(msg1));
+       fwp_send_sync(sepoint1, msg2, sizeof(msg2));
 
        for (i = 0; i < 2; i++) {
                if ((len = fwp_recv(repoint1, buffer, sizeof(buffer), &from, 
index 3e5c5b1d44f82730fe4737c103cf6c0b35683e0a..c42de6a0088e0ef3d5b1c2b3670fbe4559840608 100644 (file)
@@ -25,7 +25,7 @@ fwp_endpoint_attr_t  attr;
 
 void* receiver(void* arg)
 {
-       fwp_endpoint_t *repoint1;
+       struct fwp_endpoint *repoint1;
        int i,len;
        char buffer[30];
        fwp_addr_t from;
@@ -60,7 +60,7 @@ int main()
        struct fwp_vres_params vparam1, vparam2;
        char msg1[] = "Hello1";
        char msg2[] = "Hello2";
-       fwp_endpoint_t *sepoint1;
+       struct fwp_endpoint *sepoint1;
        pthread_t thread;
        
        vparam1.ac_id = FWP_AC_VO; 
@@ -97,9 +97,9 @@ int main()
        printf("Send endpoint 1 created\n");
        fwp_send_endpoint_bind(sepoint1, vres1);
        
-       fwp_send(sepoint1, msg1, sizeof(msg1));
+       fwp_send_sync(sepoint1, msg1, sizeof(msg1));
        printf("Sent msg1\n");
-       fwp_send(sepoint1, msg2, sizeof(msg2));
+       fwp_send_sync(sepoint1, msg2, sizeof(msg2));
        printf("Sent msg2\n");
        
        pthread_join(thread, (void**) NULL);    
index 27a2395025660824c054e1d41391d39e32fa0bc7..fc0faab2fcd62af389077111e747fb1bfade2099 100644 (file)
@@ -29,7 +29,7 @@ int main()
        struct fwp_vres_params vparam1;
        char msg1[15];
        char buffer[30];
-       fwp_endpoint_t *sepoint, *repoint;
+       struct fwp_endpoint *sepoint, *repoint;
        int count;
        struct timespec  sendtime;
        fwp_endpoint_attr_t attr;
@@ -76,7 +76,7 @@ int main()
        
        for (count = 0; count < NUM; count++) { 
                sprintf(msg1,"msg%d",count);
-               fwp_send(sepoint, msg1, sizeof(msg1));
+               fwp_send_sync(sepoint, msg1, sizeof(msg1));
                
                clock_gettime(CLOCK_MONOTONIC, &sendtime);
                printf("Sent: sec = %ld nsec = %ld \n", sendtime.tv_sec,
index d4dbe5c5ef4a45986fdd70666f08218034b5cbf4..e43ffd4b908fece60ee1b392b33ef2e449bc7f4e 100644 (file)
@@ -31,7 +31,7 @@ fwp_endpoint_attr_t attr;
        
 void* sender()
 {
-       fwp_endpoint_t *sepoint;
+       struct fwp_endpoint *sepoint;
        fwp_vres_t *vres;
        struct fwp_vres_params vparam1;
        char msg1[10];
@@ -65,7 +65,7 @@ void* sender()
        while (count < NUM){
                count++;
                sprintf(msg1,"msg%d sent\n",count);
-               fwp_send(sepoint, msg1, sizeof(msg1));
+               fwp_send_sync(sepoint, msg1, sizeof(msg1));
        
                printf(msg1);   
                /*clock_gettime(CLOCK_MONOTONIC, &sendtime);
@@ -91,7 +91,7 @@ void* receiver()
 {
         ssize_t len;
        char buffer[30];
-       fwp_endpoint_t *repoint;
+       struct fwp_endpoint *repoint;
        int count;
        struct timespec recvtime;
        fwp_addr_t      from;
index d344489d897d06ffefb9dc5def445fd1138473cf..a6f8c6d336773509a64c7067a10ca043c7cd92f7 100644 (file)
@@ -24,6 +24,7 @@
 
 int opt_budget = 1024;
 int opt_period = 20;
+bool opt_async = false;
 
 struct in_addr src, dst;
 
@@ -139,6 +140,7 @@ static struct option long_opts[] = {
     { "budget", 1, 0, 'b' },
     { "source", 1, 0, 's' },
     { "dest",  1, 0, 'd' },
+    { "async",         0, 0, 'a' },
     { 0, 0, 0, 0}
 };
 
@@ -150,6 +152,7 @@ usage(void)
        printf("  -b, --budget <bytes>  how many bytes is sent in each period\n");
        printf("  -s, --source <ip>  source IP address\n");
        printf("  -d, --dest <ip:port> destination IP address and port\n");
+       printf("  -a, --async  Send packets asynchronously\n");
 }
 
 void parse_opts(int argc, char *argv[])
@@ -157,8 +160,11 @@ void parse_opts(int argc, char *argv[])
        char opt;
        int ret;
 
-       while ((opt = getopt_long(argc, argv, "b:p:s:d:", long_opts, NULL)) != -1) {
+       while ((opt = getopt_long(argc, argv, "ab:p:s:d:", long_opts, NULL)) != -1) {
                switch (opt) {
+               case 'a':
+                       opt_async = true;
+                       break;
                case 'b':
                        opt_budget = atoi(optarg);
                        break;
@@ -245,6 +251,7 @@ void *receiver(void *arg)
        frsh_receive_endpoint_t epdst = (frsh_receive_endpoint_t)arg;
        size_t mlen;
        int ret;
+       int last_cnt = -1;
        struct timespec tss, tsr;
        struct msg *msg;
        msg = malloc(opt_budget);
@@ -254,8 +261,11 @@ void *receiver(void *arg)
                ret = frsh_receive_sync(epdst, msg, opt_budget, &mlen, NULL);
                clock_gettime(CLOCK_MONOTONIC, &tsr);
                tss = msg->ts;
+               if (msg->cnt != last_cnt+1)
+                       printf("packet(s) lost!\n");
                printf("%10d: %10.3lf ms\n",
                       msg->cnt, tsdiff2d(&tsr, &tss)*1000);
+               last_cnt = msg->cnt;
        }
        return NULL;
 }
@@ -288,20 +298,25 @@ void run()
        if (signal(SIGINT, stopper) == SIG_ERR)
                error(1, errno, "Signal handler registration error");
 
-       struct timespec next_period, tss;
+       struct timespec next_period;
+       struct timespec tss;
        clock_gettime(CLOCK_MONOTONIC, &next_period);
        while (!exit_flag) {
                clock_gettime(CLOCK_MONOTONIC, &tss);
                msg->cnt = cnt++;
                msg->ts = tss;
-               ret = frsh_send_async(epsrc, msg, opt_budget);
-
-               next_period.tv_nsec += opt_period * 1000000;
+               if (opt_async)
+                       ret = frsh_send_async(epsrc, msg, opt_budget);
+               else {
+                       ret = frsh_send_sync(epsrc, msg, opt_budget);
+                       clock_gettime(CLOCK_MONOTONIC, &next_period);
+               }
+               next_period.tv_sec  += (opt_period/1000);
+               next_period.tv_nsec += (opt_period%1000) * 1000000;
                if (next_period.tv_nsec >= 1000000000) {
                        next_period.tv_nsec -= 1000000000;
                        next_period.tv_sec++;
                }
-
                clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
                                &next_period, NULL);
        }