]> rtime.felk.cvut.cz Git - frescor/fna.git/commitdiff
unix fna, it seems to work
authorsangorrin <sangorrin@35b4ef3e-fd22-0410-ab77-dab3279adceb>
Mon, 29 Oct 2007 10:38:31 +0000 (10:38 +0000)
committersangorrin <sangorrin@35b4ef3e-fd22-0410-ab77-dab3279adceb>
Mon, 29 Oct 2007 10:38:31 +0000 (10:38 +0000)
git-svn-id: http://www.frescor.org/private/svn/frescor/fna/trunk@820 35b4ef3e-fd22-0410-ab77-dab3279adceb

include/fna.h
src_rtep/rtep_fna_c.c
src_unix/unix_fna.c
src_unix/unix_fna.h
tests/tests_unix_fna/test_unix_fna_send_receive.c
tests/tests_unix_fna/test_unix_send_receive.c

index 242af53317914bfa5b55b45226ded8811b44ba58..518471eef0d9ab29936feea1a8260d77017830e8 100644 (file)
@@ -744,7 +744,7 @@ typedef int fna_send_endpoint_get_status_t
  *   FNA_ERR_BAD_ARGUMENT: if pointers are NULL \n
  **/
 typedef int fna_receive_endpoint_create_callback_t
-   (const fna_endpoint_data_t *endpoint);
+   (fna_endpoint_data_t *endpoint);
 
 /**
  * fna_receive_endpoint_get_pending_messages
index c6db6f2df1962db87adb6bfa4f50c8523b9feab8..e760fdf7b45c9eeee70d9b95d29e86320ed1df3c 100644 (file)
@@ -827,7 +827,7 @@ int rtep_fna_send_endpoint_get_status
  *   FNA_ERR_BAD_ARGUMENT: if pointers are NULL \n
  **/
 int rtep_fna_receive_endpoint_created
-      (const fna_endpoint_data_t *endpoint)
+      (fna_endpoint_data_t *endpoint)
 {
    return 0;
 }
index 8ffca6d2172cbd3d67ed6cfa67f21fdec1db3009..b3e3c76f0a95dd7ad15fb65e76ea6a7bfc4bdabd 100644 (file)
 //
 // FNA(Frescor Network Adaptation layer), pronounced "efe ene a"
 //==============================================================
+
+/**
+ * unix fna implementation
+ *
+ * The following functions provide a DUMMY FNA implementation without
+ * contracts or real-time requirements, just for testing purposes. We provide
+ * send/receive capabilities between Linux processes through UNIX domain
+ * datagram sockets.
+ *
+ * The goal is to run FRSH on several processes by using the Linux_lib arch
+ * of MaRTE OS or Partikle to simulate a distributed system in a single PC.
+ *
+ * The main tricks of the implementation are the following:
+ *
+ *   - In Unix sockets the address is just a path (a string), so there is no
+ *     such concept as streams.
+ *   - when the user negotiates a contract it must provide, as protocol
+ *     dependent information, the destination so we can create and bind a
+ *     socket for it
+ *   - The created socket id is stored as the 'vres'
+ *   - On the receiving side, when the user creates the receive endpoint, it
+ *     must provide the receiving path as endpoint information so we can
+ *     create a socket to receive from that path.
+ *   - Both pieces of information are passed as integer indices into a static
+ *     array called "the_unix_socket_paths", where we statically configure the
+ *     unix paths, using the void * fields as if they were integers.
+ *
+ **/
+
 #include <malloc.h>   /* for malloc and free */
+#include <assert.h>
 #include "unix_fna.h" /* function prototypes */
 
+#include <sys/socket.h>
+#include <sys/un.h> /* struct sockaddr_un */
+
 #if 1
 #include <stdio.h>
 #define DEBUG(x,args...) printf("%s: " x, __func__ , ##args)
 //           INITIALIZATION
 //////////////////////////////////////////////////////////////////////
 
