]> rtime.felk.cvut.cz Git - frescor/fwp.git/blobdiff - fwp/tests/timing/fwp-timing.c
Allow fwp-timing to generate several streams simultaneously
[frescor/fwp.git] / fwp / tests / timing / fwp-timing.c
index d344489d897d06ffefb9dc5def445fd1138473cf..1a6c8ad8972a0e70d7c438cfba533b5226faa23b 100644 (file)
 #include <fwp_res.h>
 #include <error.h>
 #include <sys/mman.h>
-
-int opt_budget = 1024;
-int opt_period = 20;
-
-struct in_addr src, dst;
+#include <semaphore.h>
 
 void set_rt_prio(int priority)
 {
@@ -56,8 +52,17 @@ void set_rt_prio(int priority)
        mlockall(MCL_CURRENT | MCL_FUTURE);
 }
 
+struct stream_params {
+       int budget;
+       int period_ms;
+       bool async;
+       struct in_addr src, dst;
+       int number;
+       int id;
+};
+
 
-int negotiate_contract(frsh_vres_id_t *vres)
+int negotiate_contract(struct stream_params *p, frsh_vres_id_t *vres)
 {
        frsh_contract_t contract;
        int ret;
@@ -75,8 +80,8 @@ int negotiate_contract(frsh_vres_id_t *vres)
                NULL);
        if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_resource_and_label");
 
-       frsh_network_bytes_to_budget(FRSH_NETPF_FWP, opt_budget, &budget);
-       period = fosa_msec_to_rel_time(opt_period);
+       frsh_network_bytes_to_budget(FRSH_NETPF_FWP, p->budget, &budget);
+       period = fosa_msec_to_rel_time(p->period_ms);
        ret = frsh_contract_set_basic_params(&contract,
                                             &budget,
                                             &period,
@@ -95,7 +100,7 @@ int negotiate_contract(frsh_vres_id_t *vres)
 
        fwp = malloc(sizeof(*fwp));
        if (!fwp) PERROR_AND_EXIT(errno, "malloc");
-       fwp->src = src.s_addr;
+       fwp->src = p->src.s_addr;
        ret = fres_contract_add_fwp(contract, fwp);
        if (ret) PERROR_AND_EXIT(ret, "fres_contract_add_fwp");
 
@@ -107,7 +112,8 @@ int negotiate_contract(frsh_vres_id_t *vres)
        return 0;
 }
 
