]> rtime.felk.cvut.cz Git - frescor/forb.git/blob - src/iop.c
forb: Update documentation of forb_request_send()
[frescor/forb.git] / src / iop.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FORB (Frescor Object Request Broker)             */
26 /*                                                                        */
27 /* FORB is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FORB is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FORB; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FORB header files in a file,         */
39 /* instantiating FORB generics or templates, or linking other files       */
40 /* with FORB objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 /**
48  * @file   iop.c
49  * @author Michal Sojka <sojkam1@fel.cvut.cz>
50  * @date   Mon Sep  1 21:51:58 2008
51  * 
52  * @brief  Implementation of Inter-ORB protocol.
53  * 
54  * 
55  */
56 #include "iop.h"
57 #include <forb/cdr.h>
58 #include <forb/object.h>
59 #include "proto.h"
60 #include <ul_log.h>
61 #include "exec_req.h"
62 #include <errno.h>
63 #include "peer.h"
64 #include "discovery.h"
65
66 /** Version of the protocol */
67 #define VER_MAJOR 0
68 #define VER_MINOR 0
69
/** Packs a protocol version into a single integer (major in bits 8 and
 * above, minor in the low bits) so that versions can be used as switch
 * case labels. Bitwise OR is required here; a logical OR would collapse
 * every non-zero version to 1. */
