1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FORB (Frescor Object Request Broker) */
27 /* FORB is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FORB is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FORB; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FORB header files in a file, */
39 /* instantiating FORB generics or templates, or linking other files */
40 /* with FORB objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
49 * @author Michal Sojka <sojkam1@fel.cvut.cz>
50 * @date Mon Sep 1 21:51:58 2008
52 * @brief Implementation of Inter-ORB protocol.
58 #include <forb/object.h>
64 #include "discovery.h"
66 /** Version of the protocol */
/**
 * Packs a protocol version into one integer usable in switch/case labels:
 * @a major is shifted left by 8 bits and @a minor is OR-ed into the low
 * bits (so @a minor is expected to be below 256).
 *
 * The two parts must be combined with bitwise OR. A logical OR would
 * collapse every non-zero version to 1 and make all versions compare equal.
 */
#define VER(major, minor) (((major) << 8) | (minor))
72 extern UL_LOG_CUST(ulogd_forb_iop);
/**
 * Builds an IOP message header and serializes it in front of the data
 * already held by @a codec.
 *
 * The header carries the protocol version VER_MAJOR.VER_MINOR, a flags
 * field in which forb_iop_LITTLE_ENDIAN mirrors codec->data_endian, and
 * the size of the data currently in the codec.
 *
 * @param codec Codec containing the already serialized message body.
 * @param mt Message type. NOTE(review): the statement storing @a mt in
 *           the header is not visible in this listing - confirm.
 *
 * NOTE(review): the return statement is not visible here; callers test
 * the result as a boolean.
 */
76 forb_iop_prepend_message_header(FORB_CDR_Codec *codec, forb_iop_message_type mt)
79 forb_iop_message_header mh;
80 mh.proto_version.major = VER_MAJOR;
81 mh.proto_version.minor = VER_MINOR;
83 mh.flags = (codec->data_endian == LittleEndian) ? forb_iop_LITTLE_ENDIAN : 0;
/* The size is sampled before the header space is prepended, so it
 * counts the payload only and excludes the message header itself. */
84 mh.message_size = FORB_CDR_data_size(codec);
85 ret = FORB_CDR_buffer_prepend(codec, forb_iop_MESSAGE_HEADER_SIZE);
87 ret = forb_iop_message_header_serialize(codec, &mh);
/**
 * Serializes the request header of @a req into req->cdr_request.
 *
 * The header is filled with the request id, the interface, the object
 * key obtained from forb_object_to_key(), the method index and, as the
 * source, the server_id of the FORB that req->obj belongs to. The codec
 * is then padded to an 8 byte boundary and the resulting write position
 * is stored in req->end_of_header_index.
 *
 * @param req Request whose header is to be serialized.
 * @param env NOTE(review): not referenced in the visible lines; error
 *            reporting through @a env, if any, is not shown here.
 */
93 forb_iop_prepare_request(forb_request_t *req,
94 CORBA_Environment *env)
97 forb_iop_request_header rh;
99 rh.request_id = req->request_id;
100 rh.iface = req->interface;
101 rh.objkey = forb_object_to_key(req->obj);
102 rh.method_index = req->method_ind;
103 rh.source = forb_object_to_forb(req->obj)->server_id;
104 ret = forb_iop_request_header_serialize(&req->cdr_request, &rh);
106 /* Request body is 8 byte aligned */
107 ret = FORB_CDR_put_align(&req->cdr_request, 8);
109 ul_logdeb("preparing request: id=%d dest=%s iface=%s method=%d\n", req->request_id,
110 forb_server_id_to_string(str, &req->obj->server, sizeof(str)),
111 rh.iface, rh.method_index);
/* Remember where the (padded) header ends; forb_request_send() uses
 * this index as the read position for local invocations. */
113 req->end_of_header_index = req->cdr_request.wptr;
/**
 * Serializes a complete HELLO message into @a codec.
 *
 * The body consists of the server id, an optional protocol address and
 * the ORB id string. It is padded to 8 bytes and then a message header
 * of type forb_iop_HELLO is prepended.
 *
 * @param codec Codec to write to.
 * @param server_id Server id announced by the message.
 * @param src_addr Address passed to @a serialize_addr.
 * @param serialize_addr Address serializer; may be NULL, in which case
 *                       no address is written.
 * @param orb_id ORB id string announced by the message.
 *
 * @return CORBA_FALSE as soon as any serialization step fails.
 *         NOTE(review): the success return is not visible in this
 *         listing.
 */
