]> rtime.felk.cvut.cz Git - frescor/forb.git/blob - src/iop.c
Fixed problem with port destroying
[frescor/forb.git] / src / iop.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FORB (Frescor Object Request Broker)             */
26 /*                                                                        */
27 /* FORB is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FORB is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FORB; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FORB header files in a file,         */
39 /* instantiating FORB generics or templates, or linking other files       */
40 /* with FORB objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 /**
48  * @file   iop.c
49  * @author Michal Sojka <sojkam1@fel.cvut.cz>
50  * @date   Mon Sep  1 21:51:58 2008
51  * 
52  * @brief  Implementation of Inter-ORB protocol.
53  * 
54  * 
55  */
56 #include "iop.h"
57 #include <forb/cdr.h>
58 #include <forb/object.h>
59 #include "proto.h"
60 #include <ul_log.h>
61 #include "exec_req.h"
62 #include <errno.h>
63
/** Version of the protocol implemented by this file. */
#define VER_MAJOR 0
#define VER_MINOR 0

/**
 * Packs a protocol version into a single integer constant so that it
 * can be used as a switch/case label: the major number is shifted
 * left by 8 bits and the minor number is OR-ed into the low bits.
 *
 * The bitwise OR is essential; a logical OR would collapse every
 * version to either 0 or 1.
 */
