]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - wme_test/wclient.c
Zmena AC_QUEUES->AC_NUM
[frescor/fwp.git] / wme_test / wclient.c
1 #include <errno.h>
2 #include <sys/types.h>
3 #include <sys/socket.h>
4 #include <netinet/in.h>
5 #include <arpa/inet.h>
6 #include <netdb.h>
7 #include <signal.h>
8 #include <sys/wait.h>
9 #include <stdio.h>
10 #include <unistd.h>
11 #include <fcntl.h>
12 #include <time.h>
13 #include <string.h>
14 #include <pthread.h>
15 #include <string.h>
16 #include <stdlib.h>
17 #include <stdbool.h>
18 #include "common.h"
19 #include <semaphore.h>
20
21 #define MSEC (1000)
22
23 #define MAX_SENDENDPOINTS  10 
24
25 unsigned opt_packet_size = 800;
26 unsigned opt_period_usec = 10*MSEC;
27 unsigned opt_jitter = 0;
28 char    *opt_output = "delay_stats";
29 unsigned opt_count_sec = 0;
30
31 int ac_sockfd[AC_NUM];
32
33 struct receiver {
34         pthread_t thread;
35         unsigned received, last_received;
36 } receivers[AC_NUM];
37
38 FILE* logfd;
39 char* server_addr; 
40 char logfname[100];
41
42 struct msg_t {
43         unsigned int tos;
44         struct timespec send_timestamp;
45         unsigned long int seqn;
46 };
47
48 union msg_buff {
49         struct msg_t msg;
50         char nonsense[BUFFSIZE];
51 };
52
53 /* maximal traffic delay in ms - 10 s*/
54 #define MAX_DELAY_US 10000000
55 #define GRANULARITY 100
56
57 unsigned delay_stats[AC_NUM][MAX_DELAY_US/GRANULARITY];
58
59 /*struct ac_stats[AC_NUM] {
60    unsigned long int min_trans_time;
61    unsigned long int sum_trans_time;
62    struct timespec   recv_timestamp;
63    struct timespec   send_timestamp; 
64 };*/
65
66 struct send_endpoint {
67         int ac;
68         long period_usec;       /* all time units are in microseconds */
69         int bandwidth_bps;      /* bits per second */
70 };
71
72 /*
73 struct send_endpoint sepoint[] = {
74         { .ac = AC_VO, .period_usec=200*MSEC, .bandwidth_bps = 34*Kbit },
75         { .ac = AC_VI, .period_usec=25*MSEC, .bandwidth_bps =  480*Kbit },
76         { .ac = AC_BE, .period_usec=40*MSEC, .bandwidth_bps =  300*Kbit },
77         { .ac = AC_BK, .period_usec=40*MSEC, .bandwidth_bps =  300*Kbit },
78 //      { .ac = AC_VI, .period_usec=17*MSEC, .bandwidth_bps =  675*Kbit },
79 };
80 */
81
82 struct send_endpoint sepoint[] = {
83         { .ac = AC_VO, .period_usec=10*MSEC, .bandwidth_bps =  300*Kbit },
84         { .ac = AC_VI, .period_usec=10*MSEC, .bandwidth_bps =  300*Kbit },
85         { .ac = AC_BE, .period_usec=10*MSEC, .bandwidth_bps =  300*Kbit },
86         { .ac = AC_BK, .period_usec=10*MSEC, .bandwidth_bps =  300*Kbit },
87 };
88
89 unsigned int nr_sepoints = sizeof(sepoint)/sizeof(*sepoint);
90
91 sem_t sem_thread_finished;
92
93 bool exit_flag = false;
94
95 void stopper()
96 {
97         int i;
98         exit_flag = true;
99
100         /* Interrupt all receivers */
101         for (i=0; i < AC_NUM; i++) {
102                 pthread_kill(receivers[i].thread, SIGUSR1);
103         }
104 }
105
106 void save_results()
107 {
108         int ac, i, maxi;
109         bool allzeros;
110         unsigned sum[AC_NUM];
111
112         fprintf(stderr, "\nWriting data to %s...\n", logfname);
113
114         allzeros = true;
115         for (maxi = MAX_DELAY_US/GRANULARITY - 1; maxi >= 0; maxi--) {
116                 for (ac = 0; ac < AC_NUM; ac++) {
117                         if (delay_stats[ac][maxi] != 0) allzeros = false;
118                 }
119                 if (!allzeros) break;
120         }
121         if (maxi < 3000/GRANULARITY) maxi = 3000/GRANULARITY;
122
123         for (ac = 0; ac < AC_NUM; ac++) { 
124                 sum[ac] = 0;
125                 for ( i = 0 ; i < maxi; i++) 
126                         sum[ac]+=delay_stats[ac][i];
127                 if (sum[ac] == 0)
128                         fprintf(stderr, "No response in AC %d\n", ac);
129         }
130
131         for ( i = 0 ; i < maxi; i++) {
132                 fprintf(logfd,"\n%f", i*GRANULARITY/1000.0);
133                 for (ac = 0; ac < AC_NUM; ac++) { 
134                         double val;
135                         val = (double)delay_stats[ac][i]*100.0 / sum[ac];
136                         fprintf(logfd," %lf", val);
137                 }
138         }
139         
140         fprintf(stderr, "Finished.\n");
141         fclose(logfd);
142
143         exit(0);
144 }
145
146 static inline 
147 void timespec_add (struct timespec *sum, const struct timespec *left,
148               const struct timespec *right)
149 {
150         sum->tv_sec = left->tv_sec + right->tv_sec;
151         sum->tv_nsec = left->tv_nsec + right->tv_nsec;
152
153         if (sum->tv_nsec >= 1000000000){
154                 ++sum->tv_sec;
155                 sum->tv_nsec -= 1000000000;
156         }
157 }
158
159 static inline 
160 void timespec_sub (struct timespec *diff, const struct timespec *left,
161               const struct timespec *right)
162 {
163         diff->tv_sec = left->tv_sec - right->tv_sec;
164         diff->tv_nsec = left->tv_nsec - right->tv_nsec;
165
166         if (diff->tv_nsec < 0){
167                   --diff->tv_sec;
168                   diff->tv_nsec += 1000000000;
169         }
170 }
171
172 int create_ac_socket(unsigned int ac) 
173 {
174         int sockfd;
175         unsigned int yes=1, tos;
176
177
178         if ((sockfd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
179         {
180                 perror("Unable to open socket");
181                 return -1;
182         }
183         
184         if (setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == -1) {
185                 perror("Unable to set socket");
186                 return -1;
187         }
188
189         
190         //tos = ((AC_NUM - ac) *2 - 1)*32;
191         tos = ac_to_tos[ac];
192         if (setsockopt(sockfd, SOL_IP, IP_TOS, &tos, sizeof(tos))) {
193                 perror("Unable to set TOS");
194                 close(sockfd);
195                 return -1;
196         }
197
198         return sockfd;
199 }
200
201 void empty_handler()
202 {
203 }
204
205 void* receiver(void* queue)
206 {
207         struct msg_t    msg;
208         struct  sockaddr_in rem_addr;
209         int     mlen;
210         unsigned int ac, rem_addr_length; 
211         unsigned long int trans_time_usec;
212         unsigned long int min_trans_time;
213         struct timespec   send_timestamp,recv_timestamp, trans_time; 
214         
215         min_trans_time = ~0;
216         
217         block_signals();
218         set_rt_prio(99);
219
220         ac = (int)queue;
221         rem_addr_length = sizeof(rem_addr);
222         while (!exit_flag) {
223                 mlen = recvfrom(ac_sockfd[ac], &msg, sizeof(msg), 0,    \
224                                 (struct sockaddr*)&rem_addr, &rem_addr_length);
225                 if (mlen < 0) {
226                         if (errno == EINTR) continue;
227                         perror("Chyba pri prijimani pozadavku");
228                         goto out;
229                 }       
230                 clock_gettime(CLOCK_MONOTONIC,&recv_timestamp);
231                 send_timestamp = msg.send_timestamp;
232                 
233                 timespec_sub(&trans_time,&recv_timestamp ,&send_timestamp);
234                 trans_time_usec = (trans_time.tv_sec * SEC_TO_USEC + \
235                                          trans_time.tv_nsec / USEC_TO_NSEC) /2;
236           
237                 if (trans_time_usec < MAX_DELAY_US)
238                         delay_stats[ac][trans_time_usec/GRANULARITY]++;
239
240                 receivers[ac].received++;
241         
242                 /*if (trans_time_nsec < min_trans_time) 
243                         min_trans_time = trans_time_nsec;*/
244                 /*printf("seqn= %lu tos= %d start= %lu(s).%lu(ns)"\
245                          "stop= %lu(s).%lu(ns)\n trans_time = %lums\n",\
246                          msg.seqn, msg.tos, send_timestamp.tv_sec,\
247                          send_timestamp.tv_nsec,recv_timestamp.tv_sec,\
248                          recv_timestamp.tv_nsec, trans_time_msec); */
249         }
250 out:
251         sem_post(&sem_thread_finished);
252         return NULL;
253 }
254
255 void* sender(void* endpoint)
256 {
257         struct sockaddr_in rem_addr;
258         union msg_buff buff;
259         unsigned long int seqn;
260         struct timespec time_to_wait, current_time, period, interval;
261         struct hostent* ph;
262         struct send_endpoint* spoint = (struct send_endpoint*) endpoint;
263         char buf1[12], buf2[12], buf3[12];
264         int ac = spoint->ac;
265         int packet_size;
266         unsigned period_usec;
267         char stream_desc[100];
268
269         if (opt_packet_size) {
270                 packet_size = opt_packet_size;
271                 period_usec = SEC_TO_USEC*packet_size*8/spoint->bandwidth_bps;
272         } else {
273                 period_usec = spoint->period_usec;
274                 packet_size = (long long)spoint->bandwidth_bps/8 * period_usec/SEC_TO_USEC;
275         }
276         snprintf(stream_desc, sizeof(stream_desc), "%d: %s %s (%d bytes per %s +-%s, %d packets/s)\n",
277                  spoint-sepoint, ac_to_text[ac], bandwidth_to_text(buf1, spoint->bandwidth_bps),
278                  packet_size, usec_to_text(buf2, period_usec),
279                  usec_to_text(buf3, opt_jitter*period_usec/100), (int) SEC_TO_USEC/period_usec);
280         printf("%s", stream_desc);
281         fprintf(logfd, "# Stream %s", stream_desc);
282
283         if (packet_size < sizeof(struct msg_t)) {
284                 fprintf(stderr, "Packet size too small (min %d)\n", sizeof(struct msg_t));
285                 exit(1);
286         }
287
288         
289         memset(&rem_addr,0, sizeof(rem_addr));
290                 
291         rem_addr.sin_family = AF_INET;
292         ph = gethostbyname(server_addr);
293         if (ph) 
294                 rem_addr.sin_addr = *((struct in_addr *)ph->h_addr);
295         else {
296                 perror("Unknown server");
297                 exit(1);
298         }
299         rem_addr.sin_port = htons(BASE_PORT + ac);
300         seqn = 0;
301         
302         block_signals();
303         set_rt_prio(90-ac);
304
305         while (!exit_flag) {
306
307                 buff.msg.seqn = seqn;
308                 buff.msg.tos = ac_to_tos[ac];
309                 
310                 clock_gettime(CLOCK_MONOTONIC,&buff.msg.send_timestamp);
311                 
312                 while (sendto(ac_sockfd[ac], &buff, packet_size, 0,\
313                                 (struct sockaddr*)&rem_addr, sizeof(rem_addr)) < 0) {
314                                 if (errno == EINTR) continue;
315                                 perror("Error while sending");
316                                 goto out;
317                 }
318
319 #ifdef DEBUG
320                 printf("%d", ac);
321                 fflush(stdout);
322 #endif
323                 seqn++;
324
325                 /*           |~~~+~~~| jitter interval (width = 2*opt_jitter percentage from period)*/
326                 /* |-------------|     nominal period*/
327                 if (opt_jitter) {
328                         period.tv_nsec = USEC_TO_NSEC*(period_usec*(100-opt_jitter)/100
329                                                  + rand() % (2*period_usec*opt_jitter/100));
330                 } else {
331                         period.tv_nsec = USEC_TO_NSEC*(period_usec);
332                 }
333                 period.tv_sec = 0;
334
335                 timespec_add(&time_to_wait,&buff.msg.send_timestamp,&period);
336                 clock_gettime(CLOCK_MONOTONIC,&current_time);
337                 timespec_sub(&interval,&time_to_wait,&current_time);
338                 nanosleep(&interval,NULL);
339         }
340 out:
341         sem_post(&sem_thread_finished);
342         return NULL;
343 }
344
345 int main(int argc, char *argv[])
346 {
347         int ac, i, rc;
348         pthread_attr_t attr;
349         pthread_t thread;
350         char opt;
351
352
353         while ((opt = getopt(argc, argv, "c:j:o:s:")) != -1) {
354                 switch (opt) {
355                         case 'c':
356                                 opt_count_sec = atoi(optarg);
357                                 break;
358                         case 'j':
359                                 opt_jitter = atoi(optarg);
360                                 break;
361                         case 'o':
362                                 opt_output = optarg;
363                                 break;
364                         case 's':
365                                 opt_packet_size = atoi(optarg);
366                                 break;
367                         default:
368                                 fprintf(stderr, "Usage: %s [ options ] server_addr\n\n", argv[0]);
369                                 fprintf(stderr, "Options:\n");
370                                 fprintf(stderr, "    -c  count (number of seconds to run)");
371                                 fprintf(stderr, "    -j  send jitter (0-100) [%%]\n");
372                                 fprintf(stderr, "    -o  output filename (.dat will be appended)");
373                                 fprintf(stderr, "    -s  size of data payload in packets [bytes]\n");
374                                 exit(1);
375                 }
376         }
377         if (optind < argc) {
378                 server_addr = argv[optind];
379         } else {
380                 fprintf(stderr, "Expected server address argument\n");
381                 exit(1);
382         }
383
384
385                 
386         memset(delay_stats,0, sizeof(delay_stats));     
387         pthread_attr_init(&attr);
388
389         snprintf(logfname, sizeof(logfname), "%s.dat", opt_output);
390
391         if ((logfd = fopen(logfname,"w+")) == NULL) {
392                 fprintf(stderr,"Can not open %s\n", logfname);
393                 exit(1);
394         }
395         fprintf(logfd, "# Invoked as: ");
396         for (i=0; i<argc; i++) fprintf(logfd, "%s ", argv[i]);
397         fprintf(logfd, "\n");
398                 
399         if (signal(SIGTERM, stopper) == SIG_ERR) {
400                 perror("Error in signal registration");
401                 exit(1);
402         }
403                 
404         if (signal(SIGINT, stopper) == SIG_ERR) {
405                 perror("Signal handler registration error");
406                 exit(1);
407         }
408
409         struct sigaction sa;
410         sa.sa_handler = empty_handler;
411         sa.sa_flags = 0;        /* don't restart syscalls */
412
413         if (sigaction(SIGUSR1, &sa, NULL) < 0) {
414                 perror("sigaction error");
415                 exit(1);
416         }
417
418         sem_init(&sem_thread_finished, 0, 0);
419
420         /* create four receivers each per AC */
421         for (ac = AC_NUM - 1; ac >= 0; ac--) {
422                 ac_sockfd[ac] = create_ac_socket(ac);
423                 rc = pthread_create(&receivers[ac].thread, &attr, receiver, (void*) ac);
424                 if (rc) {
425                         fprintf(stderr, "Error while creating receiver %d\n",rc);
426                         return 1;
427                 }
428         }               
429                         
430         /* create sendpoints */
431         for (i = 0; i < nr_sepoints; i++) {
432                 rc = pthread_create(&thread, &attr, sender, (void*) &sepoint[i]);
433                 if (rc) {
434                         fprintf(stderr, "Error while creating sender %d\n",rc);
435                         return 1;
436                 }
437         }
438         
439         i = 0;
440         while (!exit_flag) {
441                 fprintf(stderr, "\r");
442                 for (ac = 0; ac < AC_NUM; ac++) {
443                         int delta = receivers[ac].received - receivers[ac].last_received;
444                         receivers[ac].last_received = receivers[ac].received;
445                         fprintf(stderr, "%s: %5d (+%4d)  ", ac_to_text[ac], receivers[ac].received, delta);
446                 }
447                 fflush(stderr);
448                 sleep(1);
449                 i++;
450                 if (i == opt_count_sec) exit_flag = 1;
451         }
452
453         fprintf(stderr, "\nWaiting for threads to finish\n");
454         /* Wait for all threads to finish */
455         for (i=0; i < nr_sepoints + AC_NUM; i++) {
456                 sem_wait(&sem_thread_finished);
457         }
458         printf("\n");
459
460         save_results();
461
462         return 0;
463 }