]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/tests/timing/fwp-timing.c
a594ef373fd123bb97b9375f3ad960317e6eeba3
[frescor/fwp.git] / fwp / tests / timing / fwp-timing.c
1 /*
2  * A tool for measuring FWP latency. This program aims to be simpler
3  * than wme_test and is intended to be run on a single machine with
4  * multiple wifi interfaces and send-to-self kernel patch applied.
5  *
6  * This program creates both sides of communication and measures the
7  * communication delays.
8  */
9
10 #include <netinet/in.h>
11 #include <arpa/inet.h>
12 #include <frsh.h>
13 #include <getopt.h>
14 #include <netdb.h>
15 #include <netinet/in.h>
16 #include <stdlib.h>
17 #include <sys/socket.h>
18 #include <sys/types.h>
19 #include <unistd.h>
20 #include <sys/param.h>
21 #include <fwp_res.h>
22 #include <error.h>
23 #include <sys/mman.h>
24 #include <semaphore.h>
25
/* Set to true by the -v/--verbose option; the receiver then prints one line
 * per received message. */
bool opt_verbose = false;

/* Placeholder for statistics. It has no members yet and nothing in the code
 * shown here uses it. NOTE(review): an empty struct is a compiler extension,
 * not ISO C. */
struct stats {
        
} stats;
31
/*
 * Request SCHED_FIFO scheduling at the given priority for the caller (pid 0)
 * and lock current and future memory with mlockall().
 *
 * priority: requested SCHED_FIFO priority; it is compared against the range
 *           reported by sched_get_priority_min()/max() and a warning is
 *           printed when it lies outside, but it is passed on unchanged.
 *
 * All failures are reported on stderr as warnings only; the function always
 * returns normally.
 */
void set_rt_prio(int priority)
{
        int maxpri, minpri;
        /* Automatic storage: this function is called from several sender
         * threads, so a shared static object would be a data race. */
        struct sched_param param = { .sched_priority = priority };

        maxpri = sched_get_priority_max(SCHED_FIFO);
        if (maxpri == -1) {
                fprintf(stderr, "warning: sched_get_priority_max failed\n");
        } else if (priority > maxpri) {
                /* Only compare when the limit is actually known. */
                fprintf(stderr, "warning: maximum priority allowed is %d.\n", maxpri);
        }

        minpri = sched_get_priority_min(SCHED_FIFO);
        if (minpri == -1) {
                fprintf(stderr, "warning: sched_get_priority_min failed\n");
        } else if (priority < minpri) {
                fprintf(stderr, "warning: minimum priority allowed is %d.\n", minpri);
        }

        if (sched_setscheduler(0, SCHED_FIFO, &param) == -1) {
                fprintf(stderr, "warning: sched_setscheduler failed\n");
        }

        /* Avoid page faults disturbing the timing measurement. */
        if (mlockall(MCL_CURRENT | MCL_FUTURE) == -1) {
                fprintf(stderr, "warning: mlockall failed\n");
        }
}
60
/*
 * Parameters of one stream (or of a group of identical streams).
 * main() fills one instance from the command line and hands a private
 * heap copy to every sender thread.
 */
