]> rtime.felk.cvut.cz Git - frescor/forb.git/blob - src/forb.c
forb: Split forb_port_destroy() to stop and destroy phases
[frescor/forb.git] / src / forb.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FORB (Frescor Object Request Broker)             */
26 /*                                                                        */
27 /* FORB is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FORB is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FORB; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FORB header files in a file,         */
39 /* instantiating FORB generics or templates, or linking other files       */
40 /* with FORB objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 /**
48  * @file   forb.c
49  * @author Michal Sojka <sojkam1@fel.cvut.cz>
50  * @date   Sun Oct 12 16:57:01 2008
51  * 
52  * @brief  Implementation of basic FORB's functions.
53  * 
54  * 
55  */
56
57 #ifndef _BSD_SOURCE
58 #define _BSD_SOURCE             /* Because of on_exit()  */
59 #endif
60 #include "proto.h"
61 #include "regref.h"
62 #include <forb/config.h>
63 #include <forb/forb-internal.h>
64 #include <forb.h>
65 #include "forb-idl.h"
66 #include <forb/iop.h>
67 #include <forb/object.h>
68 #include <forb/uuid.h>
69 #include <stdio.h>
70 #include <string.h>
71 #include <sys/stat.h>
72 #include <sys/types.h>
73 #include <ul_gavlcust.h>
74 #include <ul_log.h>
75 #include <ul_logreg.h>
76 #include <unistd.h>
77 #include "discovery.h"
78 #ifdef CONFIG_FORB_PROTO_UNIX
79 #include <forb/proto_unix.h>
80 #endif
81 #ifdef CONFIG_FORB_PROTO_INET_DEFAULT
82 #include <forb/proto_inet.h>
83 #endif
84 #ifdef CONFIG_FCB
85 #include <fcb.h>
86 #include <fcb_contact_info.h>
87 #endif
88 #include <fcntl.h>
89
90 #ifdef DEBUG
91 #define UL_LOGL_DEF UL_LOGL_DEB
92 #else
93 #define UL_LOGL_DEF UL_LOGL_ERR
94 #endif
95 #include "log_domains.inc"
96
97 extern UL_LOG_CUST(ulogd_forb);
98 static int init_ul_log(void);
99
100 UL_LOGREG_DOMAINS_INIT_FUNCTION(forb_logreg_domains, forb_logreg_domains_array);
101
102 #if 0
103 static void
104 destroy_forb_on_exit(int exitcode, void *arg)
105 {
106         forb_orb orb = arg;
107         forb_destroy(orb);
108 }
109 #endif
110
111 static void
112 forb_is_alive(forb_orb _obj, CORBA_Environment *ev)
113 {
114         ul_logdeb("%s called\n", __FUNCTION__);
115 }
116
117 static struct forb_forb_orb_impl forb_implementation = {
118         .is_alive = forb_is_alive,
119 };
120
121 /**
122  * Prepares #FORB_TMP_DIR directory for use by forb
123  *
124  * @return Zero on succes, -1 on error and errno is set appropriately.
125  */
126 int
127 forb_init_tmp_dir(void)
128 {
129         struct stat st;
130         int ret;
131
132         ret = stat(FORB_TMP_DIR, &st);
133
134         if (ret == 0 && !S_ISDIR(st.st_mode)) {
135                 ret = unlink(FORB_TMP_DIR);
136                 if (ret) return -1;
137         } else if (ret == 0) {
138                 return 0;
139         }
140         return mkdir(FORB_TMP_DIR, 0777);
141 }
142
143 /** 
144  * Thread for execution of remote requests for a FORB object.
145  * 
146  * @param arg 
147  * 
148  * @return 
149  */
150 static void *
151 forb_execution_thread(void *arg)
152 {
153         forb_executor_t *executor = arg;
154         forb_executor_run(executor);
155         return NULL;
156 }
157
158 #ifdef CONFIG_FCB
159 void hack_register_fcb(forb_orb orb, forb_port_t *port)
160 {
161         forb_object fcb = forb_object_new(orb, &FCB_SERVER_ID, 1);
162         if (!fcb) {
163                 ul_logerr("Cannot allocate FCB reference\n");
164                 return;
165         }
166         forb_register_reference(fcb, fres_contract_broker_reg_name);
167         forb_object_release(fcb);
168 }
169 #else
170 #define hack_register_fcb(orb)
171 #endif
172
173
174
175 forb_orb
176 forb_init(int *argc, char **argv[], const struct forb_init_attr *attr)
177 {
178         forb_orb orb;
179         forb_t *forb;
180         int ret;
181
182         forb = forb_malloc(sizeof(*forb));
183         if (!forb) goto err;
184         memset(forb, 0, sizeof(*forb));
185
186         /* Initialize ULUT logging facility */
187         init_ul_log();
188         forb_logreg_domains();
189
190         if (attr) {
191                 forb->attr = *attr;
192         } else {
193                 memset(&forb->attr, 0, sizeof(forb->attr));
194         }
195
196         sem_init(&forb->server_ready, 0, 0);
197
198         if (forb_server_id_empty(&forb->attr.fixed_server_id)) {
199                 forb_server_id_init(&forb->server_id);
200         } else {
201                 forb->server_id = forb->attr.fixed_server_id;
202         }
203         forb_server_id_to_string(forb->server_id_str, &forb->server_id,
204                                  sizeof(forb->server_id_str));
205         ul_logdeb("Initializing forb %s\n", forb->server_id_str);
206
207         if (fosa_mutex_init(&forb->request_id_mutex, 0) != 0) goto err2;
208         if (fosa_mutex_init(&forb->port_mutex, 0) != 0) goto err2;
209         forb_port_init_head(forb);
210         
211         if (fosa_mutex_init(&forb->request_mutex, 0) != 0) goto err2;
212         forb_request_nolock_init_root_field(forb);
213
214         if (forb_discovery_init(forb) != 0) goto err2;
215
216         if (fosa_mutex_init(&forb->regref_mutex, 0) != 0) goto err2;
217         forb_regref_nolock_init_root_field(forb);
218
219         ret = forb_init_tmp_dir();
220         if (ret != 0) {
221                 ul_logerr("Cannot prepare " FORB_TMP_DIR "\n");
222                 return NULL;
223         }
224
225         forb_executor_prepare();        
226
227         orb = forb_forb_orb_new(NULL, &forb_implementation, forb);
228         if (!orb) goto err2;
229         /* Server ID must be assigned manualy */
230         orb->server = forb->server_id;
231
232         forb->orb = orb;
233
234         /* Insert our object reference to objects tree, so that we
235          * can accept remote request to our new ORB. */
236         forb_objects_nolock_insert(forb, orb);
237
238         ret = forb_executor_init(&forb->default_executor);
239         if (ret) goto err2;
240         ret = forb_executor_register_object(&forb->default_executor, orb);
241         if (ret) goto err3;
242
243         ret = fosa_thread_create(&forb->execution_thread, NULL,
244                                  forb_execution_thread, &forb->default_executor);
245         if (ret != 0)
246                 goto err3;
247
248         /* FIXME: I do not know how to deregister the exit handler if
249          * forb_destroy() is called manually. */
250         /* on_exit(destroy_forb_on_exit, orb); */
251
252 #ifdef CONFIG_FORB_PROTO_UNIX
253         {
254                 forb_port_t *port = forb_malloc(sizeof(*port));
255                 if (port) {
256                         memset(port, 0, sizeof(*port));
257                         ret = forb_unix_port_init(&port->desc, &forb->server_id);
258                         if (ret) goto err_free_unix;
259                         ret = forb_register_port(orb, port);
260                         if (ret) goto err_free_unix; /* TODO: forb_unix_port_done() */
261                         goto unix_ok;
262                 }
263         err_free_unix:
264                 free(port);
265                 goto err3;
266         unix_ok:;
267         }
268 #endif
269 #ifdef CONFIG_FORB_PROTO_INET_DEFAULT
270         {
271                 forb_port_t *port = forb_malloc(sizeof(*port));
272                 if (port) {
273                         struct in_addr listen_on;
274
275                         memset(port, 0, sizeof(*port));
276                         listen_on.s_addr = INADDR_ANY;
277                         ret = forb_inet_port_init(&port->desc, listen_on,
278                                                   forb->attr.fixed_tcp_port);
279                         if (ret) goto err_free_inet;
280                         ret = forb_register_port(orb, port);
281                         if (ret) goto err_free_inet; /* TODO: forb_inet_port_done() */
282                         goto inet_ok;
283                 }
284         err_free_inet:
285                 free(port);
286                 goto err3;
287         inet_ok:;
288                 hack_register_fcb(orb, port);
289         }
290 #endif
291         return orb;
292
293 err3:   forb_executor_destroy(&forb->default_executor);
294 err2:   forb_free(forb);
295 err:    return NULL;
296 }
297
298 void forb_destroy(forb_orb orb)
299 {
300         forb_t *forb = forb_object_to_forb(orb);
301         forb_port_t *port;
302         forb_regref_t *regref;
303         forb_object obj;
304
305         /* Stop ports to prevent remote requests from coming */
306         ul_list_for_each(forb_port, forb, port) {
307                 forb_stop_port(port);
308         }
309
310         /* Wait for executors to finish all requests (and thus drop
311          * all references to peers). This is very inefficient for big
312          * number of objects, but we do not care */
313         gavl_cust_for_each(forb_objects_nolock, forb, obj) {
314                 forb_executor_t *executor;
315                 executor = forb_object_get_executor(obj);
316                 if (executor)
317                         forb_executor_synchronize(executor);
318         }
319
320         /* Destroy ports - this should drop all remaining references
321          * to peers and result in closing of all remote
322          * connections. */
323         ul_list_for_each_cut(forb_port, forb, port) {
324                 forb_destroy_port(port);
325         }
326
327         pthread_cancel(forb->execution_thread.pthread_id);
328         pthread_join(forb->execution_thread.pthread_id, NULL);
329         
330         /* Unregister all registered references */
331         while ((regref = forb_regref_first(forb))) {
332                 forb_unregister_reference(orb, regref->name.str);
333         }
334         
335         forb_object_release(orb);
336         forb_free(forb);
337 }
338
339 /* void */
340 /* forb_register_interface(const struct forb_interface *interface) */
341 /* { */
342 /*      gavl_insert(&type_registry, &interface->node); */
343 /* } */
344
345 /** 
346  * Initializes server ID variable.
347  * 
348  * @param server_id Serer ID to initialize.
349  */
350 void
351 forb_server_id_init(forb_server_id *server_id)
352 {
353         forb_uuid_generate((forb_uuid_t*)server_id->uuid);
354 }
355
356 /** 
357  * Checks whether the @a object is stale. Stale object is an object
358  * reference whose server cannot be contacted to handle requests.
359  * 
360  * @param object Object reference to check.
361  * 
362  * @return True if the object is stale, false otherwise.
363  */
364 bool forb_object_is_stale(forb_object object)
365 {
366         forb_orb remote_orb;
367         struct forb_env env;
368         bool stale = true;
369         
370         remote_orb = forb_get_orb_of(object);
371         if (!remote_orb) {      /* This shohuld never happen */
372                 goto err;
373         }
374         /* TODO: Check not only the ORB, but also whether the object
375          * is still registered with the remote orb. */
376         forb_orb_is_alive(remote_orb, &env);
377         if (env.major == FORB_EX_COMM_FAILURE) {
378                 /* Orb is not alive */
379                 stale = true;
380         } else {
381                 if (forb_exception_occurred(&env)) {
382                         ul_logerr("%s: unexpected exception: %s\n", __FUNCTION__, forb_strerror(&env));
383                 }
384                 stale = false;  
385         }
386
387         forb_object_release(remote_orb);
388 err:
389         return stale;
390 }
391
392
393 /** 
394  * Returns object reference of forb::orb object associated with the
395  * given object.
396  * 
397  * @param obj 
398  * 
399  * @return 
400  */
401 forb_orb
402 forb_get_orb_of(const forb_object obj)
403 {
404         forb_orb orb;
405
406         if (forb_server_id_cmp(&obj->server, &obj->orb->server) == 0) {
407                 orb = forb_object_duplicate(obj->orb);
408         } else {
409                 orb = forb_object_new(obj->orb, &obj->server, 0);
410         }
411         return orb;
412 }
413
414 /** 
415  * Returns server ID of an object reference.
416  * 
417  * @param obj 
418  * @param dest 
419  */
420 void
421 forb_get_server_id(const forb_object obj, forb_server_id *dest)
422 {
423         if (obj) {
424                 *dest = obj->server;
425         }
426 }
427
428
429 /** 
430  * Return instance data registered when the object was created by
431  * forb_XXX_new().
432  * 
433  * @param obj Object reference
434  * 
435  * @return Pointer to the registered data.
436  */
437 void *
438 forb_instance_data(const forb_object obj)
439 {
440         return forb_object_instance_data(obj);
441 }
442
443 /** 
444  * Returns error message correspondings FORB exception.
445  * 
446  * @param env Environemnt
447  * 
448  * @return Non-NULL pointer to a string.
449  */
450 const char *
451 forb_strerror(CORBA_Environment *env)
452 {
453         if (!env) {
454                 return "Invalid environemnt";
455         }
456 #define ex(e) case FORB_EX_##e: return #e; break;
457         switch (env->major) {
458         ex(NONE);
459         ex(UNKNOWN);
460         ex(BAD_PARAM);
461         ex(NO_MEMORY);
462         ex(IMP_LIMIT);
463         ex(COMM_FAILURE);
464         ex(INV_OBJREF);
465         ex(NO_PERMISSION);
466         ex(INTERNAL);
467         ex(MARSHAL);
468         ex(INITIALIZE);
469         ex(NO_IMPLEMENT);
470         ex(BAD_OPERATION);
471         ex(NO_RESOURCES);
472         ex(NO_RESPONSE);
473         ex(TRANSIENT);
474         ex(FREE_MEM);
475         ex(INV_IDENT);
476         ex(INV_FLAG);
477         ex(DATA_CONVERSION);
478         ex(OBJECT_NOT_EXIST);
479         ex(TIMEOUT);
480         ex(APPLICATION);
481         }
482 #undef ex
483         return "Invalid error number";
484 }
485
486 /** 
487  * Return server id of the requesting application.
488  *
489  * This function should be only called from within interface
490  * implementation,
491  * 
492  * @param[in] obj Object being requested
493  * @param[out] req_source Server ID of the requesting application
494  */
495 void
496 forb_get_req_source(const forb_object obj, forb_server_id *req_source)
497 {
498         if (req_source) {
499                 if (obj && obj->exec_req) {
500                         *req_source = obj->exec_req->source;
501                 } else {
502                         memset(req_source, 0, sizeof(*req_source));
503                 }
504         }
505 }
506
507 /**
508  * Internal function for allocation of sequence bufers. Used by
509  * forb_sequence_alloc_buf().
510  *
511  * @param seq
512  * @param seq_size
513  * @param buf_pptr
514  * @param maximum_ptr
515  * @param num_elements
516  * @param elem_size
517  *
518  * @return CORBA_TRUE if the allocation was sucessfull, CORBA_FALSE if
519  * it wasn't.
520  */
521 CORBA_boolean
522 __forb_sequence_alloc_buf_internal(void *seq, size_t seq_size,
523                                    void **buf_pptr, CORBA_unsigned_long *maximum_ptr,
524                                    unsigned num_elements, size_t elem_size)
525 {
526         memset(seq, 0, seq_size);
527         /*(seq)._length = 0;*/
528         if (num_elements && elem_size) *buf_pptr = forb_malloc(num_elements * elem_size);
529         else *buf_pptr = NULL;
530         *maximum_ptr = *buf_pptr ? num_elements : 0;
531         return (*buf_pptr != NULL) || (num_elements == 0);
532 }
533
534 static FILE *forb_ul_log_file;
535 static char progname[64] = "";
536
537 void
538 forb_ul_log_fnc(ul_log_domain_t *domain, int level,
539                 const char *format, va_list ap)
540 {
541         struct timespec now;
542         if(!(level&UL_LOGL_CONT)) {
543                 level&=UL_LOGL_MASK;
544                 clock_gettime(CLOCK_MONOTONIC, &now);
545                 fprintf(forb_ul_log_file,"%ld.%6ld: ", now.tv_sec, now.tv_nsec/1000);
546                 if (progname[0])
547                         fprintf(forb_ul_log_file,"%s: ", progname);
548                 if(domain && domain->name)
549                         fprintf(forb_ul_log_file,"%s: ",domain->name);
550         }
551         vfprintf(forb_ul_log_file, format, ap);
552         fflush(forb_ul_log_file);
553 }
554
555 static int init_ul_log(void)
556 {
557         char *s;
558         char *log_fname;
559         int fd;
560         int flg = 0;
561         char path[128];
562
563 /*      if(ul_log_output != NULL) */
564 /*              return 0; */
565
566         fd = open("/proc/self/cmdline", O_RDONLY);
567         if (fd >= 0) {
568                 int ret;
569                 ret = read(fd, path, sizeof(path)-1);
570                 if (ret > 0) {
571                         path[ret]=0;
572                         s = strrchr(path, '/');
573                         if (s) s++;
574                         else s = path;
575                         strncpy(progname, s, sizeof(progname)-1);
576                 }
577                 close(fd);
578         }
579
580         if((log_fname=getenv("UL_LOG_FILENAME"))!=NULL){
581                 forb_ul_log_file=fopen(log_fname,"a");
582         }
583         if(forb_ul_log_file==NULL)
584                 forb_ul_log_file=stderr;
585         
586         if((s=getenv("UL_DEBUG_FLG")) != NULL){
587                 flg=atoi(s);
588         }
589
590         ul_log_redir(forb_ul_log_fnc, flg);
591
592         if((s = getenv("UL_LOG_LEVELS")) != NULL)
593                 ul_log_domain_arg2levels(s);
594
595         return 0;
596 }
597
598 /** 
599  * Wait for the server to be ready. Internal function intended forb
600  * forbrun.
601  * 
602  * @param orb ORB.
603  * 
604  * @return Zero on success; on error -1 is returned, and errno is set
605  * to indicate the error.
606  */
607 int forb_wait_for_server_ready(forb_orb orb)
608 {
609         forb_t *forb = forb_object_to_forb(orb);
610         return sem_wait(&forb->server_ready);
611 }
612
613 /** 
614  * Signal the the FORB core that the server is ready for accepting
615  * requests.
616  *
617  * This function should be called at the initialization of server
618  * implementation at the time when all objects are registered with
619  * executors. All other servers in the same address space are
620  * initialized after this function is called which allows the other
621  * servers to use the services provided by the calling server.
622  * 
623  * @param orb ORB object.
624  * 
625  * @return Zero on success; on error -1 is returned, and errno is set
626  * to indicate the error.
627  */
628 int forb_signal_server_ready(forb_orb orb)
629 {
630         forb_t *forb = forb_object_to_forb(orb);
631         return sem_post(&forb->server_ready);
632 }