]> rtime.felk.cvut.cz Git - frescor/forb.git/blob - src/iop.c
f6dca49e57319e83b43b12865963f697447ea549
[frescor/forb.git] / src / iop.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FORB (Frescor Object Request Broker)             */
26 /*                                                                        */
27 /* FORB is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FORB is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FORB; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FORB header files in a file,         */
39 /* instantiating FORB generics or templates, or linking other files       */
40 /* with FORB objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 /**
48  * @file   iop.c
49  * @author Michal Sojka <sojkam1@fel.cvut.cz>
50  * @date   Mon Sep  1 21:51:58 2008
51  * 
52  * @brief  Implementation of Inter-ORB protocol.
53  * 
54  * 
55  */
56 #include "iop.h"
57 #include <forb/cdr.h>
58 #include <forb/object.h>
59 #include "proto.h"
60 #include <ul_log.h>
61 #include "exec_req.h"
62 #include <errno.h>
63 #include "peer.h"
64 #include "discovery.h"
65
/** Version of the protocol */
#define VER_MAJOR 0
#define VER_MINOR 0

/**
 * Combines a protocol version into one integer: the major number is
 * shifted left by 8 bits and the minor number is OR-ed into the low
 * bits. The result is an integer constant expression when both
 * arguments are, so it can be used as a case label.
 */
