rtime.felk.cvut.cz Git - frescor/fna.git/commitdiff
change to unix fna, now we encode streams... but I found a problem in dtm so it won't...
author: sangorrin <sangorrin@35b4ef3e-fd22-0410-ab77-dab3279adceb>
Wed, 31 Oct 2007 12:28:04 +0000 (12:28 +0000)
committer: sangorrin <sangorrin@35b4ef3e-fd22-0410-ab77-dab3279adceb>
Wed, 31 Oct 2007 12:28:04 +0000 (12:28 +0000)
git-svn-id: http://www.frescor.org/private/svn/frescor/fna/trunk@826 35b4ef3e-fd22-0410-ab77-dab3279adceb

src_unix/unix_fna.c
src_unix/unix_fna.h
tests/tests_unix_fna/Makefile
tests/tests_unix_fna/test_unix_address.c [new file with mode: 0644]
tests/tests_unix_fna/test_unix_fna_send_receive.c
tests/tests_unix_fna/test_unix_fna_send_receive_script [new file with mode: 0755]

index 670fe5bb32785b1789585052927611ab28dc6372..5ab3540da58c18465dee1176d84530bda6c9960b 100644 (file)
  *
  * The main tricks of the implementation are the following:
  *
- *   - In Unix sockets the address is just a path (a string) so there is no
- *     such a concept of streams.
- *   - when the user negotiates a contract it must provide, as protocol
- *     dependent information, the destination so we can create and bind a
- *     socket for it
- *   - The created socket id is stored as the 'vres'
- *   - On the receiving part, when the user creates the receive endpoint, it
- *     must provide the receiving path as endpoint information so we can
- *     create a socket to receive from that path.
- *   - Both informations are passed as an integer for a static array called
- *     "the_unix_socket_paths" where we configure statically the unix paths.
- *     They are passed using the void * fields as if they where integers.
+ *   - We encode the address and stream in the Unix socket path as:
+ *                    '/tmp/unix_fna-address-stream'
+ *   - At initialization we create MX_UNIX_STREAM_IDS sockets for our address
+ *     (our address number is defined by FRSH_CPU_ID_DEFAULT)
+ *   - When the user SENDS we build the destination path string described
+ *     above from the destination address and the stream.
+ *   - When the user RECEIVES we use the appropriate socket, selected by the
+ *     stream_id information.
  *
  **/
 
 #include <malloc.h>   /* for malloc and free */
 #include <assert.h>
+#include <string.h>   /* for string functions: strtok, strcpy, ... */
+
+#include "frsh_distributed_types.h" /* for frsh_network_address_t, frsh_stream_id_t */
 #include "unix_fna.h" /* function prototypes */
 
 #include <sys/socket.h>
-#include <sys/un.h> /* struct sockaddr_un */
+#include <sys/un.h>
 
 /* DEBUGGING FLAGS and MACROS */
 #include <stdio.h>
 #define DEBUG(enable,x,args...) if(enable) printf("\t>> Called %s: " x, __func__ , ##args)
 #define DBG_UNIX_FNA_NOT_IMPLEMENTED true
 
+/**
+ * to_unix_path()
+ *
+ **/
+
+static int to_unix_path(const frsh_network_address_t addr,
+                        const frsh_stream_id_t       stream,
+                        char                         *str,
+                        size_t                       mx_size)
+{
+        return snprintf(str, mx_size, "/tmp/unix_fna-%d-%d", addr, stream);
+}
+
+/**
+ * to_addr_stream()
+ *
+ **/
+
+static int to_addr_stream(frsh_network_address_t *addr,
+                          frsh_stream_id_t       *stream,
+                          char                   *str,
+                          size_t                 size)
+{
+        char *token;
+        char *search = "-";
+
+        token = strtok(str, search);
+        token = strtok(NULL, search);
+        *addr = atoi(token);
+        token = strtok(NULL, search);
+        *stream = atoi(token);
+
+        return 0;
+}
+
 //////////////////////////////////////////////////////////////////////
 //           INITIALIZATION
 //////////////////////////////////////////////////////////////////////
 
