From 2e7854b383d492f381dd11f874dba30f581d34ee Mon Sep 17 00:00:00 2001 From: sangorrin Date: Wed, 31 Oct 2007 12:28:04 +0000 Subject: [PATCH 1/1] change to unix fna, now we encode streams... but i found a problem in dtm so it wont be useful because multithreaded apps in linux_lib_arch cannot sleep git-svn-id: http://www.frescor.org/private/svn/frescor/fna/trunk@826 35b4ef3e-fd22-0410-ab77-dab3279adceb --- src_unix/unix_fna.c | 184 ++++++++++-------- src_unix/unix_fna.h | 10 +- tests/tests_unix_fna/Makefile | 5 +- tests/tests_unix_fna/test_unix_address.c | 48 +++++ .../test_unix_fna_send_receive.c | 53 ++--- .../test_unix_fna_send_receive_script | 26 +++ 6 files changed, 202 insertions(+), 124 deletions(-) create mode 100644 tests/tests_unix_fna/test_unix_address.c create mode 100755 tests/tests_unix_fna/test_unix_fna_send_receive_script diff --git a/src_unix/unix_fna.c b/src_unix/unix_fna.c index 670fe5b..5ab3540 100644 --- a/src_unix/unix_fna.c +++ b/src_unix/unix_fna.c @@ -76,54 +76,106 @@ * * The main tricks of the implementation are the following: * - * - In Unix sockets the address is just a path (a string) so there is no - * such a concept of streams. - * - when the user negotiates a contract it must provide, as protocol - * dependent information, the destination so we can create and bind a - * socket for it - * - The created socket id is stored as the 'vres' - * - On the receiving part, when the user creates the receive endpoint, it - * must provide the receiving path as endpoint information so we can - * create a socket to receive from that path. - * - Both informations are passed as an integer for a static array called - * "the_unix_socket_paths" where we configure statically the unix paths. - * They are passed using the void * fields as if they where integers. + * - We encode the address and stream in the Unix socket path as: + * '/tmp/unix_fna-address-stream' + * - At initialization we create MX_UNIX_STREAM_IDS sockets for our address + * (our address number is defined by FRSH_CPU_ID_DEFAULT) + * - When the user SENDS we obtain the address creating the mentioned string + * from the destination address and the stream. + * - When the user RECEIVES we use the appropiate socket by using the + * stream_id information. * **/ #include /* for malloc and free */ #include +#include /* for string functions: strtok, strcpy, ... */ + +#include "frsh_distributed_types.h" /* for frsh_network_address_t, frsh_stream_id_t */ #include "unix_fna.h" /* function prototypes */ #include -#include /* struct sockaddr_un */ +#include /* DEBUGGING FLAGS and MACROS */ #include #define DEBUG(enable,x,args...) if(enable) printf("\t>> Called %s: " x, __func__ , ##args) #define DBG_UNIX_FNA_NOT_IMPLEMENTED true +/** + * to_unix_path() + * + **/ + +static int to_unix_path(const frsh_network_address_t addr, + const frsh_stream_id_t stream, + char *str, + size_t mx_size) +{ + return snprintf(str, mx_size, "/tmp/unix_fna-%d-%d", addr, stream); +} + +/** + * to_addr_stream() + * + **/ + +static int to_addr_stream(frsh_network_address_t *addr, + frsh_stream_id_t *stream, + char *str, + size_t size) +{ + char *token; + char *search = "-"; + + token = strtok(str, search); + token = strtok(NULL, search); + *addr = atoi(token); + token = strtok(NULL, search); + *stream = atoi(token); + + return 0; +} + ////////////////////////////////////////////////////////////////////// // INITIALIZATION ////////////////////////////////////////////////////////////////////// -char *the_unix_socket_paths[MX_UNIX_SOCKET_PATHS] = { - "/tmp/path00", - "/tmp/path01", - "/tmp/path02", - "/tmp/path03", - "/tmp/path04", - "/tmp/path05" -}; +int the_unix_sockets[MX_UNIX_STREAM_IDS]; /** * unix_fna_init() * + * for each stream_id create a socket and bind it to the address obtained + * from "/tmp/unix_fna-addr-stream" + * **/ int unix_fna_init(const frsh_resource_id_t resource_id) { - DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n"); + int i, err; + struct sockaddr_un sock_addr; + + DEBUG(true, "creating unix sockets\n"); + + for(i=0; i= 0); + + memset(&sock_addr, 0, sizeof(sock_addr)); + sock_addr.sun_family = AF_UNIX; + err = to_unix_path(FRSH_CPU_ID_DEFAULT, + (frsh_stream_id_t) i, + sock_addr.sun_path, + sizeof(sock_addr.sun_path)); + assert(err >= 0); + + err = bind(the_unix_sockets[i], + (struct sockaddr *)&sock_addr, + sizeof(sock_addr)); + assert(err >= 0); + } + return 0; } @@ -140,26 +192,7 @@ int unix_fna_contract_negotiate(const frsh_resource_id_t resource_id, const frsh_contract_t *contract, fna_vres_id_t *vres) { - int sock, err; - struct sockaddr_un sender_addr; - int unix_path_index; - - DEBUG(true, "Creating and binding the unix socket\n"); - - sock = socket(AF_UNIX, SOCK_DGRAM, 0); - assert(sock >= 0); - - unix_path_index = (int)contract->protocol_info.body; - - memset(&sender_addr, 0, sizeof(sender_addr)); - sender_addr.sun_family = AF_UNIX; - strcpy(sender_addr.sun_path, the_unix_socket_paths[unix_path_index]); - - err = bind(sock, (struct sockaddr *)&sender_addr, sizeof(sender_addr)); - assert(err >= 0); - - *vres = (fna_vres_id_t)sock; - + DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n"); return 0; } @@ -333,32 +366,39 @@ int unix_fna_send_sync(const fna_endpoint_data_t *endpoint, /** * unix_fna_send_async() * + * To send we use one of our sockets (it doesn't matter which one so we use + * the first one). The destination is obtained from the endpoint values for + * dest and stream. + * **/ int unix_fna_send_async(const fna_endpoint_data_t *endpoint, const void *msg, const size_t size) { - - struct sockaddr_un receiver_addr; + int err; + struct sockaddr_un to; ssize_t sent_bytes; DEBUG(true, "send async\n"); assert(endpoint->is_bound); - memset(&receiver_addr, 0, sizeof(receiver_addr)); - receiver_addr.sun_family = AF_UNIX; - strcpy(receiver_addr.sun_path, - the_unix_socket_paths[endpoint->destination]); + memset(&to, 0, sizeof(to)); + to.sun_family = AF_UNIX; + + err = to_unix_path(endpoint->destination, + endpoint->stream_id, + to.sun_path, + sizeof(to.sun_path)); + assert(err >= 0); - sent_bytes = sendto(endpoint->vres, /* the socket */ + sent_bytes = sendto(the_unix_sockets[0], msg, size, 0, - (struct sockaddr *) &receiver_addr, - sizeof(receiver_addr)); - + (struct sockaddr *) &to, + sizeof(to)); assert(sent_bytes >= 0); return 0; @@ -367,6 +407,9 @@ int unix_fna_send_async(const fna_endpoint_data_t *endpoint, /** * unix_fna_receive_sync() * + * We call recvfrom using the socket associated to the corresponding stream_id. + * The "from" address is obtained by parsing the unix address path. + * **/ int unix_fna_receive_sync(const fna_endpoint_data_t *endpoint, @@ -375,14 +418,17 @@ int unix_fna_receive_sync(const fna_endpoint_data_t *endpoint, size_t *received_bytes, frsh_network_address_t *from) { + int err; struct sockaddr_un sender_addr; ssize_t recv_bytes; socklen_t from_len; + frsh_network_address_t addr; + frsh_stream_id_t stream; DEBUG(true, "receive sync\n"); from_len = sizeof(sender_addr); - recv_bytes = recvfrom(endpoint->vres, /* the socket */ + recv_bytes = recvfrom(the_unix_sockets[endpoint->stream_id], buffer, buffer_size, 0, @@ -390,9 +436,14 @@ int unix_fna_receive_sync(const fna_endpoint_data_t *endpoint, &from_len); assert(recv_bytes >= 0); - *received_bytes = recv_bytes; - /* TODO: fill from */ + + err = to_addr_stream(&addr, + &stream, + sender_addr.sun_path, + sizeof(sender_addr.sun_path)); + assert(err == 0); + *from = addr; return 0; } @@ -432,26 +483,7 @@ int unix_fna_send_endpoint_get_status(const fna_endpoint_data_t *endpoint, **/ int unix_fna_receive_endpoint_created(fna_endpoint_data_t *endpoint) { - int sock, err; - struct sockaddr_un receiver_addr; - int unix_path_index; - - DEBUG(true, "creating the socket to receive\n"); - - sock = socket(AF_UNIX, SOCK_DGRAM, 0); - assert(sock >= 0); - - unix_path_index = (int)endpoint->protocol_info.body; - - memset(&receiver_addr, 0, sizeof(receiver_addr)); - receiver_addr.sun_family = AF_UNIX; - strcpy(receiver_addr.sun_path, the_unix_socket_paths[unix_path_index]); - - err = bind(sock, (struct sockaddr *)&receiver_addr, sizeof(receiver_addr)); - assert(err >= 0); - - endpoint->vres = sock; - + DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n"); return 0; } @@ -495,7 +527,7 @@ int unix_fna_network_bytes_to_budget(const frsh_resource_id_t resource_id, const size_t nbytes, struct timespec *budget) { - DEBUG(true, "let's put 1 microsecond\n"); + DEBUG(true, "let's put 1 microsecond for all\n"); budget->tv_sec = 0; budget->tv_nsec = 1000; return 0; diff --git a/src_unix/unix_fna.h b/src_unix/unix_fna.h index 7c1ace2..bf6c06b 100644 --- a/src_unix/unix_fna.h +++ b/src_unix/unix_fna.h @@ -70,13 +70,7 @@ extern fna_operations_t unix_fna_operations; -/** - * unix addresses: - * - * We hardwire the frsh network addresses to unix socket paths - * - **/ -#define MX_UNIX_SOCKET_PATHS 6 -extern char *the_unix_socket_paths[MX_UNIX_SOCKET_PATHS]; +#define MX_UNIX_STREAM_IDS 6 +#define MX_UNIX_NETWORK_ADDR 3 #endif // _UNIX_FNA_H_ diff --git a/tests/tests_unix_fna/Makefile b/tests/tests_unix_fna/Makefile index 7d3abec..1a0712c 100644 --- a/tests/tests_unix_fna/Makefile +++ b/tests/tests_unix_fna/Makefile @@ -2,4 +2,7 @@ include ../../config.mk include ../../rules.mk CFLAGS += -I$(FNA_PATH)/src_unix/ -LDFLAGS += -L$(FNA_PATH)/lib -lunixfna -L$(FRSH_PATH)/lib -lfrsh -L$(FOSA_PATH)/lib -lfosa_$(PLATFORM) -lm -lfna \ No newline at end of file +LDFLAGS += -L$(FNA_PATH)/lib -lunixfna -L$(FRSH_PATH)/lib -lfrsh -L$(FOSA_PATH)/lib -lfosa_$(PLATFORM) -lm -lfna + +test_unix_address.exe: test_unix_address.c + $(CC) $< $(CFLAGS) \ No newline at end of file diff --git a/tests/tests_unix_fna/test_unix_address.c b/tests/tests_unix_fna/test_unix_address.c new file mode 100644 index 0000000..31b3ba1 --- /dev/null +++ b/tests/tests_unix_fna/test_unix_address.c @@ -0,0 +1,48 @@ +#include +#include +#include +#include + +int to_unix_path(const frsh_network_address_t addr, + const frsh_stream_id_t stream, + char *str, + size_t mx_size) +{ + return snprintf(str, mx_size, "/tmp/unix_fna-%d-%d", addr, stream); +} + +int to_addr_stream(frsh_network_address_t *addr, + frsh_stream_id_t *stream, + char *str, + size_t size) +{ + char *token; + char *search = "-"; + + token = strtok(str, search); + token = strtok(NULL, search); + *addr = atoi(token); + token = strtok(NULL, search); + *stream = atoi(token); + + return 0; +} + +int main() +{ + int err; + char myaddress[] = "/tmp/unix_fna-45-4"; + frsh_network_address_t addr; + frsh_stream_id_t stream; + char str[100]; + + printf("TEST for unix path\n"); + + err = to_addr_stream(&addr, &stream, myaddress, sizeof(myaddress)); + printf("to_addr_stream: addr=%d, stream=%d, err=%d\n", addr, stream, err); + + err = to_unix_path(0, 1, str, sizeof(str)); + printf("to_unix_path: %s\n", str); + + return 0; +} diff --git a/tests/tests_unix_fna/test_unix_fna_send_receive.c b/tests/tests_unix_fna/test_unix_fna_send_receive.c index b7f93e9..dc34cfa 100644 --- a/tests/tests_unix_fna/test_unix_fna_send_receive.c +++ b/tests/tests_unix_fna/test_unix_fna_send_receive.c @@ -4,37 +4,27 @@ #include #include /* memset */ -#include -#include /* struct sockaddr_un */ - -#include "frsh_core.h" // for frsh_contract_set_xxx -#include "frsh_core_types.h" // for FRSH_NETWORK_ID_DEFAULT -#include "frsh_distributed_types.h" // for frsh_network_address_t, frsh_stream_id_t +#include "frsh.h" // for FRSH_CPU_ID_DEFAULT #include "fna.h" // for fna_vres_id_t #include "unix_fna.h" // for unix_fna_operations -#define THE_SENDER_ADDR 0 -#define THE_RECEIVER_ADDR 1 - +#define THE_SENDER_ADDR 0 +#define THE_RECEIVER_ADDR 1 +#define THE_STREAM_ID 3 void sender(void); void receiver(void); -int main (int argc, char **argv) +int main () { int err; - if (argc != 2) { - printf("please provide the node to execute:\n"); - printf(" $./mprogram 0 (sender)\n"); - printf(" $./mprogram 1 (receiver)\n"); - return -1; - } - err = unix_fna_operations.fna_init(FRSH_NETWORK_ID_DEFAULT); assert(err == 0); - switch(atoi(argv[1])) { + printf("I am %d\n", FRSH_CPU_ID_DEFAULT); + + switch(FRSH_CPU_ID_DEFAULT) { case 0: sender(); break; @@ -46,9 +36,6 @@ int main (int argc, char **argv) return -1; } - unlink(the_unix_socket_paths[0]); - unlink(the_unix_socket_paths[1]); - return 0; } @@ -57,20 +44,11 @@ void sender(void) int err, i; frsh_contract_t contract; fna_endpoint_data_t endpoint; - frsh_network_address_t my_address, receiver_address; char buffer[100]; size_t size; printf("I am the sender\n"); - err = frsh_contract_init(&contract); - assert(err == 0); - - my_address = THE_SENDER_ADDR; - receiver_address = THE_RECEIVER_ADDR; - - contract.protocol_info.body = (void *)my_address; - err = unix_fna_operations.fna_contract_negotiate (FRSH_NETWORK_ID_DEFAULT, &contract, @@ -79,9 +57,9 @@ void sender(void) endpoint.endpoint_type = FRSH_SEND_ENDPOINT_TYPE; endpoint.is_bound = true; - endpoint.destination = receiver_address; /* only for send_endpoints */ + endpoint.destination = THE_RECEIVER_ADDR; endpoint.resource_id = FRSH_NETWORK_ID_DEFAULT; - endpoint.stream_id = 0; /* not used */ + endpoint.stream_id = THE_STREAM_ID; for(i=0; i<10; i++) { sleep(1); @@ -102,19 +80,16 @@ void receiver(void) { int err, i; fna_endpoint_data_t endpoint; - frsh_network_address_t sender_addr, my_address; + frsh_network_address_t from; char buffer[100]; size_t received_bytes; printf("I am the receiver\n"); - my_address = THE_RECEIVER_ADDR; - - endpoint.protocol_info.body = (void *)my_address; endpoint.endpoint_type = FRSH_RECEIVE_ENDPOINT_TYPE; endpoint.is_bound = false; endpoint.resource_id = FRSH_NETWORK_ID_DEFAULT; - endpoint.stream_id = 0; /* not used */ + endpoint.stream_id = THE_STREAM_ID; err = unix_fna_operations.fna_receive_endpoint_created(&endpoint); assert (err == 0); @@ -124,12 +99,12 @@ void receiver(void) buffer, 100, &received_bytes, - &sender_addr); + &from); assert (err == 0); buffer[received_bytes] = '\0'; - printf("receiver : received %s\n", buffer); + printf("receiver : received %s from %d\n", buffer, from); } printf("Receiver done\n"); diff --git a/tests/tests_unix_fna/test_unix_fna_send_receive_script b/tests/tests_unix_fna/test_unix_fna_send_receive_script new file mode 100755 index 0000000..f58201e --- /dev/null +++ b/tests/tests_unix_fna/test_unix_fna_send_receive_script @@ -0,0 +1,26 @@ +export PLATFORM=MARTE_OS +rm -f /tmp/unix_fna* + +cd +cd frescor +make clean +cp frsh/include/frsh_configuration_parameters.h_cpu1 frsh/include/frsh_configuration_parameters.h +make all + +cd fna/tests/tests_unix_fna +make test_unix_fna_send_receive.exe +cp mprogram /home/dsl/export/test_unix_fna_send_receive_cpu1.exe + +cd +cd frescor +make clean +cp frsh/include/frsh_configuration_parameters.h_cpu2 frsh/include/frsh_configuration_parameters.h +make all + +cd fna/tests/tests_unix_fna +make test_unix_fna_send_receive.exe +cp mprogram /home/dsl/export/test_unix_fna_send_receive_cpu2.exe + +cd /home/dsl/export +./test_unix_fna_send_receive_cpu2.exe & +./test_unix_fna_send_receive_cpu1.exe -- 2.39.2