From: Michal Sojka Date: Thu, 28 May 2009 03:29:59 +0000 (+0200) Subject: Added redistribution of HELLO packets X-Git-Url: https://rtime.felk.cvut.cz/gitweb/frescor/forb.git/commitdiff_plain/39b3139e2b61dc928571382a7381ed3241d46ff3 Added redistribution of HELLO packets In FRSH_FORB, contract broker acts as central node and redistributes HELLO packets to all connected applications. This way every application can connect to every other. --- diff --git a/src/discovery.c b/src/discovery.c index cc85990..a0391a9 100644 --- a/src/discovery.c +++ b/src/discovery.c @@ -56,6 +56,8 @@ #include "discovery.h" #include #include "object.h" +#include +#include "iop.h" extern UL_LOG_CUST(ulogd_forb_discovery); @@ -231,6 +233,18 @@ void forb_new_peer_discovered(forb_port_t *port, forb_peer_t *peer, forb_server_id_to_string(str, &peer->server_id, sizeof(str)), orb_id); } +#ifdef CONFIG_FORB_PROTO_INET_DEFAULT + if (forb->attr.redistribute_hellos) { + forb_peer_t *p; + ul_list_for_each(forb_port_peer, port, p) { + if (p != peer && + forb_server_id_cmp(&p->server_id, &forb->server_id) != 0) { + forb_iop_redistribute_hello_to(p, peer); /* Introduce new peer to others */ + forb_iop_redistribute_hello_to(peer, p); /* Introduce other peers to the new one */ + } + } + } +#endif if (forb->attr.peer_discovery_callback) { forb_orb peer_orb = forb_object_new(forb->orb, &peer->server_id, 0); forb->attr.peer_discovery_callback(peer_orb, orb_id); diff --git a/src/forb.h b/src/forb.h index 60d6842..187d0d3 100644 --- a/src/forb.h +++ b/src/forb.h @@ -160,6 +160,7 @@ typedef struct forb_init_attr { void (*peer_dead_callback)(const forb_orb peer_orb, const char *orb_id); uint16_t fixed_tcp_port; /**< If we want FORB's inet protocol to listen on a fixed port... */ forb_server_id fixed_server_id; /**< If we do not want the ID to be random. (HACK) */ + bool redistribute_hellos; } forb_init_attr_t; diff --git a/src/iop.c b/src/iop.c index c043463..94c351d 100644 --- a/src/iop.c +++ b/src/iop.c @@ -160,6 +160,35 @@ free: return ret; } +int +forb_iop_redistribute_hello_to(forb_peer_t *dest, forb_peer_t *peer) +{ + forb_port_t *port = peer->port; + FORB_CDR_Codec codec; + int ret; + FORB_CDR_codec_init_static(&codec, port->forb->orb); + if (!FORB_CDR_buffer_init(&codec, 1024, 0)) + return -1; + if (!FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE)) { + ret = -1; + goto free; + } + if (!forb_iop_prepare_hello(&codec, &peer->server_id, + peer->addr, + peer->port->desc.proto->serialize_addr, + peer->orb_id)) { + ret = -1; + goto free; + } + ul_logdeb("redistributing hello of %s (%s) to %s (%s)\n", + ""/*TODO:id*/, peer->orb_id, "", dest->orb_id); + ret = forb_proto_send(dest, &codec); + if (ret > 0) ret = 0; +free: + FORB_CDR_codec_release_buffer(&codec); + return ret; +} + bool forb_iop_process_message_header(forb_iop_message_header *mh, FORB_CDR_Codec *codec) { @@ -480,15 +509,27 @@ process_hello(forb_port_t *port, FORB_CDR_Codec *codec) peer = forb_peer_find(forb, &server_id); if (peer && peer->state == FORB_PEER_DISCOVERED) { /* TODO: Update last hello receive time */ - if (addr) - forb_free(addr); - if (peer_orb_id) - forb_free(peer_orb_id); - forb_peer_put(peer); + if (port->new_peer) { + ul_logdeb("peer already discovered but not connected - replacing\n"); + forb_peer_disconnected(peer); + forb_peer_put(peer); + peer = port->new_peer; + port->new_peer = NULL; + forb_new_peer_discovered(port, peer, server_id, addr, peer_orb_id); + } else { + ul_logdeb("peer already discovered - ignoring\n"); + if (addr) + forb_free(addr); + if (peer_orb_id) + forb_free(peer_orb_id); + forb_peer_put(peer); + } } else { if (port->new_peer) { - if (peer) + if (peer) { ul_logerr("Unahandled case - FORB_PEER_WANTED && port->new_peer\n"); + forb_peer_put(peer); + } peer = port->new_peer; port->new_peer = NULL; } diff --git a/src/iop.h b/src/iop.h index 38971f6..e257309 100644 --- a/src/iop.h +++ b/src/iop.h @@ -95,5 +95,7 @@ forb_request_send(forb_request_t *req, CORBA_Environment *env); int forb_iop_send_hello_to(forb_peer_t *peer); +int +forb_iop_redistribute_hello_to(forb_peer_t *dest, forb_peer_t *peer); #endif diff --git a/src/proto_inet.c b/src/proto_inet.c index 32a45c8..94296c1 100644 --- a/src/proto_inet.c +++ b/src/proto_inet.c @@ -147,6 +147,8 @@ inet_deserialize_addr(FORB_CDR_Codec *codec, void **addr) return ret; } +int setnonblocking(int fd); + static int inet_connect(forb_peer_t *peer) { @@ -177,6 +179,8 @@ inet_connect(forb_peer_t *peer) goto err_close; } + setnonblocking(ipeer->socket); + struct epoll_event ev; struct inet_port *p = peer->port->desc.proto_priv; memset(&ev, 0, sizeof(ev));