rtime.felk.cvut.cz Git - frescor/fwp.git/blobdiff - wme_test/wclient.c
Fixed handling of send when it would block.
[frescor/fwp.git] / wme_test / wclient.c
index 0ee5c33c393ed1a9e0a2f0c2dfd4461eee56daab..266ef30c790f56917f4868f5cd5429d864540cdf 100644 (file)
@@ -41,18 +41,6 @@ FILE* logfd;
 char* server_addr; 
 char logfname[100];
 
-struct msg_t {
-       unsigned int tos;
-       struct timespec send_timestamp;
-       unsigned long int seqn;
-       int stream;
-};
-
-union msg_buff {
-       struct msg_t msg;
-       char nonsense[BUFFSIZE];
-};
-
 /* maximal traffic delay in ms - 10 s*/
 #define MAX_DELAY_US 10000000
 
@@ -157,13 +145,11 @@ void save_results(int argc, char *argv[], int seconds)
        maxi++;
        if (maxi < mini) maxi = mini;
 
-       /* Calculate total number of received packets per AC */
-       for (ac = 0; ac < AC_NUM; ac++) { 
-               sum[ac] = 0;
-               for ( i = 0 ; i < maxi; i++) 
-                       sum[ac]+=delay_stats[ac][i];
-/*             if (sum[ac] == 0) */
-/*                     fprintf(stderr, "No response in AC %d\n", ac); */
+       /* Calculate total number of sent packets per AC */
+       for (ac = 0; ac < AC_NUM; ac++) sum[ac] = 0;
+       for (i = 0; i < nr_streams; i++) {
+               ac = streams[i].ac;
+               sum[ac] += streams[i].sent;
        }
 
 #if 0
@@ -247,7 +233,10 @@ int create_ac_socket(unsigned int ac)
                perror("Unable to open socket");
                return -1;
        }
-       
+       if (fcntl(sockfd, F_SETFL, O_NONBLOCK) != 0) {
+               perror("set non-blocking socket");
+               return -1;
+       }
        if (setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == -1) {
                perror("Unable to set socket");
                return -1;
@@ -273,11 +262,12 @@ void* receiver(void* queue)
 {
        struct msg_t    msg;
        struct  sockaddr_in rem_addr;
-       int     mlen;
+       int     mlen, ret;
        unsigned int ac, rem_addr_length; 
        unsigned long int trans_time_usec;
        unsigned long int min_trans_time;
-       struct timespec   send_timestamp,recv_timestamp, trans_time; 
+       struct timespec   send_timestamp,recv_timestamp, trans_time;
+       fd_set fdset;
        
        min_trans_time = ~0;
        
@@ -286,11 +276,18 @@ void* receiver(void* queue)
 
        ac = (int)queue;
         rem_addr_length = sizeof(rem_addr);
+       FD_ZERO(&fdset);
        while (!exit_flag) {
+               FD_SET(ac_sockfd[ac], &fdset);
+               ret = select(ac_sockfd[ac]+1, &fdset, NULL, NULL, NULL);
+               if (ret < 0) {
+                       if (errno == EINTR) continue;
+                       perror("receiver select");
+                       goto out;
+               }
                mlen = recvfrom(ac_sockfd[ac], &msg, sizeof(msg), 0,    \
                                (struct sockaddr*)&rem_addr, &rem_addr_length);
                if (mlen < 0) {
-                       if (errno == EINTR) continue;
                        perror("Chyba pri prijimani pozadavku");
                        goto out;
                }       
@@ -332,6 +329,9 @@ void* sender(void* arg)
        unsigned period_usec;
        char stream_desc[100];
 
+       if (stream->bandwidth_bps == 0)
+               goto out;
+
        set_rt_prio(90-stream->ac);
 
        if (opt_packet_size) {
@@ -378,12 +378,16 @@ void* sender(void* arg)
                buff.msg.stream = stream-streams;
                
                clock_gettime(CLOCK_MONOTONIC,&buff.msg.send_timestamp);
-               
+
                while (sendto(ac_sockfd[stream->ac], &buff, packet_size, 0,\
                                (struct sockaddr*)&rem_addr, sizeof(rem_addr)) < 0) {
                                if (errno == EINTR) continue;
-                               perror("Error while sending");
-                               goto out;
+                               if (errno == EAGAIN) {
+                                       break;
+                               } else {
+                                       perror("Error while sending");
+                                       goto out;
+                               }
                }
 
 #ifdef DEBUG
@@ -439,7 +443,7 @@ char* parse_bandwidths(char *params)
                        params++;
 
                        bw = strtol(params, &next_char, 10);
-                       if (bw == 0)
+                       if (next_char == params)
                                return params;
                        params = next_char;
                } else
@@ -593,14 +597,14 @@ int main(int argc, char *argv[])
        seconds = 0;
        while (!exit_flag) {
                sleep(1);
-               fprintf(stderr, "\r");
+               seconds++;
+               fprintf(stderr, "\r%3ds", seconds);
                for (ac = 0; ac < AC_NUM; ac++) {
                        int delta = receivers[ac].received - receivers[ac].last_received;
                        receivers[ac].last_received = receivers[ac].received;
-                       fprintf(stderr, "%s: %5d (+%4d)  ", ac_to_text[ac], receivers[ac].received, delta);
+                       fprintf(stderr, " %s %5d %4d/s", ac_to_text[ac], receivers[ac].received, delta);
                }
                fflush(stderr);
-               seconds++;
                if (seconds == opt_count_sec)
                        stopper();
        }