]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/mngr/fwp_mngr.c
Fix in mngrtest/Makefile.omk -libary order, changed contract ops in libfwp and in...
[frescor/fwp.git] / fwp / mngr / fwp_mngr.c
#include <fwp.h>
#include "fwp_mngt.h"
#include "fwp_participant_table.h"

#include <errno.h>
#include <stddef.h>
4
/*
 * Upper bound, in bytes, on a single message handled by the manager.
 * NOTE(review): 2346 presumably corresponds to the maximum IEEE 802.11
 * frame size -- confirm.
 */
#define FWP_MTU         2346
/* Capacity of the receive buffer; also the read limit given to fwp_recv. */
#define BUFFSIZE        FWP_MTU

/* Static buffer into which fwp_mngr_input receives each incoming message. */
static unsigned char    buffer[BUFFSIZE];
10
11 /* Admission control test */
12 //fwp_admctrl_test_t fwp_admctrl_test = &fwp_admctrl_stupid;
13
14 /**
15  * fwp_mngt_input 
16  *
17  * Function waits for remote or local message 
18  * 
19  * @msgb  received message 
20  * \return 
21  * On success, it returns 0 and the pointer to received message in msgb parameter.
22  * On error, it returns negative error code
23  *
24  */
25 int fwp_mngr_input(struct fwp_msgb **pmsgb)
26 {
27         struct fwp_msgb *msgb;
28         ssize_t size;
29
30         FWP_DEBUG("Waiting for messages\n");
31         /* TODO: consider to replace with fwp_mngt_recv call */
32         size = fwp_recv(fwp_participant_this->epointd, buffer, BUFFSIZE);
33          
34         FWP_DEBUG("Creating fwp msgb len=%d\n", size);  
35         /* For future: fwp_socket could be allocated behind data in msgb*/
36         if (!(msgb = fwp_msgb_alloc(size))) {
37                 perror("No memory available.\n");
38                 return -ENOMEM;
39         }
40         /*memcpy(fwp_msgb_put(msgb, len), buffer, len); */
41         msgb->data = buffer;
42         fwp_msgb_put(msgb, size);
43         
44         *pmsgb = msgb;
45         return (0);
46 }
47
48 void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
49 {
50         fwp_participant_info_t participant_info, my_info;
51         fwp_participant_t *participant;
52
53         FWP_DEBUG("Received HELLO msg from nodeid= %d appid= %d\n", 
54                         participant_id.node_id, participant_id.app_id);
55
56         /* Create a new participant */
57         fwp_msg_hello_inflate(msgb->data, &participant_info);
58         participant = fwp_participant_create(&participant_info);
59         fwp_mngt_service_vres_create(&participant->vresd);
60         fwp_send_endpoint_create(participant->id.node_id, participant->stream_id,
61                                         0, &participant->epointd);
62         fwp_send_endpoint_bind(participant->epointd, participant->vresd);
63         fwp_contract_table_init(&participant->contract_table);
64
65         /* Insert participant into table */
66         fwp_participant_table_insert(participant);
67
68         /* Send back hello msg with mngr`s info */
69         /* prepare hello message */
70         fwp_msgb_reset_data(msgb);
71         fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header));
72         
73         my_info.id = fwp_participant_this->id;
74         my_info.stream_id = fwp_participant_this->stream_id;
75
76         fwp_msg_hello_deflate(msgb->tail, &my_info);
77         fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello));
78
79         /* Send hello to manager */
80         FWP_DEBUG("nodeid = %d\n", fwp_participant_this->id.node_id);
81         fwp_mngt_send(FWP_MSG_HELLO, msgb, 
82                         fwp_participant_this, participant);
83
84         FWP_DEBUG("Sent HELLO msg \n");
85 }
86
87 int fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
88 {
89         fwp_participant_t *participant;
90         fwp_contract_data_t *contdata;
91
92         /* Find participant */
93         if (!(participant = fwp_participant_table_find(&participant_id))){
94                 return -EPERM;
95         }
96
97         contdata = fwp_contract_data_new();
98         fwp_msg_contract_inflate(msgb->data, &contdata->id, &contdata->contract);
99         contdata->status = FWP_CONT_REQUESTED; 
100         
101         /* Add contract to contract table */
102         fwp_contract_table_insert(&participant->contract_table, contdata);
103         /*launch admission test */
104
105         return 0;
106 }
107
108 int fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_id)
109 {
110         fwp_participant_t *participant;
111         fwp_contract_data_t *contdata;
112         fwp_contract_t  contract;
113         fwp_contract_id_t  id;
114
115         /* Find participant */
116         if (!(participant = fwp_participant_table_find(&participant_id))){
117                 return -EPERM;
118         }
119
120         fwp_msg_contract_inflate(msgb->data, &id, &contract);
121         contdata = fwp_contract_table_find(&participant->contract_table, id);
122         contdata->status = FWP_CONT_ACCEPTED; 
123         
124         /* Add to contract accept list */
125         return 0;       
126 }
127
128 void fwp_mngr_msg_handler(fwp_msgb_t *msgb)
129 {
130         fwp_msg_type_t msg_type;
131         fwp_participant_id_t    participant_id;
132
133         fwp_msg_header_inflate(msgb->data, &msg_type, &participant_id);
134         fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header));
135         FWP_DEBUG("type = %d , nodeid = %d appid = %d\n", msg_type, 
136                         participant_id.node_id, participant_id.app_id);
137         
138         switch (msg_type) {
139                 case  FWP_MSG_HELLO:
140                         fwp_mngr_hello(msgb, participant_id);
141                         break;
142
143                 case  FWP_MSG_NEGT_REQ: 
144                         FWP_DEBUG("Negotiation Request received\n");
145                         fwp_mngr_contract_reserve(msgb, participant_id);
146                         break;  
147                 default:
148                         printf("Invalid message\n.");
149                         fwp_msgb_free(msgb);
150         }
151
152
153 }
154
/**
 * fwp_mngr_main_loop
 *
 * Endless manager loop: waits for a message with fwp_mngr_input and
 * hands every successfully received message to fwp_mngr_msg_handler.
 * A failed receive is logged and the loop carries on with the next one.
 * The function never returns.
 */
