#include <forb/forb-idl.h>
#include <forb/iop.h>
#include <forb/config.h>
+#include <stdio.h>
GAVL_CUST_NODE_INT_IMP(forb_peer_nolock,/* cust_prefix */
forb_t, /* cust_root_t */
forb_server_id_cmp)/* cust_cmp_fnc */
+static void
+process_request(forb_port_t *port, CDR_Codec *codec)
+{
+ forb_iop_request_header rh;
+ forb_iop_request_header_deserialize(codec, &rh);
+}
+
+static void
+process_reply(forb_port_t *port, CDR_Codec *codec)
+{
+ forb_iop_reply_header rh;
+ forb_iop_reply_header_deserialize(codec, &rh);
+ /* TODO */
+}
+
+static void
+process_hello(forb_port_t *port, CDR_Codec *codec)
+{
+ forb_server_id server_id;
+ void *addr = NULL;
+ forb_peer_t *peer;
+ forb_t *forb = port->forb;
+
+ printf("Hello received at port %p\n", port);
+
+ forb_server_id_deserialize(codec, &server_id);
+ if (port->proto->deserialize_addr) {
+ port->proto->deserialize_addr(codec, &addr);
+ }
+ peer = forb_peer_find(forb, &server_id);
+ if (peer) {
+ /* TODO: Update last hello receive time */
+ } else {
+ /* New peer discovered */
+ peer = forb_malloc(sizeof(*peer));
+ peer->server_id = server_id;
+ peer->port = port;
+ peer->addr = addr;
+ forb_peer_insert(forb, peer);
+ }
+
+}
+
+static void
+process_message(forb_port_t *port, forb_iop_message_header *mh, CDR_Codec *codec)
+{
+ switch (mh->message_type) {
+ case forb_iop_REQUEST:
+ process_request(port, codec);
+ break;
+ case forb_iop_REPLY:
+ process_reply(port, codec);
+ break;
+ case forb_iop_HELLO:
+ process_hello(port, codec);
+ break;
+ default:
+ break;
+ }
+}
+
/**
* Thread run for every port to receive FORB messages from that port.
*
bool header_received = false;
while (!port->finish) {
+ if (c->rptr == c->wptr) {
+ /* The buffer is empty now - start writing from begining*/
+ CDR_buffer_reset(c, 0);
+ }
rcvd = proto->recv(port,
&c->buffer[c->wptr],
c->wptr_max - c->wptr);
c->wptr += rcvd;
- len = c->wptr - c->rptr;
- /* Wait for and then process message header */
- if (!header_received && len >= forb_iop_MESSAGE_HEADER_SIZE) {
- header_received = forb_iop_process_message_header(&mh, c);
- if (!header_received) { /* Reset the receiving buffer */
- c->rptr = c->wptr = 0;
- }
- }
- len = c->wptr - c->rptr;
- /* Wait for and then process the message body */
- if (header_received && len >= mh.message_size) {
- forb_iop_process_message(&mh, c);
- header_received = false;
- if (c->rptr == c->wptr) {
- /* The buffer is empty now */
- c->rptr = c->wptr = 0;
+
+ /* While there are some data in the buffer, process them. */
+ while (c->wptr - c->rptr > 0) {
+ len = c->wptr - c->rptr;
+ /* Wait for and then process message header */
+ if (!header_received) {
+ if (len >= forb_iop_MESSAGE_HEADER_SIZE) {
+ header_received = forb_iop_process_message_header(&mh, c);
+ len = c->wptr - c->rptr;
+ } else {
+ break; /* Wait for more data to arrive*/
+ }
+ }
+
+ /* Wait for and then process the message body */
+ if (header_received) {
+ if (len >= mh.message_size) {
+ process_message(port, &mh, c);
+ /* Wait for the next message */
+ header_received = false;
+ } else {
+ break; /* Wait for more data to arrive*/
+ }
}
}
}
*
* @return Always NULL
*/
-static void *port_discovery_thread(void *arg)
-{
- forb_port_t *port = arg;
- const forb_proto_t *proto = port->proto;
- CDR_Codec codec;
+ static void *port_discovery_thread(void *arg)
+ {
+ forb_port_t *port = arg;
+ const forb_proto_t *proto = port->proto;
+ CDR_Codec codec;
- CDR_codec_init_static(&codec);
- CDR_buffer_init(&codec, 1024, 0);
+ CDR_codec_init_static(&codec);
+ CDR_buffer_init(&codec, 1024, 0);
- while (!port->finish) {
- CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
- forb_iop_prepare_hello(&codec, &port->forb->server_id, port->addr,
- proto->serialize_addr);
- proto->broadcast(port, &codec.buffer[codec.rptr],
- codec.wptr - codec.rptr);
- sleep(proto->hello_interval);
+ while (!port->finish) {
+ CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
+ forb_iop_prepare_hello(&codec, &port->forb->server_id, port->addr,
+ proto->serialize_addr);
+ printf("Broadcasting hello from port %p\n", port);
+ proto->broadcast(port, &codec.buffer[codec.rptr],
+ codec.wptr - codec.rptr);
+ sleep(proto->hello_interval);
+ }
+ CDR_codec_release_buffer(&codec);
+ return NULL;
}
- CDR_codec_release_buffer(&codec);
- return NULL;
-}
/**
* Registers a new port in FORB and run receiver thread on it to
*
* @return Zero on success, FOSA error code on error.
*/
-int forb_register_port(forb_t *forb, forb_port_t *port)
-{
- int ret;
+ int forb_register_port(forb_t *forb, forb_port_t *port)
+ {
+ int ret;
- port->forb = forb;
- port->finish = false;
+ port->forb = forb;
+ port->finish = false;
- fosa_mutex_lock(&forb->port_mutex);
- forb_port_insert(forb, port);
- fosa_mutex_unlock(&forb->port_mutex);
-
- CDR_codec_init_static(&port->codec);
- if (!CDR_buffer_init(&port->codec, CONFIG_FORB_RECV_BUF_SIZE, 0)) {
- ret = FOSA_ENOMEM;
- goto err;
- }
- ret = fosa_thread_create(&port->receiver_thread, NULL,
- port_receiver_thread, port);
- if (ret != 0)
- goto err2;
- ret = fosa_thread_create(&port->discovery_thread, NULL,
- port_discovery_thread, port);
- if (ret != 0)
- goto err3;
- return 0;
-err3:
- port->finish = true;
- /* TODO: Wait for the thread to finish - we need some FOSA API for this */
-err2:
- CDR_codec_release_buffer(&port->codec);
-err:
- fosa_mutex_lock(&forb->port_mutex);
- forb_port_delete(forb, port);
- fosa_mutex_unlock(&forb->port_mutex);
+ fosa_mutex_lock(&forb->port_mutex);
+ forb_port_insert(forb, port);
+ fosa_mutex_unlock(&forb->port_mutex);
+
+ CDR_codec_init_static(&port->codec);
+ if (!CDR_buffer_init(&port->codec, CONFIG_FORB_RECV_BUF_SIZE, 0)) {
+ ret = FOSA_ENOMEM;
+ goto err;
+ }
+ ret = fosa_thread_create(&port->receiver_thread, NULL,
+ port_receiver_thread, port);
+ if (ret != 0)
+ goto err2;
+ ret = fosa_thread_create(&port->discovery_thread, NULL,
+ port_discovery_thread, port);
+ if (ret != 0)
+ goto err3;
+ return 0;
+ err3:
+ port->finish = true;
+ /* TODO: Wait for the thread to finish - we need some FOSA API for this */
+ err2:
+ CDR_codec_release_buffer(&port->codec);
+ err:
+ fosa_mutex_lock(&forb->port_mutex);
+ forb_port_delete(forb, port);
+ fosa_mutex_unlock(&forb->port_mutex);
- return ret;
-}
+ return ret;
+ }
-void forb_destroy_port(forb_port_t *port)
-{
- forb_peer_t *peer;
- forb_t *forb = port->forb;
+ void forb_destroy_port(forb_port_t *port)
+ {
+ forb_peer_t *peer;
+ forb_t *forb = port->forb;
- port->finish = true; /* Exit all the threads */
- fosa_mutex_lock(&forb->port_mutex);
- forb_port_delete(forb, port);
- fosa_mutex_unlock(&forb->port_mutex);
+ port->finish = true; /* Exit all the threads */
+ fosa_mutex_lock(&forb->port_mutex);
+ forb_port_delete(forb, port);
+ fosa_mutex_unlock(&forb->port_mutex);
- /* TODO: Wait for the thread to finish - we need some FOSA API for this */
- //fosa_cond_wait(port->threads)
+ /* TODO: Wait for the thread to finish - we need some FOSA API for this */
+ //fosa_cond_wait(port->threads)
- if (port->proto->port_destroy) {
- port->proto->port_destroy(port);
- }
+ if (port->proto->port_destroy) {
+ port->proto->port_destroy(port);
+ }
- fosa_mutex_lock(&port->forb->peer_mutex);
- gavl_cust_for_each(forb_peer_nolock, forb, peer) {
- if (peer->port == port) {
- if (port->proto->peer_destroy) {
- port->proto->peer_destroy(peer);
+ fosa_mutex_lock(&port->forb->peer_mutex);
+ gavl_cust_for_each(forb_peer_nolock, forb, peer) {
+ if (peer->port == port) {
+ if (port->proto->peer_destroy) {
+ port->proto->peer_destroy(peer);
+ }
+ /* TODO: Reference counting */
+ forb_free(peer);
}
- /* TODO: Reference counting */
- forb_free(peer);
}
- }
- fosa_mutex_unlock(&port->forb->peer_mutex);
+ fosa_mutex_unlock(&port->forb->peer_mutex);
- /* TODO: reference counting */
- forb_free(port);
-}
+ /* TODO: reference counting */
+ forb_free(port);
+ }
-size_t forb_proto_send(forb_peer_t *peer, CDR_Codec *codec)
-{
- return peer->port->proto->send(peer, &codec->buffer[codec->rptr],
- codec->wptr - codec->rptr);
-}
+ size_t forb_proto_send(forb_peer_t *peer, CDR_Codec *codec)
+ {
+ return peer->port->proto->send(peer, &codec->buffer[codec->rptr],
+ codec->wptr - codec->rptr);
+ }
-forb_peer_t *
-forb_get_next_hop(forb_t *forb, forb_server_id *server_id)
-{
- forb_peer_t *peer;
- peer = forb_peer_find(forb, server_id);
- return peer;
+ forb_peer_t *
+ forb_get_next_hop(forb_t *forb, forb_server_id *server_id)
+ {
+ forb_peer_t *peer;
+ peer = forb_peer_find(forb, server_id);
+ return peer;
-}
+ }