From cc9fad2f3ce9c38aa7031a3364a1d3a4a2a56d9b Mon Sep 17 00:00:00 2001 From: Martin Molnar Date: Thu, 5 Jun 2008 17:45:10 +0200 Subject: [PATCH] contract negotiation rework using rserve and commit contract_ops --- fwp/lib/core/fwp_vres.c | 2 +- fwp/lib/core/fwp_vres.h | 2 +- fwp/lib/mngt/fwp_contract.c | 91 +++++++------------------------ fwp/lib/mngt/fwp_contract.h | 85 ++++++++++++++++++++--------- fwp/lib/mngt/fwp_contract_table.c | 5 -- fwp/lib/mngt/fwp_contract_table.h | 21 ------- fwp/lib/mngt/fwp_mngt.c | 8 +-- fwp/lib/mngt/fwp_msg.h | 2 +- fwp/mngr/fwp_mngr.c | 39 +++++++------ 9 files changed, 107 insertions(+), 148 deletions(-) diff --git a/fwp/lib/core/fwp_vres.c b/fwp/lib/core/fwp_vres.c index 380ca60..035050a 100644 --- a/fwp/lib/core/fwp_vres.c +++ b/fwp/lib/core/fwp_vres.c @@ -155,7 +155,7 @@ static inline void fwp_vres_free(fwp_vres_t *vres) vres->status = FWP_VRES_FREE; } -int fwp_vres_set_params(fwp_vres_params_t *params, fwp_vres_d_t vresd) +int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params) { fwp_vres_t *vres = vresd; int rv; diff --git a/fwp/lib/core/fwp_vres.h b/fwp/lib/core/fwp_vres.h index 073d5a3..81daea0 100644 --- a/fwp/lib/core/fwp_vres.h +++ b/fwp/lib/core/fwp_vres.h @@ -47,7 +47,7 @@ int fwp_vres_table_init(unsigned int nr_vres); fwp_vres_d_t fwp_vres_alloc(); fwp_vres_id_t fwp_vres_get_id(fwp_vres_d_t vresd); -int fwp_vres_set_params(fwp_vres_params_t *params, fwp_vres_d_t vresd); +int fwp_vres_set_params(fwp_vres_d_t vresd, fwp_vres_params_t *params); int fwp_vres_create(fwp_vres_params_t *params, fwp_vres_d_t *vresdp); int fwp_vres_destroy(fwp_vres_d_t vresd); /* */ diff --git a/fwp/lib/mngt/fwp_contract.c b/fwp/lib/mngt/fwp_contract.c index 102fbcd..ee8ad38 100644 --- a/fwp/lib/mngt/fwp_contract.c +++ b/fwp/lib/mngt/fwp_contract.c @@ -2,81 +2,20 @@ #include "fwp_contract.h" #include "fwp_contract_table.h" - /** * Negotiates contract for application. Negotiation request is sent to * fwp agent and then waits for response. * - * \param[in] contract Contract to negotiate - * \param[out] vres_id Id of vres after the contract was accepted + * \param[in] contractd descriptor of the contract to negotiate + * \param[out] vresd Id of vres after the contract was accepted * * \return It returns FWP_CONTNEGT_ACCEPT when the contract was accepted and - * identifier of created vres is return in vres_id parameter. + * the descriptor of created vres is returned in vresd parameter. * It returns FWP_CONTNEGT_REJECT when the contract was rejected. * * If an error occured it returns negative error code. */ -#if 0 -int fwp_contract_negotiate(struct fwp_contract *contract, fwp_vres_d_t *vresdp) -{ - fwp_vres_params_t vparams; - fwp_contract_status_t status; - struct fwp_msgb *msgb; - unsigned int code; - fwp_appcall_id_t appcall_id; - struct sockaddr_un from; - socklen_t fromlen = sizeof(struct sockaddr_un); - int rc; - ssize_t len; - - msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) + - sizeof(struct fwp_msg_vres_params)/*FWP_MTU*/); - - if (!msgb) - return -ENOMEM; - - appcall_id.callid = fwp_callid++; - appcall_id.appid = fwp_appid; - fwp_msg_header_deflate(msgb->tail, FWP_CONTNEGT_REQ, &appcall_id); - fwp_msgb_put(msgb, sizeof(struct fwp_msg_header)); - - fwp_msg_contract_deflate(msgb->tail, contract); - fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract)); - - /* sendto FWP agent through unix socket - * and wait for reply - */ - FWP_DEBUG("Negotiation request sent to agent\n"); - _fwp_sendto(fwp_client_sockfd, msgb->data, msgb->len, 0, - &fwp_agent_addr, sizeof(fwp_agent_addr)); - - _fwp_recvfrom(len, fwp_client_sockfd, msgb->data, msgb->buf_size, 0, - &from, &fromlen); - FWP_DEBUG("Negotiation response received from agent\n"); - - fwp_msg_header_inflate(msgb->data, &code, &appcall_id); - fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header)); - - fwp_msg_vres_params_inflate(msgb->data, &status, &vparams); - fwp_msgb_pull(msgb, sizeof(struct fwp_msg_vres_params)); - - fwp_msgb_free(msgb); - FWP_DEBUG("status = %d\n", status); - if (status == FWP_CONT_NEGOTIATED) { - FWP_DEBUG("Contract negotiated\n"); - if ((rc = fwp_vres_create(&vparams, vresdp)) < 0){ - return rc; - } - return FWP_CONTNEGT_ACCEPTED; - - } else { - FWP_DEBUG("Contract rejected\n"); - return FWP_CONTNEGT_REJECTED; - } -} -#endif - -int fwp_contract_negotiate(fwp_contract_d_t contractd, fwp_vres_d_t *vresd) +int fwp_contract_negotiate(fwp_contract_d_t contractd, fwp_vres_d_t *vresdp) { fwp_contract_t contract; fwp_vres_params_t vres_params; @@ -84,8 +23,11 @@ int fwp_contract_negotiate(fwp_contract_d_t contractd, fwp_vres_d_t *vresd) /*contdata = fwp_contract_table_find(&fwp_participant_this->contract_table, contract_id);*/ fwp_contract_reserve(contractd); - - fwp_contract_commit(contractd); + if (!fwp_contract_is_reserved(contdata)) { + return -EAGAIN; + } + fwp_contract_commit(contractd, vresdp); + return 0; } /*fwp_contract_d_t fwp_contract_create(fwp_contract_t *contract, resource_d_t resource)*/ @@ -124,10 +66,10 @@ int fwp_contract_reserve(fwp_contract_d_t contractd) /* reserve place for header */ fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header)); - + /*Add contract header*/ fwp_msgb_contracthdr_in(msgb->tail, contdata->id, contdata->status); fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr)); - + /* Add contract params */ fwp_msg_contract_in(msgb->tail, &contdata->contract); fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract)); @@ -146,20 +88,25 @@ int fwp_contract_reserve(fwp_contract_d_t contractd) return 0; } -int fwp_contract_commit(fwp_contract_d_t contractd) +int fwp_contract_commit(fwp_contract_d_t contractd, fwp_vres_d_t *vresdp) { fwp_contract_data_t *contdata = contractd; fwp_msgb_t *msgb; /* Send COMMIT to manager */ msgb = fwp_msgb_alloc(sizeof(struct fwp_msg_header) + - sizeof(struct fwp_msg_contracthdr) + + sizeof(struct fwp_msg_contracthdr)); fwp_msgb_reserve(msgb, sizeof(struct fwp_msg_header)); fwp_msgb_contracthdr_in(msgb->tail, contdata->id, contdata->status); - fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr); + fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr)); fwp_mngt_send(FWP_MSG_NEGT_COMMIT, msgb, fwp_participant_this, fwp_participant_mngr); + /* Set parameters of vres + * and activate it if needed */ + fwp_vres_set_params(contdata->vresd, contdata->vres_params); + *vresdp = contdata->vresd; + /*TODO: error handling */ return 0; } diff --git a/fwp/lib/mngt/fwp_contract.h b/fwp/lib/mngt/fwp_contract.h index 0d946cd..11af006 100644 --- a/fwp/lib/mngt/fwp_contract.h +++ b/fwp/lib/mngt/fwp_contract.h @@ -2,13 +2,27 @@ #define _FWP_CONTRACT_H #include "fwp_vres.h" +#include "ul_list.h" +#include "ul_gavlcust.h" -/**< Contract negotiation status */ -enum fwp_contnegt_status { - FWP_CONTNEGT_ACCEPTED = 0, - FWP_CONTNEGT_REJECTED = 1, - FWP_CONTNEGT_RUNNING = 3, /**< for asynchronous negotiations*/ -} fwp_contnegt_status_t; +/* + *typedef struct mfrsh_contract_ops{ + * fwp_contract_d_t (*create)(fwp_contract_t *contract); + * int (*reserve)(fwp_contract_d_t contractd); + * int (*commit)(fwp_contract_d_t contractd); + *} mfrsh_contract_ops_t; + * + * typedef struct mfrsh_contract_d { + * mfrsh_resource_id_t resource_id; + * mfrsh_contract_id_t contract_id; + *} + */ + +typedef enum { + FWP_CONT_NOTNEGOTIATED = 0, + FWP_CONT_RESERVED = 1, + FWP_CONT_NEGOTIATED = 2 +} fwp_contract_status_t; /**< Contract Status */ /*typedef enum { @@ -19,24 +33,6 @@ enum fwp_contnegt_status { FWP_CONT_ACCEPTED = 4 } fwp_contract_status_t;*/ -typedef enum { - FWP_CONT_NOTNEGOTIATED = 0, - FWP_CONT_RESERVED = 1, - FWP_CONT_NEGOTIATED = 2 -} fwp_contract_status_t; - -struct fwp_contract_data; - -typedef fwp_vres_id_t fwp_contract_id_t; -typedef struct fwp_contract_data* fwp_contract_d_t; - -/* - * typedef struct mfrsh_contract_d { - * mfrsh_resource_id_t resource_id; - * mfrsh_contract_id_t contract_id; - *} - */ - /** * FWP contract. * It is an external representation of contract intented for application @@ -49,9 +45,48 @@ struct fwp_contract { int period_usec; /**< all time units are in microseconds */ } fwp_contract_t; -int fwp_contract_negotiate(struct fwp_contract *contract, fwp_vres_d_t *vresdp); +typedef fwp_vres_id_t fwp_contract_id_t; + +typedef +struct fwp_contract_data { + fwp_contract_id_t id; + /**< contract specified by user */ + fwp_contract_t contract; + /**< parameters from contract negotiated for vres */ + fwp_vres_params_t vres_params; + /**< the address of agent from that the contract comes */ + /*fwp_transaction_id_t trans_id;*/ + /* pointer to fwp_vres or fwp_participant */ + /*void *priv; */ + fwp_vres_d_t vresd; + fwp_contract_status_t status; + + ul_list_node_t list_node; + gavl_node_t tree_node; +} fwp_contract_data_t; + +typedef fwp_contract_data_t* fwp_contract_d_t; + +static inline fwp_contract_data_t* fwp_contract_data_new() +{ + return memset(malloc(sizeof (fwp_contract_data_t)),'\0', + sizeof(fwp_contract_data_t)); +} + +static inline int fwp_contract_is_reserved(fwp_contract_d_t contract) +{ + return (contract->status == FWP_CONT_RESERVED); +} + +static inline int fwp_contract_is_negotiated(fwp_contract_d_t contract) +{ + return (contract->status == FWP_CONT_NEGOTIATED); +} + fwp_contract_d_t fwp_contract_create(fwp_contract_t *contract); int fwp_contract_reserve(fwp_contract_d_t contractd); int fwp_contract_commit(fwp_contract_d_t contractd); +int fwp_contract_negotiate(fwp_contract_d_t contract, fwp_vres_d_t *vresdp); + #endif /*_FWP_CONTRACT_H */ diff --git a/fwp/lib/mngt/fwp_contract_table.c b/fwp/lib/mngt/fwp_contract_table.c index b41cca0..91ad679 100644 --- a/fwp/lib/mngt/fwp_contract_table.c +++ b/fwp/lib/mngt/fwp_contract_table.c @@ -8,11 +8,6 @@ GAVL_CUST_NODE_INT_IMP(_fwp_contract_table, fwp_contract_table_t, fwp_contract_data_t, fwp_contract_id_t, contract_tree, tree_node, id, gavl_cmp_int); -fwp_contract_data_t* fwp_contract_data_new() -{ - return memset(malloc(sizeof (fwp_contract_data_t)),'\0', - sizeof(fwp_contract_data_t)); -} void fwp_contract_table_init(struct fwp_contract_table *tbl) { diff --git a/fwp/lib/mngt/fwp_contract_table.h b/fwp/lib/mngt/fwp_contract_table.h index bb5de46..097bbf2 100644 --- a/fwp/lib/mngt/fwp_contract_table.h +++ b/fwp/lib/mngt/fwp_contract_table.h @@ -4,10 +4,6 @@ #include "fwp_contract.h" #include "fwp_vres.h" -#include "ul_list.h" -#include "ul_gavl.h" -#include "ul_gavlcust.h" - #include /** @@ -16,23 +12,6 @@ * protocol. * */ -typedef -struct fwp_contract_data { - fwp_contract_id_t id; - /**< contract specified by user */ - fwp_contract_t contract; - /**< parameters from contract negotiated for vres */ - fwp_vres_params_t vres_params; - /**< the address of agent from that the contract comes */ - /*fwp_transaction_id_t trans_id;*/ - /* pointer to fwp_vres or fwp_participant */ - /*void *priv; */ - fwp_vres_d_t vresd; - fwp_contract_status_t status; - - ul_list_node_t list_node; - gavl_node_t tree_node; -} fwp_contract_data_t; /** * List of contract_data structures diff --git a/fwp/lib/mngt/fwp_mngt.c b/fwp/lib/mngt/fwp_mngt.c index db7b1a9..2bc194c 100644 --- a/fwp/lib/mngt/fwp_mngt.c +++ b/fwp/lib/mngt/fwp_mngt.c @@ -24,7 +24,7 @@ int fwp_mngt_send(fwp_msg_type_t type,fwp_msgb_t *msgb, fwp_participant_t *source, fwp_participant_t *dest) { fwp_msgb_push(msgb, sizeof(struct fwp_msg_header)); - fwp_msg_header_deflate(msgb->data, type, source->id); + fwp_msg_header_in(msgb->data, type, source->id); fwp_send(dest->epointd, msgb->data, msgb->len); @@ -41,7 +41,7 @@ int fwp_mngt_recv(fwp_msg_type_t *type, fwp_participant_id_t *participant_id, msgb->buffer_size); fwp_msgb_put(msgb, size); - fwp_msg_header_inflate(msgb->data, type, participant_id); + fwp_msg_header_out(msgb->data, type, participant_id); fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header)); FWP_DEBUG("Received msg: type=%d from nodeid=%d appid=%d\n", *type, @@ -88,7 +88,7 @@ void fwp_mngt_connect() my_info.id = fwp_participant_this->id; my_info.stream_id = fwp_participant_this->stream_id; - fwp_msg_hello_deflate(msgb->tail, &my_info); + fwp_msg_hello_in(msgb->tail, &my_info); fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello)); /* Send hello to manager */ @@ -103,7 +103,7 @@ void fwp_mngt_connect() /* Process hello msg from manager */ - fwp_msg_hello_inflate(msgb->data, &mngr_info); + fwp_msg_hello_out(msgb->data, &mngr_info); fwp_participant_mngr->id = mngr_info.id; fwp_participant_mngr->stream_id = mngr_info.stream_id; FWP_DEBUG("Received HELLO msg contains nodeid= %d appid= %d\n", diff --git a/fwp/lib/mngt/fwp_msg.h b/fwp/lib/mngt/fwp_msg.h index 5800c85..ede4948 100644 --- a/fwp/lib/mngt/fwp_msg.h +++ b/fwp/lib/mngt/fwp_msg.h @@ -59,7 +59,7 @@ void fwp_msg_vres_params_out(unsigned char *data, fwp_vres_params_t *vparams); void fwp_msg_hello_in(unsigned char *data, fwp_participant_info_t *participant_info); -void fwp_msg_hello_inflate(unsigned char *data, +void fwp_msg_hello_out(unsigned char *data, fwp_participant_info_t *participant_info); #endif /* _FWP_MSG_H */ diff --git a/fwp/mngr/fwp_mngr.c b/fwp/mngr/fwp_mngr.c index d10e3bc..5c16721 100644 --- a/fwp/mngr/fwp_mngr.c +++ b/fwp/mngr/fwp_mngr.c @@ -55,7 +55,7 @@ void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id) participant_id.node_id, participant_id.app_id); /* Create a new participant */ - fwp_msg_hello_inflate(msgb->data, &participant_info); + fwp_msg_hello_out(msgb->data, &participant_info); participant = fwp_participant_create(&participant_info); fwp_mngt_service_vres_create(&participant->vresd); fwp_send_endpoint_create(participant->id.node_id, participant->stream_id, @@ -74,7 +74,7 @@ void fwp_mngr_hello(fwp_msgb_t *msgb, fwp_participant_id_t participant_id) my_info.id = fwp_participant_this->id; my_info.stream_id = fwp_participant_this->stream_id; - fwp_msg_hello_deflate(msgb->tail, &my_info); + fwp_msg_hello_in(msgb->tail, &my_info); fwp_msgb_put(msgb, sizeof(struct fwp_msg_hello)); /* Send hello to manager */ @@ -97,13 +97,14 @@ fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id) } contdata = fwp_contract_data_new(); + + /* Extract contract header */ + fwp_msg_contracthdr_out(msgb->data, &contdata->id, &contdata->status); + fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr)); + /* Extract contract params */ + fwp_msg_contract_out(msgb->data, &contdata->contract); + fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contract)); - contdata->id = ntohl((uint32_t)*msgb->data); - fwp_msgb_pull(msgb, 4); - contdata->status = (int8_t)*msgb->data; - fwp_msgb_pull(msgb, 1); - - fwp_msg_contract_inflate(msgb->data, &contdata->contract); /*launch admission test */ fwp_admctrl_test(contdata); @@ -112,16 +113,17 @@ fwp_mngr_contract_reserve(fwp_msgb_t *msgb, fwp_participant_id_t participant_id) sizeof(struct fwp_msg_contract) + sizeof(struct fwp_msg_vres_params)); fwp_msgb_reserve(msgb,sizeof(struct fwp_msg_header)); - /* Add contract_id and status */ - msgb->tail = htonl(contdata->id); - fwp_msgb_put(msgb, 4); - msgb->tail = contdata->status; - fwp_msgb_put(msgb, 1); - + + /*Add contract header*/ + fwp_msg_contracthdr_in(msgb->tail, contdata->id, contdata->status); + fwp_msgb_put(msgb, sizeof(struct fwp_msg_contracthdr)); + /* Add contract params */ + fwp_msg_contract_in(msgb->tail, &contdata->contract); + fwp_msgb_put(msgb, sizeof(struct fwp_msg_contract)); /*Send back contract reservation */ if (contdata->status == FWP_CONT_RESERVED) { - fwp_msg_vres_params_deflate(msgb->tail,&contdata->vres_params); + fwp_msg_vres_params_in(msgb->tail, &contdata->vres_params); fwp_msgb_put(msgb, sizeof(struct fwp_msg_vres_params)); /* Add contract to contract table */ fwp_contract_table_insert(&participant->contract_table,contdata); @@ -141,14 +143,15 @@ int fwp_mngr_contract_commit(fwp_msgb_t *msgb, fwp_participant_id_t participant_ fwp_participant_t *participant; fwp_contract_data_t *contdata; fwp_contract_id_t id; + fwp_contract_status_t status; /* Find participant */ if (!(participant = fwp_participant_table_find(&participant_id))){ return -EPERM; } - id = msgb->data; - fwp_msgb_pull(msgb, 4); + fwp_msg_contracthdr_out(msgb->tail, &id, &status); + fwp_msgb_pull(msgb, sizeof(struct fwp_msg_contracthdr)); contdata = fwp_contract_table_find(&participant->contract_table, id); contdata->status = FWP_CONT_NEGOTIATED; @@ -161,7 +164,7 @@ void fwp_mngr_msg_handler(fwp_msgb_t *msgb) fwp_msg_type_t msg_type; fwp_participant_id_t participant_id; - fwp_msg_header_inflate(msgb->data, &msg_type, &participant_id); + fwp_msg_header_out(msgb->data, &msg_type, &participant_id); fwp_msgb_pull(msgb, sizeof(struct fwp_msg_header)); switch (msg_type) { -- 2.39.2