]> rtime.felk.cvut.cz Git - frescor/fwp.git/blobdiff - fwp/tests/timing/fwp-timing.c
fwp-timing: Allow maximal delay up to 10 seconds
[frescor/fwp.git] / fwp / tests / timing / fwp-timing.c
index c0c1b5bb0087c10a48930a099ce656c3b2c86988..4ed0c28458fc6d08a1af20c6183e13d7c5e43744 100644 (file)
 #include <error.h>
 #include <sys/mman.h>
 #include <semaphore.h>
+#include <ul_logreg.h>
 
 bool opt_verbose = false;
+bool opt_quiet = false;
 
-#define HIST_MAX_US 1000000
+#define HIST_MAX_US 10000000
 #define HIST_RES_US 10
 
 struct histogram {
@@ -83,11 +85,11 @@ void set_rt_prio(int priority)
        static struct sched_param param;
 
        if ((maxpri = sched_get_priority_max(SCHED_FIFO)) == -1)        {
-               fprintf(stderr, "warning: sched_get_priority_max failed");
+               fprintf(stderr, "warning: sched_get_priority_max failed\n");
        }
 
        if ((minpri = sched_get_priority_min(SCHED_FIFO)) == -1)        {
-               fprintf(stderr, "warning: sched_get_priority_min failed");
+               fprintf(stderr, "warning: sched_get_priority_min failed\n");
        }
 
        if (priority > maxpri)  {
@@ -100,7 +102,7 @@ void set_rt_prio(int priority)
        param.sched_priority = priority;
 
        if (sched_setscheduler(0, SCHED_FIFO, &param) == -1)    {
-               fprintf(stderr, "warning: sched_setscheduler failed");
+               fprintf(stderr, "warning: sched_setscheduler failed\n");
        }
 
        mlockall(MCL_CURRENT | MCL_FUTURE);
@@ -114,10 +116,13 @@ struct stream_params {
        int number;
        int id;
        int count;
+       frsh_vres_id_t vres;
+       pthread_t thread;
+       int jitter;
 };
 
 
-int negotiate_contract(struct stream_params *p, frsh_vres_id_t *vres)
+int negotiate_contract(struct stream_params *p)
 {
        frsh_contract_t contract;
        int ret;
@@ -159,16 +164,14 @@ int negotiate_contract(struct stream_params *p, frsh_vres_id_t *vres)
        ret = fres_contract_add_fwp(contract, fwp);
        if (ret) PERROR_AND_EXIT(ret, "fres_contract_add_fwp");
 
-       ret = frsh_contract_negotiate(&contract, vres);
-       if (ret) PERROR_AND_EXIT(ret, "frsh_contract_negotiate");
+       ret = frsh_contract_negotiate(&contract, &p->vres);
 
        frsh_contract_destroy(&contract);
 
-       return 0;
+       return ret;
 }
 
 void create_endpoints(struct stream_params *p,
-                     frsh_vres_id_t vres,
                      frsh_send_endpoint_t *epsrc,
                      frsh_receive_endpoint_t *epdst)
 {
@@ -190,20 +193,23 @@ void create_endpoints(struct stream_params *p,
                                        epsrc);
        if (ret < 0) error(1, errno, "frsh_send_endpoint_create()");
                
-       ret = frsh_send_endpoint_bind(vres, *epsrc);
+       ret = frsh_send_endpoint_bind(p->vres, *epsrc);
        if (ret != 0) error(1, errno, "frsh_send_endpoint_bind");
 }
 
 
 static struct option long_opts[] = {
-    { "period", 1, 0, 'p' },
-    { "budget", 1, 0, 'b' },
-    { "source", 1, 0, 's' },
-    { "dest",  1, 0, 'd' },
-    { "async",         0, 0, 'a' },
-    { "number",        0, 0, 'n' },
-    { "count", 0, 0, 'c' },
-    { "verbose",0, 0, 'v' },
+    { "loglevel",required_argument, 0, 'l' },
+    { "period", required_argument, 0, 'p' },
+    { "budget", required_argument, 0, 'b' },
+    { "source", required_argument, 0, 's' },
+    { "dest",  required_argument, 0, 'd' },
+    { "async",         no_argument,       0, 'a' },
+    { "number",        required_argument, 0, 'n' },
+    { "count", required_argument, 0, 'c' },
+    { "verbose",no_argument,      0, 'v' },
+    { "quiet",  no_argument,      0, 'q' },
+    { "jitter", required_argument, 0, 'j' },
     { 0, 0, 0, 0}
 };
 
@@ -211,6 +217,7 @@ static void
 usage(void)
 {
        printf("usage: fwp-timing [ options ]\n");
+       printf("  -l, --loglevel <number>|<domain>=<number>,...\n");
        printf("  -p, --period <ms>  period in miliseconds\n");
        printf("  -b, --budget <bytes>  how many bytes is sent in each period\n");
        printf("  -s, --source <ip>  source IP address\n");
@@ -218,8 +225,10 @@ usage(void)
        printf("  -a, --async  Send packets asynchronously\n");
        printf("  -n, --number Number of streams with the same parameters\n");
        printf("  -c, --count Number of messages to send [infinity]\n");
+       printf("  -q, --quiet Print only final statistics\n");
        printf("  -/, --stream  New stream separator\n");
        printf("  -v, --verbose Be more verbose\n");
+       printf("  -j, --jitter <percent> Sent jitter given as percentage of period\n");
 }
 
 int parse_opts(int *argc, char **argv[], struct stream_params *p)
@@ -228,7 +237,7 @@ int parse_opts(int *argc, char **argv[], struct stream_params *p)
        int ret;
        bool options_found = false;
 
-       while ((opt = getopt_long(*argc, *argv, "/ab:c:p:s:d:n:v", long_opts, NULL)) != -1) {
+       while ((opt = getopt_long(*argc, *argv, "/ab:c:d:j:l:n:p:qs:v", long_opts, NULL)) != -1) {
                options_found = true;
                switch (opt) {
                case 'a':
@@ -249,6 +258,12 @@ int parse_opts(int *argc, char **argv[], struct stream_params *p)
                                exit(1);
                        }
                        break;
+               case 'j':
+                       p->jitter = atoi(optarg);
+                       break;
+               case 'l':
+                       ul_log_domain_arg2levels(optarg);
+                       break;
                case 'n':
                        p->number = atoi(optarg);
                        break;
@@ -266,6 +281,10 @@ int parse_opts(int *argc, char **argv[], struct stream_params *p)
                        break;
                case 'v':
                        opt_verbose = true;
+                       break;
+               case 'q':
+                       opt_quiet = true;
+                       break;
                case '/':
                        break;
                default:
@@ -359,7 +378,8 @@ void *receiver(void *arg)
                clock_gettime(CLOCK_MONOTONIC, &tsr);
                tss = msg->ts;
                if (msg->cnt != last_cnt+1) {
-                       printf("%3d: packet(s) lost!\n", rp->id);
+                       if (!opt_quiet)
+                               printf("%3d: packet(s) lost!\n", rp->id);
                        __sync_fetch_and_add(&stats.lost, msg->cnt - last_cnt+1);
                } else {
                        hist_add(&hist, tsdiff2us(&tsr, &tss));
@@ -379,7 +399,6 @@ sem_t finished;
 void *sender(void *arg)
 {
        struct stream_params *p = arg;
-       frsh_vres_id_t vres;
        frsh_send_endpoint_t epsrc;
        struct receiver_params *rp;
        int ret;
@@ -390,23 +409,16 @@ void *sender(void *arg)
        msg = malloc(p->budget);
        if (!msg) error(1, errno, "malloc msg");
 
-       negotiate_contract(p, &vres);
-
        rp = malloc(sizeof(*rp));
        rp->budget = p->budget;
        rp->id = p->id;
-
-       create_endpoints(p, vres, &epsrc, &rp->epdst);
-
+       
+       create_endpoints(p, &epsrc, &rp->epdst);
+               
        set_rt_prio(50);
 
        ret = pthread_create(&receiver_id, NULL, receiver, rp);
        
-       if (signal(SIGTERM, stopper) == SIG_ERR)
-               error(1, errno, "Error in signal registration");
-       if (signal(SIGINT, stopper) == SIG_ERR)
-               error(1, errno, "Signal handler registration error");
-
        struct timespec next_period;
        struct timespec tss;
        clock_gettime(CLOCK_MONOTONIC, &next_period);
@@ -421,8 +433,16 @@ void *sender(void *arg)
                        clock_gettime(CLOCK_MONOTONIC, &next_period);
                }
                __sync_fetch_and_add(&stats.sent, 1);
-               next_period.tv_sec  += (p->period_ms/1000);
-               next_period.tv_nsec += (p->period_ms%1000) * 1000000;
+
+               int delay_ms;
+               if (p->jitter)
+                       delay_ms = p->period_ms*(100-p->jitter)/100
+                               + rand() % (2*p->period_ms*p->jitter/100);
+               else
+                       delay_ms = p->period_ms;
+
+               next_period.tv_sec  += (delay_ms/1000);
+               next_period.tv_nsec += (delay_ms%1000) * 1000000;
                if (next_period.tv_nsec >= 1000000000) {
                        next_period.tv_nsec -= 1000000000;
                        next_period.tv_sec++;
@@ -431,29 +451,33 @@ void *sender(void *arg)
                                &next_period, NULL);
        }
 
-       frsh_contract_cancel(vres);
-       free(p);
+       
        sem_post(&finished);
        return NULL;
 }
 
-void print_stat(bool full)
+void print_stat(bool final)
 {
-       printf("Sent: %5d  Received: %5d  Lost: %5d Max: %6.3f ms",
+       printf("Sent: %5d  Received: %5d  Lost: %5d Max: %8.3f ms",
               stats.sent, stats.received, stats.lost, hist.max/1000.0);
-       if (full) {
-               printf("  Max: %6.3f ms  90%%: %6.3f ms",
-                      hist_get_percentile(&hist, 100)/1000.0,
-                      hist_get_percentile(&hist, 90)/1000.0);
+       if (final) {
+               printf("  Packetloss: %7.3f %%  95%%: %8.3f ms  99%%: %8.3f ms\n",
+                      100.0*stats.lost/stats.sent,
+                      hist_get_percentile(&hist, 95)/1000.0,
+                      hist_get_percentile(&hist, 99)/1000.0);
        }
-       printf("\r");
+       else
+               printf("\r");
        fflush(stdout);
 }
 
 int main(int argc, char *argv[])
 {
        int ret;
+       int i;
        int num = 0;
+       bool negotiation_failure = false;
+       
        struct stream_params sp = {
                .budget = 1024,
                .period_ms = 20,
@@ -463,7 +487,14 @@ int main(int argc, char *argv[])
                .number = 1,
                .count = -1,
        };
-       struct stream_params *p = NULL;
+       struct stream_params *p[100];
+
+       memset(p, 0, sizeof(p));
+
+       if (signal(SIGTERM, stopper) == SIG_ERR)
+               error(1, errno, "Error in signal registration");
+       if (signal(SIGINT, stopper) == SIG_ERR)
+               error(1, errno, "Signal handler registration error");
 
        sem_init(&finished, 0, 0);
        
@@ -473,20 +504,32 @@ int main(int argc, char *argv[])
        do {
                ret = parse_opts(&argc, &argv, &sp);
                if (num == 0 || ret == 0) {
-                       int i;
                        for (i=0; i<sp.number; i++) {
-                               pthread_t thread;
-                               p = malloc(sizeof(*p));
-                               if (!p) error(1, errno, "malloc");
-                               *p = sp;
-                               p->id = num;
-                               pthread_create(&thread, NULL, sender, p);
-                               num++;
+                               p[num] = malloc(sizeof(*p[0]));
+                               if (!p[num]) error(1, errno, "malloc");
+                               *p[num] = sp;
+                               p[num]->id = num;
+                               ret = negotiate_contract(p[num]);
+                               if (!ret)
+                                       num++;
+                               else {
+                                       PERROR_FRESCOR(ret, "frsh_contract_negotiate");
+                                       free(p[num]);
+                                       negotiation_failure = true;
+                                       break;
+                               }
                        }
                }
        } while(ret == 0);
 
-       while (!exit_flag) {
+       if (negotiation_failure) {
+               goto destroy;
+       }
+
+       for (i=0; i<num; i++)
+               pthread_create(&p[i]->thread, NULL, sender, p[i]);
+
+       while (!exit_flag && !opt_quiet) {
                int v;
                print_stat(false);
                sem_getvalue(&finished, &v);
@@ -495,11 +538,16 @@ int main(int argc, char *argv[])
                usleep(100000);
        }
 
-       while (num--)
-               sem_wait(&finished);
+       for (i=0; i<num; i++)
+               pthread_join(p[i]->thread, NULL);
+destroy:
+       for (i=0; i<num; i++) {
+               frsh_contract_cancel(p[i]->vres);
+               free(p[i]);
+       }
 
+       stats.lost = stats.sent - stats.received;
        print_stat(true);
-       printf("\n");
        
-       return 0;
+       return negotiation_failure ? 1 : 0;
 }