]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/tests/timing/fwp-timing.c
fwp-timing: Final version and a simple gnuplot script
[frescor/fwp.git] / fwp / tests / timing / fwp-timing.c
1 /*
2  * A tool for measuring FWP latency. This program aims to be simpler
3  * than wme_test and is intended to be run on a single machine with
4  * multiple wifi interfaces and send-to-self kernel patch applied.
5  *
6  * This program creates both sides of communication and measures the
7  * communication delays.
8  */
9
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <error.h>
#include <getopt.h>
#include <limits.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/param.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#include <frsh.h>
#include <fwp_res.h>
#include <ul_logreg.h>
26
/* Set by -v/--verbose: the receiver prints one line per received message. */
bool opt_verbose = false;
/* Set by -q/--quiet: suppresses the packet-loss messages and the
 * periodically refreshed statistics line. */
bool opt_quiet = false;
29
/* Largest latency (in microseconds) the histogram distinguishes; larger
 * samples are clamped to this value. */
#define HIST_MAX_US 1000000
/* Width of one histogram bucket in microseconds. */
#define HIST_RES_US 10
/* Number of buckets.  The "+ 1" provides the bucket for samples equal to
 * (or clamped to) HIST_MAX_US, whose index is HIST_MAX_US/HIST_RES_US. */
#define HIST_BUCKETS (HIST_MAX_US/HIST_RES_US + 1)

/*
 * Latency histogram.
 *   cnt[i] - number of samples in [i*HIST_RES_US, (i+1)*HIST_RES_US)
 *   max    - largest sample recorded so far (after clamping), in us
 */
struct histogram {
        unsigned cnt[HIST_BUCKETS];
        unsigned max;
};

/* Kept for reference; currently unused. */
/* static void hist_init(struct histogram *h) */
/* { */
/*      memset(h, 0, sizeof(*h)); */
/* } */

/*
 * Records one latency sample of `us` microseconds.
 *
 * Samples above HIST_MAX_US are clamped into the last bucket.  The bucket
 * counter and the running maximum are updated with atomic builtins so the
 * function may be called from several threads at once.
 */
static void hist_add(struct histogram *h, int us)
{
        unsigned val;
        unsigned seen;

        assert(us > 0);
        /* With NDEBUG the assert vanishes; never index with a negative value. */
        if (us < 0)
                us = 0;
        if (us > HIST_MAX_US)
                us = HIST_MAX_US;
        val = (unsigned)us;

        __sync_fetch_and_add(&h->cnt[val/HIST_RES_US], 1);

        /* Lock-free "store if greater": retry while another thread changed
         * h->max between our read and the compare-and-swap. */
        seen = h->max;
        while (val > seen && !__sync_bool_compare_and_swap(&h->max, seen, val))
                seen = h->max;
}

/*
 * Returns the lower bound (in microseconds) of the first bucket at which
 * the cumulative sample count reaches p percent of all samples.
 * Values of p above 100 are treated as 100.  Returns 0 for an empty
 * histogram.
 */
static unsigned hist_get_percentile(struct histogram *h, unsigned p)
{
        uint64_t total = 0, target, running = 0;
        int i;

        if (p > 100)
                p = 100;

        for (i = 0; i < HIST_BUCKETS; i++)
                total += h->cnt[i];

        target = total * p / 100;
        /* Stop one bucket early: if the target was not reached before the
         * last bucket, the last bucket is the answer. */
        for (i = 0; i < HIST_BUCKETS - 1; i++) {
                running += h->cnt[i];
                if (running >= target)
                        break;
        }
        return i*HIST_RES_US;
}
73
/* Message counters shared by every sender and receiver thread. */
struct stats {
        uint32_t sent;      /* messages handed to the send call */
        uint32_t received;  /* messages that arrived in sequence */
        uint32_t lost;      /* messages counted as lost */
};

/* The single, zero-initialized instance of the counters. */
struct stats stats;
79
/* Latency histogram filled by the receiver threads and read when the
 * statistics are printed.  Zero-initialized as a file-scope object. */
struct histogram hist;
81
/*
 * Switches the caller to the SCHED_FIFO policy with the given priority and
 * locks current and future pages in memory.
 *
 * Every failure is reported as a warning on stderr; the function never
 * aborts the program.
 */