#define VER(major, minor) (((major) << 8) | (minor))
71
72 extern UL_LOG_CUST(ulogd_forb_iop);
73
74
75 CORBA_boolean
76 forb_iop_prepend_message_header(FORB_CDR_Codec *codec, forb_iop_message_type mt)
77 {
78         CORBA_boolean ret;
79         forb_iop_message_header mh;
80         mh.proto_version.major = VER_MAJOR;
81         mh.proto_version.minor = VER_MINOR;
82         mh.message_type = mt;
83         mh.flags = (codec->data_endian == LittleEndian) ? forb_iop_LITTLE_ENDIAN : 0;
84         mh.message_size = FORB_CDR_data_size(codec);
85         ret = FORB_CDR_buffer_prepend(codec, forb_iop_MESSAGE_HEADER_SIZE);
86         if (ret) {
87                 ret = forb_iop_message_header_serialize(codec, &mh);
88         }
89         return ret;
90 }
91
92 CORBA_boolean
93 forb_iop_prepare_request(forb_request_t *req,
94                          char *iface,
95                          unsigned method_ind,
96                          CORBA_Environment *env)
97 {
98         CORBA_boolean ret;
99         forb_iop_request_header rh;
100
101         rh.request_id = req->request_id;
102         rh.iface = iface;
103         rh.objkey = forb_object_to_key(req->obj);
104         rh.method_index = method_ind;
105         rh.source = forb_object_to_forb(req->obj)->server_id;
106         ret = forb_iop_request_header_serialize(&req->cdr_request, &rh);
107         if (ret) {
108                 /* Request body is 8 byte aligned */
109                 ret = FORB_CDR_put_align(&req->cdr_request, 8);
110                 char str[50];
111                 ul_logdeb("preparing request: id=%d  dest=%s  iface=%s method=%d\n", req->request_id,
112                           forb_server_id_to_string(str, &req->obj->server, sizeof(str)),
113                           iface, method_ind);
114         }
115         return ret;
116 }
117
118 static CORBA_boolean
119 forb_iop_prepare_hello(FORB_CDR_Codec *codec,
120                        const forb_server_id *server_id,
121                        const void *src_addr,
122                        CORBA_boolean (*serialize_addr)(FORB_CDR_Codec *codec, const void *addr),
123                        const CORBA_string orb_id)
124 {
125         if (!forb_server_id_serialize(codec, server_id)) return CORBA_FALSE;
126         if (serialize_addr) {
127                 if (!serialize_addr(codec, src_addr)) return CORBA_FALSE;
128         }
129         if (!CORBA_string_serialize(codec, &orb_id)) return CORBA_FALSE;
130         if (!forb_iop_prepend_message_header(codec, forb_iop_HELLO)) return CORBA_FALSE;
131         return CORBA_TRUE;
132 }
133
134 bool
135 forb_iop_process_message_header(forb_iop_message_header *mh, FORB_CDR_Codec *codec)
136 {
137         /* FIXME: If we have multiple protocol versions, use different
138          * type (independent from version) for return value instead of
139          * mh */
140         forb_iop_version_deserialize(codec, &mh->proto_version);
141         switch (VER(mh->proto_version.major, mh->proto_version.minor)) {
142                 case VER(VER_MAJOR, VER_MINOR):
143                         forb_iop_message_type_deserialize(codec, &mh->message_type);
144                         forb_iop_message_flags_deserialize(codec, &mh->flags);
145                         /* Check whwther the type is meaningfull */
146                         switch (mh->message_type) {
147                                 case forb_iop_REQUEST:
148                                 case forb_iop_REPLY:
149                                 case forb_iop_HELLO:
150                                         break;
151                                 default:
152                                         ul_logerr("rcvd wrong message type: %d\n",
153                                                   mh->message_type);
154                                         return false;
155                         }
156                         codec->data_endian = (mh->flags && forb_iop_LITTLE_ENDIAN) ?
157                                 LittleEndian : BigEndian;                       
158                         CORBA_unsigned_long_deserialize(codec, &mh->message_size);
159                         return true;
160                         break;                  
161                 default:
162                         ul_logerr("rcvd wrong protocol versio: %d.%d\n",
163                                   mh->proto_version.major, mh->proto_version.minor);
164                         return false;
165         }
166 }
167
168 static inline CORBA_boolean
169 forb_exception_serialize(FORB_CDR_Codec *codec, struct forb_env *env)
170 {
171         return CORBA_long_serialize(codec, &env->major);
172 }
173
174 static inline CORBA_boolean
175 forb_exception_deserialize(FORB_CDR_Codec *codec, struct forb_env *env)
176 {
177         /* TODO: Declare exceptions in IDL and don't typecast here. */
178         return CORBA_long_deserialize(codec, (CORBA_long*)&env->major);
179 }
180
181 void
182 forb_iop_send_reply(forb_t *forb,
183            forb_server_id *dest,
184            FORB_CDR_Codec *codec,
185            CORBA_long request_id,
186            struct forb_env *env)
187 {
188         forb_iop_reply_header reply_header;
189         CORBA_boolean ret;
190         forb_peer_t *peer;
191         fosa_abs_time_t timeout;
192         
193         reply_header.request_id = request_id;
194         reply_header.flags = 0;
195         if (forb_exception_occurred(env)) {
196                 reply_header.flags |= forb_iop_FLAG_EXCEPTION;
197                 FORB_CDR_buffer_reset(codec, forb_iop_MESSAGE_HEADER_SIZE +
198                                              forb_iop_REPLY_HEADER_SIZE);
199                 forb_exception_serialize(codec, env);
200         }
201         /* forb_iop_REPLY_HEADER_SIZE equals to 8 even if the real
202          * header is shorter. We want reply data to be 8 byte
203          * aligned */
204         ret = FORB_CDR_buffer_prepend(codec, forb_iop_REPLY_HEADER_SIZE);
205         if (!ret) {
206                 goto err;
207         }
208         ret = forb_iop_reply_header_serialize(codec, &reply_header);
209         if (!ret) {
210                 goto err;
211         }
212         forb_iop_prepend_message_header(codec, forb_iop_REPLY);
213
214         fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
215         timeout = fosa_abs_time_incr(timeout,
216                                      fosa_msec_to_rel_time(1000));
217
218         peer = forb_get_next_hop(forb, dest, &timeout);
219         if (!peer) {
220                 char str[60];
221                 forb_server_id_to_string(str, dest, sizeof(str));
222                 ul_logerr("Reply destination not found: %s\n", str);
223                 goto err;
224         } else {
225                 char str[60];
226                 forb_server_id_to_string(str, dest, sizeof(str));
227                 ul_logdeb("sending reply: dest=%s, id=%u\n", str, request_id);
228         }
229         forb_proto_send(peer, codec);
230         forb_peer_put(peer);
231 err:
232         ;
233 }
234
235 static void
236 process_request(forb_port_t *port, FORB_CDR_Codec *codec, uint32_t message_size)
237 {
238         forb_iop_request_header request_header;
239         CORBA_boolean ret;
240         forb_object obj;
241         size_t n;
242         forb_t *forb = port->forb;
243         struct forb_env env;
244         FORB_CDR_Codec reply_codec;
245         forb_exec_req_t *exec_req;
246         uint32_t req_size, data_size, header_size;
247         char str[32];
248         forb_peer_t *peer;
249
250         data_size = FORB_CDR_data_size(codec);
251         ret = forb_iop_request_header_deserialize(codec, &request_header);
252         if (!ret) {
253                 ul_logerr("Malformed request recevied\n");
254                 env.major = FORB_EX_COMM_FAILURE;
255                 goto out;
256         }
257         ret = FORB_CDR_get_align(codec, 8);
258         if (!ret) {
259                 ul_logerr("Malformed request recevied\n");
260                 env.major = FORB_EX_COMM_FAILURE;
261                 goto out;
262         }
263
264         header_size = data_size - FORB_CDR_data_size(codec);
265         req_size = message_size - header_size;
266
267         ul_logdeb("rcvd request: src=%s, id=%u, iface=%s, method=%hd\n",
268                   forb_server_id_to_string(str, &request_header.source, sizeof(str)),
269                   request_header.request_id,
270                   request_header.iface,
271                   request_header.method_index);
272
273         if (port->new_peer) {
274                 /* Request from a new peer was reported by the underlaying protocol */
275                 peer = forb_peer_find(port->forb, &request_header.source);
276                 if (peer) {
277                         /* We already know this peer */
278                         /* TODO: Can it be in FORB_PEER_WANTED state?
279                          * If yes, we cannot simply igore this. */
280                         ul_logmsg("new_peer was already known\n");
281                         forb_peer_put(port->new_peer);
282                         port->new_peer = NULL;
283                 } else {
284                         ul_logdeb("discovered new_peer from incomming connection\n");
285                         peer = port->new_peer;
286                         port->new_peer = NULL;
287                         forb_new_peer_discovered(port, peer, request_header.source,
288                                                  peer->addr, peer->orb_id);
289                 }
290                 
291         }
292         
293         obj = forb_key_to_object(forb, request_header.objkey);
294         if (!obj) {
295                 ul_logerr("Nonexistent object key\n");
296                 env.major = FORB_EX_OBJECT_NOT_EXIST;
297                 goto send_execption;
298         }
299
300         n = strlen(request_header.iface);
301         ret = strncmp(request_header.iface, obj->interface->name, n);
302         forb_free(request_header.iface);
303         request_header.iface = NULL;
304         if (ret != 0) {
305                 env.major = FORB_EX_INV_OBJREF;
306                 ul_logerr("Object reference has incorrect type\n");
307                 goto send_execption;
308         }
309         
310         if (request_header.method_index >= obj->interface->num_methods) {
311                 env.major = FORB_EX_INV_IDENT;
312                 ul_logerr("To high method number\n");
313                 goto send_execption;
314         }
315
316         if (!obj->executor) {
317                 env.major = FORB_EX_NO_IMPLEMENT;
318                 ul_logerr("No executor for object\n");
319                 goto send_execption;
320
321         }
322
323         /* Enqueue execution request */
324         exec_req = forb_malloc(sizeof(*exec_req));
325         if (exec_req) {
326                 memset(exec_req, 0, sizeof(exec_req));
327                 exec_req->request_id = request_header.request_id;
328                 exec_req->source = request_header.source;
329                 exec_req->obj = obj;
330                 exec_req->method_index = request_header.method_index;
331                 /* Copy the request to exec_req */
332                 FORB_CDR_codec_init_static(&exec_req->codec, codec->orb);
333                 ret = FORB_CDR_buffer_init(&exec_req->codec,
334                                       req_size,
335                                       0);
336                 if (!ret) {
337                         env.major = FORB_EX_NO_MEMORY;
338                         ul_logerr("No memory for executor request bufer of size %d (header_size=%d)\n",
339                                   req_size, header_size);
340                         goto send_execption;
341                 }
342                 exec_req->codec.data_endian = codec->data_endian;
343                 FORB_CDR_buffer_gets(codec, exec_req->codec.buffer, req_size);
344                 /* TODO: Use better data structure for incomming
345                    buffer to achieve zero-copy behaviour. */
346                 forb_exec_req_ins_tail(obj->executor, exec_req);
347         } else {
348                 env.major = FORB_EX_NO_MEMORY;
349                 ul_logerr("No memory for executor request\n");
350                 goto send_execption;
351         }
352         
353 out:
354         return;
355         
356 send_execption:
357         FORB_CDR_codec_init_static(&reply_codec, codec->orb);   
358         ret = FORB_CDR_buffer_init(&reply_codec, 4096,
359                               forb_iop_MESSAGE_HEADER_SIZE +
360                               forb_iop_REPLY_HEADER_SIZE);
361         if (!ret) {
362                 ul_logerr("No memory for exception reply buffer\n");
363                 return;
364         }
365
366         forb_iop_send_reply(port->forb, &request_header.source,
367                             &reply_codec, request_header.request_id, &env);
368         FORB_CDR_codec_release_buffer(&reply_codec);
369         /* TODO: relese exec_req etc. */
370 }
371
372 static void
373 process_reply(forb_port_t *port, FORB_CDR_Codec *codec)
374 {
375         forb_iop_reply_header rh;
376         forb_t *forb = port->forb;
377         forb_request_t *req;
378
379         forb_iop_reply_header_deserialize(codec, &rh);
380         /* Reply data are 8 byte aligned */
381         FORB_CDR_get_align(codec, 8);
382         ul_logdeb("rcvd reply: id=%u\n", rh.request_id);
383         req = forb_request_find(forb, &rh.request_id);
384         if (!req) {
385                 ul_logerr("Received reply to unknown request_id %u\n", rh.request_id);
386                 return;
387         }
388         forb_request_delete(forb, req); /* Deregister request from forb */
389
390         if (rh.flags & forb_iop_FLAG_EXCEPTION) {
391                 forb_exception_deserialize(codec, req->env);
392         } else {
393                 req->cdr_reply = codec;
394         }
395
396         /* Tell the stub where to signal that reply processing is
397          * finished */
398         req->reply_processed = &port->reply_processed;
399
400         /* Resume the stub waiting in forb_wait_for_reply() */
401         forb_syncobj_signal(&req->reply_ready);
402
403         /* Wait for stub to process the results from the codec's buffer */
404         forb_syncobj_wait(&port->reply_processed);
405 }
406
407 /** 
408  * Process incomming HELLO messages.
409  *
410  * For every incomming HELLO message the peer table is searched
411  * whether it already contains a record for that peer or not. If not,
412  * the new peer is added to the table and another hello message is
413  * sent so that the new peer discovers us quickly.
414  * 
415  * @param port Port, where hello was received
416  * @param codec Buffer with the hello message
417  */
418 static void
419 process_hello(forb_port_t *port, FORB_CDR_Codec *codec)
420 {
421         forb_server_id server_id;
422         void *addr = NULL;
423         forb_peer_t *peer;
424         forb_t *forb = port->forb;
425         CORBA_string peer_orb_id = NULL;
426
427 /*      printf("Hello received at port %p\n", port); */
428
429         forb_server_id_deserialize(codec, &server_id);
430         if (port->desc.proto->deserialize_addr) {
431                 port->desc.proto->deserialize_addr(codec, &addr);
432         }
433         {
434                 char str[60], addrstr[60];
435                 if (port->desc.proto->addr2str) {
436                         port->desc.proto->addr2str(addrstr, sizeof(addrstr), addr);
437                 } else
438                         addrstr[0] = 0;
439                 ul_logdeb("rcvd hello from %s (addr %s)\n",
440                           forb_server_id_to_string(str, &server_id, sizeof(str)),
441                           addrstr);
442         }
443         CORBA_string_deserialize(codec, &peer_orb_id);
444         if (forb_server_id_cmp(&server_id, &forb->server_id) != 0) {
445                 peer = forb_peer_find(forb, &server_id);
446                 if (peer && peer->state == FORB_PEER_DISCOVERED) {
447                         /* TODO: Update last hello receive time */
448                         if (addr)
449                                 forb_free(addr);
450                         if (peer_orb_id)
451                                 forb_free(peer_orb_id);
452                         forb_peer_put(peer);
453                 } else {
454                         forb_new_peer_discovered(port, peer, server_id, addr, peer_orb_id);
455                 }
456         }
457 }
458
459 static void
460 process_message(forb_port_t *port, const forb_iop_message_header *mh,
461                 FORB_CDR_Codec *codec)
462 {
463         CORBA_long data_size = FORB_CDR_data_size(codec);
464         /* TODO: Check destination address, whether the message is for
465          * us or should be routed. */
466
467         /* TODO: If there is some processing error, skip the rest of
468          * the message according mh.message_size. */
469         switch (mh->message_type) {
470                 case forb_iop_REQUEST:
471                         process_request(port, codec, mh->message_size);
472                         break;
473                 case forb_iop_REPLY:
474                         process_reply(port, codec);
475                         break;
476                 case forb_iop_HELLO:
477                         process_hello(port, codec);
478                         break;
479                 default:
480                         ul_logmsg("rcvd unknown message type\n");
481                         break;
482         }
483         if (port->new_peer) {
484                 /* If for some reaseon the new_peer was not processed so free it here. */
485                 ul_logmsg("Forgotten new_peer\n");
486                 forb_peer_put(port->new_peer);
487                 port->new_peer = NULL;
488         }
489         if (FORB_CDR_data_size(codec) != data_size - mh->message_size) {
490                 size_t processed = data_size - FORB_CDR_data_size(codec);
491                 ul_logmsg("Message of type %d handled incorrectly (size=%d, processed=%d); fixing\n",
492                           mh->message_type, mh->message_size, processed);
493                 ;
494                 codec->rptr += mh->message_size - processed;
495         }
496 }
497
498 /** 
499  * Thread run for every port to receive FORB messages from that port. 
500  * 
501  * @param arg Pointer to ::forb_port_t typecasted to void *.
502  * 
503  * @return Always NULL
504  */
505 void *forb_iop_receiver_thread(void *arg)
506 {
507         forb_port_t *port = arg;
508         const forb_proto_t *proto = port->desc.proto;
509         FORB_CDR_Codec *c = &port->codec;
510         ssize_t rcvd;
511         size_t len;
512         forb_iop_message_header mh;
513         bool header_received = false;
514
515         while (!port->finish) {
516                 if (c->rptr == c->wptr) {
517                         /* The buffer is empty now - start writing from begining*/
518                         FORB_CDR_buffer_reset(c, 0);
519                 }
520                 /* TODO: If there is not enough space for reception,
521                  * we should shift the already received data to the
522                  * beginning of the buffer. */
523                 rcvd = proto->recv(port,
524                                    &c->buffer[c->wptr],
525                                    c->wptr_max - c->wptr);
526                 if (rcvd < 0) {
527                         ul_logerr("recv returned error %d (%s), exiting\n", rcvd, strerror(errno));
528                         return NULL;
529                 }
530                 c->wptr += rcvd;
531                 c->wptr_last = c->wptr;
532
533                 /* While there are some data in the buffer, process them. */
534                 while (FORB_CDR_data_size(c) > 0) {
535                         len = FORB_CDR_data_size(c);
536                         /* Wait for and then process message header */
537                         if (!header_received) {
538                                 if (len >= forb_iop_MESSAGE_HEADER_SIZE) {
539                                         if (c->rptr % 8 != 0) {
540                                                 ul_logerr("Header doesn't start at 8 byte bounday\n");
541                                         }
542                                         header_received = forb_iop_process_message_header(&mh, c);
543                                         if (!header_received) {
544                                                 ul_logerr("Wrong header received\n");
545                                                 /* TODO: We should probably reset the buffer here */
546                                         }
547                                         len = FORB_CDR_data_size(c);
548                                 } else {
549                                         break; /* Wait for more data to arrive*/
550                                 }
551                         }
552                         
553                         /* Wait for and then process the message body */
554                         if (header_received) {
555                                 if (len >= mh.message_size) {
556                                         process_message(port, &mh, c);
557                                         /* Wait for the next message */
558                                         header_received = false;
559                                 } else {
560                                         break; /* Wait for more data to arrive*/
561                                 }
562                         }
563                 }
564         }
565         return NULL;
566 }
567
568 static void
569 discovery_cleanup(void *codec)
570 {
571         FORB_CDR_codec_release_buffer((FORB_CDR_Codec*)codec);
572         /* TODO: Broadcast some kind of bye bye message */
573 }
574
575
576 /** 
577  * Thread run for every port to broadcast HELLO messages. These
578  * messages are used for a FORB to discover all peers (and in future
579  * also to detect their disconnection).
580  * 
581  * @param arg Pointer to ::forb_port_t typecasted to void *.
582  * 
583  * @return Always NULL
584  */
585 void *forb_iop_discovery_thread(void *arg)
586 {
587         forb_port_t *port = arg;
588         const forb_proto_t *proto = port->desc.proto;
589         FORB_CDR_Codec codec;
590         fosa_abs_time_t hello_time;
591         fosa_rel_time_t hello_interval = fosa_msec_to_rel_time(1000*proto->hello_interval);
592         int ret;
593
594         FORB_CDR_codec_init_static(&codec, port->forb->orb);
595         FORB_CDR_buffer_init(&codec, 1024, 0);
596
597         pthread_cleanup_push(discovery_cleanup, &codec);
598         
599         /* Next hello interval is now */
600         fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &hello_time);
601         
602         while (!port->finish) {
603                 /* Wait for next hello interval or until somebody
604                  * signals us. */
605                 ret = forb_syncobj_timedwait(&port->hello, &hello_time);
606                 /* sem_timedwait would be more appropriate */
607                 if (ret == FOSA_ETIMEDOUT) {
608                         hello_time = fosa_abs_time_incr(hello_time, hello_interval);
609                 } else if (ret != 0) {
610                         ul_logerr("hello syncobj error: %s\n", strerror(ret));
611                 }
612
613                 if (port->finish) break;
614
615                 FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
616                 forb_iop_prepare_hello(&codec, &port->forb->server_id, port->desc.addr,
617                                        proto->serialize_addr, port->forb->attr.orb_id);
618 /*              printf("Broadcasting hello from port %p\n", port);  */
619                 proto->broadcast(port, &codec.buffer[codec.rptr],
620                                  FORB_CDR_data_size(&codec));
621         }
622
623         pthread_cleanup_pop(1);
624         return NULL;
625 }
626
627 /** 
628  * Sends REQUEST message to another FORB.
629  *
630  * The request @a req has to be prepared by
631  * forb_iop_prepare_request(). Then, this function adds a message
632  * header, connects to the destination FORB and sends the request.
633  *
634  * If no exception is reported, then the caller must wait for response
635  * by calling forb_wait_for_reply().
636  * 
637  * @param req A request prepared by forb_iop_prepare_request()
638  * @param env Environment for returning exceptions
639  */
640 void
641 forb_request_send(forb_request_t *req, CORBA_Environment *env)
642 {
643         CORBA_boolean ret;
644         forb_peer_t *peer;
645         ssize_t size;
646         size_t len;
647         fosa_abs_time_t timeout;
648         forb_t *forb = forb_object_to_forb(req->obj);
649
650         if (!forb) {
651                 env->major = FORB_EX_INTERNAL;
652                 return;
653         }
654
655         req->env = env;     /* Remember, where to return exceptions */
656
657         ret = forb_iop_prepend_message_header(&req->cdr_request, forb_iop_REQUEST);
658         if (!ret) {
659                 /* This should never happen */
660                 env->major = FORB_EX_INTERNAL;
661                 return;
662         }
663
664         fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
665         timeout = fosa_abs_time_incr(timeout,
666                                      fosa_msec_to_rel_time(1000));
667         peer = forb_get_next_hop(forb, &req->obj->server, &timeout);
668         if (!peer) {
669                 char str[50];
670                 ul_logerr("Cannot find peer to send request for server %s\n",
671                           forb_server_id_to_string(str, &req->obj->server, sizeof(str)));
672                 env->major = FORB_EX_COMM_FAILURE;
673                 return;
674         }
675         /* Register the request with forb so we can match incomming
676          * reply to this request. */
677         ret = forb_request_insert(forb, req);
678         if (ret <= 0) {
679                 ul_logerr("Insert request error %d\n", ret);
680                 env->major = FORB_EX_INTERNAL;
681                 goto err_peer_put;
682         }
683
684         {
685                 char str[50];
686                 ul_logdeb("sending request: id=%d  dest=%s\n", req->request_id,
687                           forb_server_id_to_string(str, &req->obj->server, sizeof(str)));
688         }
689         len = FORB_CDR_data_size(&req->cdr_request);
690         fosa_mutex_lock(&peer->send_lock);
691         size = forb_proto_send(peer, &req->cdr_request);
692         fosa_mutex_unlock(&peer->send_lock);
693         if (size <= 0 || size != len) {
694                 env->major = FORB_EX_COMM_FAILURE;
695                 /* Request is deleted when the stub calls forb_request_destroy() */
696         }
697  err_peer_put:
698         forb_peer_put(peer);
699 }