#include <error.h>
#include <sys/mman.h>
#include <semaphore.h>
+#include <ul_logreg.h>
bool opt_verbose = false;
+bool opt_quiet = false;
-#define HIST_MAX_US 1000000
+#define HIST_MAX_US 10000000
#define HIST_RES_US 10
struct histogram {
static struct sched_param param;
if ((maxpri = sched_get_priority_max(SCHED_FIFO)) == -1) {
- fprintf(stderr, "warning: sched_get_priority_max failed");
+ fprintf(stderr, "warning: sched_get_priority_max failed\n");
}
if ((minpri = sched_get_priority_min(SCHED_FIFO)) == -1) {
- fprintf(stderr, "warning: sched_get_priority_min failed");
+ fprintf(stderr, "warning: sched_get_priority_min failed\n");
}
if (priority > maxpri) {
param.sched_priority = priority;
if (sched_setscheduler(0, SCHED_FIFO, ¶m) == -1) {
- fprintf(stderr, "warning: sched_setscheduler failed");
+ fprintf(stderr, "warning: sched_setscheduler failed\n");
}
mlockall(MCL_CURRENT | MCL_FUTURE);
int number;
int id;
int count;
+ frsh_vres_id_t vres;
+ pthread_t thread;
+ int jitter;
};
-int negotiate_contract(struct stream_params *p, frsh_vres_id_t *vres)
+int negotiate_contract(struct stream_params *p)
{
frsh_contract_t contract;
int ret;
ret = fres_contract_add_fwp(contract, fwp);
if (ret) PERROR_AND_EXIT(ret, "fres_contract_add_fwp");
- ret = frsh_contract_negotiate(&contract, vres);
- if (ret) PERROR_AND_EXIT(ret, "frsh_contract_negotiate");
+ ret = frsh_contract_negotiate(&contract, &p->vres);
frsh_contract_destroy(&contract);
- return 0;
+ return ret;
}
void create_endpoints(struct stream_params *p,
- frsh_vres_id_t vres,
frsh_send_endpoint_t *epsrc,
frsh_receive_endpoint_t *epdst)
{
epsrc);
if (ret < 0) error(1, errno, "frsh_send_endpoint_create()");
- ret = frsh_send_endpoint_bind(vres, *epsrc);
+ ret = frsh_send_endpoint_bind(p->vres, *epsrc);
if (ret != 0) error(1, errno, "frsh_send_endpoint_bind");
}
static struct option long_opts[] = {
- { "period", 1, 0, 'p' },
- { "budget", 1, 0, 'b' },
- { "source", 1, 0, 's' },
- { "dest", 1, 0, 'd' },
- { "async", 0, 0, 'a' },
- { "number", 0, 0, 'n' },
- { "count", 0, 0, 'c' },
- { "verbose",0, 0, 'v' },
+ { "loglevel",required_argument, 0, 'l' },
+ { "period", required_argument, 0, 'p' },
+ { "budget", required_argument, 0, 'b' },
+ { "source", required_argument, 0, 's' },
+ { "dest", required_argument, 0, 'd' },
+ { "async", no_argument, 0, 'a' },
+ { "number", required_argument, 0, 'n' },
+ { "count", required_argument, 0, 'c' },
+ { "verbose",no_argument, 0, 'v' },
+ { "quiet", no_argument, 0, 'q' },
+ { "jitter", required_argument, 0, 'j' },
{ 0, 0, 0, 0}
};
usage(void)
{
printf("usage: fwp-timing [ options ]\n");
+ printf(" -l, --loglevel <number>|<domain>=<number>,...\n");
printf(" -p, --period <ms> period in miliseconds\n");
printf(" -b, --budget <bytes> how many bytes is sent in each period\n");
printf(" -s, --source <ip> source IP address\n");
printf(" -a, --async Send packets asynchronously\n");
printf(" -n, --number Number of streams with the same parameters\n");
printf(" -c, --count Number of messages to send [infinity]\n");
+ printf(" -q, --quiet Print only final statistics\n");
printf(" -/, --stream New stream separator\n");
printf(" -v, --verbose Be more verbose\n");
+ printf(" -j, --jitter <percent> Sent jitter given as percentage of period\n");
}
int parse_opts(int *argc, char **argv[], struct stream_params *p)
int ret;
bool options_found = false;
- while ((opt = getopt_long(*argc, *argv, "/ab:c:p:s:d:n:v", long_opts, NULL)) != -1) {
+ while ((opt = getopt_long(*argc, *argv, "/ab:c:d:j:l:n:p:qs:v", long_opts, NULL)) != -1) {
options_found = true;
switch (opt) {
case 'a':
exit(1);
}
break;
+ case 'j':
+ p->jitter = atoi(optarg);
+ break;
+ case 'l':
+ ul_log_domain_arg2levels(optarg);
+ break;
case 'n':
p->number = atoi(optarg);
break;
break;
case 'v':
opt_verbose = true;
+ break;
+ case 'q':
+ opt_quiet = true;
+ break;
case '/':
break;
default:
clock_gettime(CLOCK_MONOTONIC, &tsr);
tss = msg->ts;
if (msg->cnt != last_cnt+1) {
- printf("%3d: packet(s) lost!\n", rp->id);
+ if (!opt_quiet)
+ printf("%3d: packet(s) lost!\n", rp->id);
__sync_fetch_and_add(&stats.lost, msg->cnt - last_cnt+1);
} else {
hist_add(&hist, tsdiff2us(&tsr, &tss));
void *sender(void *arg)
{
struct stream_params *p = arg;
- frsh_vres_id_t vres;
frsh_send_endpoint_t epsrc;
struct receiver_params *rp;
int ret;
msg = malloc(p->budget);
if (!msg) error(1, errno, "malloc msg");
- negotiate_contract(p, &vres);
-
rp = malloc(sizeof(*rp));
rp->budget = p->budget;
rp->id = p->id;
-
- create_endpoints(p, vres, &epsrc, &rp->epdst);
-
+
+ create_endpoints(p, &epsrc, &rp->epdst);
+
set_rt_prio(50);
ret = pthread_create(&receiver_id, NULL, receiver, rp);
- if (signal(SIGTERM, stopper) == SIG_ERR)
- error(1, errno, "Error in signal registration");
- if (signal(SIGINT, stopper) == SIG_ERR)
- error(1, errno, "Signal handler registration error");
-
struct timespec next_period;
struct timespec tss;
clock_gettime(CLOCK_MONOTONIC, &next_period);
clock_gettime(CLOCK_MONOTONIC, &next_period);
}
__sync_fetch_and_add(&stats.sent, 1);
- next_period.tv_sec += (p->period_ms/1000);
- next_period.tv_nsec += (p->period_ms%1000) * 1000000;
+
+ int delay_ms;
+ if (p->jitter)
+ delay_ms = p->period_ms*(100-p->jitter)/100
+ + rand() % (2*p->period_ms*p->jitter/100);
+ else
+ delay_ms = p->period_ms;
+
+ next_period.tv_sec += (delay_ms/1000);
+ next_period.tv_nsec += (delay_ms%1000) * 1000000;
if (next_period.tv_nsec >= 1000000000) {
next_period.tv_nsec -= 1000000000;
next_period.tv_sec++;
&next_period, NULL);
}
- frsh_contract_cancel(vres);
- free(p);
+
sem_post(&finished);
return NULL;
}
-void print_stat(bool full)
+void print_stat(bool final)
{
- printf("Sent: %5d Received: %5d Lost: %5d Max: %6.3f ms",
+ printf("Sent: %5d Received: %5d Lost: %5d Max: %8.3f ms",
stats.sent, stats.received, stats.lost, hist.max/1000.0);
- if (full) {
- printf(" Max: %6.3f ms 90%%: %6.3f ms",
- hist_get_percentile(&hist, 100)/1000.0,
- hist_get_percentile(&hist, 90)/1000.0);
+ if (final) {
+ printf(" Packetloss: %7.3f %% 95%%: %8.3f ms 99%%: %8.3f ms\n",
+ 100.0*stats.lost/stats.sent,
+ hist_get_percentile(&hist, 95)/1000.0,
+ hist_get_percentile(&hist, 99)/1000.0);
}
- printf("\r");
+ else
+ printf("\r");
fflush(stdout);
}
int main(int argc, char *argv[])
{
int ret;
+ int i;
int num = 0;
+ bool negotiation_failure = false;
+
struct stream_params sp = {
.budget = 1024,
.period_ms = 20,
.number = 1,
.count = -1,
};
- struct stream_params *p = NULL;
+ struct stream_params *p[100];
+
+ memset(p, 0, sizeof(p));
+
+ if (signal(SIGTERM, stopper) == SIG_ERR)
+ error(1, errno, "Error in signal registration");
+ if (signal(SIGINT, stopper) == SIG_ERR)
+ error(1, errno, "Signal handler registration error");
sem_init(&finished, 0, 0);
do {
ret = parse_opts(&argc, &argv, &sp);
if (num == 0 || ret == 0) {
- int i;
for (i=0; i<sp.number; i++) {
- pthread_t thread;
- p = malloc(sizeof(*p));
- if (!p) error(1, errno, "malloc");
- *p = sp;
- p->id = num;
- pthread_create(&thread, NULL, sender, p);
- num++;
+ p[num] = malloc(sizeof(*p[0]));
+ if (!p[num]) error(1, errno, "malloc");
+ *p[num] = sp;
+ p[num]->id = num;
+ ret = negotiate_contract(p[num]);
+ if (!ret)
+ num++;
+ else {
+ PERROR_FRESCOR(ret, "frsh_contract_negotiate");
+ free(p[num]);
+ negotiation_failure = true;
+ break;
+ }
}
}
} while(ret == 0);
- while (!exit_flag) {
+ if (negotiation_failure) {
+ goto destroy;
+ }
+
+ for (i=0; i<num; i++)
+ pthread_create(&p[i]->thread, NULL, sender, p[i]);
+
+ while (!exit_flag && !opt_quiet) {
int v;
print_stat(false);
sem_getvalue(&finished, &v);
usleep(100000);
}
- while (num--)
- sem_wait(&finished);
+ for (i=0; i<num; i++)
+ pthread_join(p[i]->thread, NULL);
+destroy:
+ for (i=0; i<num; i++) {
+ frsh_contract_cancel(p[i]->vres);
+ free(p[i]);
+ }
+ stats.lost = stats.sent - stats.received;
print_stat(true);
- printf("\n");
- return 0;
+ return negotiation_failure ? 1 : 0;
}