int csockd, i;
if (epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {
- FWP_DEBUG("Recv\n");
_fwp_recvfrom(len, epoint->sockd, buffer, buffer_size, 0,
peer->addr, &peer->addrlen);
return len;
size_t buffer_size, int flags);
int fwp_send(fwp_endpoint_d_t epointd, void *msg, size_t size, int flags);
-static inline int fwp_endpoint_attr_setreliability(fwp_endpoint_attr_t *attr)
+int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr);
+static inline int
+fwp_endpoint_attr_setreliability(fwp_endpoint_attr_t *attr, int reliability)
{
- attr->reliability = FWP_EPOINT_RELIABLE;
+ attr->reliability = reliability; /* store the caller-chosen mode; the previous version always stored FWP_EPOINT_RELIABLE */
return 0;
}
#include "fwp_contract.h"
#include "fwp_contract_table.h"
-/**
- * FWP initialisation
- * 1. endpoint table init
- * 2. vres table init
- * 3. mngt_init
- * 4. mngt connect
- *
- * fwp mngr does not call 3., 4.
- */
-
static inline int fwp_init()
{
int rv;
if ((rv = fwp_endpoint_table_init(fwp_configuration.max_endpoints)) ||
- (rv = fwp_vres_table_init(fwp_configuration.max_vres)) ||
- (rv = fwp_mngt_init()) )
+ (rv = fwp_vres_table_init(fwp_configuration.max_vres))) /* the || chain stops at the first non-zero rv */
+ return rv; /* propagate the table-init error code unchanged */
+
+ if (fwp_configuration.mngt) { /* management start-up is optional: it runs only when the mngt configuration field is non-zero */
+ rv = fwp_mngt_init();
return rv;
+ }
return 0;
}
struct {
unsigned int max_vres;
unsigned int max_endpoints;
+ unsigned int mngt;
char my_addr[16];
unsigned int my_stream_id;
char mngr_addr[16];
#define CONFIGURE_FWP_ENDPOINTS_MAXIMUM 50
#endif
+/** Turn on/off FWP management */
+#ifndef CONFIGURE_FWP_MNGT
+#define CONFIGURE_FWP_MNGT 1
+#endif
+
/** IP Address of interface FWP operates on */
#ifndef CONFIGURE_FWP_MY_ADDR
#define CONFIGURE_FWP_MY_ADDR "127.0.0.1"
fwp_configuration_table_t fwp_configuration = {
CONFIGURE_FWP_VRES_MAXIMUM,
CONFIGURE_FWP_ENDPOINTS_MAXIMUM,
+ CONFIGURE_FWP_MNGT,
CONFIGURE_FWP_MY_ADDR,
CONFIGURE_FWP_MY_STREAM_ID,
CONFIGURE_FWP_MNGR_ADDR,
#include "ul_list.h"
#include "ul_gavlcust.h"
-/**
- * FWP vres parameters
- * It is internal representation of the contract used inside
- * protocol.
- *
- */
-
/**
* List of contract_data structures
*
*/
-typedef
+/*typedef
struct fwp_contract_list {
ul_list_head_t head;
} fwp_contract_list_t;
UL_LIST_CUST_DEC(fwp_contract_list, fwp_contract_list_t, fwp_contract_data_t,
head, list_node);
+*/
+
/**
* Table of contract_data structures
/* create service vres */
fwp_service_vparams.ac_id = FWP_AC_BK;
fwp_service_vparams.budget = 100;
- fwp_service_vparams.period_usec = 1000;
+ fwp_service_vparams.period_usec = 30;
if ((fwp_vres_create(&fwp_service_vparams, fwp_service_vresd) < 0)) {
fprintf(stderr,"Unable to open service vres\n");
fwp_participant_id_t participant_id;
fwp_msgb_t *msgb;
fwp_msg_type_t msg_type;
-
+ fwp_endpoint_attr_t attr;
+
+ fwp_endpoint_attr_init(&attr);
+ fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
/* Create discovery endpoint */
FWP_DEBUG("Service vres created\n");
FWP_DEBUG("Discovery send endpoint created\n");
fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
fwp_participant_mngr->stream_id,
- 0, &fwp_participant_mngr->epointd);
+ &attr, &fwp_participant_mngr->epointd);
fwp_send_endpoint_bind(fwp_participant_mngr->epointd,
fwp_participant_mngr->vresd);
/* Create mngt send endpoint to manager */
fwp_send_endpoint_create(fwp_participant_mngr->id.node_id,
- fwp_participant_mngr->stream_id, 0,
+ fwp_participant_mngr->stream_id, &attr,
&fwp_participant_mngr->epointd);
FWP_DEBUG("Management send endpoint created\n");
fwp_send_endpoint_bind(fwp_participant_mngr->epointd,
fwp_endpoint_attr_t attr;
char *value;
- /* Create fwp_participant_this */
-
+ fwp_endpoint_attr_init(&attr);
+ fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
+
+ /* Create fwp_participant_this */
my_info.id.node_id = inet_addr(fwp_configuration.my_addr);
if (my_info.id.node_id == inet_addr("127.0.0.1")) {
/* if default then check env variable */
my_info.stream_id = fwp_configuration.my_stream_id;
fwp_participant_this = fwp_participant_create(&my_info);
- fwp_receive_endpoint_create(my_info.stream_id, NULL,
+ fwp_receive_endpoint_create(my_info.stream_id, &attr,
&fwp_participant_this->epointd);
/* FIXME
fwp_endpoint_get_params(&(fwp_participant_this->id.node_id),
#include "fwp_participant.h"
#include "fwp_contract_table.h"
-#define FWP_MNGR_STREAM_ID 3000
+#define FWP_MNGR_STREAM_ID 3000
#define FWP_MNGT_DISCOVERY_STREAM_ID 3000
+#define FWP_MNGT_RELIABILITY FWP_EPOINT_BESTEFFORT
extern fwp_participant_t *fwp_participant_this;
extern fwp_participant_t *fwp_participant_mngr;
{
contdata->vres_params.ac_id = FWP_AC_VI;
contdata->vres_params.budget = 100;
- contdata->vres_params.period_usec = 2000;
+ contdata->vres_params.period_usec = 20;
if (nr_negotiated <= 2) {
nr_negotiated++;
{
fwp_participant_info_t participant_info, my_info;
fwp_participant_t *participant;
+ fwp_endpoint_attr_t attr;
FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
participant_id.node_id, participant_id.app_id);
+ fwp_endpoint_attr_init(&attr);
+ fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
/* Create a new participant */
fwp_msg_hello_out(msgb->data, &participant_info);
participant = fwp_participant_create(&participant_info);
fwp_mngt_service_vres_create(&participant->vresd);
fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
- 0, &participant->epointd);
+ &attr, &participant->epointd);
fwp_send_endpoint_bind(participant->epointd, participant->vresd);
fwp_contract_table_init(&participant->contract_table);
fwp_msg_contracthdr_out(msgb->data, &id, &status);
fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
- FWP_DEBUG("Contract id=%d to commit\n", contdata->id);
+ FWP_DEBUG("Contract id=%d to commit\n", id);
contdata = fwp_contract_table_find(&participant->contract_table, id);
contdata->status = FWP_CONT_NEGOTIATED;
-SUBDIRS= fwp_mngrtest
+SUBDIRS= fwp_msgtest fwp_vrestest fwp_prototest fwp_mngrtest
#SUBDIRS= fwp_msgtest fwp_vrestest fwp_prototest fwp_mngrtest
#fwp_mngrtest unixsocktest
#define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
-
#include "fwp_confdefs.h"
#include "fwp.h"
void* receiver(void *arg)
{
- fwp_endpoint_d_t repoint_d1;
+ fwp_endpoint_d_t repoint_d1;
int i,len;
char buffer[30];
+ FWP_DEBUG("Creating receive endpoint\n");
if (fwp_receive_endpoint_create(7777, &attr,&repoint_d1) < 0){
- return ;
+ perror("Error while creating receive endpoint\n");
+ return NULL; /* the function returns void*, so the bare "return ;" is replaced by an explicit NULL */
}
FWP_DEBUG("Receive endpoint created \n");
- for (i = 0; i < 4; i++) {
+ for (i = 0; i < 3; i++) { /* receive loop now runs three times instead of four */
if ((len = fwp_recv(repoint_d1, buffer, sizeof(buffer), 0)) < 0) {
perror("Error while receiving data::");
- return ;
- }
- else printf("Received - %s\n", buffer);
+ return NULL; /* receive error ends the thread; NULL is returned on every path */
+ } else {
+ printf("Received %s\n",buffer);
+ //for (j = 0 ; j < 10; i++)
+ // printf("%c", buffer[i]);
+
+ }
+ printf("END\n");
}
-}
+ return NULL; /* normal exit after the receive loop */
+}
int main()
{
fwp_endpoint_d_t sepoint_d1, sepoint_d2, repoint_d1, repoint_d2;
char msg1[] = "Hello1";
char msg2[] = "Hello2";
-
fwp_contract_t cnt1;
fwp_contract_t cnt2;
fwp_contract_t cnt3;
-
pthread_t id;
-
+ struct fwp_vres_params vparam1;
+
cnt1.budget = 100;
cnt1.period_usec = 10000;
}
fwp_endpoint_attr_init(&attr);
- attr.reliability = FWP_EPOINT_RELIABLE;
-
- pthread_create(&id, NULL,
- receiver, (void*) NULL);
+ fwp_endpoint_attr_setreliability(&attr, FWP_EPOINT_RELIABLE);
+ pthread_create(&id, NULL, &receiver, (void*) NULL);
cnt1d = fwp_contract_create(&cnt1);
fwp_contract_negotiate(cnt1d, &vres_d1);
printf("Contract3 not negotiated.\n");
*/
if (fwp_send_endpoint_create(inet_addr("127.0.0.1"), 7777,
- &attr, &sepoint_d1)
- < 0) {
+ &attr, &sepoint_d1) < 0) {
return -1;
}
printf("Send endpoint 1 created\n");
fwp_send(sepoint_d1, msg1, sizeof(msg1), 0);
printf("Sent msg 1\n");
-// sleep(2);
fwp_send(sepoint_d1, msg2, sizeof(msg2), 0);
printf("Sent msg 2\n");
printf("Test PASSED!\n");
scanf("Press key");
+
return 0;
}
cont_a.budget = 50;
cont_a.period_usec = 80;
- fwp_msg_contract_deflate(msgb->data, &cont_a);
+ fwp_msg_contract_in(msgb->data, &cont_a);
printf("Enqueue message A. ");
if (!fwp_msgq_enqueue(&msgq, msgb))
cont_b.budget = 30;
cont_b.period_usec = 100;
- fwp_msg_contract_deflate(msgb->data, &cont_b);
+ fwp_msg_contract_in(msgb->data, &cont_b);
printf("Enqueue message B. ");
if (!fwp_msgq_enqueue(&msgq, msgb))
printf("in=%d out=%d pending=%d \n",msgq.in,msgq.out,msgq.nr_pending);
- fwp_msg_contract_inflate(msgb->data, &cont_c);
+ fwp_msg_contract_out(msgb->data, &cont_c);
fwp_msgb_free(msgb);
if ((cont_a.budget != cont_c.budget)||
printf("in=%d out=%d pending=%d \n",msgq.in,msgq.out,msgq.nr_pending);
- fwp_msg_contract_inflate(msgb->data, &cont_d);
+ fwp_msg_contract_out(msgb->data, &cont_d);
fwp_msgb_free(msgb);
if ((cont_b.budget != cont_d.budget)||
-test_PROGRAMS = fwp_sendrecv_test
-fwp_sendrecv_test_SOURCES+= fwp_sendrecv_test.c
-lib_LOADLIBES += fwp pthread rt
+test_PROGRAMS = fwp_sendrecv_test1 fwp_sendrecv_test2
+fwp_sendrecv_test1_SOURCES+= fwp_sendrecv_test1.c
+fwp_sendrecv_test2_SOURCES+= fwp_sendrecv_test2.c
+lib_LOADLIBES += fwp pthread rt ulut
+#define CONFIGURE_FWP_MNGT 0
+#include "fwp_confdefs.h"
#include "fwp.h"
#include <errno.h>
char msg2[] = "Hello2";
char buffer[30];
fwp_endpoint_d_t sepoint_d1, sepoint_d2, repoint_d1, repoint_d2;
+ fwp_endpoint_attr_t attr;
+
+ fwp_endpoint_attr_init(&attr);
vparam1.ac_id = FWP_AC_VO;
vparam1.budget = 100;
printf("Vres2 created\n");
/* local_addr should be handled when creating socket */
- if (fwp_receive_endpoint_create(7777, FWP_EPOINT_MNGT,&repoint_d1) < 0){
+ if (fwp_receive_endpoint_create(7777, &attr,&repoint_d1) < 0){
return -1;
}
printf("Receive endpoint created\n");
- if (fwp_receive_endpoint_create(7778, FWP_EPOINT_MNGT,&repoint_d2) < 0){
+ if (fwp_receive_endpoint_create(7778, &attr,&repoint_d2) < 0){
return -1;
}
printf("Receive endpoint created\n");
- if (fwp_send_endpoint_create(inet_addr("127.0.0.1"), 7777,
- FWP_EPOINT_MNGT, &sepoint_d1)
- < 0) {
+ if (fwp_send_endpoint_create(inet_addr("127.0.0.1"), 7777, 0,
+ &sepoint_d1) < 0) {
return -1;
}
printf("Send endpoint 1 created\n");
fwp_send_endpoint_bind(sepoint_d1, vres_d1);
- if (fwp_send_endpoint_create(inet_addr("127.0.0.1"), 7778,
- FWP_EPOINT_MNGT, &sepoint_d2) < 0){
-
+ if (fwp_send_endpoint_create(inet_addr("127.0.0.1"), 7778, 0,
+ &sepoint_d2) < 0){
return -1;
}
printf("Send endpoint 2 created\n");
fwp_send_endpoint_bind(sepoint_d2, vres_d2);
- fwp_send(sepoint_d1, msg1, sizeof(msg1));
- fwp_send(sepoint_d1, msg2, sizeof(msg2));
+ fwp_send(sepoint_d1, msg1, sizeof(msg1), 0);
+ fwp_send(sepoint_d1, msg2, sizeof(msg2), 0);
for (i = 0; i < 2; i++) {
- if ((len = fwp_recv(repoint_d1, buffer, sizeof(buffer))) < 0) {
+ if ((len = fwp_recv(repoint_d1, buffer, sizeof(buffer), 0)) < 0) {
perror("Error while receiving data");
return -1;
}
--- /dev/null
+#define CONFIGURE_FWP_MNGT 0
+#include "fwp_confdefs.h"
+#include "fwp.h"
+
+#include <errno.h>
+#include <stdio.h>
+
+fwp_endpoint_attr_t attr;
+
+/**
+ * Receiver thread body.
+ *
+ * Creates a receive endpoint for stream id 7777 using the file-scope
+ * `attr`, then receives three messages and prints each of them followed
+ * by an "END" marker line.
+ *
+ * @param arg  unused
+ * @return always NULL, both after the loop completes and on any error
+ */
+void* receiver(void *arg)
+{
+    fwp_endpoint_d_t repoint_d1;
+    int i, len;
+    char buffer[30];
+
+    (void) arg;
+
+    FWP_DEBUG("Creating receive endpoint\n");
+    if (fwp_receive_endpoint_create(7777, &attr, &repoint_d1) < 0) {
+        perror("Error while creating receive endpoint\n");
+        return NULL;
+    }
+
+    FWP_DEBUG("Receive endpoint created \n");
+    for (i = 0; i < 3; i++) {
+        len = fwp_recv(repoint_d1, buffer, sizeof(buffer), 0);
+        if (len < 0) {
+            perror("Error while receiving data::");
+            return NULL;
+        }
+
+        /*
+         * The payload is not guaranteed to carry a terminating NUL (a
+         * message may fill all of `buffer`), so bound the print by the
+         * received length instead of relying on "%s" finding one.
+         */
+        printf("Received %.*s\n", len, buffer);
+        printf("END\n");
+    }
+
+    return NULL;
+}
+
+/**
+ * Send/receive test: starts the receiver thread, creates one vres and one
+ * send endpoint towards 127.0.0.1 stream id 7777, binds them and sends two
+ * messages over the endpoint.
+ *
+ * @return 0 when every step succeeded, -1 as soon as one step fails
+ */
+int main(void)
+{
+    fwp_vres_d_t vres_d1;
+    struct fwp_vres_params vparam1;
+    char msg1[] = "Hello1";
+    char msg2[] = "Hello2";
+    fwp_endpoint_d_t sepoint_d1;
+    pthread_t id;
+
+    vparam1.ac_id = FWP_AC_VO;
+    vparam1.budget = 100;
+    vparam1.period_usec = 10;
+
+    printf("Start\n");
+    if (fwp_init() != 0) {
+        printf("FWP initialization failed!\n");
+        return -1;
+    }
+
+    /* `attr` is file-scope and shared with the receiver thread, so it is
+     * fully set up before that thread is started. */
+    fwp_endpoint_attr_init(&attr);
+    fwp_endpoint_attr_setreliability(&attr, FWP_EPOINT_RELIABLE);
+
+    /* pthread_create reports failure through a non-zero return value */
+    if (pthread_create(&id, NULL, &receiver, NULL) != 0) {
+        printf("Unable to start receiver thread\n");
+        return -1;
+    }
+
+    printf("Create vres1, vres2\n");
+    if (fwp_vres_create(&vparam1, &vres_d1) < 0) {
+        printf("Unable to create vres1\n");
+        return -1;
+    }
+    printf("Vres1 created \n");
+
+    if (fwp_send_endpoint_create(inet_addr("127.0.0.1"), 7777, &attr,
+                                 &sepoint_d1) < 0) {
+        return -1;
+    }
+    printf("Send endpoint 1 created\n");
+    fwp_send_endpoint_bind(sepoint_d1, vres_d1);
+
+    /* NOTE(review): assumes fwp_send signals failure with a negative
+     * return, like the other fwp_* calls checked here -- confirm. */
+    if (fwp_send(sepoint_d1, msg1, sizeof(msg1), 0) < 0) {
+        printf("Unable to send msg1\n");
+        return -1;
+    }
+    FWP_DEBUG("Sent msg1\n");
+    if (fwp_send(sepoint_d1, msg2, sizeof(msg2), 0) < 0) {
+        printf("Unable to send msg2\n");
+        return -1;
+    }
+    FWP_DEBUG("Sent msg2\n");
+
+    /* NOTE(review): vres_d1 is not destroyed before exit; the teardown
+     * calls were disabled in the original test -- confirm whether
+     * fwp_vres_destroy() can be re-enabled here. */
+
+    printf("Test PASSED!\n");
+
+    /* scanf("Press key") printed nothing and only tried to match that
+     * text against stdin; show the prompt and wait for input explicitly. */
+    printf("Press key\n");
+    getchar();
+
+    return 0;
+}
test_PROGRAMS = fwp_vrestest1 fwp_vrestest2
fwp_vrestest1_SOURCES+= fwp_vrestest1.c
fwp_vrestest2_SOURCES+= fwp_vrestest2.c
-lib_LOADLIBES += fwp pthread rt
+lib_LOADLIBES += fwp pthread rt ulut
+#define CONFIGURE_FWP_MNGT 0
+#include "fwp_confdefs.h"
#include "fwp.h"
#include <errno.h>
fwp_endpoint_d_t sepoint_d1, repoint_d;
int count;
struct timespec sendtime;
-
+ fwp_endpoint_attr_t attr ;
+
+ fwp_endpoint_attr_init(&attr);
vparam1.ac_id = FWP_AC_VO;
vparam1.budget = 100;
vparam1.period_usec = 2111111;
}
printf("Vres2 created\n");
/* local_addr should be handled when creating socket */
- if (fwp_receive_endpoint_create(7777, &repoint_d) < 0) {
+ if (fwp_receive_endpoint_create(7777, &attr, &repoint_d) < 0) {
return -1;
}
printf("Receive endpoint created\n");
- if (fwp_send_endpoint_create(inet_addr("127.0.0.1"), 7777, &sepoint_d1)
- < 0){
+ if (fwp_send_endpoint_create(inet_addr("127.0.0.1"), 7777, &attr,
+ &sepoint_d1) < 0){
return -1;
}
printf("Send endpoint 1 created\n");
for (count = 0; count < NUM; count++) {
sprintf(msg1,"msg%d",count);
- fwp_send(sepoint_d1, msg1, sizeof(msg1));
+ fwp_send(sepoint_d1, msg1, sizeof(msg1), 0);
clock_gettime(CLOCK_MONOTONIC, &sendtime);
FWP_DEBUG("Sent: sec = %ld nsec = %ld \n", sendtime.tv_sec,
sendtime.tv_nsec);
- if ((len = fwp_recv(repoint_d, buffer, sizeof(buffer))) < 0) {
+ if ((len = fwp_recv(repoint_d, buffer, sizeof(buffer), 0)) < 0) {
perror("Error while receiving data");
return -1;
}
+#define CONFIGURE_FWP_MNGT 0
+#include "fwp_confdefs.h"
#include "fwp.h"
#include <errno.h>
#define NUM 20
int exit_flag = 0;
-
+fwp_endpoint_attr_t attr;
+
void* sender()
{
fwp_endpoint_d_t sepoint_d1;
}
printf("Vres1 created");
- if (fwp_send_endpoint_create(inet_addr("127.0.0.1"), 7777, &sepoint_d1)
- < 0){
+ if (fwp_send_endpoint_create(inet_addr("127.0.0.0"), 7777, &attr,
+ &sepoint_d1) < 0){
return NULL;
}
printf("Send endpoint 1 created\n");
while (count < NUM){
count++;
sprintf(msg1,"msg%d",count);
- fwp_send(sepoint_d1, msg1, sizeof(msg1));
+ fwp_send(sepoint_d1, msg1, sizeof(msg1), 0);
printf("sent\n");
/*clock_gettime(CLOCK_MONOTONIC, &sendtime);
struct timespec recvtime;
/* local_addr should be handled when creating socket */
- if (fwp_receive_endpoint_create(7777, &repoint_d) < 0){
+ if (fwp_receive_endpoint_create(7777, &attr, &repoint_d) < 0){
perror("Not initialized\n");
return NULL;
}
for (count = 1; count <= NUM; count++) {
- if ((len = fwp_recv(repoint_d, buffer, sizeof(buffer))) < 0){
+ if ((len = fwp_recv(repoint_d, buffer, sizeof(buffer), 0)) < 0){
perror("Error while receiving data");
return NULL;
}
int main()
{
// struct sockaddr_in local_addr, rem_addr, from;
- pthread_attr_t attr;
+ pthread_attr_t thattr;
pthread_t thread;
printf("Start\n");
fwp_init();
- pthread_attr_init(&attr);
- pthread_create(&thread, &attr, receiver, NULL);
- pthread_create(&thread, &attr, sender, NULL);
+ fwp_endpoint_attr_init(&attr);
+ pthread_attr_init(&thattr);
+ pthread_create(&thread, &thattr, receiver, NULL);
+ pthread_create(&thread, &thattr, sender, NULL);
pthread_join(thread, (void**) NULL);
printf("Test PASSED!\n");