1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FRSH (FRescor ScHeduler) */
27 /* FRSH is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FRSH is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FRSH; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FRSH header files in a file, */
39 /* instantiating FRSH generics or templates, or linking other files */
40 /* with FRSH objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
49 * @author Michal Sojka <sojkam1@fel.cvut.cz>
50 * @date Mon Oct 20 18:00:18 2008
52 * @brief FRESCOR Contract Broker
61 #include <ul_gavlcust.h>
64 #include <forb/server_id.h>
65 #include <fres_contract.h>
66 #include "fcb_config.h"
68 #ifdef CONFIG_FCB_INET
69 #include <forb/proto_inet.h>
/* Logging domain for this file: name "fcb", level UL_LOGL_MSG.
 * Presumably UL_LOG_CUST() routes this file's ul_logmsg()/ul_logerr()
 * calls to this domain -- confirm against the uLUt logging headers. */
72 UL_LOG_CUST(ulogd_fcb);
73 ul_log_domain_t ulogd_fcb = {UL_LOGL_MSG, "fcb"};
77 * Resource identification
/* Resource type; res_key_cmp() compares this field first. */
80 frsh_resource_type_t type;
/* Resource id; res_key_cmp() looks at it only when the types are equal. */
81 frsh_resource_id_t id;
/* Fields of the per-resource record (struct resource). Records are
 * kept in the `resources` tree of struct fcb and looked up by their
 * struct res_key `key`. */
90 fres_resource_manager mng; /**< Object reference of the resource manager */
91 gavl_cust_root_field_t allocators; /**< Registered allocators for this resource (from multiple applications/nodes) */
95 * Resource allocator registered in different nodes/applications
/* Object reference of the allocator; register_allocator() stores a
 * reference obtained with forb_object_duplicate() here. */
100 fres_resource_allocator ra;
/* Broker-side record of one negotiated contract. Records are kept in
 * the `contracts` tree of struct fcb and keyed by `id`. */