#define VER(major, minor) (((major) << 8) | (minor))
69
70 extern UL_LOG_CUST(ulogd_forb_iop);
71
72
73 CORBA_boolean
74 forb_iop_prepend_message_header(CDR_Codec *codec, forb_iop_message_type mt)
75 {
76         CORBA_boolean ret;
77         forb_iop_message_header mh;
78         mh.proto_version.major = VER_MAJOR;
79         mh.proto_version.minor = VER_MINOR;
80         mh.message_type = mt;
81         mh.flags = (codec->data_endian == LittleEndian) ? forb_iop_LITTLE_ENDIAN : 0;
82         mh.message_size = CDR_data_size(codec);
83         ret = CDR_buffer_prepend(codec, forb_iop_MESSAGE_HEADER_SIZE);
84         if (ret) {
85                 ret = forb_iop_message_header_serialize(codec, &mh);
86         }
87         return ret;
88 }
89
90 CORBA_boolean
91 forb_iop_prepare_request(forb_request_t *req,
92                          char *iface,
93                          unsigned method_ind,
94                          CORBA_Environment *env)
95 {
96         CORBA_boolean ret;
97         forb_iop_request_header rh;
98
99         rh.request_id = req->request_id;
100         rh.iface = iface;
101         rh.objkey = forb_object_to_key(req->obj);
102         rh.method_index = method_ind;
103         rh.source = forb_object_to_forb(req->obj)->server_id;
104         ret = forb_iop_request_header_serialize(&req->cdr_request, &rh);
105         if (ret) {
106                 /* Request body is 8 byte aligned */
107                 ret = CDR_put_align(&req->cdr_request, 8);
108         }
109         return ret;
110 }
111
112 CORBA_boolean
113 forb_iop_prepare_hello(CDR_Codec *codec,
114                        const forb_server_id *server_id,
115                        const void *src_addr,
116                        CORBA_boolean (*serialize_addr)(CDR_Codec *codec, const void *addr))
117 {
118         if (!forb_server_id_serialize(codec, server_id)) return CORBA_FALSE;
119         if (serialize_addr) {
120                 if (!serialize_addr(codec, src_addr)) return CORBA_FALSE;
121         }
122         if (!forb_iop_prepend_message_header(codec, forb_iop_HELLO)) return CORBA_FALSE;
123         return CORBA_TRUE;
124 }
125
126 bool
127 forb_iop_process_message_header(forb_iop_message_header *mh, CDR_Codec *codec)
128 {
129         /* FIXME: If we have multiple protocol versions, use different
130          * type (independent from version) for return value instead of
131          * mh */
132         forb_iop_version_deserialize(codec, &mh->proto_version);
133         switch (VER(mh->proto_version.major, mh->proto_version.minor)) {
134                 case VER(VER_MAJOR, VER_MINOR):
135                         forb_iop_message_type_deserialize(codec, &mh->message_type);
136                         forb_iop_message_flags_deserialize(codec, &mh->flags);
137                         /* Check whwther the type is meaningfull */
138                         switch (mh->message_type) {
139                                 case forb_iop_REQUEST:
140                                 case forb_iop_REPLY:
141                                 case forb_iop_HELLO:
142                                         break;
143                                 default:
144                                         return false;
145                         }
146                         codec->data_endian = (mh->flags && forb_iop_LITTLE_ENDIAN) ?
147                                 LittleEndian : BigEndian;                       
148                         CORBA_unsigned_long_deserialize(codec, &mh->message_size);
149                         return true;
150                         break;                  
151                 default:
152                         return false;
153         }
154 }
155
156 static inline CORBA_boolean
157 forb_exception_serialize(CDR_Codec *codec, struct forb_env *env)
158 {
159         return CORBA_long_serialize(codec, &env->major);
160 }
161
162 static inline CORBA_boolean
163 forb_exception_deserialize(CDR_Codec *codec, struct forb_env *env)
164 {
165         /* TODO: Declare exceptions in IDL and don't typecast here. */
166         return CORBA_long_deserialize(codec, (CORBA_long*)&env->major);
167 }
168
169 void
170 forb_iop_send_reply(forb_t *forb,
171            forb_server_id *dest,
172            CDR_Codec *codec,
173            CORBA_long request_id,
174            struct forb_env *env)
175 {
176         forb_iop_reply_header reply_header;
177         CORBA_boolean ret;
178         forb_peer_t *peer;
179         fosa_abs_time_t timeout;
180         
181         reply_header.request_id = request_id;
182         reply_header.flags = 0;
183         if (forb_exception_occured(env)) {
184                 reply_header.flags |= forb_iop_FLAG_EXCEPTION;
185                 CDR_buffer_reset(codec, forb_iop_MESSAGE_HEADER_SIZE +
186                                         forb_iop_REPLY_HEADER_SIZE);
187                 forb_exception_serialize(codec, env);
188         }
189         /* forb_iop_REPLY_HEADER_SIZE equals to 8 even if the real
190          * header is shorter. We want reply data to be 8 byte
191          * aligned */
192         ret = CDR_buffer_prepend(codec, forb_iop_REPLY_HEADER_SIZE);
193         if (!ret) {
194                 goto err;
195         }
196         ret = forb_iop_reply_header_serialize(codec, &reply_header);
197         if (!ret) {
198                 goto err;
199         }
200         forb_iop_prepend_message_header(codec, forb_iop_REPLY);
201
202         fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
203         timeout = fosa_abs_time_incr(timeout,
204                                      fosa_msec_to_rel_time(1000));
205
206         peer = forb_get_next_hop(forb, dest, &timeout);
207         if (!peer) {
208                 ul_logerr("Reply destination not found\n");
209                 goto err;
210         }
211         forb_proto_send(peer, codec);
212         forb_peer_put(peer);
213 err:
214         ;
215 }
216
217
218 static void
219 process_request(forb_port_t *port, CDR_Codec *codec, uint32_t message_size)
220 {
221         forb_iop_request_header request_header;
222         CORBA_boolean ret;
223         forb_object obj;
224         size_t n;
225         forb_t *forb = port->forb;
226         struct forb_env env;
227         CDR_Codec reply_codec;
228         forb_exec_req_t *exec_req;
229         uint32_t req_size, data_size, header_size;
230         char str[32];
231
232         data_size = CDR_data_size(codec);
233         ret = forb_iop_request_header_deserialize(codec, &request_header);
234         if (!ret) {
235                 ul_logerr("Malformed request recevied\n");
236                 env.major = FORB_EX_COMM_FAILURE;
237                 goto out;
238         }
239         ret = CDR_get_align(codec, 8);
240         if (!ret) {
241                 ul_logerr("Malformed request recevied\n");
242                 env.major = FORB_EX_COMM_FAILURE;
243                 goto out;
244         }
245
246         header_size = data_size - CDR_data_size(codec);
247         req_size = message_size - header_size;
248
249         ul_logdeb("rcvd request: src=%s, id=%u, iface=%s, method=%hd\n",
250                   forb_server_id_to_string(str, &request_header.source, sizeof(str)),
251                   request_header.request_id,
252                   request_header.iface,
253                   request_header.method_index);
254         
255         obj = forb_key_to_object(forb, request_header.objkey);
256         if (!obj) {
257                 ul_logerr("Nonexistent object key\n");
258                 env.major = FORB_EX_OBJECT_NOT_EXIST;
259                 goto send_execption;
260         }
261
262         n = strlen(request_header.iface);
263         if (strncmp(request_header.iface, obj->interface->name, n) != 0) {
264                 env.major = FORB_EX_INV_OBJREF;
265                 ul_logerr("Object reference has incorrect type\n");
266                 goto send_execption;
267         }
268         if (request_header.method_index >= obj->interface->num_methods) {
269                 env.major = FORB_EX_INV_IDENT;
270                 ul_logerr("To high method number\n");
271                 goto send_execption;
272         }
273
274         if (!obj->executor) {
275                 env.major = FORB_EX_NO_IMPLEMENT;
276                 ul_logerr("No executor for object\n");
277                 goto send_execption;
278
279         }
280
281         /* Enqueue execution request */
282         exec_req = forb_malloc(sizeof(*exec_req));
283         if (exec_req) {
284                 exec_req->request_id = request_header.request_id;
285                 exec_req->source = request_header.source;
286                 exec_req->obj = obj;
287                 exec_req->method_index = request_header.method_index;
288                 /* Copy the request to exec_req */
289                 CDR_codec_init_static(&exec_req->codec);
290                 ret = CDR_buffer_init(&exec_req->codec,
291                                       req_size,
292                                       0);
293                 if (!ret) {
294                         env.major = FORB_EX_NO_MEMORY;
295                         ul_logerr("No memory for executor request bufer of size %d (header_size=%d)\n",
296                                   req_size, header_size);
297                         goto send_execption;
298                 }
299                 exec_req->codec.data_endian = codec->data_endian;
300                 CDR_buffer_gets(codec, exec_req->codec.buffer, req_size);
301                 /* TODO: Use better data structure for incomming
302                    buffer to achieve zero-copy behaviour. */
303                 forb_exec_req_ins_tail(obj->executor, exec_req);
304         } else {
305                 env.major = FORB_EX_NO_MEMORY;
306                 ul_logerr("No memory for executor request\n");
307                 goto send_execption;
308         }
309         
310 out:
311         return;
312         
313 send_execption:
314         CDR_codec_init_static(&reply_codec);    
315         ret = CDR_buffer_init(&reply_codec, 4096,
316                               forb_iop_MESSAGE_HEADER_SIZE +
317                               forb_iop_REPLY_HEADER_SIZE);
318         if (!ret) {
319                 ul_logerr("No memory for exception reply buffer\n");
320                 return;
321         }
322
323         forb_iop_send_reply(port->forb, &request_header.source, &reply_codec, request_header.request_id, &env);
324         CDR_codec_release_buffer(&reply_codec);
325 }
326
327 static void
328 process_reply(forb_port_t *port, CDR_Codec *codec)
329 {
330         forb_iop_reply_header rh;
331         forb_t *forb = port->forb;
332         forb_request_t *req;
333
334         forb_iop_reply_header_deserialize(codec, &rh);
335         /* Reply data are 8 byte aligned */
336         CDR_get_align(codec, 8);
337         ul_logdeb("rcvd reply: id=%u\n", rh.request_id);
338         req = forb_request_find(forb, &rh.request_id);
339         if (!req) {
340                 ul_logerr("Received reply to unknown request_id %u\n", rh.request_id);
341                 return;
342         }
343         if (rh.flags & forb_iop_FLAG_EXCEPTION) {
344                 forb_exception_deserialize(codec, req->env);
345         } else {
346                 req->cdr_reply = codec;
347         }
348         req->reply_processed = &port->reply_processed;
349
350         /* Resume the stub witing in forb_wait_for_reply() */
351         forb_syncobj_signal(&req->reply_ready);
352
353         /* Wait for stub to process the results from the codec's buffer */
354         forb_syncobj_wait(&port->reply_processed);
355 }
356
357 /** 
358  * Process incomming HELLO messages.
359  *
360  * For every incomming HELLO message the peer table is searched
361  * whether it already contains a record for that peer or not. If not,
362  * the new peer is added to the table and another hello message is
363  * sent so that the new peer discovers us quickly.
364  * 
365  * @param port Port, where hello was received
366  * @param codec Buffer with the hello message
367  */
368 static void
369 process_hello(forb_port_t *port, CDR_Codec *codec)
370 {
371         forb_server_id server_id;
372         void *addr = NULL;
373         forb_peer_t *peer;
374         forb_t *forb = port->forb;
375
376 /*      printf("Hello received at port %p\n", port); */
377
378         forb_server_id_deserialize(codec, &server_id);
379         {
380                 char str[60];
381                 ul_logdeb("rcvd hello from %s\n", forb_server_id_to_string(str, &server_id, sizeof(str)));
382         }
383         if (port->proto->deserialize_addr) {
384                 port->proto->deserialize_addr(codec, &addr);
385         }
386         if (forb_server_id_cmp(&server_id, &forb->server_id) != 0) {
387                 peer = forb_peer_find(forb, &server_id);
388                 if (peer && peer->state == FORB_PEER_DISCOVERED) {
389                         /* TODO: Update last hello receive time */
390                         forb_peer_put(peer);
391                 } else {
392                         /* New peer discovered */
393                         bool notify_waiters = false;
394                         if (peer /* && peer->state == FORB_PEER_WANTED */) {
395                                 notify_waiters = true;
396                         } else {
397                                 peer = forb_peer_new();
398                         }
399                         if (peer) {
400                                 fosa_mutex_lock(&forb->peer_mutex);
401                                 peer->server_id = server_id;
402                                 peer->port = port;
403                                 peer->addr = addr;
404                                 peer->state = FORB_PEER_DISCOVERED;
405                                 if (notify_waiters) {
406                                         fosa_cond_broadcast(&peer->cond);
407                                 } else {
408                                         forb_peer_get(peer);
409                                         forb_peer_nolock_insert(forb, peer);
410                                 }
411                                 fosa_mutex_unlock(&forb->peer_mutex);
412                                 forb_peer_put(peer);
413                         }
414                         /* Broadcast our hello packet now */
415                         forb_syncobj_signal(&port->hello);
416                 }
417         }
418 }
419
420 static void
421 process_message(forb_port_t *port, const forb_iop_message_header *mh,
422                 CDR_Codec *codec)
423 {
424         CORBA_long data_size = CDR_data_size(codec);
425         /* TODO: Check destination address, whether the message is for
426          * us or should be routed. */
427
428         /* TODO: If there is some processing error, skip the rest of
429          * the message according mh.message_size. */
430         switch (mh->message_type) {
431                 case forb_iop_REQUEST:
432                         process_request(port, codec, mh->message_size);
433                         break;
434                 case forb_iop_REPLY:
435                         process_reply(port, codec);
436                         break;
437                 case forb_iop_HELLO:
438                         process_hello(port, codec);
439                         break;
440                 default:
441                         ul_logmsg("rcvd unknown message type\n");
442                         break;
443         }
444         if (CDR_data_size(codec) != data_size - mh->message_size) {
445                 ul_logerr("Message of type %d handled incorrectly (size=%d, processed=%d)\n",
446                           mh->message_type, mh->message_size,
447                           data_size - CDR_data_size(codec));
448         }
449 }
450
451 /** 
452  * Thread run for every port to receive FORB messages from that port. 
453  * 
454  * @param arg Pointer to ::forb_port_t typecasted to void *.
455  * 
456  * @return Always NULL
457  */
458 void *forb_iop_receiver_thread(void *arg)
459 {
460         forb_port_t *port = arg;
461         const forb_proto_t *proto = port->proto;
462         CDR_Codec *c = &port->codec;
463         size_t rcvd, len;
464         forb_iop_message_header mh;
465         bool header_received = false;
466
467         while (!port->finish) {
468                 if (c->rptr == c->wptr) {
469                         /* The buffer is empty now - start writing from begining*/
470                         CDR_buffer_reset(c, 0);
471                 }
472                 /* TODO: If there is not enough space for reception,
473                  * we should shift the already received data to the
474                  * beginning of the buffer. */
475                 rcvd = proto->recv(port,
476                                    &c->buffer[c->wptr],
477                                    c->wptr_max - c->wptr);
478                 c->wptr += rcvd;
479                 c->wptr_last = c->wptr;
480
481                 /* While there are some data in the buffer, process them. */
482                 while (CDR_data_size(c) > 0) {
483                         len = CDR_data_size(c);
484                         /* Wait for and then process message header */
485                         if (!header_received) {
486                                 if (len >= forb_iop_MESSAGE_HEADER_SIZE) {
487                                         if (c->rptr % 8 != 0) {
488                                                 ul_logerr("Header doesn't start at 8 byte bounday\n");
489                                         }
490                                         header_received = forb_iop_process_message_header(&mh, c);
491                                         len = CDR_data_size(c);
492                                 } else {
493                                         break; /* Wait for more data to arrive*/
494                                 }
495                         }
496                         
497                         /* Wait for and then process the message body */
498                         if (header_received) {
499                                 if (len >= mh.message_size) {
500                                         process_message(port, &mh, c);
501                                         /* Wait for the next message */
502                                         header_received = false;
503                                 } else {
504                                         break; /* Wait for more data to arrive*/
505                                 }
506                         }
507                 }
508         }
509         return NULL;
510 }
511
512 static void
513 discovery_cleanup(void *codec)
514 {
515         CDR_codec_release_buffer((CDR_Codec*)codec);
516         /* TODO: Broadcast some kind of bye bye message */
517 }
518
519
520 /** 
521  * Thread run for every port to broadcast HELLO messages. These
522  * messages are used for a FORB to discover all peers (and in future
523  * also to detect their disconnection).
524  * 
525  * @param arg Pointer to ::forb_port_t typecasted to void *.
526  * 
527  * @return Always NULL
528  */
529 void *forb_iop_discovery_thread(void *arg)
530 {
531         forb_port_t *port = arg;
532         const forb_proto_t *proto = port->proto;
533         CDR_Codec codec;
534         fosa_abs_time_t hello_time;
535         fosa_rel_time_t hello_interval = fosa_msec_to_rel_time(1000*proto->hello_interval);
536         int ret;
537         
538         CDR_codec_init_static(&codec);
539         CDR_buffer_init(&codec, 1024, 0);
540
541         pthread_cleanup_push(discovery_cleanup, &codec);
542         
543         /* Next hello interval is now */
544         fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &hello_time);
545         
546         while (!port->finish) {
547                 /* Wait for next hello interval or until somebody
548                  * signals us. */
549                 ret = forb_syncobj_timedwait(&port->hello, &hello_time);
550                 /* sem_timedwait would be more appropriate */
551                 if (ret == FOSA_ETIMEDOUT) {
552                         hello_time = fosa_abs_time_incr(hello_time, hello_interval);
553                 } else if (ret != 0) {
554                         ul_logerr("hello syncobj error: %s\n", strerror(ret));
555                 }
556
557                 if (port->finish) break;
558
559                 CDR_buffer_reset(&codec, forb_iop_MESSAGE_HEADER_SIZE);
560                 forb_iop_prepare_hello(&codec, &port->forb->server_id, port->addr,
561                                        proto->serialize_addr);
562 /*              printf("Broadcasting hello from port %p\n", port);  */
563                 proto->broadcast(port, &codec.buffer[codec.rptr],
564                                  CDR_data_size(&codec));
565         }
566
567         pthread_cleanup_pop(1);
568         return NULL;
569 }
570
571