#define VER(major, minor) (((major) << 8) | (minor))
71
72 extern UL_LOG_CUST(ulogd_forb_iop);
73
74
75 CORBA_boolean
76 forb_iop_prepend_message_header(FORB_CDR_Codec *codec, forb_iop_message_type mt)
77 {
78         CORBA_boolean ret;
79         forb_iop_message_header mh;
80         mh.proto_version.major = VER_MAJOR;
81         mh.proto_version.minor = VER_MINOR;
82         mh.message_type = mt;
83         mh.flags = (codec->data_endian == LittleEndian) ? forb_iop_LITTLE_ENDIAN : 0;
84         mh.message_size = FORB_CDR_data_size(codec);
85         ret = FORB_CDR_buffer_prepend(codec, forb_iop_MESSAGE_HEADER_SIZE);
86         if (ret) {
87                 ret = forb_iop_message_header_serialize(codec, &mh);
88         }
89         return ret;
90 }
91
92 CORBA_boolean
93 forb_iop_prepare_request(forb_request_t *req,
94                          CORBA_Environment *env)
95 {
96         CORBA_boolean ret;
97         forb_iop_request_header rh;
98
99         rh.request_id = req->request_id;
100         rh.iface = req->interface;
101         rh.objkey = forb_object_to_key(req->obj);
102         rh.method_index = req->method_ind;
103         rh.source = forb_object_to_forb(req->obj)->server_id;
104         ret = forb_iop_request_header_serialize(&req->cdr_request, &rh);
105         if (ret) {
106                 /* Request body is 8 byte aligned */
107                 ret = FORB_CDR_put_align(&req->cdr_request, 8);
108                 char str[50];
109                 ul_logdeb("preparing request: id=%d  dest=%s  iface=%s method=%d\n", req->request_id,
110                           forb_server_id_to_string(str, &req->obj->server, sizeof(str)),
111                           rh.iface, rh.method_index);
112         }
113         req->end_of_header_index = req->cdr_request.wptr;
114         return ret;
115 }
116
117 static CORBA_boolean
118 forb_iop_prepare_hello(FORB_CDR_Codec *codec,
119                        const forb_server_id *server_id,
120                        const void *src_addr,
121                        CORBA_boolean (*serialize_addr)(FORB_CDR_Codec *codec, const void *addr),
122                        const CORBA_char *orb_id)
123 {
124         if (!forb_server_id_serialize(codec, server_id)) return CORBA_FALSE;
125         if (serialize_addr) {
126                 if (!serialize_addr(codec, src_addr)) return CORBA_FALSE;
127         }
128         if (!CORBA_string_serialize(codec, &orb_id)) return CORBA_FALSE;
129         /* All headers must be 8 byte aligned so align the length of
130          * this message */
131         if (!FORB_CDR_put_align(codec, 8)) return CORBA_FALSE; 
132         if (!forb_iop_prepend_message_header(codec, forb_iop_HELLO)) return CORBA_FALSE;
133         return CORBA_TRUE;
134 }
135
136 int forb_iop_send_hello_to(forb_peer_t *peer)
137 {
138         forb_port_t *port = peer->port;
139         FORB_CDR_Codec codec;
140         int ret;
141         FORB_CDR_codec_init_static(&codec, port->forb->orb);
142         if (!FORB_CDR_buffer_init(&codec, 1024, 0))
143                 return -1;              
144         if (!FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE)) {
145                 ret = -1;
146                 goto free;
147         }
148         if (!forb_iop_prepare_hello(&codec, &port->forb->server_id,
149                                     port->desc.addr,
150                                     port->desc.proto->serialize_addr,
151                                     port->forb->attr.orb_id)) {
152                 ret = -1;
153                 goto free;
154         }
155         ret = forb_proto_send(peer, &codec);
156         if (ret > 0) ret = 0;
157 free:
158         FORB_CDR_codec_release_buffer(&codec);
159         return ret;
160 }
161
162 int
163 forb_iop_redistribute_hello_to(forb_peer_t *dest, forb_peer_t *peer)
164 {
165         forb_port_t *port = peer->port;
166         FORB_CDR_Codec codec;
167         int ret;
168         FORB_CDR_codec_init_static(&codec, port->forb->orb);
169         if (!FORB_CDR_buffer_init(&codec, 1024, 0))
170                 return -1;              
171         if (!FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE)) {
172                 ret = -1;
173                 goto free;
174         }
175         if (!forb_iop_prepare_hello(&codec, &peer->server_id,
176                                     peer->addr,
177                                     peer->port->desc.proto->serialize_addr,
178                                     peer->orb_id)) {
179                 ret = -1;
180                 goto free;
181         }
182         ul_logdeb("redistributing hello of %s (%s) to %s (%s)\n",
183                   ""/*TODO:id*/, peer->orb_id, "", dest->orb_id);
184         ret = forb_proto_send(dest, &codec);
185         if (ret > 0) ret = 0;
186 free:
187         FORB_CDR_codec_release_buffer(&codec);
188         return ret;
189 }
190
191 bool
192 forb_iop_process_message_header(forb_iop_message_header *mh, FORB_CDR_Codec *codec)
193 {
194         /* FIXME: If we have multiple protocol versions, use different
195          * type (independent from version) for return value instead of
196          * mh */
197         forb_iop_version_deserialize(codec, &mh->proto_version);
198         switch (VER(mh->proto_version.major, mh->proto_version.minor)) {
199                 case VER(VER_MAJOR, VER_MINOR):
200                         forb_iop_message_type_deserialize(codec, &mh->message_type);
201                         forb_iop_message_flags_deserialize(codec, &mh->flags);
202                         /* Check whwther the type is meaningfull */
203                         switch (mh->message_type) {
204                                 case forb_iop_REQUEST:
205                                 case forb_iop_REPLY:
206                                 case forb_iop_HELLO:
207                                         break;
208                                 default:
209                                         ul_logerr("rcvd wrong message type: %d\n",
210                                                   mh->message_type);
211                                         return false;
212                         }
213                         codec->data_endian = (mh->flags && forb_iop_LITTLE_ENDIAN) ?
214                                 LittleEndian : BigEndian;                       
215                         CORBA_unsigned_long_deserialize(codec, &mh->message_size);
216                         return true;
217                         break;                  
218                 default:
219                         ul_logerr("rcvd wrong protocol versio: %d.%d\n",
220                                   mh->proto_version.major, mh->proto_version.minor);
221                         return false;
222         }
223 }
224
225 static inline CORBA_boolean
226 forb_exception_serialize(FORB_CDR_Codec *codec, struct forb_env *env)
227 {
228         return CORBA_long_serialize(codec, &env->major);
229 }
230
231 static inline CORBA_boolean
232 forb_exception_deserialize(FORB_CDR_Codec *codec, struct forb_env *env)
233 {
234         /* TODO: Declare exceptions in IDL and don't typecast here. */
235         return CORBA_long_deserialize(codec, (CORBA_long*)&env->major);
236 }
237
238 void
239 forb_iop_send_reply(forb_t *forb,
240            forb_server_id *dest,
241            FORB_CDR_Codec *codec,
242            CORBA_long request_id,
243            struct forb_env *env)
244 {
245         forb_iop_reply_header reply_header;
246         CORBA_boolean ret;
247         forb_peer_t *peer;
248         fosa_abs_time_t timeout;
249         
250         reply_header.request_id = request_id;
251         reply_header.flags = 0;
252         if (forb_exception_occurred(env)) {
253                 reply_header.flags |= forb_iop_FLAG_EXCEPTION;
254                 FORB_CDR_buffer_reset(codec, forb_iop_MESSAGE_HEADER_SIZE +
255                                              forb_iop_REPLY_HEADER_SIZE);
256                 forb_exception_serialize(codec, env);
257         }
258         /* All headers must be 8 byte aligned so align the length of
259          * this message */
260         if (!FORB_CDR_put_align(codec, 8)) {
261                 ul_logerr("Not enough space for tail align\n");
262                 return;         /* FIXME: handle error (goto above)*/
263         }
264         /* forb_iop_REPLY_HEADER_SIZE equals to 8 even if the real
265          * header is shorter. We want reply data to be 8 byte
266          * aligned */
267         ret = FORB_CDR_buffer_prepend(codec, forb_iop_REPLY_HEADER_SIZE);
268         if (!ret) {
269                 goto err;
270         }
271         ret = forb_iop_reply_header_serialize(codec, &reply_header);
272         if (!ret) {
273                 goto err;
274         }
275         forb_iop_prepend_message_header(codec, forb_iop_REPLY);
276
277         fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
278         timeout = fosa_abs_time_incr(timeout,
279                                      fosa_msec_to_rel_time(1000));
280
281         peer = forb_get_next_hop(forb, dest, &timeout);
282         if (!peer) {
283                 char str[60];
284                 forb_server_id_to_string(str, dest, sizeof(str));
285                 ul_logerr("Reply destination not found: %s\n", str);
286                 goto err;
287         } else {
288                 char str[60];
289                 forb_server_id_to_string(str, dest, sizeof(str));
290                 ul_logdeb("sending reply: dest=%s, id=%u\n", str, request_id);
291         }
292         forb_proto_send(peer, codec);
293         forb_peer_put(peer);
294 err:
295         ;
296 }
297
298 static void
299 process_request(forb_port_t *port, FORB_CDR_Codec *codec, uint32_t message_size)
300 {
301         forb_iop_request_header request_header;
302         CORBA_boolean ret;
303         forb_object obj;
304         size_t n;
305         forb_t *forb = port->forb;
306         struct forb_env env;
307         FORB_CDR_Codec reply_codec;
308         forb_exec_req_t *exec_req;
309         uint32_t req_size, data_size, header_size;
310         char str[32];
311         forb_peer_t *peer;
312
313         data_size = FORB_CDR_data_size(codec);
314         ret = forb_iop_request_header_deserialize(codec, &request_header);
315         if (!ret) {
316                 ul_logerr("Malformed request recevied\n");
317                 env.major = FORB_EX_COMM_FAILURE;
318                 goto out;
319         }
320         ret = FORB_CDR_get_align(codec, 8);
321         if (!ret) {
322                 ul_logerr("Malformed request recevied\n");
323                 env.major = FORB_EX_COMM_FAILURE;
324                 goto out;
325         }
326
327         header_size = data_size - FORB_CDR_data_size(codec);
328         req_size = message_size - header_size;
329
330         ul_logdeb("rcvd request: src=%s, id=%u, iface=%s, method=%hd\n",
331                   forb_server_id_to_string(str, &request_header.source, sizeof(str)),
332                   request_header.request_id,
333                   request_header.iface,
334                   request_header.method_index);
335
336         if (port->new_peer) {
337                 /* Request from a new peer was reported by the underlaying protocol */
338                 peer = forb_peer_find(port->forb, &request_header.source);
339                 if (peer) {
340                         /* We already know this peer */
341                         /* TODO: Can it be in FORB_PEER_WANTED state?
342                          * If yes, we cannot simply igore this. */
343                         ul_logmsg("new_peer was already known\n");
344                         forb_peer_put(port->new_peer);
345                         port->new_peer = NULL;
346                 } else {
347                         ul_logdeb("discovered new_peer from incomming connection\n");
348                         peer = port->new_peer;
349                         port->new_peer = NULL;
350                         forb_new_peer_discovered(port, peer, request_header.source,
351                                                  peer->addr, peer->orb_id);
352                 }
353                 
354         }
355         
356         obj = forb_key_to_object(forb, request_header.objkey);
357         if (!obj) {
358                 ul_logerr("Nonexistent object key\n");
359                 env.major = FORB_EX_OBJECT_NOT_EXIST;
360                 goto send_execption;
361         }
362
363         n = strlen(request_header.iface);
364         ret = strncmp(request_header.iface, obj->interface->name, n);
365         forb_free(request_header.iface);
366         request_header.iface = NULL;
367         if (ret != 0) {
368                 env.major = FORB_EX_INV_OBJREF;
369                 ul_logerr("Object reference has incorrect type\n");
370                 goto send_execption;
371         }
372         
373         if (request_header.method_index >= obj->interface->num_methods) {
374                 env.major = FORB_EX_INV_IDENT;
375                 ul_logerr("To high method number\n");
376                 goto send_execption;
377         }
378
379         if (!obj->executor) {
380                 env.major = FORB_EX_NO_IMPLEMENT;
381                 ul_logerr("No executor for object\n");
382                 goto send_execption;
383
384         }
385
386         /* Enqueue execution request */
387         exec_req = forb_malloc(sizeof(*exec_req));
388         if (exec_req) {
389                 memset(exec_req, 0, sizeof(exec_req));
390                 exec_req->request_type = FORB_EXEC_REQ_REMOTE; 
391                 exec_req->request_id = request_header.request_id;
392                 exec_req->source = request_header.source;
393                 exec_req->obj = obj;
394                 exec_req->method_index = request_header.method_index;
395                 /* Copy the request to exec_req */
396                 FORB_CDR_codec_init_static(&exec_req->codec, codec->orb);
397                 ret = FORB_CDR_buffer_init(&exec_req->codec,
398                                       req_size,
399                                       0);
400                 if (!ret) {
401                         env.major = FORB_EX_NO_MEMORY;
402                         ul_logerr("No memory for executor request bufer of size %d (header_size=%d)\n",
403                                   req_size, header_size);
404                         goto send_execption;
405                 }
406                 exec_req->codec.data_endian = codec->data_endian;
407                 FORB_CDR_buffer_gets(codec, exec_req->codec.buffer, req_size);
408                 /* TODO: Use better data structure for incomming
409                    buffer to achieve zero-copy behaviour. */
410                 forb_exec_req_ins_tail(obj->executor, exec_req);
411         } else {
412                 env.major = FORB_EX_NO_MEMORY;
413                 ul_logerr("No memory for executor request\n");
414                 goto send_execption;
415         }
416         
417 out:
418         return;
419         
420 send_execption:
421         FORB_CDR_codec_init_static(&reply_codec, codec->orb);   
422         ret = FORB_CDR_buffer_init(&reply_codec, 4096,
423                               forb_iop_MESSAGE_HEADER_SIZE +
424                               forb_iop_REPLY_HEADER_SIZE);
425         if (!ret) {
426                 ul_logerr("No memory for exception reply buffer\n");
427                 return;
428         }
429
430         forb_iop_send_reply(port->forb, &request_header.source,
431                             &reply_codec, request_header.request_id, &env);
432         FORB_CDR_codec_release_buffer(&reply_codec);
433         /* TODO: relese exec_req etc. */
434 }
435
436 static void
437 process_reply(forb_port_t *port, FORB_CDR_Codec *codec)
438 {
439         forb_iop_reply_header rh;
440         forb_t *forb = port->forb;
441         forb_request_t *req;
442
443         forb_iop_reply_header_deserialize(codec, &rh);
444         /* Reply data are 8 byte aligned */
445         FORB_CDR_get_align(codec, 8);
446         ul_logdeb("rcvd reply: id=%u\n", rh.request_id);
447         req = forb_request_find(forb, &rh.request_id);
448         if (!req) {
449                 ul_logerr("Received reply to unknown request_id %u\n", rh.request_id);
450                 return;
451         }
452         forb_request_delete(forb, req); /* Deregister request from forb */
453
454         if (rh.flags & forb_iop_FLAG_EXCEPTION) {
455                 forb_exception_deserialize(codec, req->env);
456         } else {
457                 req->cdr_reply = codec;
458         }
459
460         /* Tell the stub where to signal that reply processing is
461          * finished */
462         req->reply_processed = &port->reply_processed;
463
464         /* Resume the stub waiting in forb_wait_for_reply() */
465         forb_syncobj_signal(&req->reply_ready);
466
467         /* Wait for stub to process the results from the codec's buffer */
468         forb_syncobj_wait(&port->reply_processed);
469 }
470
471 /** 
472  * Process incomming HELLO messages.
473  *
474  * For every incomming HELLO message the peer table is searched
475  * whether it already contains a record for that peer or not. If not,
476  * the new peer is added to the table and another hello message is
477  * sent so that the new peer discovers us quickly.
478  * 
479  * @param port Port, where hello was received
480  * @param codec Buffer with the hello message
481  */
482 static void
483 process_hello(forb_port_t *port, FORB_CDR_Codec *codec)
484 {
485         forb_server_id server_id;
486         void *addr = NULL;
487         forb_peer_t *peer;
488         forb_t *forb = port->forb;
489         CORBA_string peer_orb_id = NULL;
490
491 /*      printf("Hello received at port %p\n", port); */
492
493         forb_server_id_deserialize(codec, &server_id);
494         if (port->desc.proto->deserialize_addr) {
495                 port->desc.proto->deserialize_addr(codec, &addr);
496         }
497         {
498                 char str[60], addrstr[60];
499                 if (port->desc.proto->addr2str) {
500                         port->desc.proto->addr2str(addrstr, sizeof(addrstr), addr);
501                 } else
502                         addrstr[0] = 0;
503                 ul_logdeb("rcvd hello from %s (addr %s)\n",
504                           forb_server_id_to_string(str, &server_id, sizeof(str)),
505                           addrstr);
506         }
507         CORBA_string_deserialize(codec, &peer_orb_id);
508         if (forb_server_id_cmp(&server_id, &forb->server_id) != 0) {
509                 peer = forb_peer_find(forb, &server_id);
510                 if (peer && peer->state == FORB_PEER_DISCOVERED) {
511                         /* TODO: Update last hello receive time */
512                         if (port->new_peer) {
513                                 ul_logdeb("peer already discovered but not connected - replacing\n");
514                                 forb_peer_disconnected(peer);
515                                 forb_peer_put(peer);
516                                 peer = port->new_peer;
517                                 port->new_peer = NULL;
518                                 forb_new_peer_discovered(port, peer, server_id, addr, peer_orb_id);
519                         } else {
520                                 ul_logdeb("peer already discovered - ignoring\n");
521                                 if (addr)
522                                         forb_free(addr);
523                                 if (peer_orb_id)
524                                         forb_free(peer_orb_id);
525                                 forb_peer_put(peer);
526                         }
527                 } else {
528                         if (port->new_peer) {
529                                 if (peer) {
530                                         ul_logerr("Unahandled case - FORB_PEER_WANTED && port->new_peer\n");
531                                         forb_peer_put(peer);
532                                 }
533                                 peer = port->new_peer;
534                                 port->new_peer = NULL;
535                         }
536
537                         forb_new_peer_discovered(port, peer, server_id, addr, peer_orb_id);
538                 }
539         }
540 }
541
542 static void
543 process_message(forb_port_t *port, const forb_iop_message_header *mh,
544                 FORB_CDR_Codec *codec)
545 {
546         CORBA_long data_size = FORB_CDR_data_size(codec);
547         /* TODO: Check destination address, whether the message is for
548          * us or should be routed. */
549
550         /* TODO: If there is some processing error, skip the rest of
551          * the message according mh.message_size. */
552         switch (mh->message_type) {
553                 case forb_iop_REQUEST:
554                         process_request(port, codec, mh->message_size);
555                         break;
556                 case forb_iop_REPLY:
557                         process_reply(port, codec);
558                         break;
559                 case forb_iop_HELLO:
560                         process_hello(port, codec);
561                         break;
562                 default:
563                         ul_logmsg("rcvd unknown message type\n");
564                         break;
565         }
566         if (port->new_peer) {
567                 /* If for some reaseon the new_peer was not processed so free it here. */
568                 ul_logmsg("Forgotten new_peer\n");
569                 forb_peer_put(port->new_peer);
570                 port->new_peer = NULL;
571         }
572
573         FORB_CDR_get_align(codec, 8);
574
575         if (FORB_CDR_data_size(codec) != data_size - mh->message_size) {
576                 size_t processed = data_size - FORB_CDR_data_size(codec);
577                 ul_logmsg("Message of type %d handled incorrectly (size=%d, processed=%zu); fixing\n",
578                           mh->message_type, mh->message_size, processed);
579                 ;
580                 codec->rptr += mh->message_size - processed;
581         }
582 }
583
584 /** 
585  * Thread run for every port to receive FORB messages from that port. 
586  * 
587  * @param arg Pointer to ::forb_port_t typecasted to void *.
588  * 
589  * @return Always NULL
590  */
591 void *forb_iop_receiver_thread(void *arg)
592 {
593         forb_port_t *port = arg;
594         const forb_proto_t *proto = port->desc.proto;
595         FORB_CDR_Codec *c = &port->codec;
596         ssize_t rcvd;
597         size_t len;
598         forb_iop_message_header mh;
599         bool header_received = false;
600
601         while (!port->finish) {
602                 if (c->rptr == c->wptr) {
603                         /* The buffer is empty now - start writing from begining*/
604                         FORB_CDR_buffer_reset(c, 0);
605                 }
606                 /* TODO: If there is not enough space for reception,
607                  * we should shift the already received data to the
608                  * beginning of the buffer. */
609                 rcvd = proto->recv(port,
610                                    &c->buffer[c->wptr],
611                                    c->wptr_max - c->wptr);
612                 if (rcvd < 0) {
613                         ul_logerr("recv returned error %zd (%s), exiting\n", rcvd, strerror(errno));
614                         return NULL;
615                 }
616                 c->wptr += rcvd;
617                 c->wptr_last = c->wptr;
618
619                 /* While there are some data in the buffer, process them. */
620                 while (FORB_CDR_data_size(c) > 0) {
621                         len = FORB_CDR_data_size(c);
622                         /* Wait for and then process message header */
623                         if (!header_received) {
624                                 if (len >= forb_iop_MESSAGE_HEADER_SIZE) {
625                                         if (c->rptr % 8 != 0) {
626                                                 ul_logerr("Header doesn't start at 8 byte bounday\n");
627                                         }
628                                         header_received = forb_iop_process_message_header(&mh, c);
629                                         if (!header_received) {
630                                                 ul_logerr("Wrong header received\n");
631                                                 /* TODO: We should probably reset the buffer here */
632                                         }
633                                         len = FORB_CDR_data_size(c);
634                                 } else {
635                                         break; /* Wait for more data to arrive*/
636                                 }
637                         }
638                         
639                         /* Wait for and then process the message body */
640                         if (header_received) {
641                                 if (len >= mh.message_size) {
642                                         process_message(port, &mh, c);
643                                         /* Wait for the next message */
644                                         header_received = false;
645                                 } else {
646                                         break; /* Wait for more data to arrive*/
647                                 }
648                         }
649                 }
650         }
651         return NULL;
652 }
653
654 static void
655 discovery_cleanup(void *codec)
656 {
657         FORB_CDR_codec_release_buffer((FORB_CDR_Codec*)codec);
658         /* TODO: Broadcast some kind of bye bye message */
659 }
660
661
662 /** 
663  * Thread run for every port to broadcast HELLO messages. These
664  * messages are used for a FORB to discover all peers (and in future
665  * also to detect their disconnection).
666  * 
667  * @param arg Pointer to ::forb_port_t typecasted to void *.
668  * 
669  * @return Always NULL
670  */
671 void *forb_iop_discovery_thread(void *arg)
672 {
673         forb_port_t *port = arg;
674         const forb_proto_t *proto = port->desc.proto;
675         FORB_CDR_Codec codec;
676         fosa_abs_time_t hello_time;
677         fosa_rel_time_t hello_interval = fosa_msec_to_rel_time(1000*proto->hello_interval);
678         int ret;
679
680         FORB_CDR_codec_init_static(&codec, port->forb->orb);
681         FORB_CDR_buffer_init(&codec, 1024, 0);
682
683         pthread_cleanup_push(discovery_cleanup, &codec);
684         
685         /* Next hello interval is now */
686         fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &hello_time);
687         
688         while (!port->finish) {
689                 /* Wait for next hello interval or until somebody
690                  * signals us. */
691                 ret = forb_syncobj_timedwait(&port->hello, &hello_time);
692                 /* sem_timedwait would be more appropriate */
693                 if (ret == FOSA_ETIMEDOUT) {
694                         hello_time = fosa_abs_time_incr(hello_time, hello_interval);
695                 } else if (ret != 0) {
696                         ul_logerr("hello syncobj error: %s\n", strerror(ret));
697                 }
698
699                 if (port->finish) break;
700
701                 FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
702                 forb_iop_prepare_hello(&codec, &port->forb->server_id, port->desc.addr,
703                                        proto->serialize_addr, port->forb->attr.orb_id);
704 /*              printf("Broadcasting hello from port %p\n", port);  */
705                 proto->broadcast(port, &codec.buffer[codec.rptr],
706                                  FORB_CDR_data_size(&codec));
707         }
708
709         pthread_cleanup_pop(1);
710         return NULL;
711 }
712
713 /** 
714  * Sends REQUEST to another object.
715  *
716  * The request @a req has to be prepared previously by calling
717  * forb_iop_prepare_request(). Then, when the request destination is
718  * remote, this function adds a message header, connects to the
719  * destination FORB and sends the request. In the case of local
720  * request, it is directly enqueued into the executor's queue.
721  *
722  * If no exception is reported, then the caller must wait for response
723  * by calling forb_wait_for_reply().
724  * 
725  * @param req A request prepared by forb_iop_prepare_request()
726  * @param env Environment for returning exceptions
727  */
728 void
729 forb_request_send(forb_request_t *req, CORBA_Environment *env)
730 {
731         CORBA_boolean ret;
732         forb_peer_t *peer;
733         ssize_t size;
734         size_t len;
735         fosa_abs_time_t timeout;
736         forb_t *forb = forb_object_to_forb(req->obj);
737         forb_exec_req_t *exec_req;
738
739         if (!forb) {
740                 env->major = FORB_EX_INTERNAL;
741                 return;
742         }
743
744         req->env = env;     /* Remember, where to return exceptions */
745
746         /* All headers must be 8 byte aligned so align the length of
747          * this message */
748         if (!FORB_CDR_put_align(&req->cdr_request, 8)) {
749                 env->major = FORB_EX_INTERNAL;
750                 ul_logerr("Not enough space for tail align\n");
751                 return;
752         }
753
754         /* Local invocation case, destination of a message is only 
755          * a different executor thread */
756         if (forb_object_is_local(req->obj)) {
757                 exec_req = forb_malloc(sizeof(*exec_req));
758                 memset(exec_req, 0, sizeof(exec_req));
759                 exec_req->request_type = FORB_EXEC_REQ_LOCAL; 
760                 exec_req->input_request = req;
761                 exec_req->obj = forb_object_duplicate(req->obj);
762                 exec_req->method_index = req->method_ind;
763                 exec_req->interface = req->interface;
764                 req->cdr_request.rptr = req->end_of_header_index;
765                 exec_req->codec = req->cdr_request;
766                 req->cdr_request.release_buffer = CORBA_FALSE;
767                 exec_req->request_id = req->request_id;
768                 forb_exec_req_ins_tail(forb_object_get_executor(exec_req->obj), exec_req);
769                 return;
770         }
771
772         ret = forb_iop_prepend_message_header(&req->cdr_request, forb_iop_REQUEST);
773         if (!ret) {
774                 /* This should never happen */
775                 env->major = FORB_EX_INTERNAL;
776                 return;
777         }
778
779         fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
780         timeout = fosa_abs_time_incr(timeout,
781                                      fosa_msec_to_rel_time(1000));
782         peer = forb_get_next_hop(forb, &req->obj->server, &timeout);
783         if (!peer) {
784                 char str[50];
785                 ul_logerr("Cannot find peer to send request for server %s\n",
786                           forb_server_id_to_string(str, &req->obj->server, sizeof(str)));
787                 env->major = FORB_EX_COMM_FAILURE;
788                 return;
789         }
790         /* Register the request with forb so we can match incomming
791          * reply to this request. */
792         ret = forb_request_insert(forb, req);
793         if (ret <= 0) {
794                 ul_logerr("Insert request error %d\n", ret);
795                 env->major = FORB_EX_INTERNAL;
796                 goto err_peer_put;
797         }
798
799         {
800                 char str[50];
801                 ul_logdeb("sending request: id=%d  dest=%s\n", req->request_id,
802                           forb_server_id_to_string(str, &req->obj->server, sizeof(str)));
803         }
804         len = FORB_CDR_data_size(&req->cdr_request);
805         fosa_mutex_lock(&peer->send_lock);
806         size = forb_proto_send(peer, &req->cdr_request);
807         fosa_mutex_unlock(&peer->send_lock);
808         if (size <= 0 || size != len) {
809                 env->major = FORB_EX_COMM_FAILURE;
810                 /* Request is deleted when the stub calls forb_request_destroy() */
811         }
812  err_peer_put:
813         forb_peer_put(peer);
814 }