rtime.felk.cvut.cz Git - frescor/frsh-forb.git/blob - src/fwp/fwp/tests/timing/fwp-timing.c
Get rid of most of the other warnings
[frescor/frsh-forb.git] / src / fwp / fwp / tests / timing / fwp-timing.c
1 /*
2  * A tool for measuring FWP latency. This program aims to be simpler
3  * than wme_test and is intended to be run on a single machine with
4  * multiple wifi interfaces and send-to-self kernel patch applied.
5  *
6  * This program creates both sides of communication and measures the
7  * communication delays.
8  */
9
10 #include <netinet/in.h>
11 #include <arpa/inet.h>
12 #include <frsh.h>
13 #include <getopt.h>
14 #include <netdb.h>
15 #include <netinet/in.h>
16 #include <stdlib.h>
17 #include <sys/socket.h>
18 #include <sys/types.h>
19 #include <unistd.h>
20 #include <sys/param.h>
21 #include <fwp_res.h>
22 #include <error.h>
23 #include <sys/mman.h>
24 #include <semaphore.h>
25 #include <ul_logreg.h>
26
/*
 * Command-line flags.
 * opt_quiet   (-q): suppress progress output, print only final statistics.
 * opt_verbose (-v): print one line for every received message.
 */
bool opt_quiet = false;
bool opt_verbose = false;
29
/* Upper bound of recorded delays and width of one bucket, in microseconds. */
#define HIST_MAX_US 10000000
#define HIST_RES_US 10

/*
 * Histogram of measured delays.
 *
 * cnt[i] counts the samples whose value divided by HIST_RES_US equals i;
 * max holds the largest sample recorded so far, in microseconds.
 */
struct histogram {
        unsigned int cnt[(HIST_MAX_US + 1) / HIST_RES_US];
        unsigned int max;
};
37
38 /* static void hist_init(struct histogram *h) */
39 /* { */
40 /*      memset(h, 0, sizeof(*h)); */
41 /* } */
42
43 static void hist_add(struct histogram *h, int us)
44 {
45         assert(us > 0);
46         if (us > HIST_MAX_US)
47                 us = HIST_MAX_US;
48         __sync_fetch_and_add(&h->cnt[us/HIST_RES_US], 1);
49
50         unsigned max;
51         do {
52                 max = h->max;
53         } while (us > max && ! __sync_bool_compare_and_swap(&h->max, max, us));
54                 
55 }
56
57 static unsigned hist_get_percentile(struct histogram *h, unsigned p)
58 {
59         uint64_t sum = 0, psum;
60         int i;
61         for (i=0; i<(HIST_MAX_US+1)/HIST_RES_US; i++)
62                 sum += h->cnt[i];
63
64         psum = sum * p / 100;
65         sum = 0;
66         for (i=0; i<(HIST_MAX_US+1)/HIST_RES_US; i++) {
67                 sum += h->cnt[i];
68                 if (sum >= psum)
69                         break;
70         }
71         return i*HIST_RES_US;
72 }
73
74 struct stats {
75         uint32_t sent;
76         uint32_t received;
77         uint32_t lost;
78 } stats;
79
80 struct histogram hist;
81
/*
 * Switch the caller to the SCHED_FIFO policy with the given priority and
 * lock the process memory.
 *
 * Every failure is only reported as a warning on stderr; the function
 * always returns normally so the measurement can proceed without
 * real-time privileges.
 */
