--- /dev/null
+#include "common.h"
+#include <sched.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
+const unsigned int ac_to_tos[4] = {224,160,96,64};
+
+void set_rt_prio(int priority)
+{
+ int maxpri, minpri;
+ static struct sched_param param;
+
+ if ((maxpri = sched_get_priority_max(SCHED_FIFO)) == -1) {
+ perror("sched_get_priority_max fails");
+ exit(8);
+ }
+
+ if ((minpri = sched_get_priority_min(SCHED_FIFO)) == -1) {
+ perror("sched_get_priority_min fails");
+ exit(10);
+ }
+
+ if (priority > maxpri) {
+ fprintf(stderr,"maximum priority allowed is %d.\n", maxpri);
+ exit(9);
+ }
+ if (priority < minpri) {
+ fprintf(stderr,"minimum priority allowed is %d.\n", minpri);
+ exit(11);
+ }
+
+ param.sched_priority = priority;
+
+ if (sched_setscheduler(0, SCHED_FIFO, ¶m) == -1) {
+ perror("sched_setscheduler fails");
+ exit(13);
+ }
+}
+
+void block_signals(void)
+{
+ sigset_t sigset;
+ int ret;
+ sigemptyset(&sigset);
+ sigaddset(&sigset, SIGINT);
+ sigaddset(&sigset, SIGTERM);
+ ret = pthread_sigmask(SIG_BLOCK, &sigset, NULL);
+ if (ret != 0) {
+ perror("pthread_sigmask failed");
+ exit(1);
+ }
+}
#include <string.h>
#include <stdlib.h>
#include <stdbool.h>
+#include "common.h"
+#include <semaphore.h>
/*#define AC_VO 0
#define AC_VI 1
*/
#define PARAM_SERVERADDR 1
-#define BASE_PORT 5100
-#define AC_QUEUES 4
#define MAX_SENDENDPOINTS 10
-#define MTU 800
-// Turn on/off debugging
-#define DEBUG 0
-
-enum { AC_VO = 0,
- AC_VI = 1,
- AC_BE = 2,
- AC_BK = 3
-};
+int ac_sockfd[AC_QUEUES];
-const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
-const unsigned int ac_to_tos[4] = {224,160,96,64};
+struct receiver {
+ pthread_t thread;
+} receivers[AC_QUEUES];
-int ac_sockfd[AC_QUEUES];
FILE* logfd;
char* server_addr;
unsigned int nr_sepoints = sizeof(sepoint)/sizeof(*sepoint);
+sem_t sem_thread_finished;
+
+bool exit_flag = false;
+
void stopper()
+{
+ int i;
+ exit_flag = true;
+
+ /* Interrupt all receivers */
+ for (i=0; i<AC_QUEUES; i++) {
+ pthread_kill(receivers[i].thread, SIGUSR1);
+ }
+}
+
+void save_results()
{
int ac, i, maxi;
bool allzeros;
unsigned sum[AC_QUEUES];
- printf("\n Writing data to log file...\n");
+ printf("\nWriting data to log file...\n");
allzeros = true;
for (maxi = MAX_DELAY_US/GRANULARITY - 1; maxi >= 0; maxi--) {
}
if (!allzeros) break;
}
- if (maxi < 10000/GRANULARITY) maxi = 10000/GRANULARITY;
+ if (maxi < 3000/GRANULARITY) maxi = 3000/GRANULARITY;
for (ac = 0; ac < AC_QUEUES; ac++) {
sum[ac] = 0;
return sockfd;
}
+void empty_handler()
+{
+}
void* receiver(void* queue)
{
min_trans_time = ~0;
+ block_signals();
+ set_rt_prio(99);
+
ac = (int)queue;
rem_addr_length = sizeof(rem_addr);
- while (1) {
- while ((mlen = recvfrom(ac_sockfd[ac], &msg, sizeof(msg), 0,\
- (struct sockaddr*)&rem_addr, &rem_addr_length)) < 0) {
- if (errno == EINTR) continue;
- perror("Chyba pri prijimani pozadavku");
- return NULL;
+ while (!exit_flag) {
+ mlen = recvfrom(ac_sockfd[ac], &msg, sizeof(msg), 0, \
+ (struct sockaddr*)&rem_addr, &rem_addr_length);
+ if (mlen < 0) {
+ if (errno == EINTR) continue;
+ perror("Chyba pri prijimani pozadavku");
+ goto out;
}
clock_gettime(CLOCK_MONOTONIC,&recv_timestamp);
send_timestamp = msg.send_timestamp;
send_timestamp.tv_nsec,recv_timestamp.tv_sec,\
recv_timestamp.tv_nsec, trans_time_msec); */
}
+out:
+ sem_post(&sem_thread_finished);
+ return NULL;
}
void* sender(void* endpoint)
period.tv_nsec = spoint->period_nsec;
period.tv_sec = 0;
- while (1) {
+ block_signals();
+ set_rt_prio(90-ac);
+
+ while (!exit_flag) {
msg.seqn = seqn;
msg.tos = ac_to_tos[ac];
while (sendto(ac_sockfd[ac], &msg, sizeof(msg), 0,\
(struct sockaddr*)&rem_addr, sizeof(rem_addr)) < 0) {
if (errno == EINTR) continue;
- perror("Error while sending.");
- return NULL;
+ perror("Error while sending");
+ goto out;
}
-#if DEBUG
+#ifdef DEBUG
printf("%d", ac);
fflush(stdout);
#endif
clock_gettime(CLOCK_MONOTONIC,¤t_time);
timespec_sub(&interval,&time_to_wait,¤t_time);
nanosleep(&interval,NULL);
- }
+ }
+out:
+ sem_post(&sem_thread_finished);
+ return NULL;
}
int main(int argc, char *argv[])
perror("Signal handler registration error");
exit(1);
}
+
+ struct sigaction sa;
+ sa.sa_handler = empty_handler;
+ sa.sa_flags = 0; /* don't restart syscalls */
+
+ if (sigaction(SIGUSR1, &sa, NULL) < 0) {
+ perror("sigaction error");
+ exit(1);
+ }
+
+ sem_init(&sem_thread_finished, 0, 0);
/* create four receivers each per AC */
for (ac = AC_QUEUES - 1; ac >= 0; ac--) {
ac_sockfd[ac] = create_ac_socket(ac);
- rc = pthread_create(&thread, &attr, receiver, (void*) ac);
+ rc = pthread_create(&receivers[ac].thread, &attr, receiver, (void*) ac);
if (rc) {
printf("Error while creating receiver %d\n",rc);
return 1;
}
}
- while (1) {
+ while (!exit_flag) {
sleep(100000);
}
-
+
+ printf("Waiting for threads to finish\n");
+ /* Wait for all threads to finish */
+ for (i=0; i < nr_sepoints + AC_QUEUES; i++) {
+ sem_wait(&sem_thread_finished);
+ }
+
+ save_results();
+
return 0;
}