+/**************************************************************************/
+/* ---------------------------------------------------------------------- */
+/* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
+/* */
+/* Universidad de Cantabria, SPAIN */
+/* University of York, UK */
+/* Scuola Superiore Sant'Anna, ITALY */
+/* Kaiserslautern University, GERMANY */
+/* Univ. Politécnica Valencia, SPAIN */
+/* Czech Technical University in Prague, CZECH REPUBLIC */
+/* ENEA SWEDEN */
+/* Thales Communication S.A. FRANCE */
+/* Visual Tools S.A. SPAIN */
+/* Rapita Systems Ltd UK */
+/* Evidence ITALY */
+/* */
+/* See http://www.frescor.org for a link to partners' websites */
+/* */
+/* FRESCOR project (FP6/2005/IST/5-034026) is funded */
+/* in part by the European Union Sixth Framework Programme */
+/* The European Union is not liable of any use that may be */
+/* made of this code. */
+/* */
+/* */
+/* This file is part of FRSH (FRescor ScHeduler) */
+/* */
+/* FRSH is free software; you can redistribute it and/or modify it */
+/* under terms of the GNU General Public License as published by the */
+/* Free Software Foundation; either version 2, or (at your option) any */
+/* later version. FRSH is distributed in the hope that it will be */
+/* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
+/* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
+/* General Public License for more details. You should have received a */
+/* copy of the GNU General Public License along with FRSH; see file */
+/* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
+/* Cambridge, MA 02139, USA. */
+/* */
+/* As a special exception, including FRSH header files in a file, */
+/* instantiating FRSH generics or templates, or linking other files */
+/* with FRSH objects to produce an executable application, does not */
+/* by itself cause the resulting executable application to be covered */
+/* by the GNU General Public License. This exception does not */
+/* however invalidate any other reasons why the executable file might be */
+/* covered by the GNU Public License. */
+/**************************************************************************/
+
+/**
+ * @file frm_generic.c
+ * @author Michal Sojka <sojkam1@fel.cvut.cz>
+ * @date Tue Nov 11 08:34:34 2008
+ *
+ * @brief Generic resource manager implementation.
+ *
+ *
+ */
#include <frm_generic.h>
#include <forb.h>
#include <ul_log.h>
#include <fres_sa_scenario.h>
#include <fcb.h>
-UL_LOG_CUST(ulogd_frm);
-ul_log_domain_t ulogd_frm = {UL_LOGL_MSG, "frm"};
-
-struct frm_data {
- struct fres_sa_scenario *running;
- frm_adm_test_fnc_t admission_test;
- void *priv;
-};
+UL_LOG_CUST(ulogd_frm_generic);
+ul_log_domain_t ulogd_frm_generic = {UL_LOGL_DEB, "frm"};
#define object_to_frm(o) (struct frm_data*)forb_instance_data(o)
#define save_errno(cmd) do { int _e = errno; cmd; errno = _e; } while(0)
+/**
+ * Promotes every pending contract of a scenario to the reserved state.
+ *
+ * For each scenario entry whose @a new contract is set, the previously
+ * reserved contract (if any) is destroyed, the @a new contract becomes
+ * the reserved one and @a new is cleared. Entries without a @a new
+ * contract are left untouched.
+ *
+ * @param scenario Scenario whose entries are updated in place.
+ */
+void fres_sa_scenario_reserve_new(struct fres_sa_scenario *scenario)
+{
+	struct fres_sa_contract *entry;
+
+	fres_sa_scenario_for_each_contract(scenario, entry) {
+		struct fres_contract *pending = entry->new;
+
+		if (pending != NULL) {
+			/* The pending contract supersedes any earlier reservation. */
+			if (entry->reserved != NULL)
+				fres_contract_destroy(entry->reserved);
+			entry->reserved = pending;
+			entry->new = NULL;
+		}
+	}
+}
+
+/**
+ * Undoes the effect of a reservation attempt on a scenario.
+ *
+ * For every entry with a pending (@a new) contract:
+ *  - if the entry also holds a reserved or committed contract, only the
+ *    pending contract is destroyed and @a contract is pointed back at
+ *    the reserved contract (preferred) or at the committed one;
+ *  - otherwise the entry exists only because of the pending contract,
+ *    so it is removed from the scenario and destroyed.
+ *
+ * Entries that hold no contract at all (neither new, reserved nor
+ * committed) are removed as well. Such an entry is left behind when a
+ * reservation adds a fresh entry and then fails before a pending
+ * contract could be attached to it.
+ *
+ * @param scenario Scenario to roll back; modified in place.
+ */
+void fres_sa_scenario_rollback(struct fres_sa_scenario *scenario)
+{
+	struct fres_sa_contract *c, *c_next;
+
+	/* Deletion-safe traversal: the successor is fetched before the
+	 * current entry can be removed from the scenario. */
+	for (c = fres_sa_scenario_contract_first(scenario); c; c = c_next) {
+		c_next = fres_sa_scenario_contract_next(scenario, c);
+
+		if (c->new && (c->committed || c->reserved)) {
+			/* Drop only the pending contract, keep the entry. */
+			fres_contract_destroy(c->new);
+			c->new = NULL;
+			c->contract = c->reserved ? c->reserved : c->committed;
+		} else if (!c->committed && !c->reserved) {
+			/* Nothing but (at most) a pending contract: the
+			 * entry has no reason to stay in the scenario. */
+			fres_sa_scenario_del_contract(scenario, c);
+			fres_sa_contract_destroy(c);
+		}
+	}
+}
+
+
static CORBA_long reserve_contracts(fres_resource_manager obj,
- const fres_contract_ptr_seq* contracts,
- CORBA_Environment *ev)
+ const fres_contract_ptr_seq* contracts,
+ CORBA_Environment *ev)
{
struct frm_data *frm = object_to_frm(obj);
- struct fres_sa_scenario *prospective;
- struct fres_contract *c;
- bool schedulable;
- int i;
+ struct fres_sa_scenario *scenario = frm->scenario;
+ bool schedulable =false;
+ int i, ret;
+ struct fres_sa_contract *c;
- ul_logdeb("reserve_contracts\n");
+ ul_logmsg("reserve_contracts\n");
- prospective = fres_sa_scenario_duplicate(frm->running);
for (i=0; i<contracts->_length; i++) {
- c = fres_contract_duplicate(contracts->_buffer[i]);
- fres_sa_scenario_add_contract(prospective, c);
+ struct fres_contract *cin = contracts->_buffer[i];
+
+ c = fres_sa_scenario_find_contract(scenario, &cin->id);
+ if (!c) {
+ c = fres_sa_contract_new();
+ if (c) {
+ c->id = cin->id;
+ fres_sa_scenario_add_contract(scenario, c);
+ }
+ }
+ if (!c) goto err;
+
+ {
+ char id[40];
+ char *operation;
+ fres_contract_id_to_string(id, &c->id, sizeof(id));
+ if (fres_contract_get_num_blocks(cin) == 0) operation = "cancelation";
+ else if (c->committed) operation = "renegotiation";
+ else if (c->reserved) operation = "negotiation (already reserved)";
+ else operation = "negotiation";
+ ul_logdeb(" reserve contract %s %s\n", id, operation);
+ }
+ assert(c->new == NULL);
+ c->new = fres_contract_duplicate(cin);
+ c->contract = c->new;
+ if (!c->new) goto err;
}
- schedulable = frm->admission_test(prospective, frm->priv);
+ ret = frm->desc->admission_test(scenario, frm->desc->priv, &schedulable);
+ if (ret) {
+ ul_logmsg("admission_test failed: %d\n", ret);
+ goto err;
+ }
if (schedulable) {
- fres_sa_scenario_destroy(frm->running);
- frm->running = prospective;
+ fres_sa_scenario_reserve_new(scenario);
} else {
- fres_sa_scenario_destroy(prospective);
+ fres_sa_scenario_rollback(scenario);
}
return schedulable ? 0 : 1;
+err:
+ fres_sa_scenario_rollback(scenario);
+ return -1;
}
static void commit_contracts(fres_resource_manager obj,
- const fres_contract_id_seq* ids,
- fres_contract_ptr_seq** contract_with_scheduler_data,
- CORBA_Environment *ev)
+ const fres_contract_id_seq* ids,
+ fres_contract_ptr_seq** contracts_with_scheduling_data,
+ CORBA_Environment *ev)
{
struct frm_data *frm = object_to_frm(obj);
int i, num;
struct fres_sa_contract *c;
fres_contract_ptr_seq *contracts;
- ul_logdeb("commit_contracts\n");
+ ul_logmsg("commit_contracts\n");
contracts = forb_malloc(sizeof(*contracts));
if (!contracts) {
}
num = ids->_length;
contracts->_buffer = CORBA_sequence_fres_contract_ptr_allocbuf(num);
+ CORBA_sequence_set_release(contracts, CORBA_TRUE);
contracts->_maximum = contracts->_length = num;
-
+
+ /* TODO: Add also the changed contracts (e.g. because of
+ * priorities). Question: How to recognize which contracts are
+ * changed because of this commit? */
for (i=0; i < num; i++) {
- c = fres_sa_scenario_find_contract(frm->running, &ids->_buffer[i]);
+ char id[40];
+ const char *operation;
+ fres_contract_id_to_string(id, &ids->_buffer[i], sizeof(id));
+
+ c = fres_sa_scenario_find_contract(frm->scenario, &ids->_buffer[i]);
+ if (c && c->reserved) {
+ if (fres_contract_get_num_blocks(c->reserved) == 0) {
+ /* Cancelation request */
+ operation = "cancelation";
+ contracts->_buffer[i] = c->reserved;
+ c->reserved = NULL;
+ fres_sa_scenario_del_contract(frm->scenario, c);
+ fres_sa_contract_destroy(c);
+ } else {
+ /* Normal reservation */
+ if (c->committed) {
+ operation = "renegotiation";
+ fres_contract_destroy(c->committed);
+ } else {
+ operation = "negotiation";
+ }
+ c->committed = c->reserved;
+ c->reserved = NULL;
+ contracts->_buffer[i] = fres_contract_duplicate(c->committed);
+ }
+ } else {
+ operation = "error";
+ contracts->_buffer[i] = NULL;
+ if (!c) ul_logerr("Commit to unknown contract ID\n");
+ else if (!c->reserved) ul_logerr("Commit to not reserved contract\n");
+ }
+
+ ul_logdeb(" commit contract %s %s\n", id, operation);
}
-
- *contract_with_scheduler_data = contracts;
+
+ *contracts_with_scheduling_data = contracts;
err:;
}
-static void cancel_contracts(fres_resource_manager obj,
- const fres_contract_id_seq* ids,
- CORBA_Environment *ev)
+/**
+ * Cancels pending reservations for the given contract IDs.
+ *
+ * For each ID whose scenario entry holds a reserved contract: if a
+ * committed contract exists as well, the reservation is destroyed and
+ * the entry falls back to the committed contract (renegotiation
+ * roll-back); otherwise the whole entry is removed from the scenario
+ * and destroyed (negotiation roll-back). IDs that are not found in the
+ * scenario, or that have no reservation, are skipped.
+ *
+ * @param obj Resource manager object the request was sent to.
+ * @param ids Sequence of contract IDs whose reservations are cancelled.
+ * @param ev CORBA environment; not used by this function.
+ */
+static void cancel_reservations(fres_resource_manager obj,
+ const fres_contract_id_seq* ids,
+ CORBA_Environment *ev)
{
int i;
struct frm_data *frm = object_to_frm(obj);
- ul_logdeb("cancel_contracts\n");
+ ul_logmsg("cancel_reservations\n");
for (i=0; i<ids->_length; i++) {
struct fres_sa_contract *c;
- c = fres_sa_scenario_find_contract(frm->running, &ids->_buffer[i]);
+ c = fres_sa_scenario_find_contract(frm->scenario, &ids->_buffer[i]);
+
+ /* The lookup yields NULL for an unknown ID; do not dereference it. */
+ if (c && c->reserved) {
+ if (c->committed) {
+ /* Roll-back renegotiation */
+ fres_contract_destroy(c->reserved);
+ c->reserved = NULL;
+ c->contract = c->committed;
+ } else {
+ /* Roll-back negotiation */
+ fres_sa_scenario_del_contract(frm->scenario, c);
+ fres_sa_contract_destroy(c);
+ }
+ }
+ }
+}
+
+
+static void get_contracts(fres_resource_manager obj,
+ fres_contract_ptr_seq** contracts_out,
+ CORBA_long* utilization, CORBA_Environment *ev)
+{
+ struct frm_data *frm = object_to_frm(obj);
+ struct fres_sa_contract *c;
+ fres_contract_ptr_seq *contracts;
+
+ contracts = forb_malloc(sizeof(*contracts));
+ if (!contracts) {
+ ev->major = FORB_EX_NO_MEMORY;
+ goto err;
+ }
+ contracts->_buffer = CORBA_sequence_fres_contract_ptr_allocbuf(frm->scenario->num_contracts);
+ CORBA_sequence_set_release(contracts, CORBA_TRUE);
+ contracts->_maximum = frm->scenario->num_contracts;
+ contracts->_length = 0;
- if (c) {
- fres_sa_scenario_del_contract(frm->running, c);
+ fres_sa_scenario_for_each_contract(frm->scenario, c) {
+ if (c->committed) {
+ int i = contracts->_length;
+ contracts->_buffer[i] = fres_contract_duplicate(c->committed);
+ contracts->_length++;
}
+
}
+ *contracts_out = contracts;
+ *utilization = frm->scenario->utilization;
+err:;
}
static const struct forb_fres_resource_manager_impl frm_impl = {
.reserve_contracts = reserve_contracts,
.commit_contracts = commit_contracts,
- .cancel_contracts = cancel_contracts,
+ .cancel_reservations = cancel_reservations,
+ .get_contracts = get_contracts,
};
/**
- * Initializes and runs a generic resource manager. The only thing a
+ * Initializes generic resource manager. The only thing a
* caller has to supply is admission test function, which is passed in
* @a frm_data->admission_test.
*
* @param orb FORB object used to communicate with other components.
- * @param admission_test Admission test function.
- * @param priv Pointer to passed as priv parameter to frm_adm_test_fnc_t.
+ * @param[out] frm_data Pointer to frm_data structure
+ * @param executor Executor used for this resource manager.
+ * @param desc Description of the resource manager
*
- * @return
+ * @return Resource manager object reference or NULL in case of error.
*/
-int frm_register_and_run(forb_orb orb, const struct fres_res_manager *res_manager)
+fres_resource_manager frm_register(forb_orb orb, struct frm_data *frm_data,
+ forb_executor_t *executor,
+ const struct fres_res_manager *desc)
{
fres_contract_broker fcb;
+ fres_resource_desc res_desc;
fres_resource_manager frm;
struct forb_env env;
- struct frm_data frm_data;
- forb_executor_t executor;
int ret;
- memset(&frm_data, 0, sizeof(frm_data));
- frm_data.admission_test = res_manager->admission_test;
- frm_data.priv = res_manager->priv;
+ memset(frm_data, 0, sizeof(*frm_data));
+ frm_data->desc = desc;
+ frm_data->scenario = fres_sa_scenario_new();
+ if (!frm_data->scenario) {
+ save_errno(ul_logerr("fres_sa_scenario_new failed\n"));
+ goto err;
+ }
fcb = forb_resolve_reference(orb, fres_contract_broker_reg_name);
if (!fcb) {
- save_errno(ul_logerr("Could not find contract broker"));
+ save_errno(ul_logerr("Could not find contract broker\n"));
goto err;
}
- frm = forb_fres_resource_manager_new(orb, &frm_impl, &frm_data);
+ frm = forb_fres_resource_manager_new(orb, &frm_impl, frm_data);
if (!frm) {
- save_errno(ul_logerr("forb_fres_resource_manager_new error"));
+ save_errno(ul_logerr("forb_fres_resource_manager_new error\n"));
goto err_release_fcb;
}
+ ret = forb_executor_register_object(executor, frm);
+ if (ret) {
+ save_errno(ul_logerr("forb_executor_register_object failed\n"));
+ goto err_executor;
+ }
+
+ /* Register resource manager */
+ res_desc.manager = frm;
+ ret = fres_contract_broker_register_resource(fcb,
+ desc->res_type,
+ desc->res_id,
+ &res_desc, &env);
+ if (forb_exception_occurred(&env) || ret != 0) {
+ save_errno(ul_logerr("fres_contract_broker_register_resource exception\n"));
+ goto err_register;
+ }
+
+ /* Success: only the broker reference is dropped, the caller owns frm. */
+ forb_object_release(fcb);
+ return frm;
+err_register:
+ forb_executor_unregister_object(executor, frm);
+err_executor:
+ forb_object_release(frm);
+err_release_fcb:
+ forb_object_release(fcb);
+err:
+ /* Do not leak the scenario allocated above when registration fails.
+ * The pointer is NULL when fres_sa_scenario_new() itself failed;
+ * errno is preserved for the caller's error reporting. */
+ if (frm_data->scenario) {
+ save_errno(fres_sa_scenario_destroy(frm_data->scenario));
+ frm_data->scenario = NULL;
+ }
+ return NULL;
+}
+
+/**
+ * Initializes and runs a generic resource manager. The only thing a
+ * caller has to supply is the admission test function, which is passed
+ * in @a desc->admission_test.
+ *
+ * @param orb FORB object used to communicate with other components.
+ * @param desc Description of the resource manager
+ *
+ * @return The result of forb_executor_run(); on an earlier failure the
+ * non-zero value returned by forb_executor_init(), or -1 if
+ * frm_register() fails.
+ */
+int frm_register_and_run(forb_orb orb, const struct fres_res_manager *desc)
+{
+ fres_resource_manager frm;
+ struct frm_data frm_data;
+ forb_executor_t executor;
+ int ret = -1;
+
/* Prepare executor before we register the resource manager
* with contract broker */
ret = forb_executor_init(&executor);
- if (ret) goto err_release_frm;
-
- ret = forb_executor_register_object(&executor, frm);
- if (ret) goto err_executor;
-
- /* Register resource manager */
- ret = fres_contract_broker_register_manager(fcb,
- res_manager->res_type,
- res_manager->res_id,
- frm, &env);
- if (forb_exception_occured(&env) || ret != 0) {
- goto err_executor;
+ if (ret) {
+ save_errno(ul_logerr("forb_executor_init failed\n"));
+ goto err;
}
+ frm = frm_register(orb, &frm_data, &executor, desc);
+ if (!frm) {
+ ret = -1;
+ save_errno(ul_logerr("frm_register failed\n"));
+ goto err_destroy_executor;
+ }
+
/* Start request processing */
+ ul_logmsg("Waiting for requests\n");
ret = forb_executor_run(&executor);
- if (ret) goto err_executor;
-
- return 0;
-err_executor:
- forb_executor_destroy(&executor);
+ if (ret) goto err_release_frm;
+
err_release_frm:
forb_object_release(frm);
-err_release_fcb:
- forb_object_release(fcb);
+err_destroy_executor:
+ forb_executor_destroy(&executor);
err:
- return -1;
+ return ret;
}