]> rtime.felk.cvut.cz Git - frescor/fna.git/blobdiff - src_unix/unix_fna.c
Unified header for FNA
[frescor/fna.git] / src_unix / unix_fna.c
index 779fe21dbafaa83d56eb2118b140fb5deaddc8e2..472c009b81d396a92043161ab45cc22695c05344 100644 (file)
@@ -1,5 +1,5 @@
 //----------------------------------------------------------------------
-//  Copyright (C) 2006 - 2007 by the FRESCOR consortium:
+//  Copyright (C) 2006 - 2009 by the FRESCOR consortium:
 //
 //    Universidad de Cantabria,              SPAIN
 //    University of York,                    UK
 //
 // FNA(Frescor Network Adaptation layer), pronounced "efe ene a"
 //==============================================================
+
+/**
+ * unix fna implementation
+ *
+ * In the following functions we implement a DUMMY FNA implementation without
+ * contracts or real-time requirements just for testing purposes. We provide
+ * send/receive capabilities between Linux processes through UNIX domain
+ * datagram sockets.
+ *
+ * The goal is to run FRSH on several processes by using the Linux_lib arch
+ * of MaRTE OS or Partikle to simulate a distributed system in a single PC.
+ *
+ * The main tricks of the implementation are the following:
+ *
+ *   - We encode the address and stream in the Unix socket path as:
+ *                    '/tmp/unix_fna-address-stream'
+ *   - At initialization we create MX_UNIX_STREAM_IDS sockets for our address
+ *     (our address number is defined by FRSH_CPU_ID_DEFAULT)
+ *   - When the user SENDS we obtain the address creating the mentioned string
+ *     from the destination address and the stream.
+ *   - When the user RECEIVES we use the appropiate socket by using the
+ *     stream_id information.
+ *
+ **/
+
 #include <malloc.h>   /* for malloc and free */
+#include <assert.h>
+#include <string.h>   /* for string functions: strtok, strcpy, ... */
+
+#include "frsh_distributed_types.h" /* for frsh_network_address_t, frsh_stream_id_t */
 #include "unix_fna.h" /* function prototypes */
 
-#if 0
+#include <sys/socket.h>
+#include <sys/un.h>
+
+/* DEBUGGING FLAGS and MACROS */
 #include <stdio.h>
-#define DEBUG(x,args...) printf("%s: " x, __func__ , ##args)
-#else
-#define DEBUG(x,args...)
-#endif
+#define DEBUG(enable,x,args...) if(enable) printf("\t>> Called %s: " x, __func__ , ##args)
+#define DBG_UNIX_FNA_NOT_IMPLEMENTED true
+
+/**
+ * to_unix_path()
+ *
+ **/
+
+static int to_unix_path(const frsh_network_address_t addr,
+                        const frsh_stream_id_t       stream,
+                        char                         *str,
+                        size_t                       mx_size)
+{
+        return snprintf(str, mx_size, "/tmp/unix_fna-%d-%d", addr, stream);
+}
+
+/**
+ * to_addr_stream()
+ *
+ **/
+
+static int to_addr_stream(frsh_network_address_t *addr,
+                          frsh_stream_id_t       *stream,
+                          char                   *str,
+                          size_t                 size)
+{
+        char *token;
+        char *search = "-";
+
+        token = strtok(str, search);
+        token = strtok(NULL, search);
+        *addr = atoi(token);
+        token = strtok(NULL, search);
+        *stream = atoi(token);
+
+        return 0;
+}
 
 //////////////////////////////////////////////////////////////////////
 //           INITIALIZATION
 //////////////////////////////////////////////////////////////////////
 
