#include <error.h>
#include <sys/mman.h>
#include <semaphore.h>
+#include <ul_logreg.h>
bool opt_verbose = false;
bool opt_quiet = false;
static struct sched_param param;
if ((maxpri = sched_get_priority_max(SCHED_FIFO)) == -1) {
- fprintf(stderr, "warning: sched_get_priority_max failed");
+ fprintf(stderr, "warning: sched_get_priority_max failed\n");
}
if ((minpri = sched_get_priority_min(SCHED_FIFO)) == -1) {
- fprintf(stderr, "warning: sched_get_priority_min failed");
+ fprintf(stderr, "warning: sched_get_priority_min failed\n");
}
if (priority > maxpri) {
param.sched_priority = priority;
if (sched_setscheduler(0, SCHED_FIFO, ¶m) == -1) {
- fprintf(stderr, "warning: sched_setscheduler failed");
+ fprintf(stderr, "warning: sched_setscheduler failed\n");
}
mlockall(MCL_CURRENT | MCL_FUTURE);
int number;
int id;
int count;
+ frsh_vres_id_t vres;
+ pthread_t thread;
};
-int negotiate_contract(struct stream_params *p, frsh_vres_id_t *vres)
+int negotiate_contract(struct stream_params *p)
{
frsh_contract_t contract;
int ret;
ret = fres_contract_add_fwp(contract, fwp);
if (ret) PERROR_AND_EXIT(ret, "fres_contract_add_fwp");
- ret = frsh_contract_negotiate(&contract, vres);
- if (ret) PERROR_AND_EXIT(ret, "frsh_contract_negotiate");
+ ret = frsh_contract_negotiate(&contract, &p->vres);
frsh_contract_destroy(&contract);
- return 0;
+ return ret;
}
void create_endpoints(struct stream_params *p,
- frsh_vres_id_t vres,
frsh_send_endpoint_t *epsrc,
frsh_receive_endpoint_t *epdst)
{
epsrc);
if (ret < 0) error(1, errno, "frsh_send_endpoint_create()");
- ret = frsh_send_endpoint_bind(vres, *epsrc);
+ ret = frsh_send_endpoint_bind(p->vres, *epsrc);
if (ret != 0) error(1, errno, "frsh_send_endpoint_bind");
}
static struct option long_opts[] = {
+ { "loglevel",required_argument, 0, 'l' },
{ "period", required_argument, 0, 'p' },
{ "budget", required_argument, 0, 'b' },
{ "source", required_argument, 0, 's' },
usage(void)
{
printf("usage: fwp-timing [ options ]\n");
+ printf(" -l, --loglevel <number>|<domain>=<number>,...\n");
printf(" -p, --period <ms> period in miliseconds\n");
printf(" -b, --budget <bytes> how many bytes is sent in each period\n");
printf(" -s, --source <ip> source IP address\n");
int ret;
bool options_found = false;
- while ((opt = getopt_long(*argc, *argv, "/ab:c:d:n:p:qs:v", long_opts, NULL)) != -1) {
+ while ((opt = getopt_long(*argc, *argv, "/ab:c:d:l:n:p:qs:v", long_opts, NULL)) != -1) {
options_found = true;
switch (opt) {
case 'a':
exit(1);
}
break;
+ case 'l':
+ ul_log_domain_arg2levels(optarg);
+ break;
case 'n':
p->number = atoi(optarg);
break;
void *sender(void *arg)
{
struct stream_params *p = arg;
- frsh_vres_id_t vres;
frsh_send_endpoint_t epsrc;
struct receiver_params *rp;
int ret;
msg = malloc(p->budget);
if (!msg) error(1, errno, "malloc msg");
- negotiate_contract(p, &vres);
-
rp = malloc(sizeof(*rp));
rp->budget = p->budget;
rp->id = p->id;
-
- create_endpoints(p, vres, &epsrc, &rp->epdst);
-
+
+ create_endpoints(p, &epsrc, &rp->epdst);
+
set_rt_prio(50);
ret = pthread_create(&receiver_id, NULL, receiver, rp);
- if (signal(SIGTERM, stopper) == SIG_ERR)
- error(1, errno, "Error in signal registration");
- if (signal(SIGINT, stopper) == SIG_ERR)
- error(1, errno, "Signal handler registration error");
-
struct timespec next_period;
struct timespec tss;
clock_gettime(CLOCK_MONOTONIC, &next_period);
&next_period, NULL);
}
- frsh_contract_cancel(vres);
- free(p);
+
sem_post(&finished);
return NULL;
}
int main(int argc, char *argv[])
{
int ret;
+ int i;
int num = 0;
+ bool negotiation_failure = false;
+
struct stream_params sp = {
.budget = 1024,
.period_ms = 20,
.number = 1,
.count = -1,
};
- struct stream_params *p = NULL;
+ struct stream_params *p[100];
+
+ memset(p, 0, sizeof(p));
+
+ if (signal(SIGTERM, stopper) == SIG_ERR)
+ error(1, errno, "Error in signal registration");
+ if (signal(SIGINT, stopper) == SIG_ERR)
+ error(1, errno, "Signal handler registration error");
sem_init(&finished, 0, 0);
do {
ret = parse_opts(&argc, &argv, &sp);
if (num == 0 || ret == 0) {
- int i;
for (i=0; i<sp.number; i++) {
- pthread_t thread;
- p = malloc(sizeof(*p));
- if (!p) error(1, errno, "malloc");
- *p = sp;
- p->id = num;
- pthread_create(&thread, NULL, sender, p);
- num++;
+ p[num] = malloc(sizeof(*p[0]));
+ if (!p[num]) error(1, errno, "malloc");
+ *p[num] = sp;
+ p[num]->id = num;
+ ret = negotiate_contract(p[num]);
+ if (!ret)
+ num++;
+ else {
+ PERROR_FRESCOR(ret, "frsh_contract_negotiate");
+ free(p[num]);
+ negotiation_failure = true;
+ break;
+ }
}
}
} while(ret == 0);
+ if (negotiation_failure) {
+ goto destroy;
+ }
+
+ for (i=0; i<num; i++) {
+ pthread_create(&p[i]->thread, NULL, sender, p[i]);
+ }
+
while (!exit_flag && !opt_quiet) {
int v;
print_stat(false);
usleep(100000);
}
- while (num--)
- sem_wait(&finished);
+destroy:
+ for (i=0; i<num; i++) {
+ if (!negotiation_failure)
+ pthread_join(p[i]->thread, NULL);
+ frsh_contract_cancel(p[i]->vres);
+ free(p[i]);
+ }
+ stats.lost = stats.sent - stats.received;
print_stat(true);
return 0;