From: Michal Sojka <sojkam1@fel.cvut.cz>
-Subject: [PATCH] t/fwp-timing
+Subject: Implemented synchronous and asynchronous sending
-Application for measure FWP timing properites
+The main goal of this big change is to avoid delays caused by CPU
+scheduler when rescheduling from application thread to VRES TX thread.
+According to our fwp-timing.c experiment, these delays can by even
+several milliseconds long.
-TODO: It is necessary to implement synchronous sending in FWP. Without
-this it is possible that FWP vres slows down due to scheduling latencies
-and we then measure constantly higher delays.
+Now, whenever VRES capacity allows, send operation is invoked directly
+from application thread. Only if the budget is insufficient, the
+message can be queued for sending later by VRES thread, provided that
+asynchronous send was requested. In case of synchronous send, the
+application thread is blocked until the budget is replenished.
+
+Besides the above change, the code was cleaned up a lot.
Signed-off-by: Michal Sojka <sojkam1@fel.cvut.cz>
{
unsigned int node, port;
fwp_endpoint_attr_t *attr;
- fwp_endpoint_t *fwp_epoint;
+ struct fwp_endpoint *fwp_epoint;
int rv;
frsh_send_endpoint_protocol_info_t *spi;
{
unsigned int node,port;
fwp_endpoint_attr_t *attr;
- fwp_endpoint_t *fwp_epoint;
+ struct fwp_endpoint *fwp_epoint;
int rv;
node = (unsigned int) endpoint->destination;
int fwp_fna_send_endpoint_bind(fna_endpoint_data_t *endpoint, fna_vres_id_t vres)
{
return fwp_send_endpoint_bind(endpoint->protocol_info.body,
- (fwp_vres_d_t) vres->priv);
+ (struct fwp_vres *) vres->priv);
}
int fwp_fna_send_endpoint_unbind(fna_endpoint_data_t *endpoint)
}
/** FNA send routine */
-int fwp_fna_send(const fna_endpoint_data_t *endpoint, const void *msg,
- const size_t size)
-{
- fwp_endpoint_t *fwp_epoint;
-
- fwp_epoint = endpoint->protocol_info.body;
- return fwp_send(fwp_epoint, msg, size);
-}
-
int fwp_fna_send_sync(const fna_endpoint_data_t *endpoint, const void *msg,
const size_t size)
{
- fwp_endpoint_t *fwp_epoint;
+ struct fwp_endpoint *fwp_epoint;
fwp_epoint = endpoint->protocol_info.body;
- return fwp_send(fwp_epoint, msg, size);
+ return fwp_send_sync(fwp_epoint, msg, size);
}
int fwp_fna_send_async(const fna_endpoint_data_t *endpoint,const void *msg,
const size_t size)
{
- fwp_endpoint_t *fwp_epoint;
+ struct fwp_endpoint *fwp_epoint;
- fwp_epoint = (fwp_endpoint_t*) endpoint->protocol_info.body;
- return fwp_send(fwp_epoint, msg, size); /* FIXME */
+ fwp_epoint = (struct fwp_endpoint*) endpoint->protocol_info.body;
+ return fwp_send_async(fwp_epoint, msg, size);
}
/** FNA receive routines */
{
unsigned int from_addr;
ssize_t len;
- fwp_endpoint_t *fwp_epoint;
+ struct fwp_endpoint *fwp_epoint;
int flags = 0;
- fwp_epoint = (fwp_endpoint_t*) endpoint->protocol_info.body;
+ fwp_epoint = (struct fwp_endpoint*) endpoint->protocol_info.body;
len = fwp_recv(fwp_epoint, buffer, buffer_size, &from_addr, flags);
if (len < 0)
return len;
{
unsigned int from_addr;
ssize_t len;
- fwp_endpoint_t *fwp_epoint;
+ struct fwp_endpoint *fwp_epoint;
int flags = 0;
- fwp_epoint = (fwp_endpoint_t*) endpoint->protocol_info.body;
+ fwp_epoint = (struct fwp_endpoint*) endpoint->protocol_info.body;
len = fwp_recv(fwp_epoint, buffer, buffer_size, &from_addr, flags);
if (len < 0)
return len;
{
unsigned int from_addr;
ssize_t len;
- fwp_endpoint_t *fwp_epoint;
+ struct fwp_endpoint *fwp_epoint;
int flags = 0;
- fwp_epoint = (fwp_endpoint_t*) endpoint->protocol_info.body;
+ fwp_epoint = (struct fwp_endpoint*) endpoint->protocol_info.body;
len = fwp_recv(fwp_epoint, buffer, buffer_size, &from_addr, flags);
if (len < 0)
return len;
.fna_resource_get_capacity = NULL,
.fna_resource_get_total_weight = NULL,
.fna_vres_decrease_capacity = NULL,
- .fna_send_sync = NULL,
+ .fna_send_sync = fwp_fna_send_sync,
.fna_send_async = fwp_fna_send_async,
.fna_receive_sync = fwp_fna_receive_sync,
.fna_receive_async = fwp_fna_receive_async,
fres_block_fwp_sched *fwp_sched;
fres_block_fwp *fwp;
fwp_vres_params_t vparams = {0};
- fwp_vres_d_t fwp_vresd = {0};
+ struct fwp_vres *fwp_vres = NULL;
int rv;
size_t bytes;
char src[21] = "N/A";
snprintf(src, sizeof(src), "%s", inet_ntoa(vparams.src));
}
/* Create vres */
- if ((rv = fwp_vres_create(&vparams, &fwp_vresd))) {
+ if ((rv = fwp_vres_create(&vparams, &fwp_vres))) {
return rv;
}
- vres->priv = fwp_vresd;
+ vres->priv = fwp_vres;
fres_contract_id_to_string(id, &vres->id, sizeof(id));
ul_logmsg("Creating FWP VRes (id=%s, period=%ld ms, budget=%ld bytes AC=%d src=%s)\n",
static int cancel_vres(fres_vres_t *vres, void *priv)
{
- fwp_vres_d_t fwp_vresd;
+ struct fwp_vres * fwp_vres;
fres_block_basic *basic;
char id[40];
- fwp_vresd = (fwp_vres_d_t) vres->priv;
- fwp_vres_destroy(fwp_vresd);
+ fwp_vres = (struct fwp_vres *) vres->priv;
+ fwp_vres_destroy(fwp_vres);
basic = fres_contract_get_basic(vres->allocated);
fres_contract_id_to_string(id, &vres->id, sizeof(id));
fres_block_basic *basic;
fres_block_fwp_sched *fwp_sched;
fwp_vres_params_t vparams;
- fwp_vres_d_t fwp_vresd;
+ struct fwp_vres *fwp_vres;
int rv;
size_t bytes;
vparams.budget = bytes;
vparams.period = basic->period;
vparams.ac_id = fwp_sched->ac_id;
- fwp_vresd = vres->priv;
+ fwp_vres = vres->priv;
/* Changing vres */
- if ((rv = fwp_vres_set_params(fwp_vresd, &vparams))) {
+ if ((rv = fwp_vres_set_params(fwp_vres, &vparams))) {
return rv;
}
#define _FWP_CONF_H
#define FWP_AC_NUM 4
-#define FWP_MSGQ_SIZE 2048
+#define FWP_MSGQ_SIZE (1<<11) /* Must be power of 2 */
#define FWP_MY_ADDR_DEFAULT "127.0.0.1"
#define FWP_MNGR_ADDR_DEFAULT "127.0.0.1"
#include <unistd.h>
#include <netinet/in.h>
#include "fwp_utils.h"
+#include "fwp_vres.h"
+#include <frsh_error.h>
#include <pthread.h>
#include "fwp_debug.h"
+#include "fwp_msgq.h"
typedef unsigned int fwp_endpoint_id_t;
fd_set fdset;
/** specific operation options*/
int flags;
+ /** Forced source address. If non-zero, packets are sent over
+ * the specified interface. */
+ struct in_addr src;
};
/**
* On error, NULL is returned.
*
*/
-static fwp_endpoint_t* fwp_endpoint_alloc()
+static struct fwp_endpoint* fwp_endpoint_alloc()
{
- return (fwp_endpoint_t*) calloc(1,sizeof(fwp_endpoint_t));
+ return (struct fwp_endpoint*) calloc(1,sizeof(struct fwp_endpoint));
}
/**
* On error, NULL is returned.
*
*/
-static inline void fwp_endpoint_free(fwp_endpoint_t *endpoint)
+static inline void fwp_endpoint_free(struct fwp_endpoint *endpoint)
{
free(endpoint);
}
* \return On success 0 is returned.
* On error, negative error value is returned and errno is set appropriately.
*/
-int fwp_endpoint_destroy(fwp_endpoint_t *ep)
+int fwp_endpoint_destroy(struct fwp_endpoint *ep)
{
if (ep->sockd > 0)
close(ep->sockd);
-
+
fwp_endpoint_free(ep);
return 0;
}
* \return On success 0 is returned.
* On error, negative error value is returned.
*/
-int fwp_endpoint_get_params(fwp_endpoint_t *ep, unsigned int *node,
+int fwp_endpoint_get_params(struct fwp_endpoint *ep, unsigned int *node,
unsigned int *port, fwp_endpoint_attr_t *attr)
{
if (node) *node = ep->node;
int fwp_send_endpoint_create(unsigned int node,
unsigned int port,
fwp_endpoint_attr_t *attr,
- fwp_endpoint_t **epoint)
+ struct fwp_endpoint **epoint)
{
struct sockaddr_in *addr;
- fwp_endpoint_t *fwp_epoint;
+ struct fwp_endpoint *fwp_epoint;
fwp_epoint = fwp_endpoint_alloc();
if (!fwp_epoint) {
*/
int fwp_receive_endpoint_create(unsigned int port,
fwp_endpoint_attr_t *attr,
- fwp_endpoint_t **epp)
+ struct fwp_endpoint **epp)
{
struct sockaddr_in *addr;
- fwp_endpoint_t *fwp_epoint;
+ struct fwp_endpoint *fwp_epoint;
fwp_epoint = fwp_endpoint_alloc();
if (!fwp_epoint) {
*
* \return On success returns 0. On error, -1 and errno is set appropriately.
*/
-int fwp_send_endpoint_bind(fwp_endpoint_t *ep, fwp_vres_t *vres)
+int fwp_send_endpoint_bind(struct fwp_endpoint *ep, fwp_vres_t *vres)
{
int rv = 0;
-#ifndef FWP_WITHOUT_CONTNEGT
+
+ if (ep->vres)
+ return FRSH_ERR_ALREADY_BOUND;
ep->vres = vres;
- rv = fwp_vres_bind(vres, ep->sockd);
-#endif
- /* if send endpoint is already bound
- if (epoint->type == FWP_EPOINT_BOUND) {
- fwp_send_endpoint_unbind(epoint);
- }*/
+ rv = fwp_vres_bind(vres, ep, ep->sockd);
+
return rv;
}
* \return On success returns 0. On error, -1 is returned and errno is set appropriately.
*
*/
-int fwp_send_endpoint_unbind(fwp_endpoint_t *ep)
+int fwp_send_endpoint_unbind(struct fwp_endpoint *ep)
{
int rv = 0;
* On success, it returns zero.
*
*/
-static int fwp_receive_endpoint_accept(fwp_endpoint_t *fwp_epoint)
+static int fwp_receive_endpoint_accept(struct fwp_endpoint *fwp_epoint)
{
int csockd;
-// fwp_endpoint_t *fwp_epoint = epointd;
+// struct fwp_endpoint *fwp_epoint = epointd;
fwp_sockaddr_t peer;
int i;
* On error, -1 is returned and errno is set appropriately.
*
*/
-int fwp_recv_conn(fwp_endpoint_t *ep, void *buffer,
+int fwp_recv_conn(struct fwp_endpoint *ep, void *buffer,
size_t buffer_size)
{
fwp_sockaddr_t *peer = &ep->peer;
* On error, -1 is returned and errno is set appropriately.
*
*/
-ssize_t fwp_recv(fwp_endpoint_t *ep,
+ssize_t fwp_recv(struct fwp_endpoint *ep,
void *buffer, const size_t buffer_size,
unsigned int *from, int flags)
{
}
}
}
+/**
+ * Physically send the message.
+ *
+ * This function should be called either by fwp_send_sync()/async() of
+ * by VRES to send delayed messaged.
+ *
+ * @param ep
+ * @param data
+ * @param size
+ *
+ * @return
+ */
+ssize_t fwp_endpoint_do_send(struct fwp_endpoint *ep,
+ void *data, const size_t size)
+{
+ struct iovec iov;
+ struct msghdr msg = {0};
+ ssize_t ret;
+ char cmsg_buf[CMSG_SPACE(sizeof(struct in_pktinfo))];
+
+ iov.iov_base = data;
+ iov.iov_len = size;
+
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+
+ if (ep->src.s_addr != 0) {
+ struct cmsghdr *cmsg;
+ struct in_pktinfo *ipi;
+
+ memset(cmsg_buf, 0, sizeof(cmsg_buf));
+
+ msg.msg_control = cmsg_buf;
+ msg.msg_controllen = sizeof(cmsg_buf);
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+
+ cmsg->cmsg_level = SOL_IP;
+ cmsg->cmsg_type = IP_PKTINFO;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
+
+ ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
+ ipi->ipi_spec_dst = ep->src;
+ }
+ ret = sendmsg(ep->sockd, &msg, 0);
+ return ret;
+}
/**
* Sends message through vres
* On error, -1 is returned and errno is set appropriately.
*
*/
-int fwp_send(fwp_endpoint_t *ep,const void *msg, const size_t size)
+int fwp_send_async(struct fwp_endpoint *ep, void *msg, size_t size)
{
- struct fwp_msgb *msgb;
-
-/* if (!fwp_endpoint_is_valid(epointd)){
- errno = EINVAL;
- return -1;
- }
- if (!fwp_endpoint_is_bound(epointd)){
- errno = EPERM;
- return -1;
- }*/
+ int ret;
- /*if (flags && MSG_DONTWAIT)
- msgb = fwp_msgb_alloc(buffer_size);
- else {*/
- if (!(msgb = fwp_msgb_alloc(size))) {
- errno = ENOMEM;
- return -1;
- }
+ if (!ep->vres)
+ return FRSH_ERR_NOT_BOUND;
- /*msgb->peer = &ep->peer;*/
- /*msgb->data = msg;*/
- /*msgb->flags = epoint->flags;*/
-
- /* data must be copied since msg may change while
- * message is waiting in transmission queue
- * */
- memcpy(msgb->data, msg, size);
- fwp_msgb_put(msgb, size);
- /*msgb->tail = msgb->data + size;
- msgb->len = size;*/
-
- /*}*/
-
- /* TODO: test whether _fwp_vres_send is successful */
- return fwp_vres_send(ep->vres, msgb);
+ if (fwp_vres_consume_budget(ep->vres, size, false) == 0)
+ ret = fwp_endpoint_do_send(ep, msg, size);
+ else
+ ret = fwp_vres_enqueue(ep->vres, ep, msg, size);
+ return ret;
+}
+
+int fwp_send_sync(struct fwp_endpoint *ep, void *msg, size_t size)
+{
+ int ret;
+
+ if (!ep->vres)
+ return FRSH_ERR_NOT_BOUND;
+
+ ret = fwp_vres_consume_budget(ep->vres, size, true);
+ if (ret)
+ return ret;
+ ret = fwp_endpoint_do_send(ep, msg, size);
+ return ret;
}
} fwp_endpoint_reliability_t;
struct fwp_endpoint;
-typedef struct fwp_endpoint fwp_endpoint_t;
typedef unsigned int fwp_addr_t;
#include "fwp_vres.h"
-int fwp_endpoint_get_params(fwp_endpoint_t *ep, unsigned int *node,
+int fwp_endpoint_get_params(struct fwp_endpoint *ep, unsigned int *node,
unsigned int *port, fwp_endpoint_attr_t *attr);
int fwp_send_endpoint_create(unsigned int node, unsigned int port,
fwp_endpoint_attr_t *attr,
- fwp_endpoint_t **epoint);
+ struct fwp_endpoint **epoint);
int fwp_receive_endpoint_create(/*unsigned int node,*/ unsigned int port,
fwp_endpoint_attr_t *attr,
- fwp_endpoint_t **epoint);
-int fwp_endpoint_destroy(fwp_endpoint_t *ep);
+ struct fwp_endpoint **epoint);
+int fwp_endpoint_destroy(struct fwp_endpoint *ep);
-int fwp_send_endpoint_bind(fwp_endpoint_t *ep, fwp_vres_t *vres);
-int fwp_send_endpoint_unbind(fwp_endpoint_t *ep);
+int fwp_send_endpoint_bind(struct fwp_endpoint *ep, fwp_vres_t *vres);
+int fwp_send_endpoint_unbind(struct fwp_endpoint *ep);
-ssize_t fwp_recv(fwp_endpoint_t *ep,
+ssize_t fwp_recv(struct fwp_endpoint *ep,
void *buffer, const size_t buffer_size,
unsigned int *from, int flags);
-int fwp_send(fwp_endpoint_t *ep, const void *msg, const size_t size);
+int fwp_send_sync(struct fwp_endpoint *ep, void *msg, const size_t size);
+int fwp_send_async(struct fwp_endpoint *ep, void *msg, const size_t size);
+ssize_t fwp_endpoint_do_send(struct fwp_endpoint *ep,
+ void *data, const size_t size);
int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr);
static inline int fwp_endpoint_attr_setreliability(fwp_endpoint_attr_t *attr,
msgb->len = 0;
msgb->data = (unsigned char*) msgb + sizeof(struct fwp_msgb);
msgb->tail = msgb->data;
- msgb->peer = NULL;
return msgb;
}
size_t len; /**< msg data length*/
unsigned char *data; /**< msg data */
unsigned char *tail; /**< msg data end*/
- struct fwp_sockaddr *peer; /**< peer address*/
- /*int flags; MSG_DONTWAIT for async*/
} fwp_msgb_t;
struct fwp_msgb* fwp_msgb_alloc(size_t buf_size);
return msgb;
}
+struct fwp_msgb* fwp_msgq_peek(struct fwp_msgq *msgq)
+{
+ struct fwp_msgb* msgb;
+
+ pthread_mutex_lock(&msgq->lock);
+
+ if (msgq->nr_pending > 0)
+ msgb = msgq->queue[msgq->out];
+ else
+ msgb = NULL;
+ pthread_mutex_unlock(&msgq->lock);
+
+ return msgb;
+}
+
/*
* Dequeue all messages from message queue
*
int fwp_msgq_enqueue(struct fwp_msgq *msgq, struct fwp_msgb* msgb);
struct fwp_msgb* fwp_msgq_dequeue(struct fwp_msgq *msgq);
+struct fwp_msgb* fwp_msgq_peek(struct fwp_msgq *msgq);
void fwp_msgq_dequeue_all(struct fwp_msgq *msgq);
/* void fwp_msgq_setpolicy */
/**************************************************************************/
/* ---------------------------------------------------------------------- */
-/* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
+/* Copyright (C) 2006 - 2009 FRESCOR consortium partners: */
/* */
/* Universidad de Cantabria, SPAIN */
/* University of York, UK */
/* however invalidate any other reasons why the executable file might be */
/* covered by the GNU Public License. */
/**************************************************************************/
+
#include "fwp_utils.h"
#include "fwp_vres.h"
static void* fwp_vres_tx_thread(void *_vres);
typedef enum {
- FWP_VF_USED = 0,
- FWP_VF_BOUND = 1,
+ USED,
+ UNTOUCHED,
+ CHANGED,
+ QUEUED,
} fwp_vres_flag_t;
fwp_vres_params_t fwp_vres_params_default = {
.ac_id = FWP_AC_VO,
.budget = 100,
- .period = {.tv_sec = 2 , .tv_nsec = 111111}
+ .period = {.tv_sec = 2 , .tv_nsec = 111111},
+ .src = { 0 },
};
/**
*/
struct fwp_vres{
struct fwp_vres_params params;
- /* consideration: move tx_queue to endpoint */
- /**< queue for messages to send */
- struct fwp_msgq tx_queue;
fwp_vres_flag_t flags;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond; /**< Signalizes budget replenishment */
+ fwp_budget_t budget; /**< Current remaining budget */
+ fwp_period_t period; /**< Period for this "activation" */
+ struct timespec replenish_at; /**< Time of next replenishment */
+ sem_t consumed;
/**< endpoint bounded to this vres */
- /*fwp_endpoint_t *epoint; */
+ struct fwp_endpoint *epoint;
pthread_t tx_thread; /**< tx_thread id*/
pthread_attr_t tx_thread_attr;
- int ac_sockd; /**< ac socket descriptor */
+ /** Copy of bound enpoint's socket - used for future changes
+ * of vres parameters. */
+ int ac_sockd;
+ /** Queue for messages to send */
+ struct fwp_msgq msg_queue;
};
typedef
return 0;
}
-static inline void fwp_vres_set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
+static inline void set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
{
vres->flags |= (1 << flag);
}
-static inline void fwp_vres_clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
+static inline void clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
{
vres->flags &= ~(1 << flag);
}
-static inline int fwp_vres_get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
+static inline int get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag)
{
return !!(vres->flags & (1 << flag));
}
-static inline void fwp_vres_clearall_flag(fwp_vres_t *vres)
+static inline void clear_all_flags(fwp_vres_t *vres)
{
vres->flags = 0;
}
-#if 0
-/* Deprecated */
-static int fwp_vres_ac_open(fwp_ac_t ac_id)
-{
- int sockd;
- unsigned int tos;
-
- if ((ac_id < 0)||(ac_id >= FWP_AC_NUM)) {
- errno = EINVAL;
- return -1;
- }
-
- if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
- FWP_ERROR("Unable to open socket for AC: %s", strerror(errno));
- return (-1);
- }
-
- tos = ac_to_tos[ac_id];
-
- if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) {
- int e = errno;
- FWP_ERROR("setsockopt(IP_TOS): %s", strerror(errno));
- close(sockfd);
- errno = e;
- return -1;
- }
-
- return sockd;
-}
-#endif
-
-static inline int _fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb)
-{
- struct iovec iov;
- struct msghdr msg = {0};
- ssize_t ret;
- char cmsg_buf[CMSG_SPACE(sizeof(struct in_pktinfo))];
-
- iov.iov_base = msgb->data;
- iov.iov_len = msgb->len;
-
- msg.msg_iov = &iov;
- msg.msg_iovlen = 1;
-
-
-
- if (vres->params.src.s_addr != 0) {
- struct cmsghdr *cmsg;
- struct in_pktinfo *ipi;
-
- memset(cmsg_buf, 0, sizeof(cmsg_buf));
-
- msg.msg_control = cmsg_buf;
- msg.msg_controllen = sizeof(cmsg_buf);
-
- cmsg = CMSG_FIRSTHDR(&msg);
-
- cmsg->cmsg_level = SOL_IP;
- cmsg->cmsg_type = IP_PKTINFO;
- cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
-
- ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
- ipi->ipi_spec_dst = vres->params.src;
- }
- ret = sendmsg(vres->ac_sockd, &msg, 0);
- return ret;
-}
-
static inline void fwp_vres_free(fwp_vres_t *vres)
{
- fwp_vres_clearall_flag(vres);
+ /* Clear USED flag */
+ clear_all_flags(vres);
}
static inline int fwp_vres_is_valid(fwp_vres_t *vres)
int id = vres - fwp_vres_table.entry;
if ((id < 0) || (id > fwp_vres_table.max_vres - 1) ||
- (!fwp_vres_get_flag(vres, FWP_VF_USED)))
+ (!get_flag(vres, USED)))
return 0;
return 1;
i = 0;
max_vres = fwp_vres_table.max_vres;
while ((i < max_vres) &&
- (fwp_vres_get_flag(&fwp_vres_table.entry[i], FWP_VF_USED))) {
+ (get_flag(&fwp_vres_table.entry[i], USED))) {
i++;
}
}
FWP_DEBUG("Allocated vres id = %d\n",i);
- fwp_vres_set_flag(&fwp_vres_table.entry[i], FWP_VF_USED);
+ set_flag(&fwp_vres_table.entry[i], USED);
pthread_mutex_unlock(&fwp_vres_table.lock);
return (&fwp_vres_table.entry[i]);
}
-inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
+static int apply_params(fwp_vres_t *vres)
{
int rv;
-
- /* copy vres paramters into vres structure */
- rv = fwp_vres_set_ac(vres->ac_sockd, params->ac_id);
- if (!rv)
- return rv;
- memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
-
- return 0;
+ vres->period = vres->params.period;
+ vres->budget = vres->params.budget;
+ set_flag(vres, UNTOUCHED);
+ if (get_flag(vres, CHANGED)) {
+ clear_flag(vres, CHANGED);
+ rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
+ }
+ return rv;
}
/**
*/
int fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
{
+ int rv = 0;
+
if (!fwp_vres_is_valid(vres)) {
errno = EINVAL;
return -1;
}
- return _fwp_vres_set_params(vres, params);
+ pthread_mutex_lock(&vres->mutex);
+
+ if (vres->epoint &&
+ params->src.s_addr != vres->params.src.s_addr) {
+ errno = EREMCHG;
+ rv = -1;
+ goto out;
+ }
+ vres->params = *params;
+ if (vres->epoint) {
+ set_flag(vres, CHANGED);
+ if (get_flag(vres, UNTOUCHED))
+ rv = apply_params(vres);
+ }
+out:
+ pthread_mutex_unlock(&vres->mutex);
+ return rv;
}
/**
errno = ENOMEM;
return -1;
}
- /* initialize msg queue */
- fwp_msgq_init(&vres->tx_queue);
+
+ pthread_mutexattr_t ma;
+ rv = pthread_mutexattr_init(&ma);
+ rv = pthread_mutexattr_setprotocol(&ma, PTHREAD_PRIO_INHERIT);
+ if (rv) return rv;
+ pthread_mutex_init(&vres->mutex, &ma);
+ pthread_cond_init(&vres->cond, NULL);
- memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
+ vres->params = *params;
+ apply_params(vres);
+ fwp_msgq_init(&vres->msg_queue);
+
pthread_attr_init(&vres->tx_thread_attr);
if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr,
fwp_vres_tx_thread, (void*) vres)) != 0){
}
*vresp = vres;
+
return 0;
err:
+ fwp_msgq_dequeue_all(&vres->msg_queue);
fwp_vres_free(vres);
return -1;
}
}
pthread_cancel(vres->tx_thread);
+ pthread_cond_destroy(&vres->cond);
+ pthread_mutex_destroy(&vres->mutex);
+ fwp_msgq_dequeue_all(&vres->msg_queue);
+ fwp_vres_free(vres);
+
FWP_DEBUG("Vres destroyed.\n");
return 0;
}
-static void fwp_vres_cleanup(void *_vres)
+static void do_consume_budget(struct fwp_vres *vres, size_t size)
{
- fwp_vres_t *vres = (fwp_vres_t*)_vres;
+ if (get_flag(vres, UNTOUCHED)) {
+ /* Setup next replenish time */
+ struct timespec now;
+ clear_flag(vres, UNTOUCHED);
+ if (get_flag(vres, QUEUED))
+ now = vres->replenish_at;
+ else
+ clock_gettime(CLOCK_MONOTONIC, &now);
+ fwp_timespec_add(&vres->replenish_at, &now, &vres->period);
+ sem_post(&vres->consumed);
+ }
+ vres->budget -= size;
+}
- fwp_msgq_dequeue_all(&vres->tx_queue);
- fwp_vres_free(vres);
+int __consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
+{
+ int ret = 0;
+ if (vres->params.budget < size) {
+ errno = ENOSR;
+ return -1;
+ }
+ while (can_block && vres->budget < size) {
+ ret = pthread_cond_wait(&vres->cond, &vres->mutex);
+ /* The budget might have been changed while we were
+ * waiting, so check it again. */
+ if (vres->params.budget < size) {
+ errno = ENOSR;
+ return -1;
+ }
+ }
+ if (ret == 0) {
+ if (vres->budget >= size) {
+ do_consume_budget(vres, size);
+ ret = 0;
+ } else {
+ set_flag(vres, QUEUED);
+ ret = 1;
+ }
+ }
+ return ret;
+}
+
+/**
+ * Tries to consume (a part of) budget
+ *
+ * @param vres VRES whose budget is conumed.
+ * @param size How much to consume (in bytes).
+ * @param can_block True, indicates that the function can block to
+ * wait for budget replenishment. False causes no blocking and if 1 is
+ * returned, the calles must call fwp_vres_enqueue() to enqueue the
+ * packet to be sent later after replenishing.
+ *
+ * @return Zero if budget was consumed, 1 if there is not enough
+ * budget available and blocking was not allowed, -1 in case of error.
+ */
+int fwp_vres_consume_budget(struct fwp_vres *vres, size_t size, bool can_block)
+{
+ int ret = 0;
+ pthread_mutex_lock(&vres->mutex);
+ ret = __consume_budget(vres, size, can_block);
+ pthread_mutex_unlock(&vres->mutex);
+ return ret;
+}
+
+int fwp_vres_enqueue(struct fwp_vres *vres, struct fwp_endpoint *ep, void *msg, size_t size)
+{
+ struct fwp_msgb *msgb;
+ int ret;
+
+ if (!(msgb = fwp_msgb_alloc(size)))
+ return -1;
+ memcpy(msgb->data, msg, size);
+ fwp_msgb_put(msgb, size);
+ ret = fwp_msgq_enqueue(&vres->msg_queue, msgb);
+ if (ret) {
+ fwp_msgb_free(msgb);
+ return ret;
+ }
+ return ret;
}
-static inline void
-fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period,
- fwp_budget_t *budget)
+static void wait_for_replenish(struct fwp_vres *vres)
{
- *period = vres->params.period;
- *budget = vres->params.budget;
+ sem_wait(&vres->consumed);
+ clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
+ &vres->replenish_at, NULL);
+}
+
+static void send_queue(struct fwp_vres *vres)
+{
+ struct fwp_msgb *msgb;
+ bool can_send;
+ msgb = fwp_msgq_dequeue(&vres->msg_queue);
+ can_send = (0 == __consume_budget(vres, msgb->len, false));
+ if (!can_send) {
+ fwp_msgb_free(msgb);
+ FWP_ERROR("Cannot send queued packet (budget decreased?)\n");
+ return;
+ }
+ /* If we cannot send the whole queue, the flag will be set
+ * later by __consume_budget(). */
+ clear_flag(vres, QUEUED);
+
+ while (msgb) {
+ fwp_endpoint_do_send(vres->epoint, msgb->data, msgb->len);
+ fwp_msgb_free(msgb);
+ msgb = fwp_msgq_peek(&vres->msg_queue);
+ if (msgb) {
+ can_send = (0 == __consume_budget(vres, msgb->len, false));
+ if (can_send) {
+ msgb = fwp_msgq_dequeue(&vres->msg_queue);
+ } else {
+ msgb = NULL;
+ return;
+ }
+ }
+ }
+}
+
+static void replenish(struct fwp_vres *vres)
+{
+ pthread_mutex_lock(&vres->mutex);
+ apply_params(vres);
+ if (get_flag(vres, QUEUED))
+ send_queue(vres);
+ pthread_cond_broadcast(&vres->cond);
+ pthread_mutex_unlock(&vres->mutex);
}
/**
static void* fwp_vres_tx_thread(void *_vres)
{
struct fwp_vres *vres = (struct fwp_vres*)_vres;
- struct fwp_msgq *msgq = &vres->tx_queue;
- struct fwp_msgb *msgb = NULL;
unsigned int ac_id = vres->params.ac_id;
- fwp_budget_t budget = vres->params.budget;
- fwp_budget_t curr_budget;
- int rc;
- struct timespec start, period, interval, now;
fwp_set_rt_prio(90 - ac_id);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
- pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
- pthread_cleanup_push(fwp_vres_cleanup, (void*)vres);
+ pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
- /* just for sure */
- fwp_vres_sched_update(vres, &period, &budget);
- curr_budget = budget;
- clock_gettime(CLOCK_MONOTONIC, &start);
-
while (1) {
- msgb = fwp_msgq_dequeue(msgq);
- if (msgb->len > budget) {
- FWP_ERROR("Message too large: %zd -> skipping\n",
- msgb->len);
- goto skip_message;
- }
- if (curr_budget < msgb->len) {
- /* needs to replenish */
- clock_gettime(CLOCK_MONOTONIC, &now);
- fwp_timespec_sub(&interval, &now, &start);
- ul_logtrash("start=%ld.%09ld, now=%ld.%09ld diff=%ld.%09ld\n", (long)start.tv_sec, (long)start.tv_nsec, (long)now.tv_sec, (long)now.tv_nsec, (long)interval.tv_sec, (long)interval.tv_nsec);
- fwp_timespec_sub(&interval, &period, &interval);
- if (interval.tv_sec > 0 ||
- (interval.tv_sec == 0 && interval.tv_nsec > 0)) {
- /* We have to wait to replenish */
- ul_logtrash("sleeping=%ld.%09ld\n", (long)interval.tv_sec, (long)interval.tv_nsec);
- nanosleep(&interval, NULL);
- fwp_timespec_add(&start, &now, &interval);
- } else {
- start = now;
- }
- fwp_vres_sched_update(vres, &period, &budget);
- curr_budget = budget;
- }
-
- rc = _fwp_vres_send(vres, msgb);
- if (!(rc < 0)) {
- FWP_DEBUG("Message sent through AC%d\n",ac_id);
- curr_budget -= msgb->len;
- } else {
- FWP_ERROR("Message sent error %d\n",rc);
- }
- skip_message:
- fwp_msgb_free(msgb);
-
- /*pthread_testcancel(); */
- /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
-
- fwp_timespec_add(&end_period, &start_period, &period);
- clock_gettime(CLOCK_MONOTONIC, ¤t_time);
- fwp_timespec_sub(&interval, &end_period, ¤t_time);
- nanosleep(&interval, NULL);
- */
+ wait_for_replenish(vres);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ replenish(vres);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
}
-
- /* it should normaly never come here */
- pthread_cleanup_pop(0);
- fwp_vres_free(vres);
-
- return NULL;
-}
-int fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb)
-{
- if (fwp_vres_is_valid(vres)) {
- return fwp_msgq_enqueue(&vres->tx_queue, msgb);
- } else {
- errno = EINVAL;
- return -1;
- }
+ return NULL;
}
-/*int fwp_vres_bind(fwp_vres_t *vres, fwp_endpoint_t *epoint)*/
-int fwp_vres_bind(fwp_vres_t *vres, int sockd)
+/*int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *epoint)*/
+int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *ep, int sockd)
{
int rv = 0;
- pthread_mutex_lock(&fwp_vres_table.lock);
if (!fwp_vres_is_valid(vres)) {
errno = EINVAL;
rv = -1;
goto err;
}
- if (fwp_vres_get_flag(vres, FWP_VF_BOUND)) { /*if already bounded */
+ if (vres->epoint) { /*if already bounded */
errno = EBUSY;
rv = -1;
goto err;
}
vres->ac_sockd = sockd;
- rv = fwp_vres_set_ac(vres->ac_sockd,vres->params.ac_id);
- /*if (rv)
- goto err;*/
- fwp_vres_set_flag(vres, FWP_VF_BOUND);
+ rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id);
+ if (rv)
+ goto err;
+ vres->epoint = ep;
err:
- pthread_mutex_unlock(&fwp_vres_table.lock);
return rv;
}
errno = EINVAL;
return -1;
}
- fwp_vres_clear_flag(vres, FWP_VF_BOUND);
+ pthread_mutex_lock(&vres->mutex);
+ vres->epoint = NULL;
+ pthread_mutex_unlock(&vres->mutex);
/* TODO: consider what to do with pending messages */
- /* fwp_vres_free_msgb(vres->tx_queue); */
+ fwp_msgq_dequeue_all(&vres->msg_queue);
return 0;
}
#define _FWP_VRES_H
#include <netinet/in.h>
+#include <stdbool.h>
struct fwp_vres;
typedef struct fwp_vres fwp_vres_t;
/** all time units are in microseconds */
fwp_period_t period;
fwp_ac_t ac_id; /**< AC id ~ priority of vres */
- /** Forced source address. If non-zero, packets are sent over
- * the specified interface. */
struct in_addr src;
} fwp_vres_params_t;
int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp);
int fwp_vres_destroy(fwp_vres_t *vres);
-int fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb);
-int fwp_vres_bind(fwp_vres_t *vres, int sockd);
+int fwp_vres_consume_budget(struct fwp_vres *vres, size_t size, bool can_block);
+struct fwp_endpoint;
+int fwp_vres_enqueue(struct fwp_vres *vres, struct fwp_endpoint *ep, void *msg, size_t size);
+int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *ep, int sockd);
int fwp_vres_unbind(fwp_vres_t *vres);
extern fwp_vres_params_t fwp_vres_params_default;
char msg1[] = "Hello1";
char msg2[] = "Hello2";
char buffer[30];
- fwp_endpoint_t *sepoint1, *sepoint2, *repoint1, *repoint2;
+ struct fwp_endpoint *sepoint1, *sepoint2, *repoint1, *repoint2;
fwp_endpoint_attr_t attr;
fwp_addr_t from;
printf("Send endpoint 2 created\n");
fwp_send_endpoint_bind(sepoint2, vres2);
- fwp_send(sepoint1, msg1, sizeof(msg1));
- fwp_send(sepoint1, msg2, sizeof(msg2));
+ fwp_send_sync(sepoint1, msg1, sizeof(msg1));
+ fwp_send_sync(sepoint1, msg2, sizeof(msg2));
for (i = 0; i < 2; i++) {
if ((len = fwp_recv(repoint1, buffer, sizeof(buffer), &from,
void* receiver(void* arg)
{
- fwp_endpoint_t *repoint1;
+ struct fwp_endpoint *repoint1;
int i,len;
char buffer[30];
fwp_addr_t from;
struct fwp_vres_params vparam1, vparam2;
char msg1[] = "Hello1";
char msg2[] = "Hello2";
- fwp_endpoint_t *sepoint1;
+ struct fwp_endpoint *sepoint1;
pthread_t thread;
vparam1.ac_id = FWP_AC_VO;
printf("Send endpoint 1 created\n");
fwp_send_endpoint_bind(sepoint1, vres1);
- fwp_send(sepoint1, msg1, sizeof(msg1));
+ fwp_send_sync(sepoint1, msg1, sizeof(msg1));
printf("Sent msg1\n");
- fwp_send(sepoint1, msg2, sizeof(msg2));
+ fwp_send_sync(sepoint1, msg2, sizeof(msg2));
printf("Sent msg2\n");
pthread_join(thread, (void**) NULL);
struct fwp_vres_params vparam1;
char msg1[15];
char buffer[30];
- fwp_endpoint_t *sepoint, *repoint;
+ struct fwp_endpoint *sepoint, *repoint;
int count;
struct timespec sendtime;
fwp_endpoint_attr_t attr;
for (count = 0; count < NUM; count++) {
sprintf(msg1,"msg%d",count);
- fwp_send(sepoint, msg1, sizeof(msg1));
+ fwp_send_sync(sepoint, msg1, sizeof(msg1));
clock_gettime(CLOCK_MONOTONIC, &sendtime);
printf("Sent: sec = %ld nsec = %ld \n", sendtime.tv_sec,
void* sender()
{
- fwp_endpoint_t *sepoint;
+ struct fwp_endpoint *sepoint;
fwp_vres_t *vres;
struct fwp_vres_params vparam1;
char msg1[10];
while (count < NUM){
count++;
sprintf(msg1,"msg%d sent\n",count);
- fwp_send(sepoint, msg1, sizeof(msg1));
+ fwp_send_sync(sepoint, msg1, sizeof(msg1));
printf(msg1);
/*clock_gettime(CLOCK_MONOTONIC, &sendtime);
{
ssize_t len;
char buffer[30];
- fwp_endpoint_t *repoint;
+ struct fwp_endpoint *repoint;
int count;
struct timespec recvtime;
fwp_addr_t from;
int opt_budget = 1024;
int opt_period = 20;
+bool opt_async = false;
struct in_addr src, dst;
{ "budget", 1, 0, 'b' },
{ "source", 1, 0, 's' },
{ "dest", 1, 0, 'd' },
+ { "async", 0, 0, 'a' },
{ 0, 0, 0, 0}
};
printf(" -b, --budget <bytes> how many bytes is sent in each period\n");
printf(" -s, --source <ip> source IP address\n");
printf(" -d, --dest <ip:port> destination IP address and port\n");
+ printf(" -a, --async Send packets asynchronously\n");
}
void parse_opts(int argc, char *argv[])
char opt;
int ret;
- while ((opt = getopt_long(argc, argv, "b:p:s:d:", long_opts, NULL)) != -1) {
+ while ((opt = getopt_long(argc, argv, "ab:p:s:d:", long_opts, NULL)) != -1) {
switch (opt) {
+ case 'a':
+ opt_async = true;
+ break;
case 'b':
opt_budget = atoi(optarg);
break;
frsh_receive_endpoint_t epdst = (frsh_receive_endpoint_t)arg;
size_t mlen;
int ret;
+ int last_cnt = -1;
struct timespec tss, tsr;
struct msg *msg;
msg = malloc(opt_budget);
ret = frsh_receive_sync(epdst, msg, opt_budget, &mlen, NULL);
clock_gettime(CLOCK_MONOTONIC, &tsr);
tss = msg->ts;
+ if (msg->cnt != last_cnt+1)
+ printf("packet(s) lost!\n");
printf("%10d: %10.3lf ms\n",
msg->cnt, tsdiff2d(&tsr, &tss)*1000);
+ last_cnt = msg->cnt;
}
return NULL;
}
if (signal(SIGINT, stopper) == SIG_ERR)
error(1, errno, "Signal handler registration error");
- struct timespec next_period, tss;
+ struct timespec next_period;
+ struct timespec tss;
clock_gettime(CLOCK_MONOTONIC, &next_period);
while (!exit_flag) {
clock_gettime(CLOCK_MONOTONIC, &tss);
msg->cnt = cnt++;
msg->ts = tss;
- ret = frsh_send_async(epsrc, msg, opt_budget);
-
- next_period.tv_nsec += opt_period * 1000000;
+ if (opt_async)
+ ret = frsh_send_async(epsrc, msg, opt_budget);
+ else {
+ ret = frsh_send_sync(epsrc, msg, opt_budget);
+ clock_gettime(CLOCK_MONOTONIC, &next_period);
+ }
+ next_period.tv_sec += (opt_period/1000);
+ next_period.tv_nsec += (opt_period%1000) * 1000000;
if (next_period.tv_nsec >= 1000000000) {
next_period.tv_nsec -= 1000000000;
next_period.tv_sec++;
}
-
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
&next_period, NULL);
}