]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/tests/timing/fwp-timing.c
fwp-timing: Added -c flag to specify the number of messages to be sent
[frescor/fwp.git] / fwp / tests / timing / fwp-timing.c
1 /*
2  * A tool for measuring FWP latency. This program aims to be simpler
3  * than wme_test and is intended to be run on a single machine with
4  * multiple wifi interfaces and send-to-self kernel patch applied.
5  *
6  * This program creates both sides of communication and measures the
7  * communication delays.
8  */
9
10 #include <netinet/in.h>
11 #include <arpa/inet.h>
12 #include <frsh.h>
13 #include <getopt.h>
14 #include <netdb.h>
15 #include <netinet/in.h>
16 #include <stdlib.h>
17 #include <sys/socket.h>
18 #include <sys/types.h>
19 #include <unistd.h>
20 #include <sys/param.h>
21 #include <fwp_res.h>
22 #include <error.h>
23 #include <sys/mman.h>
24 #include <semaphore.h>
25
26 void set_rt_prio(int priority)
27 {
28         int maxpri, minpri;
29         static struct sched_param param;
30
31         if ((maxpri = sched_get_priority_max(SCHED_FIFO)) == -1)        {
32                 fprintf(stderr, "warning: sched_get_priority_max failed");
33         }
34
35         if ((minpri = sched_get_priority_min(SCHED_FIFO)) == -1)        {
36                 fprintf(stderr, "warning: sched_get_priority_min failed");
37         }
38
39         if (priority > maxpri)  {
40                 fprintf(stderr, "warning: maximum priority allowed is %d.\n", maxpri);
41         }
42         if (priority < minpri)  {
43                 fprintf(stderr, "warning: minimum priority allowed is %d.\n", minpri);
44         }
45
46         param.sched_priority = priority;
47
48         if (sched_setscheduler(0, SCHED_FIFO, &param) == -1)    {
49                 fprintf(stderr, "warning: sched_setscheduler failed");
50         }
51
52         mlockall(MCL_CURRENT | MCL_FUTURE);
53 }
54
55 struct stream_params {
56         int budget;
57         int period_ms;
58         bool async;
59         struct in_addr src, dst;
60         int number;
61         int id;
62         int count;
63 };
64
65
66 int negotiate_contract(struct stream_params *p, frsh_vres_id_t *vres)
67 {
68         frsh_contract_t contract;
69         int ret;
70         frsh_rel_time_t period;
71         frsh_rel_time_t budget;
72         frsh_rel_time_t deadline;
73         fres_block_fwp *fwp;    
74         
75         ret = frsh_contract_init(&contract);
76         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_init");
77
78         ret = frsh_contract_set_resource_and_label(
79                 &contract,
80                 FRSH_RT_NETWORK, FRSH_NETPF_FWP,
81                 NULL);
82         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_resource_and_label");
83
84         frsh_network_bytes_to_budget(FRSH_NETPF_FWP, p->budget, &budget);
85         period = fosa_msec_to_rel_time(p->period_ms);
86         ret = frsh_contract_set_basic_params(&contract,
87                                              &budget,
88                                              &period,
89                                              FRSH_WT_BOUNDED,
90                                              FRSH_CT_REGULAR);
91         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_basic_params");
92
93
94         /* FWP doesn't accept smaller deadlines than 30 ms. */
95         if (frsh_rel_time_smaller(period, frsh_msec_to_rel_time(30)))
96                 deadline = frsh_msec_to_rel_time(30);
97         else
98                 deadline = period;
99         ret = frsh_contract_set_timing_reqs(&contract, false, &deadline);
100         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_timing_reqs");
101
102         fwp = malloc(sizeof(*fwp));
103         if (!fwp) PERROR_AND_EXIT(errno, "malloc");
104         fwp->src = p->src.s_addr;
105         ret = fres_contract_add_fwp(contract, fwp);
106         if (ret) PERROR_AND_EXIT(ret, "fres_contract_add_fwp");
107
108         ret = frsh_contract_negotiate(&contract, vres);
109         if (ret) PERROR_AND_EXIT(ret, "frsh_contract_negotiate");
110
111         frsh_contract_destroy(&contract);
112
113         return 0;
114 }
115
116 void create_endpoints(struct stream_params *p,
117                       frsh_vres_id_t vres,
118                       frsh_send_endpoint_t *epsrc,
119                       frsh_receive_endpoint_t *epdst)
120 {
121         int ret;
122         frsh_send_endpoint_protocol_info_t    spi = { NULL, 0 };
123         frsh_receive_endpoint_protocol_info_t rpi = { NULL, 0 };
124         frsh_endpoint_queueing_info_t qi = { .queue_size=0,
125                                              .queue_policy=FRSH_QRP_OLDEST };
126
127         ret = frsh_receive_endpoint_create(FRSH_NETPF_FWP, 0, qi, rpi,
128                                            epdst);
129         if (ret != 0) error(1, errno, "fwp_receive_endpoint_create");
130                 
131         unsigned int port;
132         frsh_receive_endpoint_get_params(*epdst, NULL, &port, NULL, NULL);
133
134         ret = frsh_send_endpoint_create(FRSH_NETPF_FWP,
135                                         p->dst.s_addr, port, spi,
136                                         epsrc);
137         if (ret < 0) error(1, errno, "frsh_send_endpoint_create()");
138                 
139         ret = frsh_send_endpoint_bind(vres, *epsrc);
140         if (ret != 0) error(1, errno, "frsh_send_endpoint_bind");
141 }
142
143
144 static struct option long_opts[] = {
145     { "period", 1, 0, 'p' },
146     { "budget", 1, 0, 'b' },
147     { "source", 1, 0, 's' },
148     { "dest",   1, 0, 'd' },
149     { "async",  0, 0, 'a' },
150     { "number", 0, 0, 'n' },
151     { "count",  0, 0, 'c' },
152     { 0, 0, 0, 0}
153 };
154
155 static void
156 usage(void)
157 {
158         printf("usage: fwp-timing [ options ]\n");
159         printf("  -p, --period <ms>  period in miliseconds\n");
160         printf("  -b, --budget <bytes>  how many bytes is sent in each period\n");
161         printf("  -s, --source <ip>  source IP address\n");
162         printf("  -d, --dest <ip:port> destination IP address and port\n");
163         printf("  -a, --async  Send packets asynchronously\n");
164         printf("  -n, --number Number of streams with the same parameters\n");
165         printf("  -c, --count Number of messages to send [infinity]\n");
166         printf("  -/, --stream  New stream separator\n");
167 }
168
169 int parse_opts(int *argc, char **argv[], struct stream_params *p)
170 {
171         int opt;
172         int ret;
173         bool options_found = false;
174
175         while ((opt = getopt_long(*argc, *argv, "/ab:c:p:s:d:n:", long_opts, NULL)) != -1) {
176                 options_found = true;
177                 switch (opt) {
178                 case 'a':
179                         p->async = true;
180                         break;
181                 case 'b':
182                         p->budget = atoi(optarg);
183                         break;
184                 case 'c':
185                         p->count = atoi(optarg);
186                         break;
187                 case 'p':
188                         p->period_ms = atoi(optarg);
189                         break;
190                 case 's':
191                         ret = inet_aton(optarg, &p->src);
192                         if (!ret) {
193                                 fprintf(stderr, "Source IP address not recognized: %s\n",
194                                         optarg);
195                                 usage();
196                                 exit(1);
197                         }
198                         break;
199                 case 'd':
200                         ret = inet_aton(optarg, &p->dst);
201                         if (!ret) {
202                                 fprintf(stderr, "Destination IP address not recognized: %s\n",
203                                         optarg);
204                                 usage();
205                                 exit(1);
206                         }
207                         break;
208                 case 'n':
209                         p->number = atoi(optarg);
210                         break;
211                 case '/':
212                         break;
213                 default:
214                         usage();
215                         exit(1);
216                 }
217                 if (opt == '/')
218                         break; 
219         }
220         return (options_found) ? 0 : -1;
221 }
222
223 volatile bool exit_flag = false;
224
225 void stopper()
226 {
227         exit_flag = true;
228 }
229
230 int
231 timespec_subtract (result, x, y)
232      struct timespec *result, *x, *y;
233 {
234   /* Perform the carry for the later subtraction by updating Y. */
235   if (x->tv_nsec < y->tv_nsec) {
236     int nsec = (y->tv_nsec - x->tv_nsec) / 1000000000 + 1;
237     y->tv_nsec -= 1000000000 * nsec;
238     y->tv_sec += nsec;
239   }
240   if (x->tv_nsec - y->tv_nsec > 1000000000) {
241     int nsec = (x->tv_nsec - y->tv_nsec) / 1000000000;
242     y->tv_nsec += 1000000000 * nsec;
243     y->tv_sec -= nsec;
244   }
245
246   /* Compute the time remaining to wait.
247      `tv_nsec' is certainly positive. */
248   result->tv_sec = x->tv_sec - y->tv_sec;
249   result->tv_nsec = x->tv_nsec - y->tv_nsec;
250
251   /* Return 1 if result is negative. */
252   return x->tv_sec < y->tv_sec;
253 }
254
255 static inline double ts2d(struct timespec *ts)
256 {
257         return ts->tv_sec + 1e-9*ts->tv_nsec;
258 }
259
260 static inline double tsdiff2d(struct timespec *x,
261                               struct timespec *y)
262 {
263         struct timespec r;
264         timespec_subtract(&r, x, y);
265         return ts2d(&r);
266 }
267
268 struct msg {
269         int cnt;
270         struct timespec ts;
271 };
272
273 struct receiver_params {
274         int budget;
275         frsh_receive_endpoint_t epdst;
276         int id;
277 };
278
279 void *receiver(void *arg)
280 {
281         struct receiver_params *rp = arg;
282         frsh_receive_endpoint_t epdst = rp->epdst;
283         size_t mlen;
284         int ret;
285         int last_cnt = -1;
286         struct timespec tss, tsr;
287         struct msg *msg;
288         msg = malloc(rp->budget);
289         if (!msg) error(1, errno, "malloc msg");
290
291         while (!exit_flag) {
292                 ret = frsh_receive_sync(epdst, msg, rp->budget, &mlen, NULL);
293                 clock_gettime(CLOCK_MONOTONIC, &tsr);
294                 tss = msg->ts;
295                 if (msg->cnt != last_cnt+1)
296                         printf("packet(s) lost!\n");
297                 printf("%3d: %10d: %10.3lf ms\n",
298                        rp->id, msg->cnt, tsdiff2d(&tsr, &tss)*1000);
299                 last_cnt = msg->cnt;
300         }
301         free(rp);
302         return NULL;
303 }
304
305 sem_t finished;
306
307 void *sender(void *arg)
308 {
309         struct stream_params *p = arg;
310         frsh_vres_id_t vres;
311         frsh_send_endpoint_t epsrc;
312         struct receiver_params *rp;
313         int ret;
314         struct msg *msg;
315         long int cnt=0;
316         pthread_t receiver_id;
317
318         msg = malloc(p->budget);
319         if (!msg) error(1, errno, "malloc msg");
320
321         negotiate_contract(p, &vres);
322
323         rp = malloc(sizeof(*rp));
324         rp->budget = p->budget;
325         rp->id = p->id;
326
327         create_endpoints(p, vres, &epsrc, &rp->epdst);
328
329         set_rt_prio(50);
330
331         ret = pthread_create(&receiver_id, NULL, receiver, rp);
332         
333         if (signal(SIGTERM, stopper) == SIG_ERR)
334                 error(1, errno, "Error in signal registration");
335         if (signal(SIGINT, stopper) == SIG_ERR)
336                 error(1, errno, "Signal handler registration error");
337
338         struct timespec next_period;
339         struct timespec tss;
340         clock_gettime(CLOCK_MONOTONIC, &next_period);
341         while (!exit_flag && (p->count == -1 || p->count--)) {
342                 clock_gettime(CLOCK_MONOTONIC, &tss);
343                 msg->cnt = cnt++;
344                 msg->ts = tss;
345                 if (p->async)
346                         ret = frsh_send_async(epsrc, msg, p->budget);
347                 else {
348                         ret = frsh_send_sync(epsrc, msg, p->budget);
349                         clock_gettime(CLOCK_MONOTONIC, &next_period);
350                 }
351                 next_period.tv_sec  += (p->period_ms/1000);
352                 next_period.tv_nsec += (p->period_ms%1000) * 1000000;
353                 if (next_period.tv_nsec >= 1000000000) {
354                         next_period.tv_nsec -= 1000000000;
355                         next_period.tv_sec++;
356                 }
357                 clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
358                                 &next_period, NULL);
359         }
360
361         frsh_contract_cancel(vres);
362         free(p);
363         sem_post(&finished);
364         return NULL;
365 }
366
367 int main(int argc, char *argv[])
368 {
369         int ret;
370         int num = 0;
371         struct stream_params sp = {
372                 .budget = 1024,
373                 .period_ms = 20,
374                 .async = false,
375                 .src.s_addr = htonl(INADDR_LOOPBACK),
376                 .dst.s_addr = htonl(INADDR_LOOPBACK),
377                 .number = 1,
378                 .count = -1,
379         };
380         struct stream_params *p = NULL;
381
382         sem_init(&finished, 0, 0);
383         
384         ret = frsh_init();
385         if (ret) PERROR_AND_EXIT(ret, "frsh_init");
386         
387         do {
388                 ret = parse_opts(&argc, &argv, &sp);
389                 if (num == 0 || ret == 0) {
390                         int i;
391                         for (i=0; i<sp.number; i++) {
392                                 pthread_t thread;
393                                 p = malloc(sizeof(*p));
394                                 if (!p) error(1, errno, "malloc");
395                                 *p = sp;
396                                 p->id = num;
397                                 pthread_create(&thread, NULL, sender, p);
398                                 num++;
399                         }
400                 }
401         } while(ret == 0);
402
403         while (num--)
404                 sem_wait(&finished);
405         
406         return 0;
407 }