rtime.felk.cvut.cz Git - frescor/fwp.git/commitdiff
Make wme_test compilable with FRSH and FWP
author: Michal Sojka <sojkam1@fel.cvut.cz>
Thu, 6 Aug 2009 16:19:41 +0000 (18:19 +0200)
committer: Michal Sojka <sojkam1@fel.cvut.cz>
Mon, 10 Aug 2009 16:18:58 +0000 (18:18 +0200)
fwp/lib/frsh_fwp/fwp_res.h
wme_test/Makefile.omk
wme_test/wclient.c

diff --git a/fwp/lib/frsh_fwp/fwp_res.h b/fwp/lib/frsh_fwp/fwp_res.h
index fd0879785df9e2d4162c72ad56e5a76699f45730..75c2f43a3337ce39f97fcfa7dfb6169198332f42 100644 (file)
 int fra_fwp_init();
 int fres_block_register_fwp();
 
+#include <fres_container.h>
+FRES_CONTAINER_ACCESSOR(FWP_SCHED, fwp_sched)
+
+#include <fres_contract.h>
+FRES_CONTRACT_ACCESSOR(fwp_sched)
+
 #endif /* FWP_RES_H */
diff --git a/wme_test/Makefile.omk b/wme_test/Makefile.omk
index c2815337b07714d1ca0b575ef5fad1b3746495eb..a25b7f92a7b0161c43b3918708846ad7ccc604f9 100644 (file)
@@ -1,7 +1,7 @@
 bin_PROGRAMS = fwptester fwptesterserver
 
 CFLAGS += -Wall -D_REENTRANT -g -O2 -DWITH_FWP
-lib_LOADLIBES = pthread  rt fwp ulut ncurses
+lib_LOADLIBES = pthread  rt frsh ulut ncurses
 
 fwptester_SOURCES = wclient.c common.c
 fwptesterserver_SOURCES = wserver.c common.c
diff --git a/wme_test/wclient.c b/wme_test/wclient.c
index 4508169275ad8c47fbf90311232595f4d777015c..690671c5023bbeb6ffa20267d0660f21783432c5 100644 (file)
 #include <inttypes.h>
 
 #ifdef WITH_FWP
-#include <fwp_confdefs.h>
-#include <fwp.h>
-#include <fwp_vres.h>
+#include <frsh.h>
 #include <ncurses.h>
+#include <fwp_res.h>
 #endif
 
 #define MAX_STREAMS  10 
