]> rtime.felk.cvut.cz Git - frescor/forb.git/blobdiff - src/forb.c
forb: Split forb_port_destroy() to stop and destroy phases
[frescor/forb.git] / src / forb.c
index 61342df00fdfad8b1ca7adee81ce2c6565fa15a6..309c6602e8f8b5a84b469be748058385e5b76e5e 100644 (file)
 /* covered by the GNU Public License.                                    */
 /**************************************************************************/
 
+/**
+ * @file   forb.c
+ * @author Michal Sojka <sojkam1@fel.cvut.cz>
+ * @date   Sun Oct 12 16:57:01 2008
+ * 
+ * @brief  Implementation of basic FORB's functions.
+ * 
+ * 
+ */
+
+#ifndef _BSD_SOURCE
 #define _BSD_SOURCE            /* Because of on_exit()  */
+#endif
 #include "proto.h"
 #include "regref.h"
-#include "regref.h"
-#include <fcntl.h>
 #include <forb/config.h>
 #include <forb/forb-internal.h>
-#include <forb/forb.h>
+#include <forb.h>
+#include "forb-idl.h"
 #include <forb/iop.h>
 #include <forb/object.h>
 #include <forb/uuid.h>
 #include <stdio.h>
-#include <stdlib.h>
 #include <string.h>
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <ul_log.h>
 #include <ul_logreg.h>
 #include <unistd.h>
+#include "discovery.h"
 #ifdef CONFIG_FORB_PROTO_UNIX
 #include <forb/proto_unix.h>
 #endif
+#ifdef CONFIG_FORB_PROTO_INET_DEFAULT
+#include <forb/proto_inet.h>
+#endif
+#ifdef CONFIG_FCB
+#include <fcb.h>
+#include <fcb_contact_info.h>
+#endif
+#include <fcntl.h>
 
 #ifdef DEBUG
 #define UL_LOGL_DEF UL_LOGL_DEB
 #include "log_domains.inc"
 
 extern UL_LOG_CUST(ulogd_forb);
+static int init_ul_log(void);
 
+UL_LOGREG_DOMAINS_INIT_FUNCTION(forb_logreg_domains, forb_logreg_domains_array);
 
+#if 0
 static void
 destroy_forb_on_exit(int exitcode, void *arg)
 {
        forb_orb orb = arg;
        forb_destroy(orb);
 }
+#endif
+
+static void
+forb_is_alive(forb_orb _obj, CORBA_Environment *ev)
+{
+       ul_logdeb("%s called\n", __FUNCTION__);
+}
+
+static struct forb_forb_orb_impl forb_implementation = {
+       .is_alive = forb_is_alive,
+};
 
 /**
- * Prepares /tmp/forb directory for use by forb
+ * Prepares #FORB_TMP_DIR directory for use by forb
  *
  * @return Zero on succes, -1 on error and errno is set appropriately.
  */
@@ -107,10 +140,42 @@ forb_init_tmp_dir(void)
        return mkdir(FORB_TMP_DIR, 0777);
 }
 
+/** 
+ * Thread for execution of remote requests for a FORB object.
+ * 
+ * @param arg 
+ * 
+ * @return 
+ */
+static void *
+forb_execution_thread(void *arg)
+{
+       forb_executor_t *executor = arg;
+       forb_executor_run(executor);
+       return NULL;
+}
+
+#ifdef CONFIG_FCB
+void hack_register_fcb(forb_orb orb, forb_port_t *port)
+{
+       forb_object fcb = forb_object_new(orb, &FCB_SERVER_ID, 1);
+       if (!fcb) {
+               ul_logerr("Cannot allocate FCB reference\n");
+               return;
+       }
+       forb_register_reference(fcb, fres_contract_broker_reg_name);
+       forb_object_release(fcb);
+}
+#else
+#define hack_register_fcb(orb)
+#endif
+
+
+
 forb_orb
