]> rtime.felk.cvut.cz Git - frescor/frsh.git/blob - fres/resmng/frm_generic.c
Contract broker updated to changed manager and scheduler
[frescor/frsh.git] / fres / resmng / frm_generic.c
1 #include <frm_generic.h>
2 #include <forb.h>
3 #include <ul_log.h>
4 #include <fres_sa_scenario.h>
5 #include <fcb.h>
6
/* Log domain of the generic resource manager.  It is initialised with the
 * level UL_LOGL_MSG and the name string "frm".
 * NOTE(review): UL_LOG_CUST presumably makes the ul_log*() calls in this
 * file use this domain -- confirm against ul_log.h. */
UL_LOG_CUST(ulogd_frm);
ul_log_domain_t ulogd_frm = {UL_LOGL_MSG, "frm"};
9
/* Per-instance state of a generic resource manager.  It is handed to
 * forb_fres_resource_manager_new() in frm_register() and fetched back from
 * the FORB object with object_to_frm(). */
struct frm_data {
        /* Scenario created by fres_sa_scenario_new() in frm_register(); the
         * request handlers look contracts up in it and modify it. */
        struct fres_sa_scenario *scenario;
        /* Caller-supplied description of the manager; this file reads its
         * admission_test, priv, res_type and res_id members. */
        const struct fres_res_manager *desc;
};
14
/* Returns the struct frm_data attached to FORB object o.  The whole
 * expansion is parenthesised so that the cast is applied before any
 * postfix operator written after the macro, e.g. object_to_frm(o)->desc. */
#define object_to_frm(o) ((struct frm_data *)forb_instance_data(o))
/* Runs cmd and then restores errno to the value it had before cmd, so
 * that logging on an error path does not hide the original error code. */
#define save_errno(cmd) do { int _e = errno; cmd; errno = _e; } while(0)
17
18 void fres_sa_scenario_reserve_new(struct fres_sa_scenario *scenario)
19 {
20         struct fres_sa_contract *c;
21         fres_sa_scenario_for_each_contract(scenario, c) {
22                 if (c->new) {
23                         if (c->reserved) {
24                                 fres_contract_destroy(c->reserved);
25                         }
26                         c->reserved = c->new;
27                         c->new = NULL;
28                 }
29         }
30 }
31
32 void fres_sa_scenario_rollback(struct fres_sa_scenario *scenario)
33 {
34         struct fres_sa_contract *c, *c_next;
35
36         /* Deleteion safe scenario traverse */
37         for(c=fres_sa_scenario_contract_first(scenario),
38                     c_next=c?fres_sa_scenario_contract_next(scenario,c):NULL;
39             c;
40             c=c_next,c_next=c?fres_sa_scenario_contract_next(scenario,c):NULL) {
41                 if (c->new) {
42                         if (c->commited || c->reserved) {
43                                         fres_contract_destroy(c->new);
44                                         c->new = NULL;
45                                         c->contract = c->reserved ? c->reserved : c->commited;
46                         } else {
47                                 fres_sa_scenario_del_contract(scenario, c);
48                                 fres_sa_contract_destroy(c);
49                         }
50                 }
51         }
52 }
53
54
55 static CORBA_long reserve_contracts(fres_resource_manager obj,
56                                     const fres_contract_ptr_seq* contracts,
57                                     CORBA_Environment *ev)
58 {
59         struct frm_data *frm = object_to_frm(obj);
60         struct fres_sa_scenario *scenario = frm->scenario;
61         bool schedulable =false;
62         int i, ret;
63         struct fres_sa_contract *c;
64
65         ul_logmsg("reserve_contracts\n");
66         
67         for (i=0; i<contracts->_length; i++) {
68                 struct fres_contract *cin = contracts->_buffer[i];
69                 c = fres_sa_scenario_find_contract(scenario, &cin->id);
70                 if (!c) {
71                         c = fres_sa_contract_new();
72                         if (c) {
73                                 c->id = cin->id;
74                                 fres_sa_scenario_add_contract(scenario, c);
75                         }
76                 }
77                 if (!c) goto err;
78                 assert(c->new == NULL);
79                 c->new = fres_contract_duplicate(cin);
80                 c->contract = c->new;
81                 if (!c->new) goto err;
82         }
83
84         ret = frm->desc->admission_test(scenario, frm->desc->priv, &schedulable);
85         if (ret) {
86                 ul_logerr("admission_test failed: %d\n", ret);
87                 goto err;
88         }
89
90         if (schedulable) {
91                 fres_sa_scenario_reserve_new(scenario);
92         } else {
93                 fres_sa_scenario_rollback(scenario);
94         }
95         return schedulable ? 0 : 1;
96 err:
97         fres_sa_scenario_rollback(scenario);
98         return -1;
99 }
100
101 static void commit_contracts(fres_resource_manager obj,
102                       const fres_contract_id_seq* ids,
103                       fres_contract_ptr_seq** contracts_with_scheduling_data,
104                       CORBA_Environment *ev)
105 {
106         struct frm_data *frm = object_to_frm(obj);
107         int i, num;
108         struct fres_sa_contract *c;
109         fres_contract_ptr_seq *contracts;
110
111         ul_logmsg("commit_contracts\n");
112
113         contracts = forb_malloc(sizeof(*contracts));
114         if (!contracts) {
115                 ev->major = FORB_EX_NO_MEMORY;
116                 goto err;
117         }
118         num = ids->_length;
119         contracts->_buffer = CORBA_sequence_fres_contract_ptr_allocbuf(num);
120         CORBA_sequence_set_release(contracts, CORBA_TRUE);
121         contracts->_maximum = contracts->_length = num;
122
123         /* TODO: Add also the changed contracts (e.g. because of
124          * priorities). Question: How to recognize which contracts are
125          * changed because of this commit? */
126         for (i=0; i < num; i++) {
127                 c = fres_sa_scenario_find_contract(frm->scenario, &ids->_buffer[i]);
128                 if (c && c->reserved) {
129                         if (c->commited) fres_contract_destroy(c->commited);
130                         c->commited = c->reserved;
131                         c->reserved = NULL;
132                         contracts->_buffer[i] = fres_contract_duplicate(c->commited);
133                 } else {
134                         contracts->_buffer[i] = NULL;
135                         if (!c) ul_logerr("Commit to unknown contract ID\n");
136                         else if (!c->reserved) ul_logerr("Commit to not reserved contract\n");
137                 }
138         }
139         
140         *contracts_with_scheduling_data = contracts;
141 err:;
142 }
143
144 static void cancel_contracts(fres_resource_manager obj,
145                       const fres_contract_id_seq* ids,
146                       CORBA_Environment *ev)
147 {
148         int i;
149         struct frm_data *frm = object_to_frm(obj);
150
151         ul_logmsg("cancel_contracts\n");
152
153         for (i=0; i<ids->_length; i++) {
154                 struct fres_sa_contract *c;
155                 c = fres_sa_scenario_find_contract(frm->scenario, &ids->_buffer[i]);
156
157                 if (c) {
158                         fres_sa_scenario_del_contract(frm->scenario, c);
159                         fres_sa_contract_destroy(c);
160                 }
161         }
162 }
163
164
/* Table of the request handlers implemented in this file.  It is passed
 * to forb_fres_resource_manager_new() in frm_register(). */
