/* Mutually exclusive input parameters */
int packet_size;
long period_usec; /* all time units are in microseconds */
+ struct sockaddr_in rem_addr;
/* Statistics */
pthread_mutex_t mutex;
return NULL;
}
+/**
+ * Send one packet of a stream to the stream's remote address.
+ *
+ * The datagram is stream->packet_size bytes long and starts at *buff.
+ * A send interrupted by a signal (EINTR) is retried.
+ *
+ * On EAGAIN, when opt_wait_for_queue_is_full is set and
+ * some_queue_is_full is still false, the first thread that obtains
+ * queue_full_mutex sets some_queue_is_full and resets the statistics.
+ *
+ * @param stream Stream whose socket (selected by stream->ac), packet
+ *               size and remote address are used.
+ * @param buff   Message buffer to transmit.
+ *
+ * @return -1 in case of error, 1 in case of successful send and 0
+ * when all buffers are full.
+ */
+static inline int
+send_packet(struct stream* stream, union msg_buff* buff)
+{
+	int ret = 1;
+
+	/* buff already points to the message. Passing &buff would transmit
+	 * the bytes of the pointer variable (and the stack behind it)
+	 * instead of the message itself. */
+	while (sendto(ac_sockfd[stream->ac], buff, stream->packet_size, 0,
+		      (struct sockaddr*)&stream->rem_addr, sizeof(stream->rem_addr)) < 0) {
+		if (errno == EINTR)
+			continue;
+		if (errno == EAGAIN) {
+			if (opt_wait_for_queue_is_full &&
+			    !some_queue_is_full &&
+			    /* We use mutex as atomic test and set */
+			    (pthread_mutex_trylock(&queue_full_mutex) != EBUSY)) {
+				some_queue_is_full = true;
+				reset_statistics();
+			}
+			ret = 0;
+			break;
+		} else {
+			perror("Error while sending");
+			ret = -1;
+			break;
+		}
+	}
+	return ret;
+}
+
+/**
+ * Sleep until the next packet of a stream is due.
+ *
+ * The next send time is last_send_time plus one period. When
+ * stream->jitter is non-zero, the period is the nominal period reduced
+ * by jitter percent plus a random offset of up to twice that amount;
+ * otherwise the nominal period is used unchanged.
+ *
+ * @param stream         Stream supplying period_usec and jitter.
+ * @param last_send_time Time of the previous send, on the
+ *                       CLOCK_MONOTONIC time base.
+ */
+static inline void
+wait_for_next_send(struct stream* stream, struct timespec *last_send_time)
+{
+	struct timespec time_to_wait, current_time, period, interval;
+	unsigned period_usec = stream->period_usec;
+	unsigned long long wait_usec = period_usec;
+
+	/*     |~~~+~~~| jitter interval (width = 2*stream->jitter percentage from period)*/
+	/* |-------------| nominal period*/
+	if (stream->jitter) {
+		unsigned jitter_width = 2*period_usec*stream->jitter/100;
+
+		wait_usec = period_usec*(100-stream->jitter)/100;
+		/* Short periods or small jitter round the width down to
+		 * zero, and rand() % 0 is undefined behaviour. */
+		if (jitter_width)
+			wait_usec += rand() % jitter_width;
+	}
+
+	/* Split into seconds and nanoseconds so that tv_nsec stays below
+	 * one second even for periods of a second or longer. */
+	period.tv_sec = wait_usec / 1000000;
+	period.tv_nsec = USEC_TO_NSEC*(wait_usec % 1000000);
+
+	timespec_add(&time_to_wait, last_send_time, &period);
+	clock_gettime(CLOCK_MONOTONIC, &current_time);
+	timespec_sub(&interval, &time_to_wait, &current_time);
+	nanosleep(&interval, NULL);
+}
+
+/**
+ * Thread body that periodically sends the packets of one stream.
+ *
+ * Prints the stream description and, unless the stream's bandwidth is
+ * zero, loops until exit_flag is set: it timestamps a message, sends
+ * it with send_packet(), updates the sent/really_sent counters under
+ * the stream mutex and sleeps in wait_for_next_send(). A send error
+ * ends the loop. sem_thread_finished is posted on every exit path.
+ *
+ * @param arg Pointer to the struct stream to serve.
+ *
+ * @return Always NULL.
+ */
void* sender(void* arg)
{
- struct sockaddr_in rem_addr;
union msg_buff buff;
unsigned long int seqn;
- struct timespec time_to_wait, current_time, period, interval;
- struct hostent* ph;
struct stream* stream = (struct stream*) arg;
- int bandwidth;
- int packet_size;
- unsigned period_usec;
char stream_desc[100];
+ int ret;
+
+ stream_to_text(stream_desc, sizeof(stream_desc), stream, 0);
+ printf("%s\n", stream_desc);
+ if (stream->bandwidth_bps == 0) /* nothing to send for this stream */
+ goto out;
+
+ seqn = 0;
+
+ block_signals();
set_rt_prio(90-stream->ac);
- bandwidth = stream->bandwidth_bps;
- if (bandwidth == 0) {
- /* Either we are going to measure saturation bandwidth
- or we will return before the main loop. */
- bandwidth = 300*Mbit;
+ while (!exit_flag) {
+
+/* buff.msg.seqn = seqn++; */
+/* buff.msg.tos = ac_to_tos[stream->ac]; */
+ buff.msg.stream = stream-streams; /* index of this stream within streams[] */
+
+ clock_gettime(CLOCK_MONOTONIC,&buff.msg.send_timestamp); /* also the base for the next period */
+
+ ret = send_packet(stream, &buff);
+ if (ret < 0) { /* send error: stop this sender */
+ goto out;
+ }
+
+ pthread_mutex_lock(&stream->mutex);
+ stream->sent++; /* counts every attempt, including those send_packet() reported as 0 */
+ if (ret > 0)
+ stream->really_sent++; /* only attempts that send_packet() reported as sent */
+ pthread_mutex_unlock(&stream->mutex);
+
+#ifdef DEBUG
+ printf("%d", stream->ac);
+ fflush(stdout);
+#endif
+
+ wait_for_next_send(stream, &buff.msg.send_timestamp);
}
+out:
+ sem_post(&sem_thread_finished); /* posted once per sender thread on exit */
+ return NULL;
+}
+/**
+ * Fill in the derived sending parameters of a stream.
+ *
+ * Stores packet_size, period_usec and jitter (taken from opt_jitter)
+ * in the stream and sets up stream->rem_addr: family AF_INET, the
+ * address gethostbyname() returns for server_addr and port
+ * BASE_PORT + stream->ac.
+ *
+ * Calls exit(1) when neither packet size nor period was specified,
+ * when the packet size is smaller than struct msg_t, or when the
+ * server name cannot be resolved.
+ *
+ * NOTE(review): the "Packet size too small" message prints a sizeof
+ * result with %d; %zu is the matching conversion for size_t.
+ * NOTE(review): gethostbyname() reports failures through h_errno, so
+ * the perror() text may not describe the lookup failure -- confirm.
+ *
+ * @param stream Stream to update; its bandwidth_bps, packet_size and
+ *               ac members are read.
+ */
+static inline void
+calc_stream_params(struct stream *stream)
+{
+ int packet_size;
+ unsigned period_usec;
+ int bandwidth;
+ struct hostent* ph;
+
+ bandwidth = stream->bandwidth_bps;
if (stream->packet_size) {
packet_size = stream->packet_size;
fprintf(stderr, "Neither packet size nor period was specified for a stream %s\n", buf);
exit(1);
}
- stream->packet_size = packet_size;
- stream->period_usec = period_usec;
- stream->jitter = opt_jitter;
-
- bandwidth = stream->bandwidth_bps;
-
- stream_to_text(stream_desc, sizeof(stream_desc), stream, 0);
- printf("%s\n", stream_desc);
if (packet_size < sizeof(struct msg_t)) {
fprintf(stderr, "Packet size too small (min %d)\n", sizeof(struct msg_t));
exit(1);
}
- memset(&rem_addr,0, sizeof(rem_addr));
+ stream->packet_size = packet_size; /* stored only after the minimum-size check above */
+ stream->period_usec = period_usec;
+ stream->jitter = opt_jitter; /* every stream takes the global jitter option */
+
+ memset(&stream->rem_addr,0, sizeof(stream->rem_addr));
- rem_addr.sin_family = AF_INET;
+ stream->rem_addr.sin_family = AF_INET;
ph = gethostbyname(server_addr);
if (ph)
- rem_addr.sin_addr = *((struct in_addr *)ph->h_addr);
+ stream->rem_addr.sin_addr = *((struct in_addr *)ph->h_addr);
else {
perror("Unknown server");
exit(1);
}
- rem_addr.sin_port = htons(BASE_PORT + stream->ac);
- seqn = 0;
-
- block_signals();
-
- if (bandwidth == 0)
- goto out;
-
- while (!exit_flag) {
-
-/* buff.msg.seqn = seqn; */
-/* buff.msg.tos = ac_to_tos[stream->ac]; */
- buff.msg.stream = stream-streams;
-
- clock_gettime(CLOCK_MONOTONIC,&buff.msg.send_timestamp);
-
- errno = 0;
- while (sendto(ac_sockfd[stream->ac], &buff, packet_size, 0, \
- (struct sockaddr*)&rem_addr, sizeof(rem_addr)) < 0) {
- if (errno == EINTR) continue;
- if (errno == EAGAIN) {
- if (opt_wait_for_queue_is_full && !some_queue_is_full && (pthread_mutex_trylock(&queue_full_mutex) != EBUSY)) {
- some_queue_is_full = true;
- reset_statistics();
- }
- break;
- } else {
- perror("Error while sending");
- goto out;
- }
- }
-
-#ifdef DEBUG
- printf("%d", stream->ac);
- fflush(stdout);
-#endif
- seqn++;
- pthread_mutex_lock(&stream->mutex);
- stream->sent++;
- if (errno != EAGAIN) {
- stream->really_sent++;
- }
- pthread_mutex_unlock(&stream->mutex);
+ stream->rem_addr.sin_port = htons(BASE_PORT + stream->ac); /* port is offset by the stream's AC */
- /* |~~~+~~~| jitter interval (width = 2*opt_jitter percentage from period)*/
- /* |-------------| nominal period*/
- if (opt_jitter) {
- period.tv_nsec = USEC_TO_NSEC*(period_usec*(100-opt_jitter)/100
- + rand() % (2*period_usec*opt_jitter/100));
- } else {
- period.tv_nsec = USEC_TO_NSEC*(period_usec);
- }
- period.tv_sec = 0;
-
- timespec_add(&time_to_wait,&buff.msg.send_timestamp,&period);
- clock_gettime(CLOCK_MONOTONIC,¤t_time);
- timespec_sub(&interval,&time_to_wait,¤t_time);
- nanosleep(&interval,NULL);
- }
-out:
- sem_post(&sem_thread_finished);
- return NULL;
}
+/**
+ * Parse -b parameter.
+ *
+ * @param params String to parse
+ *
+ * @return NULL in case of success, pointer to a problematic character
+ * on error.
+ */
char* parse_bandwidths(char *params)
{
struct stream *sp = &streams[nr_streams];
exit(1);
}
- reset_statistics();
-
if (nr_streams == 0)
parse_bandwidths("BE");
sem_init(&sem_thread_finished, 0, 0);
+ reset_statistics();
+
/* create four receivers each per AC */
for (ac = AC_NUM - 1; ac >= 0; ac--) {
ac_sockfd[ac] = create_ac_socket(ac);
/* create sendpoints */
for (i = 0; i < nr_streams; i++) {
- pthread_mutex_init(&streams[i].mutex, NULL);
- rc = pthread_create(&thread, &attr, sender, (void*) &streams[i]);
+ struct stream *s = &streams[i];
+ pthread_mutex_init(&s->mutex, NULL);
+ calc_stream_params(s);
+ rc = pthread_create(&thread, &attr, sender, (void*) s);
if (rc) {
fprintf(stderr, "Error while creating sender %d\n",rc);
return 1;