#include "exec_req.h"
#include <errno.h>
#include "peer.h"
+#include "discovery.h"
/** Version of the protocol */
#define VER_MAJOR 0
return ret;
}
-CORBA_boolean
+static CORBA_boolean
forb_iop_prepare_hello(FORB_CDR_Codec *codec,
const forb_server_id *server_id,
const void *src_addr,
- CORBA_boolean (*serialize_addr)(FORB_CDR_Codec *codec, const void *addr))
+ CORBA_boolean (*serialize_addr)(FORB_CDR_Codec *codec, const void *addr),
+ const CORBA_string orb_id)
{
if (!forb_server_id_serialize(codec, server_id)) return CORBA_FALSE;
if (serialize_addr) {
if (!serialize_addr(codec, src_addr)) return CORBA_FALSE;
}
+ if (!CORBA_string_serialize(codec, &orb_id)) return CORBA_FALSE;
if (!forb_iop_prepend_message_header(codec, forb_iop_HELLO)) return CORBA_FALSE;
return CORBA_TRUE;
}
peer = forb_get_next_hop(forb, dest, &timeout);
if (!peer) {
- ul_logerr("Reply destination not found\n");
+ char str[60];
+ forb_server_id_to_string(str, dest, sizeof(str));
+ ul_logerr("Reply destination not found: %s\n", str);
goto err;
+ } else {
+ char str[60];
+ forb_server_id_to_string(str, dest, sizeof(str));
+ ul_logdeb("sending reply: dest=%s, id=%u\n", str, request_id);
}
forb_proto_send(peer, codec);
forb_peer_put(peer);
;
}
-
static void
process_request(forb_port_t *port, FORB_CDR_Codec *codec, uint32_t message_size)
{
forb_exec_req_t *exec_req;
uint32_t req_size, data_size, header_size;
char str[32];
+ forb_peer_t *peer;
data_size = FORB_CDR_data_size(codec);
ret = forb_iop_request_header_deserialize(codec, &request_header);
request_header.request_id,
request_header.iface,
request_header.method_index);
+
+ if (port->new_peer) {
+ /* Request from a new peer was reported by the underlying protocol */
+ peer = forb_peer_find(port->forb, &request_header.source);
+ if (peer) {
+ /* We already know this peer */
+ /* TODO: Can it be in FORB_PEER_WANTED state?
+ * If yes, we cannot simply ignore this. */
+ ul_logmsg("new_peer was already known\n");
+ forb_peer_put(port->new_peer);
+ port->new_peer = NULL;
+ } else {
+ ul_logdeb("discovered new_peer from incomming connection\n");
+ peer = port->new_peer;
+ port->new_peer = NULL;
+ forb_new_peer_discovered(port, peer, request_header.source,
+ peer->addr, peer->orb_id);
+ }
+
+ }
obj = forb_key_to_object(forb, request_header.objkey);
if (!obj) {
}
n = strlen(request_header.iface);
- if (strncmp(request_header.iface, obj->interface->name, n) != 0) {
+ ret = strncmp(request_header.iface, obj->interface->name, n);
+ forb_free(request_header.iface);
+ request_header.iface = NULL;
+ if (ret != 0) {
env.major = FORB_EX_INV_OBJREF;
ul_logerr("Object reference has incorrect type\n");
goto send_execption;
}
+
if (request_header.method_index >= obj->interface->num_methods) {
env.major = FORB_EX_INV_IDENT;
ul_logerr("To high method number\n");
/* Enqueue execution request */
exec_req = forb_malloc(sizeof(*exec_req));
if (exec_req) {
+ /* Zero the whole structure; sizeof(exec_req) would only cover the pointer. */
+ memset(exec_req, 0, sizeof(*exec_req));
exec_req->request_id = request_header.request_id;
exec_req->source = request_header.source;
exec_req->obj = obj;
forb_iop_send_reply(port->forb, &request_header.source,
&reply_codec, request_header.request_id, &env);
FORB_CDR_codec_release_buffer(&reply_codec);
+ /* TODO: release exec_req etc. */
}
static void
void *addr = NULL;
forb_peer_t *peer;
forb_t *forb = port->forb;
+ CORBA_string peer_orb_id = NULL;
/* printf("Hello received at port %p\n", port); */
forb_server_id_deserialize(codec, &server_id);
- {
- char str[60];
- ul_logdeb("rcvd hello from %s\n", forb_server_id_to_string(str, &server_id, sizeof(str)));
- }
if (port->desc.proto->deserialize_addr) {
port->desc.proto->deserialize_addr(codec, &addr);
}
+ {
+ char str[60], addrstr[60];
+ if (port->desc.proto->addr2str) {
+ port->desc.proto->addr2str(addrstr, sizeof(addrstr), addr);
+ } else
+ addrstr[0] = 0;
+ ul_logdeb("rcvd hello from %s (addr %s)\n",
+ forb_server_id_to_string(str, &server_id, sizeof(str)),
+ addrstr);
+ }
+ CORBA_string_deserialize(codec, &peer_orb_id);
if (forb_server_id_cmp(&server_id, &forb->server_id) != 0) {
peer = forb_peer_find(forb, &server_id);
if (peer && peer->state == FORB_PEER_DISCOVERED) {
/* TODO: Update last hello receive time */
if (addr)
forb_free(addr);
+ if (peer_orb_id)
+ forb_free(peer_orb_id);
forb_peer_put(peer);
} else {
- /* New peer discovered */
- bool notify_waiters = false;
- if (peer /* && peer->state == FORB_PEER_WANTED */) {
- notify_waiters = true;
- } else {
- peer = forb_peer_new();
- }
- if (peer) {
- fosa_mutex_lock(&forb->peer_mutex);
- peer->server_id = server_id;
- peer->port = port;
- peer->addr = addr;
- peer->state = FORB_PEER_DISCOVERED;
- if (notify_waiters) {
- fosa_cond_broadcast(&peer->cond);
- } else {
- forb_peer_nolock_insert(forb, forb_peer_get(peer));
- }
- forb_port_peer_ins_tail(port, forb_peer_get(peer));
- fosa_mutex_unlock(&forb->peer_mutex);
- forb_peer_put(peer);
- }
- /* Broadcast our hello packet now */
- forb_syncobj_signal(&port->hello);
+ forb_new_peer_discovered(port, peer, server_id, addr, peer_orb_id);
}
}
}
ul_logmsg("rcvd unknown message type\n");
break;
}
+ if (port->new_peer) {
+ /* If for some reason the new_peer was not processed, free it here. */
+ ul_logmsg("Forgotten new_peer\n");
+ forb_peer_put(port->new_peer);
+ port->new_peer = NULL;
+ }
if (FORB_CDR_data_size(codec) != data_size - mh->message_size) {
size_t processed = data_size - FORB_CDR_data_size(codec);
ul_logmsg("Message of type %d handled incorrectly (size=%d, processed=%d); fixing\n",
mh->message_type, mh->message_size, processed);
- ;
+ ;
codec->rptr += mh->message_size - processed;
}
}
FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
forb_iop_prepare_hello(&codec, &port->forb->server_id, port->desc.addr,
- proto->serialize_addr);
+ proto->serialize_addr, port->forb->attr.orb_id);
/* printf("Broadcasting hello from port %p\n", port); */
proto->broadcast(port, &codec.buffer[codec.rptr],
FORB_CDR_data_size(&codec));
pthread_cleanup_pop(1);
return NULL;
}
-
-