rtime.felk.cvut.cz Git - frescor/fwp.git/commitdiff
wclient cleanup
author Michal Sojka <sojkam1@fel.cvut.cz>
Sat, 19 Jan 2008 09:21:45 +0000 (10:21 +0100)
committer Michal Sojka <sojkam1@fel.cvut.cz>
Sat, 19 Jan 2008 09:21:45 +0000 (10:21 +0100)
wme_test/wclient.c

index c9f06c4923b075817e800d3fd93714cd6d8155bf..9f8cbefb9aa39acf252d5921b2756f72845d9c1f 100644 (file)
@@ -70,6 +70,7 @@ struct stream {
        /* Mulualy exclusive input parameters */
        int packet_size;
        long period_usec;       /* all time units are in microseconds */
+       struct sockaddr_in rem_addr;
 
        /* Statistics */
        pthread_mutex_t mutex;
@@ -380,27 +381,120 @@ out:
        return NULL;
 }
 
+/**
+ * Send one packet of a stream to the stream's remote address.
+ *
+ * Passes stream->packet_size bytes starting at @a buff to sendto() on
+ * the socket ac_sockfd[stream->ac], addressed to stream->rem_addr.
+ * The call is retried when it fails with EINTR.
+ *
+ * When sendto() fails with EAGAIN and opt_wait_for_queue_is_full is
+ * set while some_queue_is_full is still false, the first caller that
+ * obtains queue_full_mutex sets some_queue_is_full and calls
+ * reset_statistics(). The mutex is not unlocked here.
+ *
+ * @param stream Stream the packet belongs to; selects the socket,
+ *               the packet size and the destination address.
+ * @param buff   Message buffer to be sent.
+ *
+ * @return -1 in case of error, 1 in case of successful send and 0
+ *        when all buffers are full.
+ */
+static inline int
+send_packet(struct stream* stream, union msg_buff* buff)
+{
+       int ret = 1;
+
+       /* buff already points to the message. Passing &buff here would
+          transmit the bytes of the local pointer variable instead of
+          the message itself. */
+       while (sendto(ac_sockfd[stream->ac], buff, stream->packet_size, 0,
+                     (struct sockaddr*)&stream->rem_addr, sizeof(stream->rem_addr)) < 0) {
+               if (errno == EINTR) continue;
+               if (errno == EAGAIN) {
+                       if (opt_wait_for_queue_is_full &&
+                           !some_queue_is_full &&
+                           /* We use mutex as atomic test and set */
+                           (pthread_mutex_trylock(&queue_full_mutex) != EBUSY)) {
+                               some_queue_is_full = true;
+                               reset_statistics();
+                       }
+                       ret = 0;
+                       break;
+               } else {
+                       perror("Error while sending");
+                       ret = -1;
+                       break;
+               }
+       }
+       return ret;
+}
+
+/**
+ * Sleep until the next packet of the stream is due.
+ *
+ * The next send time is @a last_send_time plus one period. Without
+ * jitter the period is stream->period_usec. With jitter it is chosen
+ * by rand() from an interval of width 2*jitter percent of the nominal
+ * period, starting at (100-jitter) percent of it.
+ *
+ * @param stream         Stream whose period_usec and jitter are used.
+ * @param last_send_time Time the period is counted from. It is compared
+ *                       with a CLOCK_MONOTONIC reading, so it is
+ *                       expected to come from the same clock.
+ */
+static inline void
+wait_for_next_send(struct stream* stream, struct timespec *last_send_time)
+{
+       struct timespec time_to_wait, current_time, period, interval;
+       const unsigned long long usec_per_sec = 1000000ULL;
+       unsigned period_usec = stream->period_usec;
+       unsigned long long wait_usec = period_usec;
+
+       /*           |~~~+~~~| jitter interval (width = 2*stream->jitter percentage from period)*/
+       /* |-------------|     nominal period*/
+       if (stream->jitter) {
+               unsigned long jitter_width = 2*period_usec*stream->jitter/100;
+
+               wait_usec = period_usec*(100-stream->jitter)/100;
+               /* For short periods the width rounds down to zero and
+                  a modulo by zero would be undefined behaviour. */
+               if (jitter_width)
+                       wait_usec += rand() % jitter_width;
+       }
+
+       /* Split into seconds and nanoseconds so that tv_nsec stays below
+          one second and the multiplication cannot overflow for long
+          periods. */
+       period.tv_sec = wait_usec / usec_per_sec;
+       period.tv_nsec = USEC_TO_NSEC*(wait_usec % usec_per_sec);
+
+       timespec_add(&time_to_wait, last_send_time, &period);
+       clock_gettime(CLOCK_MONOTONIC,&current_time);
+       timespec_sub(&interval,&time_to_wait,&current_time);
+       nanosleep(&interval,NULL);
+}
+
+
 void* sender(void* arg)
 {
-       struct sockaddr_in rem_addr;
        union msg_buff buff;
        unsigned long int seqn;
-       struct timespec time_to_wait, current_time, period, interval;
-       struct hostent* ph;
        struct stream* stream = (struct stream*) arg;
-       int bandwidth;
-       int packet_size;
-       unsigned period_usec;
        char stream_desc[100];
+       int ret;
+
+       stream_to_text(stream_desc, sizeof(stream_desc), stream, 0);
+       printf("%s\n", stream_desc);
 
+       if (stream->bandwidth_bps == 0)
+               goto out;
+
+       seqn = 0;
+       
+       block_signals();
        set_rt_prio(90-stream->ac);
 
-       bandwidth = stream->bandwidth_bps;
-       if (bandwidth == 0) {
-               /* Either we are going to measure saturation bandwidth
-                  or we will return before the main loop. */
-               bandwidth = 300*Mbit;
+       while (!exit_flag) {
+
+/*             buff.msg.seqn = seqn++; */
+/*             buff.msg.tos = ac_to_tos[stream->ac]; */
+               buff.msg.stream = stream-streams;
+               
+               clock_gettime(CLOCK_MONOTONIC,&buff.msg.send_timestamp);
+
+               ret = send_packet(stream, &buff);
+               if (ret < 0) {
+                       goto out;
+               }
+
+               pthread_mutex_lock(&stream->mutex);
+               stream->sent++;
+               if (ret > 0)
+                       stream->really_sent++;
+               pthread_mutex_unlock(&stream->mutex);
+
+#ifdef DEBUG
+               printf("%d", stream->ac);
+               fflush(stdout);
+#endif
+
+               wait_for_next_send(stream, &buff.msg.send_timestamp);
        }
+out:
+       sem_post(&sem_thread_finished);
+       return NULL;
+}
+
+static inline void
+calc_stream_params(struct stream *stream)
+{
+       int packet_size;
+       unsigned period_usec;
+       int bandwidth;
+       struct hostent* ph;
+
+       bandwidth = stream->bandwidth_bps;
 
        if (stream->packet_size) {
                packet_size = stream->packet_size;
@@ -415,94 +509,38 @@ void* sender(void* arg)
                fprintf(stderr, "Neither packet size nor period was specified for a stream %s\n", buf);
                exit(1);
        }
-       stream->packet_size = packet_size;
-       stream->period_usec = period_usec;
-       stream->jitter = opt_jitter;
-
-       bandwidth = stream->bandwidth_bps;
-
-       stream_to_text(stream_desc, sizeof(stream_desc), stream, 0);
-       printf("%s\n", stream_desc);
 
        if (packet_size < sizeof(struct msg_t)) {
                fprintf(stderr, "Packet size too small (min %d)\n", sizeof(struct msg_t));
                exit(1);
        }
 
-       memset(&rem_addr,0, sizeof(rem_addr));
+       stream->packet_size = packet_size;
+       stream->period_usec = period_usec;
+       stream->jitter = opt_jitter;
+
+       memset(&stream->rem_addr,0, sizeof(stream->rem_addr));
                
-       rem_addr.sin_family = AF_INET;
+       stream->rem_addr.sin_family = AF_INET;
        ph = gethostbyname(server_addr);
        if (ph) 
-               rem_addr.sin_addr = *((struct in_addr *)ph->h_addr);
+               stream->rem_addr.sin_addr = *((struct in_addr *)ph->h_addr);
        else {
                perror("Unknown server");
                exit(1);
        }
-       rem_addr.sin_port = htons(BASE_PORT + stream->ac);
-       seqn = 0;
-       
-       block_signals();
-
-       if (bandwidth == 0)
-               goto out;
-
-       while (!exit_flag) {
-
-/*             buff.msg.seqn = seqn; */
-/*             buff.msg.tos = ac_to_tos[stream->ac]; */
-               buff.msg.stream = stream-streams;
-               
-               clock_gettime(CLOCK_MONOTONIC,&buff.msg.send_timestamp);
-
-               errno = 0;
-               while (sendto(ac_sockfd[stream->ac], &buff, packet_size, 0, \
-                               (struct sockaddr*)&rem_addr, sizeof(rem_addr)) < 0) {
-                               if (errno == EINTR) continue;
-                               if (errno == EAGAIN) {
-                                       if (opt_wait_for_queue_is_full && !some_queue_is_full && (pthread_mutex_trylock(&queue_full_mutex) != EBUSY)) {
-                                               some_queue_is_full = true;
-                                               reset_statistics();
-                                       }
-                                       break;
-                               } else {
-                                       perror("Error while sending");
-                                       goto out;
-                               }
-               }
-
-#ifdef DEBUG
-               printf("%d", stream->ac);
-               fflush(stdout);
-#endif
-               seqn++;
-               pthread_mutex_lock(&stream->mutex);
-               stream->sent++;
-               if (errno != EAGAIN) {
-                       stream->really_sent++;
-               }
-               pthread_mutex_unlock(&stream->mutex);
+       stream->rem_addr.sin_port = htons(BASE_PORT + stream->ac);
 
-               /*           |~~~+~~~| jitter interval (width = 2*opt_jitter percentage from period)*/
-               /* |-------------|     nominal period*/
-               if (opt_jitter) {
-                       period.tv_nsec = USEC_TO_NSEC*(period_usec*(100-opt_jitter)/100
-                                                + rand() % (2*period_usec*opt_jitter/100));
-               } else {
-                       period.tv_nsec = USEC_TO_NSEC*(period_usec);
-               }
-               period.tv_sec = 0;
-
-               timespec_add(&time_to_wait,&buff.msg.send_timestamp,&period);
-               clock_gettime(CLOCK_MONOTONIC,&current_time);
-               timespec_sub(&interval,&time_to_wait,&current_time);
-               nanosleep(&interval,NULL);
-       }
-out:
-       sem_post(&sem_thread_finished);
-       return NULL;
 }
 
+/** 
+ * Parse -b parameter.
+ * 
+ * @param params String to parse
+ * 
+ * @return NULL in case of success, pointer to a problematic character
+ *        on error.
+ */
 char* parse_bandwidths(char *params)
 {
        struct stream *sp = &streams[nr_streams];
@@ -650,8 +688,6 @@ int main(int argc, char *argv[])
                exit(1);
        }
 
-       reset_statistics();
-
        if (nr_streams == 0)
                parse_bandwidths("BE");
                
@@ -684,6 +720,8 @@ int main(int argc, char *argv[])
 
        sem_init(&sem_thread_finished, 0, 0);
 
+       reset_statistics();
+
        /* create four receivers each per AC */
        for (ac = AC_NUM - 1; ac >= 0; ac--) {
                ac_sockfd[ac] = create_ac_socket(ac);
@@ -696,8 +734,10 @@ int main(int argc, char *argv[])
                        
        /* create sendpoints */
        for (i = 0; i < nr_streams; i++) {
-               pthread_mutex_init(&streams[i].mutex, NULL);
-               rc = pthread_create(&thread, &attr, sender, (void*) &streams[i]);
+               struct stream *s = &streams[i];
+               pthread_mutex_init(&s->mutex, NULL);
+               calc_stream_params(s);
+               rc = pthread_create(&thread, &attr, sender, (void*) s);
                if (rc) {
                        fprintf(stderr, "Error while creating sender %d\n",rc);
                        return 1;