rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/tests/timing/fwp-timing.c
Allow fwp-timing to generate several streams simultaneously
[frescor/fwp.git] / fwp / tests / timing / fwp-timing.c
1 /*
2  * A tool for measuring FWP latency. This program aims to be simpler
3  * than wme_test and is intended to be run on a single machine with
4  * multiple wifi interfaces and send-to-self kernel patch applied.
5  *
6  * This program creates both sides of communication and measures the
7  * communication delays.
8  */
9
#include <arpa/inet.h>
#include <errno.h>
#include <error.h>
#include <getopt.h>
#include <limits.h>
#include <netdb.h>
#include <netinet/in.h>
#include <semaphore.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/param.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <frsh.h>
#include <fwp_res.h>
25
/**
 * Request the SCHED_FIFO policy for the caller and lock process memory.
 *
 * The scheduler is changed with sched_setscheduler() using pid 0 and all
 * current and future pages are locked with mlockall().  Every failure is
 * only reported as a warning on stderr; the function never terminates the
 * program.
 *
 * @param priority  Requested SCHED_FIFO priority.  A value outside the
 *                  range reported by the system produces a warning, but
 *                  the request is still attempted unchanged.
 */
void set_rt_prio(int priority)
{
        int maxpri, minpri;
        /* Automatic (not static) storage: every sender thread calls this
         * function, so a shared object would be written concurrently. */
        struct sched_param param = { .sched_priority = priority };

        maxpri = sched_get_priority_max(SCHED_FIFO);
        if (maxpri == -1) {
                fprintf(stderr, "warning: sched_get_priority_max failed\n");
        } else if (priority > maxpri) {
                fprintf(stderr, "warning: maximum priority allowed is %d.\n", maxpri);
        }

        minpri = sched_get_priority_min(SCHED_FIFO);
        if (minpri == -1) {
                fprintf(stderr, "warning: sched_get_priority_min failed\n");
        } else if (priority < minpri) {
                fprintf(stderr, "warning: minimum priority allowed is %d.\n", minpri);
        }

        if (sched_setscheduler(0, SCHED_FIFO, &param) == -1) {
                fprintf(stderr, "warning: sched_setscheduler failed\n");
        }

        /* Keep pages resident so page faults do not disturb the timing. */
        if (mlockall(MCL_CURRENT | MCL_FUTURE) == -1) {
                fprintf(stderr, "warning: mlockall failed\n");
        }
}
54
/*
 * Command-line configurable description of one group of streams.
 * main() copies this structure for every stream it starts and fills in
 * the per-stream id.
 */
