rtime.felk.cvut.cz Git - frescor/fwp.git/commitdiff
Allow wclient and wserver to send packets with a specified interface.
author Michal Sojka <sojkam1@fel.cvut.cz>
Fri, 8 Feb 2008 08:12:41 +0000 (09:12 +0100)
committer Michal Sojka <sojkam1@fel.cvut.cz>
Fri, 8 Feb 2008 08:12:41 +0000 (09:12 +0100)
In combination with the send-to-self patch (http://www.ssi.bg/~ja/#loop),
this allows both wserver and wclient to run on one machine
and use multiple interfaces for communication.

wme_test/wclient.c
wme_test/wserver.c

index 7c11c62593632a837c071f8927bf6388aa531c90..375b1c9bc27d7296d90d357a106803bb4112fc53 100644 (file)
@@ -17,6 +17,8 @@
 #include <stdbool.h>
 #include "common.h"
 #include <semaphore.h>
+#include <sys/ioctl.h>
+#include <net/if.h>
 
 #ifdef WITH_FWP
 #include <fwp_proto.h>
@@ -28,6 +30,7 @@
 unsigned opt_packet_size = 800;
 int opt_send_buf_size = -1;
 unsigned opt_period_usec = 10*MSEC_TO_USEC;
+char   *opt_interface;
 unsigned opt_jitter = 0;
 char    *opt_output = "delay_stats";
 unsigned opt_count_sec = 0;
@@ -94,6 +97,13 @@ struct stream {
        unsigned long long sent, really_sent, received;
 };
 
+static struct cmsg_ipi {
+       struct cmsghdr cm;
+       struct in_pktinfo ipi;
+} cmsg = { {sizeof(struct cmsg_ipi), SOL_IP, IP_PKTINFO},
+          {0, }};
+int cmsg_len = 0;
+
 /*
 struct send_endpoint sepoint[] = {
        { .ac = AC_VO, .period_usec=200*MSEC_TO_USEC, .bandwidth_bps = 34*Kbit },
@@ -352,7 +362,7 @@ void* receiver(void* queue)
                server_timestamp = msg.sendback_timestamp;
 
                /* Check whether this message was sent after reset_statistics() */
-               
+
                if ((ret = timespec_sub_usec(&send_timestamp, &reset_timestamp)) < 0) {
                        continue; /* If so, don't count it */
                }
@@ -362,7 +372,7 @@ void* receiver(void* queue)
                server_to_client_usec = timespec_sub_usec(&recv_timestamp, &server_timestamp);
 
                pthread_mutex_lock(&delay_stats_mutex);
-               if (trans_time_usec < MAX_DELAY_US) {
+               if (trans_time_usec < MAX_DELAY_US && trans_time_usec >= 0) {
                        delay_stats[ac][trans_time_usec/opt_granularity_usec].csc++;
                }
                if (client_to_server_usec < MAX_DELAY_US && client_to_server_usec >= 0) {
@@ -402,9 +412,22 @@ out:
 static inline int 
 send_packet_native(struct stream* stream, union msg_buff* buff)
 {
+       struct iovec  iov;
+       struct msghdr msg;
+
+       iov.iov_base = buff;
+       iov.iov_len = stream->packet_size;
+       msg.msg_name = (void*)&stream->rem_addr;
+       msg.msg_namelen = sizeof(stream->rem_addr);
+       msg.msg_iov = &iov;
+       msg.msg_iovlen = 1;
+       msg.msg_flags = 0;
+       msg.msg_control = &cmsg;
+       msg.msg_controllen = cmsg_len;
+
        int ret = 1;
-       while (sendto(ac_sockfd[stream->ac], buff, stream->packet_size, 0,
-                     (struct sockaddr*)&stream->rem_addr, sizeof(stream->rem_addr)) < 0) {
+       
+       while (sendmsg(ac_sockfd[stream->ac], &msg, 0) < 0) {
                if (errno == EINTR) continue;
                if (errno == EAGAIN) {
                        if (opt_wait_for_queue_is_full && 
@@ -709,7 +732,7 @@ int main(int argc, char *argv[])
        char opt;
 
 
-       while ((opt = getopt(argc, argv, "B:b:c:g:j:o:qQ:s:T:")) != -1) {
+       while ((opt = getopt(argc, argv, "B:b:c:g:I:j:o:qQ:s:T:")) != -1) {
                switch (opt) {
                        case 'B':
                                opt_def_bandwidth = atoi(optarg);
@@ -736,6 +759,9 @@ int main(int argc, char *argv[])
                                        exit(1);
                                }
                                break;
+                       case 'I':
+                               opt_interface = optarg;
+                               break;
                        case 'j':
                                opt_jitter = atoi(optarg);
                                break;
@@ -761,6 +787,7 @@ int main(int argc, char *argv[])
                                fprintf(stderr, "    -b  bandwidth of streams (VO|VI|BE|BK)[:<kbit>][@<msec> or /<bytes>][,...]\n");
                                fprintf(stderr, "    -c  count (number of seconds to run)\n");
                                fprintf(stderr, "    -g  histogram granularity [usec]\n");
+                               fprintf(stderr, "    -I  <interface> send packets from this interface");
                                fprintf(stderr, "    -j  send jitter (0-100) [%%]\n");
                                fprintf(stderr, "    -o  output filename (.dat will be appended)\n");
                                fprintf(stderr, "    -q  gather statistics only after some queue becomes full\n");
@@ -829,6 +856,18 @@ int main(int argc, char *argv[])
                }
        }               
                        
+       if (opt_interface) {
+               struct ifreq ifr;
+
+               memset(&ifr, 0, sizeof(ifr));
+               strncpy(ifr.ifr_name, opt_interface, IFNAMSIZ-1);
+               if (ioctl(ac_sockfd[AC_VO], SIOCGIFINDEX, &ifr) < 0) {
+                       fprintf(stderr, "unknown iface %s\n", opt_interface);
+                       exit(1);
+               }
+               cmsg.ipi.ipi_ifindex = ifr.ifr_ifindex;
+               cmsg_len = sizeof(cmsg);
+       }
        /* create sendpoints */
        for (i = 0; i < nr_streams; i++) {
                struct stream *s = &streams[i];
index c6d0bfa1354bead028c07fc333fa20a4625c3545..03dbd855028f86196e999f7a8d2312032a2f41b5 100644 (file)
@@ -4,6 +4,7 @@
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
+#include <arpa/inet.h>
 
 #include <signal.h>
 #include <sys/wait.h>
@@ -14,6 +15,9 @@
 #include <string.h>
 #include <pthread.h>
 #include "common.h"
+#include <stdbool.h>
+
+bool opt_same_interface = false;
 
 int ac_sockfd[AC_NUM];
 
@@ -50,6 +54,15 @@ int create_ac_socket(unsigned int ac)
                return -1;
        }
 
+       if (opt_same_interface) {
+               int receive = 1;
+               if (setsockopt(sockfd, SOL_IP, IP_PKTINFO, &receive, sizeof(receive)) == -1) {
+                       perror("setsockopt: IP_PKTINFO");
+                       exit(1);
+               }
+       }
+
+
    //  bzero(&my_addr, sizeof(my_addr));
        memset(&my_addr,0, sizeof(my_addr));
        my_addr.sin_family = AF_INET;
@@ -79,6 +92,10 @@ void* qhandler(void* queue)
        struct  sockaddr_in rem_addr;
        int     mlen;
        unsigned int ac, rem_addr_length; 
+       char cbufrec[512], cbufsend[512];
+       struct iovec  iov;
+       struct msghdr msg;
+       struct in_pktinfo *ipi = NULL;
        
        ac = (int) queue;
        printf("AC= %d\n",ac);
@@ -88,23 +105,70 @@ void* qhandler(void* queue)
        set_rt_prio(90-ac);
 
        while (1) {
-               while ((mlen = recvfrom(ac_sockfd[ac], &buff, sizeof(buff) , 0, \
-                       (struct sockaddr*)&rem_addr, &rem_addr_length)) < 0) {
-                           if (errno == EINTR) continue;
-                           perror("Chyba pri prijimani pozadavku");
-                           return NULL;
+               struct cmsghdr *cmsg;
+
+               iov.iov_base = &buff;
+               iov.iov_len = sizeof(buff);
+               msg.msg_name = (void*)&rem_addr;
+               msg.msg_namelen = sizeof(rem_addr);
+               msg.msg_iov = &iov;
+               msg.msg_iovlen = 1;
+               msg.msg_flags = 0;
+               msg.msg_control = cbufrec;
+               msg.msg_controllen = sizeof(cbufrec);
+
+               while ((mlen = recvmsg(ac_sockfd[ac], &msg, 0)) < 0) {
+                       if (errno == EINTR) continue;
+                       perror("recvmsg");
+                       return NULL;
                }
                clock_gettime(CLOCK_REALTIME, &buff.msg.sendback_timestamp);
 
+
+               if (opt_same_interface) {
+                       /* determine receiving interface */
+                       for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+                               if (cmsg->cmsg_level == SOL_IP) {
+                                       if (cmsg->cmsg_type == IP_PKTINFO) {
+/*                                             char spec_dst[20], addr[20]; */
+                                               ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
+                                               if (cmsg->cmsg_len <= sizeof(cbufsend)) {
+                                                       struct in_pktinfo *ipi2;
+                                                       msg.msg_control = cbufsend;
+                                                       msg.msg_controllen = CMSG_LEN(sizeof(struct in_pktinfo));
+                                                       cmsg = CMSG_FIRSTHDR(&msg);
+                                                       cmsg->cmsg_level = SOL_IP;
+                                                       cmsg->cmsg_type = IP_PKTINFO;
+                                                       cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
+                                                       /* Initialize the payload: */
+                                                       ipi2 = (struct in_pktinfo*)CMSG_DATA(cmsg);
+                                                       memset(ipi2, 0, sizeof(*ipi2));
+                                                       ipi2->ipi_ifindex = ipi->ipi_ifindex;
+
+                                               } else {
+                                                       fprintf(stderr, "cbufsend too small\n");
+                                                       msg.msg_control = NULL;
+                                                       msg.msg_controllen = 0;
+                                               }
+/*                                             strncpy(spec_dst, inet_ntoa(ipi->ipi_spec_dst), sizeof(spec_dst)-1); */
+/*                                             strncpy(addr,     inet_ntoa(ipi->ipi_addr),     sizeof(addr)-1); */
+/*                                             printf("pktinfo if=%d %s %s\n", ipi->ipi_ifindex, spec_dst, addr); */
+                                       }
+                               }
+                       }
+               } else {
+                       msg.msg_control = NULL;
+                       msg.msg_controllen = 0;
+               }
 #ifdef DEBUG
                printf("%d",ac);
                fflush(stdout);
 #endif
                receivers[ac].received++;
-               while (sendto(ac_sockfd[ac], &buff, mlen,0 ,(struct sockaddr*)&rem_addr, \
-                       sizeof(rem_addr)) < 0){
+               msg.msg_iov->iov_len = mlen;
+               while (sendmsg(ac_sockfd[ac], &msg, 0) < 0) {
                            if (errno == EINTR) continue;
-                           perror("Chyba pri zapisu");
+                           perror("sendmsg");
                            return NULL;
                }
        }
@@ -117,6 +181,21 @@ int main(int argc, char *argv[])
        pthread_attr_t attr;
        pthread_t thread;
 
+       char opt;
+
+
+       while ((opt = getopt(argc, argv, "I")) != -1) {
+               switch (opt) {
+                       case 'I':
+                               opt_same_interface = true;
+                               break;
+                       default:
+                               fprintf(stderr, "Usage: %s [ options ]\n\n", argv[0]);
+                               fprintf(stderr, "Options:\n");
+                               fprintf(stderr, "    -I  send back through the same interface (bypass routing tables)\n");
+                               exit(1);
+               }
+       }
        pthread_attr_init(&attr);
 
        if (signal(SIGTERM, stopper) == SIG_ERR) {