+int the_unix_sockets[MX_UNIX_STREAM_IDS];
+
 /**
  * unix_fna_init()
  *
+ * for each stream_id create a socket and bind it to the address obtained
+ * from "/tmp/unix_fna-addr-stream"
+ *
  **/
 
 int unix_fna_init(const frsh_resource_id_t resource_id)
 {
-    DEBUG("starting unix fna\n");
-    return 0;
+        int i, err;
+        struct sockaddr_un sock_addr;
+
+        DEBUG(true, "creating unix sockets\n");
+
+        for(i=0; i<MX_UNIX_STREAM_IDS; i++) {
+                the_unix_sockets[i] = socket(AF_UNIX, SOCK_DGRAM, 0);
+                assert(the_unix_sockets[i] >= 0);
+
+                memset(&sock_addr, 0, sizeof(sock_addr));
+                sock_addr.sun_family = AF_UNIX;
+                err = to_unix_path(FRSH_CPU_ID_DEFAULT,
+                                   (frsh_stream_id_t) i,
+                                   sock_addr.sun_path,
+                                   sizeof(sock_addr.sun_path));
+                assert(err >= 0);
+
+                err = bind(the_unix_sockets[i],
+                           (struct sockaddr *)&sock_addr,
+                           sizeof(sock_addr));
+                assert(err >= 0);
+        }
+
+        return 0;
 }
 
 ///////////////////////////////////////////////////////////////////