struct stream_params {
        int budget;             /* bytes sent in each period */
        int period_ms;          /* sending period in milliseconds */
        bool async;             /* true: frsh_send_async(), false: frsh_send_sync() */
        struct in_addr src;     /* source IP address */
        struct in_addr dst;     /* destination IP address */
        int number;             /* how many streams to start with these parameters */
        int id;                 /* stream index assigned by main() */
};
63
64
65 int negotiate_contract(struct stream_params *p, frsh_vres_id_t *vres)
66 {
67         frsh_contract_t contract;
68         int ret;
69         frsh_rel_time_t period;
70         frsh_rel_time_t budget;
71         frsh_rel_time_t deadline;
72         fres_block_fwp *fwp;    
73         
74         ret = frsh_contract_init(&contract);
75         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_init");
76
77         ret = frsh_contract_set_resource_and_label(
78                 &contract,
79                 FRSH_RT_NETWORK, FRSH_NETPF_FWP,
80                 NULL);
81         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_resource_and_label");
82
83         frsh_network_bytes_to_budget(FRSH_NETPF_FWP, p->budget, &budget);
84         period = fosa_msec_to_rel_time(p->period_ms);
85         ret = frsh_contract_set_basic_params(&contract,
86                                              &budget,
87                                              &period,
88                                              FRSH_WT_BOUNDED,
89                                              FRSH_CT_REGULAR);
90         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_basic_params");
91
92
93         /* FWP doesn't accept smaller deadlines than 30 ms. */
94         if (frsh_rel_time_smaller(period, frsh_msec_to_rel_time(30)))
95                 deadline = frsh_msec_to_rel_time(30);
96         else
97                 deadline = period;
98         ret = frsh_contract_set_timing_reqs(&contract, false, &deadline);
99         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_timing_reqs");
100
101         fwp = malloc(sizeof(*fwp));
102         if (!fwp) PERROR_AND_EXIT(errno, "malloc");
103         fwp->src = p->src.s_addr;
104         ret = fres_contract_add_fwp(contract, fwp);
105         if (ret) PERROR_AND_EXIT(ret, "fres_contract_add_fwp");
106
107         ret = frsh_contract_negotiate(&contract, vres);
108         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_negotiate");
109
110         frsh_contract_destroy(&contract);
111
112         return 0;
113 }
114
115 void create_endpoints(struct stream_params *p,
116                       frsh_vres_id_t vres,
117                       frsh_send_endpoint_t *epsrc,
118                       frsh_receive_endpoint_t *epdst)
119 {
120         int ret;
121         frsh_send_endpoint_protocol_info_t    spi = { NULL, 0 };
122         frsh_receive_endpoint_protocol_info_t rpi = { NULL, 0 };
123         frsh_endpoint_queueing_info_t qi = { .queue_size=0,
124                                              .queue_policy=FRSH_QRP_OLDEST };
125
126         ret = frsh_receive_endpoint_create(FRSH_NETPF_FWP, 0, qi, rpi,
127                                            epdst);
128         if (ret != 0) error(1, errno, "fwp_receive_endpoint_create");
129                 
130         unsigned int port;
131         frsh_receive_endpoint_get_params(*epdst, NULL, &port, NULL, NULL);
132
133         ret = frsh_send_endpoint_create(FRSH_NETPF_FWP,
134                                         p->dst.s_addr, port, spi,
135                                         epsrc);
136         if (ret < 0) error(1, errno, "frsh_send_endpoint_create()");
137                 
138         ret = frsh_send_endpoint_bind(vres, *epsrc);
139         if (ret != 0) error(1, errno, "frsh_send_endpoint_bind");
140 }
141
142
/*
 * Long option table for getopt_long().  Each long option maps to the
 * short option character handled in parse_opts(); the has_arg field must
 * agree with the short option string "/ab:p:s:d:n:" used there.
 */
static struct option long_opts[] = {
    { "period", required_argument, 0, 'p' },
    { "budget", required_argument, 0, 'b' },
    { "source", required_argument, 0, 's' },
    { "dest",   required_argument, 0, 'd' },
    { "async",  no_argument,       0, 'a' },
    /* The parser reads optarg for 'n', so an argument is mandatory. */
    { "number", required_argument, 0, 'n' },
    /* Long form of the '/' stream separator listed in usage(). */
    { "stream", no_argument,       0, '/' },
    { 0, 0, 0, 0}
};
152
/*
 * Print the command-line help to stdout.
 *
 * The destination is described as a plain IP address because the option
 * parser converts the whole argument with inet_aton(); the port is taken
 * from the receive endpoint, not from the command line.
 */
