rtime.felk.cvut.cz Git - frescor/fwp.git/commitdiff
Added a possibility to measure saturation bandwidth.
author Michal Sojka <sojkam1@fel.cvut.cz>
Fri, 18 Jan 2008 16:51:44 +0000 (17:51 +0100)
committer Michal Sojka <sojkam1@fel.cvut.cz>
Fri, 18 Jan 2008 16:51:44 +0000 (17:51 +0100)
wme_test/wclient.c

index 09ef2775e34e77245cc70612ed09f7e2bdd00051..9d723a76d81b4e898f4bfe7f2ca30db5ea29dd01 100644 (file)
@@ -65,6 +65,7 @@ struct stream {
        /* Input parameters */
        enum ac ac;             /*  */
        int bandwidth_bps;      /* bits per second */
+       bool measure_saturation; /* Measure the saturation bandwidth first */
        int jitter;             /* percent */
        /* Mutually exclusive input parameters */
        int packet_size;
@@ -112,6 +113,8 @@ void stream_to_text(char *stream_desc, size_t n, struct stream *stream, int seco
                snprintf(real, sizeof(real), "; real: sent %lld (%lld/s), received %lld (%lld/s)",
                         stream->sent, stream->sent/seconds,
                         stream->received, stream->received/seconds);
+       } else {
+               real[0]=0;
        }
        
        snprintf(stream_desc, n, "%d: %s %s (%d bytes per %s +-%s, %d packets/s)%s",
@@ -352,24 +355,32 @@ void* sender(void* arg)
        struct sockaddr_in rem_addr;
        union msg_buff buff;
        unsigned long int seqn;
-       struct timespec time_to_wait, current_time, period, interval;
+       unsigned long int really_sent = 0;
+       struct timespec time_to_wait, current_time, period, interval, measure_start, measure_end;
        struct hostent* ph;
        struct stream* stream = (struct stream*) arg;
+       int bandwidth;
        int packet_size;
        unsigned period_usec;
        char stream_desc[100];
-
-       if (stream->bandwidth_bps == 0)
-               goto out;
+       bool this_queue_is_full = false, this_queue_is_full_old = false;
 
        set_rt_prio(90-stream->ac);
 
+       bandwidth = stream->bandwidth_bps;
+       if (bandwidth == 0) {
+               /* Either we are going to measure saturation bandwidth
+                  or we will return before the main loop. */
+               bandwidth = 300*Mbit;
+       }
+
        if (stream->packet_size) {
                packet_size = stream->packet_size;
-               period_usec = SEC_TO_USEC*packet_size*8/stream->bandwidth_bps;
+               period_usec = SEC_TO_USEC*packet_size*8/bandwidth;
+               if (period_usec == 0) period_usec = 1;
        } else if (stream->period_usec) {
                period_usec = stream->period_usec;
-               packet_size = (long long)stream->bandwidth_bps/8 * period_usec/SEC_TO_USEC;
+               packet_size = (long long)bandwidth/8 * period_usec/SEC_TO_USEC;
        } else {
                char buf[200];
                stream_to_text(buf, sizeof(buf), stream, 0);
@@ -380,9 +391,9 @@ void* sender(void* arg)
        stream->period_usec = period_usec;
        stream->jitter = opt_jitter;
 
+       bandwidth = stream->bandwidth_bps;
 
        stream_to_text(stream_desc, sizeof(stream_desc), stream, 0);
-
        printf("%s\n", stream_desc);
 
        if (packet_size < sizeof(struct msg_t)) {
@@ -390,7 +401,6 @@ void* sender(void* arg)
                exit(1);
        }
 
-       
        memset(&rem_addr,0, sizeof(rem_addr));
                
        rem_addr.sin_family = AF_INET;
@@ -406,6 +416,9 @@ void* sender(void* arg)
        
        block_signals();
 
+       if (bandwidth == 0 && !stream->measure_saturation)
+               goto out;
+
        while (!exit_flag) {
 
                buff.msg.seqn = seqn;
@@ -414,15 +427,16 @@ void* sender(void* arg)
                
                clock_gettime(CLOCK_MONOTONIC,&buff.msg.send_timestamp);
 
-               while (sendto(ac_sockfd[stream->ac], &buff, packet_size, 0,\
+               errno = 0;
+               while (sendto(ac_sockfd[stream->ac], &buff, packet_size, 0, \
                                (struct sockaddr*)&rem_addr, sizeof(rem_addr)) < 0) {
                                if (errno == EINTR) continue;
                                if (errno == EAGAIN) {
-                                       if (!some_queue_is_full && (pthread_mutex_trylock(&queue_full_mutex) != EBUSY)) {
+                                       if (opt_wait_for_queue_is_full && !some_queue_is_full && (pthread_mutex_trylock(&queue_full_mutex) != EBUSY)) {
                                                some_queue_is_full = true;
                                                reset_statistics();
                                        }
-
+                                       this_queue_is_full = true;
                                        break;
                                } else {
                                        perror("Error while sending");
@@ -431,12 +445,37 @@ void* sender(void* arg)
                }
 
 #ifdef DEBUG
-               printf("%d", ac);
+               printf("%d", stream->ac);
                fflush(stdout);
 #endif
                seqn++;
                stream->sent++;
 
+               /* Measure saturation bandwidth */
+               if (stream->measure_saturation && this_queue_is_full && !this_queue_is_full_old) {
+                       measure_start = buff.msg.send_timestamp;
+               }
+               this_queue_is_full_old = this_queue_is_full;
+
+               if (stream->measure_saturation && this_queue_is_full && errno != EAGAIN) {
+                       really_sent++;
+                       measure_end = buff.msg.send_timestamp;
+                       timespec_sub(&interval,&measure_end,&measure_start);
+                       if (really_sent >= 100 && interval.tv_sec >= 20) {
+                               period_usec = (interval.tv_sec*SEC_TO_USEC + interval.tv_nsec/USEC_TO_NSEC) / really_sent;
+
+                               bandwidth = (long long)packet_size*8*SEC_TO_USEC/period_usec;
+                               stream->bandwidth_bps = bandwidth;
+                               stream->period_usec = period_usec;
+                               stream->measure_saturation = false;
+                               reset_statistics();
+
+                               stream_to_text(stream_desc, sizeof(stream_desc), stream, 0);
+                               printf("\n%s\n", stream_desc);
+                       }
+               }
+
+                   
                /*           |~~~+~~~| jitter interval (width = 2*opt_jitter percentage from period)*/
                /* |-------------|     nominal period*/
                if (opt_jitter) {
@@ -482,10 +521,16 @@ char* parse_bandwidths(char *params)
                if (*params == ':') {
                        params++;
 
-                       bw = strtol(params, &next_char, 10);
-                       if (next_char == params)
-                               return params;
-                       params = next_char;
+                       if (strncmp(params, "SAT", 3) == 0) {
+                               bw = 0;
+                               sp->measure_saturation = true;
+                               params+=3;
+                       } else {
+                               bw = strtol(params, &next_char, 10);
+                               if (next_char == params)
+                                       return params;
+                               params = next_char;
+                       }
                } else
                        bw = opt_def_bandwidth;
                
@@ -529,7 +574,7 @@ char* parse_bandwidths(char *params)
 
 int main(int argc, char *argv[])
 {
-       int ac, i, rc, seconds, seconds_start;
+       int ac, i, rc, seconds;
        pthread_attr_t attr;
        pthread_t thread;
        char opt;
@@ -604,6 +649,7 @@ int main(int argc, char *argv[])
                exit(1);
        }
 
+       reset_statistics();
 
        if (nr_streams == 0)
                parse_bandwidths("BE");
@@ -658,16 +704,9 @@ int main(int argc, char *argv[])
        }
        
        seconds = 0;
-       seconds_start = -1;
        while (!exit_flag) {
                sleep(1);
                seconds++;
-               if (opt_wait_for_queue_is_full) {
-                       if (some_queue_is_full && seconds_start == -1) {
-                               printf("\nSome queue is full\n");
-                               seconds_start = seconds;
-                       }
-               }
                fprintf(stderr, "\r%3ds", seconds);
                for (ac = 0; ac < AC_NUM; ac++) {
                        int delta = receivers[ac].received - receivers[ac].last_received;
@@ -684,9 +723,11 @@ int main(int argc, char *argv[])
        for (i=0; i < nr_streams + AC_NUM; i++) {
                sem_wait(&sem_thread_finished);
        }
+       struct timespec end_timestamp, measure_length;
+       clock_gettime(CLOCK_MONOTONIC,&end_timestamp);
+       timespec_sub(&measure_length, &end_timestamp, &reset_timestamp);
 
-       if (seconds_start == -1) seconds_start=0;
-       save_results(argc, argv, seconds-seconds_start);
+       save_results(argc, argv, measure_length.tv_sec);
 
        return 0;
 }