]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - wme_test/wclient.c
9d723a76d81b4e898f4bfe7f2ca30db5ea29dd01
[frescor/fwp.git] / wme_test / wclient.c
1 #include <errno.h>
2 #include <sys/types.h>
3 #include <sys/socket.h>
4 #include <netinet/in.h>
5 #include <arpa/inet.h>
6 #include <netdb.h>
7 #include <signal.h>
8 #include <sys/wait.h>
9 #include <stdio.h>
10 #include <unistd.h>
11 #include <fcntl.h>
12 #include <time.h>
13 #include <string.h>
14 #include <pthread.h>
15 #include <string.h>
16 #include <stdlib.h>
17 #include <stdbool.h>
18 #include "common.h"
19 #include <semaphore.h>
20
21 #define MAX_STREAMS  10 
22 #define MIN_GRANULARITY 100
23
24 unsigned opt_packet_size = 800;
25 unsigned opt_period_usec = 10*MSEC_TO_USEC;
26 unsigned opt_jitter = 0;
27 char    *opt_output = "delay_stats";
28 unsigned opt_count_sec = 0;
29 unsigned opt_def_bandwidth = 200;
30 unsigned opt_def_period_msec = 0;
31 int opt_granularity_usec = MIN_GRANULARITY;
32 bool opt_wait_for_queue_is_full; /* Don't gather any statistics until any queue is full */
33
34 bool some_queue_is_full = false;
35 struct timespec reset_timestamp;
36
37 /* Locked when some queue is full to prevent multiple resets of
38    statstics. */
39 pthread_mutex_t queue_full_mutex = PTHREAD_MUTEX_INITIALIZER;
40
41 int ac_sockfd[AC_NUM];
42
/* Book-keeping for one receiver thread; there is one per access category. */
struct receiver {
        pthread_t thread;         /* handle, used by stopper() to interrupt the thread */
        unsigned received;        /* packets counted by this receiver so far */
        unsigned last_received;   /* value of 'received' at the previous progress print */
} receivers[AC_NUM];
47
48 FILE* logfd;
49 char* server_addr; 
50 char logfname[100];
51
52 /* maximal traffic delay in ms - 10 s*/
53 #define MAX_DELAY_US 10000000
54
55 unsigned delay_stats[AC_NUM][MAX_DELAY_US/MIN_GRANULARITY];
56
57 /*struct ac_stats[AC_NUM] {
58    unsigned long int min_trans_time;
59    unsigned long int sum_trans_time;
60    struct timespec   recv_timestamp;
61    struct timespec   send_timestamp; 
62 };*/
63
64 struct stream {
65         /* Input parameters */
66         enum ac ac;             /*  */
67         int bandwidth_bps;      /* bits per second */
68         bool measure_saturation; /* Measure the saturation bandwidth first */
69         int jitter;             /* percent */
70         /* Mulualy exclusive input parameters */
71         int packet_size;
72         long period_usec;       /* all time units are in microseconds */
73
74         /* Statistics */
75         unsigned long long sent, received;
76 };
77
78 /*
79 struct send_endpoint sepoint[] = {
80         { .ac = AC_VO, .period_usec=200*MSEC_TO_USEC, .bandwidth_bps = 34*Kbit },
81         { .ac = AC_VI, .period_usec=25*MSEC_TO_USEC, .bandwidth_bps =  480*Kbit },
82         { .ac = AC_BE, .period_usec=40*MSEC_TO_USEC, .bandwidth_bps =  300*Kbit },
83         { .ac = AC_BK, .period_usec=40*MSEC_TO_USEC, .bandwidth_bps =  300*Kbit },
84 //      { .ac = AC_VI, .period_usec=17*MSEC_TO_USEC, .bandwidth_bps =  675*Kbit },
85 };
86 */
87
88 struct stream streams[MAX_STREAMS];
89
90 unsigned int nr_streams = 0;
91
92 sem_t sem_thread_finished;
93
94 bool exit_flag = false;
95
96 void stopper()
97 {
98         int i;
99         exit_flag = true;
100
101         /* Interrupt all receivers */
102         for (i=0; i < AC_NUM; i++) {
103                 pthread_kill(receivers[i].thread, SIGUSR1);
104         }
105 }
106
107 void stream_to_text(char *stream_desc, size_t n, struct stream *stream, int seconds)
108 {
109         char buf[3][12];
110         char real[100];
111
112         if (seconds) {
113                 snprintf(real, sizeof(real), "; real: sent %lld (%lld/s), received %lld (%lld/s)",
114                          stream->sent, stream->sent/seconds,
115                          stream->received, stream->received/seconds);
116         } else {
117                 real[0]=0;
118         }
119         
120         snprintf(stream_desc, n, "%d: %s %s (%d bytes per %s +-%s, %d packets/s)%s",
121                  stream-streams, ac_to_text[stream->ac], bandwidth_to_text(buf[0], stream->bandwidth_bps),
122                  stream->packet_size, usec_to_text(buf[1], stream->period_usec),
123                  usec_to_text(buf[2], stream->jitter*stream->period_usec/100),
124                  (int)(SEC_TO_USEC/stream->period_usec), real);
125 }
126
127 void save_results(int argc, char *argv[], int seconds)
128 {
129         int ac, i, maxi;
130         const int mini = 3000/opt_granularity_usec;
131         bool allzeros;
132         unsigned sum[AC_NUM];
133         double val;
134
135         fprintf(stderr, "Writing data to %s... ", logfname);
136         fflush(stderr);
137
138         fprintf(logfd, "# Invoked as: ");
139         for (i=0; i<argc; i++) fprintf(logfd, "%s ", argv[i]);
140         fprintf(logfd, "\n");
141
142         if (seconds != opt_count_sec) {
143                 fprintf(logfd, "# Data gathered for %d seconds.\n", seconds);
144         }
145                 
146         for (i = 0; i < nr_streams; i++) {
147                 char stream_desc[120];
148                 stream_to_text(stream_desc, sizeof(stream_desc), &streams[i], seconds);
149                 fprintf(logfd, "# Stream %s\n", stream_desc);
150         }
151
152         /* Find maximal delay */
153         allzeros = true;
154         for (maxi = MAX_DELAY_US/opt_granularity_usec - 1; maxi >= 0; maxi--) {
155                 for (ac = 0; ac < AC_NUM; ac++) {
156                         if (delay_stats[ac][maxi] != 0) allzeros = false;
157                 }
158                 if (!allzeros) break;
159         }
160         maxi++;
161         if (maxi < mini) maxi = mini;
162
163         /* Calculate total number of sent packets per AC */
164         for (ac = 0; ac < AC_NUM; ac++) sum[ac] = 0;
165         for (i = 0; i < nr_streams; i++) {
166                 ac = streams[i].ac;
167                 sum[ac] += streams[i].sent;
168         }
169
170 #if 0
171         /* Write pdf */
172         for ( i = 0 ; i < maxi; i++) {
173                 fprintf(logfd,"\n%f", i*opt_granularity_usec/1000.0);
174                 for (ac = 0; ac < AC_NUM; ac++) { 
175                         if (sum[ac])
176                                 val = (double)delay_stats[ac][i]*100.0 / sum[ac];
177                         else val = -1; /* Don't display this ac */
178                         fprintf(logfd," %lf", val);
179                 }
180         }
181         
182         fprintf(logfd,"\n\n");
183 #endif
184         
185         /* Write PDF */
186         for (ac = 0; ac < AC_NUM; ac++) {
187                 unsigned long long integral = 0, last = -1;
188
189                 fprintf(logfd,"%f %f\n", 0.0, 0.0);
190
191                 if (sum[ac] != 0) {
192                         i=0;
193                         while (delay_stats[ac][i] == 0) i++;
194                 
195                         for (i++; i < maxi+1; i++) {
196                                 if (last != integral) {
197                                         val = (double)integral*100.0 / sum[ac];
198                                         fprintf(logfd,"%f %f\n", i*opt_granularity_usec/1000.0, val);
199                                         last = integral;
200                                 }
201                                 if (i>0)
202                                         integral += delay_stats[ac][i-1];
203                         }
204                 }
205                 fprintf(logfd,"\n\n");
206         }
207         
208         fprintf(stderr, "finished.\n");
209         fclose(logfd);
210
211         exit(0);
212 }
213
/*
 * Store left + right into *sum.
 *
 * Both operands are expected to be normalized (tv_nsec below one second),
 * so at most one carry into tv_sec is performed.  sum may alias either
 * operand.
 */
