#include "discovery.h"
#include <ul_log.h>
#include "object.h"
+#include <forb/config.h>
+#include "iop.h"
extern UL_LOG_CUST(ulogd_forb_discovery);
forb_server_id_to_string(str, &peer->server_id, sizeof(str)),
orb_id);
}
+#ifdef CONFIG_FORB_PROTO_INET_DEFAULT
+ if (forb->attr.redistribute_hellos) {
+ forb_peer_t *p;
+ ul_list_for_each(forb_port_peer, port, p) {
+ if (p != peer &&
+ forb_server_id_cmp(&p->server_id, &forb->server_id) != 0) {
+ forb_iop_redistribute_hello_to(p, peer); /* Introduce new peer to others */
+ forb_iop_redistribute_hello_to(peer, p); /* Introduce other peers to the new one */
+ }
+ }
+ }
+#endif
if (forb->attr.peer_discovery_callback) {
forb_orb peer_orb = forb_object_new(forb->orb, &peer->server_id, 0);
forb->attr.peer_discovery_callback(peer_orb, orb_id);
void (*peer_dead_callback)(const forb_orb peer_orb, const char *orb_id);
uint16_t fixed_tcp_port; /**< If we want FORB's inet protocol to listen on a fixed port... */
forb_server_id fixed_server_id; /**< If we do not want the ID to be random. (HACK) */
+ bool redistribute_hellos;
} forb_init_attr_t;
return ret;
}
+int
+forb_iop_redistribute_hello_to(forb_peer_t *dest, forb_peer_t *peer)
+{
+ forb_port_t *port = peer->port;
+ FORB_CDR_Codec codec;
+ int ret;
+ FORB_CDR_codec_init_static(&codec, port->forb->orb);
+ if (!FORB_CDR_buffer_init(&codec, 1024, 0))
+ return -1;
+ if (!FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE)) {
+ ret = -1;
+ goto free;
+ }
+ if (!forb_iop_prepare_hello(&codec, &peer->server_id,
+ peer->addr,
+ peer->port->desc.proto->serialize_addr,
+ peer->orb_id)) {
+ ret = -1;
+ goto free;
+ }
+ ul_logdeb("redistributing hello of %s (%s) to %s (%s)\n",
+ ""/*TODO:id*/, peer->orb_id, "", dest->orb_id);
+ ret = forb_proto_send(dest, &codec);
+ if (ret > 0) ret = 0;
+free:
+ FORB_CDR_codec_release_buffer(&codec);
+ return ret;
+}
+
bool
forb_iop_process_message_header(forb_iop_message_header *mh, FORB_CDR_Codec *codec)
{
peer = forb_peer_find(forb, &server_id);
if (peer && peer->state == FORB_PEER_DISCOVERED) {
/* TODO: Update last hello receive time */
- if (addr)
- forb_free(addr);
- if (peer_orb_id)
- forb_free(peer_orb_id);
- forb_peer_put(peer);
+ if (port->new_peer) {
+ ul_logdeb("peer already discovered but not connected - replacing\n");
+ forb_peer_disconnected(peer);
+ forb_peer_put(peer);
+ peer = port->new_peer;
+ port->new_peer = NULL;
+ forb_new_peer_discovered(port, peer, server_id, addr, peer_orb_id);
+ } else {
+ ul_logdeb("peer already discovered - ignoring\n");
+ if (addr)
+ forb_free(addr);
+ if (peer_orb_id)
+ forb_free(peer_orb_id);
+ forb_peer_put(peer);
+ }
} else {
if (port->new_peer) {
- if (peer)
+ if (peer) {
ul_logerr("Unahandled case - FORB_PEER_WANTED && port->new_peer\n");
+ forb_peer_put(peer);
+ }
peer = port->new_peer;
port->new_peer = NULL;
}
return ret;
}
+int setnonblocking(int fd);
+
static int
inet_connect(forb_peer_t *peer)
{
goto err_close;
}
+ setnonblocking(ipeer->socket);
+
struct epoll_event ev;
struct inet_port *p = peer->port->desc.proto_priv;
memset(&ev, 0, sizeof(ev));