]> rtime.felk.cvut.cz Git - frescor/forb.git/blobdiff - src/peer.c
Added mutex to protect request send from multiple threads
[frescor/forb.git] / src / peer.c
index 0151fcd4cec75a7e7de2a7c444be0e74ede95c9a..ea04c0c94ccb41f7f88aa8b3867b68aeacf350cf 100644 (file)
 /* covered by the GNU Public License.                                    */
 /**************************************************************************/
 
+/**
+ * @file   peer.c
+ * @author Michal Sojka <sojkam1@fel.cvut.cz>
+ * @date   Sun Oct 12 17:30:27 2008
+ * 
+ * @brief  Implementation of peer manipulation functions.
+ * 
+ * 
+ */
+
 #include "peer.h"
-#include "misc.h"
+#include "forb_utils.h"
 #include "proto.h"
 
 GAVL_CUST_NODE_INT_IMP(forb_peer_nolock,/* cust_prefix */
@@ -53,7 +63,7 @@ GAVL_CUST_NODE_INT_IMP(forb_peer_nolock,/* cust_prefix */
                       forb_peer_t,     /* cust_item_t */
                       forb_server_id,  /* cust_key_t */
                       peers,           /* cust_root_node */
-                      node,            /* cust_item_node */
+                      gnode,           /* cust_item_node */
                       server_id,       /* cust_item_key */
                       forb_server_id_cmp);/* cust_cmp_fnc */
 
@@ -66,7 +76,10 @@ forb_peer_new(void)
        
        peer = forb_malloc(sizeof(*peer));
        if (peer) {
+               memset(peer, 0, sizeof(*peer));
+               fosa_cond_init(&peer->cond);
                forb_ref_init(&peer->ref);
+               fosa_mutex_init(&peer->send_lock, 0);
        }
        return peer;
 }
@@ -75,9 +88,13 @@ void
 forb_peer_release(forb_ref_t *ref)
 {
        forb_peer_t *peer = container_of(ref, forb_peer_t, ref);
-       if (peer->port->proto->peer_destroy) {
-               peer->port->proto->peer_destroy(peer);
+       if (peer->port &&
+           peer->port->desc.proto->peer_destroy) {
+               peer->port->desc.proto->peer_destroy(peer);
        }
+       if (peer->addr)
+               forb_free(peer->addr);
+       
        forb_free(peer);
 }
 
@@ -85,15 +102,67 @@ void
 forb_peer_delete_by_port(forb_t *forb, forb_port_t *port)
 {
        forb_peer_t *peer;
-       fosa_mutex_lock(&port->forb->peer_mutex);
-       /* FIXME: Is gavl_cust_for_each() deletion safe? What about
-        * recursive locking of mutex. */
-#warning Delete during GAVL traversal is probably not correct.
-       gavl_cust_for_each(forb_peer_nolock, forb, peer) {
-               if (peer->port == port) {
-                       forb_peer_nolock_delete(forb, peer);
+       ul_list_for_each_cut(forb_port_peer, port, peer) {
+               forb_peer_delete(forb, peer);
+               forb_peer_put(peer);
+       }
+}
+
+/** 
+ * Finds the peer with the given @a server_id. If the peer is not
+ * currently known (not yet discovered or not available), this
+ * function waits at most until @a timeout for the peer to be discovered.
+ * 
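+ * @param forb FORB instance whose set of peers is searched.
+ * @param server_id ID of the server whose peer is looked up.
+ * @param timeout Absolute deadline for the wait; if NULL, an unknown peer is not waited for.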
+ * 
+ * @return The peer structure or NULL if the peer is not known even
+ * after the timeout elapses.
+ * 
+ * @note When the returned peer is no longer needed, forb_peer_put()
+ * must be called on it.
+ */
+forb_peer_t *
+forb_peer_find_timed(forb_t *forb, forb_server_id *server_id,
+                    fosa_abs_time_t *timeout)
+{
+       forb_peer_t *peer;
+       bool peer_allocated = false;
+       
+       fosa_mutex_lock(&forb->peer_mutex);
+       peer = forb_peer_nolock_find(forb, server_id);
+       if (!peer && !timeout) goto unlock;
+       if (peer) {
+               forb_peer_get(peer);
+       } else {
+               /* We are the first who want to contact this peer */
+               peer = forb_peer_new();
+               if (!peer) goto unlock;
+               peer->server_id = *server_id;
+               peer->state = FORB_PEER_WANTED;
+               forb_peer_nolock_insert(forb, forb_peer_get(peer));
+               peer_allocated = true;
+       }
+       while (peer->state == FORB_PEER_WANTED) {
+               int ret;
+               /* Wait for the peer to be discovered. */
+               ret = fosa_cond_timedwait(&peer->cond,
+                                         &forb->peer_mutex,
+                                         timeout);
+               if (ret == FOSA_ETIMEDOUT) {
+                       /* No peer discovered within timeout */
+                       if (peer_allocated) {
+                               forb_peer_nolock_delete(forb, peer);
+                               forb_peer_put(peer);
+                       }
                        forb_peer_put(peer);
+                       peer = NULL;
+                       break;
                }
        }
-       fosa_mutex_unlock(&port->forb->peer_mutex);
+unlock:        
+       fosa_mutex_unlock(&forb->peer_mutex);
+
+       return peer;
 }