1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FRSH (FRescor ScHeduler) */
27 /* FRSH is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FRSH is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FRSH; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FRSH header files in a file, */
39 /* instantiating FRSH generics or templates, or linking other files */
40 /* with FRSH objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
49 * @author Michal Sojka <sojkam1@fel.cvut.cz>
50 * @date Mon Oct 20 18:00:18 2008
52 * @brief FRESCOR Contract Broker
61 #include <ul_gavlcust.h>
64 #include <forb/server_id.h>
65 #include <fres_contract.h>
66 #include "fcb_config.h"
67 #include <fosa_clocks_and_timers.h>
68 #ifdef CONFIG_FCB_INET
69 #include <forb/proto_inet.h>
/*
 * Log domain object for this file, initialized with UL_LOGL_MSG and the
 * name "fcb".
 * NOTE(review): UL_LOG_CUST presumably makes the ul_logmsg()/ul_logerr()
 * calls in this file use ulogd_fcb -- confirm against the ul_log headers.
 */
72 UL_LOG_CUST(ulogd_fcb);
73 ul_log_domain_t ulogd_fcb = {UL_LOGL_MSG, "fcb"};
/* Time stamp taken in main() before FORB is initialized;
 * peer_discovery_callback() reports node-join delays relative to it. */
75 fosa_abs_time_t start_time;
79 * Resource identification
/* Resource type; res_key_cmp() compares this member first. */
82 frsh_resource_type_t type;
/* Resource id; res_key_cmp() compares it only when the types are equal. */
83 frsh_resource_id_t id;
/* Per-resource data: the manager reference plus the roots of the two
 * containers declared further down (fcb_alloc_* tree and sc_contracts_*
 * list). */
92 fres_resource_manager mng; /**< Object reference of the resource manager */
93 gavl_cust_root_field_t allocators; /**< Registered allocators for this resource (from multiple applications/nodes) */
94 ul_list_head_t sc_contracts; /**< Negotiated contracts with spare capacity for this resource */
98 * Resource allocator registered in different nodes/applications
/* Object reference of the allocator: set from forb_object_duplicate() in
 * register_allocator() and passed to fres_resource_allocator_change_vreses()
 * in negotiate_contracts(). */
103 fres_resource_allocator ra;
/*
 * Broker-side record of one negotiated contract.
 * NOTE(review): at least one member (next_sc_variant, read by
 * prepare_next_reservation_variant()) and the closing brace are on lines
 * not visible in this excerpt.
 */
