]> rtime.felk.cvut.cz Git - frescor/frsh.git/blobdiff - fres/resmng/frm_generic.c
Implemented support for contract renegotiation
[frescor/frsh.git] / fres / resmng / frm_generic.c
index 9015da023cdac05ac69f6c05dc0dc38c2b2ff590..f1a80dd3d8f3b29d837d512e8ecca18a5673234c 100644 (file)
+/**************************************************************************/
+/* ---------------------------------------------------------------------- */
+/* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                */
+/*                                                                       */
+/*   Universidad de Cantabria,              SPAIN                        */
+/*   University of York,                    UK                           */
+/*   Scuola Superiore Sant'Anna,            ITALY                        */
+/*   Kaiserslautern University,             GERMANY                      */
+/*   Univ. Politécnica  Valencia,           SPAIN                       */
+/*   Czech Technical University in Prague,  CZECH REPUBLIC               */
+/*   ENEA                                   SWEDEN                       */
+/*   Thales Communication S.A.              FRANCE                       */
+/*   Visual Tools S.A.                      SPAIN                        */
+/*   Rapita Systems Ltd                     UK                           */
+/*   Evidence                               ITALY                        */
+/*                                                                       */
+/*   See http://www.frescor.org for a link to partners' websites         */
+/*                                                                       */
+/*          FRESCOR project (FP6/2005/IST/5-034026) is funded            */
+/*       in part by the European Union Sixth Framework Programme         */
+/*       The European Union is not liable of any use that may be         */
+/*       made of this code.                                              */
+/*                                                                       */
+/*                                                                       */
+/*  This file is part of FRSH (FRescor ScHeduler)                        */
+/*                                                                       */
+/* FRSH is free software; you can redistribute it and/or modify it       */
+/* under terms of the GNU General Public License as published by the     */
+/* Free Software Foundation; either version 2, or (at your option) any   */
+/* later version.  FRSH is distributed in the hope that it will be       */
+/* useful, but WITHOUT ANY WARRANTY; without even the implied warranty   */
+/* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU   */
+/* General Public License for more details. You should have received a   */
+/* copy of the GNU General Public License along with FRSH; see file      */
+/* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
+/* Cambridge, MA 02139, USA.                                             */
+/*                                                                       */
+/* As a special exception, including FRSH header files in a file,        */
+/* instantiating FRSH generics or templates, or linking other files      */
+/* with FRSH objects to produce an executable application, does not      */
+/* by itself cause the resulting executable application to be covered    */
+/* by the GNU General Public License. This exception does not            */
+/* however invalidate any other reasons why the executable file might be  */
+/* covered by the GNU Public License.                                    */
+/**************************************************************************/
+
+/**
+ * @file   frm_generic.c
+ * @author Michal Sojka <sojkam1@fel.cvut.cz>
+ * @date   Tue Nov 11 08:34:34 2008
+ * 
+ * @brief  Generic resource manager implementation.
+ * 
+ * 
+ */
 #include <frm_generic.h>
 #include <forb.h>
 #include <ul_log.h>
 #include <fres_sa_scenario.h>
 #include <fcb.h>
 
-UL_LOG_CUST(ulogd_frm);
-ul_log_domain_t ulogd_frm = {UL_LOGL_MSG, "frm"};
-
-struct frm_data {
-       struct fres_sa_scenario *reserved;
-       frm_adm_test_fnc_t admission_test;
-       void *priv;
-};
+UL_LOG_CUST(ulogd_frm_generic);
+ul_log_domain_t ulogd_frm_generic = {UL_LOGL_DEB, "frm"};
 
 #define object_to_frm(o) (struct frm_data*)forb_instance_data(o)
 #define save_errno(cmd) do { int _e = errno; cmd; errno = _e; } while(0)
 
