]> rtime.felk.cvut.cz Git - frescor/fwp.git/blobdiff - fwp/mngr/fwp_mngr.c
FWP tests got to the functional state
[frescor/fwp.git] / fwp / mngr / fwp_mngr.c
index c90c37baead6eb218b3a676716d477de505440ed..c89d06e69fb85e2dc84ae112262d1b6bb49fab15 100644 (file)
-#include <fwp.h>
+#define CONFIGURE_FWP_MY_STREAM_ID 3000
+#define CONFIGURE_FWP_MNGR_ADDR "127.0.0.1"
+
+#include "fwp_confdefs.h"
+#include "fwp.h"
+#include "fwp_participant_table.h"
+#include "fwp_admctrl.h"
+
+#define FWP_MTU        2346  /* size in bytes of the receive buffer below */
+#define BUFFSIZE       FWP_MTU /* maximum length passed to fwp_recv() */
+
+/* Buffer for the incoming message; shared by every call of fwp_mngr_input() */
+static unsigned char   buffer[FWP_MTU];
+
+/* Admission control test run on each RESERVE request; set to fwp_admctrl_stupid */
+fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
+
+/**
+ * fwp_mngr_input
+ *
+ * Waits in fwp_recv() on the endpoint of fwp_participant_this for a remote
+ * or local message and wraps it in a newly allocated message buffer.
+ *
+ * The payload is not copied: msgb->data is pointed at the file-static
+ * receive buffer, so the content is only valid until the next call.
+ *
+ * @pmsgb  out: received message; set to NULL whenever an error is returned
+ * \return
+ * On success, it returns 0 and the pointer to received message in pmsgb.
+ * On error, it returns a negative value: the result of fwp_recv() when
+ * receiving failed, or -ENOMEM when no message buffer could be allocated.
+ *
+ */
+int fwp_mngr_input(struct fwp_msgb **pmsgb)
+{
+       struct fwp_msgb *msgb;
+       ssize_t size;
+
+       /* Never leave the caller's pointer indeterminate on an error path */
+       *pmsgb = NULL;
+
+       FWP_DEBUG("Waiting for messages\n");
+       /* TODO: consider to replace with fwp_mngt_recv call */
+       size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE, 0);
+       if (size < 0) {
+               /* A negative length must not reach fwp_msgb_alloc()/put() */
+               FWP_DEBUG("Receiving of message failed\n");
+               return (int) size;
+       }
+
+       /* For future: fwp_socket could be allocated behind data in msgb */
+       if (!(msgb = fwp_msgb_alloc(size))) {
+               /* perror() appends ": <reason>" and the newline itself */
+               perror("No memory available");
+               return -ENOMEM;
+       }
+       /* NOTE(review): the storage obtained from fwp_msgb_alloc() is bypassed
+        * here instead of memcpy(fwp_msgb_put(msgb, size), buffer, size);
+        * confirm that the msgb helpers cope with a redirected data pointer. */
+       msgb->data = buffer;
+       fwp_msgb_put(msgb, size);
+
+       *pmsgb = msgb;
+       return 0;
+}
+
+/**
+ * fwp_mngr_hello
+ *
+ * Handles a HELLO message: creates a participant from the info carried in
+ * the message, gives it a management vres, a send endpoint bound to that
+ * vres and an empty contract table, stores it in the participant table and
+ * answers with a HELLO message carrying the manager's own id and stream id.
+ *
+ * @msgb  received message positioned at the hello payload; it is reset and
+ *        reused for the reply
+ * @participant_id  sender id taken from the message header (only logged here)
+ */
+void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
+{
+       fwp_participant_info_t participant_info, my_info;
+       fwp_participant_t *participant;
+       fwp_endpoint_attr_t attr;
+
+       FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
+                       participant_id.node_id, participant_id.app_id);
+
+       fwp_endpoint_attr_init(&attr);
+       fwp_endpoint_attr_setreliability(&attr, FWP_MNGT_RELIABILITY);
+       /* Create a new participant */
+       fwp_msg_hello_out(msgb->data, &participant_info);
+       participant = fwp_participant_create(&participant_info);
+       if (!participant) {
+               /* Everything below dereferences participant; give up early */
+               FWP_DEBUG("Participant could not be created\n");
+               fwp_msgb_free(msgb);
+               return;
+       }
+       fwp_mngt_service_vres_create(&participant->vresd);
+       fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
+                                       &attr, &participant->epointd);
+       fwp_send_endpoint_bind(participant->epointd, participant->vresd);
+       fwp_contract_table_init(&participant->contract_table);
+
+       /* Insert participant into table */
+       fwp_participant_table_insert(participant);
+
+       /* Send back hello msg with mngr`s info */
+       /* prepare hello message */
+       fwp_msgb_reset_data(msgb);
+       fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
+       
+       my_info.id = fwp_participant_this->id;
+       my_info.stream_id = fwp_participant_this->stream_id;
+
+       fwp_msg_hello_in(msgb->tail, &my_info);
+       fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
+
+       /* Send the manager's hello to the newly registered participant */
+       fwp_mngt_send(FWP_MSG_HELLO, msgb, 
+                       fwp_participant_this, participant);
+
+       FWP_DEBUG("Sent HELLO msg from nodeid= %d appid= %d\n", 
+                       participant_id.node_id, participant_id.app_id);
+}
+
+/**
+ * fwp_mngr_contract_reserve
+ *
+ * Handles a RESERVE message: decodes the contract header and parameters,
+ * runs the admission test on them and sends a RESERVE reply carrying the
+ * contract id and resulting status. When the status is FWP_CONT_RESERVED
+ * the vres parameters are appended to the reply and the contract is stored
+ * in the participant's contract table; otherwise the contract data is freed.
+ *
+ * @msgb  received message positioned at the contract header; released here
+ * @participant_id  sender id taken from the message header
+ * \return
+ * 0 when a reply was handed to fwp_mngt_send(), -EPERM if the sender is not
+ * in the participant table, -ENOMEM if an allocation failed.
+ */
+int 
+fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
+{
+       fwp_participant_t *participant;
+       fwp_contract_data_t *contdata;
+
+       /* Find participant */
+       if (!(participant = fwp_participant_table_find(&participant_id))){
+               fwp_msgb_free(msgb);
+               return -EPERM;
+       }
+
+       if (!(contdata = fwp_contract_data_new())) {
+               fwp_msgb_free(msgb);
+               return -ENOMEM;
+       }
+       
+       /* Extract contract header */
+       fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status);
+       fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
+       /* Extract contract params */
+       fwp_msg_contract_out(msgb->data, &contdata->contract);
+       fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract));
+
+       /*launch admission test */
+       fwp_admctrl_test(contdata);             
+       
+       /* The request is fully decoded; release it through the msgb API */
+       fwp_msgb_free(msgb);
+       /* Room for everything that may be put below, including the contract
+        * header and the (currently disabled) contract params */
+       msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) +
+                                       sizeof(struct fwp_msg_contracthdr) +
+                                       sizeof(struct fwp_msg_contract) +
+                                       sizeof(struct fwp_msg_vres_params));
+       if (!msgb) {
+               /* contdata has not been stored anywhere yet */
+               free(contdata);
+               return -ENOMEM;
+       }
+       fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header));
+       
+       /*Add contract header*/
+       fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status);
+       fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr));
+       /* Add contract params */
+       /* No needed to send back if spare capacity is not considered
+        * fwp_msg_contract_in(msgb->tail, &contdata->contract);
+        * fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract));
+        * */
+       
+       /*Send back contract reservation */
+       if (contdata->status == FWP_CONT_RESERVED) {
+               fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params);
+               FWP_DEBUG("Sent vres params budget=%d period=%d ac=%d\n", 
+                               contdata->vres_params.budget,
+                               contdata->vres_params.period_usec,
+                               contdata->vres_params.ac_id);
+               fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params));
+               /* Add contract to contract table */
+               fwp_contract_table_insert(&participant->contract_table,contdata);
+               FWP_DEBUG("Contract id=%d stored in table\n", contdata->id);
+
+       } else {
+               /* The reply header was already written from contdata above */
+               free(contdata);
+       }       
+       
+       fwp_mngt_send(FWP_MSG_RESERVE, msgb, 
+                       fwp_participant_this, participant);
+       return 0;
+}
+
+/**
+ * fwp_mngr_contract_commit
+ *
+ * Handles a COMMIT message: looks up the contract named in the contract
+ * header in the sender's contract table and marks it FWP_CONT_NEGOTIATED.
+ * No reply is sent.
+ *
+ * @msgb  received message positioned at the contract header; released here
+ * @participant_id  sender id taken from the message header
+ * \return
+ * 0 on success, -EPERM if the sender is not in the participant table,
+ * -ENOENT if the sender has no contract with the given id.
+ */
+int 
+fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
+{
+       fwp_participant_t *participant;
+       fwp_contract_data_t *contdata;
+       fwp_contract_id_t  id;
+       fwp_contract_status_t  status;
+
+       /* Find participant */
+       if (!(participant = fwp_participant_table_find(&participant_id))){
+               fwp_msgb_free(msgb);
+               return -EPERM;
+       }
+
+       fwp_msg_contracthdr_out(msgb->data, &id, &status);
+       fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr));
+       FWP_DEBUG("Contract id=%d to commit\n", id);
+       /* Nothing else is read from the request and it is not reused */
+       fwp_msgb_free(msgb);
+       
+       contdata = fwp_contract_table_find(&participant->contract_table, id);
+       if (!contdata) {
+               /* A commit for an id that was never reserved must not crash */
+               FWP_DEBUG("Contract id=%d not found\n", id);
+               return -ENOENT;
+       }
+       contdata->status = FWP_CONT_NEGOTIATED; 
+       
+       return 0;       
+}
+
+/**
+ * fwp_mngr_msg_handler
+ *
+ * Decodes the message header, strips it from the message and dispatches the
+ * rest to the handler matching the message type. The message is handed over
+ * to the selected handler and not touched here afterwards; a message of
+ * unknown type is freed here.
+ *
+ * @msgb  received message starting with struct fwp_msg_header
+ */
+void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
+{
+       fwp_msg_type_t msg_type;
+       fwp_participant_id_t    participant_id;
+       int rv;
+
+       fwp_msg_header_out(msgb->data, &msg_type, &participant_id);
+       fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
+       
+       switch (msg_type) {
+               case  FWP_MSG_HELLO:
+                       FWP_DEBUG("Message HELLO received from nodeid = %d "
+                                 "appid = %d\n", participant_id.node_id, 
+                                       participant_id.app_id);
+                       fwp_mngr_hello(msgb, participant_id);
+                       break;
+
+               case  FWP_MSG_RESERVE: 
+                       FWP_DEBUG("Message RESERVE received from nodeid = %d " 
+                                 "appid = %d\n", participant_id.node_id, 
+                                       participant_id.app_id);
+                       rv = fwp_mngr_contract_reserve(msgb, participant_id);
+                       if (rv) {
+                               /* Do not drop the failure silently */
+                               FWP_DEBUG("RESERVE handling failed (%d)\n", rv);
+                       }
+                       break;
+
+               case  FWP_MSG_COMMIT: 
+                       FWP_DEBUG("Message COMMIT received from nodeid = %d "
+                                 "appid = %d\n", participant_id.node_id, 
+                                       participant_id.app_id);
+                       rv = fwp_mngr_contract_commit(msgb, participant_id);
+                       if (rv) {
+                               /* Do not drop the failure silently */
+                               FWP_DEBUG("COMMIT handling failed (%d)\n", rv);
+                       }
+                       break;  
+               default:
+                       printf("Invalid message.\n");
+                       fwp_msgb_free(msgb);
+                       break;
+       }
+}
+
+/**
+ * fwp_mngr_main_loop
+ *
+ * Endless manager loop: receives one message with fwp_mngr_input() and
+ * passes it to fwp_mngr_msg_handler(). A failed receive is logged and the
+ * loop goes on with the next message. The function never returns.
+ */
+void fwp_mngr_main_loop()
+{
+       struct fwp_msgb *msgb;
+       int rv;
+
+       /* start admission control thread */
+       while (1 /*exit_flag*/){
+               /* The pointer must have a defined value even if input fails */
+               msgb = NULL;
+               rv = fwp_mngr_input(&msgb);
+               if (rv == 0 && msgb) {
+                       fwp_mngr_msg_handler(msgb);
+               } else {
+                       FWP_DEBUG("No message to handle (rv = %d)\n", rv);
+               }
+               FWP_DEBUG("Mngr waiting for next msg.\n");
+       }
+}
+
+#if 0
+/*
+ * fwp_mngr_init (compiled out by the surrounding #if 0)
+ *
+ * Initializes the endpoint and vres tables, creates the participant that
+ * represents the manager itself and opens its receive endpoint.
+ *
+ * Returns 0 on success or the non-zero result of the failing table init.
+ */
+int fwp_mngr_init()
+{
+       fwp_participant_info_t  self;
+       int err;
+
+       /* Stop at the first table that fails to initialize */
+       err = fwp_endpoint_table_init(fwp_configuration.max_endpoints);
+       if (err)
+               return err;
+
+       err = fwp_vres_table_init(fwp_configuration.max_vres);
+       if (err)
+               return err;
+
+       /* Describe the manager: loopback address, own pid, manager stream */
+       self.stream_id = FWP_MNGR_STREAM_ID;
+       self.id.app_id = getpid();
+       self.id.node_id = inet_addr("127.0.0.1");
+
+       /* The manager is both "this" participant and the manager participant */
+       fwp_participant_this = fwp_participant_create(&self);
+       fwp_participant_mngr = fwp_participant_this;
+       fwp_receive_endpoint_create(self.stream_id, 0,
+                                       &fwp_participant_this->epointd);
+       FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
+                               fwp_participant_this->id.node_id,
+                               fwp_participant_this->stream_id);
+
+       return 0;
+}
+#endif
 
 int main()
 {
-#if 1
-       if (fwp_init() < 0) {
-               fprintf(stderr,"FWP initialization failed.\n");
+       if (fwp_init()) { /* any non-zero result is treated as a failed init */
+               fprintf(stderr,"FWP manager initialization failed.\n");
                exit(1);
-
        }
-#endif 
+
+       fwp_mngr_main_loop(); /* loops on while (1); does not return */
+       
        return 0;       
 }