]> rtime.felk.cvut.cz Git - frescor/frsh.git/blob - fres/resmng/frm_generic.c
fc57081138e103fa680b27a59c282eb54faf14bc
[frescor/frsh.git] / fres / resmng / frm_generic.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FRSH (FRescor ScHeduler)                         */
26 /*                                                                        */
27 /* FRSH is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FRSH is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FRSH; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FRSH header files in a file,         */
39 /* instantiating FRSH generics or templates, or linking other files       */
40 /* with FRSH objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 /**
48  * @file   frm_generic.c
49  * @author Michal Sojka <sojkam1@fel.cvut.cz>
50  * @date   Tue Nov 11 08:34:34 2008
51  * 
52  * @brief  Generic resource manager implementation.
53  * 
54  * 
55  */
56 #include <frm_generic.h>
57 #include <forb.h>
58 #include <ul_log.h>
59 #include <ul_logreg.h>
60 #include <fres_sa_scenario.h>
61 #include <fcb.h>
62
63 UL_LOG_CUST(ulogd_frm_generic);
64 ul_log_domain_t ulogd_frm_generic = {UL_LOGL_DEB, "frm"};
65 UL_LOGREG_SINGLE_DOMAIN_INIT_FUNCTION(frm_generic_logreg_domains, ulogd_frm_generic);
66
67 #define object_to_frm(o) (struct frm_data*)forb_instance_data(o)
68 #define save_errno(cmd) do { int _e = errno; cmd; errno = _e; } while(0)
69
70 void fres_sa_scenario_reserve_new(struct fres_sa_scenario *scenario)
71 {
72         struct fres_sa_contract *c;
73         fres_sa_scenario_for_each_contract(scenario, c) {
74                 if (c->new) {
75                         if (c->reserved) {
76                                 fres_contract_destroy(c->reserved);
77                         }
78                         c->reserved = c->new;
79                         c->new = NULL;
80                 }
81         }
82 }
83
84 void fres_sa_scenario_rollback(struct fres_sa_scenario *scenario)
85 {
86         struct fres_sa_contract *c, *c_next;
87
88         /* Deleteion safe scenario traverse */
89         for(c=fres_sa_scenario_contract_first(scenario),
90                     c_next=c?fres_sa_scenario_contract_next(scenario,c):NULL;
91             c;
92             c=c_next,c_next=c?fres_sa_scenario_contract_next(scenario,c):NULL) {
93                 if (c->new) {
94                         if (c->committed || c->reserved) {
95                                         fres_contract_destroy(c->new);
96                                         c->new = NULL;
97                                         c->contract = c->reserved ? c->reserved : c->committed;
98                         } else {
99                                 fres_sa_scenario_del_contract(scenario, c);
100                                 fres_sa_contract_destroy(c);
101                         }
102                 }
103         }
104 }
105
106
107 static CORBA_long reserve_contracts(fres_resource_manager obj,
108                                     const fres_contract_ptr_seq* contracts,
109                                     CORBA_Environment *ev)
110 {
111         struct frm_data *frm = object_to_frm(obj);
112         struct fres_sa_scenario *scenario = frm->scenario;
113         bool schedulable =false;
114         int i, ret;
115         struct fres_sa_contract *c;
116
117         ul_logmsg("reserve_contracts\n");
118         
119         for (i=0; i<contracts->_length; i++) {
120                 struct fres_contract *cin = contracts->_buffer[i];
121
122                 c = fres_sa_scenario_find_contract(scenario, &cin->id);
123                 if (!c) {
124                         c = fres_sa_contract_new();
125                         if (c) {
126                                 c->id = cin->id;
127                                 fres_sa_scenario_add_contract(scenario, c);
128                         }
129                 }
130                 if (!c) goto err;
131
132                 {
133                         char id[40];
134                         char *operation;
135                         fres_contract_id_to_string(id, &c->id, sizeof(id));
136                         if (fres_contract_get_num_blocks(cin) == 0) operation = "cancelation";
137                         else if (c->committed) operation = "renegotiation";
138                         else if (c->reserved) operation = "negotiation (already reserved)";
139                         else operation = "negotiation";
140                         ul_logdeb("  reserve contract %s %s\n", id, operation);
141                 }
142                 assert(c->new == NULL);
143                 c->new = fres_contract_duplicate(cin);
144                 c->contract = c->new;
145                 if (!c->new) goto err;
146         }
147
148         ret = frm->desc->admission_test(scenario, frm->desc->priv, &schedulable);
149         if (ret) {
150                 ul_logmsg("admission_test failed: %d\n", ret);
151                 goto err;
152         }
153
154         if (schedulable) {
155                 fres_sa_scenario_reserve_new(scenario);
156         } else {
157                 fres_sa_scenario_rollback(scenario);
158         }
159         return schedulable ? 0 : 1;
160 err:
161         fres_sa_scenario_rollback(scenario);
162         return -1;
163 }
164
165 static void commit_contracts(fres_resource_manager obj,
166                              const fres_contract_id_seq* ids,
167                              fres_contract_ptr_seq** contracts_with_scheduling_data,
168                              CORBA_Environment *ev)
169 {
170         struct frm_data *frm = object_to_frm(obj);
171         int i, num;
172         struct fres_sa_contract *c;
173         fres_contract_ptr_seq *contracts;
174
175         ul_logmsg("commit_contracts\n");
176
177         contracts = forb_malloc(sizeof(*contracts));
178         if (!contracts) {
179                 ev->major = FORB_EX_NO_MEMORY;
180                 goto err;
181         }
182         num = ids->_length;
183         contracts->_buffer = CORBA_sequence_fres_contract_ptr_allocbuf(num);
184         CORBA_sequence_set_release(contracts, CORBA_TRUE);
185         contracts->_maximum = contracts->_length = num;
186
187         /* TODO: Add also the changed contracts (e.g. because of
188          * priorities). Question: How to recognize which contracts are
189          * changed because of this commit? */
190         for (i=0; i < num; i++) {
191                 char id[40];
192                 const char *operation;
193                 fres_contract_id_to_string(id, &ids->_buffer[i], sizeof(id));
194                 
195                 c = fres_sa_scenario_find_contract(frm->scenario, &ids->_buffer[i]);
196                 if (c && c->reserved) {
197                         if (fres_contract_get_num_blocks(c->reserved) == 0) {
198                                 /* Cancelation request */
199                                 operation = "cancelation";
200                                 contracts->_buffer[i] = c->reserved;
201                                 c->reserved = NULL;
202                                 fres_sa_scenario_del_contract(frm->scenario, c);
203                                 fres_sa_contract_destroy(c);
204                         } else {
205                                 /* Normal reservation */
206                                 if (c->committed) {
207                                         operation = "renegotiation";
208                                         fres_contract_destroy(c->committed);
209                                 } else {
210                                         operation = "negotiation";
211                                 }
212                                 c->committed = c->reserved;
213                                 c->reserved = NULL;
214                                 contracts->_buffer[i] = fres_contract_duplicate(c->committed);
215                         }
216                 } else {
217                         operation = "error";
218                         contracts->_buffer[i] = NULL;
219                         if (!c) ul_logerr("Commit to unknown contract ID\n");
220                         else if (!c->reserved) ul_logerr("Commit to not reserved contract\n");
221                 }
222
223                 ul_logdeb("  commit contract %s %s\n", id, operation);
224         }
225
226         *contracts_with_scheduling_data = contracts;
227 err:;
228 }
229
230 static void cancel_reservations(fres_resource_manager obj,
231                                 const fres_contract_id_seq* ids,
232                                 CORBA_Environment *ev)
233 {
234         int i;
235         struct frm_data *frm = object_to_frm(obj);
236
237         ul_logmsg("cancel_reservations\n");
238
239         for (i=0; i<ids->_length; i++) {
240                 struct fres_sa_contract *c;
241                 c = fres_sa_scenario_find_contract(frm->scenario, &ids->_buffer[i]);
242
243                 if (c->reserved) {
244                         if (c->committed) {
245                                 /* Roll-back renegotiation */
246                                 fres_contract_destroy(c->reserved);
247                                 c->reserved = NULL;
248                                 c->contract = c->committed;
249                         } else {
250                                 /* Roll-back negotiation */
251                                 fres_sa_scenario_del_contract(frm->scenario, c);
252                                 fres_sa_contract_destroy(c);
253                         }
254                 }
255         }
256 }
257
258
259 static void get_contracts(fres_resource_manager obj,
260                           fres_contract_ptr_seq** contracts_out,
261                           CORBA_long* utilization, CORBA_Environment *ev)
262 {
263         struct frm_data *frm = object_to_frm(obj);
264         struct fres_sa_contract *c;
265         fres_contract_ptr_seq *contracts;
266
267         contracts = forb_malloc(sizeof(*contracts));
268         if (!contracts) {
269                 ev->major = FORB_EX_NO_MEMORY;
270                 goto err;
271         }
272         contracts->_buffer = CORBA_sequence_fres_contract_ptr_allocbuf(frm->scenario->num_contracts);
273         CORBA_sequence_set_release(contracts, CORBA_TRUE);
274         contracts->_maximum = frm->scenario->num_contracts;
275         contracts->_length = 0;
276
277         fres_sa_scenario_for_each_contract(frm->scenario, c) {
278                 if (c->committed) {
279                         int i = contracts->_length;
280                         contracts->_buffer[i] = fres_contract_duplicate(c->committed);
281                         contracts->_length++;
282                 }
283                                         
284         }
285         *contracts_out = contracts;
286         *utilization = frm->scenario->utilization;
287 err:;
288 }
289
290
291 static const struct forb_fres_resource_manager_impl frm_impl = {
292         .reserve_contracts = reserve_contracts,
293         .commit_contracts  = commit_contracts,
294         .cancel_reservations = cancel_reservations,
295         .get_contracts  = get_contracts,
296 };
297
298 /** 
299  * Initializes generic resource manager. The only thing a
300  * caller has to supply is admission test function, which is passed in
301  * @a frm_data->admission_test.
302  * 
303  * @param orb FORB object used to communicate with other components.
304  * @param[out] frm_data Pointer to frm_data structure
305  * @param executor Executor used for this resource manager.
306  * @param desc Description on the resource manager
307  * 
308  * @return Resource manager object reference of NULL in case of error.
309  */
310 fres_resource_manager frm_register(forb_orb orb, struct frm_data *frm_data,
311                                    forb_executor_t *executor,
312                                    const struct fres_res_manager *desc)
313 {
314         fres_contract_broker fcb;
315         fres_resource_desc res_desc;
316         fres_resource_manager frm;
317         struct forb_env env;
318         int ret;
319
320         memset(frm_data, 0, sizeof(*frm_data));
321         frm_data->desc = desc;
322         frm_data->scenario = fres_sa_scenario_new();
323         if (!frm_data->scenario) {
324                 save_errno(ul_logerr("fres_sa_scenario_new failed\n"));
325                 goto err;
326         }
327
328         fcb = forb_resolve_reference(orb, fres_contract_broker_reg_name);
329         if (!fcb) {
330                 save_errno(ul_logerr("Could not find contract broker\n"));
331                 goto err;
332         }
333
334         frm = forb_fres_resource_manager_new(orb, &frm_impl, frm_data);
335         if (!frm) {
336                 save_errno(ul_logerr("forb_fres_resource_manager_new error\n"));
337                 goto err_release_fcb;
338         }
339
340         ret = forb_executor_register_object(executor, frm);
341         if (ret) {
342                 save_errno(ul_logerr("forb_executor_register_object failed\n"));
343                 goto err_executor;
344         }
345
346         /* Register resource manager */
347         res_desc.manager = frm;
348         res_desc.name = desc->name;
349         ret = fres_contract_broker_register_resource(fcb,
350                                                      desc->res_type,
351                                                      desc->res_id,
352                                                      &res_desc, &env);
353         if (forb_exception_occurred(&env) || ret != 0) {
354                 save_errno(ul_logerr("fres_contract_broker_register_resource exception\n"));
355                 goto err_register;
356         }
357
358         forb_object_release(fcb);
359         return frm;
360 err_register:   
361         forb_executor_unregister_object(executor, frm);
362 err_executor:
363         forb_object_release(frm);
364 err_release_fcb:        
365         forb_object_release(fcb);
366 err:
367         return NULL;
368 }
369
370 /** 
371  * Initializes and runs a generic resource manager. The only thing a
372  * caller has to supply is admission test function, which is passed in
373  * @a frm_data->admission_test.
374  * 
375  * @param orb FORB object used to communicate with other components.
376  * @param desc Description on the resource manager
377  * 
378  * @return 
379  */
380 int frm_register_and_run(forb_orb orb, const struct fres_res_manager *desc)
381 {
382         fres_resource_manager frm;
383         struct frm_data frm_data;
384         forb_executor_t executor;
385         int ret = -1;
386
387         /* Prepare executor before we register the resource manager
388          * with contract broker */
389         ret = forb_executor_init(&executor);
390         if (ret) {
391                 save_errno(ul_logerr("forb_executor_init failed\n"));
392                 goto err;
393         }
394
395         frm = frm_register(orb, &frm_data, &executor, desc);
396         if (!frm) {
397                 ret = -1;
398                 save_errno(ul_logerr("frm_register failed\n"));
399                 goto err_destroy_executor;
400         }
401                 
402         /* Start request processing */
403         ul_logmsg("Waiting for requests\n");
404         ret = forb_executor_run(&executor);
405         if (ret) goto err_release_frm;
406
407 err_release_frm:        
408         forb_object_release(frm);
409 err_destroy_executor:
410         forb_executor_destroy(&executor);
411 err:
412         return ret;
413 }