]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/mngr/fwp_mngr.c
Added fwp_mngr_init - initialization routine for manager
[frescor/fwp.git] / fwp / mngr / fwp_mngr.c
1 #include <fwp.h>
2 #include "fwp_mngt.h"
3 #include "fwp_participant_table.h"
4
/* Largest management message, in bytes, that the manager accepts. */
#define FWP_MTU         2346
/* Size of the receive buffer; one full MTU. */
#define BUFFSIZE        FWP_MTU

/* Static buffer that holds the most recently received incoming message. */
static unsigned char    buffer[BUFFSIZE];
10
11 /* Admission control test */
12 //fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
13
14 /**
15  * fwp_mngt_input 
16  *
17  * Function waits for remote or local message 
18  * 
19  * @msgb  received message 
20  * \return 
21  * On success, it returns 0 and the pointer to received message in msgb parameter.
22  * On error, it returns negative error code
23  *
24  */
25 int fwp_mngr_input(struct fwp_msgb **pmsgb)
26 {
27         struct fwp_msgb *msgb;
28         ssize_t size;
29
30         FWP_DEBUG("Waiting for messages\n");
31         /* TODO: consider to replace with fwp_mngt_recv call */
32         size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE);
33          
34         FWP_DEBUG("Creating fwp msgb len=%d\n", size);  
35         /* For future: fwp_socket could be allocated behind data in msgb*/
36         if (!(msgb = fwp_msgb_alloc(size))) {
37                 perror("No memory available.\n");
38                 return -ENOMEM;
39         }
40         /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
41         msgb->data = buffer;
42         fwp_msgb_put(msgb, size);
43         
44         *pmsgb = msgb;
45         return (0);
46 }
47
48 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
49 {
50         fwp_participant_info_t participant_info, my_info;
51         fwp_participant_t *participant;
52
53         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
54                         participant_id.node_id, participant_id.app_id);
55
56         /* Create a new participant */
57         fwp_msg_hello_inflate(msgb->data, &participant_info);
58         participant = fwp_participant_create(&participant_info);
59         fwp_mngt_service_vres_create(&participant->vresd);
60         fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
61                                         0, &participant->epointd);
62         fwp_send_endpoint_bind(participant->epointd, participant->vresd);
63         fwp_contract_table_init(&participant->contract_table);
64
65         /* Insert participant into table */
66         fwp_participant_table_insert(participant);
67
68         /* Send back hello msg with mngr`s info */
69         /* prepare hello message */
70         fwp_msgb_reset_data(msgb);
71         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
72         
73         my_info.id = fwp_participant_this->id;
74         my_info.stream_id = fwp_participant_this->stream_id;
75
76         fwp_msg_hello_deflate(msgb->tail, &my_info);
77         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
78
79         /* Send hello to manager */
80         FWP_DEBUG("nodeid = %d\n", fwp_participant_this->id.node_id);
81         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
82                         fwp_participant_this, participant);
83
84         FWP_DEBUG("Sent HELLO msg \n");
85 }
86
87 /*void fwp_mngr_negt_request(msgb, participant_id)
88 {
89         
90
91 }*/
92
93 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
94 {
95         fwp_msg_type_t msg_type;
96         fwp_participant_id_t    participant_id;
97
98         fwp_msg_header_inflate(msgb->data, &msg_type, &participant_id);
99         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
100         FWP_DEBUG("type = %d , nodeid = %d appid = %d\n", msg_type, 
101                         participant_id.node_id, participant_id.app_id);
102         
103         switch (msg_type) {
104                 case  FWP_MSG_HELLO:
105                         fwp_mngr_hello(msgb, participant_id);
106                         break;
107 #if 0
108                 case  FWP_CONTNEGT_REQ: 
109                         FWP_DEBUG("Negotiation Request received\n");
110                         fwp_mngr_negt_request(msgb, participant_id);
111                         break;  
112 #endif
113                 default:
114                         printf("Invalid message\n.");
115                         fwp_msgb_free(msgb);
116         }
117
118
119 }
120
/**
 * fwp_mngr_main_loop
 *
 * Endless manager loop: waits for the next message and hands it to the
 * message handler. A failed receive is skipped rather than dispatched.
 */
void fwp_mngr_main_loop()
{
        /* start admission control thread */
        while (1 /*exit_flag*/){
                /*
                 * Reset on every pass so that a failed receive can never
                 * hand an indeterminate or stale pointer to the handler.
                 */
                struct fwp_msgb *msgb = NULL;
                int rv = fwp_mngr_input(&msgb);

                if (rv == 0 && msgb)
                        fwp_mngr_msg_handler(msgb);
                else
                        FWP_DEBUG("Receiving message failed, rv = %d\n", rv);
                FWP_DEBUG("Mngr waiting for next msg.\n");
        }
}
133
134 int fwp_mngr_init()
135 {
136         fwp_participant_info_t  my_info;
137         int rv;
138
139         if ((rv = fwp_endpoint_table_init(FWP_EPOINT_MAX)) ||
140             (rv = fwp_vres_table_init(FWP_VRES_MAX))) {
141
142                 return rv;
143         }
144         
145         /* Create fwp_participant_this */
146         my_info.id.node_id = inet_addr("127.0.0.1");
147         my_info.id.app_id = getpid();
148         my_info.stream_id = FWP_MNGR_STREAM_ID;
149
150         fwp_participant_this = fwp_participant_create(&my_info);
151         fwp_participant_mngr = fwp_participant_this;
152         fwp_receive_endpoint_create(my_info.stream_id, 0,
153                                         &fwp_participant_this->epointd);
154         FWP_DEBUG("Management receive endpoint created, stream id= %d\n",
155                         fwp_participant_this->stream_id);
156         FWP_DEBUG("nodeid = %d\n", fwp_participant_this->id.node_id);
157
158         return 0;
159
160 }
161
162 int main()
163 {
164         if (fwp_mngr_init()) {
165                 fprintf(stderr,"FWP manager initialization failed.\n");
166                 exit(1);
167         }
168
169         fwp_mngr_main_loop();
170         
171         return 0;       
172