#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
+#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <string.h>
#include <pthread.h>
#include "common.h"
+#include <stdbool.h>
-#define BUFFSIZE 65536
+bool opt_same_interface = false;
-int ac_sockfd[AC_QUEUES];
+static int ac_sockfd[AC_NUM];
+
/*
 * Per-AC receive statistics; receivers[] has one element per AC index
 * (0 .. AC_NUM-1), indexed the same way as ac_sockfd[].
 *
 * received      - incremented by qhandler() each time it has received a
 *                 datagram on that AC's socket
 * last_received - copy of `received` taken by main() on its previous pass
 *                 through the statistics loop; the difference between the two
 *                 is what main() prints with the "/s" suffix
 */
+struct receiver {
+ unsigned received, last_received;
+} receivers[AC_NUM];
/*
 * stopper - close every per-AC socket and terminate the process.
 *
 * Installed by main() as the SIGTERM handler via signal().
 *
 * The removed lines called close(ac_sockfd[AC_QUEUES]) on every iteration:
 * the index was the array size, not the loop variable, so no real socket
 * was ever closed. The added lines close ac_sockfd[i] instead.
 *
 * NOTE(review): the function is declared with an empty parameter list, while
 * signal() expects a handler of type void (*)(int); consider
 * "void stopper(int sig)".
 * NOTE(review): exit() is not on the POSIX list of async-signal-safe
 * functions; confirm that calling it from a handler is acceptable here
 * (_exit() would be the signal-safe alternative).
 */
