//
// FNA(Frescor Network Adaptation layer), pronounced "efe ene a"
//==============================================================
+
+/**
+ * unix fna implementation
+ *
+ * In the following functions we implement a DUMMY FNA implementation without
+ * contracts or real-time requirements just for testing purposes. We provide
+ * send/receive capabilities between Linux processes through UNIX domain
+ * datagram sockets.
+ *
+ * The goal is to run FRSH on several processes by using the Linux_lib arch
+ * of MaRTE OS or Partikle to simulate a distributed system in a single PC.
+ *
+ * The main tricks of the implementation are the following:
+ *
+ * - In Unix sockets the address is just a path (a string), so there is no
+ * such concept as streams.
+ * - When the user negotiates a contract it must provide, as protocol
+ * dependent information, the index of its own (sender) path so we can
+ * create a socket and bind it to that path. The destination is taken
+ * later from the send endpoint.
+ * - The created socket id is stored as the 'vres'
+ * - On the receiving part, when the user creates the receive endpoint, it
+ * must provide the receiving path as endpoint information so we can
+ * create a socket to receive from that path.
+ * - Both pieces of information are passed as an integer index into a
+ * static array called "the_unix_socket_paths", where we statically
+ * configure the unix paths. They are passed using the void * fields as
+ * if they were integers.
+ *
+ **/
+
#include <malloc.h> /* for malloc and free */
+#include <assert.h>
#include "unix_fna.h" /* function prototypes */
+#include <sys/socket.h>
+#include <sys/un.h> /* struct sockaddr_un */
+
#if 1
#include <stdio.h>
#define DEBUG(x,args...) printf("%s: " x, __func__ , ##args)
// INITIALIZATION
//////////////////////////////////////////////////////////////////////
-char *the_unix_stream_ids[MX_UNIX_STREAM_IDS] = {
- "/tmp/stream00",
- "/tmp/stream01",
- "/tmp/stream02",
- "/tmp/stream03",
- "/tmp/stream04",
- "/tmp/stream05"
+char *the_unix_socket_paths[MX_UNIX_SOCKET_PATHS] = {
+ "/tmp/path00",
+ "/tmp/path01",
+ "/tmp/path02",
+ "/tmp/path03",
+ "/tmp/path04",
+ "/tmp/path05"
};
/**
const frsh_contract_t *contract,
fna_vres_id_t *vres)
{
- DEBUG("NOT IMPLEMENTED\n");
+ int sock, err;
+ struct sockaddr_un sender_addr;
+ int unix_path_index;
+
+ DEBUG("Creating and binding the unix socket\n");
+
+ sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+ assert(sock >= 0);
+
+ unix_path_index = (int)contract->protocol_info.body;
+
+ memset(&sender_addr, 0, sizeof(sender_addr));
+ sender_addr.sun_family = AF_UNIX;
+ strcpy(sender_addr.sun_path, the_unix_socket_paths[unix_path_index]);
+
+ err = bind(sock, (struct sockaddr *)&sender_addr, sizeof(sender_addr));
+ assert(err >= 0);
+
+ *vres = (fna_vres_id_t)sock;
+
return 0;
}
const void *msg,
const size_t size)
{
+
+ struct sockaddr_un receiver_addr;
+ ssize_t sent_bytes;
+
DEBUG("send async\n");
+
+ assert(endpoint->is_bound);
+
+ memset(&receiver_addr, 0, sizeof(receiver_addr));
+ receiver_addr.sun_family = AF_UNIX;
+ strcpy(receiver_addr.sun_path,
+ the_unix_socket_paths[endpoint->destination]);
+
+ sent_bytes = sendto(endpoint->vres, /* the socket */
+ msg,
+ size,
+ 0,
+ (struct sockaddr *) &receiver_addr,
+ sizeof(receiver_addr));
+
+ assert(sent_bytes >= 0);
+
return 0;
}
size_t *received_bytes,
frsh_network_address_t *from)
{
+ struct sockaddr_un sender_addr;
+ ssize_t recv_bytes;
+ socklen_t from_len;
+
DEBUG("receive sync\n");
+
+ from_len = sizeof(sender_addr);
+ recv_bytes = recvfrom(endpoint->vres, /* the socket */
+ buffer,
+ buffer_size,
+ 0,
+ (struct sockaddr *)&sender_addr,
+ &from_len);
+
+ assert(recv_bytes >= 0);
+
+ *received_bytes = recv_bytes;
+ /* TODO: fill from */
+
return 0;
}
* unix_fna_receive_endpoint_created()
*
**/
-int unix_fna_receive_endpoint_created(const fna_endpoint_data_t *endpoint)
+int unix_fna_receive_endpoint_created(fna_endpoint_data_t *endpoint)
{
- DEBUG("NOT IMPLEMENTED\n");
+ int sock, err;
+ struct sockaddr_un receiver_addr;
+ int unix_path_index;
+
+ DEBUG("creating the socket to receive\n");
+
+ sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+ assert(sock >= 0);
+
+ unix_path_index = (int)endpoint->protocol_info.body;
+
+ memset(&receiver_addr, 0, sizeof(receiver_addr));
+ receiver_addr.sun_family = AF_UNIX;
+ strcpy(receiver_addr.sun_path, the_unix_socket_paths[unix_path_index]);
+
+ err = bind(sock, (struct sockaddr *)&receiver_addr, sizeof(receiver_addr));
+ assert(err >= 0);
+
+ endpoint->vres = sock;
+
return 0;
}
#include "fna.h" // for fna_vres_id_t
#include "unix_fna.h" // for unix_fna_operations
+#define THE_SENDER_ADDR 0
+#define THE_RECEIVER_ADDR 1
void sender(void);
void receiver(void);
return -1;
}
- unlink(the_unix_stream_ids[0]);
- unlink(the_unix_stream_ids[1]);
+ unlink(the_unix_socket_paths[0]);
+ unlink(the_unix_socket_paths[1]);
return 0;
}
void sender(void)
{
int err, i;
-
+ frsh_contract_t contract;
fna_endpoint_data_t endpoint;
- frsh_stream_id_t receiver_port;
+ frsh_network_address_t my_address, receiver_address;
char buffer[100];
size_t size;
printf("I am the sender\n");
- receiver_port = 0; /* statically configured to the_unix_stream_ids[0] */
+ err = frsh_contract_init(&contract);
+ assert(err == 0);
+
+ my_address = THE_SENDER_ADDR;
+ receiver_address = THE_RECEIVER_ADDR;
+
+ contract.protocol_info.body = (void *)my_address;
+
+ err = unix_fna_operations.fna_contract_negotiate
+ (FRSH_NETWORK_ID_DEFAULT,
+ &contract,
+ &endpoint.vres);
+ assert(err == 0);
endpoint.endpoint_type = FRSH_SEND_ENDPOINT_TYPE;
- endpoint.vres = 0;
endpoint.is_bound = true;
- endpoint.destination = 0; /* only for send_endpoints */
+ endpoint.destination = receiver_address; /* only for send_endpoints */
endpoint.resource_id = FRSH_NETWORK_ID_DEFAULT;
- endpoint.stream_id = receiver_port;
+ endpoint.stream_id = 0; /* not used */
for(i=0; i<10; i++) {
sleep(1);
{
int err, i;
fna_endpoint_data_t endpoint;
- frsh_network_address_t sender_addr;
- frsh_stream_id_t sender_port;
+ frsh_network_address_t sender_addr, my_address;
char buffer[100];
size_t received_bytes;
printf("I am the receiver\n");
- sender_port = 1;
+ my_address = THE_RECEIVER_ADDR;
+ endpoint.protocol_info.body = (void *)my_address;
endpoint.endpoint_type = FRSH_RECEIVE_ENDPOINT_TYPE;
endpoint.is_bound = false;
endpoint.resource_id = FRSH_NETWORK_ID_DEFAULT;
- endpoint.stream_id = sender_port;
+ endpoint.stream_id = 0; /* not used */
+
+ err = unix_fna_operations.fna_receive_endpoint_created(&endpoint);
+ assert (err == 0);
for(i=0; i<10; i++) {
err = unix_fna_operations.fna_receive_sync(&endpoint,
printf("I am the sender\n");
- err = sock = socket(AF_UNIX, SOCK_DGRAM, 0);
- assert(err >= 0);
+ sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+ assert(sock >= 0);
memset(&sender_addr, 0, sizeof(sender_addr));
sender_addr.sun_family = AF_UNIX;
printf("I am the receiver\n");
- err = sock = socket(AF_UNIX, SOCK_DGRAM, 0);
- assert(err >= 0);
+ sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+ assert(sock >= 0);
memset(&receiver_addr, 0, sizeof(receiver_addr));
receiver_addr.sun_family = AF_UNIX;
assert(err >= 0);
for(i=0; i<10; i++) {
-
- sleep(5);
-
from_len = sizeof(sender_addr);
received_bytes = recvfrom(sock,
buffer,