void set_rt_prio(int priority)
{
        /* Automatic (not static) storage: sender() calls this function from
         * several threads, so the parameter block must not be shared. */
        struct sched_param param = { .sched_priority = priority };
        int maxpri, minpri;

        maxpri = sched_get_priority_max(SCHED_FIFO);
        if (maxpri == -1)
                fprintf(stderr, "warning: sched_get_priority_max failed\n");
        else if (priority > maxpri)
                fprintf(stderr, "warning: maximum priority allowed is %d.\n", maxpri);

        minpri = sched_get_priority_min(SCHED_FIFO);
        if (minpri == -1)
                fprintf(stderr, "warning: sched_get_priority_min failed\n");
        else if (priority < minpri)
                fprintf(stderr, "warning: minimum priority allowed is %d.\n", minpri);

        if (sched_setscheduler(0, SCHED_FIFO, &param) == -1)
                fprintf(stderr, "warning: sched_setscheduler failed\n");

        if (mlockall(MCL_CURRENT | MCL_FUTURE) == -1)
                fprintf(stderr, "warning: mlockall failed\n");
}
110
/* Parameters and run-time state of one measured stream. */
struct stream_params {
        int budget;             /* bytes sent in each period; also the message size */
        int period_ms;          /* sending period in milliseconds */
        bool async;             /* true: frsh_send_async(), false: frsh_send_sync() */
        struct in_addr src, dst; /* source and destination IPv4 addresses */
        int number;             /* how many streams to create with these parameters */
        int id;                 /* index of this stream, assigned in main() */
        int count;              /* messages left to send; -1 means no limit */
        frsh_vres_id_t vres;    /* filled in by frsh_contract_negotiate() */
        pthread_t thread;       /* sender thread started for this stream */
};
122
123
124 int negotiate_contract(struct stream_params *p)
125 {
126         frsh_contract_t contract;
127         int ret;
128         frsh_rel_time_t period;
129         frsh_rel_time_t budget;
130         frsh_rel_time_t deadline;
131         fres_block_fwp *fwp;    
132         
133         ret = frsh_contract_init(&contract);
134         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_init");
135
136         ret = frsh_contract_set_resource_and_label(
137                 &contract,
138                 FRSH_RT_NETWORK, FRSH_NETPF_FWP,
139                 NULL);
140         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_resource_and_label");
141
142         frsh_network_bytes_to_budget(FRSH_NETPF_FWP, p->budget, &budget);
143         period = fosa_msec_to_rel_time(p->period_ms);
144         ret = frsh_contract_set_basic_params(&contract,
145                                              &budget,
146                                              &period,
147                                              FRSH_WT_BOUNDED,
148                                              FRSH_CT_REGULAR);
149         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_basic_params");
150
151
152         /* FWP doesn't accept smaller deadlines than 30 ms. */
153         if (frsh_rel_time_smaller(period, frsh_msec_to_rel_time(30)))
154                 deadline = frsh_msec_to_rel_time(30);
155         else
156                 deadline = period;
157         ret = frsh_contract_set_timing_reqs(&contract, false, &deadline);
158         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_timing_reqs");
159
160         fwp = malloc(sizeof(*fwp));
161         if (!fwp) PERROR_AND_EXIT(errno, "malloc");
162         fwp->src = p->src.s_addr;
163         ret = fres_contract_add_fwp(contract, fwp);
164         if (ret) PERROR_AND_EXIT(ret, "fres_contract_add_fwp");
165
166         ret = frsh_contract_negotiate(&contract, &p->vres);
167
168         frsh_contract_destroy(&contract);
169
170         return ret;
171 }
172
173 void create_endpoints(struct stream_params *p,
174                       frsh_send_endpoint_t *epsrc,
175                       frsh_receive_endpoint_t *epdst)
176 {
177         int ret;
178         frsh_send_endpoint_protocol_info_t    spi = { NULL, 0 };
179         frsh_receive_endpoint_protocol_info_t rpi = { NULL, 0 };
180         frsh_endpoint_queueing_info_t qi = { .queue_size=0,
181                                              .queue_policy=FRSH_QRP_OLDEST };
182
183         ret = frsh_receive_endpoint_create(FRSH_NETPF_FWP, 0, qi, rpi,
184                                            epdst);
185         if (ret != 0) error(1, errno, "fwp_receive_endpoint_create");
186                 
187         unsigned int port;
188         frsh_receive_endpoint_get_params(*epdst, NULL, &port, NULL, NULL);
189
190         ret = frsh_send_endpoint_create(FRSH_NETPF_FWP,
191                                         p->dst.s_addr, port, spi,
192                                         epsrc);
193         if (ret < 0) error(1, errno, "frsh_send_endpoint_create()");
194                 
195         ret = frsh_send_endpoint_bind(p->vres, *epsrc);
196         if (ret != 0) error(1, errno, "frsh_send_endpoint_bind");
197 }
198
199
/*
 * Long-option table for getopt_long().  Every entry maps to the short
 * option character handled in parse_opts().
 */