void set_rt_prio(int priority)
{
        int maxpri, minpri;
        /*
         * Automatic (not static) storage: this function is called from
         * every sender thread, so the parameter block must not be shared.
         */
        struct sched_param param = { .sched_priority = priority };

        maxpri = sched_get_priority_max(SCHED_FIFO);
        if (maxpri == -1) {
                fprintf(stderr, "warning: sched_get_priority_max failed\n");
        }

        minpri = sched_get_priority_min(SCHED_FIFO);
        if (minpri == -1) {
                fprintf(stderr, "warning: sched_get_priority_min failed\n");
        }

        if (priority > maxpri) {
                fprintf(stderr, "warning: maximum priority allowed is %d.\n", maxpri);
        }
        if (priority < minpri) {
                fprintf(stderr, "warning: minimum priority allowed is %d.\n", minpri);
        }

        if (sched_setscheduler(0, SCHED_FIFO, &param) == -1) {
                fprintf(stderr, "warning: sched_setscheduler failed\n");
        }

        if (mlockall(MCL_CURRENT | MCL_FUTURE) == -1) {
                fprintf(stderr, "warning: mlockall failed\n");
        }
}
110
111 struct stream_params {
112         int budget;
113         int period_ms;
114         bool async;
115         struct in_addr src, dst;
116         int number;
117         int id;
118         int count;
119         frsh_vres_id_t vres;
120         pthread_t thread;
121         int jitter;
122 };
123
124
125 int negotiate_contract(struct stream_params *p)
126 {
127         frsh_contract_t contract;
128         int ret;
129         frsh_rel_time_t period;
130         frsh_rel_time_t budget;
131         frsh_rel_time_t deadline;
132         fres_block_fwp *fwp;    
133         
134         ret = frsh_contract_init(&contract);
135         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_init");
136
137         ret = frsh_contract_set_resource_and_label(
138                 &contract,
139                 FRSH_RT_NETWORK, FRSH_NETPF_FWP,
140                 NULL);
141         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_resource_and_label");
142
143         frsh_network_bytes_to_budget(FRSH_NETPF_FWP, p->budget, &budget);
144         period = fosa_msec_to_rel_time(p->period_ms);
145         ret = frsh_contract_set_basic_params(&contract,
146                                              &budget,
147                                              &period,
148                                              FRSH_WT_BOUNDED,
149                                              FRSH_CT_REGULAR);
150         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_basic_params");
151
152
153         /* FWP doesn't accept smaller deadlines than 30 ms. */
154         if (frsh_rel_time_smaller(period, frsh_msec_to_rel_time(30)))
155                 deadline = frsh_msec_to_rel_time(30);
156         else
157                 deadline = period;
158         ret = frsh_contract_set_timing_reqs(&contract, false, &deadline);
159         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_timing_reqs");
160
161         fwp = malloc(sizeof(*fwp));
162         if (!fwp) PERROR_AND_EXIT(errno, "malloc");
163         fwp->src = p->src.s_addr;
164         ret = fres_contract_add_fwp(contract, fwp);
165         if (ret) PERROR_AND_EXIT(ret, "fres_contract_add_fwp");
166
167         ret = frsh_contract_negotiate(&contract, &p->vres);
168
169         frsh_contract_destroy(&contract);
170
171         return ret;
172 }
173
174 void create_endpoints(struct stream_params *p,
175                       frsh_send_endpoint_t *epsrc,
176                       frsh_receive_endpoint_t *epdst)
177 {
178         int ret;
179         frsh_send_endpoint_protocol_info_t    spi = { NULL, 0 };
180         frsh_receive_endpoint_protocol_info_t rpi = { NULL, 0 };
181         frsh_endpoint_queueing_info_t qi = { .queue_size=0,
182                                              .queue_policy=FRSH_QRP_OLDEST };
183
184         ret = frsh_receive_endpoint_create(FRSH_NETPF_FWP, 0, qi, rpi,
185                                            epdst);
186         if (ret != 0) error(1, errno, "fwp_receive_endpoint_create");
187                 
188         unsigned int port;
189         frsh_receive_endpoint_get_params(*epdst, NULL, &port, NULL, NULL);
190
191         ret = frsh_send_endpoint_create(FRSH_NETPF_FWP,
192                                         p->dst.s_addr, port, spi,
193                                         epsrc);
194         if (ret < 0) error(1, errno, "frsh_send_endpoint_create()");
195                 
196         ret = frsh_send_endpoint_bind(p->vres, *epsrc);
197         if (ret != 0) error(1, errno, "frsh_send_endpoint_bind");
198 }
199
200
/*
 * Long option table for getopt_long().  Each long option maps to the
 * short option character handled in parse_opts(); "stream" is the long
 * form of the '/' stream separator advertised by usage().
 */
