1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FRSH (FRescor ScHeduler) */
27 /* FRSH is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FRSH is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FRSH; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FRSH header files in a file, */
39 /* instantiating FRSH generics or templates, or linking other files */
40 /* with FRSH objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
49 * @author Michal Sojka <sojkam1@fel.cvut.cz>
50 * @date Mon Oct 20 18:00:18 2008
52 * @brief FRESCOR Contract Broker
56 #include <semaphore.h>
59 #include <forb/config.h>
61 #include <fcb_contact_info.h>
65 #include <ul_gavlcust.h>
68 #include <ul_logreg.h>
69 #include <forb/server_id.h>
70 #include <fres_contract.h>
71 #include "fcb_config.h"
72 #include <fosa_clocks_and_timers.h>
73 #include "contract_log.h"
74 #if CONFIG_FCB_INET && !CONFIG_FORB_PROTO_INET_DEFAULT
75 #include <forb/proto_inet.h>
78 UL_LOG_CUST(ulogd_fcb);
79 ul_log_domain_t ulogd_fcb = {UL_LOGL_MSG, "main"};
80 UL_LOGREG_SINGLE_DOMAIN_INIT_FUNCTION(init_ulogd_fcb, ulogd_fcb);
82 bool opt_daemon = false;
83 char *opt_pidfile = NULL;
84 fosa_abs_time_t start_time;
86 /** List of contracts to be newly reserved or changed by resource manager */
87 struct reservation_list {
88 ul_list_head_t fcb_contracts;
93 * Resource identification
96 frsh_resource_type_t type;
97 frsh_resource_id_t id;
101 * Registered resource
107 fres_resource_manager mng; /**< Object reference of the resource manager */
108 gavl_cust_root_field_t allocators; /**< Registered allocators for this resource (from multiple applications/nodes) */
109 ul_list_head_t sc_contracts; /**< Negotiated contracts with spare capacity for this resource */
110 struct reservation_list rl; /**< Temporary list of contracts to be reserved on this resource */
114 * Resource allocator registered in different nodes/applications
119 fres_resource_allocator ra;
122 struct fcb_contract {
123 fres_contract_id_t id;
124 struct res_alloc *ra; /**< Allocator for this contract TODO: Remove contract if allocator is destroyed */
125 gavl_node_t node_fcb;
126 ul_list_node_t node_sc;
127 ul_list_node_t node_reservation;
132 fosa_abs_time_t end_of_stability_period;
133 struct fres_contract *user_contract; /* The contract sent by user. Set after sucessfull neotiation. */
134 struct fres_contract *requested_contract; /* User request for the contract. Set by prepare_reservation_requests() in the beginning of negotiation. */
135 struct fres_contract *to_be_reserved_contract;
136 struct fres_contract *schedulable_contract;
140 * Contract broker data
143 gavl_cust_root_field_t resources; /**< Registered resources */
144 gavl_cust_root_field_t contracts; /**< Contracts negotiated by this FCB */
147 struct fcb_contract *fcb_contract_new(fres_contract_id_t *id)
149 struct fcb_contract *fcb_contract;
151 fcb_contract = malloc(sizeof(*fcb_contract));
155 memset(fcb_contract, 0, sizeof(*fcb_contract));
156 fcb_contract->id = *id;
161 void fcb_contract_destroy(struct fcb_contract *fcb_contract)
163 if (fcb_contract->user_contract) {
164 fres_contract_destroy(fcb_contract->user_contract);
166 if (fcb_contract->to_be_reserved_contract) {
167 fres_contract_destroy(fcb_contract->to_be_reserved_contract);
169 if (fcb_contract->requested_contract) {
170 fres_contract_destroy(fcb_contract->requested_contract);
172 if (fcb_contract->schedulable_contract) {
173 fres_contract_destroy(fcb_contract->requested_contract);
178 static inline int res_key_cmp(const struct res_key *a,
179 const struct res_key *b)
181 if (a->type < b->type) {
183 } else if (a->type > b->type) {
185 } else if (a->id < b->id) {
187 } else if (a->id > b->id) {
194 /* Container for registered resources */
195 GAVL_CUST_NODE_INT_DEC(fcb_resource /* cust_prefix */, \
196 struct fcb /* cust_root_t */, \
197 struct resource/* cust_item_t */, \
198 struct res_key /* cust_key_t */, \
199 resources /* cust_root_node */, \
200 node /* cust_item_node */, \
201 key /* cust_item_key */, \
202 res_key_cmp /* cust_cmp_fnc */);
204 GAVL_CUST_NODE_INT_IMP(fcb_resource /* cust_prefix */, \
205 struct fcb /* cust_root_t */, \
206 struct resource/* cust_item_t */, \
207 struct res_key /* cust_key_t */, \
208 resources /* cust_root_node */, \
209 node /* cust_item_node */, \
210 key /* cust_item_key */, \
211 res_key_cmp /* cust_cmp_fnc */);
213 /* Container for allocators registered for a given resource */
214 GAVL_CUST_NODE_INT_DEC(fcb_alloc /* cust_prefix */, \
215 struct resource /* cust_root_t */, \
216 struct res_alloc /* cust_item_t */, \
217 forb_server_id /* cust_key_t */, \
218 allocators /* cust_root_node */, \
219 node /* cust_item_node */, \
220 app /* cust_item_key */, \
221 fres_contract_id_cmp /* cust_cmp_fnc */);
223 GAVL_CUST_NODE_INT_IMP(fcb_alloc /* cust_prefix */, \
224 struct resource /* cust_root_t */, \
225 struct res_alloc /* cust_item_t */, \
226 forb_server_id /* cust_key_t */, \
227 allocators /* cust_root_node */, \
228 node /* cust_item_node */, \
229 app /* cust_item_key */, \
230 forb_server_id_cmp /* cust_cmp_fnc */);
232 /* Container for contracts with spare capacity negotiated for a given resource */
233 UL_LIST_CUST_DEC(sc_contracts /* cust_prefix */, \
234 struct resource /* cust_head_t */, \
235 struct fcb_contract /* cust_item_t */, \
236 sc_contracts /* cust_head_field */, \
237 node_sc /* cust_node_field */);
239 UL_LIST_CUST_DEC(reservation_list /* cust_prefix */, \
240 struct reservation_list /* cust_head_t */, \
241 struct fcb_contract /* cust_item_t */, \
242 fcb_contracts /* cust_head_field */, \
243 node_reservation /* cust_node_field */);
245 /* Container for negotiated contracts */
246 GAVL_CUST_NODE_INT_DEC(fcb_contract /* cust_prefix */, \
247 struct fcb /* cust_root_t */, \
248 struct fcb_contract /* cust_item_t */, \
249 fres_contract_id_t /* cust_key_t */, \
250 contracts /* cust_root_node */, \
251 node_fcb /* cust_item_node */, \
252 id /* cust_item_key */, \
253 fres_contract_id_cmp /* cust_cmp_fnc */);
256 GAVL_CUST_NODE_INT_IMP(fcb_contract /* cust_prefix */, \
257 struct fcb /* cust_root_t */, \
258 struct fcb_contract /* cust_item_t */, \
259 fres_contract_id_t /* cust_key_t */, \
260 contracts /* cust_root_node */, \
261 node_fcb /* cust_item_node */, \
262 id /* cust_item_key */, \
263 fres_contract_id_cmp /* cust_cmp_fnc */);
265 #include "fcb_contract_gavl.inc"
269 #define o2fcb(o) (struct fcb*)forb_instance_data(o)
271 static struct res_key*
272 get_res_key(const struct fres_contract *contract, struct res_key *key)
274 fres_block_resource *block_res;
276 block_res = fres_contract_get_resource(contract);
278 ul_logerr("No resource specified\n");
281 key->type = block_res->resource_type;
282 key->id = block_res->resource_id;
287 static struct res_key*
288 get_fc_res_key(const struct fcb_contract *fc, struct res_key *key)
290 if (fc->user_contract)
291 get_res_key(fc->user_contract, key);
293 get_res_key(fc->requested_contract, key);
298 * Fills in an array of fcb_contracts according to requests submited
299 * by an application through negotiate_contracts() or
300 * negotiate_transaction(). For every contract in @a contracts, there
301 * is allocated one fcb_contract in @a fcb_contracts array.
304 * @param fcb_contracts Where to store pointers to fcb_contracts
305 * @param contracts Contracts submited for negotiation
306 * @param num Number of contracts in contracts (which is the same as the number in fcb_contracts).
308 * @return Zero on success, non-zero error code on error.
311 prepare_fcb_contracts(struct fcb *fcb, struct fcb_contract *fcb_contracts[],
312 struct fres_contract *contracts[], int num)
315 struct fcb_contract *fc;
317 for (i=0; i<num; i++) {
318 struct fres_contract *c = contracts[i];
320 if (fres_contract_id_is_empty(&c->id)) {
321 /* Normal negotiation request */
322 forb_uuid_generate((forb_uuid_t *)&c->id);
323 fc = fcb_contract_new(&c->id);
326 fcb_contracts[i] = fc;
327 log_contract("Negotiation request", i, c);
329 fc = fcb_contract_find(fcb, &c->id);
332 fres_contract_id_to_string(str, &c->id, sizeof(str));
333 ul_logerr("Attempt to change unknown contract %s\n", str);
334 return FRES_ERR_NOTHING_TO_RENEGOTIATE;
336 fcb_contracts[i] = fc;
337 if (fres_contract_get_num_blocks(c) == 0) {
339 log_contract("Cancelation request", i, fc->user_contract);
342 log_contract("Renegotiation request", i, fc->user_contract);
346 fc->requested_contract = c;
352 * Ensure that all contracts belong to the same resource and find
353 * resource allocators for the contracts.
356 * @param fcb_contracts Array of fcb_contracts prepared by prepare_fcb_contracts()
357 * @param num Number of contracts in @a fcb_contracts
358 * @param resource Resource for which negotiation is being done.
359 * @param app Application which requests negotiation
362 check_and_setup_resource(struct fcb *fcb, struct fcb_contract *fcb_contracts[],
363 struct resource **resource,
368 struct res_alloc *ra;
369 struct res_key key, key2 = {-1,-1};
371 for (i=0; i<num; i++) {
372 struct fcb_contract *fc = fcb_contracts[i];
374 if (!get_fc_res_key(fc, &key))
375 return FRSH_ERR_RESOURCE_ID_INVALID;
377 /* Check that all contracts are for the same resource */
380 *resource = fcb_resource_find(fcb, &key);
382 ul_logerr("No resource manager for %d.%d registered\n",
384 return FRES_ERR_NO_RESOURCE_MANAGER;
386 /* Find allocator for this request */
387 ra = fcb_alloc_find(*resource, app);
390 forb_server_id_to_string(str, app, sizeof(str));
391 ul_logerr("No resource allocator found for %d.%d and %s\n",
392 (*resource)->key.type, (*resource)->key.id, str);
393 return FRES_ERR_NO_RESOURCE_ALLOCATOR;
396 else if (key.type != key2.type ||
398 ul_logerr("Contract #%d differs in resource!\n", i);
399 return FRSH_ERR_RESOURCE_ID_INVALID;
407 * Prepares a list of contracts to pass to resource manager for (re)reserving.
409 * @todo Needs to be changed for compatibility with transactions.
411 * @param resource Resource for which to rebalance capacity and negotiate new contracts
412 * @param fcb_contract New requests to negotiate
413 * @param num The number of elements in @a fcb_contract
414 * @param[out] rl List with changed contracts to be commited
417 prepare_reservation_list(struct resource *resource,
418 struct fcb_contract *fcb_contract[], int num)
422 struct fcb_contract *fc;
424 reservation_list_init_head(&resource->rl);
425 resource->rl.length = 0;
426 for (i=0; i<num; i++) {
427 assert(fcb_contract[i]->requested_contract != NULL);
428 reservation_list_insert(&resource->rl, fcb_contract[i]);
429 resource->rl.length++;
431 fosa_clock_get_time(CLOCK_REALTIME, &now);
432 ul_list_for_each(sc_contracts, resource, fc) {
433 if (fosa_abs_time_smaller(fc->end_of_stability_period, now) &&
434 fc->requested_contract == NULL) /* Do not insert contract inserted above */
436 reservation_list_insert(&resource->rl, fc);
437 resource->rl.length++;
445 * @param resource Resource for which to rebalance capacity and negotiate new contracts
446 * @param rl List with changed contracts to be commited
448 * @return Zero on success, non-zero error code on error
451 rebalance_spare_capacity_and_reserve(struct resource *resource)
455 struct reservation_list *rl = &resource->rl;
456 struct fcb_contract *fc;
457 fres_block_spare_capacity *s;
459 fres_contract_ptr_seq contracts;
460 if (!forb_sequence_alloc_buf(&contracts, rl->length)) {
463 contracts._length = rl->length;
465 /* Initialize optimization */
466 ul_list_for_each(reservation_list, rl, fc) {
467 fc->to_be_reserved_contract =
468 fc->requested_contract ? fres_contract_duplicate(fc->requested_contract) :
469 fc->user_contract ? fres_contract_duplicate(fc->user_contract) :
471 assert(fc->to_be_reserved_contract != NULL);
473 forb_sequence_elem(&contracts, i) = fc->to_be_reserved_contract;
476 s = fres_contract_get_spare_capacity(fc->to_be_reserved_contract);
477 if (s && s->granularity == FRSH_GR_DISCRETE) {
478 fc->sc_variant.initial = s->variants._length - 1;
479 fc->sc_variant.try = fc->sc_variant.initial;
483 bool all_combinations_tried;
484 int criterion, best_criterion = -1;
485 struct fcb_contract *fcb_changed;
486 /* Exhaustive search. Try all combinations of spare capacity
487 * variants and find the one with best creterion. */
489 all_combinations_tried = true;
492 /* Prepare spare capacity variant */
493 ul_list_for_each(reservation_list, rl, fc) {
494 s = fres_contract_get_spare_capacity(fc->to_be_reserved_contract);
495 /* TODO: Simulate continuous granularity by discretization */
496 if (s && s->granularity == FRSH_GR_DISCRETE) {
497 fres_container_copy(fc->to_be_reserved_contract->container,
498 forb_sequence_elem(&s->variants, fc->sc_variant.try));
499 criterion += fc->sc_variant.try;
501 if (fcb_changed == NULL) {
502 /* Chnage tried variant for the next round */
503 fc->sc_variant.try = fc->sc_variant.try > 0 ?
504 fc->sc_variant.try - 1 :
505 s->variants._length - 1;
506 /* Do not change other
507 * contracts unless we have
508 * tried all variants */
509 if (fc->sc_variant.try != fc->sc_variant.initial) {
513 if (fc->sc_variant.try != fc->sc_variant.initial)
514 all_combinations_tried = false;
518 if (criterion > best_criterion) {
519 CORBA_Environment ev;
520 /* Reserve contract */
521 ret = fres_resource_manager_reserve_contracts(resource->mng, &contracts, &ev);
522 if (forb_exception_occurred(&ev)) {
523 ret = fres_forbex2err(&ev);
524 ul_logerr("FORB exception when reserving contracts\n");
528 ul_logerr("Contract reservation error %d\n", ret);
529 ret = FRES_ERR_ADMISSION_TEST;
532 if (ret == 0) { /* negotiation succeeded */
533 best_criterion = criterion;
536 } while (!all_combinations_tried);
538 if (best_criterion == -1) {
539 ret = FRSH_ERR_CONTRACT_REJECTED;
541 /* At least some variant succeeded */
543 sc_contracts_init_head(resource);
544 ul_list_for_each(reservation_list, rl, fc) {
545 s = fres_contract_get_spare_capacity(fc->to_be_reserved_contract);
546 if (s && s->granularity == FRSH_GR_DISCRETE) {
547 sc_contracts_insert(resource, fc);
552 ul_list_for_each(reservation_list, rl, fc) {
553 fres_contract_destroy(fc->to_be_reserved_contract);
554 fc->to_be_reserved_contract = NULL;
556 forb_sequence_free_buf(&contracts, forb_no_destructor);
561 * Create/change VReses according to @a schedulable_contracts.
563 * There might be more allocators for schedulable contracts, so merge
564 * adjacent vreses with the same allocator together and create/change
565 * them in one step. Also the order of schedulable contracts might be
566 * different from the initial order od ids in commit_contracts()
567 * therefore we have to search for every contract.
570 change_vreses(struct fcb *fcb, fres_contract_ptr_seq *schedulable_contracts)
572 struct res_alloc *last_ra = NULL;
573 fres_contract_ptr_seq vreses;
575 CORBA_Environment ev;
577 if (!forb_sequence_alloc_buf(&vreses, schedulable_contracts->_length)) {
582 for (i=0; i<schedulable_contracts->_length; i++) {
583 struct fcb_contract *fc;
584 struct fres_contract *sc = schedulable_contracts->_buffer[i];
586 log_contract("Changing VRES", i, sc);
588 fc = fcb_contract_find(fcb, &sc->id);
591 if (true /* TODO: if the schedulable contract is changed */) {
592 if (last_ra != fc->ra) {
593 if (vreses._length) {
594 ret = fres_resource_allocator_change_vreses(last_ra->ra,
596 if (forb_exception_occurred(&ev)) {
597 ret = fres_forbex2err(&ev);
604 vreses._buffer[vreses._length] = sc;
606 fres_contract_destroy(fc->schedulable_contract);
607 fc->schedulable_contract = fres_contract_duplicate(sc);
613 ret = fres_resource_allocator_change_vreses(last_ra->ra,
615 if (forb_exception_occurred(&ev))
616 ret = fres_forbex2err(&ev);
619 forb_sequence_free_buf(&vreses, forb_no_destructor);
624 free_fcb_contracts(struct fcb_contract *fcb_contracts[], int num)
627 struct fcb_contract *fc;
628 for (i=0; i<num; i++) {
629 fc = fcb_contracts[i];
630 if (fc && !fc->user_contract) {
631 fc->requested_contract = NULL; /* Destroyed by FORB */
632 fcb_contract_destroy(fc);
639 commit_resource(struct resource *resource,
640 fres_contract_ptr_seq **schedulable_contracts)
643 fres_contract_id_seq commit_ids;
645 struct fcb_contract *fc;
646 CORBA_Environment ev;
648 if (!forb_sequence_alloc_buf(&commit_ids, resource->rl.length)) {
653 commit_ids._length = resource->rl.length;
655 ul_list_for_each(reservation_list, &resource->rl, fc) {
656 forb_sequence_elem(&commit_ids, i) = fc->id;
660 fres_resource_manager_commit_contracts(resource->mng, &commit_ids,
661 schedulable_contracts, &ev);
662 if (forb_exception_occurred(&ev)) {
663 ret = fres_forbex2err(&ev);
668 forb_sequence_free_buf(&commit_ids, forb_no_destructor);
673 int cancel_reservations(struct resource *resource)
676 fres_contract_id_seq commit_ids;
678 struct fcb_contract *fc;
679 CORBA_Environment ev;
681 if (!forb_sequence_alloc_buf(&commit_ids, resource->rl.length)) {
686 commit_ids._length = resource->rl.length;
688 ul_list_for_each(reservation_list, &resource->rl, fc) {
689 forb_sequence_elem(&commit_ids, i) = fc->id;
693 fres_resource_manager_cancel_reservations(resource->mng, &commit_ids, &ev);
694 if (forb_exception_occurred(&ev)) {
695 ret = fres_forbex2err(&ev);
700 forb_sequence_free_buf(&commit_ids, forb_no_destructor);
707 negotiate_contracts(fres_contract_broker obj,
708 const fres_contract_ptr_seq* contracts,
709 fres_contract_id_seq** ids_out,
710 CORBA_Environment *ev)
712 struct fcb *fcb = o2fcb(obj);
713 struct resource *resource = NULL;
716 fres_contract_ptr_seq *schedulable_contracts;
717 struct fcb_contract **fcb_contracts, *fc;
719 int num = contracts->_length;
721 /* Prepare output sequence for the case we return eariler with
723 forb_sequence_alloc(*ids_out, 0);
725 ev->major = FORB_EX_NO_MEMORY;
728 CORBA_sequence_set_release(*ids_out, CORBA_TRUE);
730 forb_get_req_source(obj, &app);
732 fcb_contracts = malloc(sizeof(fcb_contracts[0])*num);
733 if (!fcb_contracts) {
737 memset(fcb_contracts, 0, sizeof(fcb_contracts[0])*num);
739 ret = prepare_fcb_contracts(fcb, fcb_contracts,
740 contracts->_buffer, num);
742 goto err_free_fcb_contracts;
743 ret = check_and_setup_resource(fcb, fcb_contracts, &resource,
746 goto err_free_fcb_contracts;
748 prepare_reservation_list(resource, fcb_contracts, num);
750 /* Reserve contracts */
751 ret = rebalance_spare_capacity_and_reserve(resource);
753 if (ret == FRSH_ERR_CONTRACT_REJECTED) {
754 ul_logmsg("Contract(s) was/were rejected\n");
757 fres_strerror(ret, msg, sizeof(msg));
758 ul_logerr("Reservation error: %s\n", msg);
760 goto err_free_fcb_contracts;
763 /* Commit contracts */
764 ret = commit_resource(resource, &schedulable_contracts);
766 goto err_cancel_reservation;
768 /* Add new contracts to our fcb database for later
769 * reference. Canceled contracts are removed below. */
770 for (i=0; i<num; i++) {
771 fc = fcb_contracts[i];
773 if (fc->user_contract) {
774 if (fres_contract_get_num_blocks(fc->requested_contract) > 0) {
776 fres_contract_destroy(fc->user_contract);
777 fc->user_contract = fres_contract_duplicate(fc->requested_contract);
778 /* Note: requested_contract is also
779 * pointed by contracts parameter and
780 * will be freed by FORB. */
781 fc->requested_contract = NULL;
784 /* Insert new contracts */
785 fcb_contract_insert(fcb, fcb_contracts[i]);
786 fc->user_contract = fres_contract_duplicate(fc->requested_contract);
787 fc->requested_contract = NULL;
788 /* See the note above. */
792 ret = change_vreses(fcb, schedulable_contracts);
794 goto err_cancel_reservation;
796 forb_sequence_free(schedulable_contracts, fres_contract_ptr_destroy);
798 ul_logerr("VRes was not created\n");
799 goto err_cancel_contracts;
803 /* Return IDs and delete canceled contracts from out database */
804 if (!forb_sequence_alloc_buf(*ids_out, num)) {
805 ev->major = FORB_EX_NO_MEMORY;
806 goto err_cancel_contracts;
808 (*ids_out)->_length = num;
809 for (i=0; i<num; i++) {
810 struct fcb_contract *fc = fcb_contracts[i];
811 forb_sequence_elem(*ids_out, i) = fc->id;
813 if (fc->requested_contract &&
814 fres_contract_get_num_blocks(fc->requested_contract) == 0) {
815 fcb_contract_delete(fcb, fc);
816 /* Note: requested_contract is also pointed by
817 * contracts parameter and will be freed by FORB. */
818 fc->requested_contract = NULL;
819 fcb_contract_destroy(fc);
824 err_cancel_contracts:
826 goto err_free_fcb_contracts;
827 err_cancel_reservation:
828 cancel_reservations(resource);
829 err_free_fcb_contracts:
830 free_fcb_contracts(fcb_contracts, num);
835 void redistribute_spare_capacity(fres_contract_broker obj,
836 const frsh_resource_type_t restype,
837 const frsh_resource_id_t resid,
838 CORBA_Environment *ev)
840 struct fcb *fcb = o2fcb(obj);
841 struct res_key key = {restype, resid };
842 struct resource *resource;
844 resource = fcb_resource_find(fcb, &key);
846 prepare_reservation_list(resource, NULL, 0);
848 /* forb_sequence_alloc(ids, rl.length); */
849 /* if (!ids || !ids->_buffer) { */
850 /* ev->major = FORB_EX_NO_MEMORY; */
851 /* goto err_free_fcb_contracts; */
853 /* CORBA_sequence_set_release(ids, CORBA_TRUE); */
854 /* *ids_out = ids; /\* ids is freed by FORB *\/ */
857 rebalance_spare_capacity_and_reserve(resource);
861 CORBA_long register_resource(fres_contract_broker obj,
862 const frsh_resource_type_t restype,
863 const frsh_resource_id_t resid,
864 const fres_resource_desc *desc,
865 CORBA_Environment *ev)
867 struct fcb *fcb = o2fcb(obj);
868 struct resource *res, *res2;
870 res = malloc(sizeof(*res));
872 memset(res, 0, sizeof(*res));
873 res->key.type = restype;
875 res2 = fcb_resource_find(fcb, &res->key);
877 if (forb_object_is_stale(res2->mng)) {
878 ul_logmsg("Removing stale manager for resource %d.%d\n",
880 forb_object_release(res2->mng);
881 fcb_resource_delete(fcb, res2);
882 /* TODO: Delete also all allocators associated
883 * with this stale resource manager. */
887 ul_logerr("Resource manager %d.%d already registered\n",
892 res->mng = forb_object_duplicate(desc->manager);
893 res->name = desc->name;
895 fcb_alloc_init_root_field(res);
896 sc_contracts_init_head(res);
897 ul_logmsg("Registering manager for resource \"%s\" (%d.%d)\n",
898 res->name, restype, resid);
899 fcb_resource_insert(fcb, res);
908 CORBA_long register_allocator(fres_contract_broker obj,
909 const frsh_resource_type_t restype,
910 const frsh_resource_id_t resid,
911 const fres_resource_allocator ra_obj,
912 CORBA_Environment *ev)
914 struct fcb *fcb = o2fcb(obj);
915 struct resource *res;
916 struct res_alloc *ra;
917 struct res_key resource;
918 forb_server_id server_id;
919 char server_id_str[40];
922 forb_get_server_id(ra_obj, &server_id);
923 forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
924 ul_logmsg("Registering allocator for resource %d.%d in app %s\n",
925 restype, resid, server_id_str);
927 resource.type = restype;
929 res = fcb_resource_find(fcb, &resource);
931 ul_logerr("No manager found for %d.%d. Unable to register the allocator!\n",
933 ret = FRES_ERR_NO_RESOURCE_MANAGER;
936 ra = fcb_alloc_find(res, &server_id);
938 char *str = forb_object_to_string(ra_obj);
939 ul_logerr("Allocator from already registered (%s)\n",
942 ret = FRES_ERR_ALLOCATOR_ALREADY_REGISTERED;
945 ra = malloc(sizeof(*ra));
951 ra->ra = forb_object_duplicate(ra_obj);
952 fcb_alloc_insert(res, ra);
958 void get_resources(fres_contract_broker obj, fres_resource_seq** resources, CORBA_Environment *ev)
960 struct fcb *fcb = o2fcb(obj);
961 fres_resource_seq *seq;
962 struct resource *res;
965 seq = malloc(sizeof(*seq));
967 ev->major = FORB_EX_NO_MEMORY;
970 memset(seq, 0, sizeof(*seq));
973 gavl_cust_for_each(fcb_resource, fcb, res) {
977 seq->_buffer = CORBA_sequence_fres_resource_allocbuf(n);
982 gavl_cust_for_each(fcb_resource, fcb, res) {
983 seq->_buffer[n].restype = res->key.type;
984 seq->_buffer[n].resid = res->key.id;
985 seq->_buffer[n].desc.manager = res->mng;
986 seq->_buffer[n].desc.name = res->name;
994 transaction_has_spare_capacity(const fres_transaction_t* transaction)
996 struct fres_contract **c;
997 forb_sequence_foreach(&transaction->contracts, c)
998 if (fres_contract_get_spare_capacity(*c))
1004 transaction_get_resources(struct fcb *fcb, struct fcb_contract *fc[],
1005 int num, struct resource **resources[])
1008 struct resource **res;
1009 struct resource *resource;
1013 res = malloc(alloc*sizeof(*res));
1017 for (r = 0, i = 0; i < num; i++) {
1018 get_fc_res_key(fc[i], &key);
1019 resource = fcb_resource_find(fcb, &key);
1020 /* TODO: Check whether to use GSA for resources. */
1029 negotiate_transaction(fres_contract_broker _obj,
1030 const fres_transaction_t* transaction,
1031 CORBA_Environment *ev)
1033 struct fcb *fcb = o2fcb(_obj);
1034 struct fcb_contract **fcb_contracts;
1035 const fres_contract_ptr_seq* contracts = &transaction->contracts;
1036 int num = contracts->_length;
1039 if (transaction_has_spare_capacity(transaction)) {
1040 ret = FRES_ERR_SPARE_CAPACITY_NOT_SUPPORTED;
1044 fcb_contracts = malloc(sizeof(*fcb_contracts)*num);
1045 if (!fcb_contracts) {
1049 memset(fcb_contracts, 0, sizeof(*fcb_contracts)*num);
1051 ret = prepare_fcb_contracts(fcb, fcb_contracts,
1052 contracts->_buffer, num);
1054 goto err_free_fcb_contracts;
1056 //ret = transaction_get_resources(fcb_contracts, num, &resources);
1058 return FRSH_ERR_NOT_IMPLEMENTED;
1059 err_free_fcb_contracts:
1060 free_fcb_contracts(fcb_contracts, num);
1066 wait_transaction(fres_contract_broker _obj,
1067 const CORBA_char * name,
1068 fres_transaction_t** transaction,
1069 CORBA_Environment *ev)
1071 return FRSH_ERR_NOT_IMPLEMENTED;
1075 allocate_transaction_vres(fres_contract_broker _obj,
1076 const CORBA_long id,
1077 CORBA_Environment *ev)
1079 return FRSH_ERR_NOT_IMPLEMENTED;
1083 #if CONFIG_FCB_INET && !CONFIG_FORB_PROTO_INET_DEFAULT
1084 static int register_inet_port(forb_orb orb)
1086 forb_port_t *port = forb_malloc(sizeof(*port));
1088 struct in_addr listen_on;
1092 memset(port, 0, sizeof(*port));
1093 listen_on.s_addr = INADDR_ANY;
1094 ret = forb_inet_port_init(&port->desc, listen_on, 0);
1097 ret = forb_register_port(orb, port);
1102 struct forb_fres_contract_broker_impl impl = {
1103 .negotiate_contracts = negotiate_contracts,
1104 .register_resource = register_resource,
1105 .register_allocator = register_allocator,
1106 .redistribute_spare_capacity = redistribute_spare_capacity,
1107 .get_resources = get_resources,
1108 .negotiate_transaction = negotiate_transaction,
1109 .wait_transaction = wait_transaction,
1110 .allocate_transaction_vres = allocate_transaction_vres,
1113 void peer_discovery_callback(const forb_orb peer_orb, const char *orb_id)
1115 forb_server_id server_id;
1116 char server_id_str[sizeof(forb_server_id)*2+1];
1118 forb_get_server_id(peer_orb, &server_id);
1119 forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
1121 ul_logmsg("peer discovered: %s (orb_id '%s')\n", server_id_str, orb_id);
1127 if (strcmp(orb_id, "org.frescor.fcb") == 0) {
1128 fosa_abs_time_t now;
1129 fosa_rel_time_t delay;
1130 fosa_clock_get_time(CLOCK_REALTIME, &now);
1131 delay = fosa_abs_time_extract_interval(start_time, now);
1132 ul_logmsg("node joined: %s (time %ld ms)\n", server_id_str,
1133 fosa_rel_time_to_msec(delay));
1137 void peer_dead_callback(const forb_orb peer_orb, const char *orb_id)
1141 static struct option long_opts[] = {
1142 { "daemon", optional_argument, NULL, 'd' },
1143 { "loglevel", required_argument, NULL, 'l' },
1150 printf("usage: fcb [ options ]\n");
1151 printf(" -d, --daemon [pid-file] go to background after FORB initialization\n");
1152 printf(" -l, --loglevel <number>|<domain>=<number>,...\n");
1155 int print_log_domain(ul_log_domain_t *domain, void *context)
1157 printf("%s = %d\n", domain->name, domain->level);
1161 int main(int argc, char *argv[])
1164 struct fcb fcb_data;
1165 fres_contract_broker fcb;
1166 forb_executor_t executor;
1168 forb_init_attr_t attr = {
1169 .orb_id = "org.frescor.fcb",
1170 .peer_discovery_callback = peer_discovery_callback,
1171 .peer_dead_callback = peer_dead_callback,
1172 .fixed_tcp_port = FCB_TCP_PORT,
1173 #ifdef CONFIG_FORB_PROTO_INET_DEFAULT
1174 .fixed_server_id = FCB_SERVER_ID,
1175 .redistribute_hellos = true,
1180 while ((opt = getopt_long(argc, argv, "d::l:", &long_opts[0], NULL)) != EOF) {
1183 if (*optarg == '?') {
1184 ul_logreg_for_each_domain(print_log_domain, NULL);
1189 ret = ul_log_domain_arg2levels(optarg);
1191 error(1, EINVAL, "Error parsing -l argument at char %d\n", ret);
1196 opt_pidfile = optarg;
1201 exit(opt == 'h' ? 0 : 1);
1205 fosa_clock_get_time(CLOCK_REALTIME, &start_time);
1208 forb_daemon_prepare(opt_pidfile);
1210 orb = forb_init(&argc, &argv, &attr);
1211 if (!orb) error(1, errno, "FORB initialization failed");
1213 #if CONFIG_FCB_INET && !CONFIG_FORB_PROTO_INET_DEFAULT
1214 ret = register_inet_port(orb);
1215 if (ret) error(0, errno, "INET port registration failed");
1218 fcb_resource_init_root_field(&fcb_data);
1219 fcb_contract_init_root_field(&fcb_data);
1221 fcb = forb_fres_contract_broker_new(orb, &impl, &fcb_data);
1222 if (!fcb) error(1, errno, "forb_fres_contract_broker_new failed");
1224 /* Prepare executor before we register the fcb reference,
1225 * so that no reuqests are lost */
1226 ret = forb_executor_init(&executor);
1227 if (ret) error(1, errno, "forb_executor_init failed");
1229 ret = forb_executor_register_object(&executor, fcb);
1230 if (ret) error(1, errno, "forb_executor_register_object failed");
1232 ret = forb_register_reference(fcb, fres_contract_broker_reg_name);
1233 if (ret) error(1, errno, "forb_register_reference() failed");
1235 ul_logmsg("Waiting for requests\n");
1237 forb_daemon_ready();
1239 ret = forb_executor_run(&executor);
1240 if (ret) error(1, errno, "forb_executor_run failed");