static inline
void timespec_add (struct timespec *sum, const struct timespec *left,
              const struct timespec *right)
{
        time_t secs = left->tv_sec + right->tv_sec;
        long nsecs = left->tv_nsec + right->tv_nsec;

        if (nsecs >= 1000000000L) {
                /* carry one second */
                nsecs -= 1000000000L;
                secs += 1;
        }

        sum->tv_sec = secs;
        sum->tv_nsec = nsecs;
}
226
/*
 * Store left - right into *diff.
 *
 * Both operands are expected to be normalized.  The result keeps tv_nsec
 * in [0, 1s); a negative difference therefore has a negative tv_sec.
 * diff may alias either operand.
 */
static inline
void timespec_sub (struct timespec *diff, const struct timespec *left,
              const struct timespec *right)
{
        time_t secs = left->tv_sec - right->tv_sec;
        long nsecs = left->tv_nsec - right->tv_nsec;

        if (nsecs < 0) {
                /* borrow one second */
                nsecs += 1000000000L;
                secs -= 1;
        }

        diff->tv_sec = secs;
        diff->tv_nsec = nsecs;
}
239
240 int create_ac_socket(unsigned int ac) 
241 {
242         int sockfd;
243         unsigned int yes=1, tos;
244
245
246         if ((sockfd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
247         {
248                 perror("Unable to open socket");
249                 return -1;
250         }
251         if (fcntl(sockfd, F_SETFL, O_NONBLOCK) != 0) {
252                 perror("set non-blocking socket");
253                 return -1;
254         }
255         if (setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == -1) {
256                 perror("Unable to set socket");
257                 return -1;
258         }
259
260         
261         //tos = ((AC_NUM - ac) *2 - 1)*32;
262         tos = ac_to_tos[ac];
263         if (setsockopt(sockfd, SOL_IP, IP_TOS, &tos, sizeof(tos))) {
264                 perror("Unable to set TOS");
265                 close(sockfd);
266                 return -1;
267         }
268
269         return sockfd;
270 }
271
/*
 * Signal handler that deliberately does nothing.
 *
 * main() installs it for SIGUSR1 with sa_flags == 0, so delivering the
 * signal has no effect except making an interrupted system call return
 * with EINTR.
 *
 * The handler is invoked with the signal number, so it has to declare the
 * int parameter; defining it with an empty parameter list and calling it
 * with an argument is undefined behaviour.
 */
void empty_handler(int signum)
{
        (void)signum;   /* unused */
}
275
276 void reset_statistics()
277 {
278         int i;
279         /* Mutexes??? */
280         for (i = 0; i < nr_streams; i++) {
281                 streams[i].sent = 0;
282                 streams[i].received = 0;
283         }
284         memset(delay_stats, 0, sizeof(delay_stats));
285         clock_gettime(CLOCK_MONOTONIC, &reset_timestamp);
286 }
287
288 void* receiver(void* queue)
289 {
290         struct msg_t    msg;
291         struct  sockaddr_in rem_addr;
292         int     mlen, ret;
293         unsigned int ac, rem_addr_length; 
294         unsigned long int trans_time_usec;
295         unsigned long int min_trans_time;
296         struct timespec   send_timestamp,recv_timestamp, trans_time, time_from_reset;
297         fd_set fdset;
298         
299         min_trans_time = ~0;
300         
301         block_signals();
302         set_rt_prio(99);
303
304         ac = (int)queue;
305         rem_addr_length = sizeof(rem_addr);
306         FD_ZERO(&fdset);
307         while (!exit_flag) {
308                 FD_SET(ac_sockfd[ac], &fdset);
309                 ret = select(ac_sockfd[ac]+1, &fdset, NULL, NULL, NULL);
310                 if (ret < 0) {
311                         if (errno == EINTR) continue;
312                         perror("receiver select");
313                         goto out;
314                 }
315                 mlen = recvfrom(ac_sockfd[ac], &msg, sizeof(msg), 0,    \
316                                 (struct sockaddr*)&rem_addr, &rem_addr_length);
317                 if (mlen < 0) {
318                         perror("Chyba pri prijimani pozadavku");
319                         goto out;
320                 }       
321                 clock_gettime(CLOCK_MONOTONIC,&recv_timestamp);
322                 send_timestamp = msg.send_timestamp;
323
324                 /* Check whether this message was sent after reset_statistics() */
325                 timespec_sub(&time_from_reset, &send_timestamp, &reset_timestamp);
326                 if (time_from_reset.tv_sec < 0) {
327                         printf("ted\n");
328                         continue; /* If so, don't count it */
329                 }
330                 timespec_sub(&trans_time,&recv_timestamp ,&send_timestamp);
331                 trans_time_usec = (trans_time.tv_sec * SEC_TO_USEC + \
332                                          trans_time.tv_nsec / USEC_TO_NSEC) /2;
333           
334                 if (trans_time_usec < MAX_DELAY_US)
335                         delay_stats[ac][trans_time_usec/opt_granularity_usec]++;
336
337                 receivers[ac].received++;
338                 streams[msg.stream].received++;
339         
340                 /*if (trans_time_nsec < min_trans_time) 
341                         min_trans_time = trans_time_nsec;*/
342                 /*printf("seqn= %lu tos= %d start= %lu(s).%lu(ns)"\
343                          "stop= %lu(s).%lu(ns)\n trans_time = %lums\n",\
344                          msg.seqn, msg.tos, send_timestamp.tv_sec,\
345                          send_timestamp.tv_nsec,recv_timestamp.tv_sec,\
346                          recv_timestamp.tv_nsec, trans_time_msec); */
347         }
348 out:
349         sem_post(&sem_thread_finished);
350         return NULL;
351 }
352
353 void* sender(void* arg)
354 {
355         struct sockaddr_in rem_addr;
356         union msg_buff buff;
357         unsigned long int seqn;
358         unsigned long int really_sent = 0;
359         struct timespec time_to_wait, current_time, period, interval, measure_start, measure_end;
360         struct hostent* ph;
361         struct stream* stream = (struct stream*) arg;
362         int bandwidth;
363         int packet_size;
364         unsigned period_usec;
365         char stream_desc[100];
366         bool this_queue_is_full = false, this_queue_is_full_old = false;
367
368         set_rt_prio(90-stream->ac);
369
370         bandwidth = stream->bandwidth_bps;
371         if (bandwidth == 0) {
372                 /* Either we are going to measure saturation bandwidth
373                    or we will return before the main loop. */
374                 bandwidth = 300*Mbit;
375         }
376
377         if (stream->packet_size) {
378                 packet_size = stream->packet_size;
379                 period_usec = SEC_TO_USEC*packet_size*8/bandwidth;
380                 if (period_usec == 0) period_usec = 1;
381         } else if (stream->period_usec) {
382                 period_usec = stream->period_usec;
383                 packet_size = (long long)bandwidth/8 * period_usec/SEC_TO_USEC;
384         } else {
385                 char buf[200];
386                 stream_to_text(buf, sizeof(buf), stream, 0);
387                 fprintf(stderr, "Neither packet size nor period was specified for a stream %s\n", buf);
388                 exit(1);
389         }
390         stream->packet_size = packet_size;
391         stream->period_usec = period_usec;
392         stream->jitter = opt_jitter;
393
394         bandwidth = stream->bandwidth_bps;
395
396         stream_to_text(stream_desc, sizeof(stream_desc), stream, 0);
397         printf("%s\n", stream_desc);
398
399         if (packet_size < sizeof(struct msg_t)) {
400                 fprintf(stderr, "Packet size too small (min %d)\n", sizeof(struct msg_t));
401                 exit(1);
402         }
403
404         memset(&rem_addr,0, sizeof(rem_addr));
405                 
406         rem_addr.sin_family = AF_INET;
407         ph = gethostbyname(server_addr);
408         if (ph) 
409                 rem_addr.sin_addr = *((struct in_addr *)ph->h_addr);
410         else {
411                 perror("Unknown server");
412                 exit(1);
413         }
414         rem_addr.sin_port = htons(BASE_PORT + stream->ac);
415         seqn = 0;
416         
417         block_signals();
418
419         if (bandwidth == 0 && !stream->measure_saturation)
420                 goto out;
421
422         while (!exit_flag) {
423
424                 buff.msg.seqn = seqn;
425                 buff.msg.tos = ac_to_tos[stream->ac];
426                 buff.msg.stream = stream-streams;
427                 
428                 clock_gettime(CLOCK_MONOTONIC,&buff.msg.send_timestamp);
429
430                 errno = 0;
431                 while (sendto(ac_sockfd[stream->ac], &buff, packet_size, 0, \
432                                 (struct sockaddr*)&rem_addr, sizeof(rem_addr)) < 0) {
433                                 if (errno == EINTR) continue;
434                                 if (errno == EAGAIN) {
435                                         if (opt_wait_for_queue_is_full && !some_queue_is_full && (pthread_mutex_trylock(&queue_full_mutex) != EBUSY)) {
436                                                 some_queue_is_full = true;
437                                                 reset_statistics();
438                                         }
439                                         this_queue_is_full = true;
440                                         break;
441                                 } else {
442                                         perror("Error while sending");
443                                         goto out;
444                                 }
445                 }
446
447 #ifdef DEBUG
448                 printf("%d", stream->ac);
449                 fflush(stdout);
450 #endif
451                 seqn++;
452                 stream->sent++;
453
454                 /* Measure saturation bandwidth */
455                 if (stream->measure_saturation && this_queue_is_full && !this_queue_is_full_old) {
456                         measure_start = buff.msg.send_timestamp;
457                 }
458                 this_queue_is_full_old = this_queue_is_full;
459
460                 if (stream->measure_saturation && this_queue_is_full && errno != EAGAIN) {
461                         really_sent++;
462                         measure_end = buff.msg.send_timestamp;
463                         timespec_sub(&interval,&measure_end,&measure_start);
464                         if (really_sent >= 100 && interval.tv_sec >= 20) {
465                                 period_usec = (interval.tv_sec*SEC_TO_USEC + interval.tv_nsec/USEC_TO_NSEC) / really_sent;
466
467                                 bandwidth = (long long)packet_size*8*SEC_TO_USEC/period_usec;
468                                 stream->bandwidth_bps = bandwidth;
469                                 stream->period_usec = period_usec;
470                                 stream->measure_saturation = false;
471                                 reset_statistics();
472
473                                 stream_to_text(stream_desc, sizeof(stream_desc), stream, 0);
474                                 printf("\n%s\n", stream_desc);
475                         }
476                 }
477
478                     
479                 /*           |~~~+~~~| jitter interval (width = 2*opt_jitter percentage from period)*/
480                 /* |-------------|     nominal period*/
481                 if (opt_jitter) {
482                         period.tv_nsec = USEC_TO_NSEC*(period_usec*(100-opt_jitter)/100
483                                                  + rand() % (2*period_usec*opt_jitter/100));
484                 } else {
485                         period.tv_nsec = USEC_TO_NSEC*(period_usec);
486                 }
487                 period.tv_sec = 0;
488
489                 timespec_add(&time_to_wait,&buff.msg.send_timestamp,&period);
490                 clock_gettime(CLOCK_MONOTONIC,&current_time);
491                 timespec_sub(&interval,&time_to_wait,&current_time);
492                 nanosleep(&interval,NULL);
493         }
494 out:
495         sem_post(&sem_thread_finished);
496         return NULL;
497 }
498
499 char* parse_bandwidths(char *params)
500 {
501         struct stream *sp = &streams[nr_streams];
502
503         while (*params && nr_streams < MAX_STREAMS) {
504                 char *ac_ids[AC_NUM] = { [AC_VO]="VO", [AC_VI]="VI", [AC_BE]="BE", [AC_BK]="BK" };
505                 int i;
506                 char *next_char;
507
508                 if (strlen(params) < 2)
509                         return params;
510                 for (i=0; i<AC_NUM; i++) {
511                         if (strncmp(params, ac_ids[i], 2) == 0) {
512                                 sp->ac = i;
513                                 params+=strlen(ac_ids[i]);
514                                 break;
515                         }
516                 }
517                 if (i==AC_NUM)
518                         return params;
519
520                 long bw;
521                 if (*params == ':') {
522                         params++;
523
524                         if (strncmp(params, "SAT", 3) == 0) {
525                                 bw = 0;
526                                 sp->measure_saturation = true;
527                                 params+=3;
528                         } else {
529                                 bw = strtol(params, &next_char, 10);
530                                 if (next_char == params)
531                                         return params;
532                                 params = next_char;
533                         }
534                 } else
535                         bw = opt_def_bandwidth;
536                 
537                 sp->bandwidth_bps = bw*Kbit;
538
539                 long period = 0;
540                 long packet_size = 0;
541                 if (*params == '@') {
542                         params++;
543                         period = strtol(params, &next_char, 10);
544                         if (period == 0)
545                                 return params;
546                         params = next_char;
547                 }
548                 else {
549                         if (*params == '/') {
550                                 params++;
551                                 packet_size = strtol(params, &next_char, 10);
552                                 if (packet_size == 0)
553                                         return params;
554                                 params = next_char;
555                         } else {
556                                 packet_size = opt_packet_size;
557                                 period = opt_def_period_msec;
558                         }
559                 }
560                 sp->period_usec = period*MSEC_TO_USEC;
561                 sp->packet_size = packet_size;
562
563                 
564
565                 if (*params != '\0' && *params != ',')
566                         return params;
567                 nr_streams++;
568                 sp++;
569                 if (*params == ',')
570                         params++;
571         }
572         return NULL;
573 }
574
575 int main(int argc, char *argv[])
576 {
577         int ac, i, rc, seconds;
578         pthread_attr_t attr;
579         pthread_t thread;
580         char opt;
581
582
583         while ((opt = getopt(argc, argv, "B:b:c:g:j:o:qs:T:")) != -1) {
584                 switch (opt) {
585                         case 'B':
586                                 opt_def_bandwidth = atoi(optarg);
587                                 break;
588                         case 'b': {
589                                 char *error;
590                                 error = parse_bandwidths(optarg);
591                                 if (error != NULL) {
592                                         if (*error == '\0')
593                                                 fprintf(stderr, "Bandwidth parse error - string to short\n");
594                                         else
595                                                 fprintf(stderr, "Bandwidth parse error at '%s'\n", error);
596                                         exit(1);
597                                 }
598                                 break;
599                         }
600                         case 'c':
601                                 opt_count_sec = atoi(optarg);
602                                 break;
603                         case 'g':
604                                 opt_granularity_usec = atoi(optarg);
605                                 if (opt_granularity_usec < MIN_GRANULARITY) {
606                                         fprintf(stderr, "Granulatiry too small (min %d)!\n", MIN_GRANULARITY);
607                                         exit(1);
608                                 }
609                                 break;
610                         case 'j':
611                                 opt_jitter = atoi(optarg);
612                                 break;
613                         case 'o':
614                                 opt_output = optarg;
615                                 break;
616                         case 'q':
617                                 opt_wait_for_queue_is_full = true;
618                                 break;
619                         case 's':
620                                 opt_packet_size = atoi(optarg);
621                                 break;
622                         case 'T':
623                                 opt_def_period_msec = atoi(optarg);
624                                 break;
625                         default:
626                                 fprintf(stderr, "Usage: %s [ options ] server_addr\n\n", argv[0]);
627                                 fprintf(stderr, "Options:\n");
628                                 fprintf(stderr, "    -B  default bandwidth for -b option [kbit]\n");
629                                 fprintf(stderr, "    -b  bandwidth of streams (VO|VI|BE|BK)[:<kbit>][@<msec> or /<bytes>][,...]\n");
630                                 fprintf(stderr, "    -c  count (number of seconds to run)\n");
631                                 fprintf(stderr, "    -g  histogram granularity [usec]\n");
632                                 fprintf(stderr, "    -j  send jitter (0-100) [%%]\n");
633                                 fprintf(stderr, "    -o  output filename (.dat will be appended)\n");
634                                 fprintf(stderr, "    -q  gather statistics only after some queue becomes full\n");
635                                 fprintf(stderr, "    -s  size of data payload in packets [bytes] (default: %d)\n", opt_packet_size);
636                                 fprintf(stderr, "    -T  default period for -b option [msec]\n");
637                                 exit(1);
638                 }
639         }
640         if (opt_packet_size && opt_def_period_msec) {
641                 fprintf(stderr, "Error: Nonzero -T and -s can't be used together!.\n");
642                 exit(1);
643         }
644
645         if (optind < argc) {
646                 server_addr = argv[optind];
647         } else {
648                 fprintf(stderr, "Expected server address argument\n");
649                 exit(1);
650         }
651
652         reset_statistics();
653
654         if (nr_streams == 0)
655                 parse_bandwidths("BE");
656                 
657         memset(delay_stats,0, sizeof(delay_stats));     
658         pthread_attr_init(&attr);
659
660         snprintf(logfname, sizeof(logfname), "%s.dat", opt_output);
661
662         if ((logfd = fopen(logfname,"w+")) == NULL) {
663                 fprintf(stderr,"Can not open %s\n", logfname);
664                 exit(1);
665         }
666         if (signal(SIGTERM, stopper) == SIG_ERR) {
667                 perror("Error in signal registration");
668                 exit(1);
669         }
670                 
671         if (signal(SIGINT, stopper) == SIG_ERR) {
672                 perror("Signal handler registration error");
673                 exit(1);
674         }
675
676         struct sigaction sa;
677         sa.sa_handler = empty_handler;
678         sa.sa_flags = 0;        /* don't restart syscalls */
679
680         if (sigaction(SIGUSR1, &sa, NULL) < 0) {
681                 perror("sigaction error");
682                 exit(1);
683         }
684
685         sem_init(&sem_thread_finished, 0, 0);
686
687         /* create four receivers each per AC */
688         for (ac = AC_NUM - 1; ac >= 0; ac--) {
689                 ac_sockfd[ac] = create_ac_socket(ac);
690                 rc = pthread_create(&receivers[ac].thread, &attr, receiver, (void*) ac);
691                 if (rc) {
692                         fprintf(stderr, "Error while creating receiver %d\n",rc);
693                         return 1;
694                 }
695         }               
696                         
697         /* create sendpoints */
698         for (i = 0; i < nr_streams; i++) {
699                 rc = pthread_create(&thread, &attr, sender, (void*) &streams[i]);
700                 if (rc) {
701                         fprintf(stderr, "Error while creating sender %d\n",rc);
702                         return 1;
703                 }
704         }
705         
706         seconds = 0;
707         while (!exit_flag) {
708                 sleep(1);
709                 seconds++;
710                 fprintf(stderr, "\r%3ds", seconds);
711                 for (ac = 0; ac < AC_NUM; ac++) {
712                         int delta = receivers[ac].received - receivers[ac].last_received;
713                         receivers[ac].last_received = receivers[ac].received;
714                         fprintf(stderr, " %s %5d %4d/s", ac_to_text[ac], receivers[ac].received, delta);
715                 }
716                 fflush(stderr);
717                 if (seconds == opt_count_sec)
718                         stopper();
719         }
720
721         fprintf(stderr, "\nWaiting for threads to finish\n");
722         /* Wait for all threads to finish */
723         for (i=0; i < nr_streams + AC_NUM; i++) {
724                 sem_wait(&sem_thread_finished);
725         }
726         struct timespec end_timestamp, measure_length;
727         clock_gettime(CLOCK_MONOTONIC,&end_timestamp);
728         timespec_sub(&measure_length, &end_timestamp, &reset_timestamp);
729
730         save_results(argc, argv, measure_length.tv_sec);
731
732         return 0;
733 }