]> rtime.felk.cvut.cz Git - frescor/frsh.git/commitdiff
Synchronization objects.
authorDario Faggioli <faggioli@gandalf.sssup.it>
Wed, 25 Feb 2009 08:28:59 +0000 (09:28 +0100)
committerMichal Sojka <sojkam1@fel.cvut.cz>
Sun, 1 Mar 2009 18:30:44 +0000 (19:30 +0100)
This commit introduces synchronization objects and thus complete the
core API implementation. There still are some minor functions that are
not-implemented, and they will be addressed later.

Synchronization objects basics are implemented at FRES layer, then the
FRSH API uses these services to provide compliant semantic to the
application.
Implementation is done on top of FOSA mutexes and condition variables,
so that is as much portable as it is possible (we hope!).

fres/Makefile.omk
fres/synchobj/Makefile [new file with mode: 0644]
fres/synchobj/Makefile.omk [new file with mode: 0644]
fres/synchobj/fres_synchobj.c [new file with mode: 0644]
fres/synchobj/fres_synchobj.h [new file with mode: 0644]
fres/synchobj/fres_synchobj_idl.idl [new file with mode: 0644]
fres/synchobj/tests/Makefile [new file with mode: 0644]
fres/synchobj/tests/Makefile.omk [new file with mode: 0644]
frsh_api/Makefile.omk
frsh_api/frsh_opaque_types.h
frsh_api/frsh_synchobj.c

index 934de7a12ccacb1cda9649bf7ecfd9c33f1b3895..42d9645b375f3a2d1d43227fbfbe1f248da85924 100644 (file)
@@ -1,4 +1,4 @@
-SUBDIRS=contract cbroker resmng resalloc
+SUBDIRS=contract synchobj cbroker resmng resalloc
 
 ifneq ($(QTDIR),)
 SUBDIRS += frm_gui
