]> rtime.felk.cvut.cz Git - frescor/fwp.git/blobdiff - fwp/tests/timing/fwp-timing.c
fwp-timing: Added histogram and statistics
[frescor/fwp.git] / fwp / tests / timing / fwp-timing.c
index a6f8c6d336773509a64c7067a10ca043c7cd92f7..c0c1b5bb0087c10a48930a099ce656c3b2c86988 100644 (file)
 #include <fwp_res.h>
 #include <error.h>
 #include <sys/mman.h>
+#include <semaphore.h>
 
-int opt_budget = 1024;
-int opt_period = 20;
-bool opt_async = false;
+bool opt_verbose = false;
 
-struct in_addr src, dst;
+#define HIST_MAX_US 1000000
+#define HIST_RES_US 10
+
+struct histogram {
+       unsigned cnt[(HIST_MAX_US+1)/HIST_RES_US];
+       unsigned max;
+};
+
+/* static void hist_init(struct histogram *h) */
+/* { */
+/*     memset(h, 0, sizeof(*h)); */
+/* } */
+
+static void hist_add(struct histogram *h, int us)
+{
+       assert(us > 0);
+       if (us > HIST_MAX_US)
+               us = HIST_MAX_US;
+       __sync_fetch_and_add(&h->cnt[us/HIST_RES_US], 1);
+
+       unsigned max;
+       do {
+               max = h->max;
+       } while (us > max && ! __sync_bool_compare_and_swap(&h->max, max, us));
+               
+}
+
+static unsigned hist_get_percentile(struct histogram *h, unsigned p)
+{
+       uint64_t sum = 0, psum;
+       int i;
+       for (i=0; i<(HIST_MAX_US+1)/HIST_RES_US; i++)
+               sum += h->cnt[i];
+
+       psum = sum * p / 100;
+       sum = 0;
+       for (i=0; i<(HIST_MAX_US+1)/HIST_RES_US; i++) {
+               sum += h->cnt[i];
+               if (sum >= psum)
+                       break;
+       }
+       return i*HIST_RES_US;
+}
+
+struct stats {
+       uint32_t sent;
+       uint32_t received;
+       uint32_t lost;
+} stats;
+
+struct histogram hist;
 
 void set_rt_prio(int priority)
 {
@@ -57,8 +106,18 @@ void set_rt_prio(int priority)
        mlockall(MCL_CURRENT | MCL_FUTURE);
 }
 
+struct stream_params {
+       int budget;
+       int period_ms;
+       bool async;
+       struct in_addr src, dst;
+       int number;
+       int id;
+       int count;
+};
+
 
-int negotiate_contract(frsh_vres_id_t *vres)
+int negotiate_contract(struct stream_params *p, frsh_vres_id_t *vres)
 {
        frsh_contract_t contract;
        int ret;
@@ -76,8 +135,8 @@ int negotiate_contract(frsh_vres_id_t *vres)
                NULL);
        if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_resource_and_label");
 
-       frsh_network_bytes_to_budget(FRSH_NETPF_FWP, opt_budget, &budget);
-       period = fosa_msec_to_rel_time(opt_period);
+       frsh_network_bytes_to_budget(FRSH_NETPF_FWP, p->budget, &budget);
+       period = fosa_msec_to_rel_time(p->period_ms);
        ret = frsh_contract_set_basic_params(&contract,
                                             &budget,
                                             &period,
@@ -96,7 +155,7 @@ int negotiate_contract(frsh_vres_id_t *vres)
 
        fwp = malloc(sizeof(*fwp));
        if (!fwp) PERROR_AND_EXIT(errno, "malloc");
-       fwp->src = src.s_addr;
+       fwp->src = p->src.s_addr;
        ret = fres_contract_add_fwp(contract, fwp);
        if (ret) PERROR_AND_EXIT(ret, "fres_contract_add_fwp");
 
@@ -108,7 +167,8 @@ int negotiate_contract(frsh_vres_id_t *vres)
        return 0;
 }
 
