#include <sys/types.h>
#include <ul_log.h>
#include <unistd.h>
+#include <forb/config.h>
+#include "discovery.h"
+#include <stdlib.h>
+#include <netinet/tcp.h>
+#include "iop.h" /* FIXME: Sending hello should be handled in IOP layer */
/**
* @file proto_inet.c
struct inet_addr addr; /**< Address of this port */
int last_recv_fd; /**< Used in inet_recv() to read data longer than receiving buffer */
struct in_addr multicast_addr;
+ ul_list_head_t new_peers; /**< List of just connected peers which did not send any data yet. */
};
+UL_LIST_CUST_DEC(inet_port_new_peer, /* cust_prefix */
+ struct inet_port, /* cust_head_t */
+ forb_peer_t, /* cust_item_t */
+ new_peers, /* cust_head_field */
+ lnode) /* cust_node_field */
+
+
/** INET protocol data associated with every peer */
struct inet_peer {
int socket; /**< Connected socket to the peer */
return ret;
}
-static struct inet_peer *
+static int
+setnonblocking(int fd);
+
+static int
+setnodelay(int fd)
+{
+ int ret = 0;
+#if 0 /* For nice graphs in benchmarks */
+ int yes = 1;
+ ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
+ if (ret < 0) {
+ ul_logerr("setsockopt(TCP_NODELAY): %s\n", strerror(errno));
+ }
+#endif
+ return ret;
+
+}
+
+static int
inet_connect(forb_peer_t *peer)
{
struct inet_peer *ipeer;
struct inet_addr *addr = peer->addr;
int ret;
- if (!addr)
- return NULL;
+ if (!addr) {
+ ul_logerr("No address to connect\n");
+ goto err;
+ }
ipeer = forb_malloc(sizeof(*ipeer));
if (!ipeer)
goto err;
ipeer->socket = socket(PF_INET, SOCK_STREAM, 0);
- if (!ipeer->socket)
+ if (!ipeer->socket) {
+ ul_logerr("socket(): %s\n", strerror(errno));
goto err_free;
+ }
sa.sin_family = AF_INET;
sa.sin_port = addr->port;
sa.sin_addr = addr->addr;
- ul_logdeb("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
+ ul_logtrash("connect to %s:%u\n", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
ret = connect(ipeer->socket, (struct sockaddr*)&sa, sizeof(sa));
- if (ret)
+ if (ret) {
+ ul_logerr("connect error: %s\n", strerror(errno));
+ goto err_close;
+ }
+
+ setnonblocking(ipeer->socket);
+ setnodelay(ipeer->socket);
+
+ struct epoll_event ev;
+ struct inet_port *p = peer->port->desc.proto_priv;
+ memset(&ev, 0, sizeof(ev));
+ ev.events = EPOLLIN | EPOLLET;
+ ev.data.fd = ipeer->socket;
+ ret = epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, ipeer->socket, &ev);
+ if (ret) {
+ ul_logerr("epoll_ctl on connect failed: %s\n", strerror(errno));
goto err_close;
+ }
+
+ peer->proto_priv = ipeer;
+
+#ifndef TEST /* FIXME: Move hello to IOP, introduce proto connect callback */
+ ret = forb_iop_send_hello_to(peer);
+ if (ret) {
+ goto err_close;
+ }
+#endif
- return ipeer;
+ return 0;
err_close:
close(ipeer->socket);
err_free:
forb_free(ipeer);
err:
- return NULL;
+ return -1;
}
-static size_t
+static ssize_t
inet_send(forb_peer_t *peer, const void *buf, size_t len)
{
struct inet_peer *ipeer = peer->proto_priv;
+ ssize_t ret, sent;
if (!ipeer) {
- ipeer = inet_connect(peer);
- if (!ipeer)
- return -1;
- peer->proto_priv = ipeer;
-
+ ret = inet_connect(peer);
+ if (ret) {
+ return ret;
+ }
+ ipeer = peer->proto_priv;
}
-
- return send(ipeer->socket, buf, len, 0);
+
+ sent = 0;
+ ul_logtrash("send fd=%d len=%zu\n", ipeer->socket, len);
+ do {
+ ret = send(ipeer->socket, buf, len, 0);
+ if (ret < 0) {
+ ul_logerr("send error: %s\n", strerror(errno));
+ return ret;
+ }
+ sent += ret;
+ buf += ret;
+ len -= ret;
+ } while (len > 0);
+
+ return sent;
}
/*----------------------------------------------------------------------
#endif
}
-int inet_accept_connection(struct inet_port *p)
+static int
+inet_accept_connection(forb_port_t *port)
{
+ struct inet_port *p = port->desc.proto_priv;
int client;
struct sockaddr_in addr;
socklen_t addrlen = sizeof(addr);
struct epoll_event ev;
int ret;
+ forb_peer_t *peer;
client = accept(p->listen_socket, (struct sockaddr *) &addr, &addrlen);
if (client < 0){
close(client);
return -1;
}
+ setnodelay(client);
+
+ peer = forb_peer_new();
+ if (peer) {
+ struct inet_peer *ipeer;
+
+ ipeer = forb_malloc(sizeof(*ipeer));
+ if (ipeer) {
+ ipeer->socket = client;
+ peer->proto_priv = ipeer;
+ peer->port = port;
+ peer->state = FORB_PEER_DISCOVERED;
+ inet_port_new_peer_insert(p, peer);
+ } else {
+ forb_peer_put(peer);
+ }
+ }
+
memset(&ev, 0, sizeof(ev));
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = client;
return epoll_ctl(p->epoll_fd, EPOLL_CTL_ADD, client, &ev);
}
-static size_t
+static ssize_t
inet_recv(forb_port_t *port, void *buf, size_t len)
{
struct inet_port *iport = port->desc.proto_priv;
#if 1
struct epoll_event ev;
- int ret, nfds;
+ ssize_t ret;
+ int nfds;
+ forb_peer_t *peer;
+ bool exported_new_peer = false;
+
for (;;) {
if (iport->last_recv_fd == -1) {
nfds = epoll_wait(iport->epoll_fd, &ev, 1, -1);
+ if (nfds == -1 && errno == EINTR)
+ continue;
if (nfds < 1)
return -1;
if (ev.data.fd == iport->listen_socket) {
- ret = inet_accept_connection(iport);
- if (ret)
+ ret = inet_accept_connection(port);
+ if (ret) {
+ ul_logerr("inet_accept_connection error: %s\n", strerror(errno));
return -1;
- else
+ } else
continue;
} else {
iport->last_recv_fd = ev.data.fd;
}
}
+ /* Check for first reception form a just connected peer */
+ ul_list_for_each(inet_port_new_peer, iport, peer) {
+ struct inet_peer *ipeer = peer->proto_priv;
+ //printf("checking new peer with fd=%d\n", ipeer->socket);
+ if (ipeer->socket == iport->last_recv_fd) {
+ inet_port_new_peer_delete(iport, peer);
+
+ if (port->new_peer)
+ forb_peer_put(port->new_peer);
+
+ /* Let the upper layer assign forb ID
+ * to this peer according to the request*/
+ port->new_peer = peer;
+ exported_new_peer = true;
+ break;
+ }
+ }
+
+ //printf("recv fd=%d\n", iport->last_recv_fd);
ret = recv(iport->last_recv_fd, buf, len, 0);
- if (ret == -1 && errno == EAGAIN) {
+ if (ret == -1) {
+ if (exported_new_peer) {
+ forb_peer_put(peer);
+ port->new_peer = NULL;
+ }
+ if (errno != EAGAIN) {
+ ul_logerr("recv fd=%d error: %s\n", iport->last_recv_fd, strerror(errno));
+ }
iport->last_recv_fd = -1;
continue;
}
if (ret == 0) {
- close(iport->last_recv_fd);
- /* TODO: Notify FORB about peer disconnect */
+ if (exported_new_peer) {
+ forb_peer_put(peer);
+ port->new_peer = NULL;
+ }
+ ul_logtrash("recv fd=%d disconnect\n", iport->last_recv_fd);
+ ul_list_for_each(forb_port_peer, port, peer) {
+ struct inet_peer *ipeer = peer->proto_priv;
+ if (ipeer && ipeer->socket == iport->last_recv_fd) {
+ forb_peer_disconnected(peer);
+ break;
+ }
+ }
iport->last_recv_fd = -1;
continue;
}
+ ul_logtrash("recv fd=%d len=%zd\n", iport->last_recv_fd, ret);
return ret;
}
#else
inet_port_destroy(forb_port_t * port)
{
struct inet_port *pd = port->desc.proto_priv;
+ forb_peer_t *peer;
close(pd->epoll_fd);
close(pd->udp_socket);
close(pd->listen_socket);
+ ul_list_for_each_cut(inet_port_new_peer, pd, peer) {
+ forb_peer_put(peer);
+ }
forb_free(pd);
return 0;
}
-static size_t
+#ifndef CONFIG_FORB_PROTO_INET_DEFAULT
+static ssize_t
inet_broadcast(forb_port_t *port, const void *buf, size_t len)
{
struct inet_port *p = port->desc.proto_priv;
struct sockaddr_in addr;
- int ret;
+ ssize_t ret;
addr.sin_family = AF_INET;
addr.sin_port = htons(MCAST_PORT);
(struct sockaddr*)&addr, sizeof(addr));
return ret;
}
+#endif
static void
inet_peer_destroy(forb_peer_t *peer)
struct inet_peer *ipeer = peer->proto_priv;
if (ipeer) {
peer->proto_priv = NULL;
+ ul_logtrash("destroying peer fd=%d (orb_id=%s)\n",
+ ipeer->socket, peer->orb_id);
close(ipeer->socket);
free(ipeer);
}
return ret;
}
+#if CONFIG_FCB && CONFIG_FORB_PROTO_INET_DEFAULT
+
+#include <fcb.h>
+#include <fcb_contact_info.h>
+
+static void inet_register_cb(forb_port_t *port)
+{
+ struct inet_addr *ia;
+
+ ia = malloc(sizeof(*ia));
+ if (!ia) return;
+
+ char *fcb_addr = getenv("FCB_ADDR");
+ if (!fcb_addr) fcb_addr = "127.0.0.1";
+ ia->addr.s_addr = inet_addr(fcb_addr);
+ ia->port = htons(FCB_TCP_PORT);
+ forb_new_peer_discovered(port, NULL, FCB_SERVER_ID, ia, "");
+}
+#else
+#define inet_register_cb NULL
+#endif
static const forb_proto_t proto_inet = {
.hello_interval = 40 /* seconds */,
.peer_destroy = inet_peer_destroy,
.send = inet_send,
.recv = inet_recv,
+#ifndef CONFIG_FORB_PROTO_INET_DEFAULT
.broadcast = inet_broadcast,
+#endif
.serialize_addr = inet_serialize_addr,
.deserialize_addr = inet_deserialize_addr,
.addr2str = inet_addr2str,
+ .register_cb = inet_register_cb,
};
#define MAX_INTERFACES 10
{
struct ifconf ifc;
struct ifreq *ifr, req[MAX_INTERFACES];
+ char *env;
+ bool loopback = false;
+ int ret;
+ struct in_addr env_addr;
+ env = getenv("FORB_EXTERNAL_IP");
+ if (env) {
+ if (inet_aton(env, &env_addr) == 0) {
+ ul_logerr("Cannot convert FORB_EXTERNAL_IP\n");
+ errno = EINVAL;
+ return -1;
+ }
+ }
ifc.ifc_len = sizeof(req);
ifc.ifc_req = req;
for (ifr = req; ifr < &req[MAX_INTERFACES]; ifr++) {
struct sockaddr_in ia;
memcpy(&ia, &ifr->ifr_addr, sizeof(ia));
- ioctl(sock, SIOCGIFFLAGS, ifr);
- if ((ifr->ifr_flags & IFF_UP) && !(ifr->ifr_flags & IFF_LOOPBACK)) {
- *addr = ia.sin_addr;
- return 0;
+ ret = ioctl(sock, SIOCGIFFLAGS, ifr);
+ if ((ret == 0) && (ifr->ifr_flags & IFF_UP)) {
+ if (env && env_addr.s_addr == ia.sin_addr.s_addr) {
+ *addr = env_addr;
+ return 0;
+ }
+ if (!(ifr->ifr_flags & IFF_LOOPBACK)) {
+ if (!env) {
+ *addr = ia.sin_addr;
+ return 0;
+ }
+ } else {
+ *addr = ia.sin_addr;
+ loopback = true;
+ }
}
}
- return -1;
+ if (env) {
+ ul_logerr("FORB_EXTERNAL_IP doesn't match local interface\n");
+ errno = ENODEV;
+ return -1;
+ }
+ if (loopback)
+ return 0;
+ else
+ return -1;
}
/**
* appropriately.
*/
int
-forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on)
+forb_inet_port_init(struct forb_port_desc *port_desc, struct in_addr listen_on,
+ uint16_t port)
{
int ret;
struct inet_port *port_priv;
memset(port_priv, 0, sizeof(*port_priv));
port_priv->last_recv_fd = -1;
+ inet_port_new_peer_init_head(port_priv);
/* Initialize UDP multicast socket */
port_priv->udp_socket = socket(PF_INET, SOCK_DGRAM, 0);
setnonblocking(port_priv->udp_socket);
+#ifndef CONFIG_FORB_PROTO_INET_DEFAULT
struct ip_mreq mreq;
inet_aton(MCAST_ADDR, &port_priv->multicast_addr);
mreq.imr_multiaddr = port_priv->multicast_addr;
IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
if (ret)
goto err_close_udp;
+#endif
addr.sin_family = AF_INET;
addr.sin_port = htons(MCAST_PORT);
port_priv->listen_socket = socket(PF_INET, SOCK_STREAM, 0);
if (port_priv->listen_socket == -1) goto err_close_udp;
+ reuse = 1;
+ setsockopt(port_priv->listen_socket, SOL_SOCKET, SO_REUSEADDR, (int *) &reuse, sizeof(reuse));
+ if (ret)
+ goto err_close_listen;
+
addr.sin_family = AF_INET;
- addr.sin_port = htons(0); /* Random port */
+ addr.sin_port = htons(port);
addr.sin_addr = listen_on;
ret = bind(port_priv->listen_socket, (struct sockaddr *)&addr, sizeof(addr));
if (ret != 0) goto err_close_listen;
}
port_priv->addr.port = addr.sin_port;
- if (listen_on.s_addr == INADDR_ANY)
- get_local_address(port_priv->listen_socket, &port_priv->addr.addr);
- else
+ if (listen_on.s_addr == INADDR_ANY) {
+ if (get_local_address(port_priv->listen_socket, &port_priv->addr.addr)) {
+ goto err_close_listen;
+ }
+ } else
port_priv->addr.addr = listen_on;
/* Initialize epoll descriptor */
port_priv->listen_socket, &ev);
if (ret)
goto err_close_epoll;
-
+
+#ifndef CONFIG_FORB_PROTO_INET_DEFAULT
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = port_priv->udp_socket;
ret = epoll_ctl(port_priv->epoll_fd, EPOLL_CTL_ADD,
port_priv->udp_socket, &ev);
if (ret)
goto err_close_epoll;
+#endif
port_desc->proto = &proto_inet;
port_desc->proto_priv = port_priv;