]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - wme_test/wclient.c
Reject contracts with zero period and budget
[frescor/fwp.git] / wme_test / wclient.c
1 #include <errno.h>
2 #include <error.h>
3 #include <sys/types.h>
4 #include <sys/socket.h>
5 #include <netinet/in.h>
6 #include <arpa/inet.h>
7 #include <netdb.h>
8 #include <signal.h>
9 #include <sys/wait.h>
10 #include <stdio.h>
11 #include <unistd.h>
12 #include <fcntl.h>
13 #include <time.h>
14 #include <string.h>
15 #include <pthread.h>
16 #include <string.h>
17 #include <stdlib.h>
18 #include <stdbool.h>
19 #include "common.h"
20 #include <semaphore.h>
21 #include <sys/ioctl.h>
22 #include <net/if.h>
23 #include <inttypes.h>
24
25 #ifdef WITH_FWP
26 #include <frsh.h>
27 #include <ncurses.h>
28 #include <fwp_res.h>
29 #endif
30
31 #define MAX_STREAMS  10 
32 #define MIN_GRANULARITY 100
33
34 unsigned opt_packet_size = 800;
35 int opt_send_buf_size = -1;
36 unsigned opt_period_usec = 10*MSEC_TO_USEC;
37 char    *opt_interface;
38 unsigned opt_jitter = 0;
39 char    *opt_output = "delay_stats";
40 unsigned opt_count_sec = 0;
41 unsigned opt_def_bandwidth = 50;
42 unsigned opt_def_period_msec = 0;
43 int opt_granularity_usec = MIN_GRANULARITY;
44 bool opt_wait_for_queue_is_full; /* Don't gather any statistics until any queue is full */
45 char *opt_comment = NULL;
46
47 bool some_queue_is_full = false;
48 uint64_t reset_timestamp; /* [nsec] */
49
50 bool some_contract_not_accepted = false;
51
52 /* Locked when some queue is full to prevent multiple resets of
53    statstics. */
54 pthread_mutex_t queue_full_mutex = PTHREAD_MUTEX_INITIALIZER;
55
56 int ac_sockfd[AC_NUM];
57
58 struct receiver {
59         bool valid;
60         pthread_t thread;
61         unsigned received, last_received;
62 };
63
64 struct receiver receivers[AC_NUM];
65
66 FILE* logfd;
67 char* server_addr; 
68 char logfname[100];
69
70 /* maximal traffic delay in ms - 10 s*/
71 #define MAX_DELAY_US 10000000
72
73 struct delay_stat {
74         unsigned csc;           /* Client-server-client delay divided by 2 */
75         unsigned cs;            /* Client-server delay */
76         unsigned sc;            /* Server-client delay */
77 };
78
79 struct delay_stat delay_stats[AC_NUM][MAX_DELAY_US/MIN_GRANULARITY];
80 pthread_mutex_t delay_stats_mutex = PTHREAD_MUTEX_INITIALIZER;
81
82 /*struct ac_stats[AC_NUM] {
83    unsigned long int min_trans_time;
84    unsigned long int sum_trans_time;
85    struct timespec   recv_timestamp;
86    struct timespec   send_timestamp; 
87 };*/
88
89 struct stream {
90         /* Input parameters */
91         enum ac ac;             /*  */
92         int bandwidth_bps;      /* bits per second */
93         int jitter;             /* percent */
94         /* Mulualy exclusive input parameters */
95         int packet_size;
96         long period_usec;       /* all time units are in microseconds */
97
98         /* Internal fields */
99 #ifndef WITH_FWP
100         struct sockaddr_in rem_addr;
101 #else
102         frsh_send_endpoint_t endpoint;
103         frsh_receive_endpoint_t resp_endpoint;
104         frsh_vres_id_t vres, vres_rcv;
105         uint16_t resp_port;
106         struct receiver receiver;
107         long wc_delay;          /* worst-case delay  */
108 #endif
109
110         /* Statistics */
111         pthread_mutex_t mutex;
112         unsigned long long sent, really_sent, received;
113 };
114
115 static struct cmsg_ipi {
116         struct cmsghdr cm;
117         struct in_pktinfo ipi;
118 } cmsg = { {sizeof(struct cmsg_ipi), SOL_IP, IP_PKTINFO},
119            {0, }};
120 int cmsg_len = 0;
121
122 /*
123 struct send_endpoint sepoint[] = {
124         { .ac = AC_VO, .period_usec=200*MSEC_TO_USEC, .bandwidth_bps = 34*Kbit },
125         { .ac = AC_VI, .period_usec=25*MSEC_TO_USEC, .bandwidth_bps =  480*Kbit },
126         { .ac = AC_BE, .period_usec=40*MSEC_TO_USEC, .bandwidth_bps =  300*Kbit },
127         { .ac = AC_BK, .period_usec=40*MSEC_TO_USEC, .bandwidth_bps =  300*Kbit },
128 //      { .ac = AC_VI, .period_usec=17*MSEC_TO_USEC, .bandwidth_bps =  675*Kbit },
129 };
130 */
131
132 struct stream streams[MAX_STREAMS];
133
134 unsigned int nr_streams = 0;
135
136 sem_t sem_thread_finished;
137
138 bool exit_flag = false;
139
/* Backend selection: the generic names used by the rest of the file map
   either to the FWP (FRSH contract based) implementation or to the
   plain socket one. */
#ifdef WITH_FWP
#  define negotiate_contract_for_stream(s)  negotiate_contract_for_stream_fwp(s)
#  define create_stream_endpoint(s)         create_stream_endpoint_fwp(s)
#  define send_packet(s, b)                 send_packet_fwp(s, b)
#  define recv_packet(s, b)                 recv_packet_fwp(s, b)
#  define wait_for_all_threads_to_finish()  wait_for_all_threads_to_finish_fwp()
#else
   /* No contracts without FWP: negotiation always "succeeds". */