+void fres_sa_scenario_reserve_new(struct fres_sa_scenario *scenario)
+{
+       struct fres_sa_contract *c;
+       fres_sa_scenario_for_each_contract(scenario, c) {
+               if (c->new) {
+                       if (c->reserved) {
+                               fres_contract_destroy(c->reserved);
+                       }
+                       c->reserved = c->new;
+                       c->new = NULL;
+               }
+       }
+}
+
+void fres_sa_scenario_rollback(struct fres_sa_scenario *scenario)
+{
+       struct fres_sa_contract *c, *c_next;
+
+       /* Deleteion safe scenario traverse */
+       for(c=fres_sa_scenario_contract_first(scenario),
+                   c_next=c?fres_sa_scenario_contract_next(scenario,c):NULL;
+           c;
+           c=c_next,c_next=c?fres_sa_scenario_contract_next(scenario,c):NULL) {
+               if (c->new) {
+                       if (c->committed || c->reserved) {
+                                       fres_contract_destroy(c->new);
+                                       c->new = NULL;
+                                       c->contract = c->reserved ? c->reserved : c->committed;
+                       } else {
+                               fres_sa_scenario_del_contract(scenario, c);
+                               fres_sa_contract_destroy(c);
+                       }
+               }
+       }
+}
+
+
 static CORBA_long reserve_contracts(fres_resource_manager obj,
                                    const fres_contract_ptr_seq* contracts,
                                    CORBA_Environment *ev)
 {
        struct frm_data *frm = object_to_frm(obj);
-       struct fres_sa_scenario *prospective;
-       bool schedulable;
-       int i;
+       struct fres_sa_scenario *scenario = frm->scenario;
+       bool schedulable =false;
+       int i, ret;
+       struct fres_sa_contract *c;
 
        ul_logmsg("reserve_contracts\n");
        
-       prospective = fres_sa_scenario_duplicate(frm->reserved);
-       if (!prospective) goto err;
        for (i=0; i<contracts->_length; i++) {
-               struct fres_sa_contract *c;
-               c = fres_sa_contract_new();
+               struct fres_contract *cin = contracts->_buffer[i];
+
+               c = fres_sa_scenario_find_contract(scenario, &cin->id);
+               if (!c) {
+                       c = fres_sa_contract_new();
+                       if (c) {
+                               c->id = cin->id;
+                               fres_sa_scenario_add_contract(scenario, c);
+                       }
+               }
                if (!c) goto err;
-               c->status = FRES_SA_CONTRACT_NEW;
-               c->contract = fres_contract_duplicate(contracts->_buffer[i]);
-               if (!c->contract) goto err;
-               fres_sa_scenario_add_contract(prospective, c);
+
+               {
+                       char id[40];
+                       char *operation;
+                       fres_contract_id_to_string(id, &c->id, sizeof(id));
+                       if (fres_contract_get_num_blocks(cin) == 0) operation = "cancelation";
+                       else if (c->committed) operation = "renegotiation";
+                       else if (c->reserved) operation = "negotiation (already reserved)";
+                       else operation = "negotiation";
+                       ul_logdeb("  reserve contract %s %s\n", id, operation);
+               }
+               assert(c->new == NULL);
+               c->new = fres_contract_duplicate(cin);
+               c->contract = c->new;
+               if (!c->new) goto err;
        }
 
-       schedulable = frm->admission_test(prospective, frm->priv);
+       ret = frm->desc->admission_test(scenario, frm->desc->priv, &schedulable);
+       if (ret) {
+               ul_logmsg("admission_test failed: %d\n", ret);
+               goto err;
+       }
 
        if (schedulable) {
-               struct fres_sa_contract *c;
-               fres_sa_scenario_for_each_contract(prospective, c) {
-                       if (c->status == FRES_SA_CONTRACT_NEW) {
-                               c->status = FRES_SA_CONTRACT_RESERVED;
-                       }
-               }
-               fres_sa_scenario_destroy(frm->reserved);
-               frm->reserved = prospective;
+               fres_sa_scenario_reserve_new(scenario);
        } else {
-               fres_sa_scenario_destroy(prospective);
+               fres_sa_scenario_rollback(scenario);
        }
        return schedulable ? 0 : 1;
 err:
-       fres_sa_scenario_destroy(prospective);
+       fres_sa_scenario_rollback(scenario);
        return -1;
 }
 
 static void commit_contracts(fres_resource_manager obj,
-                     const fres_contract_id_seq* ids,
-                     fres_contract_ptr_seq** contracts_with_scheduling_data,
-                     CORBA_Environment *ev)
+                            const fres_contract_id_seq* ids,
+                            fres_contract_ptr_seq** contracts_with_scheduling_data,
+                            CORBA_Environment *ev)
 {
        struct frm_data *frm = object_to_frm(obj);
        int i, num;
@@ -80,113 +181,230 @@ static void commit_contracts(fres_resource_manager obj,
        contracts->_buffer = CORBA_sequence_fres_contract_ptr_allocbuf(num);
        CORBA_sequence_set_release(contracts, CORBA_TRUE);
        contracts->_maximum = contracts->_length = num;
-       
+
+       /* TODO: Add also the changed contracts (e.g. because of
+        * priorities). Question: How to recognize which contracts are
+        * changed because of this commit? */
        for (i=0; i < num; i++) {
-               c = fres_sa_scenario_find_contract(frm->reserved, &ids->_buffer[i]);
-               c->status = FRES_SA_CONTRACT_COMMITED;
-               contracts->_buffer[i] = c->contract;
+               char id[40];
+               const char *operation;
+               fres_contract_id_to_string(id, &ids->_buffer[i], sizeof(id));
+               
+               c = fres_sa_scenario_find_contract(frm->scenario, &ids->_buffer[i]);
+               if (c && c->reserved) {
+                       if (fres_contract_get_num_blocks(c->reserved) == 0) {
+                               /* Cancelation request */
+                               operation = "cancelation";
+                               contracts->_buffer[i] = c->reserved;
+                               c->reserved = NULL;
+                               fres_sa_scenario_del_contract(frm->scenario, c);
+                               fres_sa_contract_destroy(c);
+                       } else {
+                               /* Normal reservation */
+                               if (c->committed) {
+                                       operation = "renegotiation";
+                                       fres_contract_destroy(c->committed);
+                               } else {
+                                       operation = "negotiation";
+                               }
+                               c->committed = c->reserved;
+                               c->reserved = NULL;
+                               contracts->_buffer[i] = fres_contract_duplicate(c->committed);
+                       }
+               } else {
+                       operation = "error";
+                       contracts->_buffer[i] = NULL;
+                       if (!c) ul_logerr("Commit to unknown contract ID\n");
+                       else if (!c->reserved) ul_logerr("Commit to not reserved contract\n");
+               }
+
+               ul_logdeb("  commit contract %s %s\n", id, operation);
        }
-       
+
        *contracts_with_scheduling_data = contracts;
 err:;
 }
 
-static void cancel_contracts(fres_resource_manager obj,
-                     const fres_contract_id_seq* ids,
-                     CORBA_Environment *ev)
+static void cancel_reservations(fres_resource_manager obj,
+                               const fres_contract_id_seq* ids,
+                               CORBA_Environment *ev)
 {
        int i;
        struct frm_data *frm = object_to_frm(obj);
 
-       ul_logmsg("cancel_contracts\n");
+       ul_logmsg("cancel_reservations\n");
 
        for (i=0; i<ids->_length; i++) {
                struct fres_sa_contract *c;
-               c = fres_sa_scenario_find_contract(frm->reserved, &ids->_buffer[i]);
+               c = fres_sa_scenario_find_contract(frm->scenario, &ids->_buffer[i]);
+
+               if (c->reserved) {
+                       if (c->committed) {
+                               /* Roll-back renegotiation */
+                               fres_contract_destroy(c->reserved);
+                               c->reserved = NULL;
+                               c->contract = c->committed;
+                       } else {
+                               /* Roll-back negotiation */
+                               fres_sa_scenario_del_contract(frm->scenario, c);
+                               fres_sa_contract_destroy(c);
+                       }
+               }
+       }
+}
+
+
+static void get_contracts(fres_resource_manager obj,
+                         fres_contract_ptr_seq** contracts_out,
+                         CORBA_long* utilization, CORBA_Environment *ev)
+{
+       struct frm_data *frm = object_to_frm(obj);
+       struct fres_sa_contract *c;
+       fres_contract_ptr_seq *contracts;
+
+       contracts = forb_malloc(sizeof(*contracts));
+       if (!contracts) {
+               ev->major = FORB_EX_NO_MEMORY;
+               goto err;
+       }
+       contracts->_buffer = CORBA_sequence_fres_contract_ptr_allocbuf(frm->scenario->num_contracts);
+       CORBA_sequence_set_release(contracts, CORBA_TRUE);
+       contracts->_maximum = frm->scenario->num_contracts;
+       contracts->_length = 0;
 
-               if (c) {
-                       fres_sa_scenario_del_contract(frm->reserved, c);
+       fres_sa_scenario_for_each_contract(frm->scenario, c) {
+               if (c->committed) {
+                       int i = contracts->_length;
+                       contracts->_buffer[i] = fres_contract_duplicate(c->committed);
+                       contracts->_length++;
                }
+                                       
        }
+       *contracts_out = contracts;
+       *utilization = frm->scenario->utilization;
+err:;
 }
 
 
 static const struct forb_fres_resource_manager_impl frm_impl = {
        .reserve_contracts = reserve_contracts,
        .commit_contracts  = commit_contracts,
-       .cancel_contracts  = cancel_contracts,
+       .cancel_reservations = cancel_reservations,
+       .get_contracts  = get_contracts,
 };
 
 /** 
- * Initializes and runs a generic resource manager. The only thing a
+ * Initializes generic resource manager. The only thing a
  * caller has to supply is admission test function, which is passed in
  * @a frm_data->admission_test.
  * 
  * @param orb FORB object used to communicate with other components.
- * @param admission_test Admission test function.
- * @param priv Pointer to passed as priv parameter to frm_adm_test_fnc_t.
+ * @param[out] frm_data Pointer to frm_data structure
+ * @param executor Executor used for this resource manager.
+ * @param desc Description on the resource manager
  * 
- * @return 
+ * @return Resource manager object reference of NULL in case of error.
  */
-int frm_register_and_run(forb_orb orb, const struct fres_res_manager *res_manager)
+fres_resource_manager frm_register(forb_orb orb, struct frm_data *frm_data,
+                                  forb_executor_t *executor,
+                                  const struct fres_res_manager *desc)
 {
        fres_contract_broker fcb;
+       fres_resource_desc res_desc;
        fres_resource_manager frm;
        struct forb_env env;
-       struct frm_data frm_data;
-       forb_executor_t executor;
        int ret;
 
-       memset(&frm_data, 0, sizeof(frm_data));
-       frm_data.admission_test = res_manager->admission_test;
-       frm_data.priv = res_manager->priv;
-       frm_data.reserved = fres_sa_scenario_new();
-       if (!frm_data.reserved) {
+       memset(frm_data, 0, sizeof(*frm_data));
+       frm_data->desc = desc;
+       frm_data->scenario = fres_sa_scenario_new();
+       if (!frm_data->scenario) {
                save_errno(ul_logerr("fres_sa_scenario_new failed\n"));
                goto err;
        }
 
        fcb = forb_resolve_reference(orb, fres_contract_broker_reg_name);
        if (!fcb) {
-               save_errno(ul_logerr("Could not find contract broker"));
+               save_errno(ul_logerr("Could not find contract broker\n"));
                goto err;
        }
 
-       frm = forb_fres_resource_manager_new(orb, &frm_impl, &frm_data);
+       frm = forb_fres_resource_manager_new(orb, &frm_impl, frm_data);
        if (!frm) {
-               save_errno(ul_logerr("forb_fres_resource_manager_new error"));
+               save_errno(ul_logerr("forb_fres_resource_manager_new error\n"));
                goto err_release_fcb;
        }
 
+       ret = forb_executor_register_object(executor, frm);
+       if (ret) {
+               save_errno(ul_logerr("forb_executor_register_object failed\n"));
+               goto err_executor;
+       }
+
+       /* Register resource manager */
+       res_desc.manager = frm;
+       ret = fres_contract_broker_register_resource(fcb,
+                                                    desc->res_type,
+                                                    desc->res_id,
+                                                    &res_desc, &env);
+       if (forb_exception_occurred(&env) || ret != 0) {
+               save_errno(ul_logerr("fres_contract_broker_register_resource exception\n"));
+               goto err_register;
+       }
+
+       forb_object_release(fcb);
+       return frm;
+err_register:  
+       forb_executor_unregister_object(executor, frm);
+err_executor:
+       forb_object_release(frm);
+err_release_fcb:       
+       forb_object_release(fcb);
+err:
+       return NULL;
+}
+
+/** 
+ * Initializes and runs a generic resource manager. The only thing a
+ * caller has to supply is admission test function, which is passed in
+ * @a frm_data->admission_test.
+ * 
+ * @param orb FORB object used to communicate with other components.
+ * @param desc Description on the resource manager
+ * 
+ * @return 
+ */
+int frm_register_and_run(forb_orb orb, const struct fres_res_manager *desc)
+{
+       fres_resource_manager frm;
+       struct frm_data frm_data;
+       forb_executor_t executor;
+       int ret = -1;
+
        /* Prepare executor before we register the resource manager
         * with contract broker */
        ret = forb_executor_init(&executor);
-       if (ret) goto err_release_frm;
-               
-       ret = forb_executor_register_object(&executor, frm);
-       if (ret) goto err_executor;
-
-       /* Register resource manager */
-       ret = fres_contract_broker_register_manager(fcb,
-                                                   res_manager->res_type,
-                                                   res_manager->res_id,
-                                                   frm, &env);
-       if (forb_exception_occured(&env) || ret != 0) {
-               goto err_executor;
+       if (ret) {
+               save_errno(ul_logerr("forb_executor_init failed\n"));
+               goto err;
        }
 
+       frm = frm_register(orb, &frm_data, &executor, desc);
+       if (!frm) {
+               ret = -1;
+               save_errno(ul_logerr("frm_register failed\n"));
+               goto err_destroy_executor;
+       }
+               
        /* Start request processing */
        ul_logmsg("Waiting for requests\n");
        ret = forb_executor_run(&executor);
-       if (ret) goto err_executor;
-       
-       return 0;
-err_executor:
-       forb_executor_destroy(&executor);
+       if (ret) goto err_release_frm;
+
 err_release_frm:       
        forb_object_release(frm);
-err_release_fcb:       
-       forb_object_release(fcb);
+err_destroy_executor:
+       forb_executor_destroy(&executor);
 err:
-       return -1;
+       return ret;
 }