rtime.felk.cvut.cz Git - frescor/frsh.git/blob - fres/resmng/frm_generic.c
Merge branch 'master' of rtime.felk.cvut.cz:/var/git/frescor/frsh_forb.git
[frescor/frsh.git] / fres / resmng / frm_generic.c
1 #include <frm_generic.h>
2 #include <forb.h>
3 #include <ul_log.h>
4 #include <fres_sa_scenario.h>
5 #include <fcb.h>
6
7 UL_LOG_CUST(ulogd_frm);
8 ul_log_domain_t ulogd_frm = {UL_LOGL_MSG, "frm"};
9
/*
 * Converts a FORB object to the struct frm_data stored as its instance
 * data. The whole expansion is parenthesized so that the macro can be
 * used directly in postfix expressions such as object_to_frm(o)->desc
 * (a bare cast binds weaker than "->").
 */
#define object_to_frm(o) ((struct frm_data *)forb_instance_data(o))

/*
 * Executes @a cmd and restores the value errno had before it, so that
 * e.g. logging does not hide the error code of a preceding failure.
 */
#define save_errno(cmd) \
        do { int save_errno_tmp_ = errno; cmd; errno = save_errno_tmp_; } while (0)
12
13 void fres_sa_scenario_reserve_new(struct fres_sa_scenario *scenario)
14 {
15         struct fres_sa_contract *c;
16         fres_sa_scenario_for_each_contract(scenario, c) {
17                 if (c->new) {
18                         if (c->reserved) {
19                                 fres_contract_destroy(c->reserved);
20                         }
21                         c->reserved = c->new;
22                         c->new = NULL;
23                 }
24         }
25 }
26
27 void fres_sa_scenario_rollback(struct fres_sa_scenario *scenario)
28 {
29         struct fres_sa_contract *c, *c_next;
30
31         /* Deleteion safe scenario traverse */
32         for(c=fres_sa_scenario_contract_first(scenario),
33                     c_next=c?fres_sa_scenario_contract_next(scenario,c):NULL;
34             c;
35             c=c_next,c_next=c?fres_sa_scenario_contract_next(scenario,c):NULL) {
36                 if (c->new) {
37                         if (c->commited || c->reserved) {
38                                         fres_contract_destroy(c->new);
39                                         c->new = NULL;
40                                         c->contract = c->reserved ? c->reserved : c->commited;
41                         } else {
42                                 fres_sa_scenario_del_contract(scenario, c);
43                                 fres_sa_contract_destroy(c);
44                         }
45                 }
46         }
47 }
48
49
50 static CORBA_long reserve_contracts(fres_resource_manager obj,
51                                     const fres_contract_ptr_seq* contracts,
52                                     CORBA_Environment *ev)
53 {
54         struct frm_data *frm = object_to_frm(obj);
55         struct fres_sa_scenario *scenario = frm->scenario;
56         bool schedulable =false;
57         int i, ret;
58         struct fres_sa_contract *c;
59
60         ul_logmsg("reserve_contracts\n");
61         
62         for (i=0; i<contracts->_length; i++) {
63                 struct fres_contract *cin = contracts->_buffer[i];
64                 c = fres_sa_scenario_find_contract(scenario, &cin->id);
65                 if (!c) {
66                         c = fres_sa_contract_new();
67                         if (c) {
68                                 c->id = cin->id;
69                                 fres_sa_scenario_add_contract(scenario, c);
70                         }
71                 }
72                 if (!c) goto err;
73                 assert(c->new == NULL);
74                 c->new = fres_contract_duplicate(cin);
75                 c->contract = c->new;
76                 if (!c->new) goto err;
77         }
78
79         ret = frm->desc->admission_test(scenario, frm->desc->priv, &schedulable);
80         if (ret) {
81                 ul_logerr("admission_test failed: %d\n", ret);
82                 goto err;
83         }
84
85         if (schedulable) {
86                 fres_sa_scenario_reserve_new(scenario);
87         } else {
88                 fres_sa_scenario_rollback(scenario);
89         }
90         return schedulable ? 0 : 1;
91 err:
92         fres_sa_scenario_rollback(scenario);
93         return -1;
94 }
95
96 static void commit_contracts(fres_resource_manager obj,
97                       const fres_contract_id_seq* ids,
98                       fres_contract_ptr_seq** contracts_with_scheduling_data,
99                       CORBA_Environment *ev)
100 {
101         struct frm_data *frm = object_to_frm(obj);
102         int i, num;
103         struct fres_sa_contract *c;
104         fres_contract_ptr_seq *contracts;
105
106         ul_logmsg("commit_contracts\n");
107
108         contracts = forb_malloc(sizeof(*contracts));
109         if (!contracts) {
110                 ev->major = FORB_EX_NO_MEMORY;
111                 goto err;
112         }
113         num = ids->_length;
114         contracts->_buffer = CORBA_sequence_fres_contract_ptr_allocbuf(num);
115         CORBA_sequence_set_release(contracts, CORBA_TRUE);
116         contracts->_maximum = contracts->_length = num;
117
118         /* TODO: Add also the changed contracts (e.g. because of
119          * priorities). Question: How to recognize which contracts are
120          * changed because of this commit? */
121         for (i=0; i < num; i++) {
122                 c = fres_sa_scenario_find_contract(frm->scenario, &ids->_buffer[i]);
123                 if (c && c->reserved) {
124                         if (c->commited) fres_contract_destroy(c->commited);
125                         c->commited = c->reserved;
126                         c->reserved = NULL;
127                         contracts->_buffer[i] = fres_contract_duplicate(c->commited);
128                 } else {
129                         contracts->_buffer[i] = NULL;
130                         if (!c) ul_logerr("Commit to unknown contract ID\n");
131                         else if (!c->reserved) ul_logerr("Commit to not reserved contract\n");
132                 }
133         }
134         
135         *contracts_with_scheduling_data = contracts;
136 err:;
137 }
138
139 static void cancel_contracts(fres_resource_manager obj,
140                       const fres_contract_id_seq* ids,
141                       CORBA_Environment *ev)
142 {
143         int i;
144         struct frm_data *frm = object_to_frm(obj);
145
146         ul_logmsg("cancel_contracts\n");
147
148         for (i=0; i<ids->_length; i++) {
149                 struct fres_sa_contract *c;
150                 c = fres_sa_scenario_find_contract(frm->scenario, &ids->_buffer[i]);
151
152                 if (c) {
153                         fres_sa_scenario_del_contract(frm->scenario, c);
154                         fres_sa_contract_destroy(c);
155                 }
156         }
157 }
158
159
160 static const struct forb_fres_resource_manager_impl frm_impl = {
161         .reserve_contracts = reserve_contracts,
162         .commit_contracts  = commit_contracts,
163         .cancel_contracts  = cancel_contracts,
164 };
165
166 fres_resource_manager frm_register(forb_orb orb, struct frm_data *frm_data,
167                                    forb_executor_t *executor,
168                                    const struct fres_res_manager *desc)
169 {
170         fres_contract_broker fcb;
171         fres_resource_desc res_desc;
172         fres_resource_manager frm;
173         struct forb_env env;
174         int ret;
175
176         memset(frm_data, 0, sizeof(*frm_data));
177         frm_data->desc = desc;
178         frm_data->scenario = fres_sa_scenario_new();
179         if (!frm_data->scenario) {
180                 save_errno(ul_logerr("fres_sa_scenario_new failed\n"));
181                 goto err;
182         }
183
184         fcb = forb_resolve_reference(orb, fres_contract_broker_reg_name);
185         if (!fcb) {
186                 save_errno(ul_logerr("Could not find contract broker\n"));
187                 goto err;
188         }
189
190         frm = forb_fres_resource_manager_new(orb, &frm_impl, frm_data);
191         if (!frm) {
192                 save_errno(ul_logerr("forb_fres_resource_manager_new error\n"));
193                 goto err_release_fcb;
194         }
195
196         ret = forb_executor_register_object(executor, frm);
197         if (ret) {
198                 save_errno(ul_logerr("forb_executor_register_object failed\n"));
199                 goto err_executor;
200         }
201
202         /* Register resource manager */
203         res_desc.manager = frm;
204         ret = fres_contract_broker_register_resource(fcb,
205                                                      desc->res_type,
206                                                      desc->res_id,
207                                                      &res_desc, &env);
208         if (forb_exception_occured(&env) || ret != 0) {
209                 save_errno(ul_logerr("fres_contract_broker_register_resource exception\n"));
210                 goto err_register;
211         }
212
213         forb_object_release(fcb);
214         return frm;
215 err_register:   
216         forb_executor_unregister_object(executor, frm);
217 err_executor:
218         forb_object_release(frm);
219 err_release_fcb:        
220         forb_object_release(fcb);
221 err:
222         return NULL;
223 }
224
225 /** 
226  * Initializes and runs a generic resource manager. The only thing a
227  * caller has to supply is admission test function, which is passed in
228  * @a frm_data->admission_test.
229  * 
230  * @param orb FORB object used to communicate with other components.
231  * @param desc Description on the resource manager
232  * 
233  * @return 
234  */
235 int frm_register_and_run(forb_orb orb, const struct fres_res_manager *desc)
236 {
237         fres_resource_manager frm;
238         struct frm_data frm_data;
239         forb_executor_t executor;
240         int ret = -1;
241
242         /* Prepare executor before we register the resource manager
243          * with contract broker */
244         ret = forb_executor_init(&executor);
245         if (ret) {
246                 save_errno(ul_logerr("forb_executor_init failed\n"));
247                 goto err;
248         }
249
250         frm = frm_register(orb, &frm_data, &executor, desc);
251         if (!frm) {
252                 ret = -1;
253                 save_errno(ul_logerr("frm_register failed\n"));
254                 goto err_destroy_executor;
255         }
256                 
257         /* Start request processing */
258         ul_logmsg("Waiting for requests\n");
259         ret = forb_executor_run(&executor);
260         if (ret) goto err_release_frm;
261
262 err_release_frm:        
263         forb_object_release(frm);
264 err_destroy_executor:
265         forb_executor_destroy(&executor);
266 err:
267         return ret;
268 }