From: Michal Sojka Date: Fri, 6 Nov 2009 14:02:15 +0000 (+0100) Subject: Implemented synchronous and asynchronous sending X-Git-Url: http://rtime.felk.cvut.cz/gitweb/frescor/fwp.git/commitdiff_plain/0b59f4792b0e22c2a1fb703d632ef1c08ecb78fb Implemented synchronous and asynchronous sending The main goal of this big change is to avoid delays caused by CPU scheduler when rescheduling from application thread to VRES TX thread. According to our fwp-timing.c experiment, these delays can by even several milliseconds long. Now, whenever VRES capacity allows, send operation is invoked directly from application thread. Only if the budget is insufficient, the message can be queued for sending later by VRES thread, provided that asynchronous send was requested. In case of synchronous send, the application thread is blocked until the budget is replenished. Besides the above change, the code was cleaned up a lot. Signed-off-by: Michal Sojka --- diff --git a/.topmsg b/.topmsg index b268c6a..5973efd 100644 --- a/.topmsg +++ b/.topmsg @@ -1,10 +1,17 @@ From: Michal Sojka -Subject: [PATCH] t/fwp-timing +Subject: Implemented synchronous and asynchronous sending -Application for measure FWP timing properites +The main goal of this big change is to avoid delays caused by CPU +scheduler when rescheduling from application thread to VRES TX thread. +According to our fwp-timing.c experiment, these delays can by even +several milliseconds long. -TODO: It is necessary to implement synchronous sending in FWP. Without -this it is possible that FWP vres slows down due to scheduling latencies -and we then measure constantly higher delays. +Now, whenever VRES capacity allows, send operation is invoked directly +from application thread. Only if the budget is insufficient, the +message can be queued for sending later by VRES thread, provided that +asynchronous send was requested. In case of synchronous send, the +application thread is blocked until the budget is replenished. + +Besides the above change, the code was cleaned up a lot. Signed-off-by: Michal Sojka diff --git a/fwp/lib/frsh_fwp/fwp_fna.c b/fwp/lib/frsh_fwp/fwp_fna.c index e26dd26..a51e558 100644 --- a/fwp/lib/frsh_fwp/fwp_fna.c +++ b/fwp/lib/frsh_fwp/fwp_fna.c @@ -82,7 +82,7 @@ int fwp_fna_send_endpoint_created(fna_endpoint_data_t *endpoint) { unsigned int node, port; fwp_endpoint_attr_t *attr; - fwp_endpoint_t *fwp_epoint; + struct fwp_endpoint *fwp_epoint; int rv; frsh_send_endpoint_protocol_info_t *spi; @@ -102,7 +102,7 @@ int fwp_fna_receive_endpoint_created(fna_endpoint_data_t *endpoint) { unsigned int node,port; fwp_endpoint_attr_t *attr; - fwp_endpoint_t *fwp_epoint; + struct fwp_endpoint *fwp_epoint; int rv; node = (unsigned int) endpoint->destination; @@ -123,7 +123,7 @@ int fwp_fna_receive_endpoint_created(fna_endpoint_data_t *endpoint) int fwp_fna_send_endpoint_bind(fna_endpoint_data_t *endpoint, fna_vres_id_t vres) { return fwp_send_endpoint_bind(endpoint->protocol_info.body, - (fwp_vres_d_t) vres->priv); + (struct fwp_vres *) vres->priv); } int fwp_fna_send_endpoint_unbind(fna_endpoint_data_t *endpoint) @@ -137,31 +137,22 @@ int fwp_fna_endpoint_destroy(fna_endpoint_data_t *endpoint) } /** FNA send routine */ -int fwp_fna_send(const fna_endpoint_data_t *endpoint, const void *msg, - const size_t size) -{ - fwp_endpoint_t *fwp_epoint; - - fwp_epoint = endpoint->protocol_info.body; - return fwp_send(fwp_epoint, msg, size); -} - int fwp_fna_send_sync(const fna_endpoint_data_t *endpoint, const void *msg, const size_t size) { - fwp_endpoint_t *fwp_epoint; + struct fwp_endpoint *fwp_epoint; fwp_epoint = endpoint->protocol_info.body; - return fwp_send(fwp_epoint, msg, size); + return fwp_send_sync(fwp_epoint, msg, size); } int fwp_fna_send_async(const fna_endpoint_data_t *endpoint,const void *msg, const size_t size) { - fwp_endpoint_t *fwp_epoint; + struct fwp_endpoint *fwp_epoint; - fwp_epoint = (fwp_endpoint_t*) endpoint->protocol_info.body; - return fwp_send(fwp_epoint, msg, size); /* FIXME */ + fwp_epoint = (struct fwp_endpoint*) endpoint->protocol_info.body; + return fwp_send_async(fwp_epoint, msg, size); } /** FNA receive routines */ @@ -171,10 +162,10 @@ int fwp_fna_receive(const fna_endpoint_data_t *endpoint, { unsigned int from_addr; ssize_t len; - fwp_endpoint_t *fwp_epoint; + struct fwp_endpoint *fwp_epoint; int flags = 0; - fwp_epoint = (fwp_endpoint_t*) endpoint->protocol_info.body; + fwp_epoint = (struct fwp_endpoint*) endpoint->protocol_info.body; len = fwp_recv(fwp_epoint, buffer, buffer_size, &from_addr, flags); if (len < 0) return len; @@ -191,10 +182,10 @@ int fwp_fna_receive_sync(const fna_endpoint_data_t *endpoint, void *buffer, { unsigned int from_addr; ssize_t len; - fwp_endpoint_t *fwp_epoint; + struct fwp_endpoint *fwp_epoint; int flags = 0; - fwp_epoint = (fwp_endpoint_t*) endpoint->protocol_info.body; + fwp_epoint = (struct fwp_endpoint*) endpoint->protocol_info.body; len = fwp_recv(fwp_epoint, buffer, buffer_size, &from_addr, flags); if (len < 0) return len; @@ -213,10 +204,10 @@ int fwp_fna_receive_async(const fna_endpoint_data_t *endpoint, void *buffer, { unsigned int from_addr; ssize_t len; - fwp_endpoint_t *fwp_epoint; + struct fwp_endpoint *fwp_epoint; int flags = 0; - fwp_epoint = (fwp_endpoint_t*) endpoint->protocol_info.body; + fwp_epoint = (struct fwp_endpoint*) endpoint->protocol_info.body; len = fwp_recv(fwp_epoint, buffer, buffer_size, &from_addr, flags); if (len < 0) return len; @@ -250,7 +241,7 @@ fna_operations_t fwp_fna_operations = { .fna_resource_get_capacity = NULL, .fna_resource_get_total_weight = NULL, .fna_vres_decrease_capacity = NULL, - .fna_send_sync = NULL, + .fna_send_sync = fwp_fna_send_sync, .fna_send_async = fwp_fna_send_async, .fna_receive_sync = fwp_fna_receive_sync, .fna_receive_async = fwp_fna_receive_async, diff --git a/fwp/lib/frsh_fwp/fwp_fra.c b/fwp/lib/frsh_fwp/fwp_fra.c index 813cd5e..f6e15c4 100644 --- a/fwp/lib/frsh_fwp/fwp_fra.c +++ b/fwp/lib/frsh_fwp/fwp_fra.c @@ -66,7 +66,7 @@ static int create_vres(fres_vres_t *vres, void *priv) fres_block_fwp_sched *fwp_sched; fres_block_fwp *fwp; fwp_vres_params_t vparams = {0}; - fwp_vres_d_t fwp_vresd = {0}; + struct fwp_vres *fwp_vres = NULL; int rv; size_t bytes; char src[21] = "N/A"; @@ -88,10 +88,10 @@ static int create_vres(fres_vres_t *vres, void *priv) snprintf(src, sizeof(src), "%s", inet_ntoa(vparams.src)); } /* Create vres */ - if ((rv = fwp_vres_create(&vparams, &fwp_vresd))) { + if ((rv = fwp_vres_create(&vparams, &fwp_vres))) { return rv; } - vres->priv = fwp_vresd; + vres->priv = fwp_vres; fres_contract_id_to_string(id, &vres->id, sizeof(id)); ul_logmsg("Creating FWP VRes (id=%s, period=%ld ms, budget=%ld bytes AC=%d src=%s)\n", @@ -103,12 +103,12 @@ static int create_vres(fres_vres_t *vres, void *priv) static int cancel_vres(fres_vres_t *vres, void *priv) { - fwp_vres_d_t fwp_vresd; + struct fwp_vres * fwp_vres; fres_block_basic *basic; char id[40]; - fwp_vresd = (fwp_vres_d_t) vres->priv; - fwp_vres_destroy(fwp_vresd); + fwp_vres = (struct fwp_vres *) vres->priv; + fwp_vres_destroy(fwp_vres); basic = fres_contract_get_basic(vres->allocated); fres_contract_id_to_string(id, &vres->id, sizeof(id)); @@ -124,7 +124,7 @@ int change_vres(fres_vres_t *vres, void *priv) fres_block_basic *basic; fres_block_fwp_sched *fwp_sched; fwp_vres_params_t vparams; - fwp_vres_d_t fwp_vresd; + struct fwp_vres *fwp_vres; int rv; size_t bytes; @@ -136,10 +136,10 @@ int change_vres(fres_vres_t *vres, void *priv) vparams.budget = bytes; vparams.period = basic->period; vparams.ac_id = fwp_sched->ac_id; - fwp_vresd = vres->priv; + fwp_vres = vres->priv; /* Changing vres */ - if ((rv = fwp_vres_set_params(fwp_vresd, &vparams))) { + if ((rv = fwp_vres_set_params(fwp_vres, &vparams))) { return rv; } diff --git a/fwp/lib/fwp/fwp_conf.h b/fwp/lib/fwp/fwp_conf.h index ba13313..bc2a9ed 100644 --- a/fwp/lib/fwp/fwp_conf.h +++ b/fwp/lib/fwp/fwp_conf.h @@ -47,7 +47,7 @@ #define _FWP_CONF_H #define FWP_AC_NUM 4 -#define FWP_MSGQ_SIZE 2048 +#define FWP_MSGQ_SIZE (1<<11) /* Must be power of 2 */ #define FWP_MY_ADDR_DEFAULT "127.0.0.1" #define FWP_MNGR_ADDR_DEFAULT "127.0.0.1" diff --git a/fwp/lib/fwp/fwp_endpoint.c b/fwp/lib/fwp/fwp_endpoint.c index 0531284..56964aa 100644 --- a/fwp/lib/fwp/fwp_endpoint.c +++ b/fwp/lib/fwp/fwp_endpoint.c @@ -50,9 +50,12 @@ #include #include #include "fwp_utils.h" +#include "fwp_vres.h" +#include #include #include "fwp_debug.h" +#include "fwp_msgq.h" typedef unsigned int fwp_endpoint_id_t; @@ -95,6 +98,9 @@ struct fwp_endpoint{ fd_set fdset; /** specific operation options*/ int flags; + /** Forced source address. If non-zero, packets are sent over + * the specified interface. */ + struct in_addr src; }; /** @@ -104,9 +110,9 @@ struct fwp_endpoint{ * On error, NULL is returned. * */ -static fwp_endpoint_t* fwp_endpoint_alloc() +static struct fwp_endpoint* fwp_endpoint_alloc() { - return (fwp_endpoint_t*) calloc(1,sizeof(fwp_endpoint_t)); + return (struct fwp_endpoint*) calloc(1,sizeof(struct fwp_endpoint)); } /** @@ -116,7 +122,7 @@ static fwp_endpoint_t* fwp_endpoint_alloc() * On error, NULL is returned. * */ -static inline void fwp_endpoint_free(fwp_endpoint_t *endpoint) +static inline void fwp_endpoint_free(struct fwp_endpoint *endpoint) { free(endpoint); } @@ -128,11 +134,11 @@ static inline void fwp_endpoint_free(fwp_endpoint_t *endpoint) * \return On success 0 is returned. * On error, negative error value is returned and errno is set appropriately. */ -int fwp_endpoint_destroy(fwp_endpoint_t *ep) +int fwp_endpoint_destroy(struct fwp_endpoint *ep) { if (ep->sockd > 0) close(ep->sockd); - + fwp_endpoint_free(ep); return 0; } @@ -147,7 +153,7 @@ int fwp_endpoint_destroy(fwp_endpoint_t *ep) * \return On success 0 is returned. * On error, negative error value is returned. */ -int fwp_endpoint_get_params(fwp_endpoint_t *ep, unsigned int *node, +int fwp_endpoint_get_params(struct fwp_endpoint *ep, unsigned int *node, unsigned int *port, fwp_endpoint_attr_t *attr) { if (node) *node = ep->node; @@ -179,10 +185,10 @@ int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr) int fwp_send_endpoint_create(unsigned int node, unsigned int port, fwp_endpoint_attr_t *attr, - fwp_endpoint_t **epoint) + struct fwp_endpoint **epoint) { struct sockaddr_in *addr; - fwp_endpoint_t *fwp_epoint; + struct fwp_endpoint *fwp_epoint; fwp_epoint = fwp_endpoint_alloc(); if (!fwp_epoint) { @@ -272,10 +278,10 @@ err: */ int fwp_receive_endpoint_create(unsigned int port, fwp_endpoint_attr_t *attr, - fwp_endpoint_t **epp) + struct fwp_endpoint **epp) { struct sockaddr_in *addr; - fwp_endpoint_t *fwp_epoint; + struct fwp_endpoint *fwp_epoint; fwp_epoint = fwp_endpoint_alloc(); if (!fwp_epoint) { @@ -383,18 +389,16 @@ err: * * \return On success returns 0. On error, -1 and errno is set appropriately. */ -int fwp_send_endpoint_bind(fwp_endpoint_t *ep, fwp_vres_t *vres) +int fwp_send_endpoint_bind(struct fwp_endpoint *ep, fwp_vres_t *vres) { int rv = 0; -#ifndef FWP_WITHOUT_CONTNEGT + + if (ep->vres) + return FRSH_ERR_ALREADY_BOUND; ep->vres = vres; - rv = fwp_vres_bind(vres, ep->sockd); -#endif - /* if send endpoint is already bound - if (epoint->type == FWP_EPOINT_BOUND) { - fwp_send_endpoint_unbind(epoint); - }*/ + rv = fwp_vres_bind(vres, ep, ep->sockd); + return rv; } @@ -405,7 +409,7 @@ int fwp_send_endpoint_bind(fwp_endpoint_t *ep, fwp_vres_t *vres) * \return On success returns 0. On error, -1 is returned and errno is set appropriately. * */ -int fwp_send_endpoint_unbind(fwp_endpoint_t *ep) +int fwp_send_endpoint_unbind(struct fwp_endpoint *ep) { int rv = 0; @@ -424,10 +428,10 @@ int fwp_send_endpoint_unbind(fwp_endpoint_t *ep) * On success, it returns zero. * */ -static int fwp_receive_endpoint_accept(fwp_endpoint_t *fwp_epoint) +static int fwp_receive_endpoint_accept(struct fwp_endpoint *fwp_epoint) { int csockd; -// fwp_endpoint_t *fwp_epoint = epointd; +// struct fwp_endpoint *fwp_epoint = epointd; fwp_sockaddr_t peer; int i; @@ -467,7 +471,7 @@ static int fwp_receive_endpoint_accept(fwp_endpoint_t *fwp_epoint) * On error, -1 is returned and errno is set appropriately. * */ -int fwp_recv_conn(fwp_endpoint_t *ep, void *buffer, +int fwp_recv_conn(struct fwp_endpoint *ep, void *buffer, size_t buffer_size) { fwp_sockaddr_t *peer = &ep->peer; @@ -516,7 +520,7 @@ int fwp_recv_conn(fwp_endpoint_t *ep, void *buffer, * On error, -1 is returned and errno is set appropriately. * */ -ssize_t fwp_recv(fwp_endpoint_t *ep, +ssize_t fwp_recv(struct fwp_endpoint *ep, void *buffer, const size_t buffer_size, unsigned int *from, int flags) { @@ -561,6 +565,53 @@ ssize_t fwp_recv(fwp_endpoint_t *ep, } } } +/** + * Physically send the message. + * + * This function should be called either by fwp_send_sync()/async() of + * by VRES to send delayed messaged. + * + * @param ep + * @param data + * @param size + * + * @return + */ +ssize_t fwp_endpoint_do_send(struct fwp_endpoint *ep, + void *data, const size_t size) +{ + struct iovec iov; + struct msghdr msg = {0}; + ssize_t ret; + char cmsg_buf[CMSG_SPACE(sizeof(struct in_pktinfo))]; + + iov.iov_base = data; + iov.iov_len = size; + + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + if (ep->src.s_addr != 0) { + struct cmsghdr *cmsg; + struct in_pktinfo *ipi; + + memset(cmsg_buf, 0, sizeof(cmsg_buf)); + + msg.msg_control = cmsg_buf; + msg.msg_controllen = sizeof(cmsg_buf); + + cmsg = CMSG_FIRSTHDR(&msg); + + cmsg->cmsg_level = SOL_IP; + cmsg->cmsg_type = IP_PKTINFO; + cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo)); + + ipi = (struct in_pktinfo*)CMSG_DATA(cmsg); + ipi->ipi_spec_dst = ep->src; + } + ret = sendmsg(ep->sockd, &msg, 0); + return ret; +} /** * Sends message through vres @@ -574,42 +625,31 @@ ssize_t fwp_recv(fwp_endpoint_t *ep, * On error, -1 is returned and errno is set appropriately. * */ -int fwp_send(fwp_endpoint_t *ep,const void *msg, const size_t size) +int fwp_send_async(struct fwp_endpoint *ep, void *msg, size_t size) { - struct fwp_msgb *msgb; - -/* if (!fwp_endpoint_is_valid(epointd)){ - errno = EINVAL; - return -1; - } - if (!fwp_endpoint_is_bound(epointd)){ - errno = EPERM; - return -1; - }*/ + int ret; - /*if (flags && MSG_DONTWAIT) - msgb = fwp_msgb_alloc(buffer_size); - else {*/ - if (!(msgb = fwp_msgb_alloc(size))) { - errno = ENOMEM; - return -1; - } + if (!ep->vres) + return FRSH_ERR_NOT_BOUND; - /*msgb->peer = &ep->peer;*/ - /*msgb->data = msg;*/ - /*msgb->flags = epoint->flags;*/ - - /* data must be copied since msg may change while - * message is waiting in transmission queue - * */ - memcpy(msgb->data, msg, size); - fwp_msgb_put(msgb, size); - /*msgb->tail = msgb->data + size; - msgb->len = size;*/ - - /*}*/ - - /* TODO: test whether _fwp_vres_send is successful */ - return fwp_vres_send(ep->vres, msgb); + if (fwp_vres_consume_budget(ep->vres, size, false) == 0) + ret = fwp_endpoint_do_send(ep, msg, size); + else + ret = fwp_vres_enqueue(ep->vres, ep, msg, size); + return ret; +} + +int fwp_send_sync(struct fwp_endpoint *ep, void *msg, size_t size) +{ + int ret; + + if (!ep->vres) + return FRSH_ERR_NOT_BOUND; + + ret = fwp_vres_consume_budget(ep->vres, size, true); + if (ret) + return ret; + ret = fwp_endpoint_do_send(ep, msg, size); + return ret; } diff --git a/fwp/lib/fwp/fwp_endpoint.h b/fwp/lib/fwp/fwp_endpoint.h index 1835895..8ddee25 100644 --- a/fwp/lib/fwp/fwp_endpoint.h +++ b/fwp/lib/fwp/fwp_endpoint.h @@ -54,7 +54,6 @@ typedef enum { } fwp_endpoint_reliability_t; struct fwp_endpoint; -typedef struct fwp_endpoint fwp_endpoint_t; typedef unsigned int fwp_addr_t; @@ -71,23 +70,26 @@ struct fwp_endpoint_attr { #include "fwp_vres.h" -int fwp_endpoint_get_params(fwp_endpoint_t *ep, unsigned int *node, +int fwp_endpoint_get_params(struct fwp_endpoint *ep, unsigned int *node, unsigned int *port, fwp_endpoint_attr_t *attr); int fwp_send_endpoint_create(unsigned int node, unsigned int port, fwp_endpoint_attr_t *attr, - fwp_endpoint_t **epoint); + struct fwp_endpoint **epoint); int fwp_receive_endpoint_create(/*unsigned int node,*/ unsigned int port, fwp_endpoint_attr_t *attr, - fwp_endpoint_t **epoint); -int fwp_endpoint_destroy(fwp_endpoint_t *ep); + struct fwp_endpoint **epoint); +int fwp_endpoint_destroy(struct fwp_endpoint *ep); -int fwp_send_endpoint_bind(fwp_endpoint_t *ep, fwp_vres_t *vres); -int fwp_send_endpoint_unbind(fwp_endpoint_t *ep); +int fwp_send_endpoint_bind(struct fwp_endpoint *ep, fwp_vres_t *vres); +int fwp_send_endpoint_unbind(struct fwp_endpoint *ep); -ssize_t fwp_recv(fwp_endpoint_t *ep, +ssize_t fwp_recv(struct fwp_endpoint *ep, void *buffer, const size_t buffer_size, unsigned int *from, int flags); -int fwp_send(fwp_endpoint_t *ep, const void *msg, const size_t size); +int fwp_send_sync(struct fwp_endpoint *ep, void *msg, const size_t size); +int fwp_send_async(struct fwp_endpoint *ep, void *msg, const size_t size); +ssize_t fwp_endpoint_do_send(struct fwp_endpoint *ep, + void *data, const size_t size); int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr); static inline int fwp_endpoint_attr_setreliability(fwp_endpoint_attr_t *attr, diff --git a/fwp/lib/fwp/fwp_msgb.c b/fwp/lib/fwp/fwp_msgb.c index c5193c7..5aa9bb6 100644 --- a/fwp/lib/fwp/fwp_msgb.c +++ b/fwp/lib/fwp/fwp_msgb.c @@ -77,7 +77,6 @@ struct fwp_msgb* fwp_msgb_alloc(size_t buf_size) msgb->len = 0; msgb->data = (unsigned char*) msgb + sizeof(struct fwp_msgb); msgb->tail = msgb->data; - msgb->peer = NULL; return msgb; } diff --git a/fwp/lib/fwp/fwp_msgb.h b/fwp/lib/fwp/fwp_msgb.h index 95b683d..f57a4b7 100644 --- a/fwp/lib/fwp/fwp_msgb.h +++ b/fwp/lib/fwp/fwp_msgb.h @@ -67,8 +67,6 @@ struct fwp_msgb { size_t len; /**< msg data length*/ unsigned char *data; /**< msg data */ unsigned char *tail; /**< msg data end*/ - struct fwp_sockaddr *peer; /**< peer address*/ - /*int flags; MSG_DONTWAIT for async*/ } fwp_msgb_t; struct fwp_msgb* fwp_msgb_alloc(size_t buf_size); diff --git a/fwp/lib/fwp/fwp_msgq.c b/fwp/lib/fwp/fwp_msgq.c index 0fcc73d..1af89bd 100644 --- a/fwp/lib/fwp/fwp_msgq.c +++ b/fwp/lib/fwp/fwp_msgq.c @@ -124,6 +124,21 @@ struct fwp_msgb* fwp_msgq_dequeue(struct fwp_msgq *msgq) return msgb; } +struct fwp_msgb* fwp_msgq_peek(struct fwp_msgq *msgq) +{ + struct fwp_msgb* msgb; + + pthread_mutex_lock(&msgq->lock); + + if (msgq->nr_pending > 0) + msgb = msgq->queue[msgq->out]; + else + msgb = NULL; + pthread_mutex_unlock(&msgq->lock); + + return msgb; +} + /* * Dequeue all messages from message queue * diff --git a/fwp/lib/fwp/fwp_msgq.h b/fwp/lib/fwp/fwp_msgq.h index 5ab0ae0..d30a048 100644 --- a/fwp/lib/fwp/fwp_msgq.h +++ b/fwp/lib/fwp/fwp_msgq.h @@ -73,6 +73,7 @@ void fwp_msgq_init(struct fwp_msgq *msgq); int fwp_msgq_enqueue(struct fwp_msgq *msgq, struct fwp_msgb* msgb); struct fwp_msgb* fwp_msgq_dequeue(struct fwp_msgq *msgq); +struct fwp_msgb* fwp_msgq_peek(struct fwp_msgq *msgq); void fwp_msgq_dequeue_all(struct fwp_msgq *msgq); /* void fwp_msgq_setpolicy */ diff --git a/fwp/lib/fwp/fwp_vres.c b/fwp/lib/fwp/fwp_vres.c index 9a203e2..1177a4e 100644 --- a/fwp/lib/fwp/fwp_vres.c +++ b/fwp/lib/fwp/fwp_vres.c @@ -1,6 +1,6 @@ /**************************************************************************/ /* ---------------------------------------------------------------------- */ -/* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */ +/* Copyright (C) 2006 - 2009 FRESCOR consortium partners: */ /* */ /* Universidad de Cantabria, SPAIN */ /* University of York, UK */ @@ -43,6 +43,7 @@ /* however invalidate any other reasons why the executable file might be */ /* covered by the GNU Public License. */ /**************************************************************************/ + #include "fwp_utils.h" #include "fwp_vres.h" @@ -57,14 +58,17 @@ static void* fwp_vres_tx_thread(void *_vres); typedef enum { - FWP_VF_USED = 0, - FWP_VF_BOUND = 1, + USED, + UNTOUCHED, + CHANGED, + QUEUED, } fwp_vres_flag_t; fwp_vres_params_t fwp_vres_params_default = { .ac_id = FWP_AC_VO, .budget = 100, - .period = {.tv_sec = 2 , .tv_nsec = 111111} + .period = {.tv_sec = 2 , .tv_nsec = 111111}, + .src = { 0 }, }; /** @@ -74,15 +78,22 @@ fwp_vres_params_t fwp_vres_params_default = { */ struct fwp_vres{ struct fwp_vres_params params; - /* consideration: move tx_queue to endpoint */ - /**< queue for messages to send */ - struct fwp_msgq tx_queue; fwp_vres_flag_t flags; + pthread_mutex_t mutex; + pthread_cond_t cond; /**< Signalizes budget replenishment */ + fwp_budget_t budget; /**< Current remaining budget */ + fwp_period_t period; /**< Period for this "activation" */ + struct timespec replenish_at; /**< Time of next replenishment */ + sem_t consumed; /**< endpoint bounded to this vres */ - /*fwp_endpoint_t *epoint; */ + struct fwp_endpoint *epoint; pthread_t tx_thread; /**< tx_thread id*/ pthread_attr_t tx_thread_attr; - int ac_sockd; /**< ac socket descriptor */ + /** Copy of bound enpoint's socket - used for future changes + * of vres parameters. */ + int ac_sockd; + /** Queue for messages to send */ + struct fwp_msgq msg_queue; }; typedef @@ -127,97 +138,30 @@ static inline int fwp_vres_set_ac(int sockd, fwp_ac_t ac_id) return 0; } -static inline void fwp_vres_set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag) +static inline void set_flag(fwp_vres_t *vres, fwp_vres_flag_t flag) { vres->flags |= (1 << flag); } -static inline void fwp_vres_clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag) +static inline void clear_flag(fwp_vres_t *vres, fwp_vres_flag_t flag) { vres->flags &= ~(1 << flag); } -static inline int fwp_vres_get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag) +static inline int get_flag(fwp_vres_t *vres, fwp_vres_flag_t flag) { return !!(vres->flags & (1 << flag)); } -static inline void fwp_vres_clearall_flag(fwp_vres_t *vres) +static inline void clear_all_flags(fwp_vres_t *vres) { vres->flags = 0; } -#if 0 -/* Deprecated */ -static int fwp_vres_ac_open(fwp_ac_t ac_id) -{ - int sockd; - unsigned int tos; - - if ((ac_id < 0)||(ac_id >= FWP_AC_NUM)) { - errno = EINVAL; - return -1; - } - - if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { - FWP_ERROR("Unable to open socket for AC: %s", strerror(errno)); - return (-1); - } - - tos = ac_to_tos[ac_id]; - - if (setsockopt(sockd, SOL_IP, IP_TOS, &tos, sizeof(tos)) == -1) { - int e = errno; - FWP_ERROR("setsockopt(IP_TOS): %s", strerror(errno)); - close(sockfd); - errno = e; - return -1; - } - - return sockd; -} -#endif - -static inline int _fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb) -{ - struct iovec iov; - struct msghdr msg = {0}; - ssize_t ret; - char cmsg_buf[CMSG_SPACE(sizeof(struct in_pktinfo))]; - - iov.iov_base = msgb->data; - iov.iov_len = msgb->len; - - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - - - - if (vres->params.src.s_addr != 0) { - struct cmsghdr *cmsg; - struct in_pktinfo *ipi; - - memset(cmsg_buf, 0, sizeof(cmsg_buf)); - - msg.msg_control = cmsg_buf; - msg.msg_controllen = sizeof(cmsg_buf); - - cmsg = CMSG_FIRSTHDR(&msg); - - cmsg->cmsg_level = SOL_IP; - cmsg->cmsg_type = IP_PKTINFO; - cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo)); - - ipi = (struct in_pktinfo*)CMSG_DATA(cmsg); - ipi->ipi_spec_dst = vres->params.src; - } - ret = sendmsg(vres->ac_sockd, &msg, 0); - return ret; -} - static inline void fwp_vres_free(fwp_vres_t *vres) { - fwp_vres_clearall_flag(vres); + /* Clear USED flag */ + clear_all_flags(vres); } static inline int fwp_vres_is_valid(fwp_vres_t *vres) @@ -225,7 +169,7 @@ static inline int fwp_vres_is_valid(fwp_vres_t *vres) int id = vres - fwp_vres_table.entry; if ((id < 0) || (id > fwp_vres_table.max_vres - 1) || - (!fwp_vres_get_flag(vres, FWP_VF_USED))) + (!get_flag(vres, USED))) return 0; return 1; @@ -273,7 +217,7 @@ fwp_vres_t *fwp_vres_alloc() i = 0; max_vres = fwp_vres_table.max_vres; while ((i < max_vres) && - (fwp_vres_get_flag(&fwp_vres_table.entry[i], FWP_VF_USED))) { + (get_flag(&fwp_vres_table.entry[i], USED))) { i++; } @@ -284,22 +228,22 @@ fwp_vres_t *fwp_vres_alloc() } FWP_DEBUG("Allocated vres id = %d\n",i); - fwp_vres_set_flag(&fwp_vres_table.entry[i], FWP_VF_USED); + set_flag(&fwp_vres_table.entry[i], USED); pthread_mutex_unlock(&fwp_vres_table.lock); return (&fwp_vres_table.entry[i]); } -inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params) +static int apply_params(fwp_vres_t *vres) { int rv; - - /* copy vres paramters into vres structure */ - rv = fwp_vres_set_ac(vres->ac_sockd, params->ac_id); - if (!rv) - return rv; - memcpy(&vres->params, params, sizeof(struct fwp_vres_params)); - - return 0; + vres->period = vres->params.period; + vres->budget = vres->params.budget; + set_flag(vres, UNTOUCHED); + if (get_flag(vres, CHANGED)) { + clear_flag(vres, CHANGED); + rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id); + } + return rv; } /** @@ -314,12 +258,30 @@ inline int _fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params) */ int fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params) { + int rv = 0; + if (!fwp_vres_is_valid(vres)) { errno = EINVAL; return -1; } - return _fwp_vres_set_params(vres, params); + pthread_mutex_lock(&vres->mutex); + + if (vres->epoint && + params->src.s_addr != vres->params.src.s_addr) { + errno = EREMCHG; + rv = -1; + goto out; + } + vres->params = *params; + if (vres->epoint) { + set_flag(vres, CHANGED); + if (get_flag(vres, UNTOUCHED)) + rv = apply_params(vres); + } +out: + pthread_mutex_unlock(&vres->mutex); + return rv; } /** @@ -342,10 +304,18 @@ int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp) errno = ENOMEM; return -1; } - /* initialize msg queue */ - fwp_msgq_init(&vres->tx_queue); + + pthread_mutexattr_t ma; + rv = pthread_mutexattr_init(&ma); + rv = pthread_mutexattr_setprotocol(&ma, PTHREAD_PRIO_INHERIT); + if (rv) return rv; + pthread_mutex_init(&vres->mutex, &ma); + pthread_cond_init(&vres->cond, NULL); - memcpy(&vres->params, params, sizeof(struct fwp_vres_params)); + vres->params = *params; + apply_params(vres); + fwp_msgq_init(&vres->msg_queue); + pthread_attr_init(&vres->tx_thread_attr); if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr, fwp_vres_tx_thread, (void*) vres)) != 0){ @@ -353,8 +323,10 @@ int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp) } *vresp = vres; + return 0; err: + fwp_msgq_dequeue_all(&vres->msg_queue); fwp_vres_free(vres); return -1; } @@ -376,25 +348,145 @@ int fwp_vres_destroy(fwp_vres_t *vres) } pthread_cancel(vres->tx_thread); + pthread_cond_destroy(&vres->cond); + pthread_mutex_destroy(&vres->mutex); + fwp_msgq_dequeue_all(&vres->msg_queue); + fwp_vres_free(vres); + FWP_DEBUG("Vres destroyed.\n"); return 0; } -static void fwp_vres_cleanup(void *_vres) +static void do_consume_budget(struct fwp_vres *vres, size_t size) { - fwp_vres_t *vres = (fwp_vres_t*)_vres; + if (get_flag(vres, UNTOUCHED)) { + /* Setup next replenish time */ + struct timespec now; + clear_flag(vres, UNTOUCHED); + if (get_flag(vres, QUEUED)) + now = vres->replenish_at; + else + clock_gettime(CLOCK_MONOTONIC, &now); + fwp_timespec_add(&vres->replenish_at, &now, &vres->period); + sem_post(&vres->consumed); + } + vres->budget -= size; +} - fwp_msgq_dequeue_all(&vres->tx_queue); - fwp_vres_free(vres); +int __consume_budget(struct fwp_vres *vres, size_t size, bool can_block) +{ + int ret = 0; + if (vres->params.budget < size) { + errno = ENOSR; + return -1; + } + while (can_block && vres->budget < size) { + ret = pthread_cond_wait(&vres->cond, &vres->mutex); + /* The budget might have been changed while we were + * waiting, so check it again. */ + if (vres->params.budget < size) { + errno = ENOSR; + return -1; + } + } + if (ret == 0) { + if (vres->budget >= size) { + do_consume_budget(vres, size); + ret = 0; + } else { + set_flag(vres, QUEUED); + ret = 1; + } + } + return ret; +} + +/** + * Tries to consume (a part of) budget + * + * @param vres VRES whose budget is conumed. + * @param size How much to consume (in bytes). + * @param can_block True, indicates that the function can block to + * wait for budget replenishment. False causes no blocking and if 1 is + * returned, the calles must call fwp_vres_enqueue() to enqueue the + * packet to be sent later after replenishing. + * + * @return Zero if budget was consumed, 1 if there is not enough + * budget available and blocking was not allowed, -1 in case of error. + */ +int fwp_vres_consume_budget(struct fwp_vres *vres, size_t size, bool can_block) +{ + int ret = 0; + pthread_mutex_lock(&vres->mutex); + ret = __consume_budget(vres, size, can_block); + pthread_mutex_unlock(&vres->mutex); + return ret; +} + +int fwp_vres_enqueue(struct fwp_vres *vres, struct fwp_endpoint *ep, void *msg, size_t size) +{ + struct fwp_msgb *msgb; + int ret; + + if (!(msgb = fwp_msgb_alloc(size))) + return -1; + memcpy(msgb->data, msg, size); + fwp_msgb_put(msgb, size); + ret = fwp_msgq_enqueue(&vres->msg_queue, msgb); + if (ret) { + fwp_msgb_free(msgb); + return ret; + } + return ret; } -static inline void -fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period, - fwp_budget_t *budget) +static void wait_for_replenish(struct fwp_vres *vres) { - *period = vres->params.period; - *budget = vres->params.budget; + sem_wait(&vres->consumed); + clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, + &vres->replenish_at, NULL); +} + +static void send_queue(struct fwp_vres *vres) +{ + struct fwp_msgb *msgb; + bool can_send; + msgb = fwp_msgq_dequeue(&vres->msg_queue); + can_send = (0 == __consume_budget(vres, msgb->len, false)); + if (!can_send) { + fwp_msgb_free(msgb); + FWP_ERROR("Cannot send queued packet (budget decreased?)\n"); + return; + } + /* If we cannot send the whole queue, the flag will be set + * later by __consume_budget(). */ + clear_flag(vres, QUEUED); + + while (msgb) { + fwp_endpoint_do_send(vres->epoint, msgb->data, msgb->len); + fwp_msgb_free(msgb); + msgb = fwp_msgq_peek(&vres->msg_queue); + if (msgb) { + can_send = (0 == __consume_budget(vres, msgb->len, false)); + if (can_send) { + msgb = fwp_msgq_dequeue(&vres->msg_queue); + } else { + msgb = NULL; + return; + } + } + } +} + +static void replenish(struct fwp_vres *vres) +{ + pthread_mutex_lock(&vres->mutex); + apply_params(vres); + if (get_flag(vres, QUEUED)) + send_queue(vres); + pthread_cond_broadcast(&vres->cond); + pthread_mutex_unlock(&vres->mutex); } /** @@ -404,112 +496,45 @@ fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period, static void* fwp_vres_tx_thread(void *_vres) { struct fwp_vres *vres = (struct fwp_vres*)_vres; - struct fwp_msgq *msgq = &vres->tx_queue; - struct fwp_msgb *msgb = NULL; unsigned int ac_id = vres->params.ac_id; - fwp_budget_t budget = vres->params.budget; - fwp_budget_t curr_budget; - int rc; - struct timespec start, period, interval, now; fwp_set_rt_prio(90 - ac_id); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); - pthread_cleanup_push(fwp_vres_cleanup, (void*)vres); + pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); - /* just for sure */ - fwp_vres_sched_update(vres, &period, &budget); - curr_budget = budget; - clock_gettime(CLOCK_MONOTONIC, &start); - while (1) { - msgb = fwp_msgq_dequeue(msgq); - if (msgb->len > budget) { - FWP_ERROR("Message too large: %zd -> skipping\n", - msgb->len); - goto skip_message; - } - if (curr_budget < msgb->len) { - /* needs to replenish */ - clock_gettime(CLOCK_MONOTONIC, &now); - fwp_timespec_sub(&interval, &now, &start); - ul_logtrash("start=%ld.%09ld, now=%ld.%09ld diff=%ld.%09ld\n", (long)start.tv_sec, (long)start.tv_nsec, (long)now.tv_sec, (long)now.tv_nsec, (long)interval.tv_sec, (long)interval.tv_nsec); - fwp_timespec_sub(&interval, &period, &interval); - if (interval.tv_sec > 0 || - (interval.tv_sec == 0 && interval.tv_nsec > 0)) { - /* We have to wait to replenish */ - ul_logtrash("sleeping=%ld.%09ld\n", (long)interval.tv_sec, (long)interval.tv_nsec); - nanosleep(&interval, NULL); - fwp_timespec_add(&start, &now, &interval); - } else { - start = now; - } - fwp_vres_sched_update(vres, &period, &budget); - curr_budget = budget; - } - - rc = _fwp_vres_send(vres, msgb); - if (!(rc < 0)) { - FWP_DEBUG("Message sent through AC%d\n",ac_id); - curr_budget -= msgb->len; - } else { - FWP_ERROR("Message sent error %d\n",rc); - } - skip_message: - fwp_msgb_free(msgb); - - /*pthread_testcancel(); */ - /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); - - fwp_timespec_add(&end_period, &start_period, &period); - clock_gettime(CLOCK_MONOTONIC, ¤t_time); - fwp_timespec_sub(&interval, &end_period, ¤t_time); - nanosleep(&interval, NULL); - */ + wait_for_replenish(vres); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + replenish(vres); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } - - /* it should normaly never come here */ - pthread_cleanup_pop(0); - fwp_vres_free(vres); - - return NULL; -} -int fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb) -{ - if (fwp_vres_is_valid(vres)) { - return fwp_msgq_enqueue(&vres->tx_queue, msgb); - } else { - errno = EINVAL; - return -1; - } + return NULL; } -/*int fwp_vres_bind(fwp_vres_t *vres, fwp_endpoint_t *epoint)*/ -int fwp_vres_bind(fwp_vres_t *vres, int sockd) +/*int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *epoint)*/ +int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *ep, int sockd) { int rv = 0; - pthread_mutex_lock(&fwp_vres_table.lock); if (!fwp_vres_is_valid(vres)) { errno = EINVAL; rv = -1; goto err; } - if (fwp_vres_get_flag(vres, FWP_VF_BOUND)) { /*if already bounded */ + if (vres->epoint) { /*if already bounded */ errno = EBUSY; rv = -1; goto err; } vres->ac_sockd = sockd; - rv = fwp_vres_set_ac(vres->ac_sockd,vres->params.ac_id); - /*if (rv) - goto err;*/ - fwp_vres_set_flag(vres, FWP_VF_BOUND); + rv = fwp_vres_set_ac(vres->ac_sockd, vres->params.ac_id); + if (rv) + goto err; + vres->epoint = ep; err: - pthread_mutex_unlock(&fwp_vres_table.lock); return rv; } @@ -519,8 +544,10 @@ int fwp_vres_unbind(fwp_vres_t *vres) errno = EINVAL; return -1; } - fwp_vres_clear_flag(vres, FWP_VF_BOUND); + pthread_mutex_lock(&vres->mutex); + vres->epoint = NULL; + pthread_mutex_unlock(&vres->mutex); /* TODO: consider what to do with pending messages */ - /* fwp_vres_free_msgb(vres->tx_queue); */ + fwp_msgq_dequeue_all(&vres->msg_queue); return 0; } diff --git a/fwp/lib/fwp/fwp_vres.h b/fwp/lib/fwp/fwp_vres.h index 65bfd91..4d36ffb 100644 --- a/fwp/lib/fwp/fwp_vres.h +++ b/fwp/lib/fwp/fwp_vres.h @@ -47,6 +47,7 @@ #define _FWP_VRES_H #include +#include struct fwp_vres; typedef struct fwp_vres fwp_vres_t; @@ -81,8 +82,6 @@ struct fwp_vres_params { /** all time units are in microseconds */ fwp_period_t period; fwp_ac_t ac_id; /**< AC id ~ priority of vres */ - /** Forced source address. If non-zero, packets are sent over - * the specified interface. */ struct in_addr src; } fwp_vres_params_t; @@ -94,8 +93,10 @@ int fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params); int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp); int fwp_vres_destroy(fwp_vres_t *vres); -int fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb); -int fwp_vres_bind(fwp_vres_t *vres, int sockd); +int fwp_vres_consume_budget(struct fwp_vres *vres, size_t size, bool can_block); +struct fwp_endpoint; +int fwp_vres_enqueue(struct fwp_vres *vres, struct fwp_endpoint *ep, void *msg, size_t size); +int fwp_vres_bind(fwp_vres_t *vres, struct fwp_endpoint *ep, int sockd); int fwp_vres_unbind(fwp_vres_t *vres); extern fwp_vres_params_t fwp_vres_params_default; diff --git a/fwp/lib/fwp/tests/fwp_prototest/fwp_sendrecv_test1.c b/fwp/lib/fwp/tests/fwp_prototest/fwp_sendrecv_test1.c index d32bd33..ff7f062 100644 --- a/fwp/lib/fwp/tests/fwp_prototest/fwp_sendrecv_test1.c +++ b/fwp/lib/fwp/tests/fwp_prototest/fwp_sendrecv_test1.c @@ -28,7 +28,7 @@ int main() char msg1[] = "Hello1"; char msg2[] = "Hello2"; char buffer[30]; - fwp_endpoint_t *sepoint1, *sepoint2, *repoint1, *repoint2; + struct fwp_endpoint *sepoint1, *sepoint2, *repoint1, *repoint2; fwp_endpoint_attr_t attr; fwp_addr_t from; @@ -85,8 +85,8 @@ int main() printf("Send endpoint 2 created\n"); fwp_send_endpoint_bind(sepoint2, vres2); - fwp_send(sepoint1, msg1, sizeof(msg1)); - fwp_send(sepoint1, msg2, sizeof(msg2)); + fwp_send_sync(sepoint1, msg1, sizeof(msg1)); + fwp_send_sync(sepoint1, msg2, sizeof(msg2)); for (i = 0; i < 2; i++) { if ((len = fwp_recv(repoint1, buffer, sizeof(buffer), &from, diff --git a/fwp/lib/fwp/tests/fwp_prototest/fwp_sendrecv_test2.c b/fwp/lib/fwp/tests/fwp_prototest/fwp_sendrecv_test2.c index 3e5c5b1..c42de6a 100644 --- a/fwp/lib/fwp/tests/fwp_prototest/fwp_sendrecv_test2.c +++ b/fwp/lib/fwp/tests/fwp_prototest/fwp_sendrecv_test2.c @@ -25,7 +25,7 @@ fwp_endpoint_attr_t attr; void* receiver(void* arg) { - fwp_endpoint_t *repoint1; + struct fwp_endpoint *repoint1; int i,len; char buffer[30]; fwp_addr_t from; @@ -60,7 +60,7 @@ int main() struct fwp_vres_params vparam1, vparam2; char msg1[] = "Hello1"; char msg2[] = "Hello2"; - fwp_endpoint_t *sepoint1; + struct fwp_endpoint *sepoint1; pthread_t thread; vparam1.ac_id = FWP_AC_VO; @@ -97,9 +97,9 @@ int main() printf("Send endpoint 1 created\n"); fwp_send_endpoint_bind(sepoint1, vres1); - fwp_send(sepoint1, msg1, sizeof(msg1)); + fwp_send_sync(sepoint1, msg1, sizeof(msg1)); printf("Sent msg1\n"); - fwp_send(sepoint1, msg2, sizeof(msg2)); + fwp_send_sync(sepoint1, msg2, sizeof(msg2)); printf("Sent msg2\n"); pthread_join(thread, (void**) NULL); diff --git a/fwp/lib/fwp/tests/fwp_vrestest/fwp_vrestest1.c b/fwp/lib/fwp/tests/fwp_vrestest/fwp_vrestest1.c index 27a2395..fc0faab 100644 --- a/fwp/lib/fwp/tests/fwp_vrestest/fwp_vrestest1.c +++ b/fwp/lib/fwp/tests/fwp_vrestest/fwp_vrestest1.c @@ -29,7 +29,7 @@ int main() struct fwp_vres_params vparam1; char msg1[15]; char buffer[30]; - fwp_endpoint_t *sepoint, *repoint; + struct fwp_endpoint *sepoint, *repoint; int count; struct timespec sendtime; fwp_endpoint_attr_t attr; @@ -76,7 +76,7 @@ int main() for (count = 0; count < NUM; count++) { sprintf(msg1,"msg%d",count); - fwp_send(sepoint, msg1, sizeof(msg1)); + fwp_send_sync(sepoint, msg1, sizeof(msg1)); clock_gettime(CLOCK_MONOTONIC, &sendtime); printf("Sent: sec = %ld nsec = %ld \n", sendtime.tv_sec, diff --git a/fwp/lib/fwp/tests/fwp_vrestest/fwp_vrestest2.c b/fwp/lib/fwp/tests/fwp_vrestest/fwp_vrestest2.c index d4dbe5c..e43ffd4 100644 --- a/fwp/lib/fwp/tests/fwp_vrestest/fwp_vrestest2.c +++ b/fwp/lib/fwp/tests/fwp_vrestest/fwp_vrestest2.c @@ -31,7 +31,7 @@ fwp_endpoint_attr_t attr; void* sender() { - fwp_endpoint_t *sepoint; + struct fwp_endpoint *sepoint; fwp_vres_t *vres; struct fwp_vres_params vparam1; char msg1[10]; @@ -65,7 +65,7 @@ void* sender() while (count < NUM){ count++; sprintf(msg1,"msg%d sent\n",count); - fwp_send(sepoint, msg1, sizeof(msg1)); + fwp_send_sync(sepoint, msg1, sizeof(msg1)); printf(msg1); /*clock_gettime(CLOCK_MONOTONIC, &sendtime); @@ -91,7 +91,7 @@ void* receiver() { ssize_t len; char buffer[30]; - fwp_endpoint_t *repoint; + struct fwp_endpoint *repoint; int count; struct timespec recvtime; fwp_addr_t from; diff --git a/fwp/tests/timing/fwp-timing.c b/fwp/tests/timing/fwp-timing.c index d344489..a6f8c6d 100644 --- a/fwp/tests/timing/fwp-timing.c +++ b/fwp/tests/timing/fwp-timing.c @@ -24,6 +24,7 @@ int opt_budget = 1024; int opt_period = 20; +bool opt_async = false; struct in_addr src, dst; @@ -139,6 +140,7 @@ static struct option long_opts[] = { { "budget", 1, 0, 'b' }, { "source", 1, 0, 's' }, { "dest", 1, 0, 'd' }, + { "async", 0, 0, 'a' }, { 0, 0, 0, 0} }; @@ -150,6 +152,7 @@ usage(void) printf(" -b, --budget how many bytes is sent in each period\n"); printf(" -s, --source source IP address\n"); printf(" -d, --dest destination IP address and port\n"); + printf(" -a, --async Send packets asynchronously\n"); } void parse_opts(int argc, char *argv[]) @@ -157,8 +160,11 @@ void parse_opts(int argc, char *argv[]) char opt; int ret; - while ((opt = getopt_long(argc, argv, "b:p:s:d:", long_opts, NULL)) != -1) { + while ((opt = getopt_long(argc, argv, "ab:p:s:d:", long_opts, NULL)) != -1) { switch (opt) { + case 'a': + opt_async = true; + break; case 'b': opt_budget = atoi(optarg); break; @@ -245,6 +251,7 @@ void *receiver(void *arg) frsh_receive_endpoint_t epdst = (frsh_receive_endpoint_t)arg; size_t mlen; int ret; + int last_cnt = -1; struct timespec tss, tsr; struct msg *msg; msg = malloc(opt_budget); @@ -254,8 +261,11 @@ void *receiver(void *arg) ret = frsh_receive_sync(epdst, msg, opt_budget, &mlen, NULL); clock_gettime(CLOCK_MONOTONIC, &tsr); tss = msg->ts; + if (msg->cnt != last_cnt+1) + printf("packet(s) lost!\n"); printf("%10d: %10.3lf ms\n", msg->cnt, tsdiff2d(&tsr, &tss)*1000); + last_cnt = msg->cnt; } return NULL; } @@ -288,20 +298,25 @@ void run() if (signal(SIGINT, stopper) == SIG_ERR) error(1, errno, "Signal handler registration error"); - struct timespec next_period, tss; + struct timespec next_period; + struct timespec tss; clock_gettime(CLOCK_MONOTONIC, &next_period); while (!exit_flag) { clock_gettime(CLOCK_MONOTONIC, &tss); msg->cnt = cnt++; msg->ts = tss; - ret = frsh_send_async(epsrc, msg, opt_budget); - - next_period.tv_nsec += opt_period * 1000000; + if (opt_async) + ret = frsh_send_async(epsrc, msg, opt_budget); + else { + ret = frsh_send_sync(epsrc, msg, opt_budget); + clock_gettime(CLOCK_MONOTONIC, &next_period); + } + next_period.tv_sec += (opt_period/1000); + next_period.tv_nsec += (opt_period%1000) * 1000000; if (next_period.tv_nsec >= 1000000000) { next_period.tv_nsec -= 1000000000; next_period.tv_sec++; } - clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next_period, NULL); }