From: Michal Sojka Date: Thu, 24 Jun 2010 13:38:20 +0000 (+0200) Subject: fcb: Refactor resource reserve/commit X-Git-Url: http://rtime.felk.cvut.cz/gitweb/frescor/frsh.git/commitdiff_plain/a74b3112959b49f713fbbbacfc9a21d587a49558 fcb: Refactor resource reserve/commit This allows sharing some code with transaction negotiation. --- diff --git a/fres/cbroker/Makefile.omk b/fres/cbroker/Makefile.omk index 0913e2a..dc2166f 100644 --- a/fres/cbroker/Makefile.omk +++ b/fres/cbroker/Makefile.omk @@ -1,5 +1,7 @@ bin_PROGRAMS += fcb +CFLAGS += --std=gnu99 #--save-temps + fcb_SOURCES = fcb.c contract_log.c fcb_LIBS = contract fosa forb pthread rt ulut frm_client fra_client fcb_LIBS += frsh # For frsh_strerror diff --git a/fres/cbroker/fcb.c b/fres/cbroker/fcb.c index b039ac2..2e825bc 100644 --- a/fres/cbroker/fcb.c +++ b/fres/cbroker/fcb.c @@ -83,6 +83,11 @@ bool opt_daemon = false; char *opt_pidfile = NULL; fosa_abs_time_t start_time; +/** List of contracts to be newly reserved or changed by resource manager */ +struct reservation_list { + ul_list_head_t fcb_contracts; + unsigned length; +}; /** * Resource identification @@ -102,6 +107,7 @@ struct resource { fres_resource_manager mng; /**< Object reference of the resource manager */ gavl_cust_root_field_t allocators; /**< Registered allocators for this resource (from multiple applications/nodes) */ ul_list_head_t sc_contracts; /**< Negotiated contracts with spare capacity for this resource */ + struct reservation_list rl; /**< Temporary list of contracts to be reserved on this resource */ }; /** @@ -138,13 +144,6 @@ struct fcb { gavl_cust_root_field_t contracts; /**< Contracts negotiated by this FCB */ }; -/** List of contracts to be newly reserved or changed during spare - * capacity rebalancing */ -struct reservation_list { - ul_list_head_t fcb_contracts; - unsigned length; -}; - struct fcb_contract *fcb_contract_new(fres_contract_id_t *id) { struct fcb_contract *fcb_contract; @@ -416,27 +415,26 @@ 
check_and_setup_resource(struct fcb *fcb, struct fcb_contract *fcb_contracts[], */ static void prepare_reservation_list(struct resource *resource, - struct fcb_contract *fcb_contract[], int num, - struct reservation_list *rl) + struct fcb_contract *fcb_contract[], int num) { int i; fosa_abs_time_t now; struct fcb_contract *fc; - reservation_list_init_head(rl); - rl->length = 0; + reservation_list_init_head(&resource->rl); + resource->rl.length = 0; for (i=0; i<num; i++) { assert(fcb_contract[i]->requested_contract != NULL); - reservation_list_insert(rl, fcb_contract[i]); - rl->length++; + reservation_list_insert(&resource->rl, fcb_contract[i]); + resource->rl.length++; } fosa_clock_get_time(CLOCK_REALTIME, &now); ul_list_for_each(sc_contracts, resource, fc) { if (fosa_abs_time_smaller(fc->end_of_stability_period, now) && fc->requested_contract == NULL) /* Do not insert contract inserted above */ { - reservation_list_insert(rl, fc); - rl->length++; + reservation_list_insert(&resource->rl, fc); + resource->rl.length++; } } } @@ -450,11 +448,11 @@ prepare_reservation_list(struct resource *resource, * @return Zero on success, non-zero error code on error */ static int -rebalance_spare_capacity_and_reserve(struct resource *resource, - struct reservation_list *rl) +rebalance_spare_capacity_and_reserve(struct resource *resource) { int ret; unsigned i; + struct reservation_list *rl = &resource->rl; struct fcb_contract *fc; fres_block_spare_capacity *s; @@ -637,6 +635,74 @@ free_fcb_contracts(struct fcb_contract *fcb_contracts[], int num) free(fcb_contracts); } +int +commit_resource(struct resource *resource, + fres_contract_ptr_seq **schedulable_contracts) +{ + int ret; + fres_contract_id_seq commit_ids; + int i; + struct fcb_contract *fc; + CORBA_Environment ev; + + if (!forb_sequence_alloc_buf(&commit_ids, resource->rl.length)) { + ret = errno; + goto err; + } + + commit_ids._length = resource->rl.length; + i=0; + ul_list_for_each(reservation_list, &resource->rl, fc) { + forb_sequence_elem(&commit_ids, i) = 
fc->id; + i++; + } + + fres_resource_manager_commit_contracts(resource->mng, &commit_ids, + schedulable_contracts, &ev); + if (forb_exception_occurred(&ev)) { + ret = fres_forbex2err(&ev); + goto err_free; + } + return 0; +err_free: + forb_sequence_free_buf(&commit_ids, forb_no_destructor); +err: + return ret; +} + +int cancel_reservations(struct resource *resource) +{ + int ret; + fres_contract_id_seq commit_ids; + int i; + struct fcb_contract *fc; + CORBA_Environment ev; + + if (!forb_sequence_alloc_buf(&commit_ids, resource->rl.length)) { + ret = errno; + goto err; + } + + commit_ids._length = resource->rl.length; + i=0; + ul_list_for_each(reservation_list, &resource->rl, fc) { + forb_sequence_elem(&commit_ids, i) = fc->id; + i++; + } + + fres_resource_manager_cancel_reservations(resource->mng, &commit_ids, &ev); + if (forb_exception_occurred(&ev)) { + ret = fres_forbex2err(&ev); + goto err_free; + } + return 0; +err_free: + forb_sequence_free_buf(&commit_ids, forb_no_destructor); +err: + return ret; +} + + CORBA_long negotiate_contracts(fres_contract_broker obj, const fres_contract_ptr_seq* contracts, @@ -650,7 +716,6 @@ negotiate_contracts(fres_contract_broker obj, fres_contract_ptr_seq *schedulable_contracts; struct fcb_contract **fcb_contracts, *fc; unsigned i; - fres_contract_id_seq commit_ids; int num = contracts->_length; /* Prepare output sequence for the case we return eariler with @@ -680,21 +745,10 @@ negotiate_contracts(fres_contract_broker obj, if (ret) goto err_free_fcb_contracts; - struct reservation_list rl; - prepare_reservation_list(resource, - fcb_contracts, num, - &rl); - - /* Allocate all the needed memory before doing reservation. If - * there is not enough memory, it has no sense to call resource - * manager. 
*/ - if (!forb_sequence_alloc_buf(&commit_ids, rl.length)) { - ret = errno; - goto err_free_fcb_contracts; - } + prepare_reservation_list(resource, fcb_contracts, num); /* Reserve contracts */ - ret = rebalance_spare_capacity_and_reserve(resource, &rl); + ret = rebalance_spare_capacity_and_reserve(resource); if (ret) { if (ret == FRSH_ERR_CONTRACT_REJECTED) { ul_logmsg("Contract(s) was/were rejected\n"); @@ -707,19 +761,9 @@ negotiate_contracts(fres_contract_broker obj, } /* Commit contracts */ - commit_ids._length = rl.length; - i=0; - ul_list_for_each(reservation_list, &rl, fc) { - forb_sequence_elem(&commit_ids, i) = fc->id; - i++; - } - - fres_resource_manager_commit_contracts(resource->mng, &commit_ids, - &schedulable_contracts, ev); - if (forb_exception_occurred(ev)) { - ret = fres_forbex2err(ev); + ret = commit_resource(resource, &schedulable_contracts); + if (ret) goto err_cancel_reservation; - } /* Add new contracts to our fcb database for later * reference. Canceled contracts are removed below. */ @@ -781,7 +825,7 @@ err_cancel_contracts: /* TODO */ goto err_free_fcb_contracts; err_cancel_reservation: - fres_resource_manager_cancel_reservations(resource->mng, &commit_ids, ev); + cancel_reservations(resource); err_free_fcb_contracts: free_fcb_contracts(fcb_contracts, num); err: @@ -796,11 +840,10 @@ void redistribute_spare_capacity(fres_contract_broker obj, struct fcb *fcb = o2fcb(obj); struct res_key key = {restype, resid }; struct resource *resource; - struct reservation_list rl; resource = fcb_resource_find(fcb, &key); - prepare_reservation_list(resource, NULL, 0, &rl); + prepare_reservation_list(resource, NULL, 0); /* forb_sequence_alloc(ids, rl.length); */ /* if (!ids || !ids->_buffer) { */ @@ -811,7 +854,7 @@ void redistribute_spare_capacity(fres_contract_broker obj, /* *ids_out = ids; /\* ids is freed by FORB *\/ */ - rebalance_spare_capacity_and_reserve(resource, &rl); + rebalance_spare_capacity_and_reserve(resource); /* Commit */ }