1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FRSH (FRescor ScHeduler) */
27 /* FRSH is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FRSH is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FRSH; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FRSH header files in a file, */
39 /* instantiating FRSH generics or templates, or linking other files */
40 /* with FRSH objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
49 * @author Michal Sojka <sojkam1@fel.cvut.cz>
50 * @date Mon Oct 20 18:00:18 2008
52 * @brief FRESCOR Contract Broker
61 #include <ul_gavlcust.h>
64 #include <forb/server_id.h>
65 #include <fres_contract.h>
66 #include "fcb_config.h"
67 #include <fosa_clocks_and_timers.h>
68 #ifdef CONFIG_FCB_INET
69 #include <forb/proto_inet.h>
72 UL_LOG_CUST(ulogd_fcb);
73 ul_log_domain_t ulogd_fcb = {UL_LOGL_MSG, "fcb"};
75 fosa_abs_time_t start_time;
79 * Resource identification
82 frsh_resource_type_t type;
83 frsh_resource_id_t id;
92 fres_resource_manager mng; /**< Object reference of the resource manager */
93 gavl_cust_root_field_t allocators; /**< Registered allocators for this resource (from multiple applications/nodes) */
97 * Resource allocator registered in different nodes/applications
102 fres_resource_allocator ra;
105 struct fcb_contract {
106 fres_contract_id_t id;
108 struct fres_contract *user_contract;
112 * Contract broker data
115 gavl_cust_root_field_t resources; /**< Registered resources */
116 gavl_cust_root_field_t contracts; /**< Contracts negotiated by this FCB */
119 struct fcb_contract *fcb_contract_new(fres_contract_id_t *id)
121 struct fcb_contract *fcb_contract;
123 fcb_contract = malloc(sizeof(*fcb_contract));
127 memset(fcb_contract, 0, sizeof(*fcb_contract));
128 fcb_contract->id = *id;
133 void fcb_contract_destroy(struct fcb_contract *fcb_contract)
135 if (fcb_contract->user_contract) {
136 fres_contract_destroy(fcb_contract->user_contract);
141 static inline int res_key_cmp(const struct res_key *a,
142 const struct res_key *b)
144 if (a->type < b->type) {
146 } else if (a->type > b->type) {
148 } else if (a->id < b->id) {
150 } else if (a->id > b->id) {
157 /* Container for registered resources */
158 GAVL_CUST_NODE_INT_DEC(fcb_resource /* cust_prefix */, \
159 struct fcb /* cust_root_t */, \
160 struct resource/* cust_item_t */, \
161 struct res_key /* cust_key_t */, \
162 resources /* cust_root_node */, \
163 node /* cust_item_node */, \
164 key /* cust_item_key */, \
165 res_key_cmp /* cust_cmp_fnc */);
167 GAVL_CUST_NODE_INT_IMP(fcb_resource /* cust_prefix */, \
168 struct fcb /* cust_root_t */, \
169 struct resource/* cust_item_t */, \
170 struct res_key /* cust_key_t */, \
171 resources /* cust_root_node */, \
172 node /* cust_item_node */, \
173 key /* cust_item_key */, \
174 res_key_cmp /* cust_cmp_fnc */);
176 /* Container for allocators registered for a given resource */
177 GAVL_CUST_NODE_INT_DEC(fcb_alloc /* cust_prefix */, \
178 struct resource /* cust_root_t */, \
179 struct res_alloc /* cust_item_t */, \
180 forb_server_id /* cust_key_t */, \
181 allocators /* cust_root_node */, \
182 node /* cust_item_node */, \
183 app /* cust_item_key */, \
184 fres_contract_id_cmp /* cust_cmp_fnc */);
186 GAVL_CUST_NODE_INT_IMP(fcb_alloc /* cust_prefix */, \
187 struct resource /* cust_root_t */, \
188 struct res_alloc /* cust_item_t */, \
189 forb_server_id /* cust_key_t */, \
190 allocators /* cust_root_node */, \
191 node /* cust_item_node */, \
192 app /* cust_item_key */, \
193 forb_server_id_cmp /* cust_cmp_fnc */);
195 /* Container for negotiated contracts */
196 GAVL_CUST_NODE_INT_DEC(fcb_contract /* cust_prefix */, \
197 struct fcb /* cust_root_t */, \
198 struct fcb_contract /* cust_item_t */, \
199 fres_contract_id_t /* cust_key_t */, \
200 contracts /* cust_root_node */, \
201 node /* cust_item_node */, \
202 id /* cust_item_key */, \
203 fres_contract_id_cmp /* cust_cmp_fnc */);
206 GAVL_CUST_NODE_INT_IMP(fcb_contract /* cust_prefix */, \
207 struct fcb /* cust_root_t */, \
208 struct fcb_contract /* cust_item_t */, \
209 fres_contract_id_t /* cust_key_t */, \
210 contracts /* cust_root_node */, \
211 node /* cust_item_node */, \
212 id /* cust_item_key */, \
213 fres_contract_id_cmp /* cust_cmp_fnc */);
215 #include "fcb_contract_gavl.inc"
219 #define o2fcb(o) (struct fcb*)forb_instance_data(o)
222 get_res_key(const struct fcb *fcb, const struct fres_contract *contract, struct res_key *key)
224 fres_block_resource *block_res;
226 block_res = fres_contract_get_resource(contract);
227 if (!block_res && !fres_contract_id_is_empty(&contract->id)) {
228 /* If the contract doesn't have resource information,
229 * this might be cancelation or renegotiation request,
230 * so look at our database for formerly submited
232 struct fcb_contract *fcb_contract;
233 fcb_contract = fcb_contract_find(fcb, &contract->id);
235 block_res = fres_contract_get_resource(fcb_contract->user_contract);
239 ul_logerr("No resource specified\n");
242 key->type = block_res->resource_type;
243 key->id = block_res->resource_id;
249 * Checks whether all contracts refers to a single resource.
252 * @param contracts Array of contract pointers.
253 * @param num Number of contracts.
255 * @return If all contracts refer to a signle resource, pointer to the
256 * coresponding resource structure is returned. Otherwise, NULL is
260 check_single_resource(struct fcb *fcb, struct fres_contract *contracts[], int num)
262 struct resource *resource = NULL;
264 struct res_key key, key2 = {-1,-1};
266 for (i=0; i<num; i++) {
267 if (!get_res_key(fcb, contracts[i], &key)) {
270 if (i==0) key2 = key;
271 else if (key.type != key2.type ||
277 resource = fcb_resource_find(fcb, &key);
279 ul_logerr("No resource manager for %d.%d registered\n",
292 * @return Zero on success, non-zero error code on error.
295 prepare_reservation_contracts(struct fcb *fcb, struct fres_contract *contracts[], int num)
298 struct fcb_contract *fcb_contract;
300 for (i=0; i<num; i++) {
301 struct fres_contract *c = contracts[i];
303 if (fres_contract_id_is_empty(&c->id)) {
304 forb_uuid_generate((forb_uuid_t *)&c->id);
307 if (fres_contract_get_num_blocks(c) == 0) {
308 /* Nothing to do for deletion requesst */
313 fcb_contract = fcb_contract_find(fcb, &c->id);
315 /* Copy missing blocks from fcb_contract to contract */
316 fres_contract_merge(c, fcb_contract->user_contract);
319 fres_contract_id_to_string(str, &c->id, sizeof(str));
320 ul_logerr("Attempt to renegotiate unknown contract %s\n", str);
321 return FRES_ERR_NOTHING_TO_RENEGOTIATE;
329 negotiate_contracts(fres_contract_broker obj,
330 const fres_contract_ptr_seq* contracts,
331 fres_contract_id_seq** ids_out,
332 CORBA_Environment *ev)
334 struct fcb *fcb = o2fcb(obj);
335 struct resource *resource;
336 struct res_alloc *ra;
339 fres_contract_ptr_seq *schedulable_contracts;
340 struct fcb_contract **fcb_contracts;
342 fres_contract_id_seq* ids;
344 resource = check_single_resource(fcb, contracts->_buffer, contracts->_length);
346 ret = FRSH_ERR_RESOURCE_ID_INVALID;
350 ret = prepare_reservation_contracts(fcb, contracts->_buffer, contracts->_length);
354 forb_get_req_source(obj, &app);
355 ra = fcb_alloc_find(resource, &app);
358 forb_server_id_to_string(str, &app, sizeof(str));
359 ul_logerr("No resource allocator found for %d.%d and %s\n",
360 resource->key.type, resource->key.id, str);
361 ret = FRES_ERR_NO_RESOURCE_ALLOCATOR;
365 /* Allocate all the needed memory before doing reservation. If
366 * there is no enough memory, it has no sense to call resource
368 ids = malloc(sizeof(*ids));
370 ev->major = FORB_EX_NO_MEMORY;
373 memset(ids, 0, sizeof(*ids));
374 CORBA_sequence_set_release(ids, CORBA_TRUE);
376 ids->_buffer = malloc(contracts->_length*sizeof(ids->_buffer[0]));
381 ids->_length = ids->_maximum = contracts->_length;
382 for (i=0; i<contracts->_length; i++) {
383 ids->_buffer[i] = contracts->_buffer[i]->id;
386 fcb_contracts = malloc(sizeof(fcb_contracts[0])*contracts->_length);
387 if (!fcb_contracts) {
391 memset(fcb_contracts, 0, sizeof(fcb_contracts[0])*contracts->_length);
393 for (i=0; i<contracts->_length; i++) {
394 struct fres_contract *c = contracts->_buffer[i];
395 if (fres_contract_get_num_blocks(c) > 0) {
396 fcb_contracts[i] = fcb_contract_new(&c->id);
397 if (!fcb_contracts[i]) {
398 ret = errno ? errno : -1;
399 goto err_free_fcb_contracts;
401 fcb_contracts[i]->user_contract = fres_contract_duplicate(c);
402 if (!fcb_contracts[i]->user_contract) {
403 ret = errno ? errno : -1;
404 goto err_free_fcb_contracts;
409 /* TODO: Optimize the following by introducing
410 * reserve_and_commit FRM method. */
412 /* Reserve contract */
413 ret = fres_resource_manager_reserve_contracts(resource->mng, contracts, ev);
414 if (forb_exception_occurred(ev) || ret < 0) {
415 goto err_free_fcb_contracts;
418 ul_logmsg("Contract was not accepted\n");
419 goto err_free_fcb_contracts;
422 /* Commit contract */
423 fres_resource_manager_commit_contracts(resource->mng, ids,
424 &schedulable_contracts, ev);
425 if (forb_exception_occurred(ev)) {
426 ret = FRES_ERR_FORB_EXCEPTION;
427 goto err_cancel_reservation;
431 ret = fres_resource_allocator_change_vreses(ra->ra, schedulable_contracts, ev);
432 if (CORBA_sequence_get_release(schedulable_contracts)) {
434 for (i=0; i<schedulable_contracts->_length; i++) {
435 fres_contract_destroy(schedulable_contracts->_buffer[i]);
437 forb_free(schedulable_contracts->_buffer);
439 forb_free(schedulable_contracts);
440 if (forb_exception_occurred(ev)) {
441 ret = FRES_ERR_FORB_EXCEPTION;
442 goto err_cancel_reservation;
445 ul_logmsg("VRes was not created\n");
446 goto err_cancel_reservation;
449 /* Update database of negotiated contracts stored for later reference */
450 for (i=0; i<contracts->_length; i++) {
451 struct fcb_contract *fcb_contract;
452 fcb_contract = fcb_contract_find(fcb, &contracts->_buffer[i]->id);
453 /* Delete canceled or renegotiated user contract */
455 fcb_contract_delete(fcb, fcb_contract);
456 fcb_contract_destroy(fcb_contract);
458 if (fcb_contracts[i]) {
459 /* Insert new contracts */
460 fcb_contract_insert(fcb, fcb_contracts[i]);
466 err_cancel_reservation:
467 fres_resource_manager_cancel_reservations(resource->mng, ids, ev);
468 err_free_fcb_contracts:
469 for (i=0; i<contracts->_length; i++)
470 fcb_contract_destroy(fcb_contracts[i]);
476 CORBA_long register_resource(fres_contract_broker obj,
477 const frsh_resource_type_t restype,
478 const frsh_resource_id_t resid,
479 const fres_resource_desc *desc,
480 CORBA_Environment *ev)
482 struct fcb *fcb = o2fcb(obj);
483 struct resource *res, *res2;
485 res = malloc(sizeof(*res));
487 memset(res, 0, sizeof(*res));
488 res->key.type = restype;
490 res2 = fcb_resource_find(fcb, &res->key);
492 if (forb_object_is_stale(res2->mng)) {
493 ul_logmsg("Removing stale manager for resource %d.%d\n",
495 forb_object_release(res2->mng);
496 fcb_resource_delete(fcb, res2);
497 /* TODO: Delete also all allocators associated
498 * with this stale resource manager. */
502 ul_logerr("Resource manager %d.%d already registered\n",
507 res->mng = forb_object_duplicate(desc->manager);
509 fcb_alloc_init_root_field(res);
510 ul_logmsg("Registering manager for resource %d.%d\n",
512 fcb_resource_insert(fcb, res);
521 CORBA_long register_allocator(fres_contract_broker obj,
522 const frsh_resource_type_t restype,
523 const frsh_resource_id_t resid,
524 const fres_resource_allocator ra_obj,
525 CORBA_Environment *ev)
527 struct fcb *fcb = o2fcb(obj);
528 struct resource *res;
529 struct res_alloc *ra;
530 struct res_key resource;
531 forb_server_id server_id;
532 char server_id_str[40];
534 forb_get_server_id(ra_obj, &server_id);
535 forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
536 ul_logmsg("Registering allocator for resource %d.%d in app %s\n",
537 restype, resid, server_id_str);
539 resource.type = restype;
541 res = fcb_resource_find(fcb, &resource);
543 ul_logerr("No manager found for %d.%d. Unable to register the allocator!\n",
547 ra = fcb_alloc_find(res, &server_id);
549 char *str = forb_object_to_string(ra_obj);
550 ul_logerr("Allocator from already registered (%s)\n",
555 ra = malloc(sizeof(*ra));
560 ra->ra = forb_object_duplicate(ra_obj);
561 fcb_alloc_insert(res, ra);
567 void get_resources(fres_contract_broker obj, fres_resource_seq** resources, CORBA_Environment *ev)
569 struct fcb *fcb = o2fcb(obj);
570 fres_resource_seq *seq;
571 struct resource *res;
574 seq = malloc(sizeof(*seq));
576 ev->major = FORB_EX_NO_MEMORY;
579 memset(seq, 0, sizeof(*seq));
582 gavl_cust_for_each(fcb_resource, fcb, res) {
586 seq->_buffer = CORBA_sequence_fres_resource_allocbuf(n);
591 gavl_cust_for_each(fcb_resource, fcb, res) {
592 seq->_buffer[n].restype = res->key.type;
593 seq->_buffer[n].resid = res->key.id;
594 seq->_buffer[n].desc.manager = res->mng;
601 #ifdef CONFIG_FCB_INET
602 static int register_inet_port(forb_orb orb)
604 forb_port_t *port = forb_malloc(sizeof(*port));
606 struct in_addr listen_on;
610 memset(port, 0, sizeof(*port));
611 listen_on.s_addr = INADDR_ANY;
612 ret = forb_inet_port_init(&port->desc, listen_on);
615 ret = forb_register_port(orb, port);
620 struct forb_fres_contract_broker_impl impl = {
621 .negotiate_contracts = negotiate_contracts,
622 .register_resource = register_resource,
623 .register_allocator = register_allocator,
624 .get_resources = get_resources,
627 void peer_discovery_callback(const forb_orb peer_orb, const char *orb_id)
629 forb_server_id server_id;
630 char server_id_str[sizeof(forb_server_id)*2+1];
632 forb_get_server_id(peer_orb, &server_id);
633 forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
635 ul_logmsg("peer discovered: %s (orb_id '%s')\n", server_id_str, orb_id);
637 if (strcmp(orb_id, "org.frescor.fcb") == 0) {
639 fosa_rel_time_t delay;
640 fosa_clock_get_time(CLOCK_REALTIME, &now);
641 delay = fosa_abs_time_extract_interval(start_time, now);
642 ul_logmsg("node joined: %s (time %ld ms)\n", server_id_str,
643 fosa_rel_time_to_msec(delay));
647 void peer_dead_callback(const forb_orb peer_orb, const char *orb_id)
651 int main(int argc, char *argv[])
655 fres_contract_broker fcb;
656 forb_executor_t executor;
658 forb_init_attr_t attr = {
659 .orb_id = "org.frescor.fcb",
660 .peer_discovery_callback = peer_discovery_callback,
661 .peer_dead_callback = peer_dead_callback,
665 fosa_clock_get_time(CLOCK_REALTIME, &start_time);
667 orb = forb_init(&argc, &argv, &attr);
668 if (!orb) error(1, errno, "FORB initialization failed");
670 #ifdef CONFIG_FCB_INET
671 ret = register_inet_port(orb);
672 if (ret) error(0, errno, "INET port registration failed");
675 fcb_resource_init_root_field(&fcb_data);
676 fcb_contract_init_root_field(&fcb_data);
678 fcb = forb_fres_contract_broker_new(orb, &impl, &fcb_data);
679 if (!fcb) error(1, errno, "forb_fres_contract_broker_new failed");
681 /* Prepare executor before we register the fcb reference,
682 * so that no reuqests are lost */
683 ret = forb_executor_init(&executor);
684 if (ret) error(1, errno, "forb_executor_init failed");
686 ret = forb_executor_register_object(&executor, fcb);
687 if (ret) error(1, errno, "forb_executor_register_object failed");
689 ret = forb_register_reference(fcb, fres_contract_broker_reg_name);
690 if (ret) error(1, errno, "forb_register_reference() failed");
692 ul_logmsg("Waiting for requests\n");
693 ret = forb_executor_run(&executor);
694 if (ret) error(1, errno, "forb_executor_run failed");