1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FRSH (FRescor ScHeduler) */
27 /* FRSH is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FRSH is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FRSH; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FRSH header files in a file, */
39 /* instantiating FRSH generics or templates, or linking other files */
40 /* with FRSH objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
49 * @author Michal Sojka <sojkam1@fel.cvut.cz>
50 * @date Mon Oct 20 18:00:18 2008
52 * @brief FRESCOR Contract Broker
61 #include <ul_gavlcust.h>
64 #include <forb/server_id.h>
65 #include <fres_contract.h>
66 #include "fcb_config.h"
68 #ifdef CONFIG_FCB_INET
69 #include <forb/proto_inet.h>
72 UL_LOG_CUST(ulogd_fcb);
73 ul_log_domain_t ulogd_fcb = {UL_LOGL_MSG, "fcb"};
77 * Resource identification
80 frsh_resource_type_t type;
81 frsh_resource_id_t id;
90 fres_resource_manager mng; /**< Object reference of the resource manager */
91 gavl_cust_root_field_t allocators; /**< Registered allocators for this resource (from multiple applications/nodes) */
95 * Resource allocator registered in different nodes/applications
100 fres_resource_allocator ra;
103 struct fcb_contract {
104 fres_contract_id_t id;
106 struct fres_contract *user_contract;
110 * Contract broker data
113 gavl_cust_root_field_t resources; /**< Registered resources */
114 gavl_cust_root_field_t contracts; /**< Contracts negotiated by this FCB */
117 struct fcb_contract *fcb_contract_new(fres_contract_id_t *id)
119 struct fcb_contract *fcb_contract;
121 fcb_contract = malloc(sizeof(*fcb_contract));
125 memset(fcb_contract, 0, sizeof(*fcb_contract));
126 fcb_contract->id = *id;
131 void fcb_contract_destroy(struct fcb_contract *fcb_contract)
133 if (fcb_contract->user_contract) {
134 fres_contract_destroy(fcb_contract->user_contract);
139 static inline int res_key_cmp(const struct res_key *a,
140 const struct res_key *b)
142 if (a->type < b->type) {
144 } else if (a->type > b->type) {
146 } else if (a->id < b->id) {
148 } else if (a->id > b->id) {
155 /* Container for registered resources */
156 GAVL_CUST_NODE_INT_DEC(fcb_resource /* cust_prefix */, \
157 struct fcb /* cust_root_t */, \
158 struct resource/* cust_item_t */, \
159 struct res_key /* cust_key_t */, \
160 resources /* cust_root_node */, \
161 node /* cust_item_node */, \
162 key /* cust_item_key */, \
163 res_key_cmp /* cust_cmp_fnc */);
165 GAVL_CUST_NODE_INT_IMP(fcb_resource /* cust_prefix */, \
166 struct fcb /* cust_root_t */, \
167 struct resource/* cust_item_t */, \
168 struct res_key /* cust_key_t */, \
169 resources /* cust_root_node */, \
170 node /* cust_item_node */, \
171 key /* cust_item_key */, \
172 res_key_cmp /* cust_cmp_fnc */);
174 /* Container for allocators registered for a given resource */
175 GAVL_CUST_NODE_INT_DEC(fcb_alloc /* cust_prefix */, \
176 struct resource /* cust_root_t */, \
177 struct res_alloc /* cust_item_t */, \
178 forb_server_id /* cust_key_t */, \
179 allocators /* cust_root_node */, \
180 node /* cust_item_node */, \
181 app /* cust_item_key */, \
182 fres_contract_id_cmp /* cust_cmp_fnc */);
184 GAVL_CUST_NODE_INT_IMP(fcb_alloc /* cust_prefix */, \
185 struct resource /* cust_root_t */, \
186 struct res_alloc /* cust_item_t */, \
187 forb_server_id /* cust_key_t */, \
188 allocators /* cust_root_node */, \
189 node /* cust_item_node */, \
190 app /* cust_item_key */, \
191 forb_server_id_cmp /* cust_cmp_fnc */);
193 /* Container for negotiated contracts */
194 GAVL_CUST_NODE_INT_DEC(fcb_contract /* cust_prefix */, \
195 struct fcb /* cust_root_t */, \
196 struct fcb_contract /* cust_item_t */, \
197 fres_contract_id_t /* cust_key_t */, \
198 contracts /* cust_root_node */, \
199 node /* cust_item_node */, \
200 id /* cust_item_key */, \
201 fres_contract_id_cmp /* cust_cmp_fnc */);
204 GAVL_CUST_NODE_INT_IMP(fcb_contract /* cust_prefix */, \
205 struct fcb /* cust_root_t */, \
206 struct fcb_contract /* cust_item_t */, \
207 fres_contract_id_t /* cust_key_t */, \
208 contracts /* cust_root_node */, \
209 node /* cust_item_node */, \
210 id /* cust_item_key */, \
211 fres_contract_id_cmp /* cust_cmp_fnc */);
213 #include "fcb_contract_gavl.inc"
217 #define o2fcb(o) (struct fcb*)forb_instance_data(o)
219 static struct resource *
220 get_resource(const struct fcb *fcb, const struct fres_contract *contract)
222 fres_block_resource *block_res;
223 struct res_key res_key;
224 struct resource *resource;
226 block_res = fres_contract_get_resource(contract);
228 ul_logerr("No resource specified\n");
231 res_key.type = block_res->resource_type;
232 res_key.id = block_res->resource_id;
234 resource = fcb_resource_find(fcb, &res_key);
236 ul_logerr("No resource manager for %d.%d registered\n",
237 res_key.type, res_key.id);
244 negotiate_contract(fres_contract_broker obj,
245 const fres_contract_ptr contract,
246 fres_contract_id_t* id,
247 CORBA_Environment *ev)
249 struct fcb *fcb = o2fcb(obj);
250 struct resource *resource;
251 struct res_alloc *ra;
254 fres_contract_ptr contract_seq_buf;
255 fres_contract_ptr_seq contracts;
256 fres_contract_ptr_seq *schedulable_contracts;
257 fres_contract_id_seq ids;
258 struct fcb_contract *fcb_contract;
260 resource = get_resource(fcb, contract);
262 ret = FRSH_ERR_RESOURCE_ID_INVALID;
266 forb_uuid_generate((forb_uuid_t *)id);
269 forb_get_req_source(obj, &app);
270 ra = fcb_alloc_find(resource, &app);
273 forb_server_id_to_string(str, &app, sizeof(str));
274 ul_logerr("No resource allocator found for %d.%d and %s\n",
275 resource->key.type, resource->key.id, str);
280 fcb_contract = fcb_contract_new(id);
285 fcb_contract->user_contract = fres_contract_duplicate(contract);
287 /* Reserve contract */
288 contracts._length = 1;
289 contract_seq_buf = contract;
290 contracts._buffer = &contract_seq_buf;
291 ret = fres_resource_manager_reserve_contracts(resource->mng, &contracts, ev);
292 if (forb_exception_occurred(ev) || ret < 0) {
293 goto err_free_fcb_contract;
296 ul_logmsg("Contract was not accepted\n");
297 goto err_free_fcb_contract;
300 /* Commit contract */
301 ids._length = ids._maximum = 1;
303 fres_resource_manager_commit_contracts(resource->mng, &ids,
304 &schedulable_contracts, ev);
305 if (forb_exception_occurred(ev)) {
306 ret = FRES_ERR_FORB_EXCEPTION;
307 goto err_free_fcb_contract;
311 ret = fres_resource_allocator_change_vreses(ra->ra, schedulable_contracts, ev);
312 if (CORBA_sequence_get_release(schedulable_contracts)) {
314 for (i=0; i<schedulable_contracts->_length; i++) {
315 fres_contract_destroy(schedulable_contracts->_buffer[i]);
317 forb_free(schedulable_contracts->_buffer);
319 forb_free(schedulable_contracts);
320 if (forb_exception_occurred(ev)) {
321 ret = FRES_ERR_FORB_EXCEPTION;
322 goto err_cancel_reservation;
325 ul_logmsg("VRes was not created\n");
326 goto err_cancel_reservation;
329 /* Store the negotiated contract for later reference */
330 fcb_contract_insert(fcb, fcb_contract);
334 err_cancel_reservation:
335 fres_resource_manager_cancel_contracts(resource->mng, &ids, ev);
336 err_free_fcb_contract:
337 fcb_contract_destroy(fcb_contract);
342 CORBA_long cancel_contract(fres_contract_broker obj,
343 const fres_contract_id_t* id,
344 CORBA_Environment *ev)
346 struct fcb *fcb = o2fcb(obj);
347 struct fcb_contract *fcb_contract;
348 struct resource *resource;
349 struct res_alloc *ra;
350 fres_contract_id_t id_buffer[1] = { *id };
351 fres_contract_id_seq ids;
355 fcb_contract = fcb_contract_find(fcb, &id_buffer[0]); /* FIXME: id */
357 ret = FRSH_ERR_NOT_CONTRACTED_VRES;
361 resource = get_resource(fcb, fcb_contract->user_contract);
363 ret = FRSH_ERR_RESOURCE_ID_INVALID;
367 forb_get_req_source(obj, &app);
368 ra = fcb_alloc_find(resource, &app);
371 forb_server_id_to_string(str, &app, sizeof(str));
372 ul_logerr("No resource allocator found for %d.%d and %s\n",
373 resource->key.type, resource->key.id, str);
378 ids._length = ids._maximum = 1;
379 ids._buffer = id_buffer;
380 ret = fres_resource_allocator_cancel_vreses(ra->ra, &ids, ev);
382 ul_logerr("Cannot cancel vres\n");
385 fres_resource_manager_cancel_contracts(resource->mng, &ids, ev);
392 CORBA_long register_resource(fres_contract_broker obj,
393 const frsh_resource_type_t restype,
394 const frsh_resource_id_t resid,
395 const fres_resource_desc *desc,
396 CORBA_Environment *ev)
398 struct fcb *fcb = o2fcb(obj);
399 struct resource *res, *res2;
401 res = malloc(sizeof(*res));
403 memset(res, 0, sizeof(*res));
404 res->key.type = restype;
406 res2 = fcb_resource_find(fcb, &res->key);
408 if (forb_object_is_stale(res2->mng)) {
409 ul_logmsg("Removing stale manager for resource %d.%d\n",
411 forb_object_release(res2->mng);
412 fcb_resource_delete(fcb, res2);
413 /* TODO: Delete also all allocators associated
414 * with this stale resource manager. */
418 ul_logerr("Resource manager %d.%d already registered\n",
423 res->mng = forb_object_duplicate(desc->manager);
425 fcb_alloc_init_root_field(res);
426 ul_logmsg("Registering manager for resource %d.%d\n",
428 fcb_resource_insert(fcb, res);
437 CORBA_long register_allocator(fres_contract_broker obj,
438 const frsh_resource_type_t restype,
439 const frsh_resource_id_t resid,
440 const fres_resource_allocator ra_obj,
441 CORBA_Environment *ev)
443 struct fcb *fcb = o2fcb(obj);
444 struct resource *res;
445 struct res_alloc *ra;
446 struct res_key resource;
447 forb_server_id server_id;
448 char server_id_str[40];
450 forb_get_server_id(ra_obj, &server_id);
451 forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
452 ul_logmsg("Registering allocator for resource %d.%d in app %s\n",
453 restype, resid, server_id_str);
455 resource.type = restype;
457 res = fcb_resource_find(fcb, &resource);
459 ul_logerr("No manager found for %d.%d. Unable to register the allocator!\n",
463 ra = fcb_alloc_find(res, &server_id);
465 char *str = forb_object_to_string(ra_obj);
466 ul_logerr("Allocator from already registered (%s)\n",
471 ra = malloc(sizeof(*ra));
476 ra->ra = forb_object_duplicate(ra_obj);
477 fcb_alloc_insert(res, ra);
483 void get_resources(fres_contract_broker obj, fres_resource_seq** resources, CORBA_Environment *ev)
485 struct fcb *fcb = o2fcb(obj);
486 fres_resource_seq *seq;
487 struct resource *res;
490 seq = malloc(sizeof(*seq));
492 ev->major = FORB_EX_NO_MEMORY;
495 memset(seq, 0, sizeof(*seq));
498 gavl_cust_for_each(fcb_resource, fcb, res) {
502 seq->_buffer = CORBA_sequence_fres_resource_allocbuf(n);
507 gavl_cust_for_each(fcb_resource, fcb, res) {
508 seq->_buffer[n].restype = res->key.type;
509 seq->_buffer[n].resid = res->key.id;
510 seq->_buffer[n].desc.manager = res->mng;
517 #ifdef CONFIG_FCB_INET
518 static int register_inet_port(forb_orb orb)
520 forb_port_t *port = forb_malloc(sizeof(*port));
522 struct in_addr listen_on;
526 memset(port, 0, sizeof(*port));
527 listen_on.s_addr = INADDR_ANY;
528 ret = forb_inet_port_init(&port->desc, listen_on);
531 ret = forb_register_port(orb, port);
536 struct forb_fres_contract_broker_impl impl = {
537 .negotiate_contract = negotiate_contract,
538 .cancel_contract = cancel_contract,
539 .register_resource = register_resource,
540 .register_allocator = register_allocator,
541 .get_resources = get_resources,
544 int main(int argc, char *argv[])
548 fres_contract_broker fcb;
549 forb_executor_t executor;
552 orb = forb_init(&argc, &argv, "fcb");
553 if (!orb) error(1, errno, "FORB initialization failed");
555 #ifdef CONFIG_FCB_INET
556 ret = register_inet_port(orb);
557 if (ret) error(0, errno, "INET port registration failed");
560 fcb_resource_init_root_field(&fcb_data);
561 fcb_contract_init_root_field(&fcb_data);
563 fcb = forb_fres_contract_broker_new(orb, &impl, &fcb_data);
564 if (!fcb) error(1, errno, "forb_fres_contract_broker_new failed");
566 /* Prepare executor before we register the fcb reference,
567 * so that no reuqests are lost */
568 ret = forb_executor_init(&executor);
569 if (ret) error(1, errno, "forb_executor_init failed");
571 ret = forb_executor_register_object(&executor, fcb);
572 if (ret) error(1, errno, "forb_executor_register_object failed");
574 ret = forb_register_reference(fcb, fres_contract_broker_reg_name);
575 if (ret) error(1, errno, "forb_register_reference() failed");
577 ul_logmsg("Waiting for requests\n");
578 ret = forb_executor_run(&executor);
579 if (ret) error(1, errno, "forb_executor_run failed");