static struct option long_opts[] = {
    { "loglevel", required_argument, 0, 'l' },
    { "period",   required_argument, 0, 'p' },
    { "budget",   required_argument, 0, 'b' },
    { "source",   required_argument, 0, 's' },
    { "dest",     required_argument, 0, 'd' },
    { "async",    no_argument,       0, 'a' },
    { "number",   required_argument, 0, 'n' },
    { "count",    required_argument, 0, 'c' },
    { "verbose",  no_argument,       0, 'v' },
    { "quiet",    no_argument,       0, 'q' },
    { "jitter",   required_argument, 0, 'j' },
    { "stream",   no_argument,       0, '/' },
    { 0, 0, 0, 0}
};
215
/*
 * Print the command-line help to stdout.
 *
 * The --dest description names only an IP address: parse_opts() parses
 * the argument with inet_aton(), which does not accept a port.
 */
static void
usage(void)
{
        printf("usage: fwp-timing [ options ]\n");
        printf("  -l, --loglevel <number>|<domain>=<number>,...\n");
        printf("  -p, --period <ms>  period in milliseconds\n");
        printf("  -b, --budget <bytes>  how many bytes is sent in each period\n");
        printf("  -s, --source <ip>  source IP address\n");
        printf("  -d, --dest <ip>  destination IP address\n");
        printf("  -a, --async  Send packets asynchronously\n");
        printf("  -n, --number Number of streams with the same parameters\n");
        printf("  -c, --count Number of messages to send [infinity]\n");
        printf("  -q, --quiet Print only final statistics\n");
        printf("  -/, --stream  New stream separator\n");
        printf("  -v, --verbose Be more verbose\n");
        printf("  -j, --jitter <percent> Sent jitter given as percentage of period\n");
}
233
234 int parse_opts(int *argc, char **argv[], struct stream_params *p)
235 {
236         int opt;
237         int ret;
238         bool options_found = false;
239
240         while ((opt = getopt_long(*argc, *argv, "/ab:c:d:j:l:n:p:qs:v", long_opts, NULL)) != -1) {
241                 options_found = true;
242                 switch (opt) {
243                 case 'a':
244                         p->async = true;
245                         break;
246                 case 'b':
247                         p->budget = atoi(optarg);
248                         break;
249                 case 'c':
250                         p->count = atoi(optarg);
251                         break;
252                 case 'd':
253                         ret = inet_aton(optarg, &p->dst);
254                         if (!ret) {
255                                 fprintf(stderr, "Destination IP address not recognized: %s\n",
256                                         optarg);
257                                 usage();
258                                 exit(1);
259                         }
260                         break;
261                 case 'j':
262                         p->jitter = atoi(optarg);
263                         break;
264                 case 'l':
265                         ul_log_domain_arg2levels(optarg);
266                         break;
267                 case 'n':
268                         p->number = atoi(optarg);
269                         break;
270                 case 'p':
271                         p->period_ms = atoi(optarg);
272                         break;
273                 case 's':
274                         ret = inet_aton(optarg, &p->src);
275                         if (!ret) {
276                                 fprintf(stderr, "Source IP address not recognized: %s\n",
277                                         optarg);
278                                 usage();
279                                 exit(1);
280                         }
281                         break;
282                 case 'v':
283                         opt_verbose = true;
284                         break;
285                 case 'q':
286                         opt_quiet = true;
287                         break;
288                 case '/':
289                         break;
290                 default:
291                         usage();
292                         exit(1);
293                 }
294                 if (opt == '/')
295                         break; 
296         }
297         return (options_found) ? 0 : -1;
298 }
299
/* Set by the signal handler; polled by main() and the worker threads. */
volatile bool exit_flag = false;

/*
 * SIGTERM/SIGINT handler: request termination of all loops.
 * The signal number is accepted (and ignored) so that the definition
 * matches the void (*)(int) type signal() calls the handler with.
 */
void stopper(int signum)
{
        (void)signum;
        exit_flag = true;
}
306
/*
 * Compute *result = *x - *y.
 *
 * Returns 1 if the difference is negative, otherwise 0.
 *
 * The carry is performed on a local copy of *y, so neither input is
 * modified (the textbook version of this routine normalises *y in
 * place, which silently changes the caller's timestamp).
 */
