/* Input parameters */
enum ac ac; /* */
int bandwidth_bps; /* bits per second */
+ bool measure_saturation; /* Measure the saturation bandwidth first */
int jitter; /* percent */
/* Mulualy exclusive input parameters */
int packet_size;
snprintf(real, sizeof(real), "; real: sent %lld (%lld/s), received %lld (%lld/s)",
stream->sent, stream->sent/seconds,
stream->received, stream->received/seconds);
+ } else {
+ real[0]=0;
}
snprintf(stream_desc, n, "%d: %s %s (%d bytes per %s +-%s, %d packets/s)%s",
struct sockaddr_in rem_addr;
union msg_buff buff;
unsigned long int seqn;
- struct timespec time_to_wait, current_time, period, interval;
+ unsigned long int really_sent = 0;
+ struct timespec time_to_wait, current_time, period, interval, measure_start, measure_end;
struct hostent* ph;
struct stream* stream = (struct stream*) arg;
+ int bandwidth;
int packet_size;
unsigned period_usec;
char stream_desc[100];
-
- if (stream->bandwidth_bps == 0)
- goto out;
+ bool this_queue_is_full = false, this_queue_is_full_old = false;
set_rt_prio(90-stream->ac);
+ bandwidth = stream->bandwidth_bps;
+ if (bandwidth == 0) {
+ /* Either we are going to measure saturation bandwidth
+ or we will return before the main loop. */
+ bandwidth = 300*Mbit;
+ }
+
if (stream->packet_size) {
packet_size = stream->packet_size;
- period_usec = SEC_TO_USEC*packet_size*8/stream->bandwidth_bps;
+ period_usec = SEC_TO_USEC*packet_size*8/bandwidth;
+ if (period_usec == 0) period_usec = 1;
} else if (stream->period_usec) {
period_usec = stream->period_usec;
- packet_size = (long long)stream->bandwidth_bps/8 * period_usec/SEC_TO_USEC;
+ packet_size = (long long)bandwidth/8 * period_usec/SEC_TO_USEC;
} else {
char buf[200];
stream_to_text(buf, sizeof(buf), stream, 0);
stream->period_usec = period_usec;
stream->jitter = opt_jitter;
+ bandwidth = stream->bandwidth_bps;
stream_to_text(stream_desc, sizeof(stream_desc), stream, 0);
-
printf("%s\n", stream_desc);
if (packet_size < sizeof(struct msg_t)) {
exit(1);
}
-
memset(&rem_addr,0, sizeof(rem_addr));
rem_addr.sin_family = AF_INET;
block_signals();
+ if (bandwidth == 0 && !stream->measure_saturation)
+ goto out;
+
while (!exit_flag) {
buff.msg.seqn = seqn;
clock_gettime(CLOCK_MONOTONIC,&buff.msg.send_timestamp);
- while (sendto(ac_sockfd[stream->ac], &buff, packet_size, 0,\
+ errno = 0;
+ while (sendto(ac_sockfd[stream->ac], &buff, packet_size, 0, \
(struct sockaddr*)&rem_addr, sizeof(rem_addr)) < 0) {
if (errno == EINTR) continue;
if (errno == EAGAIN) {
- if (!some_queue_is_full && (pthread_mutex_trylock(&queue_full_mutex) != EBUSY)) {
+ if (opt_wait_for_queue_is_full && !some_queue_is_full && (pthread_mutex_trylock(&queue_full_mutex) != EBUSY)) {
some_queue_is_full = true;
reset_statistics();
}
-
+ this_queue_is_full = true;
break;
} else {
perror("Error while sending");
}
#ifdef DEBUG
- printf("%d", ac);
+ printf("%d", stream->ac);
fflush(stdout);
#endif
seqn++;
stream->sent++;
+ /* Measure saturation bandwidth */
+ if (stream->measure_saturation && this_queue_is_full && !this_queue_is_full_old) {
+ measure_start = buff.msg.send_timestamp;
+ }
+ this_queue_is_full_old = this_queue_is_full;
+
+ if (stream->measure_saturation && this_queue_is_full && errno != EAGAIN) {
+ really_sent++;
+ measure_end = buff.msg.send_timestamp;
+ timespec_sub(&interval,&measure_end,&measure_start);
+ if (really_sent >= 100 && interval.tv_sec >= 20) {
+ period_usec = (interval.tv_sec*SEC_TO_USEC + interval.tv_nsec/USEC_TO_NSEC) / really_sent;
+
+ bandwidth = (long long)packet_size*8*SEC_TO_USEC/period_usec;
+ stream->bandwidth_bps = bandwidth;
+ stream->period_usec = period_usec;
+ stream->measure_saturation = false;
+ reset_statistics();
+
+ stream_to_text(stream_desc, sizeof(stream_desc), stream, 0);
+ printf("\n%s\n", stream_desc);
+ }
+ }
+
+
/* |~~~+~~~| jitter interval (width = 2*opt_jitter percentage from period)*/
/* |-------------| nominal period*/
if (opt_jitter) {
if (*params == ':') {
params++;
- bw = strtol(params, &next_char, 10);
- if (next_char == params)
- return params;
- params = next_char;
+ if (strncmp(params, "SAT", 3) == 0) {
+ bw = 0;
+ sp->measure_saturation = true;
+ params+=3;
+ } else {
+ bw = strtol(params, &next_char, 10);
+ if (next_char == params)
+ return params;
+ params = next_char;
+ }
} else
bw = opt_def_bandwidth;
int main(int argc, char *argv[])
{
- int ac, i, rc, seconds, seconds_start;
+ int ac, i, rc, seconds;
pthread_attr_t attr;
pthread_t thread;
char opt;
exit(1);
}
+ reset_statistics();
if (nr_streams == 0)
parse_bandwidths("BE");
}
seconds = 0;
- seconds_start = -1;
while (!exit_flag) {
sleep(1);
seconds++;
- if (opt_wait_for_queue_is_full) {
- if (some_queue_is_full && seconds_start == -1) {
- printf("\nSome queue is full\n");
- seconds_start = seconds;
- }
- }
fprintf(stderr, "\r%3ds", seconds);
for (ac = 0; ac < AC_NUM; ac++) {
int delta = receivers[ac].received - receivers[ac].last_received;
for (i=0; i < nr_streams + AC_NUM; i++) {
sem_wait(&sem_thread_finished);
}
+ struct timespec end_timestamp, measure_length;
+ clock_gettime(CLOCK_MONOTONIC,&end_timestamp);
+ timespec_sub(&measure_length, &end_timestamp, &reset_timestamp);
- if (seconds_start == -1) seconds_start=0;
- save_results(argc, argv, seconds-seconds_start);
+ save_results(argc, argv, measure_length.tv_sec);
return 0;
}