-forb_init(int *argc, char **argv[], const char *orb_id)
+forb_init(int *argc, char **argv[], const struct forb_init_attr *attr)
 {
-       forb_orb object;
+       forb_orb orb;
        forb_t *forb;
        int ret;
 
@@ -119,10 +184,22 @@ forb_init(int *argc, char **argv[], const char *orb_id)
        memset(forb, 0, sizeof(*forb));
 
        /* Initialize ULUT logging facility */
-       ul_logreg_domains_static(ul_log_domains_array,
-                                sizeof(ul_log_domains_array)/sizeof(ul_log_domains_array[0]));
-       
-       forb_server_id_init(&forb->server_id);
+       init_ul_log();
+       forb_logreg_domains();
+
+       if (attr) {
+               forb->attr = *attr;
+       } else {
+               memset(&forb->attr, 0, sizeof(forb->attr));
+       }
+
+       sem_init(&forb->server_ready, 0, 0);
+
+       if (forb_server_id_empty(&forb->attr.fixed_server_id)) {
+               forb_server_id_init(&forb->server_id);
+       } else {
+               forb->server_id = forb->attr.fixed_server_id;
+       }
        forb_server_id_to_string(forb->server_id_str, &forb->server_id,
                                 sizeof(forb->server_id_str));
        ul_logdeb("Initializing forb %s\n", forb->server_id_str);
@@ -133,12 +210,8 @@ forb_init(int *argc, char **argv[], const char *orb_id)
        
        if (fosa_mutex_init(&forb->request_mutex, 0) != 0) goto err2;
        forb_request_nolock_init_root_field(forb);
-       
-       if (fosa_mutex_init(&forb->peer_mutex, 0) != 0) goto err2;
-       forb_peer_nolock_init_root_field(forb);
 
-       if (fosa_mutex_init(&forb->objkey_mutex, 0) != 0) goto err2;
-       forb_objects_nolock_init_root_field(forb);
+       if (forb_discovery_init(forb) != 0) goto err2;
 
        if (fosa_mutex_init(&forb->regref_mutex, 0) != 0) goto err2;
        forb_regref_nolock_init_root_field(forb);
@@ -149,47 +222,118 @@ forb_init(int *argc, char **argv[], const char *orb_id)
                return NULL;
        }
 
+       forb_executor_prepare();        
+
+       orb = forb_forb_orb_new(NULL, &forb_implementation, forb);
+       if (!orb) goto err2;
+       /* Server ID must be assigned manualy */
+       orb->server = forb->server_id;
+
+       forb->orb = orb;
+
+       /* Insert our object reference to objects tree, so that we
+        * can accept remote request to our new ORB. */
+       forb_objects_nolock_insert(forb, orb);
+
+       ret = forb_executor_init(&forb->default_executor);
+       if (ret) goto err2;
+       ret = forb_executor_register_object(&forb->default_executor, orb);
+       if (ret) goto err3;
+
+       ret = fosa_thread_create(&forb->execution_thread, NULL,
+                                forb_execution_thread, &forb->default_executor);
+       if (ret != 0)
+               goto err3;
+
+       /* FIXME: I do not know how to deregister the exit handler if
+        * forb_destroy() is called manually. */
+       /* on_exit(destroy_forb_on_exit, orb); */
+
 #ifdef CONFIG_FORB_PROTO_UNIX
        {
                forb_port_t *port = forb_malloc(sizeof(*port));
                if (port) {
-                       forb_unix_port_init(port, &forb->server_id);
-                       forb_register_port(forb, port);
+                       memset(port, 0, sizeof(*port));
+                       ret = forb_unix_port_init(&port->desc, &forb->server_id);
+                       if (ret) goto err_free_unix;
+                       ret = forb_register_port(orb, port);
+                       if (ret) goto err_free_unix; /* TODO: forb_unix_port_done() */
+                       goto unix_ok;
                }
+       err_free_unix:
+               free(port);
+               goto err3;
+       unix_ok:;
        }
 #endif
-       object = forb_object_new(NULL, &forb->server_id, 0);
-       if (!object) goto err2;
-       object->instance_data = forb;
-       
-
-       on_exit(destroy_forb_on_exit, object);
-       return object;
+#ifdef CONFIG_FORB_PROTO_INET_DEFAULT
+       {
+               forb_port_t *port = forb_malloc(sizeof(*port));
+               if (port) {
+                       struct in_addr listen_on;
+
+                       memset(port, 0, sizeof(*port));
+                       listen_on.s_addr = INADDR_ANY;
+                       ret = forb_inet_port_init(&port->desc, listen_on,
+                                                 forb->attr.fixed_tcp_port);
+                       if (ret) goto err_free_inet;
+                       ret = forb_register_port(orb, port);
+                       if (ret) goto err_free_inet; /* TODO: forb_inet_port_done() */
+                       goto inet_ok;
+               }
+       err_free_inet:
+               free(port);
+               goto err3;
+       inet_ok:;
+               hack_register_fcb(orb, port);
+       }
+#endif
+       return orb;
 
+err3:   forb_executor_destroy(&forb->default_executor);
 err2:  forb_free(forb);
-/* err1:       forb_free(object); */
 err:   return NULL;
 }
 
