*
*/
struct fres_sa_contract {
- enum fres_sa_contract_status status;
+ fres_contract_id_t id; /**< ID of all contract versions stored here. */
+
+ /** If the contract was already commited, the commited version
+ * is stored here. The contract is moved here from @a reserved
+ * field in fres_resource_manager::commit_contracts(). */
+ struct fres_contract *commited;
+
+ /** The reserved (and not yet commited) version of the
+ * contract. If a new contract is to be negotiated or an old
+ * one is to be changed, the new version is stored here after
+ * successful admission test. */
+ struct fres_contract *reserved;
+
+ /** If a new contract is to be negotiated it is stored
+ * here. After sucessfull admission test, the contract is
+ * moved to @a reserved field. If the contract is to be
+ * changed, the admission test can compare the commited and
+ * new version and analyze not only the target state but also
+ * the mode change. */
+ struct fres_contract *new;
+
+ /** Convenience pointer, which has the same value as one of @a
+ * new, @a reserved and @a commited. It is meant for simpler
+ * admission tests, which do not evaluate mode changes and
+ * therefore have no need for comparison of commited and new
+ * version of contact. */
struct fres_contract *contract;
+
+
void *priv; /**< Private data for use by admission test */
gavl_node_t node;
};
fres_contract_id_t /* cust_key_t */,
contracts /* cust_root_node */,
node /* cust_item_node */,
- contract->id /* cust_item_key */,
+ id /* cust_item_key */,
fres_contract_id_cmp /* cust_cmp_fnc */);
ul_log_domain_t ulogd_frm = {UL_LOGL_MSG, "frm"};
struct frm_data {
- struct fres_sa_scenario *reserved;
- frm_adm_test_fnc_t admission_test;
- void *priv;
+ struct fres_sa_scenario *scenario;
+ const struct fres_res_manager *desc;
};
#define object_to_frm(o) (struct frm_data*)forb_instance_data(o)
#define save_errno(cmd) do { int _e = errno; cmd; errno = _e; } while(0)
+void fres_sa_scenario_reserve_new(struct fres_sa_scenario *scenario)
+{
+ struct fres_sa_contract *c;
+ fres_sa_scenario_for_each_contract(scenario, c) {
+ if (c->new) {
+ if (c->reserved) {
+ fres_contract_destroy(c->reserved);
+ }
+ c->reserved = c->new;
+ c->new = NULL;
+ }
+ }
+}
+
+void fres_sa_scenario_rollback(struct fres_sa_scenario *scenario)
+{
+ struct fres_sa_contract *c, *c_next;
+
+ /* Deleteion safe scenario traverse */
+ for(c=fres_sa_scenario_contract_first(scenario),
+ c_next=c?fres_sa_scenario_contract_next(scenario,c):NULL;
+ c;
+ c=c_next,c_next=c?fres_sa_scenario_contract_next(scenario,c):NULL) {
+ if (c->new) {
+ if (c->commited || c->reserved) {
+ fres_contract_destroy(c->new);
+ c->new = NULL;
+ c->contract = c->reserved ? c->reserved : c->commited;
+ } else {
+ fres_sa_scenario_del_contract(scenario, c);
+ fres_sa_contract_destroy(c);
+ }
+ }
+ }
+}
+
+
static CORBA_long reserve_contracts(fres_resource_manager obj,
const fres_contract_ptr_seq* contracts,
CORBA_Environment *ev)
{
struct frm_data *frm = object_to_frm(obj);
- struct fres_sa_scenario *prospective;
- bool schedulable;
+ struct fres_sa_scenario *scenario = frm->scenario;
+ bool schedulable =false;
int i, ret;
+ struct fres_sa_contract *c;
ul_logmsg("reserve_contracts\n");
- prospective = fres_sa_scenario_duplicate(frm->reserved);
- if (!prospective) goto err;
for (i=0; i<contracts->_length; i++) {
- struct fres_sa_contract *c;
- c = fres_sa_contract_new();
+ struct fres_contract *cin = contracts->_buffer[i];
+ c = fres_sa_scenario_find_contract(scenario, &cin->id);
+ if (!c) {
+ c = fres_sa_contract_new();
+ if (c) {
+ c->id = cin->id;
+ fres_sa_scenario_add_contract(scenario, c);
+ }
+ }
if (!c) goto err;
- c->status = FRES_SA_CONTRACT_NEW;
- c->contract = fres_contract_duplicate(contracts->_buffer[i]);
- if (!c->contract) goto err;
- fres_sa_scenario_add_contract(prospective, c);
+ assert(c->new == NULL);
+ c->new = fres_contract_duplicate(cin);
+ c->contract = c->new;
+ if (!c->new) goto err;
}
- ret = frm->admission_test(prospective, frm->priv, &schedulable);
+ ret = frm->desc->admission_test(scenario, frm->desc->priv, &schedulable);
if (ret) {
ul_logerr("admission_test failed: %d\n", ret);
goto err;
}
if (schedulable) {
- struct fres_sa_contract *c;
- fres_sa_scenario_for_each_contract(prospective, c) {
- if (c->status == FRES_SA_CONTRACT_NEW) {
- c->status = FRES_SA_CONTRACT_RESERVED;
- }
- }
- fres_sa_scenario_destroy(frm->reserved);
- frm->reserved = prospective;
+ fres_sa_scenario_reserve_new(scenario);
} else {
- fres_sa_scenario_destroy(prospective);
+ fres_sa_scenario_rollback(scenario);
}
return schedulable ? 0 : 1;
err:
- fres_sa_scenario_destroy(prospective);
+ fres_sa_scenario_rollback(scenario);
return -1;
}
contracts->_buffer = CORBA_sequence_fres_contract_ptr_allocbuf(num);
CORBA_sequence_set_release(contracts, CORBA_TRUE);
contracts->_maximum = contracts->_length = num;
-
+
+ /* TODO: Add also the changed contracts (e.g. because of
+ * priorities). Question: How to recognize which contracts are
+ * changed because of this commit? */
for (i=0; i < num; i++) {
- c = fres_sa_scenario_find_contract(frm->reserved, &ids->_buffer[i]);
- c->status = FRES_SA_CONTRACT_COMMITED;
- contracts->_buffer[i] = c->contract;
+ c = fres_sa_scenario_find_contract(frm->scenario, &ids->_buffer[i]);
+ if (c && c->reserved) {
+ if (c->commited) fres_contract_destroy(c->commited);
+ c->commited = c->reserved;
+ c->reserved = NULL;
+ contracts->_buffer[i] = fres_contract_duplicate(c->commited);
+ } else {
+ contracts->_buffer[i] = NULL;
+ if (!c) ul_logerr("Commit to unknown contract ID\n");
+ else if (!c->reserved) ul_logerr("Commit to not reserved contract\n");
+ }
}
*contracts_with_scheduling_data = contracts;
for (i=0; i<ids->_length; i++) {
struct fres_sa_contract *c;
- c = fres_sa_scenario_find_contract(frm->reserved, &ids->_buffer[i]);
+ c = fres_sa_scenario_find_contract(frm->scenario, &ids->_buffer[i]);
if (c) {
- fres_sa_scenario_del_contract(frm->reserved, c);
+ fres_sa_scenario_del_contract(frm->scenario, c);
+ fres_sa_contract_destroy(c);
}
}
}
.cancel_contracts = cancel_contracts,
};
-/**
- * Initializes and runs a generic resource manager. The only thing a
- * caller has to supply is admission test function, which is passed in
- * @a frm_data->admission_test.
- *
- * @param orb FORB object used to communicate with other components.
- * @param admission_test Admission test function.
- * @param priv Pointer to passed as priv parameter to frm_adm_test_fnc_t.
- *
- * @return
- */
-int frm_register_and_run(forb_orb orb, const struct fres_res_manager *res_manager)
+fres_resource_manager frm_register(forb_orb orb, struct frm_data *frm_data,
+ forb_executor_t *executor,
+ const struct fres_res_manager *desc)
{
fres_contract_broker fcb;
fres_resource_manager frm;
struct forb_env env;
- struct frm_data frm_data;
- forb_executor_t executor;
int ret;
- memset(&frm_data, 0, sizeof(frm_data));
- frm_data.admission_test = res_manager->admission_test;
- frm_data.priv = res_manager->priv;
- frm_data.reserved = fres_sa_scenario_new();
- if (!frm_data.reserved) {
+ memset(frm_data, 0, sizeof(*frm_data));
+ frm_data->desc = desc;
+ frm_data->scenario = fres_sa_scenario_new();
+ if (!frm_data->scenario) {
save_errno(ul_logerr("fres_sa_scenario_new failed\n"));
goto err;
}
fcb = forb_resolve_reference(orb, fres_contract_broker_reg_name);
if (!fcb) {
- save_errno(ul_logerr("Could not find contract broker"));
+ save_errno(ul_logerr("Could not find contract broker\n"));
goto err;
}
- frm = forb_fres_resource_manager_new(orb, &frm_impl, &frm_data);
+ frm = forb_fres_resource_manager_new(orb, &frm_impl, frm_data);
if (!frm) {
- save_errno(ul_logerr("forb_fres_resource_manager_new error"));
+ save_errno(ul_logerr("forb_fres_resource_manager_new error\n"));
goto err_release_fcb;
}
- /* Prepare executor before we register the resource manager
- * with contract broker */
- ret = forb_executor_init(&executor);
- if (ret) {
- save_errno(ul_logerr("forb_executor_init failed"));
- goto err_release_frm;
- }
-
- ret = forb_executor_register_object(&executor, frm);
+ ret = forb_executor_register_object(executor, frm);
if (ret) {
- save_errno(ul_logerr("forb_executor_register_object failed"));
+ save_errno(ul_logerr("forb_executor_register_object failed\n"));
goto err_executor;
}
/* Register resource manager */
ret = fres_contract_broker_register_manager(fcb,
- res_manager->res_type,
- res_manager->res_id,
+ desc->res_type,
+ desc->res_id,
frm, &env);
if (forb_exception_occured(&env) || ret != 0) {
- save_errno(ul_logerr("fres_contract_broker_register_manager exception\n"));
- goto err_executor;
+ save_errno(ul_logerr("fres_contract_broker_register_resource exception\n"));
+ goto err_register;
}
+ forb_object_release(fcb);
+ return frm;
+err_register:
+ forb_executor_unregister_object(executor, frm);
+err_executor:
+ forb_object_release(frm);
+err_release_fcb:
+ forb_object_release(fcb);
+err:
+ return NULL;
+}
+
+/**
+ * Initializes and runs a generic resource manager. The only thing a
+ * caller has to supply is admission test function, which is passed in
+ * @a frm_data->admission_test.
+ *
+ * @param orb FORB object used to communicate with other components.
+ * @param desc Description on the resource manager
+ *
+ * @return
+ */
+int frm_register_and_run(forb_orb orb, const struct fres_res_manager *desc)
+{
+ fres_resource_manager frm;
+ struct frm_data frm_data;
+ forb_executor_t executor;
+ int ret = -1;
+
+ /* Prepare executor before we register the resource manager
+ * with contract broker */
+ ret = forb_executor_init(&executor);
+ if (ret) {
+ save_errno(ul_logerr("forb_executor_init failed\n"));
+ goto err;
+ }
+
+ frm = frm_register(orb, &frm_data, &executor, desc);
+ if (!frm) {
+ ret = -1;
+ save_errno(ul_logerr("frm_register failed\n"));
+ goto err_destroy_executor;
+ }
+
/* Start request processing */
ul_logmsg("Waiting for requests\n");
ret = forb_executor_run(&executor);
- if (ret) goto err_executor;
-
- return 0;
-err_executor:
- forb_executor_destroy(&executor);
+ if (ret) goto err_release_frm;
+
err_release_frm:
forb_object_release(frm);
-err_release_fcb:
- forb_object_release(fcb);
+err_destroy_executor:
+ forb_executor_destroy(&executor);
err:
- return -1;
+ return ret;
}