int
timespec_subtract(struct timespec *result, struct timespec *x, struct timespec *y)
{
        struct timespec sub = *y;

        /* Perform the carry for the later subtraction by updating sub. */
        if (x->tv_nsec < sub.tv_nsec) {
                int nsec = (sub.tv_nsec - x->tv_nsec) / 1000000000 + 1;
                sub.tv_nsec -= 1000000000L * nsec;
                sub.tv_sec += nsec;
        }
        if (x->tv_nsec - sub.tv_nsec > 1000000000) {
                int nsec = (x->tv_nsec - sub.tv_nsec) / 1000000000;
                sub.tv_nsec += 1000000000L * nsec;
                sub.tv_sec -= nsec;
        }

        /* Compute the difference; tv_nsec is certainly non-negative now. */
        result->tv_sec = x->tv_sec - sub.tv_sec;
        result->tv_nsec = x->tv_nsec - sub.tv_nsec;

        /* Return 1 if result is negative. */
        return x->tv_sec < sub.tv_sec;
}
331
/* Convert a timespec to seconds expressed as a double. */
static inline double ts2d(struct timespec *ts)
{
        const double whole = ts->tv_sec;
        const double frac = 1e-9*ts->tv_nsec;

        return whole + frac;
}
336
/* Difference *x - *y in seconds, as a double. */
static inline double tsdiff2d(struct timespec *x,
                              struct timespec *y)
{
        struct timespec delta;

        (void)timespec_subtract(&delta, x, y);
        return ts2d(&delta);
}
344
/* Difference *x - *y in whole microseconds, truncated to int. */
static inline int tsdiff2us(struct timespec *x,
                            struct timespec *y)
{
        struct timespec delta;

        (void)timespec_subtract(&delta, x, y);
        return delta.tv_nsec/1000 + delta.tv_sec*1000000;
}
352
/*
 * Header placed at the start of every message buffer.  The sender fills
 * it in just before sending; the receiver uses it to detect gaps and to
 * compute the delay.
 */
