]> rtime.felk.cvut.cz Git - frescor/frsh.git/blob - fres/resmng/frm_generic.c
Merge branch 'master' of rtime.felk.cvut.cz:/var/git/frescor/frsh_forb.git
[frescor/frsh.git] / fres / resmng / frm_generic.c
1 /**
2  * @file   frm_generic.c
3  * @author Michal Sojka <sojkam1@fel.cvut.cz>
4  * @date   Tue Nov 11 08:34:34 2008
5  * 
6  * @brief  Generic resource manager implementation.
7  * 
8  * 
9  */
10 #include <frm_generic.h>
11 #include <forb.h>
12 #include <ul_log.h>
13 #include <fres_sa_scenario.h>
14 #include <fcb.h>
15
16 UL_LOG_CUST(ulogd_frm);
17 ul_log_domain_t ulogd_frm = {UL_LOGL_MSG, "frm"};
18
19 #define object_to_frm(o) (struct frm_data*)forb_instance_data(o)
20 #define save_errno(cmd) do { int _e = errno; cmd; errno = _e; } while(0)
21
22 void fres_sa_scenario_reserve_new(struct fres_sa_scenario *scenario)
23 {
24         struct fres_sa_contract *c;
25         fres_sa_scenario_for_each_contract(scenario, c) {
26                 if (c->new) {
27                         if (c->reserved) {
28                                 fres_contract_destroy(c->reserved);
29                         }
30                         c->reserved = c->new;
31                         c->new = NULL;
32                 }
33         }
34 }
35
36 void fres_sa_scenario_rollback(struct fres_sa_scenario *scenario)
37 {
38         struct fres_sa_contract *c, *c_next;
39
40         /* Deleteion safe scenario traverse */
41         for(c=fres_sa_scenario_contract_first(scenario),
42                     c_next=c?fres_sa_scenario_contract_next(scenario,c):NULL;
43             c;
44             c=c_next,c_next=c?fres_sa_scenario_contract_next(scenario,c):NULL) {
45                 if (c->new) {
46                         if (c->committed || c->reserved) {
47                                         fres_contract_destroy(c->new);
48                                         c->new = NULL;
49                                         c->contract = c->reserved ? c->reserved : c->committed;
50                         } else {
51                                 fres_sa_scenario_del_contract(scenario, c);
52                                 fres_sa_contract_destroy(c);
53                         }
54                 }
55         }
56 }
57
58
59 static CORBA_long reserve_contracts(fres_resource_manager obj,
60                                     const fres_contract_ptr_seq* contracts,
61                                     CORBA_Environment *ev)
62 {
63         struct frm_data *frm = object_to_frm(obj);
64         struct fres_sa_scenario *scenario = frm->scenario;
65         bool schedulable =false;
66         int i, ret;
67         struct fres_sa_contract *c;
68
69         ul_logmsg("reserve_contracts\n");
70         
71         for (i=0; i<contracts->_length; i++) {
72                 struct fres_contract *cin = contracts->_buffer[i];
73                 c = fres_sa_scenario_find_contract(scenario, &cin->id);
74                 if (!c) {
75                         c = fres_sa_contract_new();
76                         if (c) {
77                                 c->id = cin->id;
78                                 fres_sa_scenario_add_contract(scenario, c);
79                         }
80                 }
81                 if (!c) goto err;
82                 assert(c->new == NULL);
83                 c->new = fres_contract_duplicate(cin);
84                 c->contract = c->new;
85                 if (!c->new) goto err;
86         }
87
88         ret = frm->desc->admission_test(scenario, frm->desc->priv, &schedulable);
89         if (ret) {
90                 ul_logerr("admission_test failed: %d\n", ret);
91                 goto err;
92         }
93
94         if (schedulable) {
95                 fres_sa_scenario_reserve_new(scenario);
96         } else {
97                 fres_sa_scenario_rollback(scenario);
98         }
99         return schedulable ? 0 : 1;
100 err:
101         fres_sa_scenario_rollback(scenario);
102         return -1;
103 }
104
105 static void commit_contracts(fres_resource_manager obj,
106                       const fres_contract_id_seq* ids,
107                       fres_contract_ptr_seq** contracts_with_scheduling_data,
108                       CORBA_Environment *ev)
109 {
110         struct frm_data *frm = object_to_frm(obj);
111         int i, num;
112         struct fres_sa_contract *c;
113         fres_contract_ptr_seq *contracts;
114
115         ul_logmsg("commit_contracts\n");
116
117         contracts = forb_malloc(sizeof(*contracts));
118         if (!contracts) {
119                 ev->major = FORB_EX_NO_MEMORY;
120                 goto err;
121         }
122         num = ids->_length;
123         contracts->_buffer = CORBA_sequence_fres_contract_ptr_allocbuf(num);
124         CORBA_sequence_set_release(contracts, CORBA_TRUE);
125         contracts->_maximum = contracts->_length = num;
126
127         /* TODO: Add also the changed contracts (e.g. because of
128          * priorities). Question: How to recognize which contracts are
129          * changed because of this commit? */
130         for (i=0; i < num; i++) {
131                 c = fres_sa_scenario_find_contract(frm->scenario, &ids->_buffer[i]);
132                 if (c && c->reserved) {
133                         if (c->committed) fres_contract_destroy(c->committed);
134                         c->committed = c->reserved;
135                         c->reserved = NULL;
136                         contracts->_buffer[i] = fres_contract_duplicate(c->committed);
137                 } else {
138                         contracts->_buffer[i] = NULL;
139                         if (!c) ul_logerr("Commit to unknown contract ID\n");
140                         else if (!c->reserved) ul_logerr("Commit to not reserved contract\n");
141                 }
142         }
143         
144         *contracts_with_scheduling_data = contracts;
145 err:;
146 }
147
148 static void cancel_contracts(fres_resource_manager obj,
149                       const fres_contract_id_seq* ids,
150                       CORBA_Environment *ev)
151 {
152         int i;
153         struct frm_data *frm = object_to_frm(obj);
154
155         ul_logmsg("cancel_contracts\n");
156
157         for (i=0; i<ids->_length; i++) {
158                 struct fres_sa_contract *c;
159                 c = fres_sa_scenario_find_contract(frm->scenario, &ids->_buffer[i]);
160
161                 if (c) {
162                         fres_sa_scenario_del_contract(frm->scenario, c);
163                         fres_sa_contract_destroy(c);
164                 }
165         }
166 }
167
168
169 static const struct forb_fres_resource_manager_impl frm_impl = {
170         .reserve_contracts = reserve_contracts,
171         .commit_contracts  = commit_contracts,
172         .cancel_contracts  = cancel_contracts,
173 };
174
175 /** 
176  * Initializes generic resource manager. The only thing a
177  * caller has to supply is admission test function, which is passed in
178  * @a frm_data->admission_test.
179  * 
180  * @param orb FORB object used to communicate with other components.
181  * @param[out] frm_data Pointer to frm_data structure
182  * @param executor Executor used for this resource manager.
183  * @param desc Description on the resource manager
184  * 
185  * @return Resource manager object reference of NULL in case of error.
186  */
187 fres_resource_manager frm_register(forb_orb orb, struct frm_data *frm_data,
188                                    forb_executor_t *executor,
189                                    const struct fres_res_manager *desc)
190 {
191         fres_contract_broker fcb;
192         fres_resource_desc res_desc;
193         fres_resource_manager frm;
194         struct forb_env env;
195         int ret;
196
197         memset(frm_data, 0, sizeof(*frm_data));
198         frm_data->desc = desc;
199         frm_data->scenario = fres_sa_scenario_new();
200         if (!frm_data->scenario) {
201                 save_errno(ul_logerr("fres_sa_scenario_new failed\n"));
202                 goto err;
203         }
204
205         fcb = forb_resolve_reference(orb, fres_contract_broker_reg_name);
206         if (!fcb) {
207                 save_errno(ul_logerr("Could not find contract broker\n"));
208                 goto err;
209         }
210
211         frm = forb_fres_resource_manager_new(orb, &frm_impl, frm_data);
212         if (!frm) {
213                 save_errno(ul_logerr("forb_fres_resource_manager_new error\n"));
214                 goto err_release_fcb;
215         }
216
217         ret = forb_executor_register_object(executor, frm);
218         if (ret) {
219                 save_errno(ul_logerr("forb_executor_register_object failed\n"));
220                 goto err_executor;
221         }
222
223         /* Register resource manager */
224         res_desc.manager = frm;
225         ret = fres_contract_broker_register_resource(fcb,
226                                                      desc->res_type,
227                                                      desc->res_id,
228                                                      &res_desc, &env);
229         if (forb_exception_occurred(&env) || ret != 0) {
230                 save_errno(ul_logerr("fres_contract_broker_register_resource exception\n"));
231                 goto err_register;
232         }
233
234         forb_object_release(fcb);
235         return frm;
236 err_register:   
237         forb_executor_unregister_object(executor, frm);
238 err_executor:
239         forb_object_release(frm);
240 err_release_fcb:        
241         forb_object_release(fcb);
242 err:
243         return NULL;
244 }
245
246 /** 
247  * Initializes and runs a generic resource manager. The only thing a
248  * caller has to supply is admission test function, which is passed in
249  * @a frm_data->admission_test.
250  * 
251  * @param orb FORB object used to communicate with other components.
252  * @param desc Description on the resource manager
253  * 
254  * @return 
255  */
256 int frm_register_and_run(forb_orb orb, const struct fres_res_manager *desc)
257 {
258         fres_resource_manager frm;
259         struct frm_data frm_data;
260         forb_executor_t executor;
261         int ret = -1;
262
263         /* Prepare executor before we register the resource manager
264          * with contract broker */
265         ret = forb_executor_init(&executor);
266         if (ret) {
267                 save_errno(ul_logerr("forb_executor_init failed\n"));
268                 goto err;
269         }
270
271         frm = frm_register(orb, &frm_data, &executor, desc);
272         if (!frm) {
273                 ret = -1;
274                 save_errno(ul_logerr("frm_register failed\n"));
275                 goto err_destroy_executor;
276         }
277                 
278         /* Start request processing */
279         ul_logmsg("Waiting for requests\n");
280         ret = forb_executor_run(&executor);
281         if (ret) goto err_release_frm;
282
283 err_release_frm:        
284         forb_object_release(frm);
285 err_destroy_executor:
286         forb_executor_destroy(&executor);
287 err:
288         return ret;
289 }