-char *the_unix_stream_ids[MX_UNIX_STREAM_IDS] = {
-        "/tmp/stream00",
-        "/tmp/stream01",
-        "/tmp/stream02",
-        "/tmp/stream03",
-        "/tmp/stream04",
-        "/tmp/stream05"
+char *the_unix_socket_paths[MX_UNIX_SOCKET_PATHS] = {
+        "/tmp/path00",
+        "/tmp/path01",
+        "/tmp/path02",
+        "/tmp/path03",
+        "/tmp/path04",
+        "/tmp/path05"
 };
 
 /**
@@ -109,7 +142,26 @@ int unix_fna_contract_negotiate(const frsh_resource_id_t resource_id,
                                 const frsh_contract_t *contract,
                                 fna_vres_id_t *vres)
 {
-        DEBUG("NOT IMPLEMENTED\n");
+        int sock, err;
+        struct sockaddr_un sender_addr;
+        int unix_path_index;
+
+        DEBUG("Creating and binding the unix socket\n");
+
+        sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+        assert(sock >= 0);
+
+        unix_path_index = (int)contract->protocol_info.body;
+
+        memset(&sender_addr, 0, sizeof(sender_addr));
+        sender_addr.sun_family = AF_UNIX;
+        strcpy(sender_addr.sun_path, the_unix_socket_paths[unix_path_index]);
+
+        err = bind(sock, (struct sockaddr *)&sender_addr, sizeof(sender_addr));
+        assert(err >= 0);
+
+        *vres = (fna_vres_id_t)sock;
+
         return 0;
 }
 
@@ -289,7 +341,28 @@ int unix_fna_send_async(const fna_endpoint_data_t *endpoint,
                         const void *msg,
                         const size_t size)
 {
+
+        struct sockaddr_un receiver_addr;
+        ssize_t sent_bytes;
+
         DEBUG("send async\n");
+
+        assert(endpoint->is_bound);
+
+        memset(&receiver_addr, 0, sizeof(receiver_addr));
+        receiver_addr.sun_family = AF_UNIX;
+        strcpy(receiver_addr.sun_path,
+               the_unix_socket_paths[endpoint->destination]);
+
+        sent_bytes = sendto(endpoint->vres, /* the socket */
+                            msg,
+                            size,
+                            0,
+                            (struct sockaddr *) &receiver_addr,
+                            sizeof(receiver_addr));
+
+        assert(sent_bytes >= 0);
+
         return 0;
 }
 
@@ -304,7 +377,25 @@ int unix_fna_receive_sync(const fna_endpoint_data_t *endpoint,
                           size_t *received_bytes,
                           frsh_network_address_t *from)
 {
+        struct sockaddr_un sender_addr;
+        ssize_t recv_bytes;
+        socklen_t from_len;
+
         DEBUG("receive sync\n");
+
+        from_len = sizeof(sender_addr);
+        recv_bytes = recvfrom(endpoint->vres, /* the socket */
+                              buffer,
+                              buffer_size,
+                              0,
+                              (struct sockaddr *)&sender_addr,
+                              &from_len);
+
+        assert(recv_bytes >= 0);
+
+        *received_bytes = recv_bytes;
+        /* TODO: fill from */
+
         return 0;
 }
 
@@ -341,9 +432,28 @@ int unix_fna_send_endpoint_get_status(const fna_endpoint_data_t *endpoint,
  * unix_fna_receive_endpoint_created()
  *
  **/
-int unix_fna_receive_endpoint_created(const fna_endpoint_data_t *endpoint)
+int unix_fna_receive_endpoint_created(fna_endpoint_data_t *endpoint)
 {
-        DEBUG("NOT IMPLEMENTED\n");
+        int sock, err;
+        struct sockaddr_un receiver_addr;
+        int unix_path_index;
+
+        DEBUG("creating the socket to receive\n");
+
+        sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+        assert(sock >= 0);
+
+        unix_path_index = (int)endpoint->protocol_info.body;
+
+        memset(&receiver_addr, 0, sizeof(receiver_addr));
+        receiver_addr.sun_family = AF_UNIX;
+        strcpy(receiver_addr.sun_path, the_unix_socket_paths[unix_path_index]);
+
+        err = bind(sock, (struct sockaddr *)&receiver_addr, sizeof(receiver_addr));
+        assert(err >= 0);
+
+        endpoint->vres = sock;
+
         return 0;
 }
 
index 07db3d790f3f997f40ca1641a2d4cf9e2e0a0847..7c1ace2719c8eb1ab0171b7ac9265fef4b844793 100644 (file)
@@ -73,12 +73,10 @@ extern fna_operations_t unix_fna_operations;
 /**
  *  unix addresses:
  *
- *  As we are in the same node the network address will be always 0.
- *  The stream_ids 0 .. MX_UNIX_STREAM_IDS-1 are statically mapped in the
- *  following table to UNIX pathnames.
+ *  We hardwire the frsh network addresses to unix socket paths
  *
  **/
-#define MX_UNIX_STREAM_IDS 6
-extern char *the_unix_stream_ids[MX_UNIX_STREAM_IDS];
+#define MX_UNIX_SOCKET_PATHS 6
+extern char *the_unix_socket_paths[MX_UNIX_SOCKET_PATHS];
 
 #endif // _UNIX_FNA_H_
index 78430779b35006fd28a175f7d8c9f7602480f94e..b7f93e97b64fde35d2e27511cf4057efde6c80cf 100644 (file)
@@ -14,6 +14,8 @@
 #include "fna.h" // for fna_vres_id_t
 #include "unix_fna.h" // for unix_fna_operations
 
+#define THE_SENDER_ADDR 0
+#define THE_RECEIVER_ADDR 1
 
 void sender(void);
 void receiver(void);
@@ -44,8 +46,8 @@ int main (int argc, char **argv)
                 return -1;
         }
 
