+#ifdef WITH_FWP
+/**
+ * Fill in an FWP network contract describing one direction of a stream.
+ *
+ * The budget is derived from stream->packet_size, the period from
+ * stream->period_usec, and the deadline is three periods.
+ *
+ * @param contract      Contract to initialise; the caller destroys it.
+ * @param stream        Stream whose packet_size and period_usec are used.
+ * @param contract_type Contract type passed to frsh_contract_set_basic_params().
+ */
+static void init_fwp_stream_contract(frsh_contract_t *contract,
+				     const struct stream *stream,
+				     frsh_contract_type_t contract_type)
+{
+	frsh_rel_time_t budget, period, deadline;
+	frsh_signal_info_t no_signal_info;
+
+	/* No signals are requested (both signal numbers are 0), but the
+	 * signal info is passed by value, so it must not be left
+	 * indeterminate. */
+	memset(&no_signal_info, 0, sizeof(no_signal_info));
+
+	frsh_contract_init(contract);
+	frsh_contract_set_resource_and_label(contract, FRSH_RT_NETWORK, FRSH_NETPF_FWP, NULL);
+	frsh_network_bytes_to_budget(FRSH_NETPF_FWP, stream->packet_size, &budget);
+	period = frsh_usec_to_rel_time(stream->period_usec);
+	frsh_contract_set_basic_params(contract, &budget, &period, FRSH_WT_BOUNDED, contract_type);
+	deadline = frsh_usec_to_rel_time(3*stream->period_usec);
+	frsh_contract_set_timing_reqs(contract, false, &deadline, 0, no_signal_info, 0, no_signal_info);
+}
+
+/**
+ * Negotiate the two FWP contracts needed by a stream: one for the
+ * client->server direction (stored in stream->vres) and one for the
+ * server->client direction (stored in stream->vres_rcv).
+ *
+ * @param stream Stream with packet_size and period_usec already set.
+ *
+ * @return 0 if both contracts were accepted, otherwise the non-zero
+ *         value returned by the failing frsh_contract_negotiate().
+ *         stream->vres is set to NULL when the send contract is
+ *         rejected and stream->vres_rcv is set to NULL when the
+ *         receive contract is rejected.
+ */
+static int negotiate_contract_for_stream_fwp(struct stream *stream)
+{
+	frsh_contract_t contract;
+	int ret;
+
+	/* Contract for client->server stream */
+	init_fwp_stream_contract(&contract, stream, FRSH_CT_REGULAR);
+	ret = frsh_contract_negotiate(&contract, &stream->vres);
+	frsh_contract_destroy(&contract);
+	if (ret != 0) {
+		stream->vres = NULL;
+		fprintf(stderr, "Send contract was not accepted\n");
+		return ret;
+	}
+
+	/* Contract for server->client stream */
+	/* TODO: Use group negotiation for these two contracts */
+	init_fwp_stream_contract(&contract, stream, FRSH_CT_DUMMY);
+	ret = frsh_contract_negotiate(&contract, &stream->vres_rcv);
+	frsh_contract_destroy(&contract);
+	if (ret != 0) {
+		/* NOTE: the send vres negotiated above is left in
+		 * stream->vres; it is not cancelled here. */
+		stream->vres_rcv = NULL;
+		fprintf(stderr, "Receive contract was not accepted\n");
+		return ret;
+	}
+
+	/* We don't use the vres at server, since the server doesn't
+	 * know the parameters. Instead, server uses plain
+	 * sockets. */
+
+	return 0;
+}
+#endif
+
+#ifdef WITH_FWP
+/**
+ * Create the FWP send and receive endpoints of a stream and start its
+ * receiver thread.
+ *
+ * The access category (stream->ac) is read from the FWP scheduling
+ * block of the contract attached to stream->vres and selects the
+ * destination port (BASE_PORT + stream->ac). The port of the receive
+ * endpoint is stored in stream->resp_port.
+ *
+ * Any failure is fatal: the function reports it through error() with
+ * a non-zero status.
+ *
+ * @param stream Stream with a successfully negotiated stream->vres.
+ */
+static void create_stream_endpoint_fwp(struct stream *stream)
+{
+	int ret;
+	struct hostent *ph;
+	frsh_contract_t c;
+	fres_block_fwp_sched *fwp_sched;
+
+	ret = frsh_vres_get_contract(stream->vres, &c);
+	if (ret != 0) error(1, 0, "frsh_vres_get_contract() failed (%d)", ret);
+
+	fwp_sched = fres_contract_get_fwp_sched(c);
+	if (!fwp_sched) error(1, 0, "Contract has no FWP scheduling block");
+
+	stream->ac = fwp_sched->ac_id;
+
+	ph = gethostbyname(server_addr);
+	if (!ph) {
+		/* gethostbyname() reports failures through h_errno, not errno. */
+		error(1, 0, "gethostbyname(%s): %s", server_addr, hstrerror(h_errno));
+	}
+	if (ph->h_addrtype != AF_INET || !ph->h_addr_list[0]) {
+		error(1, 0, "gethostbyname(%s): no IPv4 address found", server_addr);
+	}
+
+	struct in_addr *a = (struct in_addr *)(ph->h_addr_list[0]);
+	frsh_send_endpoint_protocol_info_t spi = { NULL, 0 };
+	frsh_receive_endpoint_protocol_info_t rpi = { NULL, 0 };
+	frsh_endpoint_queueing_info_t qi = { .queue_size=0, .queue_policy=FRSH_QRP_OLDEST };
+
+	/* Checked with != 0 like the other FRSH calls below, so that any
+	 * non-zero return value is treated as a failure. */
+	ret = frsh_send_endpoint_create(FRSH_NETPF_FWP, a->s_addr, BASE_PORT + stream->ac,
+					spi, &stream->endpoint);
+	if (ret != 0) error(1, errno, "frsh_send_endpoint_create()");
+
+	ret = frsh_send_endpoint_bind(stream->vres, stream->endpoint);
+	if (ret != 0) error(1, errno, "frsh_send_endpoint_bind");
+
+	ret = frsh_receive_endpoint_create(FRSH_NETPF_FWP, 0, qi, rpi,
+					   &stream->resp_endpoint);
+	if (ret != 0) error(1, errno, "fwp_receive_endpoint_create");
+
+	/* The receive endpoint was created with port 0; read back the
+	 * port it actually got. */
+	unsigned int port = 0;
+	ret = frsh_receive_endpoint_get_params(stream->resp_endpoint, NULL, &port, NULL, NULL);
+	if (ret != 0) error(1, errno, "frsh_receive_endpoint_get_params");
+	stream->resp_port = port;
+
+	ret = pthread_create(&stream->receiver.thread, NULL, receiver, (void*)stream);
+	if (ret) error(1, ret, "Error while creating receiver");
+
+	stream->receiver.valid = true;
+}
+#else
+/**
+ * Fill in stream->rem_addr with the IPv4 address of server_addr and
+ * the port BASE_PORT + stream->ac.
+ *
+ * A failed or unusable name lookup is fatal: it is reported through
+ * error() with a non-zero status.
+ *
+ * @param stream Stream whose rem_addr is initialised; stream->ac must
+ *               already be set.
+ */
+static void create_stream_endpoint_native(struct stream *stream)
+{
+	struct hostent *ph;
+
+	memset(&stream->rem_addr, 0, sizeof(stream->rem_addr));
+	stream->rem_addr.sin_family = AF_INET;
+
+	ph = gethostbyname(server_addr);
+	if (!ph) {
+		/* gethostbyname() reports failures through h_errno, not errno. */
+		error(1, 0, "gethostbyname(%s): %s", server_addr, hstrerror(h_errno));
+	}
+	if (ph->h_addrtype != AF_INET || !ph->h_addr_list[0]) {
+		error(1, 0, "gethostbyname(%s): no IPv4 address found", server_addr);
+	}
+
+	/* Copy bytewise; the address list entries are plain char buffers. */
+	memcpy(&stream->rem_addr.sin_addr, ph->h_addr_list[0],
+	       sizeof(stream->rem_addr.sin_addr));
+	stream->rem_addr.sin_port = htons(BASE_PORT + stream->ac);
+}
+#endif
+
+/**
+ * Complete the parameters of a stream and set up its communication.
+ *
+ * Negative bandwidth_bps, packet_size and period_usec are replaced by
+ * the defaults from the command-line options. Then either the period
+ * is computed from packet size and bandwidth, or (when the packet size
+ * is zero) the packet size is computed from period and bandwidth.
+ * Finally the contract is negotiated and, if accepted, the stream
+ * endpoint is created.
+ *
+ * If the contract is rejected, stream->bandwidth_bps is set to 0 and
+ * some_contract_not_accepted is set. Missing parameters and a packet
+ * smaller than struct msg_t are fatal (reported through error()).
+ *
+ * @param stream Stream to complete.
+ */
+static inline void
+calc_stream_params(struct stream *stream)
+{
+	int packet_size = 0;
+	unsigned period_usec = 0;
+	int bandwidth;
+	int ret;
+
+	/* If some parameters are not set explicitly, use default values. */
+	if (stream->bandwidth_bps < 0) stream->bandwidth_bps = opt_def_bandwidth * Kbit;
+	if (stream->packet_size < 0) stream->packet_size = opt_packet_size;
+	if (stream->period_usec < 0) stream->period_usec = opt_def_period_msec * MSEC_TO_USEC;
+
+	bandwidth = stream->bandwidth_bps;
+
+	/* Avoid arithmetic exception. Server thread will exit if
+	   stream->bandwidth_bps == 0. */
+	if (bandwidth == 0) bandwidth = 1;
+
+	if (stream->packet_size) {
+		packet_size = stream->packet_size;
+		/* Widen before multiplying so that the intermediate
+		 * product is not computed in (overflowing) int arithmetic. */
+		period_usec = (long long)SEC_TO_USEC * packet_size * 8 / bandwidth;
+		if (period_usec == 0) period_usec = 1;
+	} else if (stream->period_usec) {
+		period_usec = stream->period_usec;
+		packet_size = (long long)bandwidth/8 * period_usec/SEC_TO_USEC;
+	} else {
+		char buf[200];
+		stream_to_text(buf, sizeof(buf), stream, 0);
+		error(1, 0, "Neither packet size nor period was specified for a stream %s", buf);
+	}
+
+	/* Test the sign separately: comparing a negative int with a
+	 * size_t would convert it to a huge unsigned value and let it
+	 * pass the minimum-size check. */
+	if (packet_size < 0 || (size_t)packet_size < sizeof(struct msg_t)) {
+		error(1, 0, "Packet size too small (min %zu)", sizeof(struct msg_t));
+	}
+
+	stream->packet_size = packet_size;
+	stream->period_usec = period_usec;
+	stream->jitter = opt_jitter;
+
+	ret = negotiate_contract_for_stream(stream);
+	if (ret == 0) {
+		create_stream_endpoint(stream);
+	} else {
+		char buf[200];
+		stream_to_text(buf, sizeof(buf), stream, 0);
+		fprintf(stderr, "Contract hasn't been accepted:\n%s\n", buf);
+		stream->bandwidth_bps = 0;
+		some_contract_not_accepted = true;
+	}
+}
+
+/**
+ * Parse -b parameter.
+ *
+ * @param params String to parse
+ *
+ * @return NULL in case of success, pointer to a problematic character
+ * on error.
+ */