From: Michal Sojka Date: Thu, 6 Aug 2009 16:19:41 +0000 (+0200) Subject: Make wme_test compilable with FRSH and FWP X-Git-Url: http://rtime.felk.cvut.cz/gitweb/frescor/fwp.git/commitdiff_plain/9384c4e8bb09a06dbb6b9e86871b330bb4fbd3bc Make wme_test compilable with FRSH and FWP --- diff --git a/fwp/lib/frsh_fwp/fwp_res.h b/fwp/lib/frsh_fwp/fwp_res.h index fd08797..75c2f43 100644 --- a/fwp/lib/frsh_fwp/fwp_res.h +++ b/fwp/lib/frsh_fwp/fwp_res.h @@ -54,4 +54,10 @@ int fra_fwp_init(); int fres_block_register_fwp(); +#include +FRES_CONTAINER_ACCESSOR(FWP_SCHED, fwp_sched) + +#include +FRES_CONTRACT_ACCESSOR(fwp_sched) + #endif /* FWP_RES_H */ diff --git a/wme_test/Makefile.omk b/wme_test/Makefile.omk index c281533..a25b7f9 100644 --- a/wme_test/Makefile.omk +++ b/wme_test/Makefile.omk @@ -1,7 +1,7 @@ bin_PROGRAMS = fwptester fwptesterserver CFLAGS += -Wall -D_REENTRANT -g -O2 -DWITH_FWP -lib_LOADLIBES = pthread rt fwp ulut ncurses +lib_LOADLIBES = pthread rt frsh ulut ncurses fwptester_SOURCES = wclient.c common.c fwptesterserver_SOURCES = wserver.c common.c diff --git a/wme_test/wclient.c b/wme_test/wclient.c index 4508169..690671c 100644 --- a/wme_test/wclient.c +++ b/wme_test/wclient.c @@ -23,10 +23,9 @@ #include #ifdef WITH_FWP -#include -#include -#include +#include #include +#include #endif #define MAX_STREAMS 10 @@ -100,11 +99,9 @@ struct stream { #ifndef WITH_FWP struct sockaddr_in rem_addr; #else - fwp_contract_d_t contract_send; - fwp_contract_d_t contract_resp; - fwp_endpoint_d_t endpoint; - fwp_endpoint_d_t resp_endpoint; - fwp_vres_d_t vres; + frsh_send_endpoint_t endpoint; + frsh_receive_endpoint_t resp_endpoint; + frsh_vres_id_t vres, vres_rcv; uint16_t resp_port; struct receiver receiver; long wc_delay; /* worst-case delay */ @@ -163,8 +160,6 @@ void stopper() #ifdef WITH_FWP for (i=0; i < nr_streams; i++) { if (streams[i].receiver.valid) pthread_kill(streams[i].receiver.thread, SIGUSR1); - if (streams[i].contract_send) fwp_contract_cancel(streams[i].contract_send); - if (streams[i].contract_resp)fwp_contract_cancel(streams[i].contract_resp); } #else for (i=0; i < AC_NUM; i++) { @@ -392,8 +387,9 @@ int recv_packet_native(intptr_t ac, struct msg_t *msg) #else int recv_packet_fwp(struct stream *stream, struct msg_t *msg) { - int mlen; - mlen = fwp_recv(stream->resp_endpoint, msg, sizeof(*msg), 0); + size_t mlen; + + mlen = frsh_receive_sync(stream->resp_endpoint, msg, sizeof(*msg), &mlen, NULL); return mlen; } #endif @@ -530,9 +526,9 @@ send_packet_fwp(struct stream* stream, union msg_buff* buff) int ret; buff->msg.resp_port = htons(stream->resp_port); - ret = fwp_send(stream->endpoint, buff, stream->packet_size, 0); - - return ret; + ret = frsh_send_sync(stream->endpoint, buff, stream->packet_size); + + return (ret == 0) ? 0 : -1; } #endif @@ -615,35 +611,42 @@ out: #ifdef WITH_FWP static int negotiate_contract_for_stream_fwp(struct stream *stream) { - fwp_contract_t contract; - fwp_vres_d_t vres2; + frsh_contract_t contract; int ret; + frsh_rel_time_t budget, period, deadline; + frsh_signal_info_t si; /* Contract for client->server stream */ - memset(&contract, 0, sizeof(contract)); - contract.budget = stream->packet_size; - contract.period_usec = stream->period_usec; - contract.deadline_usec = 3*stream->period_usec; + frsh_contract_init(&contract); + frsh_contract_set_resource_and_label(&contract, FRSH_RT_NETWORK, FRSH_NETPF_FWP, NULL); + frsh_network_bytes_to_budget(FRSH_NETPF_FWP, stream->packet_size, &budget); + period = frsh_usec_to_rel_time(stream->period_usec); + frsh_contract_set_basic_params(&contract, &budget, &period, FRSH_WT_BOUNDED, FRSH_CT_REGULAR); + deadline = frsh_usec_to_rel_time(3*stream->period_usec); + frsh_contract_set_timing_reqs(&contract, false, &deadline, 0, si, 0, si); - stream->contract_send = fwp_contract_create(&contract); - ret = fwp_contract_negotiate(stream->contract_send, &stream->vres); + ret = frsh_contract_negotiate(&contract, &stream->vres); + frsh_contract_destroy(&contract); if (ret != 0) { - stream->contract_send = NULL; -/* fprintf(stderr, "Send contract was not accepted\n\n\n"); */ + stream->vres = NULL; + fprintf(stderr, "Send contract was not accepted\n"); return ret; } /* Contract for server->client stream */ - memset(&contract, 0, sizeof(contract)); - contract.budget = stream->packet_size; - contract.period_usec = stream->period_usec; - contract.deadline_usec = 3*stream->period_usec; - - stream->contract_resp = fwp_contract_create(&contract); - ret = fwp_contract_negotiate(stream->contract_resp, &vres2); + /* TODO: Use group negotiation for these two contracts */ + frsh_contract_init(&contract); + frsh_contract_set_resource_and_label(&contract, FRSH_RT_NETWORK, FRSH_NETPF_FWP, NULL); + frsh_network_bytes_to_budget(FRSH_NETPF_FWP, stream->packet_size, &budget); + period = frsh_usec_to_rel_time(stream->period_usec); + frsh_contract_set_basic_params(&contract, &budget, &period, FRSH_WT_BOUNDED, FRSH_CT_DUMMY); + deadline = frsh_usec_to_rel_time(3*stream->period_usec); + frsh_contract_set_timing_reqs(&contract, false, &deadline, 0, si, 0, si); + + ret = frsh_contract_negotiate(&contract, &stream->vres_rcv); + frsh_contract_destroy(&contract); if (ret != 0) { - stream->contract_resp = NULL; -/* fprintf(stderr, "Receive contract was not accepted\n\n\n"); */ + fprintf(stderr, "Receive contract was not accepted\n"); return ret; } @@ -661,8 +664,13 @@ static void create_stream_endpoint_fwp(struct stream *stream) /* fwp_endpoint_attr_t attr; */ int ret; struct hostent* ph; + frsh_contract_t c; + fres_block_fwp_sched *fwp_sched; + + frsh_vres_get_contract(stream->vres, &c); + fwp_sched = fres_contract_get_fwp_sched(c); - stream->ac = fwp_vres_get_ac(stream->vres); + stream->ac = fwp_sched->ac_id; /* fwp_endpoint_attr_init(&attr); */ /* fwp_endpoint_attr_setreliability(&attr, FWP_EPOINT_BESTEFFORT); */ @@ -670,19 +678,22 @@ static void create_stream_endpoint_fwp(struct stream *stream) ph = gethostbyname(server_addr); if (ph && ph->h_addr_list[0]) { struct in_addr *a = (struct in_addr *)(ph->h_addr_list[0]); - ret = fwp_send_endpoint_create(a->s_addr, BASE_PORT + stream->ac, - NULL, &stream->endpoint); - /* stream->rem_addr.sin_port = htons(BASE_PORT + stream->ac); */ - if (ret < 0) error(1, errno, "fwp_send_endpoint_create"); - - ret = fwp_send_endpoint_bind(stream->endpoint, stream->vres); - if (ret != 0) error(1, errno, "fwp_send_endpoint_bind"); + frsh_send_endpoint_protocol_info_t spi = { NULL, 0 }; + frsh_receive_endpoint_protocol_info_t rpi = { NULL, 0 }; + frsh_endpoint_queueing_info_t qi = { .queue_size=0, .queue_policy=FRSH_QRP_OLDEST }; + ret = frsh_send_endpoint_create(FRSH_NETPF_FWP, a->s_addr, BASE_PORT + stream->ac, + spi, &stream->endpoint); + if (ret < 0) error(1, errno, "frsh_send_endpoint_create()"); - ret = fwp_receive_endpoint_create(0, NULL, &stream->resp_endpoint); + ret = frsh_send_endpoint_bind(stream->vres, stream->endpoint); + if (ret != 0) error(1, errno, "frsh_send_endpoint_bind"); + + ret = frsh_receive_endpoint_create(FRSH_NETPF_FWP, 0, qi, rpi, + &stream->resp_endpoint); if (ret != 0) error(1, errno, "fwp_receive_endpoint_create"); unsigned int port; - fwp_endpoint_get_params(stream->resp_endpoint, NULL, &port, NULL); + frsh_receive_endpoint_get_params(stream->resp_endpoint, NULL, &port, NULL, NULL); stream->resp_port = port; ret = pthread_create(&stream->receiver.thread, NULL, receiver, (void*)stream); @@ -1037,7 +1048,7 @@ int main(int argc, char *argv[]) reset_statistics(); #ifdef WITH_FWP - rc = fwp_init(); + rc = frsh_init(); if (rc != 0) { error(1, errno, "FWP initialization failed"); } @@ -1104,7 +1115,12 @@ int main(int argc, char *argv[]) #ifdef WITH_FWP //endwin(); - fwp_done(); + for (i=0; i < nr_streams; i++) { + if (streams[i].vres) + frsh_contract_cancel(streams[i].vres); + if (streams[i].vres_rcv) + frsh_contract_cancel(streams[i].vres_rcv); + } #else fprintf(stderr, "\nWaiting for threads to finish\n"); wait_for_all_threads_to_finish();