-char *the_unix_socket_paths[MX_UNIX_SOCKET_PATHS] = {
-        "/tmp/path00",
-        "/tmp/path01",
-        "/tmp/path02",
-        "/tmp/path03",
-        "/tmp/path04",
-        "/tmp/path05"
-};
+int the_unix_sockets[MX_UNIX_STREAM_IDS];
 
 /**
  * unix_fna_init()
  *
+ * for each stream_id create a socket and bind it to the address obtained
+ * from "/tmp/unix_fna-addr-stream"
+ *
  **/
 
 int unix_fna_init(const frsh_resource_id_t resource_id)
 {
-        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        int i, err;
+        struct sockaddr_un sock_addr;
+
+        DEBUG(true, "creating unix sockets\n");
+
+        for(i=0; i<MX_UNIX_STREAM_IDS; i++) {
+                the_unix_sockets[i] = socket(AF_UNIX, SOCK_DGRAM, 0);
+                assert(the_unix_sockets[i] >= 0);
+
+                memset(&sock_addr, 0, sizeof(sock_addr));
+                sock_addr.sun_family = AF_UNIX;
+                err = to_unix_path(FRSH_CPU_ID_DEFAULT,
+                                   (frsh_stream_id_t) i,
+                                   sock_addr.sun_path,
+                                   sizeof(sock_addr.sun_path));
+                assert(err >= 0);
+
+                err = bind(the_unix_sockets[i],
+                           (struct sockaddr *)&sock_addr,
+                           sizeof(sock_addr));
+                assert(err >= 0);
+        }
+
         return 0;
 }
 
@@ -140,26 +192,7 @@ int unix_fna_contract_negotiate(const frsh_resource_id_t resource_id,
                                 const frsh_contract_t *contract,
                                 fna_vres_id_t *vres)
 {
-        int sock, err;
-        struct sockaddr_un sender_addr;
-        int unix_path_index;
-
-        DEBUG(true, "Creating and binding the unix socket\n");
-
-        sock = socket(AF_UNIX, SOCK_DGRAM, 0);
-        assert(sock >= 0);
-
-        unix_path_index = (int)contract->protocol_info.body;
-
-        memset(&sender_addr, 0, sizeof(sender_addr));
-        sender_addr.sun_family = AF_UNIX;
-        strcpy(sender_addr.sun_path, the_unix_socket_paths[unix_path_index]);
-
-        err = bind(sock, (struct sockaddr *)&sender_addr, sizeof(sender_addr));
-        assert(err >= 0);
-
-        *vres = (fna_vres_id_t)sock;
-
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
         return 0;
 }
 