@@ -100,11 +99,9 @@ struct stream {
 #ifndef WITH_FWP
        struct sockaddr_in rem_addr;
 #else
-       fwp_contract_d_t contract_send;
-       fwp_contract_d_t contract_resp;
-       fwp_endpoint_d_t endpoint;
-       fwp_endpoint_d_t resp_endpoint;
-       fwp_vres_d_t vres;
+       frsh_send_endpoint_t endpoint;
+       frsh_receive_endpoint_t resp_endpoint;
+       frsh_vres_id_t vres, vres_rcv;
        uint16_t resp_port;
        struct receiver receiver;
        long wc_delay;          /* worst-case delay  */
@@ -163,8 +160,6 @@ void stopper()
 #ifdef WITH_FWP
        for (i=0; i < nr_streams; i++) {
                if (streams[i].receiver.valid) pthread_kill(streams[i].receiver.thread, SIGUSR1);
-               if (streams[i].contract_send) fwp_contract_cancel(streams[i].contract_send);
-               if (streams[i].contract_resp)fwp_contract_cancel(streams[i].contract_resp);
        }
 #else
        for (i=0; i < AC_NUM; i++) {
@@ -392,8 +387,9 @@ int recv_packet_native(intptr_t ac, struct msg_t *msg)
 #else
 int recv_packet_fwp(struct stream *stream, struct msg_t *msg)
 {
-       int     mlen;
-       mlen = fwp_recv(stream->resp_endpoint, msg, sizeof(*msg), 0);
+       size_t mlen;
+
+       mlen = frsh_receive_sync(stream->resp_endpoint, msg, sizeof(*msg), &mlen, NULL);
        return mlen;
 }
 #endif
@@ -530,9 +526,9 @@ send_packet_fwp(struct stream* stream, union msg_buff* buff)
        int ret;
 
        buff->msg.resp_port = htons(stream->resp_port);
-       ret = fwp_send(stream->endpoint, buff, stream->packet_size, 0);
-       
-       return ret;
+       ret = frsh_send_sync(stream->endpoint, buff, stream->packet_size);
+
+       return (ret == 0) ? 0 : -1;
 }
 #endif
 
@@ -615,35 +611,42 @@ out:
 #ifdef WITH_FWP
 static int negotiate_contract_for_stream_fwp(struct stream *stream)
 {
-       fwp_contract_t contract;
-       fwp_vres_d_t vres2;
+       frsh_contract_t contract;
        int ret;
+       frsh_rel_time_t budget, period, deadline;
+       frsh_signal_info_t si;
 
        /* Contract for client->server stream */
-       memset(&contract, 0, sizeof(contract));
-       contract.budget = stream->packet_size;
-       contract.period_usec = stream->period_usec;
-       contract.deadline_usec = 3*stream->period_usec;
+       frsh_contract_init(&contract);
+       frsh_contract_set_resource_and_label(&contract, FRSH_RT_NETWORK, FRSH_NETPF_FWP, NULL);
+       frsh_network_bytes_to_budget(FRSH_NETPF_FWP, stream->packet_size, &budget);
+       period = frsh_usec_to_rel_time(stream->period_usec);
+       frsh_contract_set_basic_params(&contract, &budget, &period, FRSH_WT_BOUNDED, FRSH_CT_REGULAR);
+       deadline = frsh_usec_to_rel_time(3*stream->period_usec);
+       frsh_contract_set_timing_reqs(&contract, false, &deadline, 0, si, 0, si);
        
-       stream->contract_send = fwp_contract_create(&contract);
-       ret = fwp_contract_negotiate(stream->contract_send, &stream->vres);
+       ret = frsh_contract_negotiate(&contract, &stream->vres);
+       frsh_contract_destroy(&contract);
        if (ret != 0) {
-               stream->contract_send = NULL;
-/*             fprintf(stderr, "Send contract was not accepted\n\n\n"); */
+               stream->vres = NULL;
+               fprintf(stderr, "Send contract was not accepted\n");
                return ret;
        }
 
        /* Contract for server->client stream */
-       memset(&contract, 0, sizeof(contract));
-       contract.budget = stream->packet_size;
-       contract.period_usec = stream->period_usec;
-       contract.deadline_usec = 3*stream->period_usec;
-
-       stream->contract_resp = fwp_contract_create(&contract);
-       ret = fwp_contract_negotiate(stream->contract_resp, &vres2);
+       /* TODO: Use group negotiation for these two contracts */
+       frsh_contract_init(&contract);
+       frsh_contract_set_resource_and_label(&contract, FRSH_RT_NETWORK, FRSH_NETPF_FWP, NULL);
+       frsh_network_bytes_to_budget(FRSH_NETPF_FWP, stream->packet_size, &budget);
+       period = frsh_usec_to_rel_time(stream->period_usec);
+       frsh_contract_set_basic_params(&contract, &budget, &period, FRSH_WT_BOUNDED, FRSH_CT_DUMMY);
+       deadline = frsh_usec_to_rel_time(3*stream->period_usec);
+       frsh_contract_set_timing_reqs(&contract, false, &deadline, 0, si, 0, si);
+       
+       ret = frsh_contract_negotiate(&contract, &stream->vres_rcv);
+       frsh_contract_destroy(&contract);
        if (ret != 0) {
-               stream->contract_resp = NULL;
-/*             fprintf(stderr, "Receive contract was not accepted\n\n\n"); */
+               fprintf(stderr, "Receive contract was not accepted\n");
                return ret;
        }
 
@@ -661,8 +664,13 @@ static void create_stream_endpoint_fwp(struct stream *stream)
 /*     fwp_endpoint_attr_t  attr; */
        int ret;
        struct hostent* ph;
+       frsh_contract_t c;
+       fres_block_fwp_sched *fwp_sched;
+
+       frsh_vres_get_contract(stream->vres, &c);
+       fwp_sched = fres_contract_get_fwp_sched(c);
 
-       stream->ac = fwp_vres_get_ac(stream->vres);
+       stream->ac = fwp_sched->ac_id;
 
 /*     fwp_endpoint_attr_init(&attr); */
 /*     fwp_endpoint_attr_setreliability(&attr, FWP_EPOINT_BESTEFFORT); */
@@ -670,19 +678,22 @@ static void create_stream_endpoint_fwp(struct stream *stream)
        ph = gethostbyname(server_addr);
        if (ph && ph->h_addr_list[0]) {
                struct in_addr *a = (struct in_addr *)(ph->h_addr_list[0]);
-               ret = fwp_send_endpoint_create(a->s_addr, BASE_PORT + stream->ac,
-                                              NULL, &stream->endpoint);
-               /* stream->rem_addr.sin_port = htons(BASE_PORT + stream->ac); */
-               if (ret < 0) error(1, errno, "fwp_send_endpoint_create");
-               
-               ret = fwp_send_endpoint_bind(stream->endpoint, stream->vres);
-               if (ret != 0) error(1, errno, "fwp_send_endpoint_bind");
+               frsh_send_endpoint_protocol_info_t    spi = { NULL, 0 };
+               frsh_receive_endpoint_protocol_info_t rpi = { NULL, 0 };
+               frsh_endpoint_queueing_info_t qi = { .queue_size=0, .queue_policy=FRSH_QRP_OLDEST };
+               ret = frsh_send_endpoint_create(FRSH_NETPF_FWP, a->s_addr, BASE_PORT + stream->ac, 
+                                               spi, &stream->endpoint);
+               if (ret < 0) error(1, errno, "frsh_send_endpoint_create()");
                
-               ret = fwp_receive_endpoint_create(0, NULL, &stream->resp_endpoint);
+               ret = frsh_send_endpoint_bind(stream->vres, stream->endpoint);
+               if (ret != 0) error(1, errno, "frsh_send_endpoint_bind");
+
+               ret = frsh_receive_endpoint_create(FRSH_NETPF_FWP, 0, qi, rpi,
+                                                  &stream->resp_endpoint);
                if (ret != 0) error(1, errno, "fwp_receive_endpoint_create");
                
                unsigned int port;
-               fwp_endpoint_get_params(stream->resp_endpoint, NULL, &port, NULL);
+               frsh_receive_endpoint_get_params(stream->resp_endpoint, NULL, &port, NULL, NULL);
                stream->resp_port = port;
 
                ret = pthread_create(&stream->receiver.thread, NULL, receiver, (void*)stream);
@@ -1037,7 +1048,7 @@ int main(int argc, char *argv[])
        reset_statistics();
 
 #ifdef WITH_FWP
-       rc = fwp_init();
+       rc = frsh_init();
        if (rc != 0) {
                error(1, errno, "FWP initialization failed");
        }
@@ -1104,7 +1115,12 @@ int main(int argc, char *argv[])
 
 #ifdef WITH_FWP
        //endwin();
-       fwp_done();
+       for (i=0; i < nr_streams; i++) {
+               if (streams[i].vres)
+                       frsh_contract_cancel(streams[i].vres);
+               if (streams[i].vres_rcv)
+                       frsh_contract_cancel(streams[i].vres_rcv);
+       }
 #else
        fprintf(stderr, "\nWaiting for threads to finish\n");
        wait_for_all_threads_to_finish();