struct stream_params {
        int budget;             /* bytes per message; also converted to the contract budget */
        int period_ms;          /* sending period in milliseconds */
        bool async;             /* true: frsh_send_async(), false: frsh_send_sync() */
        struct in_addr src;     /* source address stored in the FWP contract block */
        struct in_addr dst;     /* destination address of the send endpoint */
        int number;             /* how many streams to start with these parameters */
        int id;                 /* stream index assigned by main(), used in output */
        int count;              /* messages to send; -1 means no limit */
};
70
71
72 int negotiate_contract(struct stream_params *p, frsh_vres_id_t *vres)
73 {
74         frsh_contract_t contract;
75         int ret;
76         frsh_rel_time_t period;
77         frsh_rel_time_t budget;
78         frsh_rel_time_t deadline;
79         fres_block_fwp *fwp;    
80         
81         ret = frsh_contract_init(&contract);
82         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_init");
83
84         ret = frsh_contract_set_resource_and_label(
85                 &contract,
86                 FRSH_RT_NETWORK, FRSH_NETPF_FWP,
87                 NULL);
88         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_resource_and_label");
89
90         frsh_network_bytes_to_budget(FRSH_NETPF_FWP, p->budget, &budget);
91         period = fosa_msec_to_rel_time(p->period_ms);
92         ret = frsh_contract_set_basic_params(&contract,
93                                              &budget,
94                                              &period,
95                                              FRSH_WT_BOUNDED,
96                                              FRSH_CT_REGULAR);
97         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_basic_params");
98
99
100         /* FWP doesn't accept smaller deadlines than 30 ms. */
101         if (frsh_rel_time_smaller(period, frsh_msec_to_rel_time(30)))
102                 deadline = frsh_msec_to_rel_time(30);
103         else
104                 deadline = period;
105         ret = frsh_contract_set_timing_reqs(&contract, false, &deadline);
106         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_timing_reqs");
107
108         fwp = malloc(sizeof(*fwp));
109         if (!fwp) PERROR_AND_EXIT(errno, "malloc");
110         fwp->src = p->src.s_addr;
111         ret = fres_contract_add_fwp(contract, fwp);
112         if (ret) PERROR_AND_EXIT(ret, "fres_contract_add_fwp");
113
114         ret = frsh_contract_negotiate(&contract, vres);
115         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_negotiate");
116
117         frsh_contract_destroy(&contract);
118
119         return 0;
120 }
121
122 void create_endpoints(struct stream_params *p,
123                       frsh_vres_id_t vres,
124                       frsh_send_endpoint_t *epsrc,
125                       frsh_receive_endpoint_t *epdst)
126 {
127         int ret;
128         frsh_send_endpoint_protocol_info_t    spi = { NULL, 0 };
129         frsh_receive_endpoint_protocol_info_t rpi = { NULL, 0 };
130         frsh_endpoint_queueing_info_t qi = { .queue_size=0,
131                                              .queue_policy=FRSH_QRP_OLDEST };
132
133         ret = frsh_receive_endpoint_create(FRSH_NETPF_FWP, 0, qi, rpi,
134                                            epdst);
135         if (ret != 0) error(1, errno, "fwp_receive_endpoint_create");
136                 
137         unsigned int port;
138         frsh_receive_endpoint_get_params(*epdst, NULL, &port, NULL, NULL);
139
140         ret = frsh_send_endpoint_create(FRSH_NETPF_FWP,
141                                         p->dst.s_addr, port, spi,
142                                         epsrc);
143         if (ret < 0) error(1, errno, "frsh_send_endpoint_create()");
144                 
145         ret = frsh_send_endpoint_bind(vres, *epsrc);
146         if (ret != 0) error(1, errno, "frsh_send_endpoint_bind");
147 }
148
149
/*
 * Long option table for getopt_long(). Each entry maps to the short option
 * letter handled in parse_opts(); the has_arg field must agree with the
 * short option string used there ("/ab:c:p:s:d:n:v"), otherwise optarg is
 * NULL for the long form of an option that expects a value.
 */
static struct option long_opts[] = {
    { "period", required_argument, 0, 'p' },
    { "budget", required_argument, 0, 'b' },
    { "source", required_argument, 0, 's' },
    { "dest",   required_argument, 0, 'd' },
    { "async",  no_argument,       0, 'a' },
    { "number", required_argument, 0, 'n' },
    { "count",  required_argument, 0, 'c' },
    { "verbose",no_argument,       0, 'v' },
    { "stream", no_argument,       0, '/' },
    { 0, 0, 0, 0}
};
161
/*
 * Print the command line help to stdout. The text mirrors what parse_opts()
 * accepts: -d takes a bare IP address, and -n/-c take a numeric argument.
 */
