1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners: */
5 /* Universidad de Cantabria, SPAIN */
6 /* University of York, UK */
7 /* Scuola Superiore Sant'Anna, ITALY */
8 /* Kaiserslautern University, GERMANY */
9 /* Univ. Politécnica Valencia, SPAIN */
10 /* Czech Technical University in Prague, CZECH REPUBLIC */
12 /* Thales Communication S.A. FRANCE */
13 /* Visual Tools S.A. SPAIN */
14 /* Rapita Systems Ltd UK */
17 /* See http://www.frescor.org for a link to partners' websites */
19 /* FRESCOR project (FP6/2005/IST/5-034026) is funded */
20 /* in part by the European Union Sixth Framework Programme */
21 /* The European Union is not liable of any use that may be */
22 /* made of this code. */
25 /* This file is part of FORB (Frescor Object Request Broker) */
27 /* FORB is free software; you can redistribute it and/or modify it */
28 /* under terms of the GNU General Public License as published by the */
29 /* Free Software Foundation; either version 2, or (at your option) any */
30 /* later version. FORB is distributed in the hope that it will be */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU */
33 /* General Public License for more details. You should have received a */
34 /* copy of the GNU General Public License along with FORB; see file */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave, */
36 /* Cambridge, MA 02139, USA. */
38 /* As a special exception, including FORB header files in a file, */
39 /* instantiating FORB generics or templates, or linking other files */
40 /* with FORB objects to produce an executable application, does not */
41 /* by itself cause the resulting executable application to be covered */
42 /* by the GNU General Public License. This exception does not */
43 /* however invalidate any other reasons why the executable file might be */
44 /* covered by the GNU Public License. */
45 /**************************************************************************/
49 * @author Michal Sojka <sojkam1@fel.cvut.cz>
50 * @date Mon Sep 1 21:51:58 2008
52 * @brief Implementation of Inter-ORB protocol.
58 #include <forb/object.h>
64 /** Version of the protocol */
68 #define VER(major, minor) ((major)<<8 || (minor))
70 extern UL_LOG_CUST(ulogd_forb_iop);
74 forb_iop_prepend_message_header(CDR_Codec *codec, forb_iop_message_type mt)
77 forb_iop_message_header mh;
78 mh.proto_version.major = VER_MAJOR;
79 mh.proto_version.minor = VER_MINOR;
81 mh.flags = (codec->data_endian == LittleEndian) ? forb_iop_LITTLE_ENDIAN : 0;
82 mh.message_size = CDR_data_size(codec);
83 ret = CDR_buffer_prepend(codec, forb_iop_MESSAGE_HEADER_SIZE);
85 ret = forb_iop_message_header_serialize(codec, &mh);
91 forb_iop_prepare_request(forb_request_t *req,
94 CORBA_Environment *env)
97 forb_iop_request_header rh;
99 rh.request_id = req->request_id;
101 rh.objkey = forb_object_to_key(req->obj);
102 rh.method_index = method_ind;
103 rh.source = forb_object_to_forb(req->obj)->server_id;
104 ret = forb_iop_request_header_serialize(&req->cdr_request, &rh);
106 /* Request body is 8 byte aligned */
107 ret = CDR_put_align(&req->cdr_request, 8);
113 forb_iop_prepare_hello(CDR_Codec *codec,
114 const forb_server_id *server_id,
115 const void *src_addr,
116 CORBA_boolean (*serialize_addr)(CDR_Codec *codec, const void *addr))
118 if (!forb_server_id_serialize(codec, server_id)) return CORBA_FALSE;
119 if (serialize_addr) {
120 if (!serialize_addr(codec, src_addr)) return CORBA_FALSE;
122 if (!forb_iop_prepend_message_header(codec, forb_iop_HELLO)) return CORBA_FALSE;
127 forb_iop_process_message_header(forb_iop_message_header *mh, CDR_Codec *codec)
129 /* FIXME: If we have multiple protocol versions, use different
130 * type (independent from version) for return value instead of
132 forb_iop_version_deserialize(codec, &mh->proto_version);
133 switch (VER(mh->proto_version.major, mh->proto_version.minor)) {
134 case VER(VER_MAJOR, VER_MINOR):
135 forb_iop_message_type_deserialize(codec, &mh->message_type);
136 forb_iop_message_flags_deserialize(codec, &mh->flags);
137 /* Check whwther the type is meaningfull */
138 switch (mh->message_type) {
139 case forb_iop_REQUEST:
146 codec->data_endian = (mh->flags && forb_iop_LITTLE_ENDIAN) ?
147 LittleEndian : BigEndian;
148 CORBA_unsigned_long_deserialize(codec, &mh->message_size);
156 static inline CORBA_boolean
157 forb_exception_serialize(CDR_Codec *codec, struct forb_env *env)
159 return CORBA_long_serialize(codec, &env->major);
162 static inline CORBA_boolean
163 forb_exception_deserialize(CDR_Codec *codec, struct forb_env *env)
165 /* TODO: Declare exceptions in IDL and don't typecast here. */
166 return CORBA_long_deserialize(codec, (CORBA_long*)&env->major);
170 forb_iop_send_reply(forb_t *forb,
171 forb_server_id *dest,
173 CORBA_long request_id,
174 struct forb_env *env)
176 forb_iop_reply_header reply_header;
179 fosa_abs_time_t timeout;
181 reply_header.request_id = request_id;
182 reply_header.flags = 0;
183 if (forb_exception_occured(env)) {
184 reply_header.flags |= forb_iop_FLAG_EXCEPTION;
185 CDR_buffer_reset(codec, forb_iop_MESSAGE_HEADER_SIZE +
186 forb_iop_REPLY_HEADER_SIZE);
187 forb_exception_serialize(codec, env);
189 /* forb_iop_REPLY_HEADER_SIZE equals to 8 even if the real
190 * header is shorter. We want reply data to be 8 byte
192 ret = CDR_buffer_prepend(codec, forb_iop_REPLY_HEADER_SIZE);
196 ret = forb_iop_reply_header_serialize(codec, &reply_header);
200 forb_iop_prepend_message_header(codec, forb_iop_REPLY);
202 fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
203 timeout = fosa_abs_time_incr(timeout,
204 fosa_msec_to_rel_time(1000));
206 peer = forb_get_next_hop(forb, dest, &timeout);
208 ul_logerr("Reply destination not found\n");
211 forb_proto_send(peer, codec);
219 process_request(forb_port_t *port, CDR_Codec *codec, uint32_t message_size)
221 forb_iop_request_header request_header;
225 forb_t *forb = port->forb;
227 CDR_Codec reply_codec;
228 forb_exec_req_t *exec_req;
229 uint32_t req_size, data_size, header_size;
232 data_size = CDR_data_size(codec);
233 ret = forb_iop_request_header_deserialize(codec, &request_header);
235 ul_logerr("Malformed request recevied\n");
236 env.major = FORB_EX_COMM_FAILURE;
239 ret = CDR_get_align(codec, 8);
241 ul_logerr("Malformed request recevied\n");
242 env.major = FORB_EX_COMM_FAILURE;
246 header_size = data_size - CDR_data_size(codec);
247 req_size = message_size - header_size;
249 ul_logdeb("rcvd request: src=%s, id=%u, iface=%s, method=%hd\n",
250 forb_server_id_to_string(str, &request_header.source, sizeof(str)),
251 request_header.request_id,
252 request_header.iface,
253 request_header.method_index);
255 obj = forb_key_to_object(forb, request_header.objkey);
257 ul_logerr("Nonexistent object key\n");
258 env.major = FORB_EX_OBJECT_NOT_EXIST;
262 n = strlen(request_header.iface);
263 if (strncmp(request_header.iface, obj->interface->name, n) != 0) {
264 env.major = FORB_EX_INV_OBJREF;
265 ul_logerr("Object reference has incorrect type\n");
268 if (request_header.method_index >= obj->interface->num_methods) {
269 env.major = FORB_EX_INV_IDENT;
270 ul_logerr("To high method number\n");
274 if (!obj->executor) {
275 env.major = FORB_EX_NO_IMPLEMENT;
276 ul_logerr("No executor for object\n");
281 /* Enqueue execution request */
282 exec_req = forb_malloc(sizeof(*exec_req));
284 exec_req->request_id = request_header.request_id;
285 exec_req->source = request_header.source;
287 exec_req->method_index = request_header.method_index;
288 /* Copy the request to exec_req */
289 CDR_codec_init_static(&exec_req->codec);
290 ret = CDR_buffer_init(&exec_req->codec,
294 env.major = FORB_EX_NO_MEMORY;
295 ul_logerr("No memory for executor request bufer of size %d (header_size=%d)\n",
296 req_size, header_size);
299 exec_req->codec.data_endian = codec->data_endian;
300 CDR_buffer_gets(codec, exec_req->codec.buffer, req_size);
301 /* TODO: Use better data structure for incomming
302 buffer to achieve zero-copy behaviour. */
303 forb_exec_req_ins_tail(obj->executor, exec_req);
305 env.major = FORB_EX_NO_MEMORY;
306 ul_logerr("No memory for executor request\n");
314 CDR_codec_init_static(&reply_codec);
315 ret = CDR_buffer_init(&reply_codec, 4096,
316 forb_iop_MESSAGE_HEADER_SIZE +
317 forb_iop_REPLY_HEADER_SIZE);
319 ul_logerr("No memory for exception reply buffer\n");
323 forb_iop_send_reply(port->forb, &request_header.source, &reply_codec, request_header.request_id, &env);
324 CDR_codec_release_buffer(&reply_codec);
328 process_reply(forb_port_t *port, CDR_Codec *codec)
330 forb_iop_reply_header rh;
331 forb_t *forb = port->forb;
334 forb_iop_reply_header_deserialize(codec, &rh);
335 /* Reply data are 8 byte aligned */
336 CDR_get_align(codec, 8);
337 ul_logdeb("rcvd reply: id=%u\n", rh.request_id);
338 req = forb_request_find(forb, &rh.request_id);
340 ul_logerr("Received reply to unknown request_id %u\n", rh.request_id);
343 if (rh.flags & forb_iop_FLAG_EXCEPTION) {
344 forb_exception_deserialize(codec, req->env);
346 req->cdr_reply = codec;
348 req->reply_processed = &port->reply_processed;
350 /* Resume the stub witing in forb_wait_for_reply() */
351 forb_syncobj_signal(&req->reply_ready);
353 /* Wait for stub to process the results from the codec's buffer */
354 forb_syncobj_wait(&port->reply_processed);
358 * Process incomming HELLO messages.
360 * For every incomming HELLO message the peer table is searched
361 * whether it already contains a record for that peer or not. If not,
362 * the new peer is added to the table and another hello message is
363 * sent so that the new peer discovers us quickly.
365 * @param port Port, where hello was received
366 * @param codec Buffer with the hello message
369 process_hello(forb_port_t *port, CDR_Codec *codec)
371 forb_server_id server_id;
374 forb_t *forb = port->forb;
376 /* printf("Hello received at port %p\n", port); */
378 forb_server_id_deserialize(codec, &server_id);
381 ul_logdeb("rcvd hello from %s\n", forb_server_id_to_string(str, &server_id, sizeof(str)));
383 if (port->proto->deserialize_addr) {
384 port->proto->deserialize_addr(codec, &addr);
386 if (forb_server_id_cmp(&server_id, &forb->server_id) != 0) {
387 peer = forb_peer_find(forb, &server_id);
388 if (peer && peer->state == FORB_PEER_DISCOVERED) {
389 /* TODO: Update last hello receive time */
392 /* New peer discovered */
393 bool notify_waiters = false;
394 if (peer /* && peer->state == FORB_PEER_WANTED */) {
395 notify_waiters = true;
397 peer = forb_peer_new();
400 fosa_mutex_lock(&forb->peer_mutex);
401 peer->server_id = server_id;
404 peer->state = FORB_PEER_DISCOVERED;
405 if (notify_waiters) {
406 fosa_cond_broadcast(&peer->cond);
409 forb_peer_nolock_insert(forb, peer);
411 fosa_mutex_unlock(&forb->peer_mutex);
414 /* Broadcast our hello packet now */
415 forb_syncobj_signal(&port->hello);
421 process_message(forb_port_t *port, const forb_iop_message_header *mh,
424 CORBA_long data_size = CDR_data_size(codec);
425 /* TODO: Check destination address, whether the message is for
426 * us or should be routed. */
428 /* TODO: If there is some processing error, skip the rest of
429 * the message according mh.message_size. */
430 switch (mh->message_type) {
431 case forb_iop_REQUEST:
432 process_request(port, codec, mh->message_size);
435 process_reply(port, codec);
438 process_hello(port, codec);
441 ul_logmsg("rcvd unknown message type\n");
444 if (CDR_data_size(codec) != data_size - mh->message_size) {
445 ul_logerr("Message of type %d handled incorrectly (size=%d, processed=%d)\n",
446 mh->message_type, mh->message_size,
447 data_size - CDR_data_size(codec));
452 * Thread run for every port to receive FORB messages from that port.
454 * @param arg Pointer to ::forb_port_t typecasted to void *.
456 * @return Always NULL
458 void *forb_iop_receiver_thread(void *arg)
460 forb_port_t *port = arg;
461 const forb_proto_t *proto = port->proto;
462 CDR_Codec *c = &port->codec;
464 forb_iop_message_header mh;
465 bool header_received = false;
467 while (!port->finish) {
468 if (c->rptr == c->wptr) {
469 /* The buffer is empty now - start writing from begining*/
470 CDR_buffer_reset(c, 0);
472 /* TODO: If there is not enough space for reception,
473 * we should shift the already received data to the
474 * beginning of the buffer. */
475 rcvd = proto->recv(port,
477 c->wptr_max - c->wptr);
479 c->wptr_last = c->wptr;
481 /* While there are some data in the buffer, process them. */
482 while (CDR_data_size(c) > 0) {
483 len = CDR_data_size(c);
484 /* Wait for and then process message header */
485 if (!header_received) {
486 if (len >= forb_iop_MESSAGE_HEADER_SIZE) {
487 if (c->rptr % 8 != 0) {
488 ul_logerr("Header doesn't start at 8 byte bounday\n");
490 header_received = forb_iop_process_message_header(&mh, c);
491 len = CDR_data_size(c);
493 break; /* Wait for more data to arrive*/
497 /* Wait for and then process the message body */
498 if (header_received) {
499 if (len >= mh.message_size) {
500 process_message(port, &mh, c);
501 /* Wait for the next message */
502 header_received = false;
504 break; /* Wait for more data to arrive*/
513 discovery_cleanup(void *codec)
515 CDR_codec_release_buffer((CDR_Codec*)codec);
516 /* TODO: Broadcast some kind of bye bye message */
521 * Thread run for every port to broadcast HELLO messages. These
522 * messages are used for a FORB to discover all peers (and in future
523 * also to detect their disconnection).
525 * @param arg Pointer to ::forb_port_t typecasted to void *.
527 * @return Always NULL
529 void *forb_iop_discovery_thread(void *arg)
531 forb_port_t *port = arg;
532 const forb_proto_t *proto = port->proto;
534 fosa_abs_time_t hello_time;
535 fosa_rel_time_t hello_interval = fosa_msec_to_rel_time(1000*proto->hello_interval);
538 CDR_codec_init_static(&codec);
539 CDR_buffer_init(&codec, 1024, 0);
541 pthread_cleanup_push(discovery_cleanup, &codec);
543 /* Next hello interval is now */
544 fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &hello_time);
546 while (!port->finish) {
547 /* Wait for next hello interval or until somebody
549 ret = forb_syncobj_timedwait(&port->hello, &hello_time);
550 /* sem_timedwait would be more appropriate */
551 if (ret == FOSA_ETIMEDOUT) {
552 hello_time = fosa_abs_time_incr(hello_time, hello_interval);
553 } else if (ret != 0) {
554 ul_logerr("hello syncobj error: %s\n", strerror(ret));
557 if (port->finish) break;
559 CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
560 forb_iop_prepare_hello(&codec, &port->forb->server_id, port->addr,
561 proto->serialize_addr);
562 /* printf("Broadcasting hello from port %p\n", port); */
563 proto->broadcast(port, &codec.buffer[codec.rptr],
564 CDR_data_size(&codec));
567 pthread_cleanup_pop(1);