static const struct forb_fres_resource_manager_impl frm_impl = {
        .reserve_contracts = reserve_contracts,
        .commit_contracts  = commit_contracts,
        .cancel_contracts  = cancel_contracts,
};
170
171 fres_resource_manager frm_register(forb_orb orb, struct frm_data *frm_data,
172                                    forb_executor_t *executor,
173                                    const struct fres_res_manager *desc)
174 {
175         fres_contract_broker fcb;
176         fres_resource_desc res_desc;
177         fres_resource_manager frm;
178         struct forb_env env;
179         int ret;
180
181         memset(frm_data, 0, sizeof(*frm_data));
182         frm_data->desc = desc;
183         frm_data->scenario = fres_sa_scenario_new();
184         if (!frm_data->scenario) {
185                 save_errno(ul_logerr("fres_sa_scenario_new failed\n"));
186                 goto err;
187         }
188
189         fcb = forb_resolve_reference(orb, fres_contract_broker_reg_name);
190         if (!fcb) {
191                 save_errno(ul_logerr("Could not find contract broker\n"));
192                 goto err;
193         }
194
195         frm = forb_fres_resource_manager_new(orb, &frm_impl, frm_data);
196         if (!frm) {
197                 save_errno(ul_logerr("forb_fres_resource_manager_new error\n"));
198                 goto err_release_fcb;
199         }
200
201         ret = forb_executor_register_object(executor, frm);
202         if (ret) {
203                 save_errno(ul_logerr("forb_executor_register_object failed\n"));
204                 goto err_executor;
205         }
206
207         /* Register resource manager */
208         res_desc.manager = frm;
209         ret = fres_contract_broker_register_resource(fcb,
210                                                      desc->res_type,
211                                                      desc->res_id,
212                                                      &res_desc, &env);
213         if (forb_exception_occured(&env) || ret != 0) {
214                 save_errno(ul_logerr("fres_contract_broker_register_resource exception\n"));
215                 goto err_register;
216         }
217
218         forb_object_release(fcb);
219         return frm;
220 err_register:   
221         forb_executor_unregister_object(executor, frm);
222 err_executor:
223         forb_object_release(frm);
224 err_release_fcb:        
225         forb_object_release(fcb);
226 err:
227         return NULL;
228 }
229
230 /** 
231  * Initializes and runs a generic resource manager. The only thing a
232  * caller has to supply is admission test function, which is passed in
233  * @a frm_data->admission_test.
234  * 
235  * @param orb FORB object used to communicate with other components.
236  * @param desc Description on the resource manager
237  * 
238  * @return 
239  */
240 int frm_register_and_run(forb_orb orb, const struct fres_res_manager *desc)
241 {
242         fres_resource_manager frm;
243         struct frm_data frm_data;
244         forb_executor_t executor;
245         int ret = -1;
246
247         /* Prepare executor before we register the resource manager
248          * with contract broker */
249         ret = forb_executor_init(&executor);
250         if (ret) {
251                 save_errno(ul_logerr("forb_executor_init failed\n"));
252                 goto err;
253         }
254
255         frm = frm_register(orb, &frm_data, &executor, desc);
256         if (!frm) {
257                 ret = -1;
258                 save_errno(ul_logerr("frm_register failed\n"));
259                 goto err_destroy_executor;
260         }
261                 
262         /* Start request processing */
263         ul_logmsg("Waiting for requests\n");
264         ret = forb_executor_run(&executor);
265         if (ret) goto err_release_frm;
266
267 err_release_frm:        
268         forb_object_release(frm);
269 err_destroy_executor:
270         forb_executor_destroy(&executor);
271 err:
272         return ret;
273 }