]> rtime.felk.cvut.cz Git - frescor/forb.git/blob - src/iop.c
Added callback for peer discovery
[frescor/forb.git] / src / iop.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FORB (Frescor Object Request Broker)             */
26 /*                                                                        */
27 /* FORB is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FORB is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FORB; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FORB header files in a file,         */
39 /* instantiating FORB generics or templates, or linking other files       */
40 /* with FORB objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 /**
48  * @file   iop.c
49  * @author Michal Sojka <sojkam1@fel.cvut.cz>
50  * @date   Mon Sep  1 21:51:58 2008
51  * 
52  * @brief  Implementation of Inter-ORB protocol.
53  * 
54  * 
55  */
56 #include "iop.h"
57 #include <forb/cdr.h>
58 #include <forb/object.h>
59 #include "proto.h"
60 #include <ul_log.h>
61 #include "exec_req.h"
62 #include <errno.h>
63 #include "peer.h"
64
65 /** Version of the protocol */
66 #define VER_MAJOR 0
67 #define VER_MINOR 0
68
69 #define VER(major, minor) ((major)<<8 || (minor))
70
71 extern UL_LOG_CUST(ulogd_forb_iop);
72
73
74 CORBA_boolean
75 forb_iop_prepend_message_header(FORB_CDR_Codec *codec, forb_iop_message_type mt)
76 {
77         CORBA_boolean ret;
78         forb_iop_message_header mh;
79         mh.proto_version.major = VER_MAJOR;
80         mh.proto_version.minor = VER_MINOR;
81         mh.message_type = mt;
82         mh.flags = (codec->data_endian == LittleEndian) ? forb_iop_LITTLE_ENDIAN : 0;
83         mh.message_size = FORB_CDR_data_size(codec);
84         ret = FORB_CDR_buffer_prepend(codec, forb_iop_MESSAGE_HEADER_SIZE);
85         if (ret) {
86                 ret = forb_iop_message_header_serialize(codec, &mh);
87         }
88         return ret;
89 }
90
91 CORBA_boolean
92 forb_iop_prepare_request(forb_request_t *req,
93                          char *iface,
94                          unsigned method_ind,
95                          CORBA_Environment *env)
96 {
97         CORBA_boolean ret;
98         forb_iop_request_header rh;
99
100         rh.request_id = req->request_id;
101         rh.iface = iface;
102         rh.objkey = forb_object_to_key(req->obj);
103         rh.method_index = method_ind;
104         rh.source = forb_object_to_forb(req->obj)->server_id;
105         ret = forb_iop_request_header_serialize(&req->cdr_request, &rh);
106         if (ret) {
107                 /* Request body is 8 byte aligned */
108                 ret = FORB_CDR_put_align(&req->cdr_request, 8);
109         }
110         return ret;
111 }
112
113 static CORBA_boolean
114 forb_iop_prepare_hello(FORB_CDR_Codec *codec,
115                        const forb_server_id *server_id,
116                        const void *src_addr,
117                        CORBA_boolean (*serialize_addr)(FORB_CDR_Codec *codec, const void *addr),
118                        const CORBA_string orb_id)
119 {
120         if (!forb_server_id_serialize(codec, server_id)) return CORBA_FALSE;
121         if (serialize_addr) {
122                 if (!serialize_addr(codec, src_addr)) return CORBA_FALSE;
123         }
124         if (!CORBA_string_serialize(codec, &orb_id)) return CORBA_FALSE;
125         if (!forb_iop_prepend_message_header(codec, forb_iop_HELLO)) return CORBA_FALSE;
126         return CORBA_TRUE;
127 }
128
129 bool
130 forb_iop_process_message_header(forb_iop_message_header *mh, FORB_CDR_Codec *codec)
131 {
132         /* FIXME: If we have multiple protocol versions, use different
133          * type (independent from version) for return value instead of
134          * mh */
135         forb_iop_version_deserialize(codec, &mh->proto_version);
136         switch (VER(mh->proto_version.major, mh->proto_version.minor)) {
137                 case VER(VER_MAJOR, VER_MINOR):
138                         forb_iop_message_type_deserialize(codec, &mh->message_type);
139                         forb_iop_message_flags_deserialize(codec, &mh->flags);
140                         /* Check whwther the type is meaningfull */
141                         switch (mh->message_type) {
142                                 case forb_iop_REQUEST:
143                                 case forb_iop_REPLY:
144                                 case forb_iop_HELLO:
145                                         break;
146                                 default:
147                                         return false;
148                         }
149                         codec->data_endian = (mh->flags && forb_iop_LITTLE_ENDIAN) ?
150                                 LittleEndian : BigEndian;                       
151                         CORBA_unsigned_long_deserialize(codec, &mh->message_size);
152                         return true;
153                         break;                  
154                 default:
155                         return false;
156         }
157 }
158
159 static inline CORBA_boolean
160 forb_exception_serialize(FORB_CDR_Codec *codec, struct forb_env *env)
161 {
162         return CORBA_long_serialize(codec, &env->major);
163 }
164
165 static inline CORBA_boolean
166 forb_exception_deserialize(FORB_CDR_Codec *codec, struct forb_env *env)
167 {
168         /* TODO: Declare exceptions in IDL and don't typecast here. */
169         return CORBA_long_deserialize(codec, (CORBA_long*)&env->major);
170 }
171
172 void
173 forb_iop_send_reply(forb_t *forb,
174            forb_server_id *dest,
175            FORB_CDR_Codec *codec,
176            CORBA_long request_id,
177            struct forb_env *env)
178 {
179         forb_iop_reply_header reply_header;
180         CORBA_boolean ret;
181         forb_peer_t *peer;
182         fosa_abs_time_t timeout;
183         
184         reply_header.request_id = request_id;
185         reply_header.flags = 0;
186         if (forb_exception_occurred(env)) {
187                 reply_header.flags |= forb_iop_FLAG_EXCEPTION;
188                 FORB_CDR_buffer_reset(codec, forb_iop_MESSAGE_HEADER_SIZE +
189                                              forb_iop_REPLY_HEADER_SIZE);
190                 forb_exception_serialize(codec, env);
191         }
192         /* forb_iop_REPLY_HEADER_SIZE equals to 8 even if the real
193          * header is shorter. We want reply data to be 8 byte
194          * aligned */
195         ret = FORB_CDR_buffer_prepend(codec, forb_iop_REPLY_HEADER_SIZE);
196         if (!ret) {
197                 goto err;
198         }
199         ret = forb_iop_reply_header_serialize(codec, &reply_header);
200         if (!ret) {
201                 goto err;
202         }
203         forb_iop_prepend_message_header(codec, forb_iop_REPLY);
204
205         fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
206         timeout = fosa_abs_time_incr(timeout,
207                                      fosa_msec_to_rel_time(1000));
208
209         peer = forb_get_next_hop(forb, dest, &timeout);
210         if (!peer) {
211                 ul_logerr("Reply destination not found\n");
212                 goto err;
213         }
214         forb_proto_send(peer, codec);
215         forb_peer_put(peer);
216 err:
217         ;
218 }
219
220
221 static void
222 process_request(forb_port_t *port, FORB_CDR_Codec *codec, uint32_t message_size)
223 {
224         forb_iop_request_header request_header;
225         CORBA_boolean ret;
226         forb_object obj;
227         size_t n;
228         forb_t *forb = port->forb;
229         struct forb_env env;
230         FORB_CDR_Codec reply_codec;
231         forb_exec_req_t *exec_req;
232         uint32_t req_size, data_size, header_size;
233         char str[32];
234
235         data_size = FORB_CDR_data_size(codec);
236         ret = forb_iop_request_header_deserialize(codec, &request_header);
237         if (!ret) {
238                 ul_logerr("Malformed request recevied\n");
239                 env.major = FORB_EX_COMM_FAILURE;
240                 goto out;
241         }
242         ret = FORB_CDR_get_align(codec, 8);
243         if (!ret) {
244                 ul_logerr("Malformed request recevied\n");
245                 env.major = FORB_EX_COMM_FAILURE;
246                 goto out;
247         }
248
249         header_size = data_size - FORB_CDR_data_size(codec);
250         req_size = message_size - header_size;
251
252         ul_logdeb("rcvd request: src=%s, id=%u, iface=%s, method=%hd\n",
253                   forb_server_id_to_string(str, &request_header.source, sizeof(str)),
254                   request_header.request_id,
255                   request_header.iface,
256                   request_header.method_index);
257         
258         obj = forb_key_to_object(forb, request_header.objkey);
259         if (!obj) {
260                 ul_logerr("Nonexistent object key\n");
261                 env.major = FORB_EX_OBJECT_NOT_EXIST;
262                 goto send_execption;
263         }
264
265         n = strlen(request_header.iface);
266         ret = strncmp(request_header.iface, obj->interface->name, n);
267         forb_free(request_header.iface);
268         request_header.iface = NULL;
269         if (ret != 0) {
270                 env.major = FORB_EX_INV_OBJREF;
271                 ul_logerr("Object reference has incorrect type\n");
272                 goto send_execption;
273         }
274         
275         if (request_header.method_index >= obj->interface->num_methods) {
276                 env.major = FORB_EX_INV_IDENT;
277                 ul_logerr("To high method number\n");
278                 goto send_execption;
279         }
280
281         if (!obj->executor) {
282                 env.major = FORB_EX_NO_IMPLEMENT;
283                 ul_logerr("No executor for object\n");
284                 goto send_execption;
285
286         }
287
288         /* Enqueue execution request */
289         exec_req = forb_malloc(sizeof(*exec_req));
290         if (exec_req) {
291                 exec_req->request_id = request_header.request_id;
292                 exec_req->source = request_header.source;
293                 exec_req->obj = obj;
294                 exec_req->method_index = request_header.method_index;
295                 /* Copy the request to exec_req */
296                 FORB_CDR_codec_init_static(&exec_req->codec, codec->orb);
297                 ret = FORB_CDR_buffer_init(&exec_req->codec,
298                                       req_size,
299                                       0);
300                 if (!ret) {
301                         env.major = FORB_EX_NO_MEMORY;
302                         ul_logerr("No memory for executor request bufer of size %d (header_size=%d)\n",
303                                   req_size, header_size);
304                         goto send_execption;
305                 }
306                 exec_req->codec.data_endian = codec->data_endian;
307                 FORB_CDR_buffer_gets(codec, exec_req->codec.buffer, req_size);
308                 /* TODO: Use better data structure for incomming
309                    buffer to achieve zero-copy behaviour. */
310                 forb_exec_req_ins_tail(obj->executor, exec_req);
311         } else {
312                 env.major = FORB_EX_NO_MEMORY;
313                 ul_logerr("No memory for executor request\n");
314                 goto send_execption;
315         }
316         
317 out:
318         return;
319         
320 send_execption:
321         FORB_CDR_codec_init_static(&reply_codec, codec->orb);   
322         ret = FORB_CDR_buffer_init(&reply_codec, 4096,
323                               forb_iop_MESSAGE_HEADER_SIZE +
324                               forb_iop_REPLY_HEADER_SIZE);
325         if (!ret) {
326                 ul_logerr("No memory for exception reply buffer\n");
327                 return;
328         }
329
330         forb_iop_send_reply(port->forb, &request_header.source,
331                             &reply_codec, request_header.request_id, &env);
332         FORB_CDR_codec_release_buffer(&reply_codec);
333 }
334
335 static void
336 process_reply(forb_port_t *port, FORB_CDR_Codec *codec)
337 {
338         forb_iop_reply_header rh;
339         forb_t *forb = port->forb;
340         forb_request_t *req;
341
342         forb_iop_reply_header_deserialize(codec, &rh);
343         /* Reply data are 8 byte aligned */
344         FORB_CDR_get_align(codec, 8);
345         ul_logdeb("rcvd reply: id=%u\n", rh.request_id);
346         req = forb_request_find(forb, &rh.request_id);
347         if (!req) {
348                 ul_logerr("Received reply to unknown request_id %u\n", rh.request_id);
349                 return;
350         }
351         forb_request_delete(forb, req); /* Deregister request from forb */
352
353         if (rh.flags & forb_iop_FLAG_EXCEPTION) {
354                 forb_exception_deserialize(codec, req->env);
355         } else {
356                 req->cdr_reply = codec;
357         }
358
359         /* Tell the stub where to signal that reply processing is
360          * finished */
361         req->reply_processed = &port->reply_processed;
362
363         /* Resume the stub waiting in forb_wait_for_reply() */
364         forb_syncobj_signal(&req->reply_ready);
365
366         /* Wait for stub to process the results from the codec's buffer */
367         forb_syncobj_wait(&port->reply_processed);
368 }
369
370 void new_peer_discovered(forb_port_t *port, forb_peer_t *peer,
371                          forb_server_id server_id, void *addr,
372                          CORBA_string orb_id)
373 {
374         forb_t *forb = port->forb;
375         bool notify_waiters = false;
376         if (peer /* && peer->state == FORB_PEER_WANTED */) {
377                 notify_waiters = true;
378         } else {
379                 peer = forb_peer_new();
380         }
381         if (!peer)
382                 return;
383
384         fosa_mutex_lock(&forb->peer_mutex);
385         peer->server_id = server_id;
386         peer->port = port;
387         peer->addr = addr;
388         peer->orb_id = orb_id;
389         peer->state = FORB_PEER_DISCOVERED;
390         if (notify_waiters) {
391                 fosa_cond_broadcast(&peer->cond);
392         } else {
393                 forb_peer_nolock_insert(forb, forb_peer_get(peer));
394         }
395         forb_port_peer_ins_tail(port, forb_peer_get(peer));
396         fosa_mutex_unlock(&forb->peer_mutex);
397         {
398                 char str[60];
399                 ul_logdeb("new peer discovered %s (orb_id '%s')\n",
400                           forb_server_id_to_string(str, &peer->server_id, sizeof(str)),
401                           orb_id);
402         }
403         if (forb->attr.peer_discovery_callback) {
404                 forb_orb peer_orb = forb_object_new(forb->orb, &peer->server_id, 0);
405                 forb->attr.peer_discovery_callback(peer_orb, orb_id);
406                 forb_object_release(peer_orb);
407         }
408         forb_peer_put(peer);
409
410         /* Broadcast our hello packet now */
411         forb_syncobj_signal(&port->hello);
412 }
413
414 /** 
415  * Process incomming HELLO messages.
416  *
417  * For every incomming HELLO message the peer table is searched
418  * whether it already contains a record for that peer or not. If not,
419  * the new peer is added to the table and another hello message is
420  * sent so that the new peer discovers us quickly.
421  * 
422  * @param port Port, where hello was received
423  * @param codec Buffer with the hello message
424  */
425 static void
426 process_hello(forb_port_t *port, FORB_CDR_Codec *codec)
427 {
428         forb_server_id server_id;
429         void *addr = NULL;
430         forb_peer_t *peer;
431         forb_t *forb = port->forb;
432         CORBA_string peer_orb_id = NULL;
433
434 /*      printf("Hello received at port %p\n", port); */
435
436         forb_server_id_deserialize(codec, &server_id);
437         {
438                 char str[60];
439                 ul_logdeb("rcvd hello from %s\n", forb_server_id_to_string(str, &server_id, sizeof(str)));
440         }
441         if (port->desc.proto->deserialize_addr) {
442                 port->desc.proto->deserialize_addr(codec, &addr);
443         }
444         CORBA_string_deserialize(codec, &peer_orb_id);
445         if (forb_server_id_cmp(&server_id, &forb->server_id) != 0) {
446                 peer = forb_peer_find(forb, &server_id);
447                 if (peer && peer->state == FORB_PEER_DISCOVERED) {
448                         /* TODO: Update last hello receive time */
449                         if (addr)
450                                 forb_free(addr);
451                         if (peer_orb_id)
452                                 forb_free(peer_orb_id);
453                         forb_peer_put(peer);
454                 } else {
455                         new_peer_discovered(port, peer, server_id, addr, peer_orb_id);
456                 }
457         }
458 }
459
460 static void
461 process_message(forb_port_t *port, const forb_iop_message_header *mh,
462                 FORB_CDR_Codec *codec)
463 {
464         CORBA_long data_size = FORB_CDR_data_size(codec);
465         /* TODO: Check destination address, whether the message is for
466          * us or should be routed. */
467
468         /* TODO: If there is some processing error, skip the rest of
469          * the message according mh.message_size. */
470         switch (mh->message_type) {
471                 case forb_iop_REQUEST:
472                         process_request(port, codec, mh->message_size);
473                         break;
474                 case forb_iop_REPLY:
475                         process_reply(port, codec);
476                         break;
477                 case forb_iop_HELLO:
478                         process_hello(port, codec);
479                         break;
480                 default:
481                         ul_logmsg("rcvd unknown message type\n");
482                         break;
483         }
484         if (FORB_CDR_data_size(codec) != data_size - mh->message_size) {
485                 size_t processed = data_size - FORB_CDR_data_size(codec);
486                 ul_logmsg("Message of type %d handled incorrectly (size=%d, processed=%d); fixing\n",
487                           mh->message_type, mh->message_size, processed);
488                           ;
489                 codec->rptr += mh->message_size - processed;
490         }
491 }
492
493 /** 
494  * Thread run for every port to receive FORB messages from that port. 
495  * 
496  * @param arg Pointer to ::forb_port_t typecasted to void *.
497  * 
498  * @return Always NULL
499  */
500 void *forb_iop_receiver_thread(void *arg)
501 {
502         forb_port_t *port = arg;
503         const forb_proto_t *proto = port->desc.proto;
504         FORB_CDR_Codec *c = &port->codec;
505         size_t rcvd, len;
506         forb_iop_message_header mh;
507         bool header_received = false;
508
509         while (!port->finish) {
510                 if (c->rptr == c->wptr) {
511                         /* The buffer is empty now - start writing from begining*/
512                         FORB_CDR_buffer_reset(c, 0);
513                 }
514                 /* TODO: If there is not enough space for reception,
515                  * we should shift the already received data to the
516                  * beginning of the buffer. */
517                 rcvd = proto->recv(port,
518                                    &c->buffer[c->wptr],
519                                    c->wptr_max - c->wptr);
520                 if (rcvd < 0) {
521                         ul_logmsg("recv returned error %d\n", rcvd);
522                         return NULL;
523                 }
524                 c->wptr += rcvd;
525                 c->wptr_last = c->wptr;
526
527                 /* While there are some data in the buffer, process them. */
528                 while (FORB_CDR_data_size(c) > 0) {
529                         len = FORB_CDR_data_size(c);
530                         /* Wait for and then process message header */
531                         if (!header_received) {
532                                 if (len >= forb_iop_MESSAGE_HEADER_SIZE) {
533                                         if (c->rptr % 8 != 0) {
534                                                 ul_logerr("Header doesn't start at 8 byte bounday\n");
535                                         }
536                                         header_received = forb_iop_process_message_header(&mh, c);
537                                         len = FORB_CDR_data_size(c);
538                                 } else {
539                                         break; /* Wait for more data to arrive*/
540                                 }
541                         }
542                         
543                         /* Wait for and then process the message body */
544                         if (header_received) {
545                                 if (len >= mh.message_size) {
546                                         process_message(port, &mh, c);
547                                         /* Wait for the next message */
548                                         header_received = false;
549                                 } else {
550                                         break; /* Wait for more data to arrive*/
551                                 }
552                         }
553                 }
554         }
555         return NULL;
556 }
557
558 static void
559 discovery_cleanup(void *codec)
560 {
561         FORB_CDR_codec_release_buffer((FORB_CDR_Codec*)codec);
562         /* TODO: Broadcast some kind of bye bye message */
563 }
564
565
566 /** 
567  * Thread run for every port to broadcast HELLO messages. These
568  * messages are used for a FORB to discover all peers (and in future
569  * also to detect their disconnection).
570  * 
571  * @param arg Pointer to ::forb_port_t typecasted to void *.
572  * 
573  * @return Always NULL
574  */
575 void *forb_iop_discovery_thread(void *arg)
576 {
577         forb_port_t *port = arg;
578         const forb_proto_t *proto = port->desc.proto;
579         FORB_CDR_Codec codec;
580         fosa_abs_time_t hello_time;
581         fosa_rel_time_t hello_interval = fosa_msec_to_rel_time(1000*proto->hello_interval);
582         int ret;
583
584         FORB_CDR_codec_init_static(&codec, port->forb->orb);
585         FORB_CDR_buffer_init(&codec, 1024, 0);
586
587         pthread_cleanup_push(discovery_cleanup, &codec);
588         
589         /* Next hello interval is now */
590         fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &hello_time);
591         
592         while (!port->finish) {
593                 /* Wait for next hello interval or until somebody
594                  * signals us. */
595                 ret = forb_syncobj_timedwait(&port->hello, &hello_time);
596                 /* sem_timedwait would be more appropriate */
597                 if (ret == FOSA_ETIMEDOUT) {
598                         hello_time = fosa_abs_time_incr(hello_time, hello_interval);
599                 } else if (ret != 0) {
600                         ul_logerr("hello syncobj error: %s\n", strerror(ret));
601                 }
602
603                 if (port->finish) break;
604
605                 FORB_CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
606                 forb_iop_prepare_hello(&codec, &port->forb->server_id, port->desc.addr,
607                                        proto->serialize_addr, port->forb->attr.orb_id);
608 /*              printf("Broadcasting hello from port %p\n", port);  */
609                 proto->broadcast(port, &codec.buffer[codec.rptr],
610                                  FORB_CDR_data_size(&codec));
611         }
612
613         pthread_cleanup_pop(1);
614         return NULL;
615 }
616
617