@@ -333,32 +366,39 @@ int unix_fna_send_sync(const fna_endpoint_data_t *endpoint,
 /**
  * unix_fna_send_async()
  *
+ * To send we use one of our sockets (it doesn't matter which one so we use
+ * the first one). The destination is obtained from the endpoint values for
+ * dest and stream.
+ *
  **/
 
 int unix_fna_send_async(const fna_endpoint_data_t *endpoint,
                         const void *msg,
                         const size_t size)
 {
-
-        struct sockaddr_un receiver_addr;
+        int err;
+        struct sockaddr_un to;
         ssize_t sent_bytes;
 
         DEBUG(true, "send async\n");
 
         assert(endpoint->is_bound);
 
-        memset(&receiver_addr, 0, sizeof(receiver_addr));
-        receiver_addr.sun_family = AF_UNIX;
-        strcpy(receiver_addr.sun_path,
-               the_unix_socket_paths[endpoint->destination]);
+        memset(&to, 0, sizeof(to));
+        to.sun_family = AF_UNIX;
+
+        err = to_unix_path(endpoint->destination,
+                           endpoint->stream_id,
+                           to.sun_path,
+                           sizeof(to.sun_path));
+        assert(err >= 0);
 
-        sent_bytes = sendto(endpoint->vres, /* the socket */
+        sent_bytes = sendto(the_unix_sockets[0],
                             msg,
                             size,
                             0,
-                            (struct sockaddr *) &receiver_addr,
-                            sizeof(receiver_addr));
-
+                            (struct sockaddr *) &to,
+                            sizeof(to));
         assert(sent_bytes >= 0);
 
         return 0;
@@ -367,6 +407,9 @@ int unix_fna_send_async(const fna_endpoint_data_t *endpoint,
 /**
  * unix_fna_receive_sync()
  *
+ * We call recvfrom using the socket associated to the corresponding stream_id.
+ * The "from" address is obtained by parsing the unix address path.
+ *
  **/
 
 int unix_fna_receive_sync(const fna_endpoint_data_t *endpoint,
@@ -375,14 +418,17 @@ int unix_fna_receive_sync(const fna_endpoint_data_t *endpoint,
                           size_t *received_bytes,
                           frsh_network_address_t *from)
 {
+        int err;
         struct sockaddr_un sender_addr;
         ssize_t recv_bytes;
         socklen_t from_len;
+        frsh_network_address_t addr;
+        frsh_stream_id_t       stream;
 
         DEBUG(true, "receive sync\n");
 
         from_len = sizeof(sender_addr);
-        recv_bytes = recvfrom(endpoint->vres, /* the socket */
+        recv_bytes = recvfrom(the_unix_sockets[endpoint->stream_id],
                               buffer,
                               buffer_size,
                               0,
@@ -390,9 +436,14 @@ int unix_fna_receive_sync(const fna_endpoint_data_t *endpoint,
                               &from_len);
 
         assert(recv_bytes >= 0);
-
         *received_bytes = recv_bytes;
-        /* TODO: fill from */
+
+        err = to_addr_stream(&addr,
+                             &stream,
+                             sender_addr.sun_path,
+                             sizeof(sender_addr.sun_path));
+        assert(err == 0);
+        *from = addr;
 
         return 0;
 }
@@ -432,26 +483,7 @@ int unix_fna_send_endpoint_get_status(const fna_endpoint_data_t *endpoint,
  **/
 int unix_fna_receive_endpoint_created(fna_endpoint_data_t *endpoint)
 {
-        int sock, err;
-        struct sockaddr_un receiver_addr;
-        int unix_path_index;
-
-        DEBUG(true, "creating the socket to receive\n");
-
-        sock = socket(AF_UNIX, SOCK_DGRAM, 0);
-        assert(sock >= 0);
-
-        unix_path_index = (int)endpoint->protocol_info.body;
-
-        memset(&receiver_addr, 0, sizeof(receiver_addr));
-        receiver_addr.sun_family = AF_UNIX;
-        strcpy(receiver_addr.sun_path, the_unix_socket_paths[unix_path_index]);
-
-        err = bind(sock, (struct sockaddr *)&receiver_addr, sizeof(receiver_addr));
-        assert(err >= 0);
-
-        endpoint->vres = sock;
-
+        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
         return 0;
 }
 
@@ -495,7 +527,7 @@ int unix_fna_network_bytes_to_budget(const frsh_resource_id_t resource_id,
                                      const size_t nbytes,
                                      struct timespec *budget)
 {
-        DEBUG(true, "let's put 1 microsecond\n");
+        DEBUG(true, "let's put 1 microsecond for all\n");
         budget->tv_sec  = 0;
         budget->tv_nsec = 1000;
         return 0;
index 7c1ace2719c8eb1ab0171b7ac9265fef4b844793..bf6c06b862c0333d0a0fbaba04e2a0cf610afb9e 100644 (file)
 
 extern fna_operations_t unix_fna_operations;
 
-/**
- *  unix addresses:
- *
- *  We hardwire the frsh network addresses to unix socket paths
- *
- **/
-#define MX_UNIX_SOCKET_PATHS 6
-extern char *the_unix_socket_paths[MX_UNIX_SOCKET_PATHS];
+#define MX_UNIX_STREAM_IDS      6
+#define MX_UNIX_NETWORK_ADDR    3
 
 #endif // _UNIX_FNA_H_
index 7d3abec34de93962a91badf764f7c59789c0f2f3..1a0712c7eb6a17ce3264d2ac84159a1b968a78ea 100644 (file)
@@ -2,4 +2,7 @@ include ../../config.mk
 include ../../rules.mk
 
 CFLAGS += -I$(FNA_PATH)/src_unix/
-LDFLAGS += -L$(FNA_PATH)/lib -lunixfna -L$(FRSH_PATH)/lib -lfrsh -L$(FOSA_PATH)/lib -lfosa_$(PLATFORM) -lm -lfna
\ No newline at end of file
+LDFLAGS += -L$(FNA_PATH)/lib -lunixfna -L$(FRSH_PATH)/lib -lfrsh -L$(FOSA_PATH)/lib -lfosa_$(PLATFORM) -lm -lfna
+
+test_unix_address.exe: test_unix_address.c
+       $(CC) $< $(CFLAGS)
\ No newline at end of file
diff --git a/tests/tests_unix_fna/test_unix_address.c b/tests/tests_unix_fna/test_unix_address.c
new file mode 100644 (file)
index 0000000..31b3ba1
--- /dev/null
@@ -0,0 +1,48 @@
+#include <stdio.h>
+#include <frsh.h>
+#include <string.h>
+#include <stdlib.h>
+
+int to_unix_path(const frsh_network_address_t addr,
+                        const frsh_stream_id_t       stream,
+                        char                         *str,
+                        size_t                       mx_size)
+{
+        return snprintf(str, mx_size, "/tmp/unix_fna-%d-%d", addr, stream);
+}
+
+int to_addr_stream(frsh_network_address_t *addr,
+                   frsh_stream_id_t       *stream,
+                   char             *str,
+                   size_t           size)
+{
+        char *token;
+        char *search = "-";
+
+        token = strtok(str, search);
+        token = strtok(NULL, search);
+        *addr = atoi(token);
+        token = strtok(NULL, search);
+        *stream = atoi(token);
+
+        return 0;
+}
+
+int main()
+{
+        int err;
+        char myaddress[] = "/tmp/unix_fna-45-4";
+        frsh_network_address_t addr;
+        frsh_stream_id_t       stream;
+        char str[100];
+
+        printf("TEST for unix path\n");
+
+        err = to_addr_stream(&addr, &stream, myaddress, sizeof(myaddress));
+        printf("to_addr_stream: addr=%d, stream=%d, err=%d\n", addr, stream, err);
+
+        err = to_unix_path(0, 1, str, sizeof(str));
+        printf("to_unix_path: %s\n", str);
+
+        return 0;
+}
index b7f93e97b64fde35d2e27511cf4057efde6c80cf..dc34cfa2a89e9f5f66b52002d3afa3b36a9d5c55 100644 (file)
@@ -4,37 +4,27 @@
 #include <assert.h>
 #include <string.h> /* memset */
 
-#include <sys/socket.h>
-#include <sys/un.h> /* struct sockaddr_un */
-
-#include "frsh_core.h" // for frsh_contract_set_xxx
-#include "frsh_core_types.h" // for FRSH_NETWORK_ID_DEFAULT
-#include "frsh_distributed_types.h" // for frsh_network_address_t, frsh_stream_id_t
+#include "frsh.h" // for FRSH_CPU_ID_DEFAULT
 
 #include "fna.h" // for fna_vres_id_t
 #include "unix_fna.h" // for unix_fna_operations
 
-#define THE_SENDER_ADDR 0
-#define THE_RECEIVER_ADDR 1
-
+#define THE_SENDER_ADDR         0
+#define THE_RECEIVER_ADDR       1
+#define THE_STREAM_ID           3
 void sender(void);
 void receiver(void);
 
-int main (int argc, char **argv)
+int main ()
 {
         int err;
 
-        if (argc != 2) {
-                printf("please provide the node to execute:\n");
-                printf("  $./mprogram 0 (sender)\n");
-                printf("  $./mprogram 1 (receiver)\n");
-                return -1;
-        }
-
         err = unix_fna_operations.fna_init(FRSH_NETWORK_ID_DEFAULT);
         assert(err == 0);
 
-        switch(atoi(argv[1])) {
+        printf("I am %d\n", FRSH_CPU_ID_DEFAULT);
+
+        switch(FRSH_CPU_ID_DEFAULT) {
         case 0:
                 sender();
                 break;
@@ -46,9 +36,6 @@ int main (int argc, char **argv)
                 return -1;
         }
 
-        unlink(the_unix_socket_paths[0]);
-        unlink(the_unix_socket_paths[1]);
-
         return 0;
 }
 
@@ -57,20 +44,11 @@ void sender(void)
         int err, i;
         frsh_contract_t contract;
         fna_endpoint_data_t endpoint;
-        frsh_network_address_t my_address, receiver_address;
         char buffer[100];
         size_t size;
 
         printf("I am the sender\n");
 
-        err = frsh_contract_init(&contract);
-        assert(err == 0);
-
-        my_address = THE_SENDER_ADDR;
-        receiver_address = THE_RECEIVER_ADDR;
-
-        contract.protocol_info.body = (void *)my_address;
-
         err = unix_fna_operations.fna_contract_negotiate
                         (FRSH_NETWORK_ID_DEFAULT,
                          &contract,
@@ -79,9 +57,9 @@ void sender(void)
 
         endpoint.endpoint_type  = FRSH_SEND_ENDPOINT_TYPE;
         endpoint.is_bound       = true;
-        endpoint.destination    = receiver_address; /* only for send_endpoints */
+        endpoint.destination    = THE_RECEIVER_ADDR;
         endpoint.resource_id    = FRSH_NETWORK_ID_DEFAULT;
-        endpoint.stream_id      = 0; /* not used */
+        endpoint.stream_id      = THE_STREAM_ID;
 
         for(i=0; i<10; i++) {
                 sleep(1);
@@ -102,19 +80,16 @@ void receiver(void)
 {
         int err, i;
         fna_endpoint_data_t endpoint;
-        frsh_network_address_t sender_addr, my_address;
+        frsh_network_address_t from;
         char buffer[100];
         size_t received_bytes;
 
         printf("I am the receiver\n");
 
-        my_address = THE_RECEIVER_ADDR;
-
-        endpoint.protocol_info.body = (void *)my_address;
         endpoint.endpoint_type  = FRSH_RECEIVE_ENDPOINT_TYPE;
         endpoint.is_bound       = false;
         endpoint.resource_id    = FRSH_NETWORK_ID_DEFAULT;
-        endpoint.stream_id      = 0; /* not used */
+        endpoint.stream_id      = THE_STREAM_ID;
 
         err = unix_fna_operations.fna_receive_endpoint_created(&endpoint);
         assert (err == 0);
@@ -124,12 +99,12 @@ void receiver(void)
                                                            buffer,
                                                            100,
                                                            &received_bytes,
-                                                           &sender_addr);
+                                                           &from);
                 assert (err == 0);
 
                 buffer[received_bytes] = '\0';
 
-                printf("receiver : received %s\n", buffer);
+                printf("receiver : received %s from %d\n", buffer, from);
         }
 
         printf("Receiver done\n");
diff --git a/tests/tests_unix_fna/test_unix_fna_send_receive_script b/tests/tests_unix_fna/test_unix_fna_send_receive_script
new file mode 100755 (executable)
index 0000000..f58201e
--- /dev/null
@@ -0,0 +1,26 @@
+export PLATFORM=MARTE_OS
+rm -f /tmp/unix_fna*
+
+cd
+cd frescor
+make clean
+cp frsh/include/frsh_configuration_parameters.h_cpu1 frsh/include/frsh_configuration_parameters.h
+make all
+
+cd fna/tests/tests_unix_fna
+make test_unix_fna_send_receive.exe
+cp mprogram /home/dsl/export/test_unix_fna_send_receive_cpu1.exe
+
+cd
+cd frescor
+make clean
+cp frsh/include/frsh_configuration_parameters.h_cpu2 frsh/include/frsh_configuration_parameters.h
+make all
+
+cd fna/tests/tests_unix_fna
+make test_unix_fna_send_receive.exe
+cp mprogram /home/dsl/export/test_unix_fna_send_receive_cpu2.exe
+
+cd /home/dsl/export
+./test_unix_fna_send_receive_cpu2.exe &
+./test_unix_fna_send_receive_cpu1.exe