]> rtime.felk.cvut.cz Git - frescor/fwp.git/blobdiff - wme_test/wserver.c
wme_test: Make the message format independent of 32/64 bit architecture
[frescor/fwp.git] / wme_test / wserver.c
index c020c590477e6dbee7a46a270b6c1bdbc19f32b8..1649ed5a97eee9fbcdec456460162e37a389efac 100644 (file)
@@ -1,8 +1,10 @@
+#include <stdlib.h>
 #include <errno.h>
 
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
+#include <arpa/inet.h>
 
 #include <signal.h>
 #include <sys/wait.h>
 #include <time.h>
 #include <string.h>
 #include <pthread.h>
+#include "common.h"
+#include <stdbool.h>
 
-#define BASE_PORT      5100
-#define        BUFFSIZE        65536
-#define AC_QUEUES      4
+bool opt_same_interface = false;
 
-const int prio_to_ac[8] = {2,3,3,2,1,1,0,0};
-int ac_sockfd[8]; // sockfd for every (8) TOS values which are mapped to 4 ACs
+static int ac_sockfd[AC_NUM];
 
+struct receiver {
+       unsigned received, last_received;
+} receivers[AC_NUM];
 
-void reaper()
+
+void stopper()
 {
-       union wait status;
-       
-       while (wait3(&status, WNOHANG, (struct rusage *)0)>0);
-       
-       signal(SIGCHLD, reaper);        // re-register (POSIX like) not BSD like
+       int i;
+
+       for (i = 0; i < AC_NUM; i++) 
+               close(ac_sockfd[i]);
+
+       exit(0);
 }
 
 int create_ac_socket(unsigned int ac) 
@@ -48,6 +54,15 @@ int create_ac_socket(unsigned int ac)
                return -1;
        }
 
+       if (opt_same_interface) {
+               int receive = 1;
+               if (setsockopt(sockfd, SOL_IP, IP_PKTINFO, &receive, sizeof(receive)) == -1) {
+                       perror("setsockopt: IP_PKTINFO");
+                       exit(1);
+               }
+       }
+
+
    //  bzero(&my_addr, sizeof(my_addr));
        memset(&my_addr,0, sizeof(my_addr));
        my_addr.sin_family = AF_INET;
@@ -60,7 +75,8 @@ int create_ac_socket(unsigned int ac)
                return -1;
        }
        
