From: Michal Sojka Date: Wed, 27 May 2009 18:32:15 +0000 (+0200) Subject: Discovery handling moved to discovery.c X-Git-Url: https://rtime.felk.cvut.cz/gitweb/frescor/forb.git/commitdiff_plain/1a096e992dc737d8886d206faf89cd7099536910?ds=sidebyside Discovery handling moved to discovery.c This was done to define a better (higher-level) interface for peer manipulation. The interface is represented by forb_new_peer_discovered(), forb_peer_disconnected() and forb_peer_find[_timed](). --- diff --git a/src/discovery.c b/src/discovery.c index b56dd8b..cc85990 100644 --- a/src/discovery.c +++ b/src/discovery.c @@ -44,12 +44,160 @@ /* covered by the GNU Public License. */ /**************************************************************************/ +/** + * @file discovery.c + * @author Michal Sojka + * @date Wed May 27 19:28:30 2009 + * + * @brief Discovery functions + * + */ + #include "discovery.h" #include #include "object.h" extern UL_LOG_CUST(ulogd_forb_discovery); +GAVL_CUST_NODE_INT_DEC(forb_peer_nolock,/* cust_prefix */ + forb_t, /* cust_root_t */ + forb_peer_t, /* cust_item_t */ + forb_server_id, /* cust_key_t */ + peers, /* cust_root_node */ + gnode, /* cust_item_node */ + server_id, /* cust_item_key */ + forb_server_id_cmp)/* cust_cmp_fnc */ + +GAVL_CUST_NODE_INT_IMP(forb_peer_nolock,/* cust_prefix */ + forb_t, /* cust_root_t */ + forb_peer_t, /* cust_item_t */ + forb_server_id, /* cust_key_t */ + peers, /* cust_root_node */ + gnode, /* cust_item_node */ + server_id, /* cust_item_key */ + forb_server_id_cmp);/* cust_cmp_fnc */ + +int forb_discovery_init(forb_t *forb) +{ + if (fosa_mutex_init(&forb->peer_mutex, 0) != 0) return -1; + forb_peer_nolock_init_root_field(forb); + + if (fosa_mutex_init(&forb->objkey_mutex, 0) != 0) return -1; + forb_objects_nolock_init_root_field(forb); + return 0; +} + +static inline void +forb_peer_insert(forb_t *forb, forb_peer_t *peer) +{ + fosa_mutex_lock(&forb->peer_mutex); + forb_peer_get(peer); + forb_peer_nolock_insert(forb, 
peer); + fosa_mutex_unlock(&forb->peer_mutex); +} + +static inline void +forb_peer_delete(forb_t *forb, forb_peer_t *peer) +{ + fosa_mutex_lock(&forb->peer_mutex); + forb_peer_nolock_delete(forb, peer); + forb_peer_put(peer); + fosa_mutex_unlock(&forb->peer_mutex); +} + +/** + * Finds peer with given @a server_id. + * + * @param forb + * @param server_id + * + * @return The found peer or NULL if no peer is found. You have to + * call forb_peer_put() after you finish working with the non NULL + * returned value. + */ +forb_peer_t * +forb_peer_find(forb_t *forb, forb_server_id *server_id) +{ + forb_peer_t *ret; + fosa_mutex_lock(&forb->peer_mutex); + ret = forb_peer_nolock_find(forb, server_id); + if (ret) { + forb_peer_get(ret); + } + fosa_mutex_unlock(&forb->peer_mutex); + return ret; +} + +/** + * Finds the peer with given @a server_id. If the peer is not + * currently known (not yet discovered or not available), this + * function waits at most @a timeout for the peer to be discovered. + * + * @param forb + * @param server_id + * @param timeout + * + * @return The peer structure or NULL if the peer is not known even + * after the timeout elapses. + * + * @note After the returned peer is not needed, forb_peer_put() must + * be called on it. + */ +forb_peer_t * +forb_peer_find_timed(forb_t *forb, forb_server_id *server_id, + fosa_abs_time_t *timeout) +{ + forb_peer_t *peer; + bool peer_allocated = false; + + fosa_mutex_lock(&forb->peer_mutex); + peer = forb_peer_nolock_find(forb, server_id); + if (!peer && !timeout) goto unlock; + if (peer) { + forb_peer_get(peer); + } else { + /* We are the first who want to contact this peer */ + peer = forb_peer_new(); + if (!peer) goto unlock; + peer->server_id = *server_id; + peer->state = FORB_PEER_WANTED; + forb_peer_nolock_insert(forb, forb_peer_get(peer)); + peer_allocated = true; + } + while (peer->state == FORB_PEER_WANTED) { + int ret; + /* Wait for the peer to be discovered. 
*/ + ret = fosa_cond_timedwait(&peer->cond, + &forb->peer_mutex, + timeout); + if (ret == FOSA_ETIMEDOUT) { + /* No peer discovered within timeout */ + if (peer_allocated) { + forb_peer_nolock_delete(forb, peer); + forb_peer_put(peer); + } + forb_peer_put(peer); + peer = NULL; + break; + } + } +unlock: + fosa_mutex_unlock(&forb->peer_mutex); + + return peer; +} + +/** + * + * @param port + * @param peer + * @param server_id + * @param addr + * @param orb_id + * + * @warning This function has to be called either from receiver thread + * or when the receiver thread is not running. + */ void forb_new_peer_discovered(forb_port_t *port, forb_peer_t *peer, forb_server_id server_id, void *addr, CORBA_string orb_id) @@ -93,3 +241,19 @@ void forb_new_peer_discovered(forb_port_t *port, forb_peer_t *peer, /* Broadcast our hello packet now */ forb_syncobj_signal(&port->hello); } + +/** + * + * + * @param peer + * + * @warning This function has to be called either from receiver thread + * or when the receiver thread is not running. + */ +void forb_peer_disconnected(forb_peer_t *peer) +{ + forb_peer_delete(peer->port->forb, peer); + forb_port_peer_delete(peer->port, peer); + forb_peer_put(peer); /* This should release the peer and in case of proto_inet close the socket. 
*/ +} + diff --git a/src/discovery.h b/src/discovery.h index 1b98e57..5425e29 100644 --- a/src/discovery.h +++ b/src/discovery.h @@ -50,9 +50,20 @@ #include "peer.h" #include "port.h" +int forb_discovery_init(forb_t *forb); + void forb_new_peer_discovered(forb_port_t *port, forb_peer_t *peer, forb_server_id server_id, void *addr, CORBA_string orb_id); +void forb_peer_disconnected(forb_peer_t *peer); + +forb_peer_t * +forb_peer_find(forb_t *forb, forb_server_id *server_id); + +forb_peer_t * +forb_peer_find_timed(forb_t *forb, forb_server_id *server_id, + fosa_abs_time_t *timeout); + #endif diff --git a/src/forb.c b/src/forb.c index dc55cbc..fe34081 100644 --- a/src/forb.c +++ b/src/forb.c @@ -74,6 +74,7 @@ #include #include #include +#include "discovery.h" #ifdef CONFIG_FORB_PROTO_UNIX #include #endif @@ -203,12 +204,8 @@ forb_init(int *argc, char **argv[], const struct forb_init_attr *attr) if (fosa_mutex_init(&forb->request_mutex, 0) != 0) goto err2; forb_request_nolock_init_root_field(forb); - - if (fosa_mutex_init(&forb->peer_mutex, 0) != 0) goto err2; - forb_peer_nolock_init_root_field(forb); - if (fosa_mutex_init(&forb->objkey_mutex, 0) != 0) goto err2; - forb_objects_nolock_init_root_field(forb); + if (forb_discovery_init(forb) != 0) goto err2; if (fosa_mutex_init(&forb->regref_mutex, 0) != 0) goto err2; forb_regref_nolock_init_root_field(forb); diff --git a/src/peer.c b/src/peer.c index ea04c0c..946674d 100644 --- a/src/peer.c +++ b/src/peer.c @@ -58,17 +58,6 @@ #include "forb_utils.h" #include "proto.h" -GAVL_CUST_NODE_INT_IMP(forb_peer_nolock,/* cust_prefix */ - forb_t, /* cust_root_t */ - forb_peer_t, /* cust_item_t */ - forb_server_id, /* cust_key_t */ - peers, /* cust_root_node */ - gnode, /* cust_item_node */ - server_id, /* cust_item_key */ - forb_server_id_cmp);/* cust_cmp_fnc */ - - - forb_peer_t * forb_peer_new(void) { @@ -97,72 +86,3 @@ forb_peer_release(forb_ref_t *ref) forb_free(peer); } - -void -forb_peer_delete_by_port(forb_t *forb, 
forb_port_t *port) -{ - forb_peer_t *peer; - ul_list_for_each_cut(forb_port_peer, port, peer) { - forb_peer_delete(forb, peer); - forb_peer_put(peer); - } -} - -/** - * Finds the peer with given @a server_id. If the peer is not - * currently known (not yet discovered or not available), this - * function waits at maximum @a timeout for the peer to be discovered. - * - * @param forb - * @param server_id - * @param timeout - * - * @return The peer structure or NULL if the peer is not known even - * after the timeout elapses. - * - * @note After the returned peer is not needed, forb_peer_put() must - * called on it. - */ -forb_peer_t * -forb_peer_find_timed(forb_t *forb, forb_server_id *server_id, - fosa_abs_time_t *timeout) -{ - forb_peer_t *peer; - bool peer_allocated = false; - - fosa_mutex_lock(&forb->peer_mutex); - peer = forb_peer_nolock_find(forb, server_id); - if (!peer && !timeout) goto unlock; - if (peer) { - forb_peer_get(peer); - } else { - /* We are the first who want to contact this peer */ - peer = forb_peer_new(); - if (!peer) goto unlock; - peer->server_id = *server_id; - peer->state = FORB_PEER_WANTED; - forb_peer_nolock_insert(forb, forb_peer_get(peer)); - peer_allocated = true; - } - while (peer->state == FORB_PEER_WANTED) { - int ret; - /* Wait for the peer to be discovered. 
*/ - ret = fosa_cond_timedwait(&peer->cond, - &forb->peer_mutex, - timeout); - if (ret == FOSA_ETIMEDOUT) { - /* No peer discovered within timeout */ - if (peer_allocated) { - forb_peer_nolock_delete(forb, peer); - forb_peer_put(peer); - } - forb_peer_put(peer); - peer = NULL; - break; - } - } -unlock: - fosa_mutex_unlock(&forb->peer_mutex); - - return peer; -} diff --git a/src/peer.h b/src/peer.h index 57e06f5..d6587f9 100644 --- a/src/peer.h +++ b/src/peer.h @@ -94,16 +94,6 @@ typedef struct forb_peer { fosa_mutex_t send_lock; } forb_peer_t; - -GAVL_CUST_NODE_INT_DEC(forb_peer_nolock,/* cust_prefix */ - forb_t, /* cust_root_t */ - forb_peer_t, /* cust_item_t */ - forb_server_id, /* cust_key_t */ - peers, /* cust_root_node */ - gnode, /* cust_item_node */ - server_id, /* cust_item_key */ - forb_server_id_cmp)/* cust_cmp_fnc */ - UL_LIST_CUST_DEC(forb_port_peer, /* cust_prefix */ forb_port_t, /* cust_head_t */ forb_peer_t, /* cust_item_t */ @@ -129,52 +119,7 @@ forb_peer_put(forb_peer_t *peer) forb_ref_put(&peer->ref, forb_peer_release); } -static inline void -forb_peer_insert(forb_t *forb, forb_peer_t *peer) -{ - fosa_mutex_lock(&forb->peer_mutex); - forb_peer_get(peer); - forb_peer_nolock_insert(forb, peer); - fosa_mutex_unlock(&forb->peer_mutex); -} - -static inline void -forb_peer_delete(forb_t *forb, forb_peer_t *peer) -{ - fosa_mutex_lock(&forb->peer_mutex); - forb_peer_nolock_delete(forb, peer); - forb_peer_put(peer); - fosa_mutex_unlock(&forb->peer_mutex); -} - void forb_peer_delete_by_port(forb_t *forb, forb_port_t *port); -/** - * Finds peer with given @a server_id. - * - * @param forb - * @param server_id - * - * @return The found peer or NULL if no peer is found. You have to - * call forb_peer_put() after you finish working with the non NULL - * retuned value. 
- */ -static inline forb_peer_t * -forb_peer_find(forb_t *forb, forb_server_id *server_id) -{ - forb_peer_t *ret; - fosa_mutex_lock(&forb->peer_mutex); - ret = forb_peer_nolock_find(forb, server_id); - if (ret) { - forb_peer_get(ret); - } - fosa_mutex_unlock(&forb->peer_mutex); - return ret; -} - -forb_peer_t * -forb_peer_find_timed(forb_t *forb, forb_server_id *server_id, - fosa_abs_time_t *timeout); - #endif diff --git a/src/port.c b/src/port.c index 60121b0..0c54c40 100644 --- a/src/port.c +++ b/src/port.c @@ -61,6 +61,7 @@ #include #include "iop.h" #include +#include "discovery.h" extern UL_LOG_CUST(ulogd_forb_port); @@ -168,14 +169,19 @@ void forb_destroy_port(forb_port_t *port) pthread_join(port->discovery_thread.pthread_id, &thread_return); #endif + /* Because of no locking of port->peers, this must be called + * after receiver thread is stopped. */ + forb_peer_t *peer; + ul_list_for_each_cut(forb_port_peer, port, peer) { + forb_peer_get(peer); + forb_peer_disconnected(peer); + forb_peer_put(peer); + } + if (port->desc.proto->port_destroy) { port->desc.proto->port_destroy(port); } - /* Because of no locking of port->peers, this must be called - * after receiver thread is stopped. */ - forb_peer_delete_by_port(forb, port); - FORB_CDR_codec_release_buffer(&port->codec); fosa_mutex_lock(&forb->port_mutex); diff --git a/src/proto.c b/src/proto.c index ed4e60b..3caab33 100644 --- a/src/proto.c +++ b/src/proto.c @@ -59,6 +59,7 @@ #include #include #include +#include "discovery.h" extern UL_LOG_CUST(ulogd_forb_proto); diff --git a/src/tests/discovery.c b/src/tests/discovery.c index ddb7996..48b0a08 100644 --- a/src/tests/discovery.c +++ b/src/tests/discovery.c @@ -62,6 +62,7 @@ #include #include #include "../proto.h" +#include "../discovery.h" #include #include #include