-/* FIXME: forb_destroy is now called automatically on exit. Therefore
- * forb_destroy() should be either static or should deregister on_exit
- * handler (how???).  */
 void forb_destroy(forb_orb orb)
 {
        forb_t *forb = forb_object_to_forb(orb);
        forb_port_t *port;
        forb_regref_t *regref;
+       forb_object obj;
+
+       /* Stop ports to prevent remote requests from coming */
+       ul_list_for_each(forb_port, forb, port) {
+               forb_stop_port(port);
+       }
+
+       /* Wait for executors to finish all requests (and thus drop
+        * all references to peers). This is very inefficient for big
+        * number of objects, but we do not care */
+       gavl_cust_for_each(forb_objects_nolock, forb, obj) {
+               forb_executor_t *executor;
+               executor = forb_object_get_executor(obj);
+               if (executor)
+                       forb_executor_synchronize(executor);
+       }
+
+       /* Destroy ports - this should drop all remaining references
+        * to peers and result in closing of all remote
+        * connections. */
+       ul_list_for_each_cut(forb_port, forb, port) {
+               forb_destroy_port(port);
+       }
+
+       pthread_cancel(forb->execution_thread.pthread_id);
+       pthread_join(forb->execution_thread.pthread_id, NULL);
        
        /* Unregister all registered references */
        while ((regref = forb_regref_first(forb))) {
-               forb_unregister_reference(orb, regref->name);
+               forb_unregister_reference(orb, regref->name.str);
        }
        
-       ul_list_for_each_cut(forb_port, forb, port) {
-               forb_destroy_port(port);
-       }
-       forb_free(forb);
        forb_object_release(orb);
+       forb_free(forb);
 }
 
 /* void */
@@ -198,6 +342,11 @@ void forb_destroy(forb_orb orb)
 /*     gavl_insert(&type_registry, &interface->node); */
 /* } */
 
+/** 
+ * Initializes server ID variable.
+ * 
+ * @param server_id Serer ID to initialize.
+ */
 void
 forb_server_id_init(forb_server_id *server_id)
 {
@@ -205,122 +354,279 @@ forb_server_id_init(forb_server_id *server_id)
 }
 
 /** 
- * Registers a given object reference so that other FORBs on the same
- * node can find it by using forb_resolve_reference().
+ * Checks whether the @a object is stale. Stale object is an object
+ * reference whose server cannot be contacted to handle requests.
  * 
- * @param object Object reference to register.
- * @param name Name under which to register the reference.
- *
- * @return Zero on success, -1 on error.
+ * @param object Object reference to check.
+ * 
+ * @return True if the object is stale, false otherwise.
  */
-int
-forb_register_reference(forb_object object, const char *name)
+bool forb_object_is_stale(forb_object object)
 {
-       forb_regref_t *regref;
-       forb_regref_name_t regname;
-       forb_t *forb = forb_object_to_forb(object);
-       char fn[100], fntmp[100];
-       char *objref;
-       FILE *f;
-       int ret;
-
-       strncpy(regname, name, FORB_REGREF_NAME_LEN-1);
-       regname[FORB_REGREF_NAME_LEN-1]='\0';
+       forb_orb remote_orb;
+       struct forb_env env;
+       bool stale = true;
+       
+       remote_orb = forb_get_orb_of(object);
+       if (!remote_orb) {      /* This shohuld never happen */
+               goto err;
+       }
+       /* TODO: Check not only the ORB, but also whether the object
+        * is still registered with the remote orb. */
+       forb_orb_is_alive(remote_orb, &env);
+       if (env.major == FORB_EX_COMM_FAILURE) {
+               /* Orb is not alive */
+               stale = true;
+       } else {
+               if (forb_exception_occurred(&env)) {
+                       ul_logerr("%s: unexpected exception: %s\n", __FUNCTION__, forb_strerror(&env));
+               }
+               stale = false;  
+       }
 
-       regref = forb_regref_new(object, regname);
-       if (!regref) goto err;
+       forb_object_release(remote_orb);
+err:
+       return stale;
+}
 
-       forb_regref_insert(forb, regref);
 
