#include "fwp_msgq.h"
#include "fwp_endpoint.h"
+#include "fwp_debug.h"
#include <string.h>
+#include <errno.h>
+#include <stdlib.h>
static void* fwp_vres_tx_thread(void *_vres);
typedef enum { /* NOTE(review): values change from masks (1/2/4) to 0/1/2 -- presumably bit indices now; confirm fwp_vres_{set,get,clear}_flag shift by the flag */
- FWP_VF_USED = 1 ,
- FWP_VF_BOUND = 2 ,
- FWP_VF_RESCHED = 4 ,
+ FWP_VF_USED = 0, /* presumably marks an allocated table entry -- not visible in this patch, confirm */
+ FWP_VF_BOUND = 1, /* tested by fwp_vres_bind() to refuse a second bind with EBUSY */
+ FWP_VF_CHANGED = 2, /* set after params are copied into the vres; tested and cleared by fwp_vres_sched_update() */
} fwp_vres_flag_t;
fwp_vres_params_t fwp_vres_params_default = {
- .id = 0,
.ac_id = FWP_AC_VO,
.budget = 100,
.period = {.tv_sec = 2 , .tv_nsec = 111111}
}
#endif
-static inline int _fwp_vres_send(unsigned int ac_sockd, struct fwp_msgb* msgb)
+/**
+ * Send one message buffer through the socket of a vres.
+ *
+ * When vres->params.src is non-zero, an IP_PKTINFO control message is
+ * attached to the datagram with ipi_spec_dst set to that address.
+ * Otherwise the message is sent without ancillary data.
+ *
+ * \param[in] vres Vres whose ac_sockd and params.src are used
+ * \param[in] msgb Message buffer; msgb->data / msgb->len form the payload
+ *
+ * \return Result of sendmsg(): number of bytes sent, or -1 with errno set.
+ */
+static inline int _fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb)
{
- /*_fwp_sendto(ac_sockd, msgb->data, msgb->len, 0,
- msgb->peer->addr, msgb->peer->addrlen);*/
- return _fwp_send(ac_sockd, msgb->data, msgb->len, 0);
+	struct iovec iov;
+	struct msghdr msg = {0};
+	ssize_t ret;
+	/*
+	 * The control buffer is accessed through struct cmsghdr pointers,
+	 * so it must be aligned for that type. A bare char array gives no
+	 * such guarantee; the union member forces the alignment.
+	 */
+	union {
+		char buf[CMSG_SPACE(sizeof(struct in_pktinfo))];
+		struct cmsghdr align;
+	} ctrl;
+
+	iov.iov_base = msgb->data;
+	iov.iov_len = msgb->len;
+
+	msg.msg_iov = &iov;
+	msg.msg_iovlen = 1;
+
+	if (vres->params.src.s_addr != 0) {
+		struct cmsghdr *cmsg;
+		struct in_pktinfo *ipi;
+
+		/* Zeroing also leaves ipi_ifindex and ipi_addr at 0. */
+		memset(&ctrl, 0, sizeof(ctrl));
+
+		msg.msg_control = ctrl.buf;
+		msg.msg_controllen = sizeof(ctrl.buf);
+
+		cmsg = CMSG_FIRSTHDR(&msg);
+
+		cmsg->cmsg_level = SOL_IP;
+		cmsg->cmsg_type = IP_PKTINFO;
+		cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
+
+		ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
+		ipi->ipi_spec_dst = vres->params.src;
+	}
+	ret = sendmsg(vres->ac_sockd, &msg, 0);
+	return ret;
}
static inline void fwp_vres_free(fwp_vres_t *vres)
return 0;
}
-fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd)
+fwp_vres_id_t fwp_vres_get_id(fwp_vres_t *vres) /* the id of a vres is its index within fwp_vres_table.entry */
{
- fwp_vres_t *vres = vresd;
-
return (vres - fwp_vres_table.entry); /* pointer difference in elements; NOTE(review): assumes vres points into fwp_vres_table.entry -- not checked here */
}
*
* \return On success returns vres descriptor.
*/
-fwp_vres_d_t fwp_vres_alloc()
+fwp_vres_t *fwp_vres_alloc()
{
int i;
unsigned int max_vres;
if (!rv)
return rv;
memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
- fwp_vres_set_flag(vres, FWP_VF_RESCHED);
+ fwp_vres_set_flag(vres, FWP_VF_CHANGED);
return 0;
}
/**
* Set vres params
*
- * \param[in] vresdp Vres descriptor
+ * \param[in] vres Vres descriptor
* \param[in] params Vres parameters
*
* \return On success returns zero.
* On error, negative error code is returned.
*
*/
-int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params)
+int fwp_vres_set_params(fwp_vres_t *vres, fwp_vres_params_t *params)
{
- fwp_vres_t *vres = vresd;
-
if (!fwp_vres_is_valid(vres)) {
errno = EINVAL;
return -1;
* Creates new vres
*
* \param[in] params Vres parameters
- * \param[out] vresdp Pointer to the descriptor of newly created vres
+ * \param[out] vresp Pointer to the descriptor of newly created vres
*
- * \return On success returns descriptor of vres.
+ * \return On success returns zero and stores the new vres in *vresp.
* On error, negative error code is returned.
*
*/
-int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp)
+int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_t **vresp)
{
int rv;
fwp_vres_t *vres;
fwp_msgq_init(&vres->tx_queue);
memcpy(&vres->params, params, sizeof(struct fwp_vres_params));
- fwp_vres_set_flag(vres, FWP_VF_RESCHED);
+ fwp_vres_set_flag(vres, FWP_VF_CHANGED);
pthread_attr_init(&vres->tx_thread_attr);
if ((rv = pthread_create(&vres->tx_thread, &vres->tx_thread_attr,
fwp_vres_tx_thread, (void*) vres)) != 0){
goto err;
}
- *vresdp = vres;
+ *vresp = vres;
return 0;
err:
fwp_vres_free(vres);
/**
* Destroys vres
*
- * \param[in] vresd Vres descriptor
+ * \param[in] vres Vres descriptor
*
* \return On success returns 0.
* On error, negative error code is returned.
*
*/
-int fwp_vres_destroy(fwp_vres_d_t vresd)
+int fwp_vres_destroy(fwp_vres_t *vres)
{
- fwp_vres_t *vres = vresd;
-
if (!fwp_vres_is_valid(vres)) {
errno = EINVAL;
return -1;
pthread_cancel(vres->tx_thread);
- FWP_DEBUG("Vres vparam_id=%d destroyed.\n", vres->params.id);
+ FWP_DEBUG("Vres destroyed.\n");
return 0;
}
fwp_vres_sched_update(fwp_vres_t *vres, struct timespec *period,
fwp_budget_t *budget)
{
+ /*
+  * Reload the caller's cached scheduling parameters from vres->params
+  * when FWP_VF_CHANGED is set, then clear the flag. Both *period and
+  * *budget are refreshed; without the budget copy a budget changed at
+  * run time would never reach the tx thread. When the flag is not set,
+  * *period and *budget are left untouched.
+  *
+  * \param[in]  vres   Vres whose params and flags are consulted
+  * \param[out] period Receives vres->params.period on change
+  * \param[out] budget Receives vres->params.budget on change
+  */
- if (fwp_vres_get_flag(vres, FWP_VF_RESCHED)) {
+ if (fwp_vres_get_flag(vres, FWP_VF_CHANGED)) {
/*period->tv_nsec = vres->params.period % SEC_TO_USEC;
period->tv_sec = vres->params.period / SEC_TO_USEC;*/
*period = vres->params.period;
+ *budget = vres->params.budget;
FWP_DEBUG("Vres tx thread with budget=%ld period_sec=%ld "
"period_nsec=%ld.\n",vres->params.budget,
period->tv_sec, period->tv_nsec);
- fwp_vres_clear_flag(vres, FWP_VF_RESCHED);
+ fwp_vres_clear_flag(vres, FWP_VF_CHANGED);
}
}
fwp_budget_t budget = vres->params.budget;
fwp_budget_t curr_budget;
int rc;
- struct timespec start_period, period, interval, current_time;
+ struct timespec start, period, interval, now;
fwp_set_rt_prio(90 - ac_id);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
/* just for sure */
fwp_vres_sched_update(vres, &period, &budget);
- clock_gettime(CLOCK_MONOTONIC, &start_period);
curr_budget = budget;
+ clock_gettime(CLOCK_MONOTONIC, &start);
while (1) {
- /* wait for next period and then send */
- /*clock_gettime(CLOCK_MONOTONIC, ¤t_time);
- fwp_timespec_sub(&interval, &end_period, ¤t_time);
- nanosleep(&interval, NULL);*/
-
- /*while((rc = sem_wait(&msgq->empty_lock, &end_period))==-1
- && errno == EINTR) {
- continue;
- }*/
-
msgb = fwp_msgq_dequeue(msgq);
- fwp_vres_sched_update(vres, &period, &budget);
- if ((curr_budget + msgb->len) > budget) {
- /* need to recharge */
- clock_gettime(CLOCK_MONOTONIC, ¤t_time);
- fwp_timespec_sub(&interval, ¤t_time, &start_period);
- fwp_timespec_modulo(&interval, &interval, &period);
+ if (msgb->len > budget) {
+ FWP_ERROR("Message too large: %zd -> skipping\n",
+ msgb->len);
+ goto skip_message;
+ }
+ if (curr_budget < msgb->len) {
+ /* needs to replenish */
+ clock_gettime(CLOCK_MONOTONIC, &now);
+ fwp_timespec_sub(&interval, &now, &start);
+ ul_logtrash("start=%ld.%09ld, now=%ld.%09ld diff=%ld.%09ld\n", (long)start.tv_sec, (long)start.tv_nsec, (long)now.tv_sec, (long)now.tv_nsec, (long)interval.tv_sec, (long)interval.tv_nsec);
fwp_timespec_sub(&interval, &period, &interval);
- nanosleep(&interval, NULL);
- curr_budget = 0;
- clock_gettime(CLOCK_MONOTONIC, &start_period);
+ if (interval.tv_sec > 0 ||
+ (interval.tv_sec == 0 && interval.tv_nsec > 0)) {
+ /* We have to wait to replenish */
+ ul_logtrash("sleeping=%ld.%09ld\n", (long)interval.tv_sec, (long)interval.tv_nsec);
+ nanosleep(&interval, NULL);
+ fwp_timespec_add(&start, &now, &interval);
+ } else {
+ start = now;
+ }
+ fwp_vres_sched_update(vres, &period, &budget);
+ curr_budget = budget;
}
- rc = _fwp_vres_send(vres->ac_sockd, msgb);
+ rc = _fwp_vres_send(vres, msgb);
if (!(rc < 0)) {
FWP_DEBUG("Message sent through AC%d\n",ac_id);
- curr_budget+= msgb->len;
+ curr_budget -= msgb->len;
} else {
- FWP_DEBUG("Message sent error %d\n",rc);
+ FWP_ERROR("Message sent error %d\n",rc);
}
-
+ skip_message:
fwp_msgb_free(msgb);
/*pthread_testcancel(); */
return NULL;
}
-int fwp_vres_send(fwp_vres_d_t vresd, struct fwp_msgb* msgb)
+int fwp_vres_send(fwp_vres_t *vres, struct fwp_msgb* msgb) /* queues msgb on the vres tx queue; nothing is transmitted here (presumably drained by fwp_vres_tx_thread -- confirm) */
{
- fwp_vres_t *vres = vresd;
-
if (fwp_vres_is_valid(vres)) {
return fwp_msgq_enqueue(&vres->tx_queue, msgb); /* result of the enqueue is returned unchanged */
} else {
- errno = EPERM; /* TODO: Use more appropriate error than EPERM. */
+ errno = EINVAL; /* same errno the set_params/destroy/unbind entry points use for an invalid vres */
return -1;
}
}
-/*int fwp_vres_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)*/
-int fwp_vres_bind(fwp_vres_d_t vresd, int sockd)
+/*int fwp_vres_bind(fwp_vres_t *vres, fwp_endpoint_t *epoint)*/
+int fwp_vres_bind(fwp_vres_t *vres, int sockd)
{
- fwp_vres_t *vres = vresd;
int rv = 0;
pthread_mutex_lock(&fwp_vres_table.lock);
}
if (fwp_vres_get_flag(vres, FWP_VF_BOUND)) { /*if already bounded */
- errno = EPERM;
+ errno = EBUSY;
rv = -1;
goto err;
}
return rv;
}
-int fwp_vres_unbind(fwp_vres_d_t vresd)
+int fwp_vres_unbind(fwp_vres_t *vres)
{
- fwp_vres_t *vres = vresd;
-
- if (!fwp_vres_is_valid(vresd)) {
+ if (!fwp_vres_is_valid(vres)) {
errno = EINVAL;
return -1;
}