static void
usage(void)
{
        printf("usage: fwp-timing [ options ]\n");
        printf("  -p, --period <ms>  period in milliseconds\n");
        printf("  -b, --budget <bytes>  how many bytes are sent in each period\n");
        printf("  -s, --source <ip>  source IP address\n");
        printf("  -d, --dest <ip>  destination IP address\n");
        printf("  -a, --async  Send packets asynchronously\n");
        printf("  -n, --number <n>  Number of streams with the same parameters\n");
        printf("  -/, --stream  New stream separator\n");
}
165
166 int parse_opts(int *argc, char **argv[], struct stream_params *p)
167 {
168         int opt;
169         int ret;
170         bool options_found = false;
171
172         while ((opt = getopt_long(*argc, *argv, "/ab:p:s:d:n:", long_opts, NULL)) != -1) {
173                 options_found = true;
174                 switch (opt) {
175                 case 'a':
176                         p->async = true;
177                         break;
178                 case 'b':
179                         p->budget = atoi(optarg);
180                         break;
181                 case 'p':
182                         p->period_ms = atoi(optarg);
183                         break;
184                 case 's':
185                         ret = inet_aton(optarg, &p->src);
186                         if (!ret) {
187                                 fprintf(stderr, "Source IP address not recognized: %s\n",
188                                         optarg);
189                                 usage();
190                                 exit(1);
191                         }
192                         break;
193                 case 'd':
194                         ret = inet_aton(optarg, &p->dst);
195                         if (!ret) {
196                                 fprintf(stderr, "Destination IP address not recognized: %s\n",
197                                         optarg);
198                                 usage();
199                                 exit(1);
200                         }
201                         break;
202                 case 'n':
203                         p->number = atoi(optarg);
204                         break;
205                 case '/':
206                         break;
207                 default:
208                         usage();
209                         exit(1);
210                 }
211                 if (opt == '/')
212                         break; 
213         }
214         return (options_found) ? 0 : -1;
215 }
216
/*
 * Termination request flag.  It is set by stopper() and polled by the
 * sender and receiver loops.  sig_atomic_t is the integer type that may
 * be written from a signal handler.
 */
volatile sig_atomic_t exit_flag = 0;

/*
 * Signal handler installed for SIGTERM and SIGINT: asks all loops to
 * finish.  The signal number is not used, but the parameter is required
 * by the handler type expected by signal().
 */
void stopper(int signum)
{
        (void)signum;
        exit_flag = 1;
}
223
/**
 * Compute *result = *x - *y.
 *
 * The result is normalized so that 0 <= result->tv_nsec < 1000000000;
 * a negative difference is therefore expressed by a negative tv_sec.
 * Neither input is modified, and result may point to the same object as
 * x or y because both inputs are read before result is written.
 *
 * @param result  Receives the difference.
 * @param x       Minuend.
 * @param y       Subtrahend.
 * @return 1 if the difference is negative, 0 otherwise.
 */
int
timespec_subtract(struct timespec *result,
                  const struct timespec *x, const struct timespec *y)
{
        const long nsec_per_sec = 1000000000L;
        time_t sec = x->tv_sec - y->tv_sec;
        long nsec = x->tv_nsec - y->tv_nsec;

        /* Move whole seconds (possible with non-normalized input) from
         * the nanosecond part into the second part. */
        sec += nsec / nsec_per_sec;
        nsec %= nsec_per_sec;

        /* Borrow one second so that the nanosecond part is not negative. */
        if (nsec < 0) {
                nsec += nsec_per_sec;
                sec -= 1;
        }

        result->tv_sec = sec;
        result->tv_nsec = nsec;

        /* Return 1 if result is negative. */
        return sec < 0;
}
248
/* Convert a timespec to seconds expressed as a double. */
static inline double ts2d(struct timespec *ts)
{
        double fraction = 1e-9*ts->tv_nsec;

        return ts->tv_sec + fraction;
}
253
/*
 * Return the difference x - y in seconds as a double.  The subtraction is
 * delegated to timespec_subtract() and its result converted by ts2d().
 */
static inline double tsdiff2d(struct timespec *x,
                              struct timespec *y)
{
        struct timespec delta;

        (void)timespec_subtract(&delta, x, y);

        return ts2d(&delta);
}
261
/*
 * Header placed at the start of every message the sender transmits; the
 * receiver uses it to detect gaps and to compute the delay.
 */
