*
*/
+#ifndef _BSD_SOURCE
#define _BSD_SOURCE /* Because of on_exit() */
+#endif
#include "proto.h"
#include "regref.h"
-#include <fcntl.h>
#include <forb/config.h>
#include <forb/forb-internal.h>
#include <forb.h>
#include <forb/object.h>
#include <forb/uuid.h>
#include <stdio.h>
-#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <ul_log.h>
#include <ul_logreg.h>
#include <unistd.h>
+#include "discovery.h"
#ifdef CONFIG_FORB_PROTO_UNIX
#include <forb/proto_unix.h>
#endif
+#ifdef CONFIG_FORB_PROTO_INET_DEFAULT
+#include <forb/proto_inet.h>
+#endif
+#ifdef CONFIG_FCB
+#include <fcb.h>
+#include <fcb_contact_info.h>
+#endif
+#include <fcntl.h>
#ifdef DEBUG
#define UL_LOGL_DEF UL_LOGL_DEB
#include "log_domains.inc"
extern UL_LOG_CUST(ulogd_forb);
+static int init_ul_log(void);
+UL_LOGREG_DOMAINS_INIT_FUNCTION(forb_logreg_domains, forb_logreg_domains_array);
+#if 0
static void
destroy_forb_on_exit(int exitcode, void *arg)
{
forb_orb orb = arg;
forb_destroy(orb);
}
+#endif
static void
forb_is_alive(forb_orb _obj, CORBA_Environment *ev)
static void *
forb_execution_thread(void *arg)
{
- forb_orb orb = arg;
-
- forb_execute_object(orb);
+ forb_executor_t *executor = arg;
+ forb_executor_run(executor);
return NULL;
}
+#ifdef CONFIG_FCB
+/**
+ * Registers a reference to the contract broker (FCB) under the name
+ * fres_contract_broker_reg_name.
+ *
+ * A reference is created from FCB_SERVER_ID with forb_object_new(),
+ * passed to forb_register_reference() and released again. If the
+ * reference cannot be created, an error is logged and nothing is
+ * registered.
+ *
+ * @param orb  ORB used to create the reference.
+ * @param port Not used by this function.
+ */
+void hack_register_fcb(forb_orb orb, forb_port_t *port)
+{
+	forb_object fcb;
+
+	(void)port;		/* Accepted for the caller's sake only */
+
+	fcb = forb_object_new(orb, &FCB_SERVER_ID, 1);
+	if (!fcb) {
+		ul_logerr("Cannot allocate FCB reference\n");
+		return;
+	}
+	forb_register_reference(fcb, fres_contract_broker_reg_name);
+	forb_object_release(fcb);
+}
+#else
+/* No-op stub. It must take the same two arguments as the real
+ * function, because the call site passes both orb and port. */
+#define hack_register_fcb(orb, port) ((void)0)
+#endif
+
+
+
forb_orb
-forb_init(int *argc, char **argv[], const char *orb_id)
+forb_init(int *argc, char **argv[], const struct forb_init_attr *attr)
{
forb_orb orb;
forb_t *forb;
memset(forb, 0, sizeof(*forb));
/* Initialize ULUT logging facility */
- ul_logreg_domains_static(ul_log_domains_array,
- sizeof(ul_log_domains_array)/sizeof(ul_log_domains_array[0]));
-
- forb_server_id_init(&forb->server_id);
+ init_ul_log();
+ forb_logreg_domains();
+
+ if (attr) {
+ forb->attr = *attr;
+ } else {
+ memset(&forb->attr, 0, sizeof(forb->attr));
+ }
+
+ sem_init(&forb->server_ready, 0, 0);
+
+ if (forb_server_id_empty(&forb->attr.fixed_server_id)) {
+ forb_server_id_init(&forb->server_id);
+ } else {
+ forb->server_id = forb->attr.fixed_server_id;
+ }
forb_server_id_to_string(forb->server_id_str, &forb->server_id,
sizeof(forb->server_id_str));
ul_logdeb("Initializing forb %s\n", forb->server_id_str);
if (fosa_mutex_init(&forb->request_mutex, 0) != 0) goto err2;
forb_request_nolock_init_root_field(forb);
-
- if (fosa_mutex_init(&forb->peer_mutex, 0) != 0) goto err2;
- forb_peer_nolock_init_root_field(forb);
- if (fosa_mutex_init(&forb->objkey_mutex, 0) != 0) goto err2;
- forb_objects_nolock_init_root_field(forb);
+ if (forb_discovery_init(forb) != 0) goto err2;
if (fosa_mutex_init(&forb->regref_mutex, 0) != 0) goto err2;
forb_regref_nolock_init_root_field(forb);
return NULL;
}
+ forb_executor_prepare();
+
orb = forb_forb_orb_new(NULL, &forb_implementation, forb);
if (!orb) goto err2;
/* Server ID must be assigned manualy */
* can accept remote request to our new ORB. */
forb_objects_nolock_insert(forb, orb);
+ ret = forb_executor_init(&forb->default_executor);
+ if (ret) goto err2;
+ ret = forb_executor_register_object(&forb->default_executor, orb);
+ if (ret) goto err3;
+
ret = fosa_thread_create(&forb->execution_thread, NULL,
- forb_execution_thread, orb);
+ forb_execution_thread, &forb->default_executor);
if (ret != 0)
- goto err2;
-
- on_exit(destroy_forb_on_exit, orb);
+ goto err3;
+
+ /* FIXME: I do not know how to deregister the exit handler if
+ * forb_destroy() is called manually. */
+ /* on_exit(destroy_forb_on_exit, orb); */
#ifdef CONFIG_FORB_PROTO_UNIX
{
}
err_free_unix:
free(port);
- goto err2;
+ goto err3;
unix_ok:;
}
+#endif
+#ifdef CONFIG_FORB_PROTO_INET_DEFAULT
+ {
+ forb_port_t *port = forb_malloc(sizeof(*port));
+ if (port) {
+ struct in_addr listen_on;
+
+ memset(port, 0, sizeof(*port));
+ listen_on.s_addr = INADDR_ANY;
+ ret = forb_inet_port_init(&port->desc, listen_on,
+ forb->attr.fixed_tcp_port);
+ if (ret) goto err_free_inet;
+ ret = forb_register_port(orb, port);
+ if (ret) goto err_free_inet; /* TODO: forb_inet_port_done() */
+ goto inet_ok;
+ }
+ err_free_inet:
+ free(port);
+ goto err3;
+ inet_ok:;
+ hack_register_fcb(orb, port);
+ }
#endif
return orb;
+err3: forb_executor_destroy(&forb->default_executor);
err2: forb_free(forb);
err: return NULL;
}
-/* FIXME: forb_destroy is now called automatically on exit. Therefore
- * forb_destroy() should be either static or should deregister on_exit
- * handler (how???). */
void forb_destroy(forb_orb orb)
{
forb_t *forb = forb_object_to_forb(orb);
forb_port_t *port;
forb_regref_t *regref;
+ forb_object obj;
+
+ /* Destroy ports to prevent remote requests from coming */
+ ul_list_for_each_cut(forb_port, forb, port) {
+ forb_destroy_port(port);
+ }
+
+ /* Wait for executors to finish all requests (and thus close
+ * connections to peers). This is very inefficient for big
+ * number of objects, but we do not care */
+ gavl_cust_for_each(forb_objects_nolock, forb, obj) {
+ forb_executor_t *executor;
+ executor = forb_object_get_executor(obj);
+ if (executor)
+ forb_executor_synchronize(executor);
+ }
pthread_cancel(forb->execution_thread.pthread_id);
pthread_join(forb->execution_thread.pthread_id, NULL);
forb_unregister_reference(orb, regref->name.str);
}
- ul_list_for_each_cut(forb_port, forb, port) {
- forb_destroy_port(port);
- }
forb_object_release(orb);
forb_free(forb);
}
return stale;
}
-/**
- *
- *
- * @param fd
- * @param objref
- *
- * @return Zero on success, -1 on error.
- */
-static int
-rewrite_regref(int fd, const char *objref)
-{
- int ret;
- int len = strlen(objref);
- lseek(fd, 0, SEEK_SET);
- ftruncate(fd, 0);
-
- while (len > 0) {
- ret = write(fd, objref, len);
- if (ret < 0) goto out;
- len -= ret;
- objref += ret;
- }
- ret = 0; /* Success */
-out:
- return ret;
-}
-
-/**
- *
- *
- * @param orb
- * @param fn File name
- * @param objref string form of the object reference
- *
- * @return -1 on error, 0 if reference was replaced and 1 in the
- * reference is valid.
- */
-static int
-replace_regref_if_stale(forb_orb orb, const char *fn, const char *objref)
-{
- int fd, ret = 0;
- char str[100];
- forb_object object;
-
- fd = open(fn, O_RDWR);
- if (fd < 0) {
- ret = -1;
- goto out;
- }
- ret = lockf(fd, F_LOCK, 0);
- if (ret < 0) goto close_err;
- ret = read(fd, str, sizeof(str)-1);
- if (ret < 0) goto unlock_err;
- /* TODO: Check that we have read the whole file */
-
- str[ret] = '\0';
- object = forb_string_to_object(orb, str);
- if (!object) {
- /* reference is unreadable, so we can replace it */
- ret = rewrite_regref(fd, objref);
- /* We are done for now */
- goto unlock_err;
- }
- if (forb_object_is_stale(object)) {
- /* Orb is not alive */
- ret = rewrite_regref(fd, objref);
- } else {
- /* Reference's FORB is alive :-( */
- ret = 1;
- }
-
- forb_object_release(object);
-unlock_err:
- lockf(fd, F_ULOCK, 0);
-close_err:
- close(fd);
-out:
- return ret;
-}
-
-
-/**
- * Registers a given object reference so that other FORBs on the same
- * node can find it by using forb_resolve_reference().
- *
- * @param object Object reference to register.
- * @param name Name under which to register the reference.
- *
- * @return Zero on success, -1 on error.
- */
-int
-forb_register_reference(forb_object object, const char *name)
-{
- forb_regref_t *regref;
- forb_regref_name_t regname;
- forb_t *forb = forb_object_to_forb(object);
- char fn[100], fntmp[100];
- char *objref;
- FILE *f;
- int ret;
-
- strncpy(regname.str, name, FORB_REGREF_NAME_LEN-1);
- regname.str[FORB_REGREF_NAME_LEN-1]='\0';
-
- regref = forb_regref_new(object, regname);
- if (!regref) goto err;
-
- forb_regref_insert(forb, regref);
-
- snprintf(fn, sizeof(fn), FORB_TMP_DIR "/%s", regname.str);
- snprintf(fntmp, sizeof(fntmp), FORB_TMP_DIR "/%s.%s", regname.str,
- forb->server_id_str);
-
- f = fopen(fntmp, "w");
- if (!f) goto unalloc_err;
-
- objref = forb_object_to_string(object);
- if (!objref) goto unlink_err;
-
- ret = fprintf(f, "%s", objref);
- if (ret < 0) goto free_objref_err;
-
- ret = fclose(f);
- if (ret == EOF) goto free_objref_err;
-
- /* Make the atomic registration in filesystem */
- ret = link(fntmp, fn);
-
- if (ret < 0 && errno == EEXIST) {
- /* The reference exists. Try whether it is still
- * active. */
- if (replace_regref_if_stale(object->orb, fn, objref) != 0) {
- goto free_objref_err;
- }
- ul_logdeb("Stale registration replaced\n");
- } else if (ret < 0) {
- goto free_objref_err;
- }
-
- forb_free(objref);
-
- /* Unlink the temporary filename */
- unlink(fntmp);
-
- return 0;
-free_objref_err:
- ret = errno;
- forb_free(objref);
- errno = ret;
-unlink_err:
- ret = errno;
- unlink(fntmp);
- errno = ret;
-unalloc_err:
- ret = errno;
- forb_regref_delete(forb, regref);
- forb_regref_release(regref);
- errno = ret;
-err:
- return -1;
-}
-
-/**
- * Unregister reference previously registered by
- * forb_register_reference().
- *
- * @param orb forb::orb reference
- * @param name Name to unregister.
- *
- * @return Zero on success, -1 on error.
- */
-int
-forb_unregister_reference(forb_orb orb, const char *name)
-{
- forb_regref_t *regref;
- forb_regref_name_t regname;
- forb_t *forb = forb_object_to_forb(orb);
- char fn[100];
-
- strncpy(regname.str, name, FORB_REGREF_NAME_LEN-1);
- regname.str[FORB_REGREF_NAME_LEN-1]='\0';
-
- regref = forb_regref_find(forb, ®name);
- if (!regref) goto err;
-
- forb_regref_delete(forb, regref);
- forb_regref_release(regref);
-
- snprintf(fn, sizeof(fn), FORB_TMP_DIR "/%s", regname.str);
- return unlink(fn);
-
- return 0;
-err:
- return -1;
-}
-
-/**
- * Returns a named reference previously registered by
- * forb_register_reference(). This function can be called even if the
- * reference was registered in a diferent process (but on the same
- * node).
- *
- * @param orb Local orb object reference
- * @param name Name under which the reference was registered.
- *
- * @return Object reference on NULL in case of error.
- */
-forb_object
-forb_resolve_reference(const forb_orb orb, const char *name)
-{
- forb_regref_name_t regname;
- char fn[100];
- char str[100];
- int fd, ret;
- forb_object object;
-
- strncpy(regname.str, name, FORB_REGREF_NAME_LEN-1);
- regname.str[FORB_REGREF_NAME_LEN-1]='\0';
-
- snprintf(fn, sizeof(fn), FORB_TMP_DIR "/%s", regname.str);
-
- fd = open(fn, 0);
- if (fd < 0) {
- return NULL;
- }
- ret = read(fd, str, sizeof(str)-1);
- /* TODO: Check that we have read the whole file */
- str[ret] = '\0';
- object = forb_string_to_object(orb, str);
- close(fd);
-
- if (forb_object_is_stale(object)) {
- forb_object_release(object);
- object = NULL;
- }
-
- return object;
-}
/**
* Returns object reference of forb::orb object associated with the
}
}
}
+
+/**
+ * Internal function for allocation of sequence buffers. Used by
+ * forb_sequence_alloc_buf().
+ *
+ * The sequence structure is zeroed first. Then, if both @a num_elements
+ * and @a elem_size are non-zero and their product fits into size_t, a
+ * buffer of that many bytes is obtained from forb_malloc().
+ *
+ * @param seq          Sequence structure; its first @a seq_size bytes
+ *                     are cleared.
+ * @param seq_size     Number of bytes of @a seq to clear.
+ * @param buf_pptr     Receives the allocated buffer, or NULL when
+ *                     nothing was allocated (presumably points into
+ *                     @a seq -- it is written after @a seq is cleared).
+ * @param maximum_ptr  Receives @a num_elements if a buffer was
+ *                     allocated, zero otherwise.
+ * @param num_elements Number of elements to allocate room for.
+ * @param elem_size    Size of one element in bytes.
+ *
+ * @return CORBA_TRUE if the allocation was successful or if
+ * @a num_elements is zero, CORBA_FALSE otherwise.
+ */
+CORBA_boolean
+__forb_sequence_alloc_buf_internal(void *seq, size_t seq_size,
+				   void **buf_pptr, CORBA_unsigned_long *maximum_ptr,
+				   unsigned num_elements, size_t elem_size)
+{
+	void *buf = NULL;
+
+	memset(seq, 0, seq_size);
+	/*(seq)._length = 0;*/
+
+	/* Reject sizes whose byte count would wrap around; otherwise a
+	 * too-small buffer would be reported as holding num_elements. */
+	if (num_elements && elem_size &&
+	    num_elements <= (size_t)-1 / elem_size)
+		buf = forb_malloc(num_elements * elem_size);
+
+	*buf_pptr = buf;
+	*maximum_ptr = buf ? num_elements : 0;
+	return (buf != NULL) || (num_elements == 0);
+}
+
+static FILE *forb_ul_log_file; /* Log stream written by forb_ul_log_fnc(); set by init_ul_log() */
+static char progname[64] = ""; /* Program name used as log prefix; stays empty if init_ul_log() cannot read it */
+
+/**
+ * Log output function installed by init_ul_log() via ul_log_redir().
+ *
+ * Unless the UL_LOGL_CONT bit is set in @a level, the message is
+ * preceded by a CLOCK_MONOTONIC timestamp in the form
+ * "seconds.microseconds", the program name (when known) and the
+ * domain name (when available). The message is then formatted into
+ * forb_ul_log_file and the stream is flushed.
+ *
+ * @param domain Log domain of the message; may be NULL.
+ * @param level  Log level bits; only UL_LOGL_CONT is examined here.
+ * @param format printf()-style format string of the message.
+ * @param ap     Arguments for @a format.
+ */
+void
+forb_ul_log_fnc(ul_log_domain_t *domain, int level,
+		const char *format, va_list ap)
+{
+	if (!(level & UL_LOGL_CONT)) {
+		/* Zeroed so that a failing clock_gettime() prints 0.000000
+		 * rather than indeterminate values. */
+		struct timespec now = { 0, 0 };
+
+		clock_gettime(CLOCK_MONOTONIC, &now);
+		/* The fractional part must be zero-padded to six digits,
+		 * otherwise 5 us would read as "1.     5". */
+		fprintf(forb_ul_log_file, "%ld.%06ld: ",
+			(long)now.tv_sec, (long)(now.tv_nsec / 1000));
+		if (progname[0])
+			fprintf(forb_ul_log_file, "%s: ", progname);
+		if (domain && domain->name)
+			fprintf(forb_ul_log_file, "%s: ", domain->name);
+	}
+	vfprintf(forb_ul_log_file, format, ap);
+	fflush(forb_ul_log_file);
+}
+
+/**
+ * Sets up the ULUT logging output used by FORB.
+ *
+ * - The program name is taken from the first string in
+ *   /proc/self/cmdline (directory part stripped) and stored in
+ *   progname; if the file cannot be read, progname is left as is.
+ * - The log stream is the file named by the UL_LOG_FILENAME
+ *   environment variable (opened for appending), or stderr when the
+ *   variable is unset or the file cannot be opened.
+ * - forb_ul_log_fnc() is installed with ul_log_redir(); the flags
+ *   passed to it come from UL_DEBUG_FLG (default 0).
+ * - UL_LOG_LEVELS, if set, is handed to ul_log_domain_arg2levels().
+ *
+ * @return Always zero.
+ */
+static int init_ul_log(void)
+{
+	const char *env;
+	int flg = 0;
+	int fd;
+
+/*	if(ul_log_output != NULL) */
+/*		return 0; */
+
+	fd = open("/proc/self/cmdline", O_RDONLY);
+	if (fd >= 0) {
+		char path[128];
+		int ret = read(fd, path, sizeof(path)-1);
+
+		if (ret > 0) {
+			const char *base;
+
+			/* cmdline holds NUL-separated arguments, so the
+			 * string functions below see argv[0] only. */
+			path[ret] = 0;
+			base = strrchr(path, '/');
+			base = base ? base + 1 : path;
+			strncpy(progname, base, sizeof(progname)-1);
+			progname[sizeof(progname)-1] = 0;
+		}
+		close(fd);
+	}
+
+	/* This function runs on every forb_init() call. Open the log
+	 * file only the first time; reopening it would leak the stream
+	 * opened by the previous call. */
+	if (forb_ul_log_file == NULL) {
+		env = getenv("UL_LOG_FILENAME");
+		if (env != NULL)
+			forb_ul_log_file = fopen(env, "a");
+		if (forb_ul_log_file == NULL)
+			forb_ul_log_file = stderr;
+	}
+
+	env = getenv("UL_DEBUG_FLG");
+	if (env != NULL)
+		flg = (int)strtol(env, NULL, 10);
+
+	ul_log_redir(forb_ul_log_fnc, flg);
+
+	env = getenv("UL_LOG_LEVELS");
+	if (env != NULL)
+		ul_log_domain_arg2levels(env);
+
+	return 0;
+}
+
+/**
+ * Blocks on the server_ready semaphore of the FORB instance that
+ * @a orb belongs to. Internal function intended for forbrun.
+ *
+ * @param orb ORB.
+ *
+ * @return The result of sem_wait(): zero on success; on error -1 is
+ * returned, and errno is set to indicate the error.
+ */
+int forb_wait_for_server_ready(forb_orb orb)
+{
+	forb_t *self = forb_object_to_forb(orb);
+	int rc;
+
+	rc = sem_wait(&self->server_ready);
+	return rc;
+}
+
+/**
+ * Signals the FORB core that the server is ready to accept requests.
+ *
+ * A server implementation should call this during its initialization,
+ * once all of its objects are registered with executors. Other
+ * servers in the same address space are initialized only after this
+ * call, so they can rely on the services of the calling server.
+ *
+ * The function posts the server_ready semaphore of the FORB instance
+ * that @a orb belongs to.
+ *
+ * @param orb ORB object.
+ *
+ * @return The result of sem_post(): zero on success; on error -1 is
+ * returned, and errno is set to indicate the error.
+ */
+int forb_signal_server_ready(forb_orb orb)
+{
+	forb_t *self = forb_object_to_forb(orb);
+	int rc;
+
+	rc = sem_post(&self->server_ready);
+	return rc;
+}