@@ -100,7 +192,8 @@ int unix_fna_contract_negotiate(const frsh_resource_id_t resource_id,
                                 const frsh_contract_t *contract,
                                 fna_vres_id_t *vres)
 {
-    return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 /**
@@ -112,7 +205,8 @@ int unix_fna_contract_renegotiate_sync(const frsh_resource_id_t resource_id,
                                        const fna_vres_id_t vres,
                                        const frsh_contract_t *new_contract)
 {
-    return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 /**
@@ -126,7 +220,8 @@ int unix_fna_contract_renegotiate_async(const frsh_resource_id_t resource_id,
                                         frsh_signal_t signal_to_notify,
                                         frsh_signal_info_t signal_info)
 {
-   return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 /**
@@ -138,7 +233,8 @@ int unix_fna_vres_get_renegotiation_status(const frsh_resource_id_t resource_id,
                                            const fna_vres_id_t vres,
                                            frsh_renegotiation_status_t *renegotiation_status)
 {
-   return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 /**
@@ -149,7 +245,8 @@ int unix_fna_vres_get_renegotiation_status(const frsh_resource_id_t resource_id,
 int unix_fna_vres_destroy(const frsh_resource_id_t resource_id,
                           const fna_vres_id_t vres)
 {
-    return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 /**
@@ -161,7 +258,8 @@ int unix_fna_vres_get_contract(const frsh_resource_id_t resource_id,
                                const fna_vres_id_t vres,
                                frsh_contract_t *contract)
 {
-   return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 /**
@@ -173,7 +271,8 @@ int unix_fna_vres_get_usage(const frsh_resource_id_t resource_id,
                             const fna_vres_id_t vres,
                             struct timespec *usage)
 {
-   return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 /**
@@ -185,7 +284,8 @@ int unix_fna_vres_get_remaining_budget(const frsh_resource_id_t resource_id,
                                        const fna_vres_id_t vres,
                                        struct timespec *remaining_budget)
 {
-   return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 /**
@@ -198,7 +298,8 @@ int unix_fna_vres_get_budget_and_period(const frsh_resource_id_t resource_id,
                                         struct timespec *budget,
                                         struct timespec *period)
 {
-   return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 ///////////////////////////////////////////////////////////////////
@@ -214,7 +315,8 @@ int unix_fna_resource_get_capacity(const frsh_resource_id_t resource_id,
                                    const int importance,
                                    uint32_t *capacity)
 {
-   return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 /**
@@ -226,7 +328,8 @@ int unix_fna_resource_get_total_weight(const frsh_resource_id_t resource_id,
                                        const int importance,
                                        int *total_weight)
 {
-   return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 /**
@@ -239,7 +342,8 @@ int unix_fna_vres_decrease_capacity(const frsh_resource_id_t resource_id,
                                     const struct timespec new_budget,
                                     const struct timespec new_period)
 {
-   return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 ///////////////////////////////////////////////////////////////////
@@ -255,24 +359,57 @@ int unix_fna_send_sync(const fna_endpoint_data_t *endpoint,
                        const void *msg,
                        const size_t size)
 {
-   return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 /**
  * unix_fna_send_async()
  *
+ * To send we use one of our sockets (it doesn't matter which one so we use
+ * the first one). The destination is obtained from the endpoint values for
+ * dest and stream.
+ *
  **/
 
 int unix_fna_send_async(const fna_endpoint_data_t *endpoint,
                         const void *msg,
                         const size_t size)
 {
-    return 0;
+        int err;
+        struct sockaddr_un to;
+        ssize_t sent_bytes;
+
+        DEBUG(true, "send async\n");
+
+        assert(endpoint->is_bound);
+
+        memset(&to, 0, sizeof(to));
+        to.sun_family = AF_UNIX;
+
+        err = to_unix_path(endpoint->destination,
+                           endpoint->stream_id,
+                           to.sun_path,
+                           sizeof(to.sun_path));
+        assert(err >= 0);
+
+        sent_bytes = sendto(the_unix_sockets[0],
+                            msg,
+                            size,
+                            0,
+                            (struct sockaddr *) &to,
+                            sizeof(to));
+        assert(sent_bytes >= 0);
+
+        return 0;
 }
 
 /**
  * unix_fna_receive_sync()
  *
+ * We call recvfrom using the socket associated to the corresponding stream_id.
+ * The "from" address is obtained by parsing the unix address path.
+ *
  **/
 
 int unix_fna_receive_sync(const fna_endpoint_data_t *endpoint,
@@ -281,7 +418,34 @@ int unix_fna_receive_sync(const fna_endpoint_data_t *endpoint,
                           size_t *received_bytes,
                           frsh_network_address_t *from)
 {
-    return 0;
+        int err;
+        struct sockaddr_un sender_addr;
+        ssize_t recv_bytes;
+        socklen_t from_len;
+        frsh_network_address_t addr;
+        frsh_stream_id_t       stream;
+
+        DEBUG(true, "receive sync\n");
+
+        from_len = sizeof(sender_addr);
+        recv_bytes = recvfrom(the_unix_sockets[endpoint->stream_id],
+                              buffer,
+                              buffer_size,
+                              0,
+                              (struct sockaddr *)&sender_addr,
+                              &from_len);
+
+        assert(recv_bytes >= 0);
+        *received_bytes = recv_bytes;
+
+        err = to_addr_stream(&addr,
+                             &stream,
+                             sender_addr.sun_path,
+                             sizeof(sender_addr.sun_path));
+        assert(err == 0);
+        *from = addr;
+
+        return 0;
 }
 
 /**
@@ -295,7 +459,8 @@ int unix_fna_receive_async(const fna_endpoint_data_t *endpoint,
                            size_t *received_bytes,
                            frsh_network_address_t *from)
 {
-   return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 /**
@@ -308,16 +473,18 @@ int unix_fna_send_endpoint_get_status(const fna_endpoint_data_t *endpoint,
                                       frsh_endpoint_network_status_t *network_status,
                                       frsh_protocol_status_t *protocol_status)
 {
-   return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 /**
  * unix_fna_receive_endpoint_created()
  *
  **/
-int unix_fna_receive_endpoint_created(const fna_endpoint_data_t *endpoint)
+int unix_fna_receive_endpoint_created(fna_endpoint_data_t *endpoint)
 {
-   return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 /**
@@ -330,7 +497,8 @@ int unix_fna_receive_endpoint_get_status(const fna_endpoint_data_t *endpoint,
                                          frsh_endpoint_network_status_t *network_status,
                                          frsh_protocol_status_t *protocol_status)
 {
-   return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 //////////////////////////////////////////////////////////////////////
@@ -346,7 +514,8 @@ int unix_fna_network_get_max_message_size(const frsh_resource_id_t resource_id,
                                           const frsh_network_address_t destination,
                                           size_t *max_size)
 {
-    return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 /**
@@ -358,7 +527,10 @@ int unix_fna_network_bytes_to_budget(const frsh_resource_id_t resource_id,
                                      const size_t nbytes,
                                      struct timespec *budget)
 {
-    return 0;
+        DEBUG(true, "let's put 1 microsecond for all\n");
+        budget->tv_sec  = 0;
+        budget->tv_nsec = 1000;
+        return 0;
 }
 
 /**
@@ -370,7 +542,8 @@ int unix_fna_network_budget_to_bytes(const frsh_resource_id_t resource_id,
                                      const struct timespec *budget,
                                      size_t *nbytes)
 {
-    return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 /**
@@ -381,34 +554,35 @@ int unix_fna_network_budget_to_bytes(const frsh_resource_id_t resource_id,
 int unix_fna_network_get_min_eff_budget(const frsh_resource_id_t resource_id,
                                         struct timespec *budget)
 {
-   return 0;
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        return 0;
 }
 
 // GLOBAL variable to install the network protocol in FRESCOR
 
 fna_operations_t unix_fna_operations = {
-    .fna_init = unix_fna_init,
-    .fna_contract_negotiate = unix_fna_contract_negotiate,
-    .fna_contract_renegotiate_sync = unix_fna_contract_renegotiate_sync,
-    .fna_contract_renegotiate_async = unix_fna_contract_renegotiate_async,
-    .fna_vres_get_renegotiation_status = unix_fna_vres_get_renegotiation_status,
-    .fna_vres_destroy = unix_fna_vres_destroy,
-    .fna_vres_get_contract = unix_fna_vres_get_contract,
-    .fna_vres_get_usage = unix_fna_vres_get_usage,
-    .fna_vres_get_remaining_budget = unix_fna_vres_get_remaining_budget,
-    .fna_vres_get_budget_and_period = unix_fna_vres_get_budget_and_period,
-    .fna_resource_get_capacity = unix_fna_resource_get_capacity,
-    .fna_resource_get_total_weight = unix_fna_resource_get_total_weight,
-    .fna_vres_decrease_capacity = unix_fna_vres_decrease_capacity,
-    .fna_send_sync = unix_fna_send_sync,
-    .fna_send_async = unix_fna_send_async,
-    .fna_receive_sync = unix_fna_receive_sync,
-    .fna_receive_async = unix_fna_receive_async,
-    .fna_send_endpoint_get_status = unix_fna_send_endpoint_get_status,
-    .fna_receive_endpoint_created = unix_fna_receive_endpoint_created,
-    .fna_receive_endpoint_get_status = unix_fna_receive_endpoint_get_status,
-    .fna_network_get_max_message_size = unix_fna_network_get_max_message_size,
-    .fna_network_bytes_to_budget = unix_fna_network_bytes_to_budget,
-    .fna_network_budget_to_bytes = unix_fna_network_budget_to_bytes,
-    .fna_network_get_min_eff_budget = unix_fna_network_get_min_eff_budget
+        .fna_init = unix_fna_init,
+        .fna_contract_negotiate = unix_fna_contract_negotiate,
+        .fna_contract_renegotiate_sync = unix_fna_contract_renegotiate_sync,
+        .fna_contract_renegotiate_async = unix_fna_contract_renegotiate_async,
+        .fna_vres_get_renegotiation_status = unix_fna_vres_get_renegotiation_status,
+        .fna_vres_destroy = unix_fna_vres_destroy,
+        .fna_vres_get_contract = unix_fna_vres_get_contract,
+        .fna_vres_get_usage = unix_fna_vres_get_usage,
+        .fna_vres_get_remaining_budget = unix_fna_vres_get_remaining_budget,
+        .fna_vres_get_budget_and_period = unix_fna_vres_get_budget_and_period,
+        .fna_resource_get_capacity = unix_fna_resource_get_capacity,
+        .fna_resource_get_total_weight = unix_fna_resource_get_total_weight,
+        .fna_vres_decrease_capacity = unix_fna_vres_decrease_capacity,
+        .fna_send_sync = unix_fna_send_sync,
+        .fna_send_async = unix_fna_send_async,
+        .fna_receive_sync = unix_fna_receive_sync,
+        .fna_receive_async = unix_fna_receive_async,
+        .fna_send_endpoint_get_status = unix_fna_send_endpoint_get_status,
+        .fna_receive_endpoint_created = unix_fna_receive_endpoint_created,
+        .fna_receive_endpoint_get_status = unix_fna_receive_endpoint_get_status,
+        .fna_network_get_max_message_size = unix_fna_network_get_max_message_size,
+        .fna_network_bytes_to_budget = unix_fna_network_bytes_to_budget,
+        .fna_network_budget_to_bytes = unix_fna_network_budget_to_bytes,
+        .fna_network_get_min_eff_budget = unix_fna_network_get_min_eff_budget
 };