static struct option long_opts[] = {
    { "loglevel",required_argument, 0, 'l' },
    { "period", required_argument, 0, 'p' },
    { "budget", required_argument, 0, 'b' },
    { "source", required_argument, 0, 's' },
    { "dest",   required_argument, 0, 'd' },
    { "async",  no_argument,       0, 'a' },
    { "number", required_argument, 0, 'n' },
    { "count",  required_argument, 0, 'c' },
    { "verbose",no_argument,       0, 'v' },
    { "quiet",  no_argument,       0, 'q' },
    /* Advertised by usage() as the long form of "-/". */
    { "stream", no_argument,       0, '/' },
    { 0, 0, 0, 0}
};
213
/* Prints the command-line help to standard output. */
static void
usage(void)
{
        printf("usage: fwp-timing [ options ]\n");
        printf("  -l, --loglevel <number>|<domain>=<number>,...\n");
        printf("  -p, --period <ms>  period in milliseconds\n");
        printf("  -b, --budget <bytes>  how many bytes is sent in each period\n");
        printf("  -s, --source <ip>  source IP address\n");
        /* Only an address is parsed (inet_aton); the port is assigned
         * automatically when the endpoints are created. */
        printf("  -d, --dest <ip>  destination IP address\n");
        printf("  -a, --async  Send packets asynchronously\n");
        printf("  -n, --number <n>  Number of streams with the same parameters\n");
        printf("  -c, --count <n>  Number of messages to send [infinity]\n");
        printf("  -q, --quiet Print only final statistics\n");
        printf("  -/, --stream  New stream separator\n");
        printf("  -v, --verbose Be more verbose\n");
}
230
231 int parse_opts(int *argc, char **argv[], struct stream_params *p)
232 {
233         int opt;
234         int ret;
235         bool options_found = false;
236
237         while ((opt = getopt_long(*argc, *argv, "/ab:c:d:l:n:p:qs:v", long_opts, NULL)) != -1) {
238                 options_found = true;
239                 switch (opt) {
240                 case 'a':
241                         p->async = true;
242                         break;
243                 case 'b':
244                         p->budget = atoi(optarg);
245                         break;
246                 case 'c':
247                         p->count = atoi(optarg);
248                         break;
249                 case 'd':
250                         ret = inet_aton(optarg, &p->dst);
251                         if (!ret) {
252                                 fprintf(stderr, "Destination IP address not recognized: %s\n",
253                                         optarg);
254                                 usage();
255                                 exit(1);
256                         }
257                         break;
258                 case 'l':
259                         ul_log_domain_arg2levels(optarg);
260                         break;
261                 case 'n':
262                         p->number = atoi(optarg);
263                         break;
264                 case 'p':
265                         p->period_ms = atoi(optarg);
266                         break;
267                 case 's':
268                         ret = inet_aton(optarg, &p->src);
269                         if (!ret) {
270                                 fprintf(stderr, "Source IP address not recognized: %s\n",
271                                         optarg);
272                                 usage();
273                                 exit(1);
274                         }
275                         break;
276                 case 'v':
277                         opt_verbose = true;
278                         break;
279                 case 'q':
280                         opt_quiet = true;
281                         break;
282                 case '/':
283                         break;
284                 default:
285                         usage();
286                         exit(1);
287                 }
288                 if (opt == '/')
289                         break; 
290         }
291         return (options_found) ? 0 : -1;
292 }
293
/* Set asynchronously by the signal handler; polled by the main loop and by
 * the sender and receiver loops to terminate. */
volatile bool exit_flag = false;

/*
 * Signal handler installed for SIGTERM and SIGINT: requests termination.
 *
 * Declared with the int parameter that signal() handlers are required to
 * take; the signal number itself is not needed.
 */
void stopper(int signum)
{
        (void)signum;
        exit_flag = true;
}
300
/*
 * Computes *result = *x - *y.
 *
 * The inputs are not modified (the carry is performed on local copies), and
 * `result` may point to the same object as `x` or `y`.
 *
 * Returns 1 if the difference is negative, otherwise 0.
 */