106 struct fcb_contract {
107 fres_contract_id_t id; /* Key in the fcb_contract_* tree rooted in struct fcb */
108 gavl_node_t node_fcb; /* Tree node used by the fcb_contract_* container */
109 ul_list_node_t node_sc; /* List node used by the sc_contracts_* container */
111 int reserved_sc_variant; /* Variant index chosen while the stability period has not ended (see prepare_next_reservation_variant()) */
112 fosa_abs_time_t end_of_stability_period; /* Compared against the current time when selecting the variant */
113 struct fres_contract *user_contract; /* Duplicate of the contract submitted by the application */
114 struct fres_contract *reserved_contract;
115 struct fres_contract *to_be_reserved_contract; /* Rebuilt from user_contract by prepare_next_reservation_variant() */
119 * Contract broker data
/* Roots of the two trees owned by the broker; both are initialized in
 * main() and accessed through the fcb_resource_* and fcb_contract_*
 * container functions. */
122 gavl_cust_root_field_t resources; /**< Registered resources */
123 gavl_cust_root_field_t contracts; /**< Contracts negotiated by this FCB */
/*
 * Allocate a struct fcb_contract with malloc(), clear it and store a copy
 * of *id in its id member.
 *
 * @param id  Contract id copied into the new record.
 *
 * NOTE(review): the allocation-failure check and the return statement are on
 * lines not visible here; negotiate_contracts() treats a NULL result as an
 * error.
 */
126 struct fcb_contract *fcb_contract_new(fres_contract_id_t *id)
128 struct fcb_contract *fcb_contract;
130 fcb_contract = malloc(sizeof(*fcb_contract));
/* Zero everything so that all contract pointers start out as NULL. */
134 memset(fcb_contract, 0, sizeof(*fcb_contract));
135 fcb_contract->id = *id;
/*
 * Release an fcb_contract record; the visible part destroys the duplicated
 * user contract when one is attached.
 *
 * NOTE(review): fcb_contract is dereferenced without a NULL check in the
 * visible code, while the error path of negotiate_contracts() calls this
 * function for every slot of an array whose entries may still be NULL --
 * verify against the full file.
 */
140 void fcb_contract_destroy(struct fcb_contract *fcb_contract)
142 if (fcb_contract->user_contract) {
143 fres_contract_destroy(fcb_contract->user_contract);
/*
 * Ordering function for struct res_key, used by the fcb_resource_* tree:
 * keys are compared by type first and by id when the types are equal.
 *
 * NOTE(review): the returned values are on lines not visible here;
 * presumably negative / positive / zero in the usual comparator sense --
 * confirm.
 */
148 static inline int res_key_cmp(const struct res_key *a,
149 const struct res_key *b)
151 if (a->type < b->type) {
153 } else if (a->type > b->type) {
155 } else if (a->id < b->id) {
157 } else if (a->id > b->id) {
164 /* Container for registered resources */
165 GAVL_CUST_NODE_INT_DEC(fcb_resource /* cust_prefix */, \
166 struct fcb /* cust_root_t */, \
167 struct resource/* cust_item_t */, \
168 struct res_key /* cust_key_t */, \
169 resources /* cust_root_node */, \
170 node /* cust_item_node */, \
171 key /* cust_item_key */, \
172 res_key_cmp /* cust_cmp_fnc */);
174 GAVL_CUST_NODE_INT_IMP(fcb_resource /* cust_prefix */, \
175 struct fcb /* cust_root_t */, \
176 struct resource/* cust_item_t */, \
177 struct res_key /* cust_key_t */, \
178 resources /* cust_root_node */, \
179 node /* cust_item_node */, \
180 key /* cust_item_key */, \
181 res_key_cmp /* cust_cmp_fnc */);
183 /* Container for allocators registered for a given resource */
184 GAVL_CUST_NODE_INT_DEC(fcb_alloc /* cust_prefix */, \
185 struct resource /* cust_root_t */, \
186 struct res_alloc /* cust_item_t */, \
187 forb_server_id /* cust_key_t */, \
188 allocators /* cust_root_node */, \
189 node /* cust_item_node */, \
190 app /* cust_item_key */, \
191 fres_contract_id_cmp /* cust_cmp_fnc */);
193 GAVL_CUST_NODE_INT_IMP(fcb_alloc /* cust_prefix */, \
194 struct resource /* cust_root_t */, \
195 struct res_alloc /* cust_item_t */, \
196 forb_server_id /* cust_key_t */, \
197 allocators /* cust_root_node */, \
198 node /* cust_item_node */, \
199 app /* cust_item_key */, \
200 forb_server_id_cmp /* cust_cmp_fnc */);
202 /* Container for contracts with spare capacity negotiated for a given resource */
203 UL_LIST_CUST_DEC(sc_contracts /* cust_prefix */, \
204 struct resource /* cust_head_t */, \
205 struct fcb_contract /* cust_item_t */, \
206 sc_contracts /* cust_head_field */, \
207 node_sc /* cust_node_field */);
209 /* Container for negotiated contracts */
210 GAVL_CUST_NODE_INT_DEC(fcb_contract /* cust_prefix */, \
211 struct fcb /* cust_root_t */, \
212 struct fcb_contract /* cust_item_t */, \
213 fres_contract_id_t /* cust_key_t */, \
214 contracts /* cust_root_node */, \
215 node_fcb /* cust_item_node */, \
216 id /* cust_item_key */, \
217 fres_contract_id_cmp /* cust_cmp_fnc */);
220 GAVL_CUST_NODE_INT_IMP(fcb_contract /* cust_prefix */, \
221 struct fcb /* cust_root_t */, \
222 struct fcb_contract /* cust_item_t */, \
223 fres_contract_id_t /* cust_key_t */, \
224 contracts /* cust_root_node */, \
225 node_fcb /* cust_item_node */, \
226 id /* cust_item_key */, \
227 fres_contract_id_cmp /* cust_cmp_fnc */);
229 #include "fcb_contract_gavl.inc"
/*
 * Convert a FORB object reference into the struct fcb pointer returned by
 * forb_instance_data() for that object.
 *
 * The whole expansion is parenthesized so that the cast is applied before
 * any operator that follows the macro call, e.g. o2fcb(obj)->resources.
 * Without the outer parentheses the `->` would bind to the result of
 * forb_instance_data() instead of to the cast pointer.
 */
#define o2fcb(o) ((struct fcb *)forb_instance_data(o))
/*
 * Fill *key with the resource type and resource id a contract refers to.
 *
 * The resource block is taken from the contract itself. When the contract
 * carries no resource block but has a non-empty id, the block is taken from
 * the user_contract of the record stored under that id in fcb->contracts.
 * When no block can be obtained, "No resource specified" is logged.
 *
 * @param fcb       Broker data holding the database of negotiated contracts.
 * @param contract  Contract whose resource is being identified.
 * @param key       Output: receives resource_type and resource_id.
 *
 * NOTE(review): the return type, the return statements and the checks around
 * the database lookup are on lines not visible here. check_single_resource()
 * treats a zero result as failure.
 */
236 get_res_key(const struct fcb *fcb, const struct fres_contract *contract, struct res_key *key)
238 fres_block_resource *block_res;
240 block_res = fres_contract_get_resource(contract);
241 if (!block_res && !fres_contract_id_is_empty(&contract->id)) {
242 /* If the contract doesn't have resource information,
243 * this might be cancelation or renegotiation request,
244 * so look at our database for formerly submited
246 struct fcb_contract *fcb_contract;
247 fcb_contract = fcb_contract_find(fcb, &contract->id);
249 block_res = fres_contract_get_resource(fcb_contract->user_contract);
253 ul_logerr("No resource specified\n");
256 key->type = block_res->resource_type;
257 key->id = block_res->resource_id;
263 * Checks whether all contracts refer to a single resource.
266 * @param contracts Array of contract pointers.
267 * @param num Number of contracts.
269 * @return If all contracts refer to a single resource, pointer to the
270 * corresponding resource structure is returned. Otherwise, NULL is
/*
 * Resolve the resource key of every contract with get_res_key(), require
 * all keys to be equal and look the common key up in the tree of registered
 * resources.
 * NOTE(review): the return type and the failure returns are on lines not
 * visible here.
 */
274 check_single_resource(struct fcb *fcb, struct fres_contract *contracts[], int num)
276 struct resource *resource = NULL;
278 struct res_key key, key2 = {-1,-1};
280 for (i=0; i<num; i++) {
281 if (!get_res_key(fcb, contracts[i], &key)) {
/* Remember the key of the first contract; every later key is compared
 * against it. */
284 if (i==0) key2 = key;
285 else if (key.type != key2.type ||
/* Find the registered resource (and thereby its manager) for that key. */
291 resource = fcb_resource_find(fcb, &key);
293 ul_logerr("No resource manager for %d.%d registered\n",
306 * @return Zero on success, non-zero error code on error.
/*
 * Prepare the submitted contracts for reservation.
 *
 * Visible handling per contract:
 *  - an empty id marks a new negotiation and a fresh id is generated with
 *    forb_uuid_generate();
 *  - a contract with zero blocks is a deletion request and needs nothing;
 *  - otherwise the stored record is looked up by id and its user_contract
 *    is merged into the request with fres_contract_merge();
 *  - an unknown id is logged and FRES_ERR_NOTHING_TO_RENEGOTIATE is
 *    returned.
 *
 * NOTE(review): the else/continue lines that connect these branches are not
 * visible in this excerpt.
 */
309 prepare_reservation_contracts(struct fcb *fcb, struct fres_contract *contracts[], int num)
312 struct fcb_contract *fcb_contract;
314 for (i=0; i<num; i++) {
315 struct fres_contract *c = contracts[i];
317 if (fres_contract_id_is_empty(&c->id)) {
318 /* Normal negotiation request */
319 forb_uuid_generate((forb_uuid_t *)&c->id);
322 if (fres_contract_get_num_blocks(c) == 0) {
323 /* Nothing to do for deletion request */
328 fcb_contract = fcb_contract_find(fcb, &c->id);
330 /* Copy missing blocks from fcb_contract to contract */
331 fres_contract_merge(c, fcb_contract->user_contract);
334 fres_contract_id_to_string(str, &c->id, sizeof(str));
335 ul_logerr("Attempt to renegotiate unknown contract %s\n", str);
336 return FRES_ERR_NOTHING_TO_RENEGOTIATE;
/*
 * Rebuild fc->to_be_reserved_contract for the next reservation attempt.
 *
 * The previous to_be_reserved_contract is destroyed and replaced by a
 * duplicate of fc->user_contract. The spare-capacity block of the duplicate
 * is then used to pick one variant, whose container is copied into the
 * duplicate with fres_container_copy().
 *
 * @param fc   Contract record to update.
 * @param now  Current time used for the stability-period test.
 *
 * NOTE(review): the failure return after the duplicate, any check of `s`
 * and the final return are on lines not visible here.
 */
343 static int prepare_next_reservation_variant(struct fcb_contract *fc, fosa_abs_time_t now)
345 fres_block_spare_capacity *s;
346 fres_contract_destroy(fc->to_be_reserved_contract);
347 fc->to_be_reserved_contract = fres_contract_duplicate(fc->user_contract);
348 if (!fc->to_be_reserved_contract)
350 s = fres_contract_get_spare_capacity(fc->to_be_reserved_contract);
/* Select next_sc_variant when fosa_abs_time_smaller(end_of_stability_period,
 * now) holds (presumably: the stability period is over -- confirm the
 * argument order of fosa_abs_time_smaller); otherwise keep the variant
 * that is currently reserved. */
353 variant = fosa_abs_time_smaller(fc->end_of_stability_period, now) ?
354 fc->next_sc_variant :
355 fc->reserved_sc_variant;
357 fres_container_copy(fc->to_be_reserved_contract->container,
358 forb_sequence_elem(s->variants, variant));
/*
 * Prepare the next reservation variant of every given contract record for
 * the current CLOCK_REALTIME time and ask the resource manager to reserve
 * the contracts.
 *
 * NOTE(review): the visible body uses `contracts` and `ev`, which are not
 * among the four visible parameters, and negotiate_contracts() calls this
 * function with five arguments -- the signature and the call site look out
 * of sync; verify against the full file. The result of
 * prepare_next_reservation_variant() is not checked in the visible code.
 */
363 rebalance_spare_capacity_and_reserve(struct fcb *fcb, struct resource *resource,
364 struct fcb_contract *fcb_contract[], int num)
368 fosa_clock_get_time(CLOCK_REALTIME, &now);
369 for (i=0; i<num; i++) {
370 prepare_next_reservation_variant(fcb_contract[i], now);
374 /* Reserve contract */
375 ret = fres_resource_manager_reserve_contracts(resource->mng, contracts, ev);
376 if (forb_exception_occurred(ev) || ret < 0) {
/*
 * Broker operation that negotiates, renegotiates or cancels a set of
 * contracts referring to a single resource.
 *
 * Visible sequence:
 *  1. check_single_resource() finds the common resource; failure yields
 *     FRSH_ERR_RESOURCE_ID_INVALID.
 *  2. prepare_reservation_contracts() assigns ids and merges stored blocks.
 *  3. The allocator registered for the requesting application is looked up;
 *     failure yields FRES_ERR_NO_RESOURCE_ALLOCATOR.
 *  4. The id sequence and the array of new fcb_contract records are
 *     allocated before the resource manager is contacted.
 *  5. The resource manager reserves and then commits the contracts; the
 *     allocator is told about the resulting schedulable contracts.
 *  6. The contract database is updated: the stored record for each id is
 *     removed and the new record (if any) is inserted.
 *
 * Error labels at the end cancel the reservation and destroy the newly
 * created records.
 *
 * @param obj        Broker object the request was sent to.
 * @param contracts  Sequence of contracts submitted by the application.
 * @param ids_out    Output sequence of contract ids (assignment not visible
 *                   in this excerpt).
 * @param ev         CORBA environment used to report FORB exceptions.
 *
 * NOTE(review): many control-flow lines (checks, gotos, returns, frees) are
 * not visible in this excerpt.
 */
385 negotiate_contracts(fres_contract_broker obj,
386 const fres_contract_ptr_seq* contracts,
387 fres_contract_id_seq** ids_out,
388 CORBA_Environment *ev)
390 struct fcb *fcb = o2fcb(obj);
391 struct resource *resource;
392 struct res_alloc *ra;
395 fres_contract_ptr_seq *schedulable_contracts;
396 struct fcb_contract **fcb_contracts;
398 fres_contract_id_seq* ids;
400 resource = check_single_resource(fcb, contracts->_buffer, contracts->_length);
402 ret = FRSH_ERR_RESOURCE_ID_INVALID;
406 ret = prepare_reservation_contracts(fcb, contracts->_buffer, contracts->_length);
/* The allocator is selected by the server id of the request's source. */
410 forb_get_req_source(obj, &app);
411 ra = fcb_alloc_find(resource, &app);
414 forb_server_id_to_string(str, &app, sizeof(str));
415 ul_logerr("No resource allocator found for %d.%d and %s\n",
416 resource->key.type, resource->key.id, str);
417 ret = FRES_ERR_NO_RESOURCE_ALLOCATOR;
421 /* Allocate all the needed memory before doing reservation. If
422 * there is not enough memory, it makes no sense to call resource
424 /* TODO: Use FORB macros to allocate sequence */
425 ids = malloc(sizeof(*ids));
427 ev->major = FORB_EX_NO_MEMORY;
430 memset(ids, 0, sizeof(*ids));
431 CORBA_sequence_set_release(ids, CORBA_TRUE);
433 ids->_buffer = malloc(contracts->_length*sizeof(ids->_buffer[0]));
438 ids->_length = ids->_maximum = contracts->_length;
/* The ids were finalized by prepare_reservation_contracts() above. */
439 for (i=0; i<contracts->_length; i++) {
440 ids->_buffer[i] = contracts->_buffer[i]->id;
443 fcb_contracts = malloc(sizeof(fcb_contracts[0])*contracts->_length);
444 if (!fcb_contracts) {
/* Slots stay NULL for contracts without blocks (deletion requests). */
448 memset(fcb_contracts, 0, sizeof(fcb_contracts[0])*contracts->_length);
450 for (i=0; i<contracts->_length; i++) {
451 /* Allocate FCB contracts */
452 struct fres_contract *c = contracts->_buffer[i];
453 if (fres_contract_get_num_blocks(c) > 0) {
454 fcb_contracts[i] = fcb_contract_new(&c->id);
455 if (!fcb_contracts[i]) {
456 ret = errno ? errno : -1;
457 goto err_free_fcb_contracts;
459 fcb_contracts[i]->user_contract = fres_contract_duplicate(c);
460 if (!fcb_contracts[i]->user_contract) {
461 ret = errno ? errno : -1;
462 goto err_free_fcb_contracts;
467 /* TODO: Optimize the following by introducing
468 * reserve_and_commit FRM method. */
470 /* Reserve contract */
471 ret = fres_resource_manager_reserve_contracts(resource->mng, contracts, ev);
472 if (forb_exception_occurred(ev) || ret < 0) {
473 goto err_free_fcb_contracts;
/* NOTE(review): five arguments are passed here, but the visible definition
 * of rebalance_spare_capacity_and_reserve() takes four -- verify. */
476 ret = rebalance_spare_capacity_and_reserve(fcb, resource, contracts->_buffer,
477 fcb_contracts, contracts->_length);
480 ul_logmsg("Contract(s) was not accepted\n");
481 goto err_free_fcb_contracts;
484 /* Commit contract */
485 fres_resource_manager_commit_contracts(resource->mng, ids,
486 &schedulable_contracts, ev);
487 if (forb_exception_occurred(ev)) {
488 ret = FRES_ERR_FORB_EXCEPTION;
489 goto err_cancel_reservation;
/* Pass the schedulable contracts to the application's allocator, then free
 * the sequence returned by the resource manager. */
493 ret = fres_resource_allocator_change_vreses(ra->ra, schedulable_contracts, ev);
494 if (CORBA_sequence_get_release(schedulable_contracts)) {
496 for (i=0; i<schedulable_contracts->_length; i++) {
497 fres_contract_destroy(schedulable_contracts->_buffer[i]);
499 forb_free(schedulable_contracts->_buffer);
501 forb_free(schedulable_contracts);
502 if (forb_exception_occurred(ev)) {
503 ret = FRES_ERR_FORB_EXCEPTION;
504 goto err_cancel_reservation;
507 ul_logmsg("VRes was not created\n");
508 goto err_cancel_reservation;
511 /* Update database of negotiated contracts stored for later reference */
512 for (i=0; i<contracts->_length; i++) {
513 struct fcb_contract *fcb_contract;
514 fcb_contract = fcb_contract_find(fcb, &contracts->_buffer[i]->id);
515 /* Delete canceled or renegotiated user contract */
517 fcb_contract_delete(fcb, fcb_contract);
518 fcb_contract_destroy(fcb_contract);
520 if (fcb_contracts[i]) {
521 /* Insert new contracts */
522 fcb_contract_insert(fcb, fcb_contracts[i]);
528 err_cancel_reservation:
529 fres_resource_manager_cancel_reservations(resource->mng, ids, ev);
530 err_free_fcb_contracts:
/* NOTE(review): slots of fcb_contracts may be NULL here (deletion requests,
 * or an allocation failure part-way through), and the visible part of
 * fcb_contract_destroy() dereferences its argument -- verify. */
531 for (i=0; i<contracts->_length; i++)
532 fcb_contract_destroy(fcb_contracts[i]);
/*
 * Broker operation that registers the manager of a resource.
 *
 * A new struct resource is allocated and keyed by the given resource type
 * (the id assignment is on a line not visible here). If a resource with the
 * same key is already registered and its manager reference is stale, the old
 * entry is released and removed from the tree; otherwise the duplicate
 * registration is logged as an error. The new entry stores a duplicate of
 * desc->manager, gets its allocator tree and spare-capacity list initialized
 * and is inserted into the tree of resources.
 *
 * @param obj      Broker object the request was sent to.
 * @param restype  Type of the resource being registered.
 * @param resid    Id of the resource being registered.
 * @param desc     Resource description; desc->manager is the manager
 *                 reference.
 * @param ev       CORBA environment.
 *
 * NOTE(review): return values, the allocation check and the clean-up of the
 * rejected entry are on lines not visible in this excerpt.
 */
538 CORBA_long register_resource(fres_contract_broker obj,
539 const frsh_resource_type_t restype,
540 const frsh_resource_id_t resid,
541 const fres_resource_desc *desc,
542 CORBA_Environment *ev)
544 struct fcb *fcb = o2fcb(obj);
545 struct resource *res, *res2;
547 res = malloc(sizeof(*res));
549 memset(res, 0, sizeof(*res));
550 res->key.type = restype;
/* Check for an existing registration under the same key. */
552 res2 = fcb_resource_find(fcb, &res->key);
554 if (forb_object_is_stale(res2->mng)) {
555 ul_logmsg("Removing stale manager for resource %d.%d\n",
557 forb_object_release(res2->mng);
558 fcb_resource_delete(fcb, res2);
559 /* TODO: Delete also all allocators associated
560 * with this stale resource manager. */
564 ul_logerr("Resource manager %d.%d already registered\n",
569 res->mng = forb_object_duplicate(desc->manager);
571 fcb_alloc_init_root_field(res);
572 sc_contracts_init_head(res);
573 ul_logmsg("Registering manager for resource %d.%d\n",
575 fcb_resource_insert(fcb, res);
/*
 * Broker operation that registers a resource allocator of one application
 * for an already registered resource.
 *
 * The server id of the allocator object identifies the application. The
 * resource is looked up by type and id; a missing resource is logged as an
 * error. An allocator already registered for the same server id is also
 * logged as an error. Otherwise a new struct res_alloc holding a duplicate
 * of the allocator reference is inserted into the resource's allocator
 * tree.
 *
 * @param obj      Broker object the request was sent to.
 * @param restype  Type of the resource the allocator serves.
 * @param resid    Id of the resource the allocator serves.
 * @param ra_obj   Object reference of the allocator.
 * @param ev       CORBA environment.
 *
 * NOTE(review): return values, the allocation check and the assignment of
 * the key member of the new entry are on lines not visible in this excerpt.
 */
584 CORBA_long register_allocator(fres_contract_broker obj,
585 const frsh_resource_type_t restype,
586 const frsh_resource_id_t resid,
587 const fres_resource_allocator ra_obj,
588 CORBA_Environment *ev)
590 struct fcb *fcb = o2fcb(obj);
591 struct resource *res;
592 struct res_alloc *ra;
593 struct res_key resource;
594 forb_server_id server_id;
595 char server_id_str[40];
597 forb_get_server_id(ra_obj, &server_id);
598 forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
599 ul_logmsg("Registering allocator for resource %d.%d in app %s\n",
600 restype, resid, server_id_str);
602 resource.type = restype;
604 res = fcb_resource_find(fcb, &resource);
606 ul_logerr("No manager found for %d.%d. Unable to register the allocator!\n",
/* Only one allocator per application (server id) is kept per resource. */
610 ra = fcb_alloc_find(res, &server_id);
612 char *str = forb_object_to_string(ra_obj);
613 ul_logerr("Allocator from already registered (%s)\n",
618 ra = malloc(sizeof(*ra));
623 ra->ra = forb_object_duplicate(ra_obj);
624 fcb_alloc_insert(res, ra);
/*
 * Broker operation that reports all registered resources.
 *
 * A fres_resource_seq is allocated and cleared; the tree of resources is
 * walked once before the element buffer is allocated for `n` entries
 * (presumably to count them -- the loop body is not visible) and a second
 * time to store the type, id and manager reference of every resource.
 * On allocation failure ev->major is set to FORB_EX_NO_MEMORY.
 *
 * @param obj        Broker object the request was sent to.
 * @param resources  Output sequence (assignment not visible in this
 *                   excerpt).
 * @param ev         CORBA environment used to report the failure.
 *
 * NOTE(review): the manager reference is stored as-is, without
 * forb_object_duplicate(), in the visible code -- confirm the intended
 * ownership.
 */
630 void get_resources(fres_contract_broker obj, fres_resource_seq** resources, CORBA_Environment *ev)
632 struct fcb *fcb = o2fcb(obj);
633 fres_resource_seq *seq;
634 struct resource *res;
637 seq = malloc(sizeof(*seq));
639 ev->major = FORB_EX_NO_MEMORY;
642 memset(seq, 0, sizeof(*seq));
645 gavl_cust_for_each(fcb_resource, fcb, res) {
649 seq->_buffer = CORBA_sequence_fres_resource_allocbuf(n);
654 gavl_cust_for_each(fcb_resource, fcb, res) {
655 seq->_buffer[n].restype = res->key.type;
656 seq->_buffer[n].resid = res->key.id;
657 seq->_buffer[n].desc.manager = res->mng;
664 #ifdef CONFIG_FCB_INET
/*
 * Create an INET port listening on INADDR_ANY and register it with the
 * given ORB. Only compiled when CONFIG_FCB_INET is defined.
 *
 * @param orb  ORB the port is registered with.
 *
 * NOTE(review): the allocation check, the checks of `ret` and the return
 * statements are on lines not visible in this excerpt; main() treats a
 * non-zero result as failure.
 */
665 static int register_inet_port(forb_orb orb)
667 forb_port_t *port = forb_malloc(sizeof(*port));
669 struct in_addr listen_on;
673 memset(port, 0, sizeof(*port));
674 listen_on.s_addr = INADDR_ANY;
675 ret = forb_inet_port_init(&port->desc, listen_on);
678 ret = forb_register_port(orb, port);
/* Table binding the contract broker interface methods to the functions
 * implemented in this file; main() passes it to
 * forb_fres_contract_broker_new(). */
683 struct forb_fres_contract_broker_impl impl = {
684 .negotiate_contracts = negotiate_contracts,
685 .register_resource = register_resource,
686 .register_allocator = register_allocator,
687 .get_resources = get_resources,
/*
 * Callback installed through forb_init_attr_t.peer_discovery_callback in
 * main(). It logs the server id and ORB id of every discovered peer and,
 * when the peer's ORB id is "org.frescor.fcb", also logs the time elapsed
 * between start_time and now in milliseconds.
 *
 * @param peer_orb  Object reference of the discovered peer.
 * @param orb_id    ORB id string announced by the peer.
 */
690 void peer_discovery_callback(const forb_orb peer_orb, const char *orb_id)
692 forb_server_id server_id;
/* NOTE(review): buffer sized for two characters per byte of the id plus a
 * terminator -- presumably a hex string; register_allocator() uses a fixed
 * 40-byte buffer for the same conversion. */
693 char server_id_str[sizeof(forb_server_id)*2+1];
695 forb_get_server_id(peer_orb, &server_id);
696 forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
698 ul_logmsg("peer discovered: %s (orb_id '%s')\n", server_id_str, orb_id);
700 if (strcmp(orb_id, "org.frescor.fcb") == 0) {
702 fosa_rel_time_t delay;
703 fosa_clock_get_time(CLOCK_REALTIME, &now);
704 delay = fosa_abs_time_extract_interval(start_time, now);
705 ul_logmsg("node joined: %s (time %ld ms)\n", server_id_str,
706 fosa_rel_time_to_msec(delay));
/* Callback installed through forb_init_attr_t.peer_dead_callback in main().
 * NOTE(review): its body is not visible in this excerpt. */
710 void peer_dead_callback(const forb_orb peer_orb, const char *orb_id)
/*
 * Contract broker start-up.
 *
 * Records the start time, initializes FORB with the ORB id
 * "org.frescor.fcb" and the peer callbacks, optionally registers an INET
 * port, initializes the two trees of the broker data, creates the broker
 * object, attaches it to an executor, publishes its reference under
 * fres_contract_broker_reg_name and runs the executor.
 *
 * Every failure except the INET port registration is reported with
 * error(1, ...); the INET failure is reported with error(0, ...).
 */
714 int main(int argc, char *argv[])
718 fres_contract_broker fcb;
719 forb_executor_t executor;
721 forb_init_attr_t attr = {
722 .orb_id = "org.frescor.fcb",
723 .peer_discovery_callback = peer_discovery_callback,
724 .peer_dead_callback = peer_dead_callback,
/* Reference point for the join times logged by peer_discovery_callback(). */
728 fosa_clock_get_time(CLOCK_REALTIME, &start_time);
730 orb = forb_init(&argc, &argv, &attr);
731 if (!orb) error(1, errno, "FORB initialization failed");
733 #ifdef CONFIG_FCB_INET
734 ret = register_inet_port(orb);
735 if (ret) error(0, errno, "INET port registration failed");
738 fcb_resource_init_root_field(&fcb_data);
739 fcb_contract_init_root_field(&fcb_data);
/* fcb_data becomes the instance data that o2fcb() retrieves in the
 * method implementations. */
741 fcb = forb_fres_contract_broker_new(orb, &impl, &fcb_data);
742 if (!fcb) error(1, errno, "forb_fres_contract_broker_new failed");
744 /* Prepare executor before we register the fcb reference,
745 * so that no requests are lost */
746 ret = forb_executor_init(&executor);
747 if (ret) error(1, errno, "forb_executor_init failed");
749 ret = forb_executor_register_object(&executor, fcb);
750 if (ret) error(1, errno, "forb_executor_register_object failed");
752 ret = forb_register_reference(fcb, fres_contract_broker_reg_name);
753 if (ret) error(1, errno, "forb_register_reference() failed");
755 ul_logmsg("Waiting for requests\n");
756 ret = forb_executor_run(&executor);
757 if (ret) error(1, errno, "forb_executor_run failed");