103 struct fcb_contract {
104 fres_contract_id_t id; /* Key in the contracts tree */
/* Contract as submitted by the user (negotiate_contracts() stores a
 * duplicate here); released by fcb_contract_destroy(). NULL right
 * after fcb_contract_new(). */
106 struct fres_contract *user_contract;
110 * Contract broker data
/* Both tree roots are initialised in main() before the broker object
 * is created. */
113 gavl_cust_root_field_t resources; /**< Registered resources */
114 gavl_cust_root_field_t contracts; /**< Contracts negotiated by this FCB */
117 struct fcb_contract *fcb_contract_new(fres_contract_id_t *id)
119 struct fcb_contract *fcb_contract;
121 fcb_contract = malloc(sizeof(*fcb_contract));
125 memset(fcb_contract, 0, sizeof(*fcb_contract));
126 fcb_contract->id = *id;
131 void fcb_contract_destroy(struct fcb_contract *fcb_contract)
133 if (fcb_contract->user_contract) {
134 fres_contract_destroy(fcb_contract->user_contract);
139 static inline int res_key_cmp(const struct res_key *a,
140 const struct res_key *b)
142 if (a->type < b->type) {
144 } else if (a->type > b->type) {
146 } else if (a->id < b->id) {
148 } else if (a->id > b->id) {
155 /* Container for registered resources */
156 GAVL_CUST_NODE_INT_DEC(fcb_resource /* cust_prefix */, \
157 struct fcb /* cust_root_t */, \
158 struct resource/* cust_item_t */, \
159 struct res_key /* cust_key_t */, \
160 resources /* cust_root_node */, \
161 node /* cust_item_node */, \
162 key /* cust_item_key */, \
163 res_key_cmp /* cust_cmp_fnc */);
165 GAVL_CUST_NODE_INT_IMP(fcb_resource /* cust_prefix */, \
166 struct fcb /* cust_root_t */, \
167 struct resource/* cust_item_t */, \
168 struct res_key /* cust_key_t */, \
169 resources /* cust_root_node */, \
170 node /* cust_item_node */, \
171 key /* cust_item_key */, \
172 res_key_cmp /* cust_cmp_fnc */);
174 /* Container for allocators registered for a given resource */
175 GAVL_CUST_NODE_INT_DEC(fcb_alloc /* cust_prefix */, \
176 struct resource /* cust_root_t */, \
177 struct res_alloc /* cust_item_t */, \
178 forb_server_id /* cust_key_t */, \
179 allocators /* cust_root_node */, \
180 node /* cust_item_node */, \
181 app /* cust_item_key */, \
182 fres_contract_id_cmp /* cust_cmp_fnc */);
184 GAVL_CUST_NODE_INT_IMP(fcb_alloc /* cust_prefix */, \
185 struct resource /* cust_root_t */, \
186 struct res_alloc /* cust_item_t */, \
187 forb_server_id /* cust_key_t */, \
188 allocators /* cust_root_node */, \
189 node /* cust_item_node */, \
190 app /* cust_item_key */, \
191 forb_server_id_cmp /* cust_cmp_fnc */);
193 /* Container for negotiated contracts */
194 GAVL_CUST_NODE_INT_DEC(fcb_contract /* cust_prefix */, \
195 struct fcb /* cust_root_t */, \
196 struct fcb_contract /* cust_item_t */, \
197 fres_contract_id_t /* cust_key_t */, \
198 contracts /* cust_root_node */, \
199 node /* cust_item_node */, \
200 id /* cust_item_key */, \
201 fres_contract_id_cmp /* cust_cmp_fnc */);
204 GAVL_CUST_NODE_INT_IMP(fcb_contract /* cust_prefix */, \
205 struct fcb /* cust_root_t */, \
206 struct fcb_contract /* cust_item_t */, \
207 fres_contract_id_t /* cust_key_t */, \
208 contracts /* cust_root_node */, \
209 node /* cust_item_node */, \
210 id /* cust_item_key */, \
211 fres_contract_id_cmp /* cust_cmp_fnc */);
213 #include "fcb_contract_gavl.inc"
/*
 * Maps a FORB object reference to the broker state (struct fcb) that
 * was attached to it as instance data.
 *
 * The whole expansion is parenthesized so that the result can be used
 * directly in a larger expression, e.g. o2fcb(obj)->contracts; without
 * the outer parentheses the cast would bind to the member access
 * instead of to the call.
 */
#define o2fcb(o) ((struct fcb *)forb_instance_data(o))
/*
 * Determines which resource (type, id) a contract refers to and
 * stores it in *key.
 *
 * The resource block is taken from the contract itself with
 * fres_contract_get_resource(). When the contract carries no such
 * block but has a non-empty id, the broker's own database is searched
 * with fcb_contract_find() and the resource block of the stored
 * user_contract is used instead. "No resource specified" is logged on
 * the failure path.
 *
 * fcb      - broker state whose contract database is searched.
 * contract - contract to inspect.
 * key      - output; receives resource_type and resource_id.
 *
 * NOTE(review): the return type and the return statements are not
 * visible in this listing; check_single_resource() treats a zero/NULL
 * result as failure. The NULL checks around fcb_contract and
 * block_res are not visible here either -- confirm that
 * fcb_contract_find() returning NULL is handled before
 * fcb_contract->user_contract is read.
 */
220 get_res_key(const struct fcb *fcb, const struct fres_contract *contract, struct res_key *key)
222 fres_block_resource *block_res;
224 block_res = fres_contract_get_resource(contract);
225 if (!block_res && !fres_contract_id_is_empty(&contract->id)) {
226 /* If the contract doesn't have resource information,
227 * this might be cancelation or renegotiation request,
228 * so look at our database for formerly submited
230 struct fcb_contract *fcb_contract;
231 fcb_contract = fcb_contract_find(fcb, &contract->id);
233 block_res = fres_contract_get_resource(fcb_contract->user_contract);
237 ul_logerr("No resource specified\n");
240 key->type = block_res->resource_type;
241 key->id = block_res->resource_id;
247 * Checks whether all contracts refer to a single resource.
250 * @param contracts Array of contract pointers.
251 * @param num Number of contracts.
253 * @return If all contracts refer to a single resource, a pointer to the
254 * corresponding resource structure is returned. Otherwise, NULL is
/* Resolves the resource key of every contract with get_res_key(),
 * compares each key with the key of the first contract, and finally
 * looks the common key up in the registered-resources tree. */
258 check_single_resource(struct fcb *fcb, struct fres_contract *contracts[], int num)
260 struct resource *resource = NULL;
/* key holds the key of the contract currently examined, key2 the key
 * of the first contract (reference for the comparison below). */
262 struct res_key key, key2 = {-1,-1};
264 for (i=0; i<num; i++) {
/* A contract whose resource cannot be determined fails the check. */
265 if (!get_res_key(fcb, contracts[i], &key)) {
268 if (i==0) key2 = key;
269 else if (key.type != key2.type ||
/* NOTE(review): if num is 0 the loop body never runs and key is still
 * uninitialised when it is used for the lookup -- confirm that callers
 * never pass an empty sequence. */
275 resource = fcb_resource_find(fcb, &key);
277 ul_logerr("No resource manager for %d.%d registered\n",
290 * @return Zero on success, non-zero error code on error.
/* Prepares every contract of a negotiation request for reservation:
 * a contract with an empty id gets a freshly generated one, a contract
 * with no blocks is treated as a deletion request, and the remaining
 * path looks the contract up in the broker database and merges the
 * stored blocks into it (renegotiation).
 * NOTE(review): the branch structure between these cases is only
 * partly visible in this listing. */
293 prepare_reservation_contracts(struct fcb *fcb, struct fres_contract *contracts[], int num)
296 struct fcb_contract *fcb_contract;
298 for (i=0; i<num; i++) {
299 struct fres_contract *c = contracts[i];
/* New contract: the broker assigns the id (the id storage is filled
 * in as a forb_uuid_t). */
301 if (fres_contract_id_is_empty(&c->id)) {
302 forb_uuid_generate((forb_uuid_t *)&c->id);
305 if (fres_contract_get_num_blocks(c) == 0) {
306 /* Nothing to do for deletion request */
/* Renegotiation: the previously stored contract must exist. */
311 fcb_contract = fcb_contract_find(fcb, &c->id);
313 /* Copy missing blocks from fcb_contract to contract */
314 fres_contract_merge(c, fcb_contract->user_contract);
/* Unknown id: report it in textual form and fail the request. */
317 fres_contract_id_to_string(str, &c->id, sizeof(str));
318 ul_logerr("Attempt to renegotiate unknown contract %s\n", str);
319 return FRES_ERR_NOTHING_TO_RENEGOTIATE;
/*
 * Broker entry point for negotiating, renegotiating or cancelling a
 * set of contracts.
 *
 * Visible sequence of steps:
 *  1. check_single_resource(): all contracts must refer to one
 *     registered resource, otherwise FRSH_ERR_RESOURCE_ID_INVALID.
 *  2. prepare_reservation_contracts(): assign ids / merge stored blocks.
 *  3. Find the resource allocator registered by the requesting
 *     application (forb_get_req_source() + fcb_alloc_find()),
 *     otherwise FRES_ERR_NO_RESOURCE_ALLOCATOR.
 *  4. Allocate everything needed up front: the `ids` sequence (filled
 *     with the contract ids) and the fcb_contracts[] array, which
 *     gets a new record plus a duplicate of the user contract for
 *     every contract that has at least one block.
 *  5. Ask the resource manager to reserve, then to commit the
 *     contracts.
 *  6. Pass the schedulable contracts returned by the commit to the
 *     allocator (change_vreses) and free them.
 *  7. Update the broker database: drop the old record of each
 *     contract, insert the new one if there is any.
 * Failures after the reservation go through err_cancel_reservation,
 * earlier ones through err_free_fcb_contracts.
 *
 * obj       - broker object; its instance data is the struct fcb.
 * contracts - sequence of contracts to negotiate.
 * ids_out   - output sequence of contract ids.
 * ev        - CORBA environment used for exception reporting.
 *
 * NOTE(review): the return type, the assignment to *ids_out, the
 * return statements and several if-conditions are not visible in this
 * listing.
 */
327 negotiate_contracts(fres_contract_broker obj,
328 const fres_contract_ptr_seq* contracts,
329 fres_contract_id_seq** ids_out,
330 CORBA_Environment *ev)
332 struct fcb *fcb = o2fcb(obj);
333 struct resource *resource;
334 struct res_alloc *ra;
337 fres_contract_ptr_seq *schedulable_contracts;
338 struct fcb_contract **fcb_contracts;
340 fres_contract_id_seq* ids;
342 resource = check_single_resource(fcb, contracts->_buffer, contracts->_length);
344 ret = FRSH_ERR_RESOURCE_ID_INVALID;
348 ret = prepare_reservation_contracts(fcb, contracts->_buffer, contracts->_length);
/* The allocator is selected by the server id of the requesting
 * application. */
352 forb_get_req_source(obj, &app);
353 ra = fcb_alloc_find(resource, &app);
356 forb_server_id_to_string(str, &app, sizeof(str));
357 ul_logerr("No resource allocator found for %d.%d and %s\n",
358 resource->key.type, resource->key.id, str);
359 ret = FRES_ERR_NO_RESOURCE_ALLOCATOR;
363 /* Allocate all the needed memory before doing reservation. If
364 * there is no enough memory, it has no sense to call resource
366 ids = malloc(sizeof(*ids));
368 ev->major = FORB_EX_NO_MEMORY;
371 memset(ids, 0, sizeof(*ids));
372 CORBA_sequence_set_release(ids, CORBA_TRUE);
374 ids->_buffer = malloc(contracts->_length*sizeof(ids->_buffer[0]));
379 ids->_length = ids->_maximum = contracts->_length;
380 for (i=0; i<contracts->_length; i++) {
381 ids->_buffer[i] = contracts->_buffer[i]->id;
384 fcb_contracts = malloc(sizeof(fcb_contracts[0])*contracts->_length);
385 if (!fcb_contracts) {
389 memset(fcb_contracts, 0, sizeof(fcb_contracts[0])*contracts->_length);
391 for (i=0; i<contracts->_length; i++) {
392 struct fres_contract *c = contracts->_buffer[i];
393 if (fres_contract_get_num_blocks(c) > 0) {
394 fcb_contracts[i] = fcb_contract_new(&c->id);
395 if (!fcb_contracts[i]) {
396 ret = errno ? errno : -1;
397 goto err_free_fcb_contracts;
399 fcb_contracts[i]->user_contract = fres_contract_duplicate(c);
400 if (!fcb_contracts[i]->user_contract) {
401 ret = errno ? errno : -1;
402 goto err_free_fcb_contracts;
407 /* TODO: Optimize the following by introducing
408 * reserve_and_commit FRM method. */
410 /* Reserve contract */
411 ret = fres_resource_manager_reserve_contracts(resource->mng, contracts, ev);
412 if (forb_exception_occurred(ev) || ret < 0) {
413 goto err_free_fcb_contracts;
416 ul_logmsg("Contract was not accepted\n");
417 goto err_free_fcb_contracts;
420 /* Commit contract */
421 fres_resource_manager_commit_contracts(resource->mng, ids,
422 &schedulable_contracts, ev);
423 if (forb_exception_occurred(ev)) {
424 ret = FRES_ERR_FORB_EXCEPTION;
425 goto err_cancel_reservation;
/* Hand the committed (schedulable) contracts to the allocator, then
 * release them; the contents are freed only when the sequence's
 * release flag is set. The exception check follows the clean-up so
 * the sequence is freed on both paths. */
429 ret = fres_resource_allocator_change_vreses(ra->ra, schedulable_contracts, ev);
430 if (CORBA_sequence_get_release(schedulable_contracts)) {
432 for (i=0; i<schedulable_contracts->_length; i++) {
433 fres_contract_destroy(schedulable_contracts->_buffer[i]);
435 forb_free(schedulable_contracts->_buffer);
437 forb_free(schedulable_contracts);
438 if (forb_exception_occurred(ev)) {
439 ret = FRES_ERR_FORB_EXCEPTION;
440 goto err_cancel_reservation;
443 ul_logmsg("VRes was not created\n");
444 goto err_cancel_reservation;
447 /* Update database of negotiated contracts stored for later reference */
448 for (i=0; i<contracts->_length; i++) {
449 struct fcb_contract *fcb_contract;
450 fcb_contract = fcb_contract_find(fcb, &contracts->_buffer[i]->id);
451 /* Delete canceled or renegotiated user contract */
453 fcb_contract_delete(fcb, fcb_contract);
454 fcb_contract_destroy(fcb_contract);
456 if (fcb_contracts[i]) {
457 /* Insert new contracts */
458 fcb_contract_insert(fcb, fcb_contracts[i]);
464 err_cancel_reservation:
465 fres_resource_manager_cancel_reservations(resource->mng, ids, ev);
466 err_free_fcb_contracts:
/* NOTE(review): fcb_contracts[] was zero-filled and only slots of
 * contracts with blocks were populated, so some slots can be NULL
 * here; this loop relies on fcb_contract_destroy() tolerating a NULL
 * argument -- verify. */
467 for (i=0; i<contracts->_length; i++)
468 fcb_contract_destroy(fcb_contracts[i]);
/*
 * Registers the resource manager described by `desc` for resource
 * (restype, resid).
 *
 * If a manager is already registered for that resource and its object
 * reference is stale, the old entry is removed and replaced; if it is
 * still alive, the request is rejected with an error message.
 *
 * NOTE(review): the return statements, the check of malloc()'s result
 * and the release of `res` on the "already registered" path are not
 * visible in this listing.
 */
474 CORBA_long register_resource(fres_contract_broker obj,
475 const frsh_resource_type_t restype,
476 const frsh_resource_id_t resid,
477 const fres_resource_desc *desc,
478 CORBA_Environment *ev)
480 struct fcb *fcb = o2fcb(obj);
481 struct resource *res, *res2;
/* The new record is allocated first; its key also serves as the
 * search key for the lookup below. */
483 res = malloc(sizeof(*res));
485 memset(res, 0, sizeof(*res));
486 res->key.type = restype;
488 res2 = fcb_resource_find(fcb, &res->key);
490 if (forb_object_is_stale(res2->mng)) {
491 ul_logmsg("Removing stale manager for resource %d.%d\n",
493 forb_object_release(res2->mng);
494 fcb_resource_delete(fcb, res2);
495 /* TODO: Delete also all allocators associated
496 * with this stale resource manager. */
500 ul_logerr("Resource manager %d.%d already registered\n",
/* The broker keeps its own reference to the manager object. */
505 res->mng = forb_object_duplicate(desc->manager);
/* Start with an empty allocator tree for this resource. */
507 fcb_alloc_init_root_field(res);
508 ul_logmsg("Registering manager for resource %d.%d\n",
510 fcb_resource_insert(fcb, res);
/*
 * Registers a resource allocator object for resource (restype, resid).
 *
 * Allocators are keyed by the server id of the allocator object, so
 * fcb_alloc_find() is used to detect an allocator already registered
 * under the same server id. A resource manager must already be
 * registered for the resource.
 *
 * NOTE(review): the return statements, the check of malloc()'s
 * result, the initialisation of ra->app and the release of the string
 * returned by forb_object_to_string() are not visible in this listing.
 */
519 CORBA_long register_allocator(fres_contract_broker obj,
520 const frsh_resource_type_t restype,
521 const frsh_resource_id_t resid,
522 const fres_resource_allocator ra_obj,
523 CORBA_Environment *ev)
525 struct fcb *fcb = o2fcb(obj);
526 struct resource *res;
527 struct res_alloc *ra;
528 struct res_key resource;
529 forb_server_id server_id;
/* NOTE(review): fixed size 40 here, whereas
 * peer_discovery_callback() sizes the same kind of buffer as
 * sizeof(forb_server_id)*2+1 -- confirm 40 is large enough. */
530 char server_id_str[40];
532 forb_get_server_id(ra_obj, &server_id);
533 forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
534 ul_logmsg("Registering allocator for resource %d.%d in app %s\n",
535 restype, resid, server_id_str);
537 resource.type = restype;
539 res = fcb_resource_find(fcb, &resource);
541 ul_logerr("No manager found for %d.%d. Unable to register the allocator!\n",
/* Reject a second allocator registered under the same server id. */
545 ra = fcb_alloc_find(res, &server_id);
547 char *str = forb_object_to_string(ra_obj);
548 ul_logerr("Allocator from already registered (%s)\n",
553 ra = malloc(sizeof(*ra));
/* The broker keeps its own reference to the allocator object. */
558 ra->ra = forb_object_duplicate(ra_obj);
559 fcb_alloc_insert(res, ra);
/*
 * Builds a sequence describing every registered resource (type, id
 * and manager object reference).
 *
 * The resource tree is walked twice: once before the buffer is
 * allocated with CORBA_sequence_fres_resource_allocbuf(n), and once
 * to fill the buffer. On allocation failure of the sequence header
 * ev->major is set to FORB_EX_NO_MEMORY.
 *
 * NOTE(review): the bodies of the first loop, the handling of `n`
 * and the assignment to *resources are not visible in this listing.
 */
565 void get_resources(fres_contract_broker obj, fres_resource_seq** resources, CORBA_Environment *ev)
567 struct fcb *fcb = o2fcb(obj);
568 fres_resource_seq *seq;
569 struct resource *res;
572 seq = malloc(sizeof(*seq));
574 ev->major = FORB_EX_NO_MEMORY;
577 memset(seq, 0, sizeof(*seq));
580 gavl_cust_for_each(fcb_resource, fcb, res) {
584 seq->_buffer = CORBA_sequence_fres_resource_allocbuf(n);
589 gavl_cust_for_each(fcb_resource, fcb, res) {
590 seq->_buffer[n].restype = res->key.type;
591 seq->_buffer[n].resid = res->key.id;
/* NOTE(review): the manager reference is stored as-is, without
 * forb_object_duplicate(); presumably ownership stays with the
 * broker -- confirm against the sequence release semantics. */
592 seq->_buffer[n].desc.manager = res->mng;
/* Only built when INET support is configured. */
599 #ifdef CONFIG_FCB_INET
/* Creates a FORB INET port bound to INADDR_ANY and registers it with
 * the given ORB.
 * NOTE(review): the checks of `port` and `ret`, the return statements
 * and any release of `port` on failure are not visible in this
 * listing. */
600 static int register_inet_port(forb_orb orb)
602 forb_port_t *port = forb_malloc(sizeof(*port));
604 struct in_addr listen_on;
608 memset(port, 0, sizeof(*port));
/* Accept connections on every local address. */
609 listen_on.s_addr = INADDR_ANY;
610 ret = forb_inet_port_init(&port->desc, listen_on);
613 ret = forb_register_port(orb, port);
/* Method table of the contract broker object; main() passes it to
 * forb_fres_contract_broker_new(). */
618 struct forb_fres_contract_broker_impl impl = {
619 .negotiate_contracts = negotiate_contracts,
620 .register_resource = register_resource,
621 .register_allocator = register_allocator,
622 .get_resources = get_resources,
625 void peer_discovery_callback(const forb_orb peer_orb, const char *orb_id)
627 forb_server_id server_id;
628 char server_id_str[sizeof(forb_server_id)*2+1];
630 forb_get_server_id(peer_orb, &server_id);
631 forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
633 ul_logmsg("peer discovered: %s (orb_id '%s')\n", server_id_str, orb_id);
635 if (strcmp(orb_id, "org.frescor.fcb") == 0) {
636 ul_logmsg("node joined: %s\n", server_id_str);
/* Callback installed in main() as forb_init_attr_t.peer_dead_callback.
 * NOTE(review): no body statements are visible in this listing;
 * presumably an empty placeholder -- confirm. */
640 void peer_dead_callback(const forb_orb peer_orb, const char *orb_id)
/*
 * Contract broker daemon entry point.
 *
 * Initialises FORB with orb_id "org.frescor.fcb" and the peer
 * callbacks, optionally registers an INET port, initialises the
 * broker's resource and contract trees, creates the broker object,
 * attaches it to an executor, publishes its reference and then runs
 * the executor. Every failure except the INET port registration is
 * reported with error(1, ...).
 *
 * NOTE(review): the declarations of `orb`, `ret` and `fcb_data` and
 * the final return are not visible in this listing.
 */
644 int main(int argc, char *argv[])
648 fres_contract_broker fcb;
649 forb_executor_t executor;
651 forb_init_attr_t attr = {
652 .orb_id = "org.frescor.fcb",
653 .peer_discovery_callback = peer_discovery_callback,
654 .peer_dead_callback = peer_dead_callback,
657 orb = forb_init(&argc, &argv, &attr);
658 if (!orb) error(1, errno, "FORB initialization failed");
660 #ifdef CONFIG_FCB_INET
661 ret = register_inet_port(orb);
/* Status 0 is passed to error() here, unlike the other checks:
 * per glibc error(3) the message is printed and the program
 * continues without the INET port. */
662 if (ret) error(0, errno, "INET port registration failed");
/* Both trees must be empty-initialised before the first request. */
665 fcb_resource_init_root_field(&fcb_data);
666 fcb_contract_init_root_field(&fcb_data);
/* fcb_data becomes the instance data that o2fcb() retrieves in the
 * method implementations. */
668 fcb = forb_fres_contract_broker_new(orb, &impl, &fcb_data);
669 if (!fcb) error(1, errno, "forb_fres_contract_broker_new failed");
671 /* Prepare executor before we register the fcb reference,
672 * so that no requests are lost */
673 ret = forb_executor_init(&executor);
674 if (ret) error(1, errno, "forb_executor_init failed");
676 ret = forb_executor_register_object(&executor, fcb);
677 if (ret) error(1, errno, "forb_executor_register_object failed");
679 ret = forb_register_reference(fcb, fres_contract_broker_reg_name);
680 if (ret) error(1, errno, "forb_register_reference() failed");
682 ul_logmsg("Waiting for requests\n");
683 ret = forb_executor_run(&executor);
684 if (ret) error(1, errno, "forb_executor_run failed");