/*
 * wme_test/wclient.c -- from frescor/fwp.git (rtime.felk.cvut.cz Git).
 * Revision note: Task runs with RT priority, added run script,
 * common code added to common.c.
 */
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <signal.h>
#include <sys/wait.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <string.h>
#include <pthread.h>
#include <stdlib.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include "common.h"
#include <semaphore.h>
20
21 /*#define AC_VO 0
22 #define AC_VI 1
23 #define AC_BE 2
24 #define AC_BK 3
25 */
26
#define PARAM_SERVERADDR 1
#define MAX_SENDENDPOINTS  10

/* One datagram socket per access category (AC), indexed by AC number;
 * filled in by main() from create_ac_socket(). */
int ac_sockfd[AC_QUEUES];

/* Handle of the receiver thread serving each AC; stopper() signals these
 * threads to break them out of a blocking receive. */
struct receiver {
        pthread_t thread;
} receivers[AC_QUEUES];

/* Output stream for the delay histogram (opened in main(), closed in
 * save_results()). */
FILE* logfd;

/* Server host given on the command line (points into argv). */
char* server_addr;

/* Name of the file the histogram is written to. */
const char logfname[] = "delay_stats.dat";
40
41 struct msg_t {
42         unsigned int tos;
43         struct timespec send_timestamp;
44         unsigned long int seqn;
45         unsigned char padding[MTU];
46 };
47
48 /* maximal traffic delay in ms - 10 s*/
49 #define MAX_DELAY_US 10000000
50 #define GRANULARITY 100
51
52 unsigned delay_stats[AC_QUEUES][MAX_DELAY_US/GRANULARITY];
53
54 /*struct ac_stats[AC_QUEUES] {
55    unsigned long int min_trans_time;
56    unsigned long int sum_trans_time;
57    struct timespec   recv_timestamp;
58    struct timespec   send_timestamp; 
59 };*/
60
/* Description of one periodic traffic source. */
struct send_endpoint {
        int ac;                 /* access category the stream is sent in */
        long period_nsec;       /* time between two messages, in nanoseconds */
        int bandwidth_bps;      /* bits per second */
};

/* Unit helpers for the endpoint table: nanoseconds per millisecond and
 * binary bit-rate multipliers. */
#define MSEC (1000*1000)
#define Mbit (1024*1024)
#define Kbit (1024)
70
71
72 /*
73 struct send_endpoint sepoint[] = {
74         { .ac = AC_VO, .period_nsec=200*MSEC, .bandwidth_bps = 34*Kbit },
75         { .ac = AC_VI, .period_nsec=25*MSEC, .bandwidth_bps =  480*Kbit },
76         { .ac = AC_BE, .period_nsec=40*MSEC, .bandwidth_bps =  300*Kbit },
77         { .ac = AC_BK, .period_nsec=40*MSEC, .bandwidth_bps =  300*Kbit },
78 //      { .ac = AC_VI, .period_nsec=17*MSEC, .bandwidth_bps =  675*Kbit },
79 };
80 */
81
82 struct send_endpoint sepoint[] = {
83         { .ac = AC_VO, .period_nsec=40*MSEC, .bandwidth_bps = 300*Kbit },
84         { .ac = AC_VI, .period_nsec=40*MSEC, .bandwidth_bps =  300*Kbit },
85         { .ac = AC_BE, .period_nsec=60*MSEC, .bandwidth_bps =  200*Kbit },
86         { .ac = AC_BK, .period_nsec=60*MSEC, .bandwidth_bps =  200*Kbit },
87 };
88
89 unsigned int nr_sepoints = sizeof(sepoint)/sizeof(*sepoint);
90
/* Posted once by every sender/receiver thread right before it returns;
 * main() waits on it once per thread before writing the results. */
sem_t sem_thread_finished;

/* Shutdown request: set by stopper(), polled by the worker loops and main().
 * NOTE(review): this is a plain bool written from a signal handler and read
 * by other threads; volatile sig_atomic_t (or an atomic) would be the
 * well-defined choice -- confirm nothing else declares it as bool first. */