-       snprintf(fn,    sizeof(fn),    FORB_TMP_DIR "/%s",    regname);
-       snprintf(fntmp, sizeof(fntmp), FORB_TMP_DIR "/%s.%s", regname,
-                forb->server_id_str);
+/** 
+ * Returns object reference of forb::orb object associated with the
+ * given object.
+ * 
+ * @param obj 
+ * 
+ * @return 
+ */
+forb_orb
+forb_get_orb_of(const forb_object obj)
+{
+       forb_orb orb;
 
-       f = fopen(fntmp, "w");
-       if (!f) goto unalloc_err;
-       
-       objref = forb_object_to_string(object);
-       if (!objref) goto unalloc_err;
-
-       ret = fprintf(f, "%s", objref);
-       forb_free(objref);
-       if (ret < 0) goto unalloc_err;
-
-       ret = fclose(f);
-       if (ret == EOF) goto unalloc_err;
-
-       /* Make the atomic registration in filesystem */
-       ret = link(fntmp, fn);
-       if (ret < 0) {
-               /* TODO: If the reference exists, try whether it is
-                * still active. */
-               goto unlink_err;
+       if (forb_server_id_cmp(&obj->server, &obj->orb->server) == 0) {
+               orb = forb_object_duplicate(obj->orb);
+       } else {
+               orb = forb_object_new(obj->orb, &obj->server, 0);
        }
+       return orb;
+}
 
-       /* Unlink the temporary filename */
-       unlink(fntmp);
-       
-       return 0;
-unlink_err:
-       ret = errno;
-       unlink(fntmp);
-       errno = ret;
-unalloc_err:
-       ret = errno;
-       forb_regref_delete(forb, regref);
-       forb_regref_release(regref);
-       errno = ret;
-err:
-       return -1;
+/** 
+ * Returns server ID of an object reference.
+ * 
+ * @param obj 
+ * @param dest 
+ */
+void
+forb_get_server_id(const forb_object obj, forb_server_id *dest)
+{
+       if (obj) {
+               *dest = obj->server;
+       }
 }
 
-int
-forb_unregister_reference(forb_orb orb, const char *name)
+
+/** 
+ * Return instance data registered when the object was created by
+ * forb_XXX_new().
+ * 
+ * @param obj Object reference
+ * 
+ * @return Pointer to the registered data.
+ */
+void *
+forb_instance_data(const forb_object obj)
 {
-       forb_regref_t *regref;
-       forb_regref_name_t regname;
-       forb_t *forb = forb_object_to_forb(orb);
-       char fn[100];
+       return forb_object_instance_data(obj);
+}
 
-       strncpy(regname, name, FORB_REGREF_NAME_LEN-1);
-       regname[FORB_REGREF_NAME_LEN-1]='\0';
+/** 
+ * Returns error message correspondings FORB exception.
+ * 
+ * @param env Environemnt
+ * 
+ * @return Non-NULL pointer to a string.
+ */
+const char *
+forb_strerror(CORBA_Environment *env)
+{
+       if (!env) {
+               return "Invalid environemnt";
+       }
+#define ex(e) case FORB_EX_##e: return #e; break;
+       switch (env->major) {
+       ex(NONE);
+       ex(UNKNOWN);
+       ex(BAD_PARAM);
+       ex(NO_MEMORY);
+       ex(IMP_LIMIT);
+       ex(COMM_FAILURE);
+       ex(INV_OBJREF);
+       ex(NO_PERMISSION);
+       ex(INTERNAL);
+       ex(MARSHAL);
+       ex(INITIALIZE);
+       ex(NO_IMPLEMENT);
+       ex(BAD_OPERATION);
+       ex(NO_RESOURCES);
+       ex(NO_RESPONSE);
+       ex(TRANSIENT);
+       ex(FREE_MEM);
+       ex(INV_IDENT);
+       ex(INV_FLAG);
+       ex(DATA_CONVERSION);
+       ex(OBJECT_NOT_EXIST);
+       ex(TIMEOUT);
+       ex(APPLICATION);
+       }
+#undef ex
+       return "Invalid error number";
+}
 
-       regref = forb_regref_find(forb, &regname);
-       if (!regref) goto err;
+/** 
+ * Return server id of the requesting application.
+ *
+ * This function should be only called from within interface
+ * implementation,
+ * 
+ * @param[in] obj Object being requested
+ * @param[out] req_source Server ID of the requesting application
+ */
+void
+forb_get_req_source(const forb_object obj, forb_server_id *req_source)
+{
+       if (req_source) {
+               if (obj && obj->exec_req) {
+                       *req_source = obj->exec_req->source;
+               } else {
+                       memset(req_source, 0, sizeof(*req_source));
+               }
+       }
+}
 
-       forb_regref_delete(forb, regref);
-       forb_regref_release(regref);
+/**
+ * Internal function for allocation of sequence bufers. Used by
+ * forb_sequence_alloc_buf().
+ *
+ * @param seq
+ * @param seq_size
+ * @param buf_pptr
+ * @param maximum_ptr
+ * @param num_elements
+ * @param elem_size
+ *
+ * @return CORBA_TRUE if the allocation was sucessfull, CORBA_FALSE if
+ * it wasn't.
+ */
+CORBA_boolean
+__forb_sequence_alloc_buf_internal(void *seq, size_t seq_size,
+                                  void **buf_pptr, CORBA_unsigned_long *maximum_ptr,
+                                  unsigned num_elements, size_t elem_size)
+{
+       memset(seq, 0, seq_size);
+       /*(seq)._length = 0;*/
+       if (num_elements && elem_size) *buf_pptr = forb_malloc(num_elements * elem_size);
+       else *buf_pptr = NULL;
+       *maximum_ptr = *buf_pptr ? num_elements : 0;
+       return (*buf_pptr != NULL) || (num_elements == 0);
+}
 
-       snprintf(fn,    sizeof(fn),    FORB_TMP_DIR "/%s",    regname);
-       return unlink(fn);
+static FILE *forb_ul_log_file;
+static char progname[64] = "";
 
-       return 0;
-err:
-       return -1;
+void
+forb_ul_log_fnc(ul_log_domain_t *domain, int level,
+               const char *format, va_list ap)
+{
+       struct timespec now;
+       if(!(level&UL_LOGL_CONT)) {
+               level&=UL_LOGL_MASK;
+               clock_gettime(CLOCK_MONOTONIC, &now);
+               fprintf(forb_ul_log_file,"%ld.%6ld: ", now.tv_sec, now.tv_nsec/1000);
+               if (progname[0])
+                       fprintf(forb_ul_log_file,"%s: ", progname);
+               if(domain && domain->name)
+                       fprintf(forb_ul_log_file,"%s: ",domain->name);
+       }
+       vfprintf(forb_ul_log_file, format, ap);
+       fflush(forb_ul_log_file);
 }
 
-forb_object
-forb_resolve_reference(const forb_orb orb, const char *name)
+static int init_ul_log(void)
 {
-       forb_regref_name_t regname;
-       char fn[100];
-       char str[100];
+       char *s;
+       char *log_fname;
        int fd;
-       forb_object object;
-
-       strncpy(regname, name, FORB_REGREF_NAME_LEN-1);
-       regname[FORB_REGREF_NAME_LEN-1]='\0';
-
-       snprintf(fn,    sizeof(fn),    FORB_TMP_DIR "/%s",    regname);
+       int flg = 0;
+       char path[128];
+
+/*     if(ul_log_output != NULL) */
+/*             return 0; */
+
+       fd = open("/proc/self/cmdline", O_RDONLY);
+       if (fd >= 0) {
+               int ret;
+               ret = read(fd, path, sizeof(path)-1);
+               if (ret > 0) {
+                       path[ret]=0;
+                       s = strrchr(path, '/');
+                       if (s) s++;
+                       else s = path;
+                       strncpy(progname, s, sizeof(progname)-1);
+               }
+               close(fd);
+       }
 
-       fd = open(fn, 0);
-       if (fd < 0) {
-               return NULL;
+       if((log_fname=getenv("UL_LOG_FILENAME"))!=NULL){
+               forb_ul_log_file=fopen(log_fname,"a");
        }
-       read(fd, str, sizeof(str)-1);
-       str[sizeof(str)-1] = '\0';
-       object = forb_string_to_object(orb, str);
-       close(fd);
+       if(forb_ul_log_file==NULL)
+               forb_ul_log_file=stderr;
        
-       return object;
+       if((s=getenv("UL_DEBUG_FLG")) != NULL){
+               flg=atoi(s);
+       }
+
+       ul_log_redir(forb_ul_log_fnc, flg);
+
+       if((s = getenv("UL_LOG_LEVELS")) != NULL)
+               ul_log_domain_arg2levels(s);
+
+       return 0;
+}
+
+/** 
+ * Wait for the server to be ready. Internal function intended forb
+ * forbrun.
+ * 
+ * @param orb ORB.
+ * 
+ * @return Zero on success; on error -1 is returned, and errno is set
+ * to indicate the error.
+ */
+int forb_wait_for_server_ready(forb_orb orb)
+{
+       forb_t *forb = forb_object_to_forb(orb);
+       return sem_wait(&forb->server_ready);
+}
+
+/** 
+ * Signal the the FORB core that the server is ready for accepting
+ * requests.
+ *
+ * This function should be called at the initialization of server
+ * implementation at the time when all objects are registered with
+ * executors. All other servers in the same address space are
+ * initialized after this function is called which allows the other
+ * servers to use the services provided by the calling server.
+ * 
+ * @param orb ORB object.
+ * 
+ * @return Zero on success; on error -1 is returned, and errno is set
+ * to indicate the error.
+ */
+int forb_signal_server_ready(forb_orb orb)
+{
+       forb_t *forb = forb_object_to_forb(orb);
+       return sem_post(&forb->server_ready);
 }