forb_iop_reply_header reply_header;
CORBA_boolean ret;
forb_peer_t *peer;
+ fosa_abs_time_t timeout;
reply_header.request_id = request_id;
reply_header.flags = 0;
}
forb_iop_prepend_message_header(codec, forb_iop_REPLY);
- peer = forb_get_next_hop(forb, dest);
+ fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
+ timeout = fosa_abs_time_incr(timeout,
+ fosa_msec_to_rel_time(1000));
+
+ peer = forb_get_next_hop(forb, dest, &timeout);
if (!peer) {
ul_logerr("Reply destination not found\n");
goto err;
}
if (forb_server_id_cmp(&server_id, &forb->server_id) != 0) {
peer = forb_peer_find(forb, &server_id);
- if (peer) {
+ if (peer && peer->state == FORB_PEER_DISCOVERED) {
/* TODO: Update last hello receive time */
forb_peer_put(peer);
} else {
/* New peer discovered */
-/* char str[100]; */
-/* printf("New peer %s discovered port %p\n", */
-/* forb_server_id_to_string(str, &server_id, sizeof(str)), port); */
- peer = forb_peer_new();
+ bool notify_waiters = false;
+ if (peer /* && peer->state == FORB_PEER_WANTED */) {
+ notify_waiters = true;
+ } else {
+ peer = forb_peer_new();
+ }
if (peer) {
+ fosa_mutex_lock(&forb->peer_mutex);
peer->server_id = server_id;
peer->port = port;
peer->addr = addr;
- forb_peer_insert(forb, peer);
+ peer->state = FORB_PEER_DISCOVERED;
+ if (notify_waiters) {
+ fosa_cond_broadcast(&peer->cond);
+ } else {
+ forb_peer_get(peer);
+ forb_peer_nolock_insert(forb, peer);
+ }
+ fosa_mutex_unlock(&forb->peer_mutex);
forb_peer_put(peer);
}
/* Broadcast our hello packet now */
peer = forb_malloc(sizeof(*peer));
if (peer) {
+ memset(peer, 0, sizeof(*peer));
+ fosa_cond_init(&peer->cond);
forb_ref_init(&peer->ref);
}
return peer;
}
fosa_mutex_unlock(&port->forb->peer_mutex);
}
+
+/**
+ * Finds the peer with the given server ID, optionally waiting until
+ * the peer is discovered.
+ *
+ * If the peer is not known and @a timeout is given, a placeholder
+ * peer in the FORB_PEER_WANTED state is inserted into the peer tree
+ * and the caller waits on the peer's condition variable until the
+ * state changes or @a timeout expires.
+ *
+ * @param forb FORB whose peers are searched.
+ * @param server_id Server ID of the wanted peer.
+ * @param timeout Absolute time to wait until, or NULL for no waiting.
+ *
+ * @return The peer with an additional reference taken for the
+ * caller, or NULL if the peer is not known and cannot be waited for,
+ * if the placeholder cannot be allocated, or if the timeout expired.
+ */
+forb_peer_t *
+forb_peer_find_timed(forb_t *forb, forb_server_id *server_id,
+		     fosa_abs_time_t *timeout)
+{
+	forb_peer_t *peer;
+
+	fosa_mutex_lock(&forb->peer_mutex);
+	peer = forb_peer_nolock_find(forb, server_id);
+	if (peer) {
+		/* Reference for the caller. */
+		forb_peer_get(peer);
+	} else {
+		if (!timeout) goto unlock;
+		/* We are the first who want to contact this peer */
+		peer = forb_peer_new();
+		if (!peer) goto unlock;
+		peer->server_id = *server_id;
+		peer->state = FORB_PEER_WANTED;
+		/* One reference is kept by the tree, the other one
+		 * belongs to the caller. */
+		forb_peer_get(peer);
+		forb_peer_nolock_insert(forb, peer);
+	}
+	while (peer->state == FORB_PEER_WANTED) {
+		/* Wait for the peer to be discovered. Without a
+		 * timeout we must not wait at all (and must not pass
+		 * NULL to fosa_cond_timedwait()). */
+		if (!timeout ||
+		    fosa_cond_timedwait(&peer->cond,
+					&forb->peer_mutex,
+					timeout) == FOSA_ETIMEDOUT) {
+			forb_peer_put(peer);
+			peer = NULL;
+			/* TODO: How to remove the peer from the tree?
+			 * We should check refcnt and if equal to 1,
+			 * we should remove it. */
+			/* Leave the loop; its condition would
+			 * otherwise dereference the NULL peer. */
+			break;
+		}
+	}
+unlock:
+	fosa_mutex_unlock(&forb->peer_mutex);
+
+	return peer;
+}
#include "port.h"
#include "refcnt.h"
+/** Discovery state of a peer, stored in forb_peer::state. */
+enum forb_peer_state {
+ FORB_PEER_WANTED, /**< Somebody is waiting for the peer to be discovered. In this state, only the @a server_id field is set. */
+ FORB_PEER_DISCOVERED, /**< The peer is already discovered, so its addr is known and messages can be sent to it. */
+};
/**
* Description of a peer. We consider peers only in one-hop
struct forb_peer {
forb_server_id server_id; /**< Server_id of the peer */
forb_port_t *port; /**< Port of this peer is connected to */
- void *addr; /**< Protocol specific address of the peer */
+ void *addr; /**< Protocol specific address of the peer. */
void *proto_priv; /**< Protocol private data (e.g. info about established connection) */
gavl_node_t node; /**< Node of port's peers tree */
forb_ref_t ref; /**< Reference count */
+ enum forb_peer_state state; /**< Discovery state of the peer (wanted or already discovered). */
+ fosa_cond_t cond; /**< Condition variable for waiting after the peer is discovered. Used together with forb::peer_mutex. */
};
typedef struct forb_peer forb_peer_t;
return ret;
}
+/**
+ * Finds the peer with the given server ID; when @a timeout is not
+ * NULL, waits until the peer is discovered or the (absolute)
+ * timeout expires.
+ *
+ * @return The peer with a reference taken for the caller, or NULL
+ * if the peer is not available.
+ */
+forb_peer_t *
+forb_peer_find_timed(forb_t *forb, forb_server_id *server_id,
+ fosa_abs_time_t *timeout);
+
#endif
* @return Next hop peer of NULL, if the destination is not found.
*/
forb_peer_t *
-forb_get_next_hop(forb_t *forb, forb_server_id *server_id)
+forb_get_next_hop(forb_t *forb, forb_server_id *server_id, fosa_abs_time_t *timeout)
{
forb_peer_t *peer;
- peer = forb_peer_find(forb, server_id);
+ /* The timed lookup can wait for a not yet discovered peer
+ * until the absolute @a timeout. */
+ peer = forb_peer_find_timed(forb, server_id, timeout);
return peer;
}
size_t forb_proto_send(forb_peer_t *peer, CDR_Codec *codec);
forb_peer_t *
-forb_get_next_hop(forb_t *forb, forb_server_id *server);
+forb_get_next_hop(forb_t *forb, forb_server_id *server, fosa_abs_time_t *timeout);
#endif
CORBA_boolean ret;
forb_peer_t *peer;
size_t size;
+ fosa_abs_time_t timeout;
forb_t *forb = forb_object_to_forb(req->obj);
+
if (!forb) {
env->major = FORB_EX_INTERNAL;
return;
env->major = FORB_EX_INTERNAL;
return;
}
- peer = forb_get_next_hop(forb, &req->obj->server);
+
+ fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
+ timeout = fosa_abs_time_incr(timeout,
+ fosa_msec_to_rel_time(1000));
+ peer = forb_get_next_hop(forb, &req->obj->server, &timeout);
if (!peer) {
char str[50];
ul_logerr("Cannot find peer to send request to server: %s\n",
#include <unistd.h>
#include "../proto.h"
#include <error.h>
+#include <fosa.h>
#define NUM_ORBS 5
forb_orb orb[NUM_ORBS];
int i;
bool all_peers_found = true;
+ fosa_abs_time_t timeout;
for (i=0; i<NUM_ORBS; i++) {
orb[i] = forb_init(&argc, &argv, NULL);
}
- /* Wait for discovering all the peers */
- sleep(1);
+ fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
+ timeout = fosa_abs_time_incr(timeout,
+ fosa_msec_to_rel_time(1000));
/* Check whether all the peers are found */
for (i=0; i<NUM_ORBS && all_peers_found; i++) {
forb_peer_t *peer;
for (j=0; j<NUM_ORBS && all_peers_found; j++) {
if (i==j) continue;
- peer = forb_peer_find(forb_data(orb[i]), &forb_data(orb[j])->server_id);
+ peer = forb_peer_find_timed(forb_data(orb[i]),
+ &forb_data(orb[j])->server_id,
+ &timeout);
all_peers_found &= (peer != NULL);
if (!all_peers_found) {
orb = forb_init(&argc_g, &argv_g, "hello_remote_client");
- sleep(1); /* Hack to wait for dicovery of the two peers */
-
hobj = forb_string_to_object(orb, objref);
if (!hobj) {
error(1, 0, "Cannot create object reference from the string (%s)\n", objref);
orb = forb_init(&argc_g, &argv_g, "objref_client");
- sleep(1); /* Hack to wait for dicovery of the two peers */
-
me = forb_client_new(orb, &client_impl, NULL);
fosa_thread_create(&tid, NULL, client_thread, me);