118 forb_iop_prepare_hello(FORB_CDR_Codec *codec,
119 const forb_server_id *server_id,
120 const void *src_addr,
121 CORBA_boolean (*serialize_addr)(FORB_CDR_Codec *codec, const void *addr),
122 const CORBA_char *orb_id)
124 if (!forb_server_id_serialize(codec, server_id)) return CORBA_FALSE;
/* The address is only written when a serializer is supplied. */
125 if (serialize_addr) {
126 if (!serialize_addr(codec, src_addr)) return CORBA_FALSE;
128 if (!CORBA_string_serialize(codec, &orb_id)) return CORBA_FALSE;
129 /* All headers must be 8 byte aligned so align the length of
131 if (!FORB_CDR_put_align(codec, 8)) return CORBA_FALSE;
132 if (!forb_iop_prepend_message_header(codec, forb_iop_HELLO)) return CORBA_FALSE;
/**
 * Sends a HELLO message describing the local FORB directly to @a peer.
 *
 * A temporary 1024 byte codec buffer is allocated, the hello is built
 * from the server id and ORB id of the FORB owning peer->port, sent
 * with forb_proto_send() and the buffer is released again.
 *
 * @param peer Destination peer; its port selects the FORB and protocol.
 *
 * @return A positive result of forb_proto_send() is mapped to 0.
 *         NOTE(review): the error paths and the return statement are
 *         not visible in this listing.
 */
136 int forb_iop_send_hello_to(forb_peer_t *peer)
138 forb_port_t *port = peer->port;
139 FORB_CDR_Codec codec;
141 FORB_CDR_codec_init_static(&codec, port->forb->orb);
142 if (!FORB_CDR_buffer_init(&codec, 1024, 0))
/* Reserve room at the start of the buffer for the message header that
 * forb_iop_prepare_hello() prepends. */
144 if (!FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE)) {
148 if (!forb_iop_prepare_hello(&codec, &port->forb->server_id,
150 port->desc.proto->serialize_addr,
151 port->forb->attr.orb_id)) {
155 ret = forb_proto_send(peer, &codec);
/* Callers only need success/failure, not the number of bytes sent. */
156 if (ret > 0) ret = 0;
158 FORB_CDR_codec_release_buffer(&codec);
/**
 * Forwards a HELLO message on behalf of @a peer to @a dest.
 *
 * The hello is built with the server id of @a peer (not the local one)
 * and the address serializer of peer's port, then sent to @a dest with
 * forb_proto_send().
 *
 * @param dest Peer that receives the message.
 * @param peer Peer that the message announces.
 *
 * NOTE(review): the return type, the remaining arguments of the
 * forb_iop_prepare_hello() call and the error paths are not visible in
 * this listing. A positive send result is mapped to 0.
 */
163 forb_iop_redistribute_hello_to(forb_peer_t *dest, forb_peer_t *peer)
165 forb_port_t *port = peer->port;
166 FORB_CDR_Codec codec;
168 FORB_CDR_codec_init_static(&codec, port->forb->orb);
169 if (!FORB_CDR_buffer_init(&codec, 1024, 0))
/* Reserve room at the start of the buffer for the message header. */
171 if (!FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE)) {
175 if (!forb_iop_prepare_hello(&codec, &peer->server_id,
177 peer->port->desc.proto->serialize_addr,
/* The server ids are not printed yet; empty strings are logged in
 * their place. */
182 ul_logdeb("redistributing hello of %s (%s) to %s (%s)\n",
183 ""/*TODO:id*/, peer->orb_id, "", dest->orb_id);
184 ret = forb_proto_send(dest, &codec);
185 if (ret > 0) ret = 0;
187 FORB_CDR_codec_release_buffer(&codec);
/**
 * Deserializes an IOP message header from @a codec into @a mh.
 *
 * The protocol version is read first and selects how the rest of the
 * header is decoded. For the supported version the message type and the
 * flags are read, the codec's byte order is switched according to the
 * forb_iop_LITTLE_ENDIAN flag and only then the message size is read,
 * so the size is decoded in the sender's byte order.
 *
 * @param mh Header structure to fill.
 * @param codec Codec positioned at the start of a message header.
 *
 * The receiver thread stores the result in a bool and treats a false
 * value as "wrong header". NOTE(review): the return statements are not
 * visible in this listing.
 */
192 forb_iop_process_message_header(forb_iop_message_header *mh, FORB_CDR_Codec *codec)
194 /* FIXME: If we have multiple protocol versions, use different
195 * type (independent from version) for return value instead of
197 forb_iop_version_deserialize(codec, &mh->proto_version);
198 switch (VER(mh->proto_version.major, mh->proto_version.minor)) {
199 case VER(VER_MAJOR, VER_MINOR):
200 forb_iop_message_type_deserialize(codec, &mh->message_type);
201 forb_iop_message_flags_deserialize(codec, &mh->flags);
202 /* Check whether the type is meaningful */
203 switch (mh->message_type) {
204 case forb_iop_REQUEST:
209 ul_logerr("rcvd wrong message type: %d\n",
/* Test the endianness bit with bitwise AND. A logical AND would select
 * LittleEndian whenever any flag at all is set. */
213 codec->data_endian = (mh->flags & forb_iop_LITTLE_ENDIAN) ?
214 LittleEndian : BigEndian;
215 CORBA_unsigned_long_deserialize(codec, &mh->message_size);
219 ul_logerr("rcvd wrong protocol versio: %d.%d\n",
220 mh->proto_version.major, mh->proto_version.minor);
/**
 * Serializes the exception stored in @a env.
 *
 * Only env->major is written, as a CORBA long.
 *
 * @param codec Codec to write to.
 * @param env Environment holding the exception.
 * @return Result of CORBA_long_serialize().
 *
 * NOTE(review): forb_exception_deserialize() casts &env->major to
 * CORBA_long*, while no cast is used here - confirm the field type.
 */
225 static inline CORBA_boolean
226 forb_exception_serialize(FORB_CDR_Codec *codec, struct forb_env *env)
228 return CORBA_long_serialize(codec, &env->major);
/**
 * Reads an exception from @a codec into @a env.
 *
 * A single CORBA long is read and stored directly in env->major through
 * a pointer cast.
 *
 * @param codec Codec to read from.
 * @param env Environment receiving the exception.
 * @return Result of CORBA_long_deserialize().
 */
231 static inline CORBA_boolean
232 forb_exception_deserialize(FORB_CDR_Codec *codec, struct forb_env *env)
234 /* TODO: Declare exceptions in IDL and don't typecast here. */
235 return CORBA_long_deserialize(codec, (CORBA_long*)&env->major);
/**
 * Completes a reply held in @a codec and sends it towards @a dest.
 *
 * Steps performed by the visible code:
 *  - a reply header with @a request_id is prepared; if @a env reports an
 *    exception, forb_iop_FLAG_EXCEPTION is set, the buffer is reset to
 *    the offset just behind the message and reply headers and the
 *    exception is serialized as the reply body;
 *  - the body is padded to 8 bytes, the reply header and then a message
 *    header of type forb_iop_REPLY are prepended;
 *  - the next hop for @a dest is looked up with a deadline of 1000 ms
 *    from now and the message is handed to forb_proto_send().
 *
 * Nothing is returned; a failed tail alignment is only logged.
 *
 * @param forb FORB used for the next-hop lookup.
 * @param dest Server id of the reply destination.
 * @param codec Codec with the serialized reply body.
 * @param request_id Id of the request being answered.
 * @param env Environment telling whether an exception occurred.
 *
 * NOTE(review): the checks of ret and of the looked-up peer are not
 * visible in this listing. The send here is not wrapped in
 * peer->send_lock, unlike the send in forb_request_send() - confirm.
 */
239 forb_iop_send_reply(forb_t *forb,
240 forb_server_id *dest,
241 FORB_CDR_Codec *codec,
242 CORBA_long request_id,
243 struct forb_env *env)
245 forb_iop_reply_header reply_header;
248 fosa_abs_time_t timeout;
250 reply_header.request_id = request_id;
251 reply_header.flags = 0;
/* On exception the reply body is replaced by the serialized exception. */
252 if (forb_exception_occurred(env)) {
253 reply_header.flags |= forb_iop_FLAG_EXCEPTION;
254 FORB_CDR_buffer_reset(codec, forb_iop_MESSAGE_HEADER_SIZE +
255 forb_iop_REPLY_HEADER_SIZE);
256 forb_exception_serialize(codec, env);
258 /* All headers must be 8 byte aligned so align the length of
260 if (!FORB_CDR_put_align(codec, 8)) {
261 ul_logerr("Not enough space for tail align\n");
262 return; /* FIXME: handle error (goto above)*/
264 /* forb_iop_REPLY_HEADER_SIZE equals to 8 even if the real
265 * header is shorter. We want reply data to be 8 byte
267 ret = FORB_CDR_buffer_prepend(codec, forb_iop_REPLY_HEADER_SIZE);
271 ret = forb_iop_reply_header_serialize(codec, &reply_header);
275 forb_iop_prepend_message_header(codec, forb_iop_REPLY);
277 fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
278 timeout = fosa_abs_time_incr(timeout,
279 fosa_msec_to_rel_time(1000));
281 peer = forb_get_next_hop(forb, dest, &timeout);
284 forb_server_id_to_string(str, dest, sizeof(str));
285 ul_logerr("Reply destination not found: %s\n", str);
289 forb_server_id_to_string(str, dest, sizeof(str));
290 ul_logdeb("sending reply: dest=%s, id=%u\n", str, request_id);
292 forb_proto_send(peer, codec);
/**
 * Handles one incoming REQUEST message.
 *
 * The request header is deserialized and the size of the request body
 * is computed as @a message_size minus the bytes consumed by the header
 * and its 8 byte alignment. A peer reported by the transport in
 * port->new_peer is either dropped (already known) or registered. The
 * target object is then looked up by its key and checked for interface
 * name, method index range and presence of an executor. On success the
 * request body is copied into a freshly allocated execution request,
 * which is appended to the executor's queue.
 *
 * On failure env.major is set to the matching FORB_EX_* code and an
 * exception reply is sent back to the request's source.
 *
 * @param port Port the request arrived on.
 * @param codec Codec positioned just behind the message header.
 * @param message_size Size of the message taken from its header.
 *
 * NOTE(review): the conditions and jumps guarding the error branches
 * are not visible in this listing.
 */
299 process_request(forb_port_t *port, FORB_CDR_Codec *codec, uint32_t message_size)
301 forb_iop_request_header request_header;
305 forb_t *forb = port->forb;
307 FORB_CDR_Codec reply_codec;
308 forb_exec_req_t *exec_req;
309 uint32_t req_size, data_size, header_size;
313 data_size = FORB_CDR_data_size(codec);
314 ret = forb_iop_request_header_deserialize(codec, &request_header);
316 ul_logerr("Malformed request recevied\n");
317 env.major = FORB_EX_COMM_FAILURE;
320 ret = FORB_CDR_get_align(codec, 8);
322 ul_logerr("Malformed request recevied\n");
323 env.major = FORB_EX_COMM_FAILURE;
/* Header size is measured as the amount of data consumed so far.
 * NOTE(review): message_size comes from the wire; if it is smaller than
 * header_size the unsigned subtraction below wraps - confirm that this
 * is rejected somewhere. */
327 header_size = data_size - FORB_CDR_data_size(codec);
328 req_size = message_size - header_size;
330 ul_logdeb("rcvd request: src=%s, id=%u, iface=%s, method=%hd\n",
331 forb_server_id_to_string(str, &request_header.source, sizeof(str)),
332 request_header.request_id,
333 request_header.iface,
334 request_header.method_index);
336 if (port->new_peer) {
337 /* Request from a new peer was reported by the underlying protocol */
338 peer = forb_peer_find(port->forb, &request_header.source);
340 /* We already know this peer */
341 /* TODO: Can it be in FORB_PEER_WANTED state?
342 * If yes, we cannot simply ignore this. */
343 ul_logmsg("new_peer was already known\n");
344 forb_peer_put(port->new_peer);
345 port->new_peer = NULL;
347 ul_logdeb("discovered new_peer from incomming connection\n");
348 peer = port->new_peer;
349 port->new_peer = NULL;
350 forb_new_peer_discovered(port, peer, request_header.source,
351 peer->addr, peer->orb_id);
356 obj = forb_key_to_object(forb, request_header.objkey);
358 ul_logerr("Nonexistent object key\n");
359 env.major = FORB_EX_OBJECT_NOT_EXIST;
/* NOTE(review): only strlen(iface) characters are compared, so a
 * requested name that is a prefix of the object's interface name (or an
 * empty name) is accepted - confirm this is intended. */
363 n = strlen(request_header.iface);
364 ret = strncmp(request_header.iface, obj->interface->name, n);
365 forb_free(request_header.iface);
366 request_header.iface = NULL;
368 env.major = FORB_EX_INV_OBJREF;
369 ul_logerr("Object reference has incorrect type\n");
373 if (request_header.method_index >= obj->interface->num_methods) {
374 env.major = FORB_EX_INV_IDENT;
375 ul_logerr("To high method number\n");
379 if (!obj->executor) {
380 env.major = FORB_EX_NO_IMPLEMENT;
381 ul_logerr("No executor for object\n");
386 /* Enqueue execution request */
387 exec_req = forb_malloc(sizeof(*exec_req));
/* Clear the whole structure; sizeof(exec_req) would be the size of the
 * pointer and leave most fields uninitialized. */
389 memset(exec_req, 0, sizeof(*exec_req));
390 exec_req->request_type = FORB_EXEC_REQ_REMOTE;
391 exec_req->request_id = request_header.request_id;
392 exec_req->source = request_header.source;
394 exec_req->method_index = request_header.method_index;
395 /* Copy the request to exec_req */
396 FORB_CDR_codec_init_static(&exec_req->codec, codec->orb);
397 ret = FORB_CDR_buffer_init(&exec_req->codec,
401 env.major = FORB_EX_NO_MEMORY;
402 ul_logerr("No memory for executor request bufer of size %d (header_size=%d)\n",
403 req_size, header_size);
/* The copy keeps the sender's byte order so the executor decodes the
 * body the same way the receive codec would. */
406 exec_req->codec.data_endian = codec->data_endian;
407 FORB_CDR_buffer_gets(codec, exec_req->codec.buffer, req_size);
408 /* TODO: Use better data structure for incoming
409 buffer to achieve zero-copy behaviour. */
410 forb_exec_req_ins_tail(obj->executor, exec_req);
412 env.major = FORB_EX_NO_MEMORY;
413 ul_logerr("No memory for executor request\n");
/* Error path: report the exception in env back to the requester. */
421 FORB_CDR_codec_init_static(&reply_codec, codec->orb);
422 ret = FORB_CDR_buffer_init(&reply_codec, 4096,
423 forb_iop_MESSAGE_HEADER_SIZE +
424 forb_iop_REPLY_HEADER_SIZE);
426 ul_logerr("No memory for exception reply buffer\n");
430 forb_iop_send_reply(port->forb, &request_header.source,
431 &reply_codec, request_header.request_id, &env);
432 FORB_CDR_codec_release_buffer(&reply_codec);
433 /* TODO: release exec_req etc. */
/**
 * Handles one incoming REPLY message.
 *
 * The reply header is deserialized and the pending request with the
 * same request id is looked up and deregistered from the FORB. When the
 * reply carries forb_iop_FLAG_EXCEPTION, the exception is read into the
 * request's environment. The request is then pointed at the receive
 * codec, the stub is woken through req->reply_ready, and this function
 * blocks on port->reply_processed until the stub is done.
 *
 * @param port Port the reply arrived on.
 * @param codec Codec positioned just behind the message header.
 *
 * NOTE(review): the check of the lookup result and the branch structure
 * around the exception handling are not visible in this listing.
 */
437 process_reply(forb_port_t *port, FORB_CDR_Codec *codec)
439 forb_iop_reply_header rh;
440 forb_t *forb = port->forb;
443 forb_iop_reply_header_deserialize(codec, &rh);
444 /* Reply data are 8 byte aligned */
445 FORB_CDR_get_align(codec, 8);
446 ul_logdeb("rcvd reply: id=%u\n", rh.request_id);
447 req = forb_request_find(forb, &rh.request_id);
449 ul_logerr("Received reply to unknown request_id %u\n", rh.request_id);
452 forb_request_delete(forb, req); /* Deregister request from forb */
454 if (rh.flags & forb_iop_FLAG_EXCEPTION) {
455 forb_exception_deserialize(codec, req->env);
/* The request gets a pointer to the receive codec itself, not a copy of
 * the data, which is why this function waits for the stub at its end. */
457 req->cdr_reply = codec;
460 /* Tell the stub where to signal that reply processing is
462 req->reply_processed = &port->reply_processed;
464 /* Resume the stub waiting in forb_wait_for_reply() */
465 forb_syncobj_signal(&req->reply_ready);
467 /* Wait for stub to process the results from the codec's buffer */
468 forb_syncobj_wait(&port->reply_processed);
472 * Process incoming HELLO messages.
474 * For every incoming HELLO message the peer table is searched
475 * whether it already contains a record for that peer or not. If not,
476 * the new peer is added to the table and another hello message is
477 * sent so that the new peer discovers us quickly.
479 * @param port Port, where hello was received
480 * @param codec Buffer with the hello message
483 process_hello(forb_port_t *port, FORB_CDR_Codec *codec)
485 forb_server_id server_id;
488 forb_t *forb = port->forb;
489 CORBA_string peer_orb_id = NULL;
491 /* printf("Hello received at port %p\n", port); */
/* The wire order is: server id, optional protocol address, ORB id. */
493 forb_server_id_deserialize(codec, &server_id);
/* NOTE(review): addr is only assigned when the protocol provides
 * deserialize_addr, yet it is used further down - confirm that its
 * declaration (not visible in this listing) initializes it. */
494 if (port->desc.proto->deserialize_addr) {
495 port->desc.proto->deserialize_addr(codec, &addr);
498 char str[60], addrstr[60];
499 if (port->desc.proto->addr2str) {
500 port->desc.proto->addr2str(addrstr, sizeof(addrstr), addr);
503 ul_logdeb("rcvd hello from %s (addr %s)\n",
504 forb_server_id_to_string(str, &server_id, sizeof(str)),
507 CORBA_string_deserialize(codec, &peer_orb_id);
/* Only act on hellos whose server id differs from our own (assuming
 * forb_server_id_cmp() returns 0 for equal ids). */
508 if (forb_server_id_cmp(&server_id, &forb->server_id) != 0) {
509 peer = forb_peer_find(forb, &server_id);
510 if (peer && peer->state == FORB_PEER_DISCOVERED) {
511 /* TODO: Update last hello receive time */
/* A known peer that now arrives over a fresh connection is replaced by
 * the new_peer record created by the transport. */
512 if (port->new_peer) {
513 ul_logdeb("peer already discovered but not connected - replacing\n");
514 forb_peer_disconnected(peer);
516 peer = port->new_peer;
517 port->new_peer = NULL;
518 forb_new_peer_discovered(port, peer, server_id, addr, peer_orb_id);
520 ul_logdeb("peer already discovered - ignoring\n");
/* The ORB id string is not handed over to anybody on this path, so it
 * is freed here. */
524 forb_free(peer_orb_id);
528 if (port->new_peer) {
530 ul_logerr("Unahandled case - FORB_PEER_WANTED && port->new_peer\n");
533 peer = port->new_peer;
534 port->new_peer = NULL;
537 forb_new_peer_discovered(port, peer, server_id, addr, peer_orb_id);
/**
 * Dispatches one complete message to the handler for its type and makes
 * sure the codec ends up positioned exactly behind the message.
 *
 * @param port Port the message arrived on.
 * @param mh Already decoded message header.
 * @param codec Codec positioned at the start of the message body.
 *
 * NOTE(review): the case labels for the reply and hello handlers are
 * not visible in this listing.
 */
543 process_message(forb_port_t *port, const forb_iop_message_header *mh,
544 FORB_CDR_Codec *codec)
/* Amount of buffered data before the body is consumed; used below to
 * find out how much the handler actually read. */
546 CORBA_long data_size = FORB_CDR_data_size(codec);
547 /* TODO: Check destination address, whether the message is for
548 * us or should be routed. */
550 /* TODO: If there is some processing error, skip the rest of
551 * the message according mh.message_size. */
552 switch (mh->message_type) {
553 case forb_iop_REQUEST:
554 process_request(port, codec, mh->message_size);
557 process_reply(port, codec);
560 process_hello(port, codec);
563 ul_logmsg("rcvd unknown message type\n");
566 if (port->new_peer) {
567 /* If for some reason the new_peer was not processed, free it here. */
568 ul_logmsg("Forgotten new_peer\n");
569 forb_peer_put(port->new_peer);
570 port->new_peer = NULL;
573 FORB_CDR_get_align(codec, 8);
/* The handler should have consumed exactly mh->message_size bytes. If
 * it did not, move the read pointer by the difference so that the next
 * header is read from the right place. */
575 if (FORB_CDR_data_size(codec) != data_size - mh->message_size) {
576 size_t processed = data_size - FORB_CDR_data_size(codec);
577 ul_logmsg("Message of type %d handled incorrectly (size=%d, processed=%zu); fixing\n",
578 mh->message_type, mh->message_size, processed);
580 codec->rptr += mh->message_size - processed;
585 * Thread run for every port to receive FORB messages from that port.
587 * @param arg Pointer to ::forb_port_t typecasted to void *.
589 * @return Always NULL
591 void *forb_iop_receiver_thread(void *arg)
593 forb_port_t *port = arg;
594 const forb_proto_t *proto = port->desc.proto;
595 FORB_CDR_Codec *c = &port->codec;
598 forb_iop_message_header mh;
/* Declared outside the receive loop: a decoded header stays valid
 * across several recv() calls until its whole body has arrived. */
599 bool header_received = false;
601 while (!port->finish) {
602 if (c->rptr == c->wptr) {
603 /* The buffer is empty now - start writing from the beginning */
604 FORB_CDR_buffer_reset(c, 0);
606 /* TODO: If there is not enough space for reception,
607 * we should shift the already received data to the
608 * beginning of the buffer. */
/* Receive at most as many bytes as fit between the write pointer and
 * the end of the buffer. */
609 rcvd = proto->recv(port,
611 c->wptr_max - c->wptr);
613 ul_logerr("recv returned error %zd (%s), exiting\n", rcvd, strerror(errno));
617 c->wptr_last = c->wptr;
619 /* While there are some data in the buffer, process them. */
620 while (FORB_CDR_data_size(c) > 0) {
621 len = FORB_CDR_data_size(c);
622 /* Wait for and then process message header */
623 if (!header_received) {
624 if (len >= forb_iop_MESSAGE_HEADER_SIZE) {
/* NOTE(review): in the visible lines a misaligned header is only
 * logged; any further handling is not shown in this listing. */
625 if (c->rptr % 8 != 0) {
626 ul_logerr("Header doesn't start at 8 byte bounday\n");
628 header_received = forb_iop_process_message_header(&mh, c);
629 if (!header_received) {
630 ul_logerr("Wrong header received\n");
631 /* TODO: We should probably reset the buffer here */
/* The header has been consumed, so refresh the remaining length before
 * comparing it with the body size. */
633 len = FORB_CDR_data_size(c);
635 break; /* Wait for more data to arrive */
639 /* Wait for and then process the message body */
640 if (header_received) {
/* Dispatch only once the complete body is buffered. */
641 if (len >= mh.message_size) {
642 process_message(port, &mh, c);
643 /* Wait for the next message */
644 header_received = false;
646 break; /* Wait for more data to arrive */
/**
 * Cleanup handler of the discovery thread, registered there with
 * pthread_cleanup_push(); releases the buffer of the hello codec.
 *
 * @param codec Pointer to the thread's FORB_CDR_Codec, passed as void *.
 */
655 discovery_cleanup(void *codec)
657 FORB_CDR_codec_release_buffer((FORB_CDR_Codec*)codec);
658 /* TODO: Broadcast some kind of bye bye message */
663 * Thread run for every port to broadcast HELLO messages. These
664 * messages are used for a FORB to discover all peers (and in future
665 * also to detect their disconnection).
667 * @param arg Pointer to ::forb_port_t typecasted to void *.
669 * @return Always NULL
671 void *forb_iop_discovery_thread(void *arg)
673 forb_port_t *port = arg;
674 const forb_proto_t *proto = port->desc.proto;
675 FORB_CDR_Codec codec;
676 fosa_abs_time_t hello_time;
/* proto->hello_interval is multiplied by 1000 before the conversion
 * from milliseconds - presumably it is given in seconds; confirm. */
677 fosa_rel_time_t hello_interval = fosa_msec_to_rel_time(1000*proto->hello_interval);
680 FORB_CDR_codec_init_static(&codec, port->forb->orb);
/* NOTE(review): the result of the buffer allocation is not checked. */
681 FORB_CDR_buffer_init(&codec, 1024, 0);
/* From here on the codec buffer is released by discovery_cleanup(),
 * either on cancellation or at the matching pthread_cleanup_pop(). */
683 pthread_cleanup_push(discovery_cleanup, &codec);
685 /* Next hello interval is now */
686 fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &hello_time);
688 while (!port->finish) {
689 /* Wait for next hello interval or until somebody
691 ret = forb_syncobj_timedwait(&port->hello, &hello_time);
692 /* sem_timedwait would be more appropriate */
693 if (ret == FOSA_ETIMEDOUT) {
/* Only a timeout advances the deadline; a wakeup through port->hello
 * leaves it unchanged. */
694 hello_time = fosa_abs_time_incr(hello_time, hello_interval);
695 } else if (ret != 0) {
696 ul_logerr("hello syncobj error: %s\n", strerror(ret));
699 if (port->finish) break;
/* Rebuild the hello from scratch on every round, leaving room for the
 * message header. NOTE(review): the result of
 * forb_iop_prepare_hello() is not checked before broadcasting. */
701 FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
702 forb_iop_prepare_hello(&codec, &port->forb->server_id, port->desc.addr,
703 proto->serialize_addr, port->forb->attr.orb_id);
704 /* printf("Broadcasting hello from port %p\n", port); */
705 proto->broadcast(port, &codec.buffer[codec.rptr],
706 FORB_CDR_data_size(&codec));
/* A non-zero argument makes pthread_cleanup_pop() run the handler, so
 * the codec buffer is released on the normal exit path too. */
709 pthread_cleanup_pop(1);
714 * Sends REQUEST to another object.
716 * The request @a req has to be prepared previously by calling
717 * forb_iop_prepare_request(). Then, when the request destination is
718 * remote, this function adds a message header, connects to the
719 * destination FORB and sends the request. In the case of local
720 * request, it is directly enqueued into the executor's queue.
722 * If no exception is reported, then the caller must wait for response
723 * by calling forb_wait_for_reply().
725 * @param req A request prepared by forb_iop_prepare_request()
726 * @param env Environment for returning exceptions
/*
 * Implementation notes for forb_request_send():
 *  - Local objects: the request is wrapped into a forb_exec_req_t that
 *    takes over the request's codec and is appended to the queue of the
 *    object's executor; nothing is sent over the network.
 *  - Remote objects: a message header of type forb_iop_REQUEST is
 *    prepended, the next hop is looked up with a deadline of 1000 ms
 *    from now, the request is registered with the FORB so that the
 *    reply can be matched, and the message is sent while holding
 *    peer->send_lock.
 * Failures are reported through env->major.
 * NOTE(review): the conditions and returns guarding the error branches
 * are not visible in this listing.
 */
729 forb_request_send(forb_request_t *req, CORBA_Environment *env)
735 fosa_abs_time_t timeout;
736 forb_t *forb = forb_object_to_forb(req->obj);
737 forb_exec_req_t *exec_req;
740 env->major = FORB_EX_INTERNAL;
744 req->env = env; /* Remember, where to return exceptions */
746 /* All headers must be 8 byte aligned so align the length of
748 if (!FORB_CDR_put_align(&req->cdr_request, 8)) {
749 env->major = FORB_EX_INTERNAL;
750 ul_logerr("Not enough space for tail align\n");
754 /* Local invocation case, destination of a message is only
755 * a different executor thread */
756 if (forb_object_is_local(req->obj)) {
/* NOTE(review): no NULL check is visible between the allocation and its
 * first use - confirm whether forb_malloc() can fail here. */
757 exec_req = forb_malloc(sizeof(*exec_req));
/* Clear the whole structure; sizeof(exec_req) would be the size of the
 * pointer and leave most fields uninitialized. */
758 memset(exec_req, 0, sizeof(*exec_req));
759 exec_req->request_type = FORB_EXEC_REQ_LOCAL;
760 exec_req->input_request = req;
761 exec_req->obj = forb_object_duplicate(req->obj);
762 exec_req->method_index = req->method_ind;
763 exec_req->interface = req->interface;
/* Skip the request header: the executor reads the arguments starting at
 * the position recorded when the header was serialized. */
764 req->cdr_request.rptr = req->end_of_header_index;
/* The codec is copied by value and the original is told not to release
 * the buffer, so the buffer now belongs to exec_req. */
765 exec_req->codec = req->cdr_request;
766 req->cdr_request.release_buffer = CORBA_FALSE;
767 exec_req->request_id = req->request_id;
768 forb_exec_req_ins_tail(forb_object_get_executor(exec_req->obj), exec_req);
772 ret = forb_iop_prepend_message_header(&req->cdr_request, forb_iop_REQUEST);
774 /* This should never happen */
775 env->major = FORB_EX_INTERNAL;
779 fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
780 timeout = fosa_abs_time_incr(timeout,
781 fosa_msec_to_rel_time(1000));
782 peer = forb_get_next_hop(forb, &req->obj->server, &timeout);
785 ul_logerr("Cannot find peer to send request for server %s\n",
786 forb_server_id_to_string(str, &req->obj->server, sizeof(str)));
787 env->major = FORB_EX_COMM_FAILURE;
790 /* Register the request with forb so we can match the incoming
791 * reply to this request. */
792 ret = forb_request_insert(forb, req);
794 ul_logerr("Insert request error %d\n", ret);
795 env->major = FORB_EX_INTERNAL;
801 ul_logdeb("sending request: id=%d dest=%s\n", req->request_id,
802 forb_server_id_to_string(str, &req->obj->server, sizeof(str)));
/* The expected length is sampled before sending so that a short write
 * can be detected afterwards. */
804 len = FORB_CDR_data_size(&req->cdr_request);
805 fosa_mutex_lock(&peer->send_lock);
806 size = forb_proto_send(peer, &req->cdr_request);
807 fosa_mutex_unlock(&peer->send_lock);
808 if (size <= 0 || size != len) {
809 env->major = FORB_EX_COMM_FAILURE;
810 /* Request is deleted when the stub calls forb_request_destroy() */