#include <sys/ioctl.h>
#include <net/if.h>
#include <inttypes.h>
+#include <ncurses.h>
#ifdef WITH_FWP
-#include <fwp_confdefs.h>
-#include <fwp.h>
-#include <fwp_vres.h>
-#include <ncurses.h>
+#include <ul_logreg.h>
+#endif
+
+#ifdef WITH_FWP
+#include <frsh.h>
+#include <fwp_res.h>
+
+/* static UL_LOG_CUST(ulogd); */
+/* static ul_log_domain_t ulogd = {UL_LOGL_MSG, "wclient"}; */
+/* UL_LOGREG_SINGLE_DOMAIN_INIT_FUNCTION(init_ulogd_wclient, ulogd); */
+
#endif
#define MAX_STREAMS 10
int opt_granularity_usec = MIN_GRANULARITY;
bool opt_wait_for_queue_is_full; /* Don't gather any statistics until any queue is full */
char *opt_comment = NULL;
+bool opt_gui = false;
bool some_queue_is_full = false;
uint64_t reset_timestamp; /* [nsec] */
#ifndef WITH_FWP
struct sockaddr_in rem_addr;
#else
- fwp_contract_d_t contract_send;
- fwp_contract_d_t contract_resp;
- fwp_endpoint_d_t endpoint;
- fwp_endpoint_d_t resp_endpoint;
- fwp_vres_d_t vres;
+ frsh_send_endpoint_t endpoint;
+ frsh_receive_endpoint_t resp_endpoint;
+ frsh_vres_id_t vres, vres_rcv;
uint16_t resp_port;
struct receiver receiver;
- long wc_delay; /* worst-case delay */
#endif
/* Statistics */
pthread_mutex_t mutex;
unsigned long long sent, really_sent, received;
+ long wc_delay; /* worst-case delay */
};
static struct cmsg_ipi {
#ifdef WITH_FWP
for (i=0; i < nr_streams; i++) {
if (streams[i].receiver.valid) pthread_kill(streams[i].receiver.thread, SIGUSR1);
- if (streams[i].contract_send) fwp_contract_cancel(streams[i].contract_send);
- if (streams[i].contract_resp)fwp_contract_cancel(streams[i].contract_resp);
}
#else
for (i=0; i < AC_NUM; i++) {
#else
+/*
+ * Receive one message for the stream from its FRSH receive endpoint.
+ *
+ * stream - stream whose resp_endpoint is read
+ * msg    - buffer that receives the message (at most sizeof(*msg) bytes)
+ *
+ * Returns the number of bytes received, or -1 when frsh_receive_sync()
+ * reports an error.
+ */
int recv_packet_fwp(struct stream *stream, struct msg_t *msg)
{
- int mlen;
- mlen = fwp_recv(stream->resp_endpoint, msg, sizeof(*msg), 0);
+	size_t mlen = 0;
+	int ret;
+
+	/*
+	 * frsh_receive_sync() returns a status code and stores the message
+	 * size through its fourth argument. The status must be kept in its
+	 * own variable, otherwise it overwrites the received length.
+	 */
+	ret = frsh_receive_sync(stream->resp_endpoint, msg, sizeof(*msg), &mlen, NULL);
+	if (ret != 0)
+		return -1;
return mlen;
}
#endif
}
pthread_mutex_unlock(&delay_stats_mutex);
-#ifdef WITH_FWP
- if (trans_time_usec > stream->wc_delay) {
- stream->wc_delay = trans_time_usec;
+ if (trans_time_usec > streams[msg.stream].wc_delay) {
+ streams[msg.stream].wc_delay = trans_time_usec;
}
-#endif
receivers[ac].received++;
pthread_mutex_lock(&streams[msg.stream].mutex);
+/*
+ * Send one packet of the stream through its FRSH send endpoint.
+ *
+ * The stream's response port is written into the message (network byte
+ * order) before sending. Returns 0 on success; on failure the FRSH error
+ * text is printed to stderr and -1 is returned.
+ */
static inline int
send_packet_fwp(struct stream* stream, union msg_buff* buff)
{
- int ret;
+	char errtext[1024];
+	int status;
+
buff->msg.resp_port = htons(stream->resp_port);
- ret = fwp_send(stream->endpoint, buff, stream->packet_size, 0);
-
- return ret;
+	status = frsh_send_async(stream->endpoint, buff, stream->packet_size);
+	if (status == 0)
+		return 0;
+
+	/* Translate the FRSH error code into text before reporting it. */
+	frsh_strerror(status, errtext, sizeof(errtext));
+	fprintf(stderr, "frsh_send error: %s\n", errtext);
+	return -1;
}
#endif
struct timespec ts;
int ret;
-#ifndef WITH_FWP
- char stream_desc[100];
- stream_to_text(stream_desc, sizeof(stream_desc), stream, 0);
- printf("%s\n", stream_desc);
-#endif
+ if (!opt_gui) {
+ char stream_desc[100];
+ stream_to_text(stream_desc, sizeof(stream_desc), stream, 0);
+ printf("%s\n", stream_desc);
+ }
if (stream->bandwidth_bps == 0)
goto out;
ret = send_packet(stream, &buff);
if (ret < 0) {
+ stopper();
goto out;
}
#ifdef WITH_FWP
static int negotiate_contract_for_stream_fwp(struct stream *stream)
{
- fwp_contract_t contract;
- fwp_vres_d_t vres2;
+ frsh_contract_t contract;
int ret;
+ frsh_rel_time_t budget, period, deadline;
+ frsh_signal_info_t si;
/* Contract for client->server stream */
- memset(&contract, 0, sizeof(contract));
- contract.budget = stream->packet_size;
- contract.period_usec = stream->period_usec;
- contract.deadline_usec = 3*stream->period_usec;
+ frsh_contract_init(&contract);
+ frsh_contract_set_resource_and_label(&contract, FRSH_RT_NETWORK, FRSH_NETPF_FWP, NULL);
+ frsh_network_bytes_to_budget(FRSH_NETPF_FWP, stream->packet_size, &budget);
+ period = frsh_usec_to_rel_time(stream->period_usec);
+ frsh_contract_set_basic_params(&contract, &budget, &period, FRSH_WT_BOUNDED, FRSH_CT_REGULAR);
+ deadline = frsh_usec_to_rel_time(3*stream->period_usec);
+ frsh_contract_set_timing_reqs(&contract, false, &deadline, 0, si, 0, si);
- stream->contract_send = fwp_contract_create(&contract);
- ret = fwp_contract_negotiate(stream->contract_send, &stream->vres);
+ ret = frsh_contract_negotiate(&contract, &stream->vres);
+ frsh_contract_destroy(&contract);
if (ret != 0) {
- stream->contract_send = NULL;
-/* fprintf(stderr, "Send contract was not accepted\n\n\n"); */
+ stream->vres = NULL;
+ fprintf(stderr, "Send contract was not accepted\n");
return ret;
}
/* Contract for server->client stream */
- memset(&contract, 0, sizeof(contract));
- contract.budget = stream->packet_size;
- contract.period_usec = stream->period_usec;
- contract.deadline_usec = 3*stream->period_usec;
-
- stream->contract_resp = fwp_contract_create(&contract);
- ret = fwp_contract_negotiate(stream->contract_resp, &vres2);
+ /* TODO: Use group negotiation for these two contracts */
+ frsh_contract_init(&contract);
+ frsh_contract_set_resource_and_label(&contract, FRSH_RT_NETWORK, FRSH_NETPF_FWP, NULL);
+ frsh_network_bytes_to_budget(FRSH_NETPF_FWP, stream->packet_size, &budget);
+ period = frsh_usec_to_rel_time(stream->period_usec);
+ frsh_contract_set_basic_params(&contract, &budget, &period, FRSH_WT_BOUNDED, FRSH_CT_DUMMY);
+ deadline = frsh_usec_to_rel_time(3*stream->period_usec);
+ frsh_contract_set_timing_reqs(&contract, false, &deadline, 0, si, 0, si);
+
+ ret = frsh_contract_negotiate(&contract, &stream->vres_rcv);
+ frsh_contract_destroy(&contract);
if (ret != 0) {
- stream->contract_resp = NULL;
-/* fprintf(stderr, "Receive contract was not accepted\n\n\n"); */
+ fprintf(stderr, "Receive contract was not accepted\n");
return ret;
}
/* fwp_endpoint_attr_t attr; */
int ret;
struct hostent* ph;
+ frsh_contract_t c;
+ fres_block_fwp_sched *fwp_sched;
- stream->ac = fwp_vres_get_ac(stream->vres);
+ frsh_vres_get_contract(stream->vres, &c);
+ fwp_sched = fres_contract_get_fwp_sched(c);
+
+ stream->ac = fwp_sched->ac_id;
/* fwp_endpoint_attr_init(&attr); */
/* fwp_endpoint_attr_setreliability(&attr, FWP_EPOINT_BESTEFFORT); */
ph = gethostbyname(server_addr);
if (ph && ph->h_addr_list[0]) {
struct in_addr *a = (struct in_addr *)(ph->h_addr_list[0]);
- ret = fwp_send_endpoint_create(a->s_addr, BASE_PORT + stream->ac,
- NULL, &stream->endpoint);
- /* stream->rem_addr.sin_port = htons(BASE_PORT + stream->ac); */
- if (ret < 0) error(1, errno, "fwp_send_endpoint_create");
-
- ret = fwp_send_endpoint_bind(stream->endpoint, stream->vres);
- if (ret != 0) error(1, errno, "fwp_send_endpoint_bind");
+ frsh_send_endpoint_protocol_info_t spi = { NULL, 0 };
+ frsh_receive_endpoint_protocol_info_t rpi = { NULL, 0 };
+ frsh_endpoint_queueing_info_t qi = { .queue_size=0, .queue_policy=FRSH_QRP_OLDEST };
+ ret = frsh_send_endpoint_create(FRSH_NETPF_FWP, a->s_addr, BASE_PORT + stream->ac,
+ spi, &stream->endpoint);
+ if (ret < 0) error(1, errno, "frsh_send_endpoint_create()");
- ret = fwp_receive_endpoint_create(0, NULL, &stream->resp_endpoint);
+ ret = frsh_send_endpoint_bind(stream->vres, stream->endpoint);
+ if (ret != 0) error(1, errno, "frsh_send_endpoint_bind");
+
+ ret = frsh_receive_endpoint_create(FRSH_NETPF_FWP, 0, qi, rpi,
+ &stream->resp_endpoint);
if (ret != 0) error(1, errno, "fwp_receive_endpoint_create");
unsigned int port;
- fwp_endpoint_get_params(stream->resp_endpoint, NULL, &port, NULL);
+ frsh_receive_endpoint_get_params(stream->resp_endpoint, NULL, &port, NULL, NULL);
stream->resp_port = port;
ret = pthread_create(&stream->receiver.thread, NULL, receiver, (void*)stream);
void wait_for_all_threads_to_finish_fwp(void)
{
int i;
- /* Wait for all threads to finish */
- /* FIXME: */
-/* for (i=0; i < 2*nr_streams; i++) { */
-/* sem_wait(&sem_thread_finished); */
-/* } */
+	/*
+	 * Wait for all threads to finish: take sem_thread_finished
+	 * 2*nr_streams times.
+	 *
+	 * sem_wait() may return -1 with errno == EINTR when a signal handler
+	 * runs in this thread. Such a return is not a finished thread, so the
+	 * wait is retried instead of being counted. Any other failure ends
+	 * the retry and is counted, as before.
+	 */
+	for (i=0; i < 2*nr_streams; i++) {
+		while (sem_wait(&sem_thread_finished) == -1 && errno == EINTR)
+			continue;
+	}
}
#else
void wait_for_all_threads_to_finish_native(void)
}
#endif
-#ifdef WITH_FWP
+WINDOW *logwin;
+
+#if 0
+struct log_params { /* arguments of one log call, bundled for use_window()'s single void* argument */
+ ul_log_domain_t *domain; /* domain given to wclient_log_fnc(); locked_log() prints its name */
+ int level; /* level given to wclient_log_fnc(); locked_log() tests it against UL_LOGL_CONT */
+ const char *format; /* printf-style format string of the message */
+ va_list ap; /* arguments for format; set with va_copy() in wclient_log_fnc() */
+};
+
+
+/*
+ * use_window() callback that prints one log message into a curses window.
+ *
+ * logwin - window to print into
+ * arg    - pointer to the struct log_params describing the message
+ *
+ * When UL_LOGL_CONT is not set in the level, the message is prefixed with
+ * "<level>" (if the masked level is non-zero) and "name: " (if the domain
+ * has a name). Note that p->level is masked in place in that case.
+ * Always returns 0.
+ */
+int locked_log(WINDOW *logwin, void *arg)
+{
+	struct log_params *p = arg;
+
+	if (!(p->level & UL_LOGL_CONT)) {
+		p->level &= UL_LOGL_MASK;
+		if (p->level)
+			wprintw(logwin, "<%d>", p->level);
+		if (p->domain && p->domain->name)
+			wprintw(logwin, "%s: ", p->domain->name);
+	}
+	/* vw_printw() is the <stdarg.h> variant; vwprintw() is obsolete. */
+	vw_printw(logwin, p->format, p->ap);
+	/* Only stage the update; the screen is repainted by a later doupdate(). */
+	wnoutrefresh(logwin);
+	return 0;
+}
+
+/*
+ * Log output function that forwards a message to the log window.
+ * NOTE(review): presumably meant to be installed with ul_log_redir(),
+ * which is still commented out in init_gui() - confirm before enabling.
+ *
+ * domain - log domain of the message
+ * level  - log level (may carry flag bits such as UL_LOGL_CONT)
+ * format - printf-style format string
+ * ap     - arguments for format; owned by the caller
+ */
+void
+wclient_log_fnc(ul_log_domain_t *domain, int level,
+		const char *format, va_list ap)
+{
+	struct log_params p = {
+		.domain = domain,
+		.level = level,
+		.format = format,
+	};
+
+	va_copy(p.ap, ap);
+	use_window(logwin, locked_log, (void*)&p);
+	/*
+	 * End the copy made above, and only after it has been used. The
+	 * caller's ap was not started here, so it is left for the caller.
+	 */
+	va_end(p.ap);
+}
+#endif
+
+
void init_gui()
{
- initscr();
- cbreak();
- noecho();
+	/* Curses is set up only when the GUI was requested (opt_gui). */
+	if (!opt_gui)
+		return;
+
+	initscr();
+	cbreak();
+	noecho();
+/*	nonl(); */
+/*	intrflush(stdscr, FALSE); */
+/*	keypad(stdscr, TRUE); */
+
+	/* Scrolling log window whose top-left corner is at row LINES/2, column 0. */
+	logwin = newwin(0, 0, LINES/2, 0);
+	if (logwin == NULL)
+		return;
+
+	scrollok(logwin, TRUE);
+/*	ul_log_redir(wclient_log_fnc, 0); */
}
+/*
+ * Shut down the curses GUI. Does nothing unless opt_gui is set.
+ *
+ * The log window created by init_gui() is deleted and its pointer cleared,
+ * so the window is not leaked and a repeated call does not touch it again.
+ */
+void end_gui()
+{
+	if (!opt_gui)
+		return;
+
+	if (logwin) {
+/*		ul_log_redir(NULL, 0); */
+		delwin(logwin);
+		logwin = NULL;
+	}
+	endwin();
+}
+
+
#define addfield(title, format, ...) \
move(y, x); \
x+=strlen(title)+1; \
addstr(str); \
}
-void print_status(int seconds)
+void print_status_gui(int seconds)
{
int i;
char str[200], s1[20];
addfield("Worst-case delay", "%s", usec_to_text(s1, s->wc_delay));
addfield("Received responses", "%lld", s->received);
}
- refresh();
+ wnoutrefresh(stdscr);
+ doupdate();
}
-#else
-void init_gui() {}
-void print_status(int seconds)
+
+void print_status_nogui(int seconds)
{
int ac;
fprintf(stderr, "\r%3ds", seconds);
}
fflush(stderr);
}
-#endif
+#ifdef WITH_FWP
+/*
+ * Print one log domain to stdout as "name = level".
+ * Passed to ul_logreg_for_each_domain() by the "-l ?" option in main().
+ *
+ * domain  - domain to print
+ * context - unused
+ *
+ * Always returns 0.
+ */
+int print_log_domain(ul_log_domain_t *domain, void *context)
+{
+	const char *name = domain->name;
+
+	(void)context;
+	printf("%s = %d\n", name, domain->level);
+	return 0;
+}
+#endif
int main(int argc, char *argv[])
{
int i, rc, frames, seconds;
char opt;
- while ((opt = getopt(argc, argv, "B:b:C:c:g:I:j:o:qQ:s:T:")) != -1) {
+ while ((opt = getopt(argc, argv, "B:b:C:c:Gg:I:j:l:o:qQ:s:T:")) != -1) {
switch (opt) {
case 'B':
opt_def_bandwidth = atoi(optarg);
case 'c':
opt_count_sec = atoi(optarg);
break;
+ case 'G':
+ opt_gui = true;
+ break;
case 'g':
opt_granularity_usec = atoi(optarg);
if (opt_granularity_usec < MIN_GRANULARITY) {
opt_jitter = atoi(optarg);
#endif
break;
+#ifdef WITH_FWP
+ case 'l':
+ if (*optarg == '?') {
+ ul_logreg_for_each_domain(print_log_domain, NULL);
+ exit(0);
+ }
+ {
+ int ret;
+ ret = ul_log_domain_arg2levels(optarg);
+ if (ret)
+ error(1, EINVAL, "Error parsing -l argument at char %d\n", ret);
+ }
+ break;
+#endif
case 'o':
opt_output = optarg;
break;
fprintf(stderr, " -c count (number of seconds to run)\n");
fprintf(stderr, " -g histogram granularity [usec]\n");
fprintf(stderr, " -I <interface> send packets from this interface\n");
+#ifdef WITH_FWP
+ fprintf(stderr, " -l <loglevel> uLUt logging levels\n");
+#endif
fprintf(stderr, " -j send jitter (0-100) [%%]\n");
fprintf(stderr, " -o output filename (.dat will be appended)\n");
fprintf(stderr, " -q gather statistics only after some queue becomes full\n");
reset_statistics();
#ifdef WITH_FWP
- rc = fwp_init();
+ //ul_log_domain_arg2levels("6");
+ rc = frsh_init();
if (rc != 0) {
error(1, errno, "FWP initialization failed");
}
if (some_contract_not_accepted) {
stopper();
} else {
- //init_gui();
+ init_gui();
seconds = 1;
frames=0;
while (!exit_flag) {
-#ifdef WITH_FWP
- usleep(40000);
-#else
- sleep(1);
-#endif
- frames++;
- if (frames>=25) {
+ if (opt_gui) {
+ usleep(40000);
+ frames++;
+ if (frames>=25) {
+ seconds++;
+ frames = 0;
+ }
+ print_status_gui(seconds);
+ } else {
+ sleep(1);
seconds++;
- frames = 0;
+ print_status_nogui(seconds);
}
- print_status(seconds);
+
if (seconds == opt_count_sec)
stopper();
}
}
-#ifdef WITH_FWP
- //endwin();
- fwp_done();
-#else
+ end_gui();
+
fprintf(stderr, "\nWaiting for threads to finish\n");
wait_for_all_threads_to_finish();
+#ifdef WITH_FWP
+ for (i=0; i < nr_streams; i++) {
+ if (streams[i].vres)
+ frsh_contract_cancel(streams[i].vres);
+ if (streams[i].vres_rcv)
+ frsh_contract_cancel(streams[i].vres_rcv);
+ }
+#endif
+
struct timespec ts;
uint64_t end_timestamp, measure_length;
clock_gettime(CLOCK_REALTIME,&ts);
measure_length = end_timestamp - reset_timestamp;
save_results(argc, argv, measure_length/1000);
-#endif
return 0;
}