#include <fwp_res.h>
#include <error.h>
#include <sys/mman.h>
-
-int opt_budget = 1024;
-int opt_period = 20;
-bool opt_async = false;
-
-struct in_addr src, dst;
+#include <semaphore.h>
void set_rt_prio(int priority)
{
mlockall(MCL_CURRENT | MCL_FUTURE);
}
+struct stream_params { /* Settings of one stream; main() hands every sender thread its own heap copy. */
+ int budget; /* bytes: size of the message buffer and of each send, input to frsh_network_bytes_to_budget() (-b) */
+ int period_ms; /* contract period and sending interval in milliseconds (-p) */
+ bool async; /* true: frsh_send_async(), false: frsh_send_sync() (-a) */
+ struct in_addr src, dst; /* src is stored in the FWP contract block, dst is passed to frsh_send_endpoint_create() (-s, -d) */
+ int number; /* how many sender threads main() starts with these settings (-n) */
+ int id; /* running stream index assigned by main(); printed by the receiver */
+};
+
-int negotiate_contract(frsh_vres_id_t *vres)
+int negotiate_contract(struct stream_params *p, frsh_vres_id_t *vres) /* Builds the contract of one stream from *p:
+ * p->budget (bytes) and p->period_ms become the basic contract parameters
+ * and p->src is stored in the FWP block added to the contract. The visible
+ * error paths go through PERROR_AND_EXIT(); the visible success path
+ * returns 0. How *vres is filled in is not part of the lines shown here. */
{
frsh_contract_t contract;
int ret;
NULL);
if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_resource_and_label");
- frsh_network_bytes_to_budget(FRSH_NETPF_FWP, opt_budget, &budget);
- period = fosa_msec_to_rel_time(opt_period);
+ frsh_network_bytes_to_budget(FRSH_NETPF_FWP, p->budget, &budget); /* per-stream byte count instead of the old global */
+ period = fosa_msec_to_rel_time(p->period_ms); /* milliseconds -> relative time */
ret = frsh_contract_set_basic_params(&contract,
&budget,
&period,
fwp = malloc(sizeof(*fwp));
if (!fwp) PERROR_AND_EXIT(errno, "malloc");
- fwp->src = src.s_addr;
+ fwp->src = p->src.s_addr; /* source address of this stream */
ret = fres_contract_add_fwp(contract, fwp);
if (ret) PERROR_AND_EXIT(ret, "fres_contract_add_fwp");
return 0;
}
-void create_endpoints(frsh_vres_id_t vres,
+void create_endpoints(struct stream_params *p,
+ frsh_vres_id_t vres,
frsh_send_endpoint_t *epsrc,
frsh_receive_endpoint_t *epdst)
{
frsh_receive_endpoint_get_params(*epdst, NULL, &port, NULL, NULL);
ret = frsh_send_endpoint_create(FRSH_NETPF_FWP,
- dst.s_addr, port, spi,
+ p->dst.s_addr, port, spi,
epsrc);
if (ret < 0) error(1, errno, "frsh_send_endpoint_create()");
{ "source", 1, 0, 's' },
{ "dest", 1, 0, 'd' },
{ "async", 0, 0, 'a' },
+ { "number", 1, 0, 'n' }, /* takes a value: matches "n:" in the optstring and the atoi(optarg) in parse_opts() */
+ { "stream", 0, 0, '/' }, /* long form of the stream separator that usage() advertises */
{ 0, 0, 0, 0}
};
printf(" -s, --source <ip> source IP address\n");
printf(" -d, --dest <ip:port> destination IP address and port\n");
printf(" -a, --async Send packets asynchronously\n");
+ printf(" -n, --number Number of streams with the same parameters\n");
+ printf(" -/, --stream New stream separator\n");
}
-void parse_opts(int argc, char *argv[])
+int parse_opts(int *argc, char **argv[], struct stream_params *p) /* Parses the options of one stream into *p.
+ * Parsing stops after a '/' separator or when getopt_long() reports the end
+ * of the arguments, so main() calls this repeatedly, once per stream. Fields
+ * for which no option is seen keep the value they had on entry.
+ * Returns 0 if this call consumed at least one option (a lone '/' counts),
+ * -1 if getopt_long() had nothing left. Unknown options print the usage text
+ * and exit(1). */
{
- char opt;
+ int opt; /* getopt_long() returns int; the loop compares it with -1 */
int ret;
+ bool options_found = false; /* becomes true as soon as this call gets any option */
- while ((opt = getopt_long(argc, argv, "ab:p:s:d:", long_opts, NULL)) != -1) {
+ while ((opt = getopt_long(*argc, *argv, "/ab:p:s:d:n:", long_opts, NULL)) != -1) {
+ options_found = true;
switch (opt) {
case 'a':
- opt_async = true;
+ p->async = true;
break;
case 'b':
- opt_budget = atoi(optarg);
+ p->budget = atoi(optarg); /* bytes; not range-checked here */
break;
case 'p':
- opt_period = atoi(optarg);
+ p->period_ms = atoi(optarg); /* milliseconds; not range-checked here */
break;
case 's':
- ret = inet_aton(optarg, &src);
+ ret = inet_aton(optarg, &p->src); /* inet_aton() returns 0 for an invalid address */
if (!ret) {
fprintf(stderr, "Source IP address not recognized: %s\n",
optarg);
}
break;
case 'd':
- ret = inet_aton(optarg, &dst);
+ ret = inet_aton(optarg, &p->dst); /* inet_aton() returns 0 for an invalid address */
if (!ret) {
fprintf(stderr, "Destination IP address not recognized: %s\n",
optarg);
exit(1);
}
break;
+ case 'n':
+ p->number = atoi(optarg); /* number of identical streams to start */
+ break;
+ case '/': /* stream separator: nothing to store */
+ break;
default:
usage();
exit(1);
}
+ if (opt == '/') /* the break inside the switch only leaves the switch; this one ends the loop */
+ break;
}
+ return (options_found) ? 0 : -1;
}
volatile bool exit_flag = false;
struct timespec ts;
};
+struct receiver_params { /* Arguments of one receiver() thread; allocated by sender(), freed by receiver(). */
+ int budget; /* size in bytes of the receive buffer and the maximum length passed to frsh_receive_sync() */
+ frsh_receive_endpoint_t epdst; /* endpoint to receive from; sender() has create_endpoints() fill it in */
+ int id; /* stream index printed in front of every output line */
+};
+
+/*
+ * Receiver thread of one stream.
+ *
+ * arg points to a heap-allocated struct receiver_params; this thread owns it
+ * and frees it before returning. Until exit_flag is set, every message
+ * received on rp->epdst is printed as "<stream id>: <counter>: <delay> ms",
+ * where the delay is tsdiff2d() between the CLOCK_MONOTONIC reception time
+ * and the timestamp carried in the message. A counter that does not follow
+ * the previous one is reported as "packet(s) lost!". Always returns NULL.
+ */
void *receiver(void *arg)
{
- frsh_receive_endpoint_t epdst = (frsh_receive_endpoint_t)arg;
+ struct receiver_params *rp = arg;
+ frsh_receive_endpoint_t epdst = rp->epdst;
size_t mlen;
int ret;
int last_cnt = -1;
struct timespec tss, tsr;
struct msg *msg;
- msg = malloc(opt_budget);
+ msg = malloc(rp->budget); /* one buffer, reused for every received message */
if (!msg) error(1, errno, "malloc msg");
while (!exit_flag) {
- ret = frsh_receive_sync(epdst, msg, opt_budget, &mlen, NULL);
+ ret = frsh_receive_sync(epdst, msg, rp->budget, &mlen, NULL);
clock_gettime(CLOCK_MONOTONIC, &tsr);
tss = msg->ts;
if (msg->cnt != last_cnt+1)
printf("packet(s) lost!\n");
- printf("%10d: %10.3lf ms\n",
- msg->cnt, tsdiff2d(&tsr, &tss)*1000);
+ printf("%3d: %10d: %10.3lf ms\n",
+ rp->id, msg->cnt, tsdiff2d(&tsr, &tss)*1000);
last_cnt = msg->cnt;
}
+ free(msg); /* nothing uses the receive buffer after the loop */
+ free(rp);
return NULL;
}
-void run()
+sem_t finished; /* posted once by every sender thread when it ends; main() waits on it */
+
+/*
+ * Sender thread of one stream.
+ *
+ * arg points to a heap-allocated struct stream_params; this thread owns it
+ * and frees it before returning. The thread negotiates the contract of the
+ * stream, creates its endpoints, starts the matching receiver() thread and
+ * sends messages of p->budget bytes, each carrying a counter and a
+ * CLOCK_MONOTONIC timestamp, paced by p->period_ms. At the end it cancels
+ * the contract and posts `finished`. Any setup failure terminates the whole
+ * program through error(). Always returns NULL.
+ */
+void *sender(void *arg)
{
+ struct stream_params *p = arg;
frsh_vres_id_t vres;
frsh_send_endpoint_t epsrc;
- frsh_receive_endpoint_t epdst;
+ struct receiver_params *rp;
int ret;
struct msg *msg;
long int cnt=0;
pthread_t receiver_id;
- msg = malloc(opt_budget);
+ /* cnt and ts are written into the buffer below, so it must hold at least a struct msg */
+ if (p->budget < (int)sizeof(*msg)) error(1, 0, "budget of %d bytes is too small for a message", p->budget);
+ msg = malloc(p->budget);
if (!msg) error(1, errno, "malloc msg");
- ret = frsh_init();
- if (ret) PERROR_AND_EXIT(ret, "frsh_init");
-
- negotiate_contract(&vres);
- create_endpoints(vres, &epsrc, &epdst);
+ negotiate_contract(p, &vres);
+
+ rp = malloc(sizeof(*rp)); /* ownership passes to the receiver thread, which frees it */
+ if (!rp) error(1, errno, "malloc rp");
+ rp->budget = p->budget;
+ rp->id = p->id;
+
+ create_endpoints(p, vres, &epsrc, &rp->epdst);
set_rt_prio(50);
- ret = pthread_create(&receiver_id, NULL, receiver, epdst);
+ ret = pthread_create(&receiver_id, NULL, receiver, rp);
+ if (ret) error(1, ret, "pthread_create receiver"); /* pthread_create() returns the error number itself */
if (signal(SIGTERM, stopper) == SIG_ERR)
error(1, errno, "Error in signal registration");
clock_gettime(CLOCK_MONOTONIC, &tss);
msg->cnt = cnt++;
msg->ts = tss;
- if (opt_async)
- ret = frsh_send_async(epsrc, msg, opt_budget);
+ if (p->async)
+ ret = frsh_send_async(epsrc, msg, p->budget);
else {
- ret = frsh_send_sync(epsrc, msg, opt_budget);
+ ret = frsh_send_sync(epsrc, msg, p->budget);
clock_gettime(CLOCK_MONOTONIC, &next_period);
}
- next_period.tv_sec += (opt_period/1000);
- next_period.tv_nsec += (opt_period%1000) * 1000000;
+ next_period.tv_sec += (p->period_ms/1000);
+ next_period.tv_nsec += (p->period_ms%1000) * 1000000; /* milliseconds -> nanoseconds; carry handled below */
if (next_period.tv_nsec >= 1000000000) {
next_period.tv_nsec -= 1000000000;
next_period.tv_sec++;
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
&next_period, NULL);
}
-
+
frsh_contract_cancel(vres);
+ free(p);
+ sem_post(&finished);
+ return NULL;
}
+/*
+ * Starts one sender thread per requested stream and waits for all of them.
+ *
+ * The defaults in sp (1024 bytes, 20 ms, synchronous, loopback addresses,
+ * one stream) are updated by each parse_opts() call, so a stream inherits
+ * every setting that earlier streams on the command line changed. After
+ * each successful parse, and once even when no option is given at all,
+ * sp.number threads are started, each with its own heap copy of sp that the
+ * thread frees itself. Returns 0 after every sender has posted `finished`;
+ * setup failures exit through error() / PERROR_AND_EXIT().
+ */
int main(int argc, char *argv[])
{
- src.s_addr = htonl(INADDR_LOOPBACK);
- dst.s_addr = htonl(INADDR_LOOPBACK);
+ int ret;
+ int num = 0; /* number of sender threads started so far; also the next stream id */
+ struct stream_params sp = {
+ .budget = 1024,
+ .period_ms = 20,
+ .async = false,
+ .src.s_addr = htonl(INADDR_LOOPBACK),
+ .dst.s_addr = htonl(INADDR_LOOPBACK),
+ .number = 1,
+ };
+ struct stream_params *p = NULL;
+
+ if (sem_init(&finished, 0, 0)) error(1, errno, "sem_init");
+
+ ret = frsh_init();
+ if (ret) PERROR_AND_EXIT(ret, "frsh_init");
+
+ do {
+ ret = parse_opts(&argc, &argv, &sp);
+ if (num == 0 || ret == 0) { /* num == 0: run one default stream even without options */
+ int i;
+ for (i=0; i<sp.number; i++) {
+ pthread_t thread;
+ int err;
+ p = malloc(sizeof(*p));
+ if (!p) error(1, errno, "malloc");
+ *p = sp;
+ p->id = num;
+ /* Count only threads that really started: every counted thread must post `finished`. */
+ err = pthread_create(&thread, NULL, sender, p);
+ if (err) error(1, err, "pthread_create");
+ num++;
+ }
+ }
+ } while(ret == 0);
+
+ while (num--) {
+ /* A signal handler (e.g. the SIGTERM one) can interrupt sem_wait(); that is not a finished stream. */
+ while (sem_wait(&finished) == -1 && errno == EINTR) continue;
+ }
- parse_opts(argc, argv);
- run();
return 0;
}