#  define negotiate_contract_for_stream(s)  0
#  define create_stream_endpoint(s)         create_stream_endpoint_native(s)
#  define send_packet(s, b)                 send_packet_native(s, b)
#  define recv_packet(s, b)                 recv_packet_native(s, b)
#  define wait_for_all_threads_to_finish()  wait_for_all_threads_to_finish_native()
#endif
153
154 void stopper()
155 {
156         int i;
157         exit_flag = true;
158
159         /* Interrupt all receivers */
160 #ifdef WITH_FWP
161         for (i=0; i < nr_streams; i++) {
162                 if (streams[i].receiver.valid) pthread_kill(streams[i].receiver.thread, SIGUSR1);
163         }
164 #else
165         for (i=0; i < AC_NUM; i++) {
166                 pthread_kill(receivers[i].thread, SIGUSR1);
167         }       
168 #endif
169 }
170
171 void stream_to_text(char *stream_desc, size_t n, struct stream *stream, long long useconds)
172 {
173         char buf[3][12];
174         char real[100];
175
176         if (useconds) {
177                 snprintf(real, sizeof(real), "; real: %s sent %lld (%lld/s), received %lld (%lld/s)",
178                          bandwidth_to_text(buf[0], (long long)stream->really_sent*stream->packet_size*8*SEC_TO_USEC/useconds),
179                          stream->sent, stream->sent*SEC_TO_USEC/useconds,
180                          stream->received, stream->received*SEC_TO_USEC/useconds);
181         } else {
182                 real[0]=0;
183         }
184         
185         snprintf(stream_desc, n, "%"PRIdPTR": %s %s (%d bytes per %s +-%s, %d packets/s)%s",
186                  stream-streams, ac_to_text[stream->ac], bandwidth_to_text(buf[0], stream->bandwidth_bps),
187                  stream->packet_size, usec_to_text(buf[1], stream->period_usec),
188                  usec_to_text(buf[2], stream->jitter*stream->period_usec/100),
189                  (int)(SEC_TO_USEC/stream->period_usec), real);
190 }
191
192 void save_results(int argc, char *argv[], int useconds)
193 {
194         int ac, i, maxi;
195         const int mini = 3000/opt_granularity_usec;
196         bool allzeros;
197         unsigned send_count[AC_NUM];
198
199         fprintf(stderr, "Writing data to %s... ", logfname);
200         fflush(stderr);
201
202         fprintf(logfd, "# Invoked as: ");
203         for (i=0; i<argc; i++) fprintf(logfd, "%s ", argv[i]);
204         if (opt_comment) {
205                 fprintf(logfd, "(%s)", opt_comment);
206         }
207         fprintf(logfd, "\n");
208
209         if (useconds/SEC_TO_USEC != opt_count_sec) {
210                 char buf[20];
211                 usec_to_text(buf, useconds);
212                 fprintf(logfd, "# Data gathered for %s.\n", buf);
213         }
214                 
215         for (i = 0; i < nr_streams; i++) {
216                 char stream_desc[200];
217                 stream_to_text(stream_desc, sizeof(stream_desc), &streams[i], useconds);
218                 fprintf(logfd, "# Stream %s\n", stream_desc);
219         }
220
221         /* Find maximal delay */
222         allzeros = true;
223         for (maxi = MAX_DELAY_US/opt_granularity_usec - 1; maxi >= 0; maxi--) {
224                 for (ac = 0; ac < AC_NUM; ac++) {
225                         if ((delay_stats[ac][maxi].csc != 0) ||
226                             (delay_stats[ac][maxi].cs != 0) ||
227                             (delay_stats[ac][maxi].sc != 0))
228                                 allzeros = false;
229                 }
230                 if (!allzeros) break;
231         }
232         maxi++;
233         if (maxi < mini) maxi = mini;
234
235         /* Calculate total number of sent packets per AC */
236         memset(send_count, 0, sizeof(send_count));
237         for (i = 0; i < nr_streams; i++) {
238                 ac = streams[i].ac;
239                 send_count[ac] += streams[i].sent;
240         }
241
242 #if 0
243         /* Write pdf */
244         for ( i = 0 ; i < maxi; i++) {
245                 fprintf(logfd,"\n%f", i*opt_granularity_usec/1000.0);
246                 for (ac = 0; ac < AC_NUM; ac++) { 
247                         if (sum[ac])
248                                 val = (double)delay_stats[ac][i]*100.0 / sum[ac];
249                         else val = -1; /* Don't display this ac */
250                         fprintf(logfd," %lf", val);
251                 }
252         }
253         
254         fprintf(logfd,"\n\n");
255 #endif
256         
257         fprintf(logfd,"## Format: msec csc%% cs%% sc%%\n");
258
259         /* Write PDF */
260         for (ac = 0; ac < AC_NUM; ac++) {
261                 struct delay_stat integral = {0,0,0}, last = {-1,-1,-1};
262
263                 fprintf(logfd,"%f %f %f %f\n", 0.0, 0.0, 0.0, 0.0);
264
265                 if (send_count[ac] != 0) {
266                         i=0;
267                         while ((delay_stats[ac][i].csc == 0) &&
268                                (delay_stats[ac][i].cs == 0) &&
269                                (delay_stats[ac][i].sc == 0)) i++;
270                 
271                         for (i++; i < maxi+1; i++) {
272                                 if (memcmp(&last, &integral, sizeof(last))) {
273                                         char buf[3][20];
274                                         snprintf(buf[0], sizeof(buf[0]), "%f", (double)integral.csc*100.0 / send_count[ac]);
275                                         snprintf(buf[1], sizeof(buf[1]), "%f", (double)integral.cs *100.0 / send_count[ac]);
276                                         snprintf(buf[2], sizeof(buf[2]), "%f", (double)integral.sc *100.0 / send_count[ac]);
277                                                  
278                                         fprintf(logfd,"%f %s %s %s\n", i*opt_granularity_usec/1000.0,
279                                                 integral.csc != last.csc ? buf[0] : "-",
280                                                 integral.cs  != last.cs  ? buf[1] : "-",
281                                                 integral.sc  != last.sc  ? buf[2] : "-"
282                                                 );
283                                         last = integral;
284                                 }
285                                 if (i>0) {
286                                         integral.csc += delay_stats[ac][i-1].csc;
287                                         integral.sc  += delay_stats[ac][i-1].sc;
288                                         integral.cs  += delay_stats[ac][i-1].cs;
289                                 }
290                         }
291                 }
292                 fprintf(logfd,"\n\n");
293         }
294         
295         fprintf(stderr, "finished.\n");
296         fclose(logfd);
297
298         exit(0);
299 }
300
301 int create_ac_socket(intptr_t ac) 
302 {
303         int sockfd;
304         unsigned int yes=1, tos;
305
306
307         if ((sockfd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
308         {
309                 error(0, errno, "Unable to open socket");
310                 return -1;
311         }
312         if (fcntl(sockfd, F_SETFL, O_NONBLOCK) != 0) {
313                 error(0, errno, "set non-blocking socket");
314                 return -1;
315         }
316         if (setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == -1) {
317                 error(0, errno, "Unable to set socket");
318                 return -1;
319         }
320
321         if (opt_send_buf_size >= 0) {
322                 if (setsockopt(sockfd,SOL_SOCKET,SO_SNDBUF,&opt_send_buf_size,sizeof(opt_send_buf_size)) == -1) {
323                         error(0, errno, "Unable to set socket buffer size");
324                         return -1;
325                 }
326         }
327
328         
329         //tos = ((AC_NUM - ac) *2 - 1)*32;
330         tos = ac_to_tos[ac];
331         if (setsockopt(sockfd, SOL_IP, IP_TOS, &tos, sizeof(tos))) {
332                 error(0, errno, "Unable to set TOS");
333                 close(sockfd);
334                 return -1;
335         }
336
337         return sockfd;
338 }
339
/* Handler that deliberately does nothing. */
void empty_handler()
{
        return;
}
343
344 void reset_statistics()
345 {
346         int i;
347         struct timespec ts;
348         for (i = 0; i < nr_streams; i++) {
349                 pthread_mutex_lock(&streams[i].mutex);
350                 streams[i].sent = 0;
351                 streams[i].really_sent = 0;
352                 streams[i].received = 0;
353                 pthread_mutex_unlock(&streams[i].mutex);
354         }
355         pthread_mutex_lock(&delay_stats_mutex);
356         clock_gettime(CLOCK_REALTIME, &ts);
357         reset_timestamp = ts.tv_sec*1000000000LL + ts.tv_nsec;
358         memset(delay_stats, 0, sizeof(delay_stats));
359         pthread_mutex_unlock(&delay_stats_mutex);
360 }
361
362 #ifndef WITH_FWP
363 int recv_packet_native(intptr_t ac, struct msg_t *msg)
364 {
365         int     mlen, ret;
366         fd_set fdset;
367         struct  sockaddr_in rem_addr;
368         unsigned int rem_addr_length; 
369
370         FD_ZERO(&fdset);
371         FD_SET(ac_sockfd[ac], &fdset);
372         rem_addr_length = sizeof(rem_addr);
373         mlen = -1;
374         while (!exit_flag) {
375                 ret = select(ac_sockfd[ac]+1, &fdset, NULL, NULL, NULL);
376                 if (ret < 0) {
377                         if (errno == EINTR) continue;
378                         error(0, errno, "receiver select");
379                         return -1;
380                 }
381                 mlen = recvfrom(ac_sockfd[ac], msg, sizeof(*msg), 0,
382                                 (struct sockaddr*)&rem_addr, &rem_addr_length);
383                 break;
384         }
385         return mlen;
386 }
387 #else
388 int recv_packet_fwp(struct stream *stream, struct msg_t *msg)
389 {
390         size_t mlen;
391
392         mlen = frsh_receive_sync(stream->resp_endpoint, msg, sizeof(*msg), &mlen, NULL);
393         return mlen;
394 }
395 #endif
396
397 void* receiver(void* arg)
398 {
399         struct msg_t    msg;
400         long long int trans_time_usec, client_to_server_usec, server_to_client_usec;
401         long long int min_trans_time;
402         struct timespec ts;
403         uint64_t send_timestamp, server_timestamp, recv_timestamp;
404         int     mlen;
405         intptr_t ac;
406         
407         min_trans_time = ~0;
408         
409         block_signals();
410         set_rt_prio(99);
411
412         while (!exit_flag) {
413 #ifdef WITH_FWP
414                 struct stream *stream = arg;
415                 ac = stream->ac;
416                 mlen = recv_packet_fwp(stream, &msg);
417 #else
418                 ac = (intptr_t)arg;
419                 mlen = recv_packet_native(ac, &msg);
420 #endif
421                 if (mlen < 0) {
422                         if (errno != EINTR)
423                                 error(0, errno, "receive_packet error");
424                         goto out;
425                 }       
426                 clock_gettime(CLOCK_REALTIME,&ts);
427                 recv_timestamp = ts.tv_sec*1000000000LL + ts.tv_nsec;
428                 send_timestamp = msg.send_timestamp;
429                 server_timestamp = msg.sendback_timestamp;
430
431                 /* Check whether this message was sent after reset_statistics() */
432
433                 if (send_timestamp < reset_timestamp) {
434                         continue; /* If so, don't count it */
435                 }
436
437                 trans_time_usec = (recv_timestamp - send_timestamp) / 2 / 1000;
438                 client_to_server_usec = (server_timestamp - send_timestamp) / 1000;
439                 server_to_client_usec = (recv_timestamp - server_timestamp) / 1000;
440
441                 pthread_mutex_lock(&delay_stats_mutex);
442                 if (trans_time_usec < MAX_DELAY_US && trans_time_usec >= 0) {
443                         delay_stats[ac][trans_time_usec/opt_granularity_usec].csc++;
444                 }
445                 if (client_to_server_usec < MAX_DELAY_US && client_to_server_usec >= 0) {
446                         delay_stats[ac][client_to_server_usec/opt_granularity_usec].cs++;
447                 }
448                 if (server_to_client_usec < MAX_DELAY_US && server_to_client_usec >= 0) {
449                         delay_stats[ac][server_to_client_usec/opt_granularity_usec].sc++;
450                 }
451                 pthread_mutex_unlock(&delay_stats_mutex);
452
453 #ifdef WITH_FWP
454                 if (trans_time_usec > stream->wc_delay) {
455                         stream->wc_delay = trans_time_usec;
456                 }
457 #endif
458                 receivers[ac].received++;
459                 
460                 pthread_mutex_lock(&streams[msg.stream].mutex);
461                 streams[msg.stream].received++;
462                 pthread_mutex_unlock(&streams[msg.stream].mutex);
463         
464                 /*if (trans_time_nsec < min_trans_time) 
465                         min_trans_time = trans_time_nsec;*/
466                 /*printf("seqn= %lu tos= %d start= %lu(s).%lu(ns)"\
467                          "stop= %lu(s).%lu(ns)\n trans_time = %lums\n",\
468                          msg.seqn, msg.tos, send_timestamp.tv_sec,\
469                          send_timestamp.tv_nsec,recv_timestamp.tv_sec,\
470                          recv_timestamp.tv_nsec, trans_time_msec); */
471         }
472 out:
473         sem_post(&sem_thread_finished);
474         return NULL;
475 }
476
477 /** 
478  * Send a packet.
479  * 
480  * @return -1 in case of error, 1 in case of sucessfull send and 0
481  *         when all buffers are full.
482  */
483 #ifndef WITH_FWP
484 static inline int 
485 send_packet_native(struct stream* stream, union msg_buff* buff)
486 {
487         struct iovec  iov;
488         struct msghdr msg;
489
490         iov.iov_base = buff;
491         iov.iov_len = stream->packet_size;
492         msg.msg_name = (void*)&stream->rem_addr;
493         msg.msg_namelen = sizeof(stream->rem_addr);
494         msg.msg_iov = &iov;
495         msg.msg_iovlen = 1;
496         msg.msg_flags = 0;
497         msg.msg_control = &cmsg;
498         msg.msg_controllen = cmsg_len;
499
500         int ret = 1;
501         
502         while (sendmsg(ac_sockfd[stream->ac], &msg, 0) < 0) {
503                 if (errno == EINTR) continue;
504                 if (errno == EAGAIN) {
505                         if (opt_wait_for_queue_is_full && 
506                             !some_queue_is_full && 
507                             /* We use mutex as atomic test and set */
508                             (pthread_mutex_trylock(&queue_full_mutex) != EBUSY)) {
509                                 some_queue_is_full = true;
510                                 reset_statistics();
511                         }
512                         ret = 0;
513                         break;
514                 } else {
515                         error(0, errno, "Error while sending");
516                         ret = -1;
517                         break;
518                 }
519         }
520         return ret;
521 }
522 #else
523 static inline int 
524 send_packet_fwp(struct stream* stream, union msg_buff* buff)
525 {
526         int ret;
527
528         buff->msg.resp_port = htons(stream->resp_port);
529         ret = frsh_send_sync(stream->endpoint, buff, stream->packet_size);
530
531         return (ret == 0) ? 0 : -1;
532 }
533 #endif
534
535 static inline void
536 wait_for_next_send(struct stream* stream, struct timespec *last_send_time)
537 {
538         struct timespec time_to_wait, current_time, period, interval;
539         unsigned period_usec = stream->period_usec;
540
541         /*           |~~~+~~~| jitter interval (width = 2*stream->jitter percentage from period)*/
542         /* |-------------|     nominal period*/
543         if (stream->jitter) {
544                 period.tv_nsec = USEC_TO_NSEC*(period_usec*(100-stream->jitter)/100
545                                                + rand() % (2*period_usec*stream->jitter/100));
546         } else {
547                 period.tv_nsec = USEC_TO_NSEC*(period_usec);
548         }
549         period.tv_sec = 0;
550         
551         timespec_add(&time_to_wait, last_send_time, &period);
552         clock_gettime(CLOCK_REALTIME,&current_time);
553         timespec_sub(&interval,&time_to_wait,&current_time);
554         nanosleep(&interval,NULL);
555 }
556
557
558 void* sender(void* arg)
559 {
560         union msg_buff buff;
561         unsigned long int seqn;
562         struct stream* stream = (struct stream*) arg;
563         struct timespec ts;
564         int ret;
565
566 #ifndef WITH_FWP
567         char stream_desc[100];
568         stream_to_text(stream_desc, sizeof(stream_desc), stream, 0);
569         printf("%s\n", stream_desc);
570 #endif
571         if (stream->bandwidth_bps == 0)
572                 goto out;
573
574         seqn = 0;
575         
576         block_signals();
577         set_rt_prio(90-stream->ac);
578
579         while (!exit_flag) {
580
581 /*              buff.msg.seqn = seqn++; */
582 /*              buff.msg.tos = ac_to_tos[stream->ac]; */
583                 buff.msg.stream = stream-streams;
584                 
585                 clock_gettime(CLOCK_REALTIME,&ts);
586                 buff.msg.send_timestamp = ts.tv_sec*1000000000LL + ts.tv_nsec;
587
588                 ret = send_packet(stream, &buff);
589                 if (ret < 0) {
590                         goto out;
591                 }
592
593                 pthread_mutex_lock(&stream->mutex);
594                 stream->sent++;
595                 if (ret > 0)
596                         stream->really_sent++;
597                 pthread_mutex_unlock(&stream->mutex);
598
599 #ifdef DEBUG
600                 printf("%d", stream->ac);
601                 fflush(stdout);
602 #endif
603
604                 wait_for_next_send(stream, &ts);
605         }
606 out:
607         sem_post(&sem_thread_finished);
608         return NULL;
609 }
610
611 #ifdef WITH_FWP
612 static int negotiate_contract_for_stream_fwp(struct stream *stream)
613 {
614         frsh_contract_t contract;
615         int ret;
616         frsh_rel_time_t budget, period, deadline;
617         frsh_signal_info_t si;
618
619         /* Contract for client->server stream */
620         frsh_contract_init(&contract);
621         frsh_contract_set_resource_and_label(&contract, FRSH_RT_NETWORK, FRSH_NETPF_FWP, NULL);
622         frsh_network_bytes_to_budget(FRSH_NETPF_FWP, stream->packet_size, &budget);
623         period = frsh_usec_to_rel_time(stream->period_usec);
624         frsh_contract_set_basic_params(&contract, &budget, &period, FRSH_WT_BOUNDED, FRSH_CT_REGULAR);
625         deadline = frsh_usec_to_rel_time(3*stream->period_usec);
626         frsh_contract_set_timing_reqs(&contract, false, &deadline, 0, si, 0, si);
627         
628         ret = frsh_contract_negotiate(&contract, &stream->vres);
629         frsh_contract_destroy(&contract);
630         if (ret != 0) {
631                 stream->vres = NULL;
632                 fprintf(stderr, "Send contract was not accepted\n");
633                 return ret;
634         }
635
636         /* Contract for server->client stream */
637         /* TODO: Use group negotiation for these two contracts */
638         frsh_contract_init(&contract);
639         frsh_contract_set_resource_and_label(&contract, FRSH_RT_NETWORK, FRSH_NETPF_FWP, NULL);
640         frsh_network_bytes_to_budget(FRSH_NETPF_FWP, stream->packet_size, &budget);
641         period = frsh_usec_to_rel_time(stream->period_usec);
642         frsh_contract_set_basic_params(&contract, &budget, &period, FRSH_WT_BOUNDED, FRSH_CT_DUMMY);
643         deadline = frsh_usec_to_rel_time(3*stream->period_usec);
644         frsh_contract_set_timing_reqs(&contract, false, &deadline, 0, si, 0, si);
645         
646         ret = frsh_contract_negotiate(&contract, &stream->vres_rcv);
647         frsh_contract_destroy(&contract);
648         if (ret != 0) {
649                 fprintf(stderr, "Receive contract was not accepted\n");
650                 return ret;
651         }
652
653         /* We don't use the vres at server, since the server doesn't
654          * know the parameters. Instread, server uses plain
655          * sockets. */
656         
657         return ret;
658 }
659 #endif
660
661 #ifdef WITH_FWP
662 static void create_stream_endpoint_fwp(struct stream *stream)
663 {
664 /*      fwp_endpoint_attr_t  attr; */
665         int ret;
666         struct hostent* ph;
667         frsh_contract_t c;
668         fres_block_fwp_sched *fwp_sched;
669
670         frsh_vres_get_contract(stream->vres, &c);
671         fwp_sched = fres_contract_get_fwp_sched(c);
672
673         stream->ac = fwp_sched->ac_id;
674
675 /*      fwp_endpoint_attr_init(&attr); */
676 /*      fwp_endpoint_attr_setreliability(&attr, FWP_EPOINT_BESTEFFORT); */
677
678         ph = gethostbyname(server_addr);
679         if (ph && ph->h_addr_list[0]) {
680                 struct in_addr *a = (struct in_addr *)(ph->h_addr_list[0]);
681                 frsh_send_endpoint_protocol_info_t    spi = { NULL, 0 };
682                 frsh_receive_endpoint_protocol_info_t rpi = { NULL, 0 };
683                 frsh_endpoint_queueing_info_t qi = { .queue_size=0, .queue_policy=FRSH_QRP_OLDEST };
684                 ret = frsh_send_endpoint_create(FRSH_NETPF_FWP, a->s_addr, BASE_PORT + stream->ac, 
685                                                 spi, &stream->endpoint);
686                 if (ret < 0) error(1, errno, "frsh_send_endpoint_create()");
687                 
688                 ret = frsh_send_endpoint_bind(stream->vres, stream->endpoint);
689                 if (ret != 0) error(1, errno, "frsh_send_endpoint_bind");
690
691                 ret = frsh_receive_endpoint_create(FRSH_NETPF_FWP, 0, qi, rpi,
692                                                    &stream->resp_endpoint);
693                 if (ret != 0) error(1, errno, "fwp_receive_endpoint_create");
694                 
695                 unsigned int port;
696                 frsh_receive_endpoint_get_params(stream->resp_endpoint, NULL, &port, NULL, NULL);
697                 stream->resp_port = port;
698
699                 ret = pthread_create(&stream->receiver.thread, NULL, receiver, (void*)stream);
700                 if (ret) error(1, ret, "Error while creating receiver");
701                 
702                 stream->receiver.valid = true;
703         }
704         else {
705                 error(1, errno, "gethostbyname(%s)", server_addr);
706         }
707
708 }
709 #else
710 static void create_stream_endpoint_native(struct stream *stream)
711 {
712         struct hostent* ph;
713
714         memset(&stream->rem_addr,0, sizeof(stream->rem_addr));
715
716         stream->rem_addr.sin_family = AF_INET;
717         ph = gethostbyname(server_addr);
718         if (ph)
719                 stream->rem_addr.sin_addr = *((struct in_addr *)ph->h_addr);
720         else {
721                 error(1, errno, "gethostbyname(%s)", server_addr);
722         }
723         stream->rem_addr.sin_port = htons(BASE_PORT + stream->ac);
724 }
725 #endif
726
727 static inline void
728 calc_stream_params(struct stream *stream)
729 {
730         int packet_size;
731         unsigned period_usec;
732         int bandwidth;
733         int ret;
734
735         /* If some parameters are not set explicitely, use default values. */
736         if (stream->bandwidth_bps < 0) stream->bandwidth_bps = opt_def_bandwidth * Kbit;
737         if (stream->packet_size < 0) stream->packet_size = opt_packet_size;
738         if (stream->period_usec < 0) stream->period_usec = opt_def_period_msec * MSEC_TO_USEC;
739
740         bandwidth = stream->bandwidth_bps;
741
742         /* Avoid arithmetic exception. Server thread will exit if
743            stream->bandwidth_bps == 0. */
744         if (bandwidth == 0) bandwidth = 1;
745
746         if (stream->packet_size) {
747                 packet_size = stream->packet_size;
748                 period_usec = SEC_TO_USEC*packet_size*8/bandwidth;
749                 if (period_usec == 0) period_usec = 1;
750         } else if (stream->period_usec) {
751                 period_usec = stream->period_usec;
752                 packet_size = (long long)bandwidth/8 * period_usec/SEC_TO_USEC;
753         } else {
754                 char buf[200];
755                 stream_to_text(buf, sizeof(buf), stream, 0);
756                 error(1, 0, "Neither packet size nor period was specified for a stream %s", buf);
757         }
758
759         if (packet_size < sizeof(struct msg_t)) {
760                 error(1, 0, "Packet size too small (min %zd)", sizeof(struct msg_t));
761         }
762
763         stream->packet_size = packet_size;
764         stream->period_usec = period_usec;
765         stream->jitter = opt_jitter;
766
767         ret = negotiate_contract_for_stream(stream);
768         if (ret == 0) {
769                 create_stream_endpoint(stream);
770         } else {
771                 char buf[200];
772                 stream_to_text(buf, sizeof(buf), stream, 0);
773                 fprintf(stderr, "Contract hasn't been accepted:\n%s\n", buf);
774                 stream->bandwidth_bps = 0;
775                 some_contract_not_accepted = true;
776         }
777 }
778
779 /** 
780  * Parse -b parameter.
781  * 
782  * @param params String to parse
783  * 
784  * @return NULL in case of success, pointer to a problematic character
785  *         on error.
786  */
787 char* parse_bandwidths(char *params)
788 {
789         struct stream *sp = &streams[nr_streams];
790
791         while (*params && nr_streams < MAX_STREAMS) {
792                 char *ac_ids[AC_NUM] = { [AC_VO]="VO", [AC_VI]="VI", [AC_BE]="BE", [AC_BK]="BK" };
793                 int i;
794                 char *next_char;
795
796                 if (strlen(params) < 2)
797                         return params;
798                 for (i=0; i<AC_NUM; i++) {
799                         if (strncmp(params, ac_ids[i], 2) == 0) {
800                                 sp->ac = i;
801                                 params+=strlen(ac_ids[i]);
802                                 break;
803                         }
804                 }
805                 if (i==AC_NUM)
806                         return params;
807
808                 long bw;
809                 if (*params == ':') {
810                         params++;
811
812                         bw = strtol(params, &next_char, 10);
813                         if (next_char == params)
814                                 return params;
815                         params = next_char;
816                 } else
817                         bw = -1;
818                 
819                 sp->bandwidth_bps = bw*Kbit;
820
821                 long period = 0;
822                 long packet_size = 0;
823                 if (*params == '@') {
824                         params++;
825                         period = strtol(params, &next_char, 10);
826                         if (period == 0)
827                                 return params;
828                         params = next_char;
829                 }
830                 else {
831                         if (*params == '/') {
832                                 params++;
833                                 packet_size = strtol(params, &next_char, 10);
834                                 if (packet_size == 0)
835                                         return params;
836                                 params = next_char;
837                         } else {
838                                 packet_size = -1; 
839                                 period = -1;
840                         }
841                 }
842                 sp->period_usec = period*MSEC_TO_USEC;
843                 sp->packet_size = packet_size;
844
845                 
846
847                 if (*params != '\0' && *params != ',')
848                         return params;
849                 nr_streams++;
850                 sp++;
851                 if (*params == ',')
852                         params++;
853         }
854         return NULL;
855 }
856
857 #ifdef WITH_FWP
/**
 * Wait for all worker threads to finish (FWP build).
 *
 * FIXME: Not implemented - this function currently returns immediately.
 * The disabled code waited 2*nr_streams times on sem_thread_finished.
 */
void wait_for_all_threads_to_finish_fwp(void)
{
        /* Intentionally empty, see FIXME above. */
}
867 #else
868 void wait_for_all_threads_to_finish_native(void)
869 {
870         int i;
871         /* Wait for all threads to finish */
872         for (i=0; i < nr_streams + AC_NUM; i++) {
873                 sem_wait(&sem_thread_finished);
874         }
875 }
876 #endif
877
878 #ifdef WITH_FWP
/**
 * Initialize the ncurses screen used by print_status() (FWP build).
 */
void init_gui()
{
        initscr();      /* start curses mode */
        cbreak();       /* disable line buffering of input */
        noecho();       /* do not echo typed characters */
}
885
/*
 * Draw one cell of the status table at the current position (y, x) and
 * advance x by the width of the column title plus one.
 *
 * In row 0 (i == 0) the column title itself is drawn; in all other rows
 * the value formatted by 'format' and the remaining arguments is drawn.
 * The value arguments are evaluated only when i != 0.
 *
 * The macro uses these variables of the calling function:
 *   int i, x, y;  char str[];   (str must be an array, sizeof is used)
 * 'title' is expanded twice, so pass a string literal.
 *
 * Wrapped in do { } while (0) so that it behaves as a single statement,
 * e.g. in an unbraced if/else.
 */
#define addfield(title, format, ...)                                    \
        do {                                                            \
                move(y, x);                                             \
                x+=strlen(title)+1;                                     \
                if (i == 0) addstr(title);                              \
                else {                                                  \
                        snprintf(str, sizeof(str), format, __VA_ARGS__);\
                        addstr(str);                                    \
                }                                                       \
        } while (0)
894
895 void print_status(int seconds)
896 {
897         int i;
898         char str[200], s1[20];
899         int x = 0, y;
900         struct stream *s = NULL;
901         
902         for (i = 0; i <= nr_streams; i++) {
903                 if (i>0) s = &streams[i-1];
904                 y=i;
905                 x=0;
906                 addfield("Stream", "%d", i);
907                 addfield("Bandwidth", "%s", bandwidth_to_text(s1, s->bandwidth_bps));
908                 addfield("Packet size", "%d bytes", s->packet_size);
909                 addfield("Period   ", "%s", usec_to_text(s1, s->period_usec));
910                 addfield("AC   ", "%s", ac_to_text[s->ac]);
911                 addfield("Worst-case delay", "%s", usec_to_text(s1, s->wc_delay));
912                 addfield("Received responses", "%lld", s->received);
913         }
914         refresh();
915 }
916 #else
917 void init_gui() {}
918 void print_status(int seconds)
919 {
920         int ac;
921         fprintf(stderr, "\r%3ds", seconds);
922         for (ac = 0; ac < AC_NUM; ac++) {
923                 int delta = receivers[ac].received - receivers[ac].last_received;
924                 receivers[ac].last_received = receivers[ac].received;
925                 fprintf(stderr, " %s %5d %4d/s", ac_to_text[ac], receivers[ac].received, delta);
926         }
927         fflush(stderr);
928 }
929 #endif
930
931 int main(int argc, char *argv[])
932 {
933         int i, rc, frames, seconds;
934         pthread_attr_t attr;
935         pthread_t thread;
936         char opt;
937
938
939         while ((opt = getopt(argc, argv, "B:b:C:c:g:I:j:o:qQ:s:T:")) != -1) {
940                 switch (opt) {
941                         case 'B':
942                                 opt_def_bandwidth = atoi(optarg);
943                                 break;
944                         case 'b': {
945                                 char *errpos;
946                                 errpos = parse_bandwidths(optarg);
947                                 if (errpos != NULL) {
948                                         if (*errpos == '\0')
949                                                 error(1, 0, "Bandwidth parse error - string to short");
950                                         else
951                                                 error(1, 0, "Bandwidth parse error at '%s'", errpos);
952                                 }
953                                 break;
954                         }
955                         case 'C':
956                                 opt_comment = optarg;
957                                 break;
958                         case 'c':
959                                 opt_count_sec = atoi(optarg);
960                                 break;
961                         case 'g':
962                                 opt_granularity_usec = atoi(optarg);
963                                 if (opt_granularity_usec < MIN_GRANULARITY) {
964                                         error(1, 0, "Granulatiry too small (min %d)", MIN_GRANULARITY);
965                                 }
966                                 break;
967                         case 'I':
968                                 opt_interface = optarg;
969                                 break;
970                         case 'j':
971                                 #ifdef WITH_FWP
972                                 error(1, 0, "-j is not allowd when compiled with FWP");
973                                 #else
974                                 opt_jitter = atoi(optarg);
975                                 #endif
976                                 break;
977                         case 'o':
978                                 opt_output = optarg;
979                                 break;
980                         case 'Q':
981                                 opt_send_buf_size = atoi(optarg);
982                                 break;
983                         case 'q':
984                                 opt_wait_for_queue_is_full = true;
985                                 break;
986                         case 's':
987                                 opt_packet_size = atoi(optarg);
988                                 break;
989                         case 'T':
990                                 opt_def_period_msec = atoi(optarg);
991                                 break;
992                         default:
993                                 fprintf(stderr, "Usage: %s [ options ] server_addr\n\n", argv[0]);
994                                 fprintf(stderr, "Options:\n");
995                                 fprintf(stderr, "    -B  default bandwidth for -b option [kbit]\n");
996                                 fprintf(stderr, "    -b  bandwidth of streams (VO|VI|BE|BK)[:<kbit>][@<msec> or /<bytes>][,...]\n");
997                                 fprintf(stderr, "    -C  comment (added to header)\n");
998                                 fprintf(stderr, "    -c  count (number of seconds to run)\n");
999                                 fprintf(stderr, "    -g  histogram granularity [usec]\n");
1000                                 fprintf(stderr, "    -I  <interface> send packets from this interface\n");
1001                                 fprintf(stderr, "    -j  send jitter (0-100) [%%]\n");
1002                                 fprintf(stderr, "    -o  output filename (.dat will be appended)\n");
1003                                 fprintf(stderr, "    -q  gather statistics only after some queue becomes full\n");
1004                                 fprintf(stderr, "    -Q  <bytes> set size for socket send buffers\n");
1005                                 fprintf(stderr, "    -s  size of data payload in packets [bytes] (default: %d)\n", opt_packet_size);
1006                                 fprintf(stderr, "    -T  default period for -b option [msec]\n");
1007                                 exit(1);
1008                 }
1009         }
1010         if (opt_packet_size && opt_def_period_msec) {
1011                 error(1, 0, "Error: Nonzero -T and -s can't be used together!");
1012         }
1013
1014         if (optind < argc) {
1015                 server_addr = argv[optind];
1016         } else {
1017                 error(1, 0, "Expected server address argument");
1018         }
1019
1020         if (nr_streams == 0)
1021                 parse_bandwidths("BE");
1022                 
1023         pthread_attr_init(&attr);
1024
1025         snprintf(logfname, sizeof(logfname), "%s.dat", opt_output);
1026
1027         if ((logfd = fopen(logfname,"w+")) == NULL) {
1028                 error(1, errno ,"Can not open %s", logfname);
1029         }
1030         if (signal(SIGTERM, stopper) == SIG_ERR) {
1031                 error(1, errno, "Error in signal registration");
1032         }
1033                 
1034         if (signal(SIGINT, stopper) == SIG_ERR) {
1035                 error(1, errno, "Signal handler registration error");
1036         }
1037
1038         struct sigaction sa;
1039         sa.sa_handler = empty_handler;
1040         sa.sa_flags = 0;        /* don't restart syscalls */
1041
1042         if (sigaction(SIGUSR1, &sa, NULL) < 0) {
1043                 error(1, errno, "sigaction error");
1044         }
1045
1046         sem_init(&sem_thread_finished, 0, 0);
1047
1048         reset_statistics();
1049
1050 #ifdef WITH_FWP
1051         rc = frsh_init();
1052         if (rc != 0) {
1053                 error(1, errno, "FWP initialization failed");
1054         }
1055 #else
1056         intptr_t ac;
1057         /* create four receivers each per AC */
1058         for (ac = AC_NUM - 1; ac >= 0; ac--) {
1059                 ac_sockfd[ac] = create_ac_socket(ac);
1060                 if (ac_sockfd[ac] < 0) {
1061                         return 1;
1062                 }
1063                 rc = pthread_create(&receivers[ac].thread, &attr, receiver, (void*) ac);
1064                 if (rc) {
1065                         error(1, rc, "Error while creating receiver");
1066                 }
1067                 receivers[ac].valid = true;
1068         }               
1069 #endif
1070
1071                         
1072         if (opt_interface) {
1073                 struct ifreq ifr;
1074
1075                 memset(&ifr, 0, sizeof(ifr));
1076                 strncpy(ifr.ifr_name, opt_interface, IFNAMSIZ-1);
1077                 if (ioctl(ac_sockfd[AC_VO], SIOCGIFINDEX, &ifr) < 0) {
1078                         error(1, 0, "unknown iface %s", opt_interface);
1079                 }
1080                 cmsg.ipi.ipi_ifindex = ifr.ifr_ifindex;
1081                 cmsg_len = sizeof(cmsg);
1082         }
1083         /* create sendpoints */
1084         for (i = 0; i < nr_streams; i++) {
1085                 struct stream *s = &streams[i];
1086                 pthread_mutex_init(&s->mutex, NULL);
1087                 calc_stream_params(s);
1088                 rc = pthread_create(&thread, &attr, sender, (void*) s);
1089                 if (rc) error(1, rc, "Error while creating sender");
1090         }
1091
1092         if (some_contract_not_accepted) {
1093                 stopper();
1094         } else {
1095                 //init_gui();
1096
1097                 seconds = 1;
1098                 frames=0;
1099                 while (!exit_flag) {
1100 #ifdef WITH_FWP
1101                         usleep(40000);
1102 #else
1103                         sleep(1);
1104 #endif
1105                         frames++;
1106                         if (frames>=25) {
1107                                 seconds++;
1108                                 frames = 0;
1109                         }
1110                         print_status(seconds);
1111                         if (seconds == opt_count_sec)
1112                                 stopper();
1113                 }
1114         }
1115
1116 #ifdef WITH_FWP
1117         //endwin();
1118         for (i=0; i < nr_streams; i++) {
1119                 if (streams[i].vres)
1120                         frsh_contract_cancel(streams[i].vres);
1121                 if (streams[i].vres_rcv)
1122                         frsh_contract_cancel(streams[i].vres_rcv);
1123         }
1124 #else
1125         fprintf(stderr, "\nWaiting for threads to finish\n");
1126         wait_for_all_threads_to_finish();
1127
1128         struct timespec ts;
1129         uint64_t end_timestamp, measure_length;
1130         clock_gettime(CLOCK_REALTIME,&ts);
1131         end_timestamp = ts.tv_sec*1000000000LL+ts.tv_nsec;
1132         measure_length = end_timestamp - reset_timestamp;
1133
1134         save_results(argc, argv, measure_length/1000);
1135 #endif
1136
1137         return 0;
1138 }