diff --git a/fres/synchobj/Makefile b/fres/synchobj/Makefile
new file mode 100644 (file)
index 0000000..b22a357
--- /dev/null
@@ -0,0 +1,14 @@
+# Generic directory or leaf node makefile for OCERA make framework
+
+ifndef MAKERULES_DIR
+MAKERULES_DIR := $(shell ( old_pwd="" ;  while [ ! -e Makefile.rules ] ; do if [ "$$old_pwd" = `pwd`  ] ; then exit 1 ; else old_pwd=`pwd` ; cd -L .. 2>/dev/null ; fi ; done ; pwd ) )
+endif
+
+ifeq ($(MAKERULES_DIR),)
+all : default
+.DEFAULT::
+       @echo -e "\nThe Makefile.rules has not been found in this or partent directory\n"
+else
+include $(MAKERULES_DIR)/Makefile.rules
+endif
+
diff --git a/fres/synchobj/Makefile.omk b/fres/synchobj/Makefile.omk
new file mode 100644 (file)
index 0000000..5d50269
--- /dev/null
@@ -0,0 +1,13 @@
+shared_LIBRARIES = synchobj
+
+synchobj_SOURCES = fres_synchobj.c
+synchobj_CLIENT_IDL = fres_synchobj_idl.idl
+
+#fres_synchobj_idl_IDLFLAGS = --include=fres_contract_ser.h
+#fres_blocks_IDLFLAGS = --include=idl_native.h
+
+include_HEADERS = fres_synchobj.h
+
+include_GEN_HEADERS = fres_synchobj_idl.h
+
+SUBDIRS=tests
diff --git a/fres/synchobj/fres_synchobj.c b/fres/synchobj/fres_synchobj.c
new file mode 100644 (file)
index 0000000..e9eec8d
--- /dev/null
@@ -0,0 +1,213 @@
+/**
+ * @file   fres_synchobj.c
+ * @author Dario Faggioli <faggioli@gandalf.sssup.it>
+ * 
+ * @brief  Implementation of synchronization objects functions.
+ * 
+ * 
+ */
+#include <frsh_forb.h>
+#include <fres_synchobj.h>
+#include <string.h>
+#include <stdio.h>
+#include <fcntl.h>
+#include <sys/mman.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+/** 
+ * Allocates new fres_synchobj structure. Use fres_synchobj_destroy to
+ * deallocate the synchronization object.
+ * 
+ * @return The new object on succes, NULL on error.
+ */
+struct fres_synchobj* fres_synchobj_new()
+{
+       struct fres_synchobj *s;
+       fres_synchobj_id_t id;
+       char shm_path[108];
+       int shm_fd;
+
+       forb_uuid_generate((forb_uuid_t*) &id);
+       sprintf(shm_path, "/tmp/sync.%s",
+               fres_synchobj_id_to_string(shm_path, &id, sizeof(shm_path)));
+
+       shm_fd = shm_open(shm_path, O_CREAT|O_EXCL|O_RDWR, S_IRWXU|S_IRWXG);
+       if (shm_fd < 0) goto err;
+
+       s = mmap(NULL, sizeof(*s),
+                PROT_READ|PROT_WRITE, MAP_SHARED,
+                shm_fd, 0);
+       if (s == MAP_FAILED) goto unlink_err;
+
+       s->id = id;
+       memcpy(s->path, shm_path, sizeof(shm_path));
+
+       fosa_cond_init(&s->cond);
+       fosa_mutex_init(&s->mutex, 0);
+
+       s->nr_waiting = 0;
+       s->queued_signal = 0;
+
+       return s;
+
+unlink_err:
+       shm_unlink(shm_path);
+err:
+       return NULL;
+}
+
+struct fres_synchobj* fres_synchobj_get(const fres_synchobj_id_t *id)
+{
+       struct fres_synchobj *s = NULL;
+       char shm_path[108];
+       int shm_fd;
+
+       sprintf(shm_path, "/tmp/sync.%s",
+               fres_synchobj_id_to_string(shm_path, id, sizeof(shm_path)));
+
+       shm_fd = shm_open(shm_path, O_RDWR, S_IRWXU|S_IRWXG);
+       if (shm_fd < 0) goto out;
+
+       s = mmap(NULL, sizeof(*s),
+                PROT_READ|PROT_WRITE, MAP_SHARED,
+                shm_fd, 0);
+       if (s == MAP_FAILED) goto out;
+
+out:
+       return s;
+}
+
+struct fres_synchobj* fres_synchobj_get_path(const char synchobj_path[])
+{
+       struct fres_synchobj *s = NULL;
+       int shm_fd;
+
+       shm_fd = shm_open(synchobj_path, O_RDWR, S_IRWXU|S_IRWXG);
+       if (shm_fd < 0) goto out;
+
+       s = mmap(NULL, sizeof(*s),
+                PROT_READ|PROT_WRITE, MAP_SHARED,
+                shm_fd, 0);
+       if (s == MAP_FAILED) goto out;
+
+out:
+       return s;
+}
+
+int fres_synchobj_destroy(struct fres_synchobj *s)
+{
+       int ret = 0;
+
+       if (!s) return EINVAL;
+
+       /* Since now no new task can mmap the object. */
+       ret = shm_unlink(s->path);
+       if (ret) goto out;
+
+       fosa_mutex_lock(&s->mutex);
+
+       /* If there are waiting tasks try to wake up them. */
+       if (s->nr_waiting)
+               fosa_cond_broadcast(&s->cond);
+
+       /* Mark the synchronization object as invalid. */
+       s->nr_waiting = -1;
+       fosa_mutex_unlock(&s->mutex);
+
+       ret = munmap((void*) s, sizeof(*s));
+       if (ret) goto out;
+
+out:
+       return ret;
+}
+
+int fres_synchobj_wait_with_timeout
+  (struct fres_synchobj *s,
+   const fosa_abs_time_t *timeout)
+{
+       int ret = 0;
+
+       if (!s) return EINVAL;
+
+       fosa_mutex_lock(&s->mutex);
+       if (s->nr_waiting == -1) goto unmap_err;
+
+       if (!s->queued_signal) {
+               s->nr_waiting++;
+               if (!timeout)
+                       ret = fosa_cond_wait(&s->cond, &s->mutex);
+               else
+                       ret = fosa_cond_timedwait(&s->cond,
+                                                 &s->mutex,
+                                                 timeout);
+               if (ret) goto wakeup_err;
+
+               if (s->nr_waiting == -1) goto unmap_err;
+       } else
+               s->queued_signal--;
+
+wakeup_err:
+       s->nr_waiting--;
+       fosa_mutex_unlock(&s->mutex);
+
+       return ret;
+
+unmap_err:
+       fosa_mutex_unlock(&s->mutex);
+       ret = munmap((void*) s, sizeof(*s));
+
+       return ret;
+}
+
+int fres_synchobj_signal(struct fres_synchobj *s)
+{
+       int ret = 0;
+
+       fosa_mutex_lock(&s->mutex);
+       if (s->nr_waiting == -1) goto unmap_err;
+
+       if (!s->nr_waiting)
+               s->queued_signal++;
+       else
+               ret = fosa_cond_signal(&s->cond);
+
+       fosa_mutex_unlock(&s->mutex);
+
+       return ret;
+
+unmap_err:
+       fosa_mutex_unlock(&s->mutex);
+       ret = munmap((void*) s, sizeof(*s));
+
+       return ret;
+}
+
+int
+fres_synchobj_to_string(char *dest, size_t size, const struct fres_synchobj *s)
+{
+       int ret;
+       char id[30];
+
+       if (!s) return 0;
+
+       fres_synchobj_id_to_string(id, &s->id, sizeof(id));
+       ret = snprintf(dest, size, "id: %s\n"
+                      "path: %s\n"
+                      "nr. waiting: %d\n"
+                      "queued signals: %d\n",
+                      id, s->path, s->nr_waiting,s->queued_signal);
+
+        return ret;
+}
+
+void
+fres_synchobj_print(char *prefix, const struct fres_synchobj *s)
+{
+       char synchobj[1000];
+
+       fres_synchobj_to_string(synchobj, sizeof(synchobj)-1, s);
+       synchobj[sizeof(synchobj)-1] = 0;
+       printf("%s %s", prefix, synchobj);
+}
+
diff --git a/fres/synchobj/fres_synchobj.h b/fres/synchobj/fres_synchobj.h
new file mode 100644 (file)
index 0000000..0fb7921
--- /dev/null
@@ -0,0 +1,66 @@
+/**
+ * @file   fres_synchobj.h
+ * @author Dario Faggioli <faggioli@gandalf.sssup.it>
+ * 
+ * @brief  Declaration of synchronization object type and functions.
+ * 
+ * 
+ */
+#ifndef FRES_SYNCHOBJ_H
+#define FRES_SYNCHOBJ_H
+
+#include <ul_gavl.h>
+#include <forb/server_id.h>
+#include <fres_synchobj_idl.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Synchronization object data type.
+ * 
+ */
+struct fres_synchobj {
+       fres_synchobj_id_t id;  /**< Global ID */
+       char path[108]; /**< Filesystem path of the POSIX semaphore. */
+
+       fosa_cond_t cond;       /**< POSIX condition variable. */
+       fosa_mutex_t mutex;     /**< POSIX mutex (for exclusive access). */
+
+       int nr_waiting; /**< Number of waiting vreses. */
+       int queued_signal;      /**< Number of not yet delivered signals. */
+};
+
+static inline int fres_synchobj_id_cmp(const fres_synchobj_id_t *a,
+                                      const fres_synchobj_id_t *b)
+{
+       return forb_server_id_cmp((forb_server_id*)a,
+                                 (forb_server_id*)b);
+}
+
+static inline char *fres_synchobj_id_to_string(char *dest,
+                                              const fres_synchobj_id_t *id,
+                                              size_t n)
+{
+       return forb_server_id_to_string(dest, (forb_server_id*)id, n);
+}
+
+struct fres_synchobj *fres_synchobj_new(void);
+struct fres_synchobj* fres_synchobj_get_path(const char synchobj_path[]);
+struct fres_synchobj* fres_synchobj_get(const fres_synchobj_id_t *id);
+int fres_synchobj_destroy(struct fres_synchobj *synchobj);
+
+int fres_synchobj_signal(struct fres_synchobj *s);
+int fres_synchobj_wait_with_timeout(struct fres_synchobj *s,
+                                   const fosa_abs_time_t *timeout);
+
+void
+fres_synchobj_print(char *prefix, const struct fres_synchobj *c);
+
+#ifdef __cplusplus
+} /* extern "C"*/
+#endif
+
+#endif
+
diff --git a/fres/synchobj/fres_synchobj_idl.idl b/fres/synchobj/fres_synchobj_idl.idl
new file mode 100644 (file)
index 0000000..e51b766
--- /dev/null
@@ -0,0 +1,26 @@
+/**
+ * @file   fres_synchobj_idl.idl
+ * @author Dario Faggioli <faggioli@gandalf.sssup.it>
+ * 
+ * @brief Definitions of data types and constants for
+         FRESCOR synchronization objects.
+ * 
+ */
+
+#ifndef _SYNCHOBJ_IDL
+#define _SYNCHOBJ_IDL
+module fres {
+       module synchobj {
+               /// Pointer to the contract type
+               native ptr;
+
+               /// Globaly unique contract ID
+               struct id_t {
+                       char byte[8];
+               };
+
+       };
+};
+
+#endif
+
diff --git a/fres/synchobj/tests/Makefile b/fres/synchobj/tests/Makefile
new file mode 100644 (file)
index 0000000..b22a357
--- /dev/null
@@ -0,0 +1,14 @@
+# Generic directory or leaf node makefile for OCERA make framework
+
+ifndef MAKERULES_DIR
+MAKERULES_DIR := $(shell ( old_pwd="" ;  while [ ! -e Makefile.rules ] ; do if [ "$$old_pwd" = `pwd`  ] ; then exit 1 ; else old_pwd=`pwd` ; cd -L .. 2>/dev/null ; fi ; done ; pwd ) )
+endif
+
+ifeq ($(MAKERULES_DIR),)
+all : default
+.DEFAULT::
+       @echo -e "\nThe Makefile.rules has not been found in this or partent directory\n"
+else
+include $(MAKERULES_DIR)/Makefile.rules
+endif
+
diff --git a/fres/synchobj/tests/Makefile.omk b/fres/synchobj/tests/Makefile.omk
new file mode 100644 (file)
index 0000000..3b0564a
--- /dev/null
@@ -0,0 +1,6 @@
+test_PROGRAMS:=$(basename $(notdir $(wildcard $(SOURCES_DIR)/*.c)))
+
+$(foreach t,$(test_PROGRAMS),\
+$(eval $(t)_SOURCES = $(t).c)\
+$(eval $(t)_LIBS = fosa m contract rt forb ulut)\
+)
index a78af43e60e481483023f3c67dd87b068440636d..85fa9099558ffe1f28091536382da41d27d50c8d 100644 (file)
@@ -1,9 +1,9 @@
 SUBDIRS = tests
 
 shared_LIBRARIES = frsh
-frsh_SOURCES = frsh_contract.c frsh_vres.c frsh_distributed.c frsh_core.c frsh_error.c frsh_thread.c 
+frsh_SOURCES = frsh_contract.c frsh_vres.c frsh_synchobj.c frsh_distributed.c frsh_core.c frsh_error.c frsh_thread.c 
 include_HEADERS = frsh_opaque_types.h frsh_forb.h
-frsh_LIBS = fna fcb_client forb contract fra ulut fosa $(allocator-libs-y)
+frsh_LIBS = fna fcb_client forb contract synchobj fra ulut fosa $(allocator-libs-y)
 
 config_include_HEADERS = frsh_resources.h
 frsh_resources_DEFINES = CONFIG_RESOURCE_DUMMY \
index 7e73bcdc44c87efde8fc5d3f4447cb38c03f60e6..3cb1e8dbd2da58c7c0cde67dc237889c1fe13ca4 100644 (file)
@@ -71,6 +71,7 @@
 #define _FRSH_OPAQUE_TYPES_H_
 
 #include <fres_contract_type.h>
+#include <fres_synchobj.h>
 #include <frsh_cpp_macros.h>
 #include <fres_error.h>
 
@@ -105,7 +106,7 @@ struct fres_vres;
 #define FRSH_CONTRACT_T_OPAQUE fres_contract_ptr
 
 
-typedef int FRSH_SYNCHOBJ_HANDLE_T_OPAQUE;
+#define FRSH_SYNCHOBJ_HANDLE_T_OPAQUE struct fres_synchobj *
 
 typedef int FRSH_SHAREDOBJ_HANDLE_T_OPAQUE;
 
@@ -124,14 +125,14 @@ typedef unsigned int FRSH_GROUP_ID_T_OPAQUE;
  *     areas;       // memory areas to be protected
  * 
  **/
-#define FRSH_CSECT_T_OPAQUE struct { \
-  frsh_csect_op_kind_t op_kind;                 \
-  frsh_sharedobj_handle_t obj_handle;               \
-  frsh_rel_time_t wcet;                              \
-  frsh_rel_time_t blocking;                              \
-  frsh_csect_op_t op;                           \
-  frsh_memory_areas_t areas;                         \
-  frsh_memory_areas_t storage;                         \
+#define FRSH_CSECT_T_OPAQUE struct {   \
+  frsh_csect_op_kind_t op_kind;                \
+  frsh_sharedobj_handle_t obj_handle;  \
+  frsh_rel_time_t wcet;                        \
+  frsh_rel_time_t blocking;            \
+  frsh_csect_op_t op;                  \
+  frsh_memory_areas_t areas;           \
+  frsh_memory_areas_t storage;         \
 }
 
 struct fna_endpoint_data;
index 8f8271384a171895f978d1275566293b6ff0574d..cfd4e52026812e98fa9f110a81903258fc6cf8d9 100644 (file)
  *
  *
  */
+#include <fres_vres.h>
+#include <fra_generic.h>
+#include <fres_synchobj.h>
+
 #include <frsh_core.h>
-/* #include <fres_synchobj.h> */
 
 int frsh_synchobj_create(frsh_synchobj_handle_t *synch_handle)
 {
-       return FRSH_ERR_NOT_IMPLEMENTED;
+       if (!synch_handle)
+               return FRSH_ERR_BAD_ARGUMENT;
+
+       *synch_handle = fres_synchobj_new();
+        if (!(*synch_handle)) goto err;
+
+       return 0;
+err:
+       return errno;
 }
 
 int frsh_synchobj_destroy(const frsh_synchobj_handle_t synch_handle)
 {
-       return FRSH_ERR_NOT_IMPLEMENTED;
+       return fres_synchobj_destroy(synch_handle);
+}
+
+static void
+__frsh_synchobj_check_wcet_and_deadline(frsh_thread_id_t thread,
+                                       fres_thread_vres_t *th_vres,
+                                       bool *was_deadline_missed,
+                                       bool *was_budget_overrun)
+{
+       fosa_clock_id_t th_clockid;
+       fosa_abs_time_t curr_time, curr_exec_time;
+       frsh_vres_id_t vres;
+       fres_block_basic *b;
+       fres_block_timing_reqs *t;
+
+       if (!was_deadline_missed && !was_budget_overrun)
+               return;
+
+       *was_deadline_missed = false;
+       *was_budget_overrun = false;
+
+       fosa_thread_get_cputime_clock(thread, &th_clockid);
+       fosa_clock_get_time(FOSA_CLOCK_REALTIME, &curr_time);
+       fosa_clock_get_time(th_clockid, &curr_exec_time);
+
+       curr_time = fosa_abs_time_decr(curr_time, th_vres->job_start_time);
+       curr_exec_time = fosa_abs_time_decr(curr_exec_time,
+                                           th_vres->job_cpu_time);
+
+       vres = th_vres->vres;
+       b = fres_contract_get_basic(vres->perceived);
+       t = fres_contract_get_timing_reqs(vres->perceived);
+
+       if (was_deadline_missed &&
+           fosa_abs_time_smaller_or_equal(t->deadline, curr_time))
+               *was_deadline_missed = true;
+
+       if (was_budget_overrun &&
+           fosa_abs_time_smaller_or_equal(b->budget, curr_exec_time))
+               *was_budget_overrun = false;
+}
+
+static void
+__frsh_synchobj_set_wcet_and_deadline(frsh_thread_id_t thread,
+                                     fres_thread_vres_t *th_vres)
+{
+       fosa_clock_id_t th_clockid;
+
+       fosa_thread_get_cputime_clock(thread, &th_clockid);
+       fosa_clock_get_time(FOSA_CLOCK_REALTIME,
+                                 &th_vres->job_start_time);
+       fosa_clock_get_time(th_clockid, &th_vres->job_cpu_time);
+
+}
+
+int frsh_synchobj_wait
+  (const frsh_synchobj_handle_t synch_handle,
+   frsh_rel_time_t *next_budget,
+   frsh_rel_time_t *next_period,
+   bool *was_deadline_missed,
+   bool *was_budget_overrun)
+{
+       frsh_thread_id_t thread = fosa_thread_self();
+       fres_thread_vres_t *th_vres;
+       int ret = 0;
+
+       th_vres = fra_get_thread_vres(&thread);
+       if (!th_vres) goto out;
+
+       __frsh_synchobj_check_wcet_and_deadline(thread,
+                                               th_vres,
+                                               was_deadline_missed,
+                                               was_budget_overrun);
+
+       ret = fres_synchobj_wait_with_timeout(synch_handle, NULL);
+       if (ret) goto out;
+
+       __frsh_synchobj_set_wcet_and_deadline(thread, th_vres);
+       frsh_vres_get_budget_and_period(th_vres->vres,
+                                       next_budget,
+                                       next_period);
+
+out:
+       return ret;
+}
+
+int frsh_synchobj_wait_with_timeout
+  (const frsh_synchobj_handle_t synch_handle,
+   const frsh_abs_time_t *abs_timeout,
+   bool *timed_out,
+   frsh_rel_time_t *next_budget,
+   frsh_rel_time_t *next_period,
+   bool *was_deadline_missed,
+   bool *was_budget_overrun)
+{
+       frsh_thread_id_t thread = fosa_thread_self();
+       fres_thread_vres_t *th_vres;
+       int ret = 0;
+
+       th_vres = fra_get_thread_vres(&thread);
+       if (!th_vres) goto out;
+
+       __frsh_synchobj_check_wcet_and_deadline(thread,
+                                               th_vres,
+                                               was_deadline_missed,
+                                               was_budget_overrun);
+
+       ret = fres_synchobj_wait_with_timeout(synch_handle, abs_timeout);
+       if (ret == ETIMEDOUT) *timed_out = true;
+       else if (ret != 0) goto out;
+
+       __frsh_synchobj_set_wcet_and_deadline(thread, th_vres);
+        frsh_vres_get_budget_and_period(th_vres->vres,
+                                       next_budget,
+                                       next_period);
+
+out:
+        return ret;
 }
 
 int frsh_synchobj_signal(const frsh_synchobj_handle_t synch_handle)
 {
-       return FRSH_ERR_NOT_IMPLEMENTED;
+       return fres_synchobj_signal(synch_handle);
+}
+
+int frsh_timed_wait
+  (const frsh_abs_time_t *abs_time,
+   frsh_rel_time_t *next_budget,
+   frsh_rel_time_t *next_period,
+   bool *was_deadline_missed,
+   bool *was_budget_overrun)
+{
+       frsh_thread_id_t thread = fosa_thread_self();
+       fres_thread_vres_t *th_vres;
+       int ret = 0;
+
+       th_vres = fra_get_thread_vres(&thread);
+       if (!th_vres) goto out;
+
+       __frsh_synchobj_check_wcet_and_deadline(thread,
+                                               th_vres,
+                                               was_deadline_missed,
+                                               was_budget_overrun);
+
+       ret = clock_nanosleep(FOSA_CLOCK_REALTIME,
+                             TIMER_ABSTIME,
+                             abs_time, NULL);
+
+       __frsh_synchobj_set_wcet_and_deadline(thread, th_vres);
+       frsh_vres_get_budget_and_period(th_vres->vres,
+                                       next_budget,
+                                       next_period);
+
+out:
+       return ret;
 }
 
 int frsh_vresperiod_wait(const frsh_vres_id_t vres, long period_num)