int
timespec_subtract(struct timespec *result,
                  const struct timespec *x, const struct timespec *y)
{
        const struct timespec a = *x;
        struct timespec b = *y;

        /* Perform the carry for the later subtraction by updating the
         * local copy of Y. */
        if (a.tv_nsec < b.tv_nsec) {
                long carry = (b.tv_nsec - a.tv_nsec) / 1000000000L + 1;
                b.tv_nsec -= 1000000000L * carry;
                b.tv_sec += carry;
        }
        if (a.tv_nsec - b.tv_nsec > 1000000000L) {
                long carry = (a.tv_nsec - b.tv_nsec) / 1000000000L;
                b.tv_nsec += 1000000000L * carry;
                b.tv_sec -= carry;
        }

        /* After the carry the nanosecond difference is not negative. */
        result->tv_sec = a.tv_sec - b.tv_sec;
        result->tv_nsec = a.tv_nsec - b.tv_nsec;

        /* Negative result <=> fewer whole seconds in X than in adjusted Y. */
        return a.tv_sec < b.tv_sec;
}
325
/* Converts a timespec to seconds expressed as a double. */
static inline double ts2d(struct timespec *ts)
{
        const double whole_seconds = ts->tv_sec;
        const double fraction = 1e-9*ts->tv_nsec;

        return whole_seconds + fraction;
}
330
/* Returns the difference *x - *y in seconds as a double. */
static inline double tsdiff2d(struct timespec *x,
                              struct timespec *y)
{
        struct timespec delta;

        timespec_subtract(&delta, x, y);
        /* Same conversion as ts2d(), written out in place. */
        return delta.tv_sec + 1e-9*delta.tv_nsec;
}
338
/* Returns the difference *x - *y in whole microseconds. */
static inline int tsdiff2us(struct timespec *x,
                              struct timespec *y)
{
        struct timespec delta;

        timespec_subtract(&delta, x, y);

        const time_t from_seconds = delta.tv_sec*1000000;
        const long from_nanoseconds = delta.tv_nsec/1000;

        return from_seconds + from_nanoseconds;
}
346
/* Header at the start of every test message; the rest of the message up to
 * the stream's budget is padding. */
struct msg {
        int cnt;                /* sequence number assigned by the sender */
        struct timespec ts;     /* CLOCK_MONOTONIC time taken just before sending */
};
351
/* Arguments passed to a receiver thread.  Allocated by sender() and freed
 * by receiver() when it terminates. */
struct receiver_params {
        int budget;                     /* size of the receive buffer in bytes */
        frsh_receive_endpoint_t epdst;  /* endpoint to receive from */
        int id;                         /* stream id used in printed messages */
};
357
358 void *receiver(void *arg)
359 {
360         struct receiver_params *rp = arg;
361         frsh_receive_endpoint_t epdst = rp->epdst;
362         size_t mlen;
363         int ret;
364         int last_cnt = -1;
365         struct timespec tss, tsr;
366         struct msg *msg;
367         msg = malloc(rp->budget);
368         if (!msg) error(1, errno, "malloc msg");
369
370         while (!exit_flag) {
371                 ret = frsh_receive_sync(epdst, msg, rp->budget, &mlen, NULL);
372                 clock_gettime(CLOCK_MONOTONIC, &tsr);
373                 tss = msg->ts;
374                 if (msg->cnt != last_cnt+1) {
375                         if (!opt_quiet)
376                                 printf("%3d: packet(s) lost!\n", rp->id);
377                         __sync_fetch_and_add(&stats.lost, msg->cnt - last_cnt+1);
378                 } else {
379                         hist_add(&hist, tsdiff2us(&tsr, &tss));
380                         __sync_fetch_and_add(&stats.received, 1);
381                 }
382                 if (opt_verbose)
383                         printf("%3d: %10d: %10.3lf ms\n",
384                                rp->id, msg->cnt, tsdiff2d(&tsr, &tss)*1000);
385                 last_cnt = msg->cnt;
386         }
387         free(rp);
388         return NULL;
389 }
390
/* Posted once by every sender thread when it finishes; main() compares the
 * semaphore value with the number of streams to detect completion. */