struct msg {
        int cnt;                /* sequence number, starting at 0 */
        struct timespec ts;     /* CLOCK_MONOTONIC time taken by the sender */
};
357
358 struct receiver_params {
359         int budget;
360         frsh_receive_endpoint_t epdst;
361         int id;
362 };
363
364 void *receiver(void *arg)
365 {
366         struct receiver_params *rp = arg;
367         frsh_receive_endpoint_t epdst = rp->epdst;
368         size_t mlen;
369         int ret;
370         int last_cnt = -1;
371         struct timespec tss, tsr;
372         struct msg *msg;
373         msg = malloc(rp->budget);
374         if (!msg) error(1, errno, "malloc msg");
375
376         while (!exit_flag) {
377                 ret = frsh_receive_sync(epdst, msg, rp->budget, &mlen, NULL);
378                 if (ret) error(1, errno, "frsh_receive_sync");
379                 clock_gettime(CLOCK_MONOTONIC, &tsr);
380                 tss = msg->ts;
381                 if (msg->cnt != last_cnt+1) {
382                         if (!opt_quiet)
383                                 printf("%3d: packet(s) lost!\n", rp->id);
384                         __sync_fetch_and_add(&stats.lost, msg->cnt - last_cnt+1);
385                 } else {
386                         hist_add(&hist, tsdiff2us(&tsr, &tss));
387                         __sync_fetch_and_add(&stats.received, 1);
388                 }
389                 if (opt_verbose)
390                         printf("%3d: %10d: %10.3lf ms\n",
391                                rp->id, msg->cnt, tsdiff2d(&tsr, &tss)*1000);
392                 last_cnt = msg->cnt;
393         }
394         free(rp);
395         return NULL;
396 }
397
398 sem_t finished;
399
400 void *sender(void *arg)
401 {
402         struct stream_params *p = arg;
403         frsh_send_endpoint_t epsrc;
404         struct receiver_params *rp;
405         int ret;
406         struct msg *msg;
407         long int cnt=0;
408         pthread_t receiver_id;
409
410         msg = malloc(p->budget);
411         if (!msg) error(1, errno, "malloc msg");
412
413         rp = malloc(sizeof(*rp));
414         rp->budget = p->budget;
415         rp->id = p->id;
416         
417         create_endpoints(p, &epsrc, &rp->epdst);
418                 
419         set_rt_prio(50);
420
421         ret = pthread_create(&receiver_id, NULL, receiver, rp);
422         
423         struct timespec next_period;
424         struct timespec tss;
425         clock_gettime(CLOCK_MONOTONIC, &next_period);
426         while (!exit_flag && (p->count == -1 || p->count--)) {
427                 clock_gettime(CLOCK_MONOTONIC, &tss);
428                 msg->cnt = cnt++;
429                 msg->ts = tss;
430                 if (p->async)
431                         ret = frsh_send_async(epsrc, msg, p->budget);
432                 else {
433                         ret = frsh_send_sync(epsrc, msg, p->budget);
434                         clock_gettime(CLOCK_MONOTONIC, &next_period);
435                 }
436                 __sync_fetch_and_add(&stats.sent, 1);
437                 if (ret) error(1, errno, "frsh_send_*");
438
439                 int delay_ms;
440                 if (p->jitter)
441                         delay_ms = p->period_ms*(100-p->jitter)/100
442                                 + rand() % (2*p->period_ms*p->jitter/100);
443                 else
444                         delay_ms = p->period_ms;
445
446                 next_period.tv_sec  += (delay_ms/1000);
447                 next_period.tv_nsec += (delay_ms%1000) * 1000000;
448                 if (next_period.tv_nsec >= 1000000000) {
449                         next_period.tv_nsec -= 1000000000;
450                         next_period.tv_sec++;
451                 }
452                 clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
453                                 &next_period, NULL);
454         }
455
456         
457         sem_post(&finished);
458         return NULL;
459 }
460
461 void print_stat(bool final)
462 {
463         printf("Sent: %5d  Received: %5d  Lost: %5d Max: %8.3f ms",
464                stats.sent, stats.received, stats.lost, hist.max/1000.0);
465         if (final) {
466                 printf("  Packetloss: %7.3f %%  95%%: %8.3f ms  99%%: %8.3f ms\n",
467                        100.0*stats.lost/stats.sent,
468                        hist_get_percentile(&hist, 95)/1000.0,
469                        hist_get_percentile(&hist, 99)/1000.0);
470         }
471         else
472                 printf("\r");
473         fflush(stdout);
474 }
475
476 int main(int argc, char *argv[])
477 {
478         int ret;
479         int i;
480         int num = 0;
481         bool negotiation_failure = false;
482         
483         struct stream_params sp = {
484                 .budget = 1024,
485                 .period_ms = 20,
486                 .async = false,
487                 .src.s_addr = htonl(INADDR_LOOPBACK),
488                 .dst.s_addr = htonl(INADDR_LOOPBACK),
489                 .number = 1,
490                 .count = -1,
491         };
492         struct stream_params *p[100];
493
494         memset(p, 0, sizeof(p));
495
496         if (signal(SIGTERM, stopper) == SIG_ERR)
497                 error(1, errno, "Error in signal registration");
498         if (signal(SIGINT, stopper) == SIG_ERR)
499                 error(1, errno, "Signal handler registration error");
500
501         sem_init(&finished, 0, 0);
502         
503         ret = frsh_init();
504         if (ret) PERROR_AND_EXIT(ret, "frsh_init");
505         
506         do {
507                 ret = parse_opts(&argc, &argv, &sp);
508                 if (num == 0 || ret == 0) {
509                         for (i=0; i<sp.number; i++) {
510                                 p[num] = malloc(sizeof(*p[0]));
511                                 if (!p[num]) error(1, errno, "malloc");
512                                 *p[num] = sp;
513                                 p[num]->id = num;
514                                 ret = negotiate_contract(p[num]);
515                                 if (!ret)
516                                         num++;
517                                 else {
518                                         PERROR_FRESCOR(ret, "frsh_contract_negotiate");
519                                         free(p[num]);
520                                         negotiation_failure = true;
521                                         break;
522                                 }
523                         }
524                 }
525         } while(ret == 0);
526
527         if (negotiation_failure) {
528                 goto destroy;
529         }
530
531         for (i=0; i<num; i++)
532                 pthread_create(&p[i]->thread, NULL, sender, p[i]);
533
534         while (!exit_flag && !opt_quiet) {
535                 int v;
536                 print_stat(false);
537                 sem_getvalue(&finished, &v);
538                 if (v == num)
539                         break;
540                 usleep(100000);
541         }
542
543         for (i=0; i<num; i++)
544                 pthread_join(p[i]->thread, NULL);
545 destroy:
546         for (i=0; i<num; i++) {
547                 frsh_contract_cancel(p[i]->vres);
548                 free(p[i]);
549         }
550
551         stats.lost = stats.sent - stats.received;
552         print_stat(true);
553         
554         return negotiation_failure ? 1 : 0;
555 }