]> rtime.felk.cvut.cz Git - frescor/fwp.git/commitdiff
Allow fwp-timing to generate several streams simultaneously
author Michal Sojka <sojkam1@fel.cvut.cz>
Tue, 10 Nov 2009 10:37:58 +0000 (11:37 +0100)
committer Michal Sojka <sojkam1@fel.cvut.cz>
Tue, 10 Nov 2009 10:37:58 +0000 (11:37 +0100)
Signed-off-by: Michal Sojka <sojkam1@fel.cvut.cz>
.topmsg
fwp/tests/timing/fwp-timing.c

diff --git a/.topmsg b/.topmsg
index 5973efdd1483f5de22102a501b6bdb9d92ac40cf..6611ddaebb7e3fa502f99a5291115e8a0ffa3a06 100644 (file)
--- a/.topmsg
+++ b/.topmsg
@@ -1,17 +1,4 @@
 From: Michal Sojka <sojkam1@fel.cvut.cz>
-Subject: Implemented synchronous and asynchronous sending
-
-The main goal of this big change is to avoid delays caused by CPU
-scheduler when rescheduling from application thread to VRES TX thread.
-According to our fwp-timing.c experiment, these delays can by even
-several milliseconds long.
-
-Now, whenever VRES capacity allows, send operation is invoked directly
-from application thread. Only if the budget is insufficient, the
-message can be queued for sending later by VRES thread, provided that
-asynchronous send was requested. In case of synchronous send, the
-application thread is blocked until the budget is replenished.
-
-Besides the above change, the code was cleaned up a lot.
+Subject: [PATCH] Allow fwp-timing to generate several streams simultaneously
 
 Signed-off-by: Michal Sojka <sojkam1@fel.cvut.cz>
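diff --git a/fwp/tests/timing/fwp-timing.c b/fwp/tests/timing/fwp-timing.c
index a6f8c6d336773509a64c7067a10ca043c7cd92f7..1a6c8ad8972a0e70d7c438cfba533b5226faa23b 100644 (file)
--- a/fwp/tests/timing/fwp-timing.c
+++ b/fwp/tests/timing/fwp-timing.c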
 #include <fwp_res.h>
 #include <error.h>
 #include <sys/mman.h>
-
-int opt_budget = 1024;
-int opt_period = 20;
-bool opt_async = false;
-
-struct in_addr src, dst;
+#include <semaphore.h>
 
 void set_rt_prio(int priority)
 {
@@ -57,8 +52,17 @@ void set_rt_prio(int priority)
        mlockall(MCL_CURRENT | MCL_FUTURE);
 }
 
+struct stream_params {
+       int budget;
+       int period_ms;
+       bool async;
+       struct in_addr src, dst;
+       int number;
+       int id;
+};
+
 
-int negotiate_contract(frsh_vres_id_t *vres)
+int negotiate_contract(struct stream_params *p, frsh_vres_id_t *vres)
 {
        frsh_contract_t contract;
        int ret;
@@ -76,8 +80,8 @@ int negotiate_contract(frsh_vres_id_t *vres)
                NULL);
        if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_resource_and_label");
 
-       frsh_network_bytes_to_budget(FRSH_NETPF_FWP, opt_budget, &budget);
-       period = fosa_msec_to_rel_time(opt_period);
+       frsh_network_bytes_to_budget(FRSH_NETPF_FWP, p->budget, &budget);
+       period = fosa_msec_to_rel_time(p->period_ms);
        ret = frsh_contract_set_basic_params(&contract,
                                             &budget,
                                             &period,
@@ -96,7 +100,7 @@ int negotiate_contract(frsh_vres_id_t *vres)
 
        fwp = malloc(sizeof(*fwp));
        if (!fwp) PERROR_AND_EXIT(errno, "malloc");
-       fwp->src = src.s_addr;
+       fwp->src = p->src.s_addr;
        ret = fres_contract_add_fwp(contract, fwp);
        if (ret) PERROR_AND_EXIT(ret, "fres_contract_add_fwp");
 
@@ -108,7 +112,8 @@ int negotiate_contract(frsh_vres_id_t *vres)
        return 0;
 }
 
-void create_endpoints(frsh_vres_id_t vres,
+void create_endpoints(struct stream_params *p,
+                     frsh_vres_id_t vres,
                      frsh_send_endpoint_t *epsrc,
                      frsh_receive_endpoint_t *epdst)
 {
@@ -126,7 +131,7 @@ void create_endpoints(frsh_vres_id_t vres,
        frsh_receive_endpoint_get_params(*epdst, NULL, &port, NULL, NULL);
 
        ret = frsh_send_endpoint_create(FRSH_NETPF_FWP,
-                                       dst.s_addr, port, spi,
+                                       p->dst.s_addr, port, spi,
                                        epsrc);
        if (ret < 0) error(1, errno, "frsh_send_endpoint_create()");
                
@@ -141,6 +146,7 @@ static struct option long_opts[] = {
     { "source", 1, 0, 's' },
     { "dest",  1, 0, 'd' },
     { "async",         0, 0, 'a' },
+    { "number",        0, 0, 'n' },
     { 0, 0, 0, 0}
 };
 
@@ -153,26 +159,30 @@ usage(void)
        printf("  -s, --source <ip>  source IP address\n");
        printf("  -d, --dest <ip:port> destination IP address and port\n");
        printf("  -a, --async  Send packets asynchronously\n");
+       printf("  -n, --number Number of streams with the same parameters\n");
+       printf("  -/, --stream  New stream separator\n");
 }
 
-void parse_opts(int argc, char *argv[])
+int parse_opts(int *argc, char **argv[], struct stream_params *p)
 {
-       char opt;
+       int opt;
        int ret;
+       bool options_found = false;
 
-       while ((opt = getopt_long(argc, argv, "ab:p:s:d:", long_opts, NULL)) != -1) {
+       while ((opt = getopt_long(*argc, *argv, "/ab:p:s:d:n:", long_opts, NULL)) != -1) {
+               options_found = true;
                switch (opt) {
                case 'a':
-                       opt_async = true;
+                       p->async = true;
                        break;
                case 'b':
-                       opt_budget = atoi(optarg);
+                       p->budget = atoi(optarg);
                        break;
                case 'p':
-                       opt_period = atoi(optarg);
+                       p->period_ms = atoi(optarg);
                        break;
                case 's':
-                       ret = inet_aton(optarg, &src);
+                       ret = inet_aton(optarg, &p->src);
                        if (!ret) {
                                fprintf(stderr, "Source IP address not recognized: %s\n",
                                        optarg);
@@ -181,7 +191,7 @@ void parse_opts(int argc, char *argv[])
                        }
                        break;
                case 'd':
-                       ret = inet_aton(optarg, &dst);
+                       ret = inet_aton(optarg, &p->dst);
                        if (!ret) {
                                fprintf(stderr, "Destination IP address not recognized: %s\n",
                                        optarg);
@@ -189,11 +199,19 @@ void parse_opts(int argc, char *argv[])
                                exit(1);
                        }
                        break;
+               case 'n':
+                       p->number = atoi(optarg);
+                       break;
+               case '/':
+                       break;
                default:
                        usage();
                        exit(1);
                }
+               if (opt == '/')
+                       break; 
        }
+       return (options_found) ? 0 : -1;
 }
 
 volatile bool exit_flag = false;
@@ -246,52 +264,65 @@ struct msg {
        struct timespec ts;
 };
 
+struct receiver_params {
+       int budget;
+       frsh_receive_endpoint_t epdst;
+       int id;
+};
+
 void *receiver(void *arg)
 {
-       frsh_receive_endpoint_t epdst = (frsh_receive_endpoint_t)arg;
+       struct receiver_params *rp = arg;
+       frsh_receive_endpoint_t epdst = rp->epdst;
        size_t mlen;
        int ret;
        int last_cnt = -1;
        struct timespec tss, tsr;
        struct msg *msg;
-       msg = malloc(opt_budget);
+       msg = malloc(rp->budget);
        if (!msg) error(1, errno, "malloc msg");
 
        while (!exit_flag) {
-               ret = frsh_receive_sync(epdst, msg, opt_budget, &mlen, NULL);
+               ret = frsh_receive_sync(epdst, msg, rp->budget, &mlen, NULL);
                clock_gettime(CLOCK_MONOTONIC, &tsr);
                tss = msg->ts;
                if (msg->cnt != last_cnt+1)
                        printf("packet(s) lost!\n");
-               printf("%10d: %10.3lf ms\n",
-                      msg->cnt, tsdiff2d(&tsr, &tss)*1000);
+               printf("%3d: %10d: %10.3lf ms\n",
+                      rp->id, msg->cnt, tsdiff2d(&tsr, &tss)*1000);
                last_cnt = msg->cnt;
        }
+       free(rp);
        return NULL;
 }
 
-void run()
+sem_t finished;
+
+void *sender(void *arg)
 {
+       struct stream_params *p = arg;
        frsh_vres_id_t vres;
        frsh_send_endpoint_t epsrc;
-       frsh_receive_endpoint_t epdst;
+       struct receiver_params *rp;
        int ret;
        struct msg *msg;
        long int cnt=0;
        pthread_t receiver_id;
 
-       msg = malloc(opt_budget);
+       msg = malloc(p->budget);
        if (!msg) error(1, errno, "malloc msg");
 
-       ret = frsh_init();
-       if (ret) PERROR_AND_EXIT(ret, "frsh_init");
-       
-       negotiate_contract(&vres);
-       create_endpoints(vres, &epsrc, &epdst);
+       negotiate_contract(p, &vres);
+
+       rp = malloc(sizeof(*rp));
+       rp->budget = p->budget;
+       rp->id = p->id;
+
+       create_endpoints(p, vres, &epsrc, &rp->epdst);
 
        set_rt_prio(50);
 
-       ret = pthread_create(&receiver_id, NULL, receiver, epdst);
+       ret = pthread_create(&receiver_id, NULL, receiver, rp);
        
        if (signal(SIGTERM, stopper) == SIG_ERR)
                error(1, errno, "Error in signal registration");
@@ -305,14 +336,14 @@ void run()
                clock_gettime(CLOCK_MONOTONIC, &tss);
                msg->cnt = cnt++;
                msg->ts = tss;
-               if (opt_async)
-                       ret = frsh_send_async(epsrc, msg, opt_budget);
+               if (p->async)
+                       ret = frsh_send_async(epsrc, msg, p->budget);
                else {
-                       ret = frsh_send_sync(epsrc, msg, opt_budget);
+                       ret = frsh_send_sync(epsrc, msg, p->budget);
                        clock_gettime(CLOCK_MONOTONIC, &next_period);
                }
-               next_period.tv_sec  += (opt_period/1000);
-               next_period.tv_nsec += (opt_period%1000) * 1000000;
+               next_period.tv_sec  += (p->period_ms/1000);
+               next_period.tv_nsec += (p->period_ms%1000) * 1000000;
                if (next_period.tv_nsec >= 1000000000) {
                        next_period.tv_nsec -= 1000000000;
                        next_period.tv_sec++;
@@ -320,16 +351,50 @@ void run()
                clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
                                &next_period, NULL);
        }
-       
+
        frsh_contract_cancel(vres);
+       free(p);
+       sem_post(&finished);
+       return NULL;
 }
 
 int main(int argc, char *argv[])
 {
-       src.s_addr = htonl(INADDR_LOOPBACK);
-       dst.s_addr = htonl(INADDR_LOOPBACK);
+       int ret;
+       int num = 0;
+       struct stream_params sp = {
+               .budget = 1024,
+               .period_ms = 20,
+               .async = false,
+               .src.s_addr = htonl(INADDR_LOOPBACK),
+               .dst.s_addr = htonl(INADDR_LOOPBACK),
+               .number = 1,
+       };
+       struct stream_params *p = NULL;
+
+       sem_init(&finished, 0, 0);
+       
+       ret = frsh_init();
+       if (ret) PERROR_AND_EXIT(ret, "frsh_init");
+       
+       do {
+               ret = parse_opts(&argc, &argv, &sp);
+               if (num == 0 || ret == 0) {
+                       int i;
+                       for (i=0; i<sp.number; i++) {
+                               pthread_t thread;
+                               p = malloc(sizeof(*p));
+                               if (!p) error(1, errno, "malloc");
+                               *p = sp;
+                               p->id = num;
+                               pthread_create(&thread, NULL, sender, p);
+                               num++;
+                       }
+               }
+       } while(ret == 0);
+
+       while (num--)
+               sem_wait(&finished);
        
-       parse_opts(argc, argv);
-       run();
        return 0;
 }