1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FRSH (FRescor ScHeduler) */
27 /* FRSH is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FRSH is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FRSH; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FRSH header files in a file, */
39 /* instantiating FRSH generics or templates, or linking other files */
40 /* with FRSH objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
49 * @author Michal Sojka <sojkam1@fel.cvut.cz>
50 * @date Mon Oct 20 18:00:18 2008
52 * @brief FRESCOR Contract Broker
61 #include <ul_gavlcust.h>
64 #include <forb/server_id.h>
65 #include <fres_contract.h>
66 #include "fcb_config.h"
68 #ifdef CONFIG_FCB_INET
69 #include <forb/proto_inet.h>
72 UL_LOG_CUST(ulogd_fcb);
73 ul_log_domain_t ulogd_fcb = {UL_LOGL_MSG, "fcb"};
77 * Resource identification
80 frsh_resource_type_t type;
81 frsh_resource_id_t id;
90 fres_resource_manager mng; /**< Object reference of the resource manager */
91 gavl_cust_root_field_t allocators; /**< Registered allocators for this resource (from multiple applications/nodes) */
95 * Resource allocator registered in different nodes/applications
100 fres_resource_allocator ra;
103 struct fcb_contract {
104 fres_contract_id_t id;
106 struct fres_contract *user_contract;
110 * Contract broker data
113 gavl_cust_root_field_t resources; /**< Registered resources */
114 gavl_cust_root_field_t contracts; /**< Contracts negotiated by this FCB */
117 struct fcb_contract *fcb_contract_new(fres_contract_id_t *id)
119 struct fcb_contract *fcb_contract;
121 fcb_contract = malloc(sizeof(*fcb_contract));
125 memset(fcb_contract, 0, sizeof(*fcb_contract));
126 fcb_contract->id = *id;
131 void fcb_contract_destroy(struct fcb_contract *fcb_contract)
133 if (fcb_contract->user_contract) {
134 fres_contract_destroy(fcb_contract->user_contract);
139 static inline int res_key_cmp(const struct res_key *a,
140 const struct res_key *b)
142 if (a->type < b->type) {
144 } else if (a->type > b->type) {
146 } else if (a->id < b->id) {
148 } else if (a->id > b->id) {
155 /* Container for registered resources */
156 GAVL_CUST_NODE_INT_DEC(fcb_resource /* cust_prefix */, \
157 struct fcb /* cust_root_t */, \
158 struct resource/* cust_item_t */, \
159 struct res_key /* cust_key_t */, \
160 resources /* cust_root_node */, \
161 node /* cust_item_node */, \
162 key /* cust_item_key */, \
163 res_key_cmp /* cust_cmp_fnc */);
165 GAVL_CUST_NODE_INT_IMP(fcb_resource /* cust_prefix */, \
166 struct fcb /* cust_root_t */, \
167 struct resource/* cust_item_t */, \
168 struct res_key /* cust_key_t */, \
169 resources /* cust_root_node */, \
170 node /* cust_item_node */, \
171 key /* cust_item_key */, \
172 res_key_cmp /* cust_cmp_fnc */);
174 /* Container for allocators registered for a given resource */
175 GAVL_CUST_NODE_INT_DEC(fcb_alloc /* cust_prefix */, \
176 struct resource /* cust_root_t */, \
177 struct res_alloc /* cust_item_t */, \
178 forb_server_id /* cust_key_t */, \
179 allocators /* cust_root_node */, \
180 node /* cust_item_node */, \
181 app /* cust_item_key */, \
182 fres_contract_id_cmp /* cust_cmp_fnc */);
184 GAVL_CUST_NODE_INT_IMP(fcb_alloc /* cust_prefix */, \
185 struct resource /* cust_root_t */, \
186 struct res_alloc /* cust_item_t */, \
187 forb_server_id /* cust_key_t */, \
188 allocators /* cust_root_node */, \
189 node /* cust_item_node */, \
190 app /* cust_item_key */, \
191 forb_server_id_cmp /* cust_cmp_fnc */);
193 /* Container for negotiated contracts */
194 GAVL_CUST_NODE_INT_DEC(fcb_contract /* cust_prefix */, \
195 struct fcb /* cust_root_t */, \
196 struct fcb_contract /* cust_item_t */, \
197 fres_contract_id_t /* cust_key_t */, \
198 contracts /* cust_root_node */, \
199 node /* cust_item_node */, \
200 id /* cust_item_key */, \
201 fres_contract_id_cmp /* cust_cmp_fnc */);
204 GAVL_CUST_NODE_INT_IMP(fcb_contract /* cust_prefix */, \
205 struct fcb /* cust_root_t */, \
206 struct fcb_contract /* cust_item_t */, \
207 fres_contract_id_t /* cust_key_t */, \
208 contracts /* cust_root_node */, \
209 node /* cust_item_node */, \
210 id /* cust_item_key */, \
211 fres_contract_id_cmp /* cust_cmp_fnc */);
213 #include "fcb_contract_gavl.inc"
217 #define o2fcb(o) (struct fcb*)forb_instance_data(o)
219 static struct resource *
220 get_resource(const struct fcb *fcb, const struct fres_contract *contract)
222 fres_block_resource *block_res;
223 struct res_key res_key;
224 struct resource *resource;
226 block_res = fres_contract_get_resource(contract);
228 ul_logerr("No resource specified\n");
231 res_key.type = block_res->resource_type;
232 res_key.id = block_res->resource_id;
234 resource = fcb_resource_find(fcb, &res_key);
236 ul_logerr("No resource manager for %d.%d registered\n",
237 res_key.type, res_key.id);
244 negotiate_contract(fres_contract_broker obj,
245 const fres_contract_ptr contract,
246 fres_contract_id_t* id,
247 CORBA_Environment *ev)
249 struct fcb *fcb = o2fcb(obj);
250 struct resource *resource;
251 struct res_alloc *ra;
254 fres_contract_ptr contract_seq_buf;
255 fres_contract_ptr_seq contracts;
256 fres_contract_ptr_seq *schedulable_contracts;
257 fres_contract_id_seq ids;
258 struct fcb_contract *fcb_contract;
260 resource = get_resource(fcb, contract);
262 ret = FRSH_ERR_RESOURCE_ID_INVALID;
266 forb_uuid_generate((forb_uuid_t *)id);
269 forb_get_req_source(obj, &app);
270 ra = fcb_alloc_find(resource, &app);
273 forb_server_id_to_string(str, &app, sizeof(str));
274 ul_logerr("No resource allocator found for %d.%d and %s\n",
275 resource->key.type, resource->key.id, str);
280 fcb_contract = fcb_contract_new(id);
281 fcb_contract->user_contract = fres_contract_duplicate(contract);
283 /* Reserve contract */
284 contracts._length = 1;
285 contract_seq_buf = contract;
286 contracts._buffer = &contract_seq_buf;
287 ret = fres_resource_manager_reserve_contracts(resource->mng, &contracts, ev);
288 if (forb_exception_occurred(ev) || ret < 0) {
289 goto err_free_fcb_contract;
292 ul_logmsg("Contract was not accepted\n");
293 goto err_free_fcb_contract;
296 /* Commit contract */
297 ids._length = ids._maximum = 1;
299 fres_resource_manager_commit_contracts(resource->mng, &ids,
300 &schedulable_contracts, ev);
301 if (forb_exception_occurred(ev)) {
302 ret = FRES_ERR_FORB_EXCEPTION;
303 goto err_free_fcb_contract;
307 ret = fres_resource_allocator_change_vreses(ra->ra, schedulable_contracts, ev);
308 if (CORBA_sequence_get_release(schedulable_contracts)) {
310 for (i=0; i<schedulable_contracts->_length; i++) {
311 fres_contract_destroy(schedulable_contracts->_buffer[i]);
313 forb_free(schedulable_contracts->_buffer);
315 forb_free(schedulable_contracts);
316 if (forb_exception_occurred(ev)) {
317 ret = FRES_ERR_FORB_EXCEPTION;
318 goto err_cancel_reservation;
321 ul_logmsg("VRes was not created\n");
322 goto err_cancel_reservation;
325 /* Store the negotiated contract for later reference */
326 fcb_contract_insert(fcb, fcb_contract);
330 err_cancel_reservation:
331 fres_resource_manager_cancel_contracts(resource->mng, &ids, ev);
332 err_free_fcb_contract:
333 fcb_contract_destroy(fcb_contract);
338 CORBA_long cancel_contract(fres_contract_broker obj,
339 const fres_contract_id_t* id,
340 CORBA_Environment *ev)
342 struct fcb *fcb = o2fcb(obj);
343 struct fcb_contract *fcb_contract;
344 struct resource *resource;
345 struct res_alloc *ra;
346 fres_contract_id_t id_buffer[1] = { *id };
347 fres_contract_id_seq ids;
351 fcb_contract = fcb_contract_find(fcb, &id_buffer[0]); /* FIXME: id */
353 ret = FRSH_ERR_NOT_CONTRACTED_VRES;
357 resource = get_resource(fcb, fcb_contract->user_contract);
359 ret = FRSH_ERR_RESOURCE_ID_INVALID;
363 forb_get_req_source(obj, &app);
364 ra = fcb_alloc_find(resource, &app);
367 forb_server_id_to_string(str, &app, sizeof(str));
368 ul_logerr("No resource allocator found for %d.%d and %s\n",
369 resource->key.type, resource->key.id, str);
374 ids._length = ids._maximum = 1;
375 ids._buffer = id_buffer;
376 ret = fres_resource_allocator_cancel_vreses(ra->ra, &ids, ev);
378 ul_logerr("Cannot cancel vres\n");
381 fres_resource_manager_cancel_contracts(resource->mng, &ids, ev);
388 CORBA_long register_resource(fres_contract_broker obj,
389 const frsh_resource_type_t restype,
390 const frsh_resource_id_t resid,
391 const fres_resource_desc *desc,
392 CORBA_Environment *ev)
394 struct fcb *fcb = o2fcb(obj);
395 struct resource *res, *res2;
397 res = malloc(sizeof(*res));
399 memset(res, 0, sizeof(*res));
400 res->key.type = restype;
402 res2 = fcb_resource_find(fcb, &res->key);
404 if (forb_object_is_stale(res2->mng)) {
405 ul_logmsg("Removing stale manager for resource %d.%d\n",
407 forb_object_release(res2->mng);
408 fcb_resource_delete(fcb, res2);
409 /* TODO: Delete also all allocators associated
410 * with this stale resource manager. */
414 ul_logerr("Resource manager %d.%d already registered\n",
419 res->mng = forb_object_duplicate(desc->manager);
421 fcb_alloc_init_root_field(res);
422 ul_logmsg("Registering manager for resource %d.%d\n",
424 fcb_resource_insert(fcb, res);
433 CORBA_long register_allocator(fres_contract_broker obj,
434 const frsh_resource_type_t restype,
435 const frsh_resource_id_t resid,
436 const fres_resource_allocator ra_obj,
437 CORBA_Environment *ev)
439 struct fcb *fcb = o2fcb(obj);
440 struct resource *res;
441 struct res_alloc *ra;
442 struct res_key resource;
443 forb_server_id server_id;
444 char server_id_str[40];
446 forb_get_server_id(ra_obj, &server_id);
447 forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
448 ul_logmsg("Registering allocator for resource %d.%d in app %s\n",
449 restype, resid, server_id_str);
451 resource.type = restype;
453 res = fcb_resource_find(fcb, &resource);
455 ul_logerr("No manager found for %d.%d. Unable to register the allocator!\n",
459 ra = fcb_alloc_find(res, &server_id);
461 char *str = forb_object_to_string(ra_obj);
462 ul_logerr("Allocator from already registered (%s)\n",
467 ra = malloc(sizeof(*ra));
472 ra->ra = forb_object_duplicate(ra_obj);
473 fcb_alloc_insert(res, ra);
479 #ifdef CONFIG_FCB_INET
480 static int register_inet_port(forb_orb orb)
482 forb_port_t *port = forb_malloc(sizeof(*port));
484 struct in_addr listen_on;
488 memset(port, 0, sizeof(*port));
489 listen_on.s_addr = INADDR_ANY;
490 ret = forb_inet_port_init(&port->desc, listen_on);
491 if (ret) error(1, errno, "INET port initialization failed");
492 ret = forb_register_port(orb, port);
493 if (ret) error(1, errno /* TODO: FOSA errno */,
494 "INET port registration failed");
499 struct forb_fres_contract_broker_impl impl = {
500 .negotiate_contract = negotiate_contract,
501 .cancel_contract = cancel_contract,
502 .register_resource = register_resource,
503 .register_allocator = register_allocator,
506 int main(int argc, char *argv[])
510 fres_contract_broker fcb;
511 forb_executor_t executor;
514 orb = forb_init(&argc, &argv, "fcb");
515 if (!orb) error(1, errno, "FORB initialization failed");
517 #ifdef CONFIG_FCB_INET
518 ret = register_inet_port(orb);
519 if (ret) error(1, errno, "INET port registration failed");
522 fcb_resource_init_root_field(&fcb_data);
524 fcb = forb_fres_contract_broker_new(orb, &impl, &fcb_data);
525 if (!fcb) error(1, errno, "forb_fres_contract_broker_new failed");
527 /* Prepare executor before we register the fcb reference,
528 * so that no reuqests are lost */
529 ret = forb_executor_init(&executor);
530 if (ret) error(1, errno, "forb_executor_init failed");
532 ret = forb_executor_register_object(&executor, fcb);
533 if (ret) error(1, errno, "forb_executor_register_object failed");
535 ret = forb_register_reference(fcb, fres_contract_broker_reg_name);
536 if (ret) error(1, errno, "forb_register_reference() failed");
538 ul_logmsg("Waiting for requests\n");
539 ret = forb_executor_run(&executor);
540 if (ret) error(1, errno, "forb_executor_run failed");