struct msg {
        int cnt;                /* sequence number assigned by the sender */
        struct timespec ts;     /* CLOCK_MONOTONIC time taken before sending */
};
266
267 struct receiver_params {
268         int budget;
269         frsh_receive_endpoint_t epdst;
270         int id;
271 };
272
273 void *receiver(void *arg)
274 {
275         struct receiver_params *rp = arg;
276         frsh_receive_endpoint_t epdst = rp->epdst;
277         size_t mlen;
278         int ret;
279         int last_cnt = -1;
280         struct timespec tss, tsr;
281         struct msg *msg;
282         msg = malloc(rp->budget);
283         if (!msg) error(1, errno, "malloc msg");
284
285         while (!exit_flag) {
286                 ret = frsh_receive_sync(epdst, msg, rp->budget, &mlen, NULL);
287                 clock_gettime(CLOCK_MONOTONIC, &tsr);
288                 tss = msg->ts;
289                 if (msg->cnt != last_cnt+1)
290                         printf("packet(s) lost!\n");
291                 printf("%3d: %10d: %10.3lf ms\n",
292                        rp->id, msg->cnt, tsdiff2d(&tsr, &tss)*1000);
293                 last_cnt = msg->cnt;
294         }
295         free(rp);
296         return NULL;
297 }
298
299 sem_t finished;
300
301 void *sender(void *arg)
302 {
303         struct stream_params *p = arg;
304         frsh_vres_id_t vres;
305         frsh_send_endpoint_t epsrc;
306         struct receiver_params *rp;
307         int ret;
308         struct msg *msg;
309         long int cnt=0;
310         pthread_t receiver_id;
311
312         msg = malloc(p->budget);
313         if (!msg) error(1, errno, "malloc msg");
314
315         negotiate_contract(p, &vres);
316
317         rp = malloc(sizeof(*rp));
318         rp->budget = p->budget;
319         rp->id = p->id;
320
321         create_endpoints(p, vres, &epsrc, &rp->epdst);
322
323         set_rt_prio(50);
324
325         ret = pthread_create(&receiver_id, NULL, receiver, rp);
326         
327         if (signal(SIGTERM, stopper) == SIG_ERR)
328                 error(1, errno, "Error in signal registration");
329         if (signal(SIGINT, stopper) == SIG_ERR)
330                 error(1, errno, "Signal handler registration error");
331
332         struct timespec next_period;
333         struct timespec tss;
334         clock_gettime(CLOCK_MONOTONIC, &next_period);
335         while (!exit_flag) {
336                 clock_gettime(CLOCK_MONOTONIC, &tss);
337                 msg->cnt = cnt++;
338                 msg->ts = tss;
339                 if (p->async)
340                         ret = frsh_send_async(epsrc, msg, p->budget);
341                 else {
342                         ret = frsh_send_sync(epsrc, msg, p->budget);
343                         clock_gettime(CLOCK_MONOTONIC, &next_period);
344                 }
345                 next_period.tv_sec  += (p->period_ms/1000);
346                 next_period.tv_nsec += (p->period_ms%1000) * 1000000;
347                 if (next_period.tv_nsec >= 1000000000) {
348                         next_period.tv_nsec -= 1000000000;
349                         next_period.tv_sec++;
350                 }
351                 clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
352                                 &next_period, NULL);
353         }
354
355         frsh_contract_cancel(vres);
356         free(p);
357         sem_post(&finished);
358         return NULL;
359 }
360
361 int main(int argc, char *argv[])
362 {
363         int ret;
364         int num = 0;
365         struct stream_params sp = {
366                 .budget = 1024,
367                 .period_ms = 20,
368                 .async = false,
369                 .src.s_addr = htonl(INADDR_LOOPBACK),
370                 .dst.s_addr = htonl(INADDR_LOOPBACK),
371                 .number = 1,
372         };
373         struct stream_params *p = NULL;
374
375         sem_init(&finished, 0, 0);
376         
377         ret = frsh_init();
378         if (ret) PERROR_AND_EXIT(ret, "frsh_init");
379         
380         do {
381                 ret = parse_opts(&argc, &argv, &sp);
382                 if (num == 0 || ret == 0) {
383                         int i;
384                         for (i=0; i<sp.number; i++) {
385                                 pthread_t thread;
386                                 p = malloc(sizeof(*p));
387                                 if (!p) error(1, errno, "malloc");
388                                 *p = sp;
389                                 p->id = num;
390                                 pthread_create(&thread, NULL, sender, p);
391                                 num++;
392                         }
393                 }
394         } while(ret == 0);
395
396         while (num--)
397                 sem_wait(&finished);
398         
399         return 0;
400 }