void fwp_mngr_main_loop()
{
        struct fwp_msgb *msgb;
        int rv;

        /* start admission control thread */
        while (1 /*exit_flag*/){
                /*
                 * Reset on every round so that a failed receive can never
                 * dispatch an indeterminate pointer or the message of the
                 * previous round.
                 */
                msgb = NULL;

                rv = fwp_mngr_input(&msgb);
                if (rv == 0 && msgb) {
                        fwp_mngr_msg_handler(msgb);
                } else {
                        FWP_DEBUG("Receiving message failed: %d\n", rv);
                }
                FWP_DEBUG("Mngr waiting for next msg.\n");
        }
}
167
168 int fwp_mngr_init()
169 {
170         fwp_participant_info_t  my_info;
171         int rv;
172
173         if ((rv = fwp_endpoint_table_init(FWP_EPOINT_MAX)) ||
174             (rv = fwp_vres_table_init(FWP_VRES_MAX))) {
175
176                 return rv;
177         }
178         
179         /* Create fwp_participant_this */
180         my_info.id.node_id = inet_addr("127.0.0.1");
181         my_info.id.app_id = getpid();
182         my_info.stream_id = FWP_MNGR_STREAM_ID;
183
184         fwp_participant_this = fwp_participant_create(&my_info);
185         fwp_participant_mngr = fwp_participant_this;
186         fwp_receive_endpoint_create(my_info.stream_id, 0,
187                                         &fwp_participant_this->epointd);
188         FWP_DEBUG("Participant_this created node_id id= %d stream id= %d\n",
189                                fwp_participant_this->id.node_id,
190                                fwp_participant_this->stream_id);
191         return 0;
192
193 }
194
195 int main()
196 {
197         if (fwp_mngr_init()) {
198                 fprintf(stderr,"FWP manager initialization failed.\n");
199                 exit(1);
200         }
201
202         fwp_mngr_main_loop();
203         
204         return 0;       
205