sem_t finished;
392
393 void *sender(void *arg)
394 {
395         struct stream_params *p = arg;
396         frsh_send_endpoint_t epsrc;
397         struct receiver_params *rp;
398         int ret;
399         struct msg *msg;
400         long int cnt=0;
401         pthread_t receiver_id;
402
403         msg = malloc(p->budget);
404         if (!msg) error(1, errno, "malloc msg");
405
406         rp = malloc(sizeof(*rp));
407         rp->budget = p->budget;
408         rp->id = p->id;
409         
410         create_endpoints(p, &epsrc, &rp->epdst);
411                 
412         set_rt_prio(50);
413
414         ret = pthread_create(&receiver_id, NULL, receiver, rp);
415         
416         struct timespec next_period;
417         struct timespec tss;
418         clock_gettime(CLOCK_MONOTONIC, &next_period);
419         while (!exit_flag && (p->count == -1 || p->count--)) {
420                 clock_gettime(CLOCK_MONOTONIC, &tss);
421                 msg->cnt = cnt++;
422                 msg->ts = tss;
423                 if (p->async)
424                         ret = frsh_send_async(epsrc, msg, p->budget);
425                 else {
426                         ret = frsh_send_sync(epsrc, msg, p->budget);
427                         clock_gettime(CLOCK_MONOTONIC, &next_period);
428                 }
429                 __sync_fetch_and_add(&stats.sent, 1);
430                 next_period.tv_sec  += (p->period_ms/1000);
431                 next_period.tv_nsec += (p->period_ms%1000) * 1000000;
432                 if (next_period.tv_nsec >= 1000000000) {
433                         next_period.tv_nsec -= 1000000000;
434                         next_period.tv_sec++;
435                 }
436                 clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
437                                 &next_period, NULL);
438         }
439
440         
441         sem_post(&finished);
442         return NULL;
443 }
444
445 void print_stat(bool final)
446 {
447         printf("Sent: %5d  Received: %5d  Lost: %5d Max: %8.3f ms",
448                stats.sent, stats.received, stats.lost, hist.max/1000.0);
449         if (final) {
450                 printf("  Packetloss: %7.3f %%  90%%: %8.3f ms\n",
451                        100.0*stats.lost/stats.sent,
452                        hist_get_percentile(&hist, 90)/1000.0);
453         }
454         else
455                 printf("\r");
456         fflush(stdout);
457 }
458
459 int main(int argc, char *argv[])
460 {
461         int ret;
462         int i;
463         int num = 0;
464         bool negotiation_failure = false;
465         
466         struct stream_params sp = {
467                 .budget = 1024,
468                 .period_ms = 20,
469                 .async = false,
470                 .src.s_addr = htonl(INADDR_LOOPBACK),
471                 .dst.s_addr = htonl(INADDR_LOOPBACK),
472                 .number = 1,
473                 .count = -1,
474         };
475         struct stream_params *p[100];
476
477         memset(p, 0, sizeof(p));
478
479         if (signal(SIGTERM, stopper) == SIG_ERR)
480                 error(1, errno, "Error in signal registration");
481         if (signal(SIGINT, stopper) == SIG_ERR)
482                 error(1, errno, "Signal handler registration error");
483
484         sem_init(&finished, 0, 0);
485         
486         ret = frsh_init();
487         if (ret) PERROR_AND_EXIT(ret, "frsh_init");
488         
489         do {
490                 ret = parse_opts(&argc, &argv, &sp);
491                 if (num == 0 || ret == 0) {
492                         for (i=0; i<sp.number; i++) {
493                                 p[num] = malloc(sizeof(*p[0]));
494                                 if (!p[num]) error(1, errno, "malloc");
495                                 *p[num] = sp;
496                                 p[num]->id = num;
497                                 ret = negotiate_contract(p[num]);
498                                 if (!ret)
499                                         num++;
500                                 else {
501                                         PERROR_FRESCOR(ret, "frsh_contract_negotiate");
502                                         free(p[num]);
503                                         negotiation_failure = true;
504                                         break;
505                                 }
506                         }
507                 }
508         } while(ret == 0);
509
510         if (negotiation_failure) {
511                 goto destroy;
512         }
513
514         for (i=0; i<num; i++)
515                 pthread_create(&p[i]->thread, NULL, sender, p[i]);
516
517         while (!exit_flag && !opt_quiet) {
518                 int v;
519                 print_stat(false);
520                 sem_getvalue(&finished, &v);
521                 if (v == num)
522                         break;
523                 usleep(100000);
524         }
525
526         for (i=0; i<num; i++)
527                 pthread_join(p[i]->thread, NULL);
528 destroy:
529         for (i=0; i<num; i++) {
530                 frsh_contract_cancel(p[i]->vres);
531                 free(p[i]);
532         }
533
534         stats.lost = stats.sent - stats.received;
535         print_stat(true);
536         
537         return negotiation_failure ? 1 : 0;
538 }