-void create_endpoints(frsh_vres_id_t vres,
+void create_endpoints(struct stream_params *p,
+                     frsh_vres_id_t vres,
                      frsh_send_endpoint_t *epsrc,
                      frsh_receive_endpoint_t *epdst)
 {
@@ -126,7 +186,7 @@ void create_endpoints(frsh_vres_id_t vres,
        frsh_receive_endpoint_get_params(*epdst, NULL, &port, NULL, NULL);
 
        ret = frsh_send_endpoint_create(FRSH_NETPF_FWP,
-                                       dst.s_addr, port, spi,
+                                       p->dst.s_addr, port, spi,
                                        epsrc);
        if (ret < 0) error(1, errno, "frsh_send_endpoint_create()");
                
@@ -141,6 +201,9 @@ static struct option long_opts[] = {
     { "source", 1, 0, 's' },
     { "dest",  1, 0, 'd' },
     { "async",         0, 0, 'a' },
+    { "number",        0, 0, 'n' },
+    { "count", 0, 0, 'c' },
+    { "verbose",0, 0, 'v' },
     { 0, 0, 0, 0}
 };
 
@@ -153,47 +216,66 @@ usage(void)
        printf("  -s, --source <ip>  source IP address\n");
        printf("  -d, --dest <ip:port> destination IP address and port\n");
        printf("  -a, --async  Send packets asynchronously\n");
+       printf("  -n, --number Number of streams with the same parameters\n");
+       printf("  -c, --count Number of messages to send [infinity]\n");
+       printf("  -/, --stream  New stream separator\n");
+       printf("  -v, --verbose Be more verbose\n");
 }
 
-void parse_opts(int argc, char *argv[])
+int parse_opts(int *argc, char **argv[], struct stream_params *p)
 {
-       char opt;
+       int opt;
        int ret;
+       bool options_found = false;
 
-       while ((opt = getopt_long(argc, argv, "ab:p:s:d:", long_opts, NULL)) != -1) {
+       while ((opt = getopt_long(*argc, *argv, "/ab:c:p:s:d:n:v", long_opts, NULL)) != -1) {
+               options_found = true;
                switch (opt) {
                case 'a':
-                       opt_async = true;
+                       p->async = true;
                        break;
                case 'b':
-                       opt_budget = atoi(optarg);
+                       p->budget = atoi(optarg);
                        break;
-               case 'p':
-                       opt_period = atoi(optarg);
+               case 'c':
+                       p->count = atoi(optarg);
                        break;
-               case 's':
-                       ret = inet_aton(optarg, &src);
+               case 'd':
+                       ret = inet_aton(optarg, &p->dst);
                        if (!ret) {
-                               fprintf(stderr, "Source IP address not recognized: %s\n",
+                               fprintf(stderr, "Destination IP address not recognized: %s\n",
                                        optarg);
                                usage();
                                exit(1);
                        }
                        break;
-               case 'd':
-                       ret = inet_aton(optarg, &dst);
+               case 'n':
+                       p->number = atoi(optarg);
+                       break;
+               case 'p':
+                       p->period_ms = atoi(optarg);
+                       break;
+               case 's':
+                       ret = inet_aton(optarg, &p->src);
                        if (!ret) {
-                               fprintf(stderr, "Destination IP address not recognized: %s\n",
+                               fprintf(stderr, "Source IP address not recognized: %s\n",
                                        optarg);
                                usage();
                                exit(1);
                        }
                        break;
+               case 'v':
+                       opt_verbose = true;
+               case '/':
+                       break;
                default:
                        usage();
                        exit(1);
                }
+               if (opt == '/')
+                       break; 
        }
+       return (options_found) ? 0 : -1;
 }
 
 volatile bool exit_flag = false;
@@ -241,57 +323,84 @@ static inline double tsdiff2d(struct timespec *x,
        return ts2d(&r);
 }
 
+static inline int tsdiff2us(struct timespec *x,
+                             struct timespec *y)
+{
+       struct timespec r;
+       timespec_subtract(&r, x, y);
+       return r.tv_sec*1000000 + r.tv_nsec/1000;
+}
+
 struct msg {
        int cnt;
        struct timespec ts;
 };
 
+struct receiver_params {
+       int budget;
+       frsh_receive_endpoint_t epdst;
+       int id;
+};
+
 void *receiver(void *arg)
 {
-       frsh_receive_endpoint_t epdst = (frsh_receive_endpoint_t)arg;
+       struct receiver_params *rp = arg;
+       frsh_receive_endpoint_t epdst = rp->epdst;
        size_t mlen;
        int ret;
        int last_cnt = -1;
        struct timespec tss, tsr;
        struct msg *msg;
-       msg = malloc(opt_budget);
+       msg = malloc(rp->budget);
        if (!msg) error(1, errno, "malloc msg");
 
        while (!exit_flag) {
-               ret = frsh_receive_sync(epdst, msg, opt_budget, &mlen, NULL);
+               ret = frsh_receive_sync(epdst, msg, rp->budget, &mlen, NULL);
                clock_gettime(CLOCK_MONOTONIC, &tsr);
                tss = msg->ts;
-               if (msg->cnt != last_cnt+1)
-                       printf("packet(s) lost!\n");
-               printf("%10d: %10.3lf ms\n",
-                      msg->cnt, tsdiff2d(&tsr, &tss)*1000);
+               if (msg->cnt != last_cnt+1) {
+                       printf("%3d: packet(s) lost!\n", rp->id);
+                       __sync_fetch_and_add(&stats.lost, msg->cnt - last_cnt+1);
+               } else {
+                       hist_add(&hist, tsdiff2us(&tsr, &tss));
+                       __sync_fetch_and_add(&stats.received, 1);
+               }
+               if (opt_verbose)
+                       printf("%3d: %10d: %10.3lf ms\n",
+                              rp->id, msg->cnt, tsdiff2d(&tsr, &tss)*1000);
                last_cnt = msg->cnt;
        }
+       free(rp);
        return NULL;
 }
 
-void run()
+sem_t finished;
+
+void *sender(void *arg)
 {
+       struct stream_params *p = arg;
        frsh_vres_id_t vres;
        frsh_send_endpoint_t epsrc;
-       frsh_receive_endpoint_t epdst;
+       struct receiver_params *rp;
        int ret;
        struct msg *msg;
        long int cnt=0;
        pthread_t receiver_id;
 
-       msg = malloc(opt_budget);
+       msg = malloc(p->budget);
        if (!msg) error(1, errno, "malloc msg");
 
-       ret = frsh_init();
-       if (ret) PERROR_AND_EXIT(ret, "frsh_init");
-       
-       negotiate_contract(&vres);
-       create_endpoints(vres, &epsrc, &epdst);
+       negotiate_contract(p, &vres);
+
+       rp = malloc(sizeof(*rp));
+       rp->budget = p->budget;
+       rp->id = p->id;
+
+       create_endpoints(p, vres, &epsrc, &rp->epdst);
 
        set_rt_prio(50);
 
-       ret = pthread_create(&receiver_id, NULL, receiver, epdst);
+       ret = pthread_create(&receiver_id, NULL, receiver, rp);
        
        if (signal(SIGTERM, stopper) == SIG_ERR)
                error(1, errno, "Error in signal registration");
@@ -301,18 +410,19 @@ void run()
        struct timespec next_period;
        struct timespec tss;
        clock_gettime(CLOCK_MONOTONIC, &next_period);
-       while (!exit_flag) {
+       while (!exit_flag && (p->count == -1 || p->count--)) {
                clock_gettime(CLOCK_MONOTONIC, &tss);
                msg->cnt = cnt++;
                msg->ts = tss;
-               if (opt_async)
-                       ret = frsh_send_async(epsrc, msg, opt_budget);
+               if (p->async)
+                       ret = frsh_send_async(epsrc, msg, p->budget);
                else {
-                       ret = frsh_send_sync(epsrc, msg, opt_budget);
+                       ret = frsh_send_sync(epsrc, msg, p->budget);
                        clock_gettime(CLOCK_MONOTONIC, &next_period);
                }
-               next_period.tv_sec  += (opt_period/1000);
-               next_period.tv_nsec += (opt_period%1000) * 1000000;
+               __sync_fetch_and_add(&stats.sent, 1);
+               next_period.tv_sec  += (p->period_ms/1000);
+               next_period.tv_nsec += (p->period_ms%1000) * 1000000;
                if (next_period.tv_nsec >= 1000000000) {
                        next_period.tv_nsec -= 1000000000;
                        next_period.tv_sec++;
@@ -320,16 +430,76 @@ void run()
                clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
                                &next_period, NULL);
        }
-       
+
        frsh_contract_cancel(vres);
+       free(p);
+       sem_post(&finished);
+       return NULL;
+}
+
+void print_stat(bool full)
+{
+       printf("Sent: %5d  Received: %5d  Lost: %5d Max: %6.3f ms",
+              stats.sent, stats.received, stats.lost, hist.max/1000.0);
+       if (full) {
+               printf("  Max: %6.3f ms  90%%: %6.3f ms",
+                      hist_get_percentile(&hist, 100)/1000.0,
+                      hist_get_percentile(&hist, 90)/1000.0);
+       }
+       printf("\r");
+       fflush(stdout);
 }
 
 int main(int argc, char *argv[])
 {
-       src.s_addr = htonl(INADDR_LOOPBACK);
-       dst.s_addr = htonl(INADDR_LOOPBACK);
+       int ret;
+       int num = 0;
+       struct stream_params sp = {
+               .budget = 1024,
+               .period_ms = 20,
+               .async = false,
+               .src.s_addr = htonl(INADDR_LOOPBACK),
+               .dst.s_addr = htonl(INADDR_LOOPBACK),
+               .number = 1,
+               .count = -1,
+       };
+       struct stream_params *p = NULL;
+
+       sem_init(&finished, 0, 0);
+       
+       ret = frsh_init();
+       if (ret) PERROR_AND_EXIT(ret, "frsh_init");
+       
+       do {
+               ret = parse_opts(&argc, &argv, &sp);
+               if (num == 0 || ret == 0) {
+                       int i;
+                       for (i=0; i<sp.number; i++) {
+                               pthread_t thread;
+                               p = malloc(sizeof(*p));
+                               if (!p) error(1, errno, "malloc");
+                               *p = sp;
+                               p->id = num;
+                               pthread_create(&thread, NULL, sender, p);
+                               num++;
+                       }
+               }
+       } while(ret == 0);
+
+       while (!exit_flag) {
+               int v;
+               print_stat(false);
+               sem_getvalue(&finished, &v);
+               if (v == num)
+                       break;
+               usleep(100000);
+       }
+
+       while (num--)
+               sem_wait(&finished);
+
+       print_stat(true);
+       printf("\n");
        
-       parse_opts(argc, argv);
-       run();
        return 0;
 }