static void
usage(void)
{
        printf("usage: fwp-timing [ options ]\n");
        printf("  -p, --period <ms>  period in milliseconds\n");
        printf("  -b, --budget <bytes>  how many bytes are sent in each period\n");
        printf("  -s, --source <ip>  source IP address\n");
        printf("  -d, --dest <ip>  destination IP address\n");
        printf("  -a, --async  Send packets asynchronously\n");
        printf("  -n, --number <n>  Number of streams with the same parameters\n");
        printf("  -c, --count <n>  Number of messages to send [infinity]\n");
        printf("  -/, --stream  New stream separator\n");
        printf("  -v, --verbose Be more verbose\n");
}
176
177 int parse_opts(int *argc, char **argv[], struct stream_params *p)
178 {
179         int opt;
180         int ret;
181         bool options_found = false;
182
183         while ((opt = getopt_long(*argc, *argv, "/ab:c:p:s:d:n:v", long_opts, NULL)) != -1) {
184                 options_found = true;
185                 switch (opt) {
186                 case 'a':
187                         p->async = true;
188                         break;
189                 case 'b':
190                         p->budget = atoi(optarg);
191                         break;
192                 case 'c':
193                         p->count = atoi(optarg);
194                         break;
195                 case 'd':
196                         ret = inet_aton(optarg, &p->dst);
197                         if (!ret) {
198                                 fprintf(stderr, "Destination IP address not recognized: %s\n",
199                                         optarg);
200                                 usage();
201                                 exit(1);
202                         }
203                         break;
204                 case 'n':
205                         p->number = atoi(optarg);
206                         break;
207                 case 'p':
208                         p->period_ms = atoi(optarg);
209                         break;
210                 case 's':
211                         ret = inet_aton(optarg, &p->src);
212                         if (!ret) {
213                                 fprintf(stderr, "Source IP address not recognized: %s\n",
214                                         optarg);
215                                 usage();
216                                 exit(1);
217                         }
218                         break;
219                 case 'v':
220                         opt_verbose = true;
221                 case '/':
222                         break;
223                 default:
224                         usage();
225                         exit(1);
226                 }
227                 if (opt == '/')
228                         break; 
229         }
230         return (options_found) ? 0 : -1;
231 }
232
/* Set by the signal handler below; polled by the sender and receiver loops
 * so that they terminate. */
volatile bool exit_flag = false;

/*
 * Signal handler for SIGTERM/SIGINT: only raises exit_flag.
 * The parameter makes the definition match the void (*)(int) type that
 * signal() expects; its value is not needed.
 */
void stopper(int signum)
{
        (void)signum;
        exit_flag = true;
}
239
/*
 * Compute *result = *x - *y.
 *
 * result: receives the difference.
 * x:      minuend.
 * y:      subtrahend; it is not modified (the carry is performed on a
 *         private copy).
 *
 * Returns 1 if the difference is negative, otherwise 0.
 * The difference is computed completely before *result is written, so
 * result may point to the same object as x or y.
 */
int
timespec_subtract(struct timespec *result, struct timespec *x, struct timespec *y)
{
        struct timespec sub = *y;
        struct timespec diff;
        int negative;

        /* Perform the carry for the later subtraction by updating the copy. */
        if (x->tv_nsec < sub.tv_nsec) {
                long carry = (sub.tv_nsec - x->tv_nsec) / 1000000000L + 1;
                sub.tv_nsec -= 1000000000L * carry;
                sub.tv_sec += carry;
        }
        if (x->tv_nsec - sub.tv_nsec > 1000000000L) {
                long carry = (x->tv_nsec - sub.tv_nsec) / 1000000000L;
                sub.tv_nsec += 1000000000L * carry;
                sub.tv_sec -= carry;
        }

        /* After the carry the nanosecond difference is not negative. */
        diff.tv_sec = x->tv_sec - sub.tv_sec;
        diff.tv_nsec = x->tv_nsec - sub.tv_nsec;
        negative = x->tv_sec < sub.tv_sec;

        *result = diff;
        return negative;
}
264
/* Express a timespec as seconds in floating point. */
static inline double ts2d(struct timespec *ts)
{
        double whole = ts->tv_sec;
        double fraction = 1e-9*ts->tv_nsec;

        return whole + fraction;
}
269
/*
 * Return x - y in seconds as a double.
 * The subtraction is delegated to timespec_subtract(); its return value
 * (the sign indication) is not used.
 */
static inline double tsdiff2d(struct timespec *x,
                              struct timespec *y)
{
        struct timespec delta;

        timespec_subtract(&delta, x, y);
        return delta.tv_sec + 1e-9*delta.tv_nsec;
}
277
/*
 * Header placed at the start of every transmitted message. The sender
 * allocates and sends `budget` bytes per message and fills only these fields.
 */
struct msg {
        int cnt;                /* sequence number, incremented by the sender for each message */
        struct timespec ts;     /* CLOCK_MONOTONIC time taken by the sender just before sending */
};
282
/*
 * Arguments handed to a receiver thread. The sender allocates the structure
 * and the receiver frees it when it finishes.
 */
