#include "exec_req.h"
#include <errno.h>
#include "peer.h"
+#include "discovery.h"
/** Version of the protocol */
#define VER_MAJOR 0
CORBA_boolean
-forb_iop_prepend_message_header(CDR_Codec *codec, forb_iop_message_type mt)
+forb_iop_prepend_message_header(FORB_CDR_Codec *codec, forb_iop_message_type mt)
{
CORBA_boolean ret;
forb_iop_message_header mh;
mh.proto_version.minor = VER_MINOR;
mh.message_type = mt;
mh.flags = (codec->data_endian == LittleEndian) ? forb_iop_LITTLE_ENDIAN : 0;
- mh.message_size = CDR_data_size(codec);
- ret = CDR_buffer_prepend(codec, forb_iop_MESSAGE_HEADER_SIZE);
+ mh.message_size = FORB_CDR_data_size(codec);
+ ret = FORB_CDR_buffer_prepend(codec, forb_iop_MESSAGE_HEADER_SIZE);
if (ret) {
ret = forb_iop_message_header_serialize(codec, &mh);
}
CORBA_boolean
forb_iop_prepare_request(forb_request_t *req,
- char *iface,
- unsigned method_ind,
CORBA_Environment *env)
{
CORBA_boolean ret;
forb_iop_request_header rh;
rh.request_id = req->request_id;
- rh.iface = iface;
+ rh.iface = req->interface;
rh.objkey = forb_object_to_key(req->obj);
- rh.method_index = method_ind;
+ rh.method_index = req->method_ind;
rh.source = forb_object_to_forb(req->obj)->server_id;
ret = forb_iop_request_header_serialize(&req->cdr_request, &rh);
if (ret) {
/* Request body is 8 byte aligned */
- ret = CDR_put_align(&req->cdr_request, 8);
+ ret = FORB_CDR_put_align(&req->cdr_request, 8);
+ char str[50];
+ ul_logdeb("preparing request: id=%d dest=%s iface=%s method=%d\n", req->request_id,
+ forb_server_id_to_string(str, &req->obj->server, sizeof(str)),
+ rh.iface, rh.method_index);
}
+ req->end_of_header_index = req->cdr_request.wptr;
return ret;
}
-CORBA_boolean
-forb_iop_prepare_hello(CDR_Codec *codec,
+static CORBA_boolean
+forb_iop_prepare_hello(FORB_CDR_Codec *codec,
const forb_server_id *server_id,
const void *src_addr,
- CORBA_boolean (*serialize_addr)(CDR_Codec *codec, const void *addr))
+ CORBA_boolean (*serialize_addr)(FORB_CDR_Codec *codec, const void *addr),
+ const CORBA_char *orb_id)
{
if (!forb_server_id_serialize(codec, server_id)) return CORBA_FALSE;
if (serialize_addr) {
if (!serialize_addr(codec, src_addr)) return CORBA_FALSE;
}
+ if (!CORBA_string_serialize(codec, &orb_id)) return CORBA_FALSE;
+ /* All headers must be 8 byte aligned so align the length of
+ * this message */
+ if (!FORB_CDR_put_align(codec, 8)) return CORBA_FALSE;
if (!forb_iop_prepend_message_header(codec, forb_iop_HELLO)) return CORBA_FALSE;
return CORBA_TRUE;
}
+int forb_iop_send_hello_to(forb_peer_t *peer)
+{
+ forb_port_t *port = peer->port;
+ FORB_CDR_Codec codec;
+ int ret;
+ FORB_CDR_codec_init_static(&codec, port->forb->orb);
+ if (!FORB_CDR_buffer_init(&codec, 1024, 0))
+ return -1;
+ if (!FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE)) {
+ ret = -1;
+ goto free;
+ }
+ if (!forb_iop_prepare_hello(&codec, &port->forb->server_id,
+ port->desc.addr,
+ port->desc.proto->serialize_addr,
+ port->forb->attr.orb_id)) {
+ ret = -1;
+ goto free;
+ }
+ ret = forb_proto_send(peer, &codec);
+ if (ret > 0) ret = 0;
+free:
+ FORB_CDR_codec_release_buffer(&codec);
+ return ret;
+}
+
+int
+forb_iop_redistribute_hello_to(forb_peer_t *dest, forb_peer_t *peer)
+{
+ forb_port_t *port = peer->port;
+ FORB_CDR_Codec codec;
+ int ret;
+ FORB_CDR_codec_init_static(&codec, port->forb->orb);
+ if (!FORB_CDR_buffer_init(&codec, 1024, 0))
+ return -1;
+ if (!FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE)) {
+ ret = -1;
+ goto free;
+ }
+ if (!forb_iop_prepare_hello(&codec, &peer->server_id,
+ peer->addr,
+ peer->port->desc.proto->serialize_addr,
+ peer->orb_id)) {
+ ret = -1;
+ goto free;
+ }
+ ul_logdeb("redistributing hello of %s (%s) to %s (%s)\n",
+ ""/*TODO:id*/, peer->orb_id, "", dest->orb_id);
+ ret = forb_proto_send(dest, &codec);
+ if (ret > 0) ret = 0;
+free:
+ FORB_CDR_codec_release_buffer(&codec);
+ return ret;
+}
+
bool
-forb_iop_process_message_header(forb_iop_message_header *mh, CDR_Codec *codec)
+forb_iop_process_message_header(forb_iop_message_header *mh, FORB_CDR_Codec *codec)
{
/* FIXME: If we have multiple protocol versions, use different
* type (independent from version) for return value instead of
case forb_iop_HELLO:
break;
default:
+ ul_logerr("rcvd wrong message type: %d\n",
+ mh->message_type);
return false;
}
codec->data_endian = (mh->flags && forb_iop_LITTLE_ENDIAN) ?
return true;
break;
default:
+ ul_logerr("rcvd wrong protocol versio: %d.%d\n",
+ mh->proto_version.major, mh->proto_version.minor);
return false;
}
}
static inline CORBA_boolean
-forb_exception_serialize(CDR_Codec *codec, struct forb_env *env)
+forb_exception_serialize(FORB_CDR_Codec *codec, struct forb_env *env)
{
return CORBA_long_serialize(codec, &env->major);
}
static inline CORBA_boolean
-forb_exception_deserialize(CDR_Codec *codec, struct forb_env *env)
+forb_exception_deserialize(FORB_CDR_Codec *codec, struct forb_env *env)
{
/* TODO: Declare exceptions in IDL and don't typecast here. */
return CORBA_long_deserialize(codec, (CORBA_long*)&env->major);
void
forb_iop_send_reply(forb_t *forb,
forb_server_id *dest,
- CDR_Codec *codec,
+ FORB_CDR_Codec *codec,
CORBA_long request_id,
struct forb_env *env)
{
reply_header.request_id = request_id;
reply_header.flags = 0;
- if (forb_exception_occured(env)) {
+ if (forb_exception_occurred(env)) {
reply_header.flags |= forb_iop_FLAG_EXCEPTION;
- CDR_buffer_reset(codec, forb_iop_MESSAGE_HEADER_SIZE +
- forb_iop_REPLY_HEADER_SIZE);
+ FORB_CDR_buffer_reset(codec, forb_iop_MESSAGE_HEADER_SIZE +
+ forb_iop_REPLY_HEADER_SIZE);
forb_exception_serialize(codec, env);
}
+ /* All headers must be 8 byte aligned so align the length of
+ * this message */
+ if (!FORB_CDR_put_align(codec, 8)) {
+ ul_logerr("Not enough space for tail align\n");
+ return; /* FIXME: handle error (goto above)*/
+ }
/* forb_iop_REPLY_HEADER_SIZE equals to 8 even if the real
* header is shorter. We want reply data to be 8 byte
* aligned */
- ret = CDR_buffer_prepend(codec, forb_iop_REPLY_HEADER_SIZE);
+ ret = FORB_CDR_buffer_prepend(codec, forb_iop_REPLY_HEADER_SIZE);
if (!ret) {
goto err;
}
peer = forb_get_next_hop(forb, dest, &timeout);
if (!peer) {
- ul_logerr("Reply destination not found\n");
+ char str[60];
+ forb_server_id_to_string(str, dest, sizeof(str));
+ ul_logerr("Reply destination not found: %s\n", str);
goto err;
+ } else {
+ char str[60];
+ forb_server_id_to_string(str, dest, sizeof(str));
+ ul_logdeb("sending reply: dest=%s, id=%u\n", str, request_id);
}
forb_proto_send(peer, codec);
forb_peer_put(peer);
;
}
-
static void
-process_request(forb_port_t *port, CDR_Codec *codec, uint32_t message_size)
+process_request(forb_port_t *port, FORB_CDR_Codec *codec, uint32_t message_size)
{
forb_iop_request_header request_header;
CORBA_boolean ret;
size_t n;
forb_t *forb = port->forb;
struct forb_env env;
- CDR_Codec reply_codec;
+ FORB_CDR_Codec reply_codec;
forb_exec_req_t *exec_req;
uint32_t req_size, data_size, header_size;
char str[32];
+ forb_peer_t *peer;
- data_size = CDR_data_size(codec);
+ data_size = FORB_CDR_data_size(codec);
ret = forb_iop_request_header_deserialize(codec, &request_header);
if (!ret) {
ul_logerr("Malformed request recevied\n");
env.major = FORB_EX_COMM_FAILURE;
goto out;
}
- ret = CDR_get_align(codec, 8);
+ ret = FORB_CDR_get_align(codec, 8);
if (!ret) {
ul_logerr("Malformed request recevied\n");
env.major = FORB_EX_COMM_FAILURE;
goto out;
}
- header_size = data_size - CDR_data_size(codec);
+ header_size = data_size - FORB_CDR_data_size(codec);
req_size = message_size - header_size;
ul_logdeb("rcvd request: src=%s, id=%u, iface=%s, method=%hd\n",
request_header.request_id,
request_header.iface,
request_header.method_index);
+
+ if (port->new_peer) {
+ /* Request from a new peer was reported by the underlaying protocol */
+ peer = forb_peer_find(port->forb, &request_header.source);
+ if (peer) {
+ /* We already know this peer */
+ /* TODO: Can it be in FORB_PEER_WANTED state?
+ * If yes, we cannot simply igore this. */
+ ul_logmsg("new_peer was already known\n");
+ forb_peer_put(port->new_peer);
+ port->new_peer = NULL;
+ } else {
+ ul_logdeb("discovered new_peer from incomming connection\n");
+ peer = port->new_peer;
+ port->new_peer = NULL;
+ forb_new_peer_discovered(port, peer, request_header.source,
+ peer->addr, peer->orb_id);
+ }
+
+ }
obj = forb_key_to_object(forb, request_header.objkey);
if (!obj) {
}
n = strlen(request_header.iface);
- if (strncmp(request_header.iface, obj->interface->name, n) != 0) {
+ ret = strncmp(request_header.iface, obj->interface->name, n);
+ forb_free(request_header.iface);
+ request_header.iface = NULL;
+ if (ret != 0) {
env.major = FORB_EX_INV_OBJREF;
ul_logerr("Object reference has incorrect type\n");
goto send_execption;
}
+
if (request_header.method_index >= obj->interface->num_methods) {
env.major = FORB_EX_INV_IDENT;
ul_logerr("To high method number\n");
/* Enqueue execution request */
exec_req = forb_malloc(sizeof(*exec_req));
if (exec_req) {
+ memset(exec_req, 0, sizeof(exec_req));
+ exec_req->request_type = FORB_EXEC_REQ_REMOTE;
exec_req->request_id = request_header.request_id;
exec_req->source = request_header.source;
exec_req->obj = obj;
exec_req->method_index = request_header.method_index;
/* Copy the request to exec_req */
- CDR_codec_init_static(&exec_req->codec);
- ret = CDR_buffer_init(&exec_req->codec,
+ FORB_CDR_codec_init_static(&exec_req->codec, codec->orb);
+ ret = FORB_CDR_buffer_init(&exec_req->codec,
req_size,
0);
if (!ret) {
goto send_execption;
}
exec_req->codec.data_endian = codec->data_endian;
- CDR_buffer_gets(codec, exec_req->codec.buffer, req_size);
+ FORB_CDR_buffer_gets(codec, exec_req->codec.buffer, req_size);
/* TODO: Use better data structure for incomming
buffer to achieve zero-copy behaviour. */
forb_exec_req_ins_tail(obj->executor, exec_req);
return;
send_execption:
- CDR_codec_init_static(&reply_codec);
- ret = CDR_buffer_init(&reply_codec, 4096,
+ FORB_CDR_codec_init_static(&reply_codec, codec->orb);
+ ret = FORB_CDR_buffer_init(&reply_codec, 4096,
forb_iop_MESSAGE_HEADER_SIZE +
forb_iop_REPLY_HEADER_SIZE);
if (!ret) {
forb_iop_send_reply(port->forb, &request_header.source,
&reply_codec, request_header.request_id, &env);
- CDR_codec_release_buffer(&reply_codec);
+ FORB_CDR_codec_release_buffer(&reply_codec);
+ /* TODO: relese exec_req etc. */
}
static void
-process_reply(forb_port_t *port, CDR_Codec *codec)
+process_reply(forb_port_t *port, FORB_CDR_Codec *codec)
{
forb_iop_reply_header rh;
forb_t *forb = port->forb;
forb_iop_reply_header_deserialize(codec, &rh);
/* Reply data are 8 byte aligned */
- CDR_get_align(codec, 8);
+ FORB_CDR_get_align(codec, 8);
ul_logdeb("rcvd reply: id=%u\n", rh.request_id);
req = forb_request_find(forb, &rh.request_id);
if (!req) {
* @param codec Buffer with the hello message
*/
static void
-process_hello(forb_port_t *port, CDR_Codec *codec)
+process_hello(forb_port_t *port, FORB_CDR_Codec *codec)
{
forb_server_id server_id;
void *addr = NULL;
forb_peer_t *peer;
forb_t *forb = port->forb;
+ CORBA_string peer_orb_id = NULL;
/* printf("Hello received at port %p\n", port); */
forb_server_id_deserialize(codec, &server_id);
- {
- char str[60];
- ul_logdeb("rcvd hello from %s\n", forb_server_id_to_string(str, &server_id, sizeof(str)));
+ if (port->desc.proto->deserialize_addr) {
+ port->desc.proto->deserialize_addr(codec, &addr);
}
- if (port->proto->deserialize_addr) {
- port->proto->deserialize_addr(codec, &addr);
+ {
+ char str[60], addrstr[60];
+ if (port->desc.proto->addr2str) {
+ port->desc.proto->addr2str(addrstr, sizeof(addrstr), addr);
+ } else
+ addrstr[0] = 0;
+ ul_logdeb("rcvd hello from %s (addr %s)\n",
+ forb_server_id_to_string(str, &server_id, sizeof(str)),
+ addrstr);
}
+ CORBA_string_deserialize(codec, &peer_orb_id);
if (forb_server_id_cmp(&server_id, &forb->server_id) != 0) {
peer = forb_peer_find(forb, &server_id);
if (peer && peer->state == FORB_PEER_DISCOVERED) {
/* TODO: Update last hello receive time */
- forb_peer_put(peer);
- } else {
- /* New peer discovered */
- bool notify_waiters = false;
- if (peer /* && peer->state == FORB_PEER_WANTED */) {
- notify_waiters = true;
+ if (port->new_peer) {
+ ul_logdeb("peer already discovered but not connected - replacing\n");
+ forb_peer_disconnected(peer);
+ forb_peer_put(peer);
+ peer = port->new_peer;
+ port->new_peer = NULL;
+ forb_new_peer_discovered(port, peer, server_id, addr, peer_orb_id);
} else {
- peer = forb_peer_new();
+ ul_logdeb("peer already discovered - ignoring\n");
+ if (addr)
+ forb_free(addr);
+ if (peer_orb_id)
+ forb_free(peer_orb_id);
+ forb_peer_put(peer);
}
- if (peer) {
- fosa_mutex_lock(&forb->peer_mutex);
- peer->server_id = server_id;
- peer->port = port;
- peer->addr = addr;
- peer->state = FORB_PEER_DISCOVERED;
- if (notify_waiters) {
- fosa_cond_broadcast(&peer->cond);
- } else {
- forb_peer_get(peer);
- forb_peer_nolock_insert(forb, peer);
+ } else {
+ if (port->new_peer) {
+ if (peer) {
+ ul_logerr("Unahandled case - FORB_PEER_WANTED && port->new_peer\n");
+ forb_peer_put(peer);
}
- forb_peer_get(peer);
- forb_port_peer_ins_tail(port, peer);
- fosa_mutex_unlock(&forb->peer_mutex);
- forb_peer_put(peer);
+ peer = port->new_peer;
+ port->new_peer = NULL;
}
- /* Broadcast our hello packet now */
- forb_syncobj_signal(&port->hello);
+
+ forb_new_peer_discovered(port, peer, server_id, addr, peer_orb_id);
}
}
}
static void
process_message(forb_port_t *port, const forb_iop_message_header *mh,
- CDR_Codec *codec)
+ FORB_CDR_Codec *codec)
{
- CORBA_long data_size = CDR_data_size(codec);
+ CORBA_long data_size = FORB_CDR_data_size(codec);
/* TODO: Check destination address, whether the message is for
* us or should be routed. */
ul_logmsg("rcvd unknown message type\n");
break;
}
- if (CDR_data_size(codec) != data_size - mh->message_size) {
- ul_logerr("Message of type %d handled incorrectly (size=%d, processed=%d)\n",
- mh->message_type, mh->message_size,
- data_size - CDR_data_size(codec));
+ if (port->new_peer) {
+ /* If for some reaseon the new_peer was not processed so free it here. */
+ ul_logmsg("Forgotten new_peer\n");
+ forb_peer_put(port->new_peer);
+ port->new_peer = NULL;
+ }
+
+ FORB_CDR_get_align(codec, 8);
+
+ if (FORB_CDR_data_size(codec) != data_size - mh->message_size) {
+ size_t processed = data_size - FORB_CDR_data_size(codec);
+ ul_logmsg("Message of type %d handled incorrectly (size=%d, processed=%zu); fixing\n",
+ mh->message_type, mh->message_size, processed);
+ ;
+ codec->rptr += mh->message_size - processed;
}
}
void *forb_iop_receiver_thread(void *arg)
{
forb_port_t *port = arg;
- const forb_proto_t *proto = port->proto;
- CDR_Codec *c = &port->codec;
- size_t rcvd, len;
+ const forb_proto_t *proto = port->desc.proto;
+ FORB_CDR_Codec *c = &port->codec;
+ ssize_t rcvd;
+ size_t len;
forb_iop_message_header mh;
bool header_received = false;
+ int oldstate;
while (!port->finish) {
if (c->rptr == c->wptr) {
/* The buffer is empty now - start writing from begining*/
- CDR_buffer_reset(c, 0);
+ FORB_CDR_buffer_reset(c, 0);
}
+
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
/* TODO: If there is not enough space for reception,
* we should shift the already received data to the
* beginning of the buffer. */
rcvd = proto->recv(port,
&c->buffer[c->wptr],
c->wptr_max - c->wptr);
+ if (rcvd < 0) {
+ ul_logerr("recv returned error %zd (%s), exiting\n", rcvd, strerror(errno));
+ return NULL;
+ }
c->wptr += rcvd;
c->wptr_last = c->wptr;
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);
+
/* While there are some data in the buffer, process them. */
- while (CDR_data_size(c) > 0) {
- len = CDR_data_size(c);
- /* Wait for and then process message header */
+ while (FORB_CDR_data_size(c) > 0) {
+ len = FORB_CDR_data_size(c);
+ /* Find and process message header (if there is any) */
if (!header_received) {
if (len >= forb_iop_MESSAGE_HEADER_SIZE) {
if (c->rptr % 8 != 0) {
ul_logerr("Header doesn't start at 8 byte bounday\n");
}
header_received = forb_iop_process_message_header(&mh, c);
- len = CDR_data_size(c);
+ if (!header_received) {
+ ul_logerr("Wrong header received\n");
+ /* TODO: We should probably reset the buffer here */
+ }
+ len = FORB_CDR_data_size(c);
} else {
break; /* Wait for more data to arrive*/
}
}
- /* Wait for and then process the message body */
+ /* If the whole message is received, then process it */
if (header_received) {
if (len >= mh.message_size) {
process_message(port, &mh, c);
static void
discovery_cleanup(void *codec)
{
- CDR_codec_release_buffer((CDR_Codec*)codec);
+ FORB_CDR_codec_release_buffer((FORB_CDR_Codec*)codec);
/* TODO: Broadcast some kind of bye bye message */
}
void *forb_iop_discovery_thread(void *arg)
{
forb_port_t *port = arg;
- const forb_proto_t *proto = port->proto;
- CDR_Codec codec;
+ const forb_proto_t *proto = port->desc.proto;
+ FORB_CDR_Codec codec;
fosa_abs_time_t hello_time;
fosa_rel_time_t hello_interval = fosa_msec_to_rel_time(1000*proto->hello_interval);
int ret;
-
- CDR_codec_init_static(&codec);
- CDR_buffer_init(&codec, 1024, 0);
+
+ FORB_CDR_codec_init_static(&codec, port->forb->orb);
+ FORB_CDR_buffer_init(&codec, 1024, 0);
pthread_cleanup_push(discovery_cleanup, &codec);
if (port->finish) break;
- CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
- forb_iop_prepare_hello(&codec, &port->forb->server_id, port->addr,
- proto->serialize_addr);
+ FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
+ forb_iop_prepare_hello(&codec, &port->forb->server_id, port->desc.addr,
+ proto->serialize_addr, port->forb->attr.orb_id);
/* printf("Broadcasting hello from port %p\n", port); */
proto->broadcast(port, &codec.buffer[codec.rptr],
- CDR_data_size(&codec));
+ FORB_CDR_data_size(&codec));
}
pthread_cleanup_pop(1);
return NULL;
}
+/**
+ * Sends REQUEST to another object.
+ *
+ * The request @a req has to be prepared previously by calling
+ * forb_iop_prepare_request(). Then, when the request destination is
+ * remote, this function adds a message header, connects to the
+ * destination FORB and sends the request. In the case of local
+ * request, it is directly enqueued into the executor's queue.
+ *
+ * If no exception is reported, then the caller must wait for response
+ * by calling forb_wait_for_reply().
+ *
+ * @param req A request prepared by forb_iop_prepare_request()
+ * @param env Environment for returning exceptions
+ */
+void
+forb_request_send(forb_request_t *req, CORBA_Environment *env)
+{
+ CORBA_boolean ret;
+ forb_peer_t *peer;
+ ssize_t size;
+ size_t len;
+ fosa_abs_time_t timeout;
+ forb_t *forb = forb_object_to_forb(req->obj);
+ forb_exec_req_t *exec_req;
+
+ if (!forb) {
+ env->major = FORB_EX_INTERNAL;
+ return;
+ }
+
+ req->env = env; /* Remember, where to return exceptions */
+
+ /* All headers must be 8 byte aligned so align the length of
+ * this message */
+ if (!FORB_CDR_put_align(&req->cdr_request, 8)) {
+ env->major = FORB_EX_INTERNAL;
+ ul_logerr("Not enough space for tail align\n");
+ return;
+ }
+ /* Local invocation case, destination of a message is only
+ * a different executor thread */
+ if (forb_object_is_local(req->obj)) {
+ exec_req = forb_malloc(sizeof(*exec_req));
+ memset(exec_req, 0, sizeof(exec_req));
+ exec_req->request_type = FORB_EXEC_REQ_LOCAL;
+ exec_req->input_request = req;
+ exec_req->obj = forb_object_duplicate(req->obj);
+ exec_req->method_index = req->method_ind;
+ exec_req->interface = req->interface;
+ req->cdr_request.rptr = req->end_of_header_index;
+ exec_req->codec = req->cdr_request;
+ req->cdr_request.release_buffer = CORBA_FALSE;
+ exec_req->request_id = req->request_id;
+ forb_exec_req_ins_tail(forb_object_get_executor(exec_req->obj), exec_req);
+ return;
+ }
+
+ ret = forb_iop_prepend_message_header(&req->cdr_request, forb_iop_REQUEST);
+ if (!ret) {
+ /* This should never happen */
+ env->major = FORB_EX_INTERNAL;
+ return;
+ }
+
+ fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
+ timeout = fosa_abs_time_incr(timeout,
+ fosa_msec_to_rel_time(1000));
+ peer = forb_get_next_hop(forb, &req->obj->server, &timeout);
+ if (!peer) {
+ char str[50];
+ ul_logerr("Cannot find peer to send request for server %s\n",
+ forb_server_id_to_string(str, &req->obj->server, sizeof(str)));
+ env->major = FORB_EX_COMM_FAILURE;
+ return;
+ }
+ /* Register the request with forb so we can match incomming
+ * reply to this request. */
+ ret = forb_request_insert(forb, req);
+ if (ret <= 0) {
+ ul_logerr("Insert request error %d\n", ret);
+ env->major = FORB_EX_INTERNAL;
+ goto err_peer_put;
+ }
+
+ {
+ char str[50];
+ ul_logdeb("sending request: id=%d dest=%s\n", req->request_id,
+ forb_server_id_to_string(str, &req->obj->server, sizeof(str)));
+ }
+ len = FORB_CDR_data_size(&req->cdr_request);
+ fosa_mutex_lock(&peer->send_lock);
+ size = forb_proto_send(peer, &req->cdr_request);
+ fosa_mutex_unlock(&peer->send_lock);
+ if (size <= 0 || size != len) {
+ env->major = FORB_EX_COMM_FAILURE;
+ /* Request is deleted when the stub calls forb_request_destroy() */
+ }
+ err_peer_put:
+ forb_peer_put(peer);
+}