/* TODO */
}
+/**
+ * Process incoming HELLO messages.
+ *
+ * For every incoming HELLO message the peer table is searched
+ * whether it already contains a record for that peer or not. If not,
+ * the new peer is added to the table and another hello message is
+ * sent so that the new peer discovers us quickly.
+ *
+ * HELLO messages carrying our own server ID are ignored.
+ *
+ * @param port Port on which the HELLO message was received.
+ * @param codec Codec from which the sender's server ID and address
+ *              are deserialized.
+ */
static void
process_hello(forb_port_t *port, CDR_Codec *codec)
{
forb_peer_t *peer;
forb_t *forb = port->forb;
- printf("Hello received at port %p\n", port);
+/* printf("Hello received at port %p\n", port); */
forb_server_id_deserialize(codec, &server_id);
if (port->proto->deserialize_addr) {
port->proto->deserialize_addr(codec, &addr);
}
- peer = forb_peer_find(forb, &server_id);
- if (peer) {
- /* TODO: Update last hello receive time */
- } else {
- /* New peer discovered */
- peer = forb_malloc(sizeof(*peer));
- peer->server_id = server_id;
- peer->port = port;
- peer->addr = addr;
- forb_peer_insert(forb, peer);
+ if (forb_server_id_cmp(&server_id, &forb->server_id) != 0) {
+ peer = forb_peer_find(forb, &server_id);
+ if (peer) {
+ /* TODO: Update last hello receive time */
+ } else {
+ /* New peer discovered */
+/* char str[100]; */
+/* printf("New peer %s discovered port %p\n", */
+/* forb_server_id_to_string(str, &server_id, sizeof(str)), port); */
+ peer = forb_malloc(sizeof(*peer));
+ peer->server_id = server_id;
+ peer->port = port;
+ peer->addr = addr;
+ forb_peer_insert(forb, peer);
+
+ fosa_mutex_lock(&port->hello_mutex);
+ fosa_cond_signal(&port->hello_cond);
+ fosa_mutex_unlock(&port->hello_mutex);
+ }
}
-
}
static void
&c->buffer[c->wptr],
c->wptr_max - c->wptr);
c->wptr += rcvd;
+ c->wptr_last = c->wptr;
/* While there are some data in the buffer, process them. */
- while (c->wptr - c->rptr > 0) {
- len = c->wptr - c->rptr;
+ while (CDR_data_size(c) > 0) {
+ len = CDR_data_size(c);
/* Wait for and then process message header */
if (!header_received) {
if (len >= forb_iop_MESSAGE_HEADER_SIZE) {
header_received = forb_iop_process_message_header(&mh, c);
- len = c->wptr - c->rptr;
+ len = CDR_data_size(c);
} else {
break; /* Wait for more data to arrive*/
}
/**
* Thread run for every port to broadcast HELLO messages. These
- * messages are used for a FORB to discover all peers and detect their
- * disconnection.
+ * messages are used for a FORB to discover all peers (and in future
+ * also to detect their disconnection).
*
* @param arg Pointer to ::forb_port_t typecasted to void *.
*
forb_port_t *port = arg;
const forb_proto_t *proto = port->proto;
CDR_Codec codec;
-
+ fosa_abs_time_t hello_time;
+ fosa_rel_time_t hello_interval = fosa_msec_to_rel_time(1000*proto->hello_interval);
+
CDR_codec_init_static(&codec);
CDR_buffer_init(&codec, 1024, 0);
-
+
+
+ fosa_clock_get_time(FOSA_CLOCK_REALTIME, &hello_time);
+
while (!port->finish) {
CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
forb_iop_prepare_hello(&codec, &port->forb->server_id, port->addr,
proto->serialize_addr);
- printf("Broadcasting hello from port %p\n", port);
+/* printf("Broadcasting hello from port %p\n", port); */
proto->broadcast(port, &codec.buffer[codec.rptr],
- codec.wptr - codec.rptr);
- sleep(proto->hello_interval);
+ CDR_data_size(&codec));
+
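+ /* Wait for the next hello interval or until somebody
+ * signals us.
+ * NOTE(review): the return value of fosa_abs_time_incr() below is
+ * discarded; if it returns the incremented time instead of
+ * updating hello_time in place, the deadline never advances and
+ * the timed wait returns immediately -- confirm. */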
+ fosa_abs_time_incr(hello_time, hello_interval);
+ /* sem_timedwait would be more appropriate */
+ fosa_mutex_lock(&port->hello_mutex);
+ fosa_cond_timedwait(&port->hello_cond, &port->hello_mutex,
+ &hello_time);
+ fosa_mutex_unlock(&port->hello_mutex);
}
CDR_codec_release_buffer(&codec);
return NULL;
forb_port_insert(forb, port);
fosa_mutex_unlock(&forb->port_mutex);
+ fosa_mutex_init(&port->hello_mutex, 0);
+ fosa_cond_init(&port->hello_cond);
+
CDR_codec_init_static(&port->codec);
if (!CDR_buffer_init(&port->codec, CONFIG_FORB_RECV_BUF_SIZE, 0)) {
ret = FOSA_ENOMEM;
forb_t *forb = port->forb;
port->finish = true; /* Exit all the threads */
- fosa_mutex_lock(&forb->port_mutex);
- forb_port_delete(forb, port);
- fosa_mutex_unlock(&forb->port_mutex);
+
+/* fosa_mutex_lock(&forb->port_mutex); */
+/* forb_port_delete(forb, port); */
+/* fosa_mutex_unlock(&forb->port_mutex); */
/* TODO: Wait for the thread to finish - we need some FOSA API for this */
//fosa_cond_wait(port->threads)
/**
 * Send the data held in @a codec to @a peer.
 *
 * The bytes starting at the codec's read position (codec->rptr) are
 * handed to the send operation of the protocol of the peer's port.
 *
 * @param peer Destination peer; the protocol of its port performs the send.
 * @param codec Codec whose buffer holds the data to be sent.
 *
 * @return The value returned by the protocol's send operation.
 */
size_t forb_proto_send(forb_peer_t *peer, CDR_Codec *codec)
{
return peer->port->proto->send(peer, &codec->buffer[codec->rptr],
- codec->wptr - codec->rptr);
+ CDR_data_size(codec));
}