struct receiver_params {
        int budget;                     /* size in bytes of the receive buffer */
        frsh_receive_endpoint_t epdst;  /* endpoint the receiver reads from */
        int id;                         /* stream id, printed in front of every output line */
};
288
289 void *receiver(void *arg)
290 {
291         struct receiver_params *rp = arg;
292         frsh_receive_endpoint_t epdst = rp->epdst;
293         size_t mlen;
294         int ret;
295         int last_cnt = -1;
296         struct timespec tss, tsr;
297         struct msg *msg;
298         msg = malloc(rp->budget);
299         if (!msg) error(1, errno, "malloc msg");
300
301         while (!exit_flag) {
302                 ret = frsh_receive_sync(epdst, msg, rp->budget, &mlen, NULL);
303                 clock_gettime(CLOCK_MONOTONIC, &tsr);
304                 tss = msg->ts;
305                 if (msg->cnt != last_cnt+1)
306                         printf("%3d: packet(s) lost!\n", rp->id);
307                 if (opt_verbose)
308                         printf("%3d: %10d: %10.3lf ms\n",
309                                rp->id, msg->cnt, tsdiff2d(&tsr, &tss)*1000);
310                 last_cnt = msg->cnt;
311         }
312         free(rp);
313         return NULL;
314 }
315
/* Posted once by every sender thread when it ends; main() waits on it once
 * per started stream. */
sem_t finished;
317
318 void *sender(void *arg)
319 {
320         struct stream_params *p = arg;
321         frsh_vres_id_t vres;
322         frsh_send_endpoint_t epsrc;
323         struct receiver_params *rp;
324         int ret;
325         struct msg *msg;
326         long int cnt=0;
327         pthread_t receiver_id;
328
329         msg = malloc(p->budget);
330         if (!msg) error(1, errno, "malloc msg");
331
332         negotiate_contract(p, &vres);
333
334         rp = malloc(sizeof(*rp));
335         rp->budget = p->budget;
336         rp->id = p->id;
337
338         create_endpoints(p, vres, &epsrc, &rp->epdst);
339
340         set_rt_prio(50);
341
342         ret = pthread_create(&receiver_id, NULL, receiver, rp);
343         
344         if (signal(SIGTERM, stopper) == SIG_ERR)
345                 error(1, errno, "Error in signal registration");
346         if (signal(SIGINT, stopper) == SIG_ERR)
347                 error(1, errno, "Signal handler registration error");
348
349         struct timespec next_period;
350         struct timespec tss;
351         clock_gettime(CLOCK_MONOTONIC, &next_period);
352         while (!exit_flag && (p->count == -1 || p->count--)) {
353                 clock_gettime(CLOCK_MONOTONIC, &tss);
354                 msg->cnt = cnt++;
355                 msg->ts = tss;
356                 if (p->async)
357                         ret = frsh_send_async(epsrc, msg, p->budget);
358                 else {
359                         ret = frsh_send_sync(epsrc, msg, p->budget);
360                         clock_gettime(CLOCK_MONOTONIC, &next_period);
361                 }
362                 next_period.tv_sec  += (p->period_ms/1000);
363                 next_period.tv_nsec += (p->period_ms%1000) * 1000000;
364                 if (next_period.tv_nsec >= 1000000000) {
365                         next_period.tv_nsec -= 1000000000;
366                         next_period.tv_sec++;
367                 }
368                 clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
369                                 &next_period, NULL);
370         }
371
372         frsh_contract_cancel(vres);
373         free(p);
374         sem_post(&finished);
375         return NULL;
376 }
377
378 int main(int argc, char *argv[])
379 {
380         int ret;
381         int num = 0;
382         struct stream_params sp = {
383                 .budget = 1024,
384                 .period_ms = 20,
385                 .async = false,
386                 .src.s_addr = htonl(INADDR_LOOPBACK),
387                 .dst.s_addr = htonl(INADDR_LOOPBACK),
388                 .number = 1,
389                 .count = -1,
390         };
391         struct stream_params *p = NULL;
392
393         sem_init(&finished, 0, 0);
394         
395         ret = frsh_init();
396         if (ret) PERROR_AND_EXIT(ret, "frsh_init");
397         
398         do {
399                 ret = parse_opts(&argc, &argv, &sp);
400                 if (num == 0 || ret == 0) {
401                         int i;
402                         for (i=0; i<sp.number; i++) {
403                                 pthread_t thread;
404                                 p = malloc(sizeof(*p));
405                                 if (!p) error(1, errno, "malloc");
406                                 *p = sp;
407                                 p->id = num;
408                                 pthread_create(&thread, NULL, sender, p);
409                                 num++;
410                         }
411                 }
412         } while(ret == 0);
413
414         while (num--)
415                 sem_wait(&finished);
416         
417         return 0;
418 }