-       tos = ((AC_QUEUES - ac) *2 - 1)*32;
+       //tos = ((AC_NUM - ac) *2 - 1)*32;
+       tos = ac_to_tos[ac];
        if (setsockopt(sockfd, SOL_IP, IP_TOS, &tos, sizeof(tos))) {
                perror("Unable to set TOS");
                close(sockfd);
@@ -72,26 +88,92 @@ int create_ac_socket(unsigned int ac)
 
 void* qhandler(void* queue)
 {
-       char    buff[BUFFSIZE];
+       union msg_buff buff;
        struct  sockaddr_in rem_addr;
        int     mlen;
        unsigned int ac, rem_addr_length; 
+       char cbufrec[512], cbufsend[512];
+       struct iovec  iov;
+       struct msghdr msg;
+       struct in_pktinfo *ipi = NULL;
+       struct timespec ts;
        
        ac = (int) queue;
-       printf("AC= %d\n",ac);
         rem_addr_length=sizeof(rem_addr);
+
+       block_signals();
+       set_rt_prio(90-ac);
+
        while (1) {
-               while ((mlen = recvfrom(ac_sockfd[ac], buff, BUFFSIZE , 0,\
-                       (struct sockaddr*)&rem_addr, &rem_addr_length)) < 0) {
-                           if (errno == EINTR) continue;
-                           perror("Chyba pri prijimani pozadavku");
-                           return NULL;
-               }       
-               
-               while (sendto(ac_sockfd[ac], buff, mlen,0 ,(struct sockaddr*)&rem_addr, \
-                       sizeof(rem_addr)) < 0){
+               struct cmsghdr *cmsg;
+
+               iov.iov_base = &buff;
+               iov.iov_len = sizeof(buff);
+               msg.msg_name = (void*)&rem_addr;
+               msg.msg_namelen = sizeof(rem_addr);
+               msg.msg_iov = &iov;
+               msg.msg_iovlen = 1;
+               msg.msg_flags = 0;
+               msg.msg_control = cbufrec;
+               msg.msg_controllen = sizeof(cbufrec);
+
+               while ((mlen = recvmsg(ac_sockfd[ac], &msg, 0)) < 0) {
+                       if (errno == EINTR) continue;
+                       perror("recvmsg");
+                       return NULL;
+               }
+               clock_gettime(CLOCK_REALTIME, &ts);
+               buff.msg.sendback_timestamp = ts.tv_sec*1000000000 + ts.tv_nsec;
+
+
+               if (opt_same_interface) {
+                       /* determine receiving interface */
+                       for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+                               if (cmsg->cmsg_level == SOL_IP) {
+                                       if (cmsg->cmsg_type == IP_PKTINFO) {
+/*                                             char spec_dst[20], addr[20]; */
+                                               ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
+                                               if (cmsg->cmsg_len <= sizeof(cbufsend)) {
+                                                       struct in_pktinfo *ipi2;
+                                                       msg.msg_control = cbufsend;
+                                                       msg.msg_controllen = CMSG_LEN(sizeof(struct in_pktinfo));
+                                                       cmsg = CMSG_FIRSTHDR(&msg);
+                                                       cmsg->cmsg_level = SOL_IP;
+                                                       cmsg->cmsg_type = IP_PKTINFO;
+                                                       cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
+                                                       /* Initialize the payload: */
+                                                       ipi2 = (struct in_pktinfo*)CMSG_DATA(cmsg);
+                                                       memset(ipi2, 0, sizeof(*ipi2));
+                                                       ipi2->ipi_ifindex = ipi->ipi_ifindex;
+
+                                               } else {
+                                                       fprintf(stderr, "cbufsend too small\n");
+                                                       msg.msg_control = NULL;
+                                                       msg.msg_controllen = 0;
+                                               }
+/*                                             strncpy(spec_dst, inet_ntoa(ipi->ipi_spec_dst), sizeof(spec_dst)-1); */
+/*                                             strncpy(addr,     inet_ntoa(ipi->ipi_addr),     sizeof(addr)-1); */
+/*                                             printf("pktinfo if=%d %s %s\n", ipi->ipi_ifindex, spec_dst, addr); */
+                                       }
+                               }
+                       }
+               } else {
+                       msg.msg_control = NULL;
+                       msg.msg_controllen = 0;
+               }
+#ifdef DEBUG
+               printf("%d",ac);
+               fflush(stdout);
+#endif
+               receivers[ac].received++;
+               msg.msg_iov->iov_len = mlen;
+#ifdef WITH_FWP
+               /* resp_port is already stored in network order */
+               rem_addr.sin_port = buff.msg.resp_port;
+#endif
+               while (sendmsg(ac_sockfd[ac], &msg, 0) < 0) {
                            if (errno == EINTR) continue;
-                           perror("Chyba pri zapisu");
+                           perror("sendmsg");
                            return NULL;
                }
        }
@@ -100,27 +182,54 @@ void* qhandler(void* queue)
 
 int main(int argc, char *argv[])
 {
-       int i,rc;
+       int ac,rc;
        pthread_attr_t attr;
        pthread_t thread;
 
+       char opt;
+
+
+       while ((opt = getopt(argc, argv, "I")) != -1) {
+               switch (opt) {
+                       case 'I':
+                               opt_same_interface = true;
+                               break;
+                       default:
+                               fprintf(stderr, "Usage: %s [ options ]\n\n", argv[0]);
+                               fprintf(stderr, "Options:\n");
+                               fprintf(stderr, "    -I  send back through the same interface (bypass routing tables)\n");
+                               exit(1);
+               }
+       }
        pthread_attr_init(&attr);
 
-       if (signal(SIGCHLD, reaper)==SIG_ERR) {
-               perror("Chyba v registraci obsluhy signalu");
-               return -1;
+       if (signal(SIGTERM, stopper) == SIG_ERR) {
+               perror("Signal handler registration error");
+               exit(1);
+       }
+               
+       if (signal(SIGINT, stopper) == SIG_ERR) {
+               perror("Signal handler registration error");
+               exit(1);
        }
-       
 
-       for (i = 0; i < 4; i++) {
-               ac_sockfd[i] = create_ac_socket(i);
-               rc = pthread_create(&thread, &attr, qhandler, (void*) i); 
+       for (ac = 0; ac < AC_NUM; ac++) {
+               ac_sockfd[ac] = create_ac_socket(ac);
+               rc = pthread_create(&thread, &attr, qhandler, (void*) ac); 
 
        }
        
        while (1) {
-               sleep(2000);
+               printf("\r");
+               for (ac = 0; ac < AC_NUM; ac++) {
+                       int delta = receivers[ac].received - receivers[ac].last_received;
+                       receivers[ac].last_received = receivers[ac].received;
+                       fprintf(stderr, "%s %5d %4d/s ", ac_to_text[ac], receivers[ac].received, delta);
+               }
+               fflush(stdout);
+               sleep(1);
        }
+       printf("\n");
 
        return 0;