-#include <fwp.h>
+#define CONFIGURE_FWP_MY_STREAM_ID 3000
+#define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
+
+#include "fwp_confdefs.h"
+#include "fwp.h"
+#include "fwp_participant_table.h"
+#include "fwp_admctrl.h"
+
+#define FWP_MTU 2346
+#define BUFFSIZE FWP_MTU
+
+/* buffer and socket for incomming message */
+static unsigned char buffer[FWP_MTU];
+
+/* Admission control test */
+fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
+
+/**
+ * fwp_mngt_input
+ *
+ * Function waits for remote or local message
+ *
+ * @msgb received message
+ * \return
+ * On success, it returns 0 and the pointer to received message in msgb parameter.
+ * On error, it returns negative error code
+ *
+ */
+int fwp_mngr_input(struct fwp_msgb **pmsgb)
+{
+ struct fwp_msgb *msgb;
+ ssize_t size;
+
+ FWP_DEBUG("Waiting for messages\n");
+ /* TODO: consider to replace with fwp_mngt_recv call */
+ size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE, 0);
+
+ /* For future: fwp_socket could be allocated behind data in msgb*/
+ if (!(msgb = fwp_msgb_alloc(size))) {
+ perror("No memory available.\n");
+ return -ENOMEM;
+ }
+ /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
+ msgb->data = buffer;
+ fwp_msgb_put(msgb, size);
+
+ *pmsgb = msgb;
+ return (0);
+}
+
+void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
+{
+ fwp_participant_info_t participant_info, my_info;
+ fwp_participant_t *participant;
+ fwp_endpoint_attr_t attr;
+
+ FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n",
+ participant_id.node_id, participant_id.app_id);
+
+ fwp_endpoint_attr_init(&attr);
+ fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
+ /* Create a new participant */
+ fwp_msg_hello_out(msgb->data, &participant_info);
+ participant = fwp_participant_create(&participant_info);
+ fwp_mngt_service_vres_create(&participant->vresd);
+ fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
+ &attr, &participant->epointd);
+ fwp_send_endpoint_bind(participant->epointd, participant->vresd);
+ fwp_contract_table_init(&participant->contract_table);
+
+ /* Insert participant into table */
+ fwp_participant_table_insert(participant);
+
+ /* Send back hello msg with mngr`s info */
+ /* prepare hello message */
+ fwp_msgb_reset_data(msgb);
+ fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
+
+ my_info.id = fwp_participant_this->id;
+ my_info.stream_id = fwp_participant_this->stream_id;
+
+ fwp_msg_hello_in(msgb->tail, &my_info);
+ fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
+
+ /* Send hello to manager */
+ fwp_mngt_send(FWP_MSG_HELLO, msgb,
+ fwp_participant_this, participant);
+
+ FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n",
+ participant_id.node_id, participant_id.app_id);
+}
+
+int
+fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
+{
+ fwp_participant_t *participant;
+ fwp_contract_data_t *contdata;
+
+ /* Find participant */
+ if (!(participant = fwp_participant_table_find(&participant_id))){
+ return -EPERM;
+ }
+
+ contdata = fwp_contract_data_new();
+
+ /* Extract contract header */
+ fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
+ fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
+ /* Extract contract params */
+ fwp_msg_contract_out(msgb->data, &contdata->contract);
+ fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
+
+ /*launch admission test */
+ fwp_admctrl_test(contdata);
+
+ free(msgb);
+ msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
+ sizeof(struct fwp_msg_contract) +
+ sizeof(struct fwp_msg_vres_params));
+ fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
+
+ /*Add contract header*/
+ fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
+ fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
+ /* Add contract params */
+ /* No needed to send back if spare capacity is not considered
+ * fwp_msg_contract_in(msgb->tail, &contdata->contract);
+ * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
+ * */
+
+ /*Send back contract reservation */
+ if (contdata->status == FWP_CONT_RESERVED) {
+ fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
+ FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n",
+ contdata->vres_params.budget,
+ contdata->vres_params.period_usec,
+ contdata->vres_params.ac_id);
+ fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
+ /* Add contract to contract table */
+ fwp_contract_table_insert(&participant->contract_table,contdata);
+ FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
+
+ } else {
+ free(contdata);
+ }
+
+ fwp_mngt_send(FWP_MSG_RESERVE, msgb,
+ fwp_participant_this, participant);
+ return 0;
+}
+
+int
+fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
+{
+ fwp_participant_t *participant;
+ fwp_contract_data_t *contdata;
+ fwp_contract_id_t id;
+ fwp_contract_status_t status;
+
+ /* Find participant */
+ if (!(participant = fwp_participant_table_find(&participant_id))){
+ return -EPERM;
+ }
+
+ fwp_msg_contracthdr_out(msgb->data, &id, &status);
+ fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
+ FWP_DEBUG("Contract id=%d to commit\n", id);
+
+ contdata = fwp_contract_table_find(&participant->contract_table, id);
+ contdata->status = FWP_CONT_NEGOTIATED;
+
+ return 0;
+}
+
+void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
+{
+ fwp_msg_type_t msg_type;
+ fwp_participant_id_t participant_id;
+
+ fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
+ fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
+
+ switch (msg_type) {
+ case FWP_MSG_HELLO:
+ FWP_DEBUG("Message HELLO received from nodeid = %d "
+ "appid = %d\n", participant_id.node_id,
+ participant_id.app_id);
+ fwp_mngr_hello(msgb, participant_id);
+ break;
+
+ case FWP_MSG_RESERVE:
+ FWP_DEBUG("Message RESERVE received from nodeid = %d "
+ "appid = %d\n", participant_id.node_id,
+ participant_id.app_id);
+ fwp_mngr_contract_reserve(msgb, participant_id);
+ break;
+
+ case FWP_MSG_COMMIT:
+ FWP_DEBUG("Message COMMIT received from nodeid = %d "
+ "appid = %d\n", participant_id.node_id,
+ participant_id.app_id);
+ fwp_mngr_contract_commit(msgb, participant_id);
+ break;
+ default:
+ printf("Invalid message\n.");
+ fwp_msgb_free(msgb);
+ }
+}
+
+void fwp_mngr_main_loop()
+{
+ struct fwp_msgb *msgb;
+
+ /* start admission control thread */
+ while (1 /*exit_flag*/){
+ fwp_mngr_input(&msgb);
+ if (msgb)
+ fwp_mngr_msg_handler(msgb);
+ FWP_DEBUG("Mngr waiting for next msg.\n");
+ }
+}
+
+#if 0
+int fwp_mngr_init()
+{
+ fwp_participant_info_t my_info;
+ int rv;
+
+ if ((rv = fwp_endpoint_table_init(fwp_configuration.max_endpoints)) ||
+ (rv = fwp_vres_table_init(fwp_configuration.max_vres))) {
+
+ return rv;
+ }
+
+ /* Create fwp_participant_this */
+ my_info.id.node_id = inet_addr("127.0.0.1");
+ my_info.id.app_id = getpid();
+ my_info.stream_id = FWP_MNGR_STREAM_ID;
+
+ fwp_participant_this = fwp_participant_create(&my_info);
+ fwp_participant_mngr = fwp_participant_this;
+ fwp_receive_endpoint_create(my_info.stream_id, 0,
+ &fwp_participant_this->epointd);
+ FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
+ fwp_participant_this->id.node_id,
+ fwp_participant_this->stream_id);
+ return 0;
+
+}
+#endif
int main()
{
-#if 1
- if (fwp_init() < 0) {
- fprintf(stderr,"FWP initialization failed.\n");
+ if (fwp_init()) {
+ fprintf(stderr,"FWP manager initialization failed.\n");
exit(1);
-
}
-#endif
+
+ fwp_mngr_main_loop();
+
return 0;
}