-void create_endpoints(frsh_vres_id_t vres,
+void create_endpoints(struct stream_params *p,
+                     frsh_vres_id_t vres,
                      frsh_send_endpoint_t *epsrc,
                      frsh_receive_endpoint_t *epdst)
 {
@@ -125,7 +131,7 @@ void create_endpoints(frsh_vres_id_t vres,
        frsh_receive_endpoint_get_params(*epdst, NULL, &port, NULL, NULL);
 
        ret = frsh_send_endpoint_create(FRSH_NETPF_FWP,
-                                       dst.s_addr, port, spi,
+                                       p->dst.s_addr, port, spi,
                                        epsrc);
        if (ret < 0) error(1, errno, "frsh_send_endpoint_create()");
                
@@ -139,6 +145,8 @@ static struct option long_opts[] = {
     { "budget", 1, 0, 'b' },
     { "source", 1, 0, 's' },
     { "dest",  1, 0, 'd' },
+    { "async",         0, 0, 'a' },
+    { "number",        0, 0, 'n' },
     { 0, 0, 0, 0}
 };
 
@@ -150,23 +158,31 @@ usage(void)
        printf("  -b, --budget <bytes>  how many bytes is sent in each period\n");
        printf("  -s, --source <ip>  source IP address\n");
        printf("  -d, --dest <ip:port> destination IP address and port\n");
+       printf("  -a, --async  Send packets asynchronously\n");
+       printf("  -n, --number Number of streams with the same parameters\n");
+       printf("  -/, --stream  New stream separator\n");
 }
 
-void parse_opts(int argc, char *argv[])
+int parse_opts(int *argc, char **argv[], struct stream_params *p)
 {
-       char opt;
+       int opt;
        int ret;
+       bool options_found = false;
 
-       while ((opt = getopt_long(argc, argv, "b:p:s:d:", long_opts, NULL)) != -1) {
+       while ((opt = getopt_long(*argc, *argv, "/ab:p:s:d:n:", long_opts, NULL)) != -1) {
+               options_found = true;
                switch (opt) {
+               case 'a':
+                       p->async = true;
+                       break;
                case 'b':
-                       opt_budget = atoi(optarg);
+                       p->budget = atoi(optarg);
                        break;
                case 'p':
-                       opt_period = atoi(optarg);
+                       p->period_ms = atoi(optarg);
                        break;
                case 's':
-                       ret = inet_aton(optarg, &src);
+                       ret = inet_aton(optarg, &p->src);
                        if (!ret) {
                                fprintf(stderr, "Source IP address not recognized: %s\n",
                                        optarg);
@@ -175,7 +191,7 @@ void parse_opts(int argc, char *argv[])
                        }
                        break;
                case 'd':
-                       ret = inet_aton(optarg, &dst);
+                       ret = inet_aton(optarg, &p->dst);
                        if (!ret) {
                                fprintf(stderr, "Destination IP address not recognized: %s\n",
                                        optarg);
@@ -183,11 +199,19 @@ void parse_opts(int argc, char *argv[])
                                exit(1);
                        }
                        break;
+               case 'n':
+                       p->number = atoi(optarg);
+                       break;
+               case '/':
+                       break;
                default:
                        usage();
                        exit(1);
                }
+               if (opt == '/')
+                       break; 
        }
+       return (options_found) ? 0 : -1;
 }
 
 volatile bool exit_flag = false;
@@ -240,81 +264,137 @@ struct msg {
        struct timespec ts;
 };
 
+struct receiver_params {
+       int budget;
+       frsh_receive_endpoint_t epdst;
+       int id;
+};
+
 void *receiver(void *arg)
 {
-       frsh_receive_endpoint_t epdst = (frsh_receive_endpoint_t)arg;
+       struct receiver_params *rp = arg;
+       frsh_receive_endpoint_t epdst = rp->epdst;
        size_t mlen;
        int ret;
+       int last_cnt = -1;
        struct timespec tss, tsr;
        struct msg *msg;
-       msg = malloc(opt_budget);
+       msg = malloc(rp->budget);
        if (!msg) error(1, errno, "malloc msg");
 
        while (!exit_flag) {
-               ret = frsh_receive_sync(epdst, msg, opt_budget, &mlen, NULL);
+               ret = frsh_receive_sync(epdst, msg, rp->budget, &mlen, NULL);
                clock_gettime(CLOCK_MONOTONIC, &tsr);
                tss = msg->ts;
-               printf("%10d: %10.3lf ms\n",
-                      msg->cnt, tsdiff2d(&tsr, &tss)*1000);
+               if (msg->cnt != last_cnt+1)
+                       printf("packet(s) lost!\n");
+               printf("%3d: %10d: %10.3lf ms\n",
+                      rp->id, msg->cnt, tsdiff2d(&tsr, &tss)*1000);
+               last_cnt = msg->cnt;
        }
+       free(rp);
        return NULL;
 }
 
-void run()
+sem_t finished;
+
+void *sender(void *arg)
 {
+       struct stream_params *p = arg;
        frsh_vres_id_t vres;
        frsh_send_endpoint_t epsrc;
-       frsh_receive_endpoint_t epdst;
+       struct receiver_params *rp;
        int ret;
        struct msg *msg;
        long int cnt=0;
        pthread_t receiver_id;
 
-       msg = malloc(opt_budget);
+       msg = malloc(p->budget);
        if (!msg) error(1, errno, "malloc msg");
 
-       ret = frsh_init();
-       if (ret) PERROR_AND_EXIT(ret, "frsh_init");
-       
-       negotiate_contract(&vres);
-       create_endpoints(vres, &epsrc, &epdst);
+       negotiate_contract(p, &vres);
+
+       rp = malloc(sizeof(*rp));
+       rp->budget = p->budget;
+       rp->id = p->id;
+
+       create_endpoints(p, vres, &epsrc, &rp->epdst);
 
        set_rt_prio(50);
 
-       ret = pthread_create(&receiver_id, NULL, receiver, epdst);
+       ret = pthread_create(&receiver_id, NULL, receiver, rp);
        
        if (signal(SIGTERM, stopper) == SIG_ERR)
                error(1, errno, "Error in signal registration");
        if (signal(SIGINT, stopper) == SIG_ERR)
                error(1, errno, "Signal handler registration error");
 
-       struct timespec next_period, tss;
+       struct timespec next_period;
+       struct timespec tss;
        clock_gettime(CLOCK_MONOTONIC, &next_period);
        while (!exit_flag) {
                clock_gettime(CLOCK_MONOTONIC, &tss);
                msg->cnt = cnt++;
                msg->ts = tss;
-               ret = frsh_send_async(epsrc, msg, opt_budget);
-
-               next_period.tv_nsec += opt_period * 1000000;
+               if (p->async)
+                       ret = frsh_send_async(epsrc, msg, p->budget);
+               else {
+                       ret = frsh_send_sync(epsrc, msg, p->budget);
+                       clock_gettime(CLOCK_MONOTONIC, &next_period);
+               }
+               next_period.tv_sec  += (p->period_ms/1000);
+               next_period.tv_nsec += (p->period_ms%1000) * 1000000;
                if (next_period.tv_nsec >= 1000000000) {
                        next_period.tv_nsec -= 1000000000;
                        next_period.tv_sec++;
                }
-
                clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
                                &next_period, NULL);
        }
-       
+
        frsh_contract_cancel(vres);
+       free(p);
+       sem_post(&finished);
+       return NULL;
 }
 
 int main(int argc, char *argv[])
 {
-       src.s_addr = htonl(INADDR_LOOPBACK);
-       dst.s_addr = htonl(INADDR_LOOPBACK);
+       int ret;
+       int num = 0;
+       struct stream_params sp = {
+               .budget = 1024,
+               .period_ms = 20,
+               .async = false,
+               .src.s_addr = htonl(INADDR_LOOPBACK),
+               .dst.s_addr = htonl(INADDR_LOOPBACK),
+               .number = 1,
+       };
+       struct stream_params *p = NULL;
+
+       sem_init(&finished, 0, 0);
+       
+       ret = frsh_init();
+       if (ret) PERROR_AND_EXIT(ret, "frsh_init");
+       
+       do {
+               ret = parse_opts(&argc, &argv, &sp);
+               if (num == 0 || ret == 0) {
+                       int i;
+                       for (i=0; i<sp.number; i++) {
+                               pthread_t thread;
+                               p = malloc(sizeof(*p));
+                               if (!p) error(1, errno, "malloc");
+                               *p = sp;
+                               p->id = num;
+                               pthread_create(&thread, NULL, sender, p);
+                               num++;
+                       }
+               }
+       } while(ret == 0);
+
+       while (num--)
+               sem_wait(&finished);
        
-       parse_opts(argc, argv);
-       run();
        return 0;
 }