]> rtime.felk.cvut.cz Git - frescor/frsh.git/blob - fres/resmng/frm_generic.c
Implemented support for contract renegotiation
[frescor/frsh.git] / fres / resmng / frm_generic.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FRSH (FRescor ScHeduler)                         */
26 /*                                                                        */
27 /* FRSH is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FRSH is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FRSH; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FRSH header files in a file,         */
39 /* instantiating FRSH generics or templates, or linking other files       */
40 /* with FRSH objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 /**
48  * @file   frm_generic.c
49  * @author Michal Sojka <sojkam1@fel.cvut.cz>
50  * @date   Tue Nov 11 08:34:34 2008
51  * 
52  * @brief  Generic resource manager implementation.
53  * 
54  * 
55  */
56 #include <frm_generic.h>
57 #include <forb.h>
58 #include <ul_log.h>
59 #include <fres_sa_scenario.h>
60 #include <fcb.h>
61
62 UL_LOG_CUST(ulogd_frm_generic);
63 ul_log_domain_t ulogd_frm_generic = {UL_LOGL_DEB, "frm"};
64
65 #define object_to_frm(o) (struct frm_data*)forb_instance_data(o)
66 #define save_errno(cmd) do { int _e = errno; cmd; errno = _e; } while(0)
67
68 void fres_sa_scenario_reserve_new(struct fres_sa_scenario *scenario)
69 {
70         struct fres_sa_contract *c;
71         fres_sa_scenario_for_each_contract(scenario, c) {
72                 if (c->new) {
73                         if (c->reserved) {
74                                 fres_contract_destroy(c->reserved);
75                         }
76                         c->reserved = c->new;
77                         c->new = NULL;
78                 }
79         }
80 }
81
82 void fres_sa_scenario_rollback(struct fres_sa_scenario *scenario)
83 {
84         struct fres_sa_contract *c, *c_next;
85
86         /* Deleteion safe scenario traverse */
87         for(c=fres_sa_scenario_contract_first(scenario),
88                     c_next=c?fres_sa_scenario_contract_next(scenario,c):NULL;
89             c;
90             c=c_next,c_next=c?fres_sa_scenario_contract_next(scenario,c):NULL) {
91                 if (c->new) {
92                         if (c->committed || c->reserved) {
93                                         fres_contract_destroy(c->new);
94                                         c->new = NULL;
95                                         c->contract = c->reserved ? c->reserved : c->committed;
96                         } else {
97                                 fres_sa_scenario_del_contract(scenario, c);
98                                 fres_sa_contract_destroy(c);
99                         }
100                 }
101         }
102 }
103
104
105 static CORBA_long reserve_contracts(fres_resource_manager obj,
106                                     const fres_contract_ptr_seq* contracts,
107                                     CORBA_Environment *ev)
108 {
109         struct frm_data *frm = object_to_frm(obj);
110         struct fres_sa_scenario *scenario = frm->scenario;
111         bool schedulable =false;
112         int i, ret;
113         struct fres_sa_contract *c;
114
115         ul_logmsg("reserve_contracts\n");
116         
117         for (i=0; i<contracts->_length; i++) {
118                 struct fres_contract *cin = contracts->_buffer[i];
119
120                 c = fres_sa_scenario_find_contract(scenario, &cin->id);
121                 if (!c) {
122                         c = fres_sa_contract_new();
123                         if (c) {
124                                 c->id = cin->id;
125                                 fres_sa_scenario_add_contract(scenario, c);
126                         }
127                 }
128                 if (!c) goto err;
129
130                 {
131                         char id[40];
132                         char *operation;
133                         fres_contract_id_to_string(id, &c->id, sizeof(id));
134                         if (fres_contract_get_num_blocks(cin) == 0) operation = "cancelation";
135                         else if (c->committed) operation = "renegotiation";
136                         else if (c->reserved) operation = "negotiation (already reserved)";
137                         else operation = "negotiation";
138                         ul_logdeb("  reserve contract %s %s\n", id, operation);
139                 }
140                 assert(c->new == NULL);
141                 c->new = fres_contract_duplicate(cin);
142                 c->contract = c->new;
143                 if (!c->new) goto err;
144         }
145
146         ret = frm->desc->admission_test(scenario, frm->desc->priv, &schedulable);
147         if (ret) {
148                 ul_logmsg("admission_test failed: %d\n", ret);
149                 goto err;
150         }
151
152         if (schedulable) {
153                 fres_sa_scenario_reserve_new(scenario);
154         } else {
155                 fres_sa_scenario_rollback(scenario);
156         }
157         return schedulable ? 0 : 1;
158 err:
159         fres_sa_scenario_rollback(scenario);
160         return -1;
161 }
162
163 static void commit_contracts(fres_resource_manager obj,
164                              const fres_contract_id_seq* ids,
165                              fres_contract_ptr_seq** contracts_with_scheduling_data,
166                              CORBA_Environment *ev)
167 {
168         struct frm_data *frm = object_to_frm(obj);
169         int i, num;
170         struct fres_sa_contract *c;
171         fres_contract_ptr_seq *contracts;
172
173         ul_logmsg("commit_contracts\n");
174
175         contracts = forb_malloc(sizeof(*contracts));
176         if (!contracts) {
177                 ev->major = FORB_EX_NO_MEMORY;
178                 goto err;
179         }
180         num = ids->_length;
181         contracts->_buffer = CORBA_sequence_fres_contract_ptr_allocbuf(num);
182         CORBA_sequence_set_release(contracts, CORBA_TRUE);
183         contracts->_maximum = contracts->_length = num;
184
185         /* TODO: Add also the changed contracts (e.g. because of
186          * priorities). Question: How to recognize which contracts are
187          * changed because of this commit? */
188         for (i=0; i < num; i++) {
189                 char id[40];
190                 const char *operation;
191                 fres_contract_id_to_string(id, &ids->_buffer[i], sizeof(id));
192                 
193                 c = fres_sa_scenario_find_contract(frm->scenario, &ids->_buffer[i]);
194                 if (c && c->reserved) {
195                         if (fres_contract_get_num_blocks(c->reserved) == 0) {
196                                 /* Cancelation request */
197                                 operation = "cancelation";
198                                 contracts->_buffer[i] = c->reserved;
199                                 c->reserved = NULL;
200                                 fres_sa_scenario_del_contract(frm->scenario, c);
201                                 fres_sa_contract_destroy(c);
202                         } else {
203                                 /* Normal reservation */
204                                 if (c->committed) {
205                                         operation = "renegotiation";
206                                         fres_contract_destroy(c->committed);
207                                 } else {
208                                         operation = "negotiation";
209                                 }
210                                 c->committed = c->reserved;
211                                 c->reserved = NULL;
212                                 contracts->_buffer[i] = fres_contract_duplicate(c->committed);
213                         }
214                 } else {
215                         operation = "error";
216                         contracts->_buffer[i] = NULL;
217                         if (!c) ul_logerr("Commit to unknown contract ID\n");
218                         else if (!c->reserved) ul_logerr("Commit to not reserved contract\n");
219                 }
220
221                 ul_logdeb("  commit contract %s %s\n", id, operation);
222         }
223
224         *contracts_with_scheduling_data = contracts;
225 err:;
226 }
227
228 static void cancel_reservations(fres_resource_manager obj,
229                                 const fres_contract_id_seq* ids,
230                                 CORBA_Environment *ev)
231 {
232         int i;
233         struct frm_data *frm = object_to_frm(obj);
234
235         ul_logmsg("cancel_reservations\n");
236
237         for (i=0; i<ids->_length; i++) {
238                 struct fres_sa_contract *c;
239                 c = fres_sa_scenario_find_contract(frm->scenario, &ids->_buffer[i]);
240
241                 if (c->reserved) {
242                         if (c->committed) {
243                                 /* Roll-back renegotiation */
244                                 fres_contract_destroy(c->reserved);
245                                 c->reserved = NULL;
246                                 c->contract = c->committed;
247                         } else {
248                                 /* Roll-back negotiation */
249                                 fres_sa_scenario_del_contract(frm->scenario, c);
250                                 fres_sa_contract_destroy(c);
251                         }
252                 }
253         }
254 }
255
256
257 static void get_contracts(fres_resource_manager obj,
258                           fres_contract_ptr_seq** contracts_out,
259                           CORBA_long* utilization, CORBA_Environment *ev)
260 {
261         struct frm_data *frm = object_to_frm(obj);
262         struct fres_sa_contract *c;
263         fres_contract_ptr_seq *contracts;
264
265         contracts = forb_malloc(sizeof(*contracts));
266         if (!contracts) {
267                 ev->major = FORB_EX_NO_MEMORY;
268                 goto err;
269         }
270         contracts->_buffer = CORBA_sequence_fres_contract_ptr_allocbuf(frm->scenario->num_contracts);
271         CORBA_sequence_set_release(contracts, CORBA_TRUE);
272         contracts->_maximum = frm->scenario->num_contracts;
273         contracts->_length = 0;
274
275         fres_sa_scenario_for_each_contract(frm->scenario, c) {
276                 if (c->committed) {
277                         int i = contracts->_length;
278                         contracts->_buffer[i] = fres_contract_duplicate(c->committed);
279                         contracts->_length++;
280                 }
281                                         
282         }
283         *contracts_out = contracts;
284         *utilization = frm->scenario->utilization;
285 err:;
286 }
287
288
289 static const struct forb_fres_resource_manager_impl frm_impl = {
290         .reserve_contracts = reserve_contracts,
291         .commit_contracts  = commit_contracts,
292         .cancel_reservations = cancel_reservations,
293         .get_contracts  = get_contracts,
294 };
295
296 /** 
297  * Initializes generic resource manager. The only thing a
298  * caller has to supply is admission test function, which is passed in
299  * @a frm_data->admission_test.
300  * 
301  * @param orb FORB object used to communicate with other components.
302  * @param[out] frm_data Pointer to frm_data structure
303  * @param executor Executor used for this resource manager.
304  * @param desc Description on the resource manager
305  * 
306  * @return Resource manager object reference of NULL in case of error.
307  */
308 fres_resource_manager frm_register(forb_orb orb, struct frm_data *frm_data,
309                                    forb_executor_t *executor,
310                                    const struct fres_res_manager *desc)
311 {
312         fres_contract_broker fcb;
313         fres_resource_desc res_desc;
314         fres_resource_manager frm;
315         struct forb_env env;
316         int ret;
317
318         memset(frm_data, 0, sizeof(*frm_data));
319         frm_data->desc = desc;
320         frm_data->scenario = fres_sa_scenario_new();
321         if (!frm_data->scenario) {
322                 save_errno(ul_logerr("fres_sa_scenario_new failed\n"));
323                 goto err;
324         }
325
326         fcb = forb_resolve_reference(orb, fres_contract_broker_reg_name);
327         if (!fcb) {
328                 save_errno(ul_logerr("Could not find contract broker\n"));
329                 goto err;
330         }
331
332         frm = forb_fres_resource_manager_new(orb, &frm_impl, frm_data);
333         if (!frm) {
334                 save_errno(ul_logerr("forb_fres_resource_manager_new error\n"));
335                 goto err_release_fcb;
336         }
337
338         ret = forb_executor_register_object(executor, frm);
339         if (ret) {
340                 save_errno(ul_logerr("forb_executor_register_object failed\n"));
341                 goto err_executor;
342         }
343
344         /* Register resource manager */
345         res_desc.manager = frm;
346         ret = fres_contract_broker_register_resource(fcb,
347                                                      desc->res_type,
348                                                      desc->res_id,
349                                                      &res_desc, &env);
350         if (forb_exception_occurred(&env) || ret != 0) {
351                 save_errno(ul_logerr("fres_contract_broker_register_resource exception\n"));
352                 goto err_register;
353         }
354
355         forb_object_release(fcb);
356         return frm;
357 err_register:   
358         forb_executor_unregister_object(executor, frm);
359 err_executor:
360         forb_object_release(frm);
361 err_release_fcb:        
362         forb_object_release(fcb);
363 err:
364         return NULL;
365 }
366
367 /** 
368  * Initializes and runs a generic resource manager. The only thing a
369  * caller has to supply is admission test function, which is passed in
370  * @a frm_data->admission_test.
371  * 
372  * @param orb FORB object used to communicate with other components.
373  * @param desc Description on the resource manager
374  * 
375  * @return 
376  */
377 int frm_register_and_run(forb_orb orb, const struct fres_res_manager *desc)
378 {
379         fres_resource_manager frm;
380         struct frm_data frm_data;
381         forb_executor_t executor;
382         int ret = -1;
383
384         /* Prepare executor before we register the resource manager
385          * with contract broker */
386         ret = forb_executor_init(&executor);
387         if (ret) {
388                 save_errno(ul_logerr("forb_executor_init failed\n"));
389                 goto err;
390         }
391
392         frm = frm_register(orb, &frm_data, &executor, desc);
393         if (!frm) {
394                 ret = -1;
395                 save_errno(ul_logerr("frm_register failed\n"));
396                 goto err_destroy_executor;
397         }
398                 
399         /* Start request processing */
400         ul_logmsg("Waiting for requests\n");
401         ret = forb_executor_run(&executor);
402         if (ret) goto err_release_frm;
403
404 err_release_frm:        
405         forb_object_release(frm);
406 err_destroy_executor:
407         forb_executor_destroy(&executor);
408 err:
409         return ret;
410 }