bool exit_flag = false;
94
95 void stopper()
96 {
97         int i;
98         exit_flag = true;
99
100         /* Interrupt all receivers */
101         for (i=0; i<AC_QUEUES; i++) {
102                 pthread_kill(receivers[i].thread, SIGUSR1);
103         }
104 }
105
106 void save_results()
107 {
108         int ac, i, maxi;
109         bool allzeros;
110         unsigned sum[AC_QUEUES];
111
112         printf("\nWriting data to log file...\n");
113
114         allzeros = true;
115         for (maxi = MAX_DELAY_US/GRANULARITY - 1; maxi >= 0; maxi--) {
116                 for (ac = 0; ac < AC_QUEUES; ac++) {
117                         if (delay_stats[ac][maxi] != 0) allzeros = false;
118                 }
119                 if (!allzeros) break;
120         }
121         if (maxi < 3000/GRANULARITY) maxi = 3000/GRANULARITY;
122
123         for (ac = 0; ac < AC_QUEUES; ac++) { 
124                 sum[ac] = 0;
125                 for ( i = 0 ; i < maxi; i++) 
126                         sum[ac]+=delay_stats[ac][i];
127                 if (sum[ac] == 0)
128                         fprintf(stderr, "No response in AC %d\n", ac);
129         }
130
131         for (ac = 0; ac < AC_QUEUES; ac++) { 
132                 for ( i = 0 ; i < maxi; i++) {
133                         double val;
134                         val = (double)delay_stats[ac][i]*100.0 / sum[ac];
135                         fprintf(logfd,"%f %lf\n", i*GRANULARITY/1000.0, val);
136                 }
137                 
138                 fprintf(logfd, "\n\n");
139         }
140         
141         printf("Finished.\n");
142         fclose(logfd);
143
144         exit(0);
145 }
146
/*
 * Store left + right in *sum.
 *
 * Both operands are expected to be normalized (0 <= tv_nsec < 1e9); at most
 * one second is carried from the nanosecond field. sum may alias either
 * operand.
 */
static inline
void timespec_add (struct timespec *sum, const struct timespec *left,
              const struct timespec *right)
{
        enum { NSEC_PER_SEC = 1000000000 };
        time_t sec = left->tv_sec + right->tv_sec;
        long nsec = left->tv_nsec + right->tv_nsec;

        if (nsec >= NSEC_PER_SEC) {
                ++sec;
                nsec -= NSEC_PER_SEC;
        }

        sum->tv_sec = sec;
        sum->tv_nsec = nsec;
}
159
/*
 * Store left - right in *diff.
 *
 * Both operands are expected to be normalized (0 <= tv_nsec < 1e9); at most
 * one second is borrowed for the nanosecond field. When right is later than
 * left the result has a negative tv_sec and a non-negative tv_nsec. diff may
 * alias either operand.
 */