void stopper()
{
int i;
- for (i = 0 ; i < AC_QUEUES; i++)
- close(ac_sockfd[AC_QUEUES]);
+ for (i = 0; i < AC_NUM; i++)
+ close(ac_sockfd[i]);
exit(0); /* the process always ends with status 0 */
}
return -1;
}
+ if (opt_same_interface) {
+ int receive = 1;
+ if (setsockopt(sockfd, SOL_IP, IP_PKTINFO, &receive, sizeof(receive)) == -1) {
+ perror("setsockopt: IP_PKTINFO");
+ exit(1);
+ }
+ }
+
+
// bzero(&my_addr, sizeof(my_addr));
memset(&my_addr,0, sizeof(my_addr));
my_addr.sin_family = AF_INET;
return -1;
}
- //tos = ((AC_QUEUES - ac) *2 - 1)*32;
+ //tos = ((AC_NUM - ac) *2 - 1)*32;
tos = ac_to_tos[ac];
if (setsockopt(sockfd, SOL_IP, IP_TOS, &tos, sizeof(tos))) {
perror("Unable to set TOS");
/*
 * qhandler - per-AC echo thread body (started from main() via pthread_create).
 *
 * queue: the AC index, carried by value inside the pointer argument; main()
 *        casts an intptr_t to void* and it is cast back below.
 *
 * Loops forever: receives one datagram on ac_sockfd[ac] with recvmsg(),
 * stores the current CLOCK_REALTIME time (seconds * 1e9 + nanoseconds) in
 * buff.msg.sendback_timestamp, increments receivers[ac].received and sends
 * the same number of bytes back with sendmsg() to the address that recvmsg()
 * placed in rem_addr. When opt_same_interface is set, an IP_PKTINFO control
 * message holding the receiving interface index is attached to the reply.
 *
 * Returns NULL when recvmsg() or sendmsg() fails with an errno other than
 * EINTR.
 */
void* qhandler(void* queue)
{
- char buff[BUFFSIZE];
+ union msg_buff buff;
struct sockaddr_in rem_addr;
int mlen;
- unsigned int ac, rem_addr_length;
+ unsigned int rem_addr_length; /* NOTE(review): after this change it is only assigned; the removed recvfrom() call was its sole reader in the lines shown */
+ intptr_t ac;
+ char cbufrec[512], cbufsend[512]; /* ancillary-data buffers: cbufrec for recvmsg(), cbufsend for the reply */
+ struct iovec iov;
+ struct msghdr msg;
+ struct in_pktinfo *ipi = NULL;
+ struct timespec ts;
- ac = (int) queue;
- printf("AC= %d\n",ac);
+ ac = (intptr_t) queue; /* recover the AC index from the thread argument */
rem_addr_length=sizeof(rem_addr);
/* block_signals() and set_rt_prio() are defined outside the lines shown;
 * the value passed to set_rt_prio() gets smaller as ac grows. */
block_signals();
set_rt_prio(90-ac);
while (1) {
- while ((mlen = recvfrom(ac_sockfd[ac], buff, sizeof(buff) , 0, \
- (struct sockaddr*)&rem_addr, &rem_addr_length)) < 0) {
- if (errno == EINTR) continue;
- perror("Chyba pri prijimani pozadavku");
- return NULL;
+ struct cmsghdr *cmsg;
+
/* Re-initialise the message header on every pass: the previous iteration
 * may have changed iov_len, msg_control and msg_controllen. */
+ iov.iov_base = &buff;
+ iov.iov_len = sizeof(buff);
+ msg.msg_name = (void*)&rem_addr;
+ msg.msg_namelen = sizeof(rem_addr);
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+ msg.msg_flags = 0;
+ msg.msg_control = cbufrec;
+ msg.msg_controllen = sizeof(cbufrec);
+
/* Retry when interrupted by a signal; any other error ends the thread. */
+ while ((mlen = recvmsg(ac_sockfd[ac], &msg, 0)) < 0) {
+ if (errno == EINTR) continue;
+ perror("recvmsg");
+ return NULL;
+ }
/* Stamp the payload with the time of reception before echoing it.
 * NOTE(review): the multiplication is evaluated in the width of tv_sec;
 * where time_t/long are 32 bits this overflows -- confirm the targets or
 * widen one operand to a 64-bit type first. */
+ clock_gettime(CLOCK_REALTIME, &ts);
+ buff.msg.sendback_timestamp = ts.tv_sec*1000000000 + ts.tv_nsec;
+
+
+ if (opt_same_interface) {
+ /* determine receiving interface */
+ for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+ if (cmsg->cmsg_level == SOL_IP) {
+ if (cmsg->cmsg_type == IP_PKTINFO) {
+/* char spec_dst[20], addr[20]; */
+ ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
/* NOTE(review): this compares the length of the *received* control
 * message with the size of the send buffer. */
+ if (cmsg->cmsg_len <= sizeof(cbufsend)) {
+ struct in_pktinfo *ipi2;
/* NOTE(review): from here on msg.msg_control points at cbufsend and cmsg
 * is reassigned, so the enclosing for loop's CMSG_NXTHDR() step continues
 * over the outgoing buffer instead of the received one -- confirm this is
 * intended (a break after building the reply header would make it explicit). */
+ msg.msg_control = cbufsend;
+ msg.msg_controllen = CMSG_LEN(sizeof(struct in_pktinfo));
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_IP;
+ cmsg->cmsg_type = IP_PKTINFO;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
+ /* Initialize the payload: */
+ ipi2 = (struct in_pktinfo*)CMSG_DATA(cmsg);
+ memset(ipi2, 0, sizeof(*ipi2));
+ ipi2->ipi_ifindex = ipi->ipi_ifindex; /* only the interface index is copied; the other fields stay zero */
+
+ } else {
+ fprintf(stderr, "cbufsend too small\n");
+ msg.msg_control = NULL;
+ msg.msg_controllen = 0;
+ }
+/* strncpy(spec_dst, inet_ntoa(ipi->ipi_spec_dst), sizeof(spec_dst)-1); */
+/* strncpy(addr, inet_ntoa(ipi->ipi_addr), sizeof(addr)-1); */
+/* printf("pktinfo if=%d %s %s\n", ipi->ipi_ifindex, spec_dst, addr); */
+ }
+ }
+ }
/* NOTE(review): if the loop above finds no IP_PKTINFO message, msg_control
 * still refers to cbufrec when sendmsg() is called below -- confirm that
 * passing the received ancillary data back is acceptable. */
+ } else {
/* No ancillary data on the reply. */
+ msg.msg_control = NULL;
+ msg.msg_controllen = 0;
}
-
#ifdef DEBUG
printf("%d",ac);
fflush(stdout);
-#endif
- while (sendto(ac_sockfd[ac], buff, mlen,0 ,(struct sockaddr*)&rem_addr, \
- sizeof(rem_addr)) < 0){
+#endif
/* NOTE(review): this counter is written here and read by main()'s
 * statistics loop; no synchronization is visible in the lines shown. */
+ receivers[ac].received++;
+ msg.msg_iov->iov_len = mlen; /* reply with exactly the number of bytes received */
+#ifdef WITH_FWP
+ /* resp_port is already stored in network order */
+ rem_addr.sin_port = buff.msg.resp_port;
+#endif
/* Retry when interrupted by a signal; any other error ends the thread. */
+ while (sendmsg(ac_sockfd[ac], &msg, 0) < 0) {
if (errno == EINTR) continue;
- perror("Chyba pri zapisu");
+ perror("sendmsg");
return NULL;
}
}
int main(int argc, char *argv[])
{
- int i,rc;
+ int rc;
+ intptr_t ac;
pthread_attr_t attr;
pthread_t thread;
+ char opt;
+
+
+ while ((opt = getopt(argc, argv, "I")) != -1) {
+ switch (opt) {
+ case 'I':
+ opt_same_interface = true;
+ break;
+ default:
+ fprintf(stderr, "Usage: %s [ options ]\n\n", argv[0]);
+ fprintf(stderr, "Options:\n");
+ fprintf(stderr, " -I send back through the same interface (bypass routing tables)\n");
+ exit(1);
+ }
+ }
pthread_attr_init(&attr);
if (signal(SIGTERM, stopper) == SIG_ERR) {
exit(1);
}
- for (i = 0; i < AC_QUEUES; i++) {
- ac_sockfd[i] = create_ac_socket(i);
- rc = pthread_create(&thread, &attr, qhandler, (void*) i);
+ for (ac = 0; ac < AC_NUM; ac++) {
+ ac_sockfd[ac] = create_ac_socket(ac);
+ rc = pthread_create(&thread, &attr, qhandler, (void*) ac);
}
while (1) {
- sleep(100000);
+ printf("\r");
+ for (ac = 0; ac < AC_NUM; ac++) {
+ int delta = receivers[ac].received - receivers[ac].last_received;
+ receivers[ac].last_received = receivers[ac].received;
+ fprintf(stderr, "%s %5d %4d/s ", ac_to_text[ac], receivers[ac].received, delta);
+ }
+ fflush(stdout);
+ sleep(1);
}
+ printf("\n");
return 0;