-        unlink(the_unix_stream_ids[0]);
-        unlink(the_unix_stream_ids[1]);
+        unlink(the_unix_socket_paths[0]);
+        unlink(the_unix_socket_paths[1]);
 
         return 0;
 }
@@ -53,22 +55,33 @@ int main (int argc, char **argv)
 void sender(void)
 {
         int err, i;
-
+        frsh_contract_t contract;
         fna_endpoint_data_t endpoint;
-        frsh_stream_id_t receiver_port;
+        frsh_network_address_t my_address, receiver_address;
         char buffer[100];
         size_t size;
 
         printf("I am the sender\n");
 
-        receiver_port = 0; /* statically configured to the_unix_stream_ids[0] */
+        err = frsh_contract_init(&contract);
+        assert(err == 0);
+
+        my_address = THE_SENDER_ADDR;
+        receiver_address = THE_RECEIVER_ADDR;
+
+        contract.protocol_info.body = (void *)my_address;
+
+        err = unix_fna_operations.fna_contract_negotiate
+                        (FRSH_NETWORK_ID_DEFAULT,
+                         &contract,
+                         &endpoint.vres);
+        assert(err == 0);
 
         endpoint.endpoint_type  = FRSH_SEND_ENDPOINT_TYPE;
-        endpoint.vres           = 0;
         endpoint.is_bound       = true;
-        endpoint.destination    = 0; /* only for send_endpoints */
+        endpoint.destination    = receiver_address; /* only for send_endpoints */
         endpoint.resource_id    = FRSH_NETWORK_ID_DEFAULT;
-        endpoint.stream_id      = receiver_port;
+        endpoint.stream_id      = 0; /* not used */
 
         for(i=0; i<10; i++) {
                 sleep(1);
@@ -89,19 +102,22 @@ void receiver(void)
 {
         int err, i;
         fna_endpoint_data_t endpoint;
-        frsh_network_address_t sender_addr;
-        frsh_stream_id_t sender_port;
+        frsh_network_address_t sender_addr, my_address;
         char buffer[100];
         size_t received_bytes;
 
         printf("I am the receiver\n");
 
-        sender_port = 1;
+        my_address = THE_RECEIVER_ADDR;
 
+        endpoint.protocol_info.body = (void *)my_address;
         endpoint.endpoint_type  = FRSH_RECEIVE_ENDPOINT_TYPE;
         endpoint.is_bound       = false;
         endpoint.resource_id    = FRSH_NETWORK_ID_DEFAULT;
-        endpoint.stream_id      = sender_port;
+        endpoint.stream_id      = 0; /* not used */
+
+        err = unix_fna_operations.fna_receive_endpoint_created(&endpoint);
+        assert (err == 0);
 
         for(i=0; i<10; i++) {
                 err = unix_fna_operations.fna_receive_sync(&endpoint,
index f09297d16b3d5f502cef887ca283d27f42bfe508..64b1e1aed8d1c109d65b2b0321e7eb85fdc5ee07 100644 (file)
@@ -50,8 +50,8 @@ void sender(void)
 
         printf("I am the sender\n");
 
-        err = sock = socket(AF_UNIX, SOCK_DGRAM, 0);
-        assert(err >= 0);
+        sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+        assert(sock >= 0);
 
         memset(&sender_addr, 0, sizeof(sender_addr));
         sender_addr.sun_family = AF_UNIX;
@@ -93,8 +93,8 @@ void receiver(void)
 
         printf("I am the receiver\n");
 
-        err = sock = socket(AF_UNIX, SOCK_DGRAM, 0);
-        assert(err >= 0);
+        sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+        assert(sock >= 0);
 
         memset(&receiver_addr, 0, sizeof(receiver_addr));
         receiver_addr.sun_family = AF_UNIX;
@@ -104,9 +104,6 @@ void receiver(void)
         assert(err >= 0);
 
         for(i=0; i<10; i++) {
-
-                sleep(5);
-
                 from_len = sizeof(sender_addr);
                 received_bytes = recvfrom(sock,
                                           buffer,