static inline
void timespec_sub (struct timespec *diff, const struct timespec *left,
              const struct timespec *right)
{
        enum { NSEC_PER_SEC = 1000000000 };
        time_t sec = left->tv_sec - right->tv_sec;
        long nsec = left->tv_nsec - right->tv_nsec;

        if (nsec < 0) {
                --sec;
                nsec += NSEC_PER_SEC;
        }

        diff->tv_sec = sec;
        diff->tv_nsec = nsec;
}
172
173 int create_ac_socket(unsigned int ac) 
174 {
175         int sockfd;
176         unsigned int yes=1, tos;
177
178
179         if ((sockfd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
180         {
181                 perror("Unable to open socket");
182                 return -1;
183         }
184         
185         if (setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == -1) {
186                 perror("Unable to set socket");
187                 return -1;
188         }
189
190         
191         //tos = ((AC_QUEUES - ac) *2 - 1)*32;
192         tos = ac_to_tos[ac];
193         if (setsockopt(sockfd, SOL_IP, IP_TOS, &tos, sizeof(tos))) {
194                 perror("Unable to set TOS");
195                 close(sockfd);
196                 return -1;
197         }
198
199         return sockfd;
200 }
201
/*
 * Do-nothing signal handler. Installing it (rather than ignoring the signal)
 * makes delivery interrupt a blocking system call without any other effect.
 * The parameter is required by the void (*)(int) handler type and is unused.
 */
void empty_handler(int signo)
{
        (void)signo;
}
205
206 void* receiver(void* queue)
207 {
208         struct msg_t    msg;
209         struct  sockaddr_in rem_addr;
210         int     mlen;
211         unsigned int ac, rem_addr_length; 
212         unsigned long int trans_time_usec;
213         unsigned long int min_trans_time;
214         struct timespec   send_timestamp,recv_timestamp, trans_time; 
215         
216         min_trans_time = ~0;
217         
218         block_signals();
219         set_rt_prio(99);
220
221         ac = (int)queue;
222         rem_addr_length = sizeof(rem_addr);
223         while (!exit_flag) {
224                 mlen = recvfrom(ac_sockfd[ac], &msg, sizeof(msg), 0,    \
225                                 (struct sockaddr*)&rem_addr, &rem_addr_length);
226                 if (mlen < 0) {
227                         if (errno == EINTR) continue;
228                         perror("Chyba pri prijimani pozadavku");
229                         goto out;
230                 }       
231                 clock_gettime(CLOCK_MONOTONIC,&recv_timestamp);
232                 send_timestamp = msg.send_timestamp;
233                 
234                 timespec_sub(&trans_time,&recv_timestamp ,&send_timestamp);
235                 trans_time_usec = (trans_time.tv_sec * 1000000 + \
236                                          trans_time.tv_nsec / 1000) /2;
237           
238                 if (trans_time_usec < MAX_DELAY_US)
239                         delay_stats[ac][trans_time_usec/GRANULARITY]++;
240         
241                 /*if (trans_time_nsec < min_trans_time) 
242                         min_trans_time = trans_time_nsec;*/
243                 /*printf("seqn= %lu tos= %d start= %lu(s).%lu(ns)"\
244                          "stop= %lu(s).%lu(ns)\n trans_time = %lums\n",\
245                          msg.seqn, msg.tos, send_timestamp.tv_sec,\
246                          send_timestamp.tv_nsec,recv_timestamp.tv_sec,\
247                          recv_timestamp.tv_nsec, trans_time_msec); */
248         }
249 out:
250         sem_post(&sem_thread_finished);
251         return NULL;
252 }
253
/*
 * Sender thread: emits one test message per period in a given AC.
 *
 * endpoint: pointer to the struct send_endpoint describing the stream
 *           (only ac and period_nsec are used here).
 *
 * The destination is server_addr (dotted-quad or host name, resolved with
 * the thread-safe getaddrinfo()) on UDP port BASE_PORT + ac. Every message
 * carries a sequence number, the TOS of the AC and the CLOCK_MONOTONIC time
 * taken just before it is sent; the rest of the message is zero.
 *
 * The loop ends when exit_flag is set, when the server cannot be resolved,
 * or on a send error other than EINTR. sem_thread_finished is posted before
 * returning. Always returns NULL.
 */
void* sender(void* endpoint)
{
        struct sockaddr_in rem_addr;
        struct msg_t msg;
        unsigned long int seqn = 0;
        struct timespec time_to_wait, current_time, period, interval;
        struct addrinfo hints;
        struct addrinfo *res = NULL;
        struct send_endpoint* spoint = (struct send_endpoint*) endpoint;
        int ac = spoint->ac;
        int rc;
        size_t addr_len;

        /* Resolve the server once; accept IPv4 only since rem_addr is a
         * sockaddr_in and the sockets are PF_INET. */
        memset(&hints, 0, sizeof(hints));
        hints.ai_family = AF_INET;
        hints.ai_socktype = SOCK_DGRAM;

        rc = getaddrinfo(server_addr, NULL, &hints, &res);
        if (rc != 0 || res == NULL) {
                /* resolver errors are not reported through errno */
                fprintf(stderr, "Unknown server %s: %s\n", server_addr,
                        rc != 0 ? gai_strerror(rc) : "no address");
                goto out;
        }

        memset(&rem_addr, 0, sizeof(rem_addr));
        addr_len = res->ai_addrlen < sizeof(rem_addr) ?
                        (size_t)res->ai_addrlen : sizeof(rem_addr);
        memcpy(&rem_addr, res->ai_addr, addr_len);
        freeaddrinfo(res);

        rem_addr.sin_family = AF_INET;
        rem_addr.sin_port = htons(BASE_PORT + ac);

        /* keep tv_nsec below one second even for periods of 1 s or more */
        period.tv_sec = spoint->period_nsec / 1000000000L;
        period.tv_nsec = spoint->period_nsec % 1000000000L;

        /* the whole struct goes on the wire: do not leak stack contents */
        memset(&msg, 0, sizeof(msg));

        block_signals();
        set_rt_prio(90-ac);

        while (!exit_flag) {

                msg.seqn = seqn;
                msg.tos = ac_to_tos[ac];

                clock_gettime(CLOCK_MONOTONIC, &msg.send_timestamp);

                while (sendto(ac_sockfd[ac], &msg, sizeof(msg), 0,
                                (struct sockaddr*)&rem_addr, sizeof(rem_addr)) < 0) {
                                if (errno == EINTR) continue;
                                perror("Error while sending");
                                goto out;
                }

#ifdef DEBUG
                printf("%d", ac);
                fflush(stdout);
#endif
                seqn++;

                /* sleep until send time + period; skip the sleep if that
                 * moment has already passed */
                timespec_add(&time_to_wait, &msg.send_timestamp, &period);
                clock_gettime(CLOCK_MONOTONIC, &current_time);
                timespec_sub(&interval, &time_to_wait, &current_time);
                if (interval.tv_sec >= 0)
                        nanosleep(&interval, NULL);
        }
out:
        sem_post(&sem_thread_finished);
        return NULL;
}
319
320 int main(int argc, char *argv[])
321 {
322         int ac, i, rc;
323         pthread_attr_t attr;
324         pthread_t thread;
325         int use_stdin = 0;
326         char opt;
327
328
329         while ((opt = getopt(argc, argv, "hs")) != -1) {
330                 switch (opt) {
331                         case 's':
332                                 use_stdin = 1;
333                                 break;
334                         default:
335                                 fprintf(stderr, "Usage: %s [ options ] server_addr\n", argv[0]);
336                                 fprintf(stderr, "options:  -s  read streams from stdin\n");
337                                 exit(1);
338                 }
339         }
340         if (optind < argc) {
341                 server_addr = argv[optind];
342         } else {
343                 fprintf(stderr, "Expected server address argument\n");
344                 exit(1);
345         }
346
347
348                 
349         memset(delay_stats,0, sizeof(delay_stats));     
350         pthread_attr_init(&attr);
351
352         if ((logfd = fopen(logfname,"w+")) == NULL) {
353                 fprintf(stderr,"Can not open %s\n", logfname);
354                 exit(1);
355         }
356                 
357
358         if (signal(SIGTERM, stopper) == SIG_ERR) {
359                 perror("Error in signal registration");
360                 exit(1);
361         }
362                 
363         if (signal(SIGINT, stopper) == SIG_ERR) {
364                 perror("Signal handler registration error");
365                 exit(1);
366         }
367
368         struct sigaction sa;
369         sa.sa_handler = empty_handler;
370         sa.sa_flags = 0;        /* don't restart syscalls */
371
372         if (sigaction(SIGUSR1, &sa, NULL) < 0) {
373                 perror("sigaction error");
374                 exit(1);
375         }
376
377         sem_init(&sem_thread_finished, 0, 0);
378         
379         /* create four receivers each per AC */
380         for (ac = AC_QUEUES - 1; ac >= 0; ac--) {
381                 ac_sockfd[ac] = create_ac_socket(ac);
382                 rc = pthread_create(&receivers[ac].thread, &attr, receiver, (void*) ac);
383                 if (rc) {
384                         printf("Error while creating receiver %d\n",rc);
385                         return 1;
386                 }
387         }               
388                         
389         /* create sendpoints */
390         for (i = 0; i < nr_sepoints; i++) {
391                 rc = pthread_create(&thread, &attr, sender, (void*) &sepoint[i]);
392                 if (rc) {
393                         printf("Error while creating sender %d\n",rc);
394                         return 1;
395                 }
396         }
397         
398         while (!exit_flag) {
399                 sleep(100000);
400         }
401
402         printf("Waiting for threads to finish\n");
403         /* Wait for all threads to finish */
404         for (i=0; i < nr_sepoints + AC_QUEUES; i++) {
405                 sem_wait(&sem_thread_finished);
406         }
407
408         save_results();
409
410         return 0;
411 }