//
// FNA(Frescor Network Adaptation layer), pronounced "efe ene a"
//==============================================================
+
+/**
+ * unix fna implementation
+ *
+ * In the following functions we implement a DUMMY FNA implementation without
+ * contracts or real-time requirements just for testing purposes. We provide
+ * send/receive capabilities between Linux processes through UNIX domain
+ * datagram sockets.
+ *
+ * The goal is to run FRSH on several processes by using the Linux_lib arch
+ * of MaRTE OS or Partikle to simulate a distributed system in a single PC.
+ *
+ * The main tricks of the implementation are the following:
+ *
+ * - We encode the address and stream in the Unix socket path as:
+ * '/tmp/unix_fna-address-stream'
+ * - At initialization we create MX_UNIX_STREAM_IDS sockets for our address
+ * (our address number is defined by FRSH_CPU_ID_DEFAULT)
+ * - When the user SENDS we obtain the address creating the mentioned string
+ * from the destination address and the stream.
+ * - When the user RECEIVES we use the appropriate socket by using the
+ * stream_id information.
+ *
+ **/
+
#include <malloc.h> /* for malloc and free */
+#include <assert.h>
+#include <stdlib.h> /* for atoi */
+#include <string.h> /* for string functions: strtok, strcpy, ... */
+#include <unistd.h> /* for unlink */
+
+#include "frsh_distributed_types.h" /* for frsh_network_address_t, frsh_stream_id_t */
#include "unix_fna.h" /* function prototypes */
-#if 0
+#include <sys/socket.h>
+#include <sys/un.h>
+
+/* DEBUGGING FLAGS and MACROS */
#include <stdio.h>
-#define DEBUG(x,args...) printf("%s: " x, __func__ , ##args)
-#else
-#define DEBUG(x,args...)
-#endif
+/* Prints "\t>> Called <function>: <message>" when `enable` is non-zero.
+ * Wrapped in do { } while (0) so the macro expands to a single statement and
+ * cannot capture the `else` of an enclosing if/else. */
+#define DEBUG(enable,x,args...) do { if(enable) printf("\t>> Called %s: " x, __func__ , ##args); } while (0)
+#define DBG_UNIX_FNA_NOT_IMPLEMENTED true
+
+/**
+ * to_unix_path()
+ *
+ * Writes the socket path "/tmp/unix_fna-<addr>-<stream>" into str, which has
+ * room for mx_size bytes including the terminating NUL.
+ *
+ * Returns the length of the path (terminator not counted) on success, or -1
+ * if formatting failed or the path did not fit in mx_size bytes.
+ **/
+
+static int to_unix_path(const frsh_network_address_t addr,
+                        const frsh_stream_id_t stream,
+                        char *str,
+                        size_t mx_size)
+{
+        int len = snprintf(str, mx_size, "/tmp/unix_fna-%d-%d", addr, stream);
+
+        /* snprintf reports the length it WOULD have written, so a value
+         * >= mx_size means the path was cut short; callers only test for a
+         * negative result, so report truncation as a failure too */
+        if (len < 0 || (size_t)len >= mx_size)
+                return -1;
+
+        return len;
+}
+
+/**
+ * to_addr_stream()
+ *
+ * Splits str on '-' and converts the second and third fields to *addr and
+ * *stream (for a path produced by to_unix_path() the first field is the
+ * "/tmp/unix_fna" prefix, which is discarded).
+ *
+ * str is modified in place (strtok writes terminators into it) and must
+ * contain a NUL within its first size bytes.
+ *
+ * Returns 0 on success, or -1 if str is not terminated within size bytes or
+ * one of the fields is missing; *addr and *stream are left untouched on
+ * failure.
+ **/
+
+static int to_addr_stream(frsh_network_address_t *addr,
+                          frsh_stream_id_t *stream,
+                          char *str,
+                          size_t size)
+{
+        const char *search = "-";
+        char *addr_tok;
+        char *stream_tok;
+
+        /* strtok would read past the buffer on an unterminated path */
+        if (size == 0 || memchr(str, '\0', size) == NULL)
+                return -1;
+
+        /* skip the leading prefix field */
+        if (strtok(str, search) == NULL)
+                return -1;
+
+        addr_tok = strtok(NULL, search);
+        if (addr_tok == NULL)
+                return -1;
+
+        stream_tok = strtok(NULL, search);
+        if (stream_tok == NULL)
+                return -1;
+
+        *addr = atoi(addr_tok);
+        *stream = atoi(stream_tok);
+
+        return 0;
+}
//////////////////////////////////////////////////////////////////////
// INITIALIZATION
//////////////////////////////////////////////////////////////////////
+int the_unix_sockets[MX_UNIX_STREAM_IDS]; /* one socket descriptor per stream id; filled in by unix_fna_init() */
+
/**
* unix_fna_init()
*
+ * for each stream_id create a socket and bind it to the address obtained
+ * from "/tmp/unix_fna-addr-stream"
+ *
+ * A socket file already present at that path (e.g. left behind by a previous
+ * run) is removed before binding, because bind() on an existing path fails
+ * with EADDRINUSE. Failures are reported through assert(); otherwise the
+ * function returns 0. resource_id is not used.
+ *
**/
int unix_fna_init(const frsh_resource_id_t resource_id)
{
- DEBUG("starting unix fna\n");
- return 0;
+        int i, err;
+        struct sockaddr_un sock_addr;
+
+        DEBUG(true, "creating unix sockets\n");
+
+        for(i=0; i<MX_UNIX_STREAM_IDS; i++) {
+                the_unix_sockets[i] = socket(AF_UNIX, SOCK_DGRAM, 0);
+                assert(the_unix_sockets[i] >= 0);
+
+                memset(&sock_addr, 0, sizeof(sock_addr));
+                sock_addr.sun_family = AF_UNIX;
+                err = to_unix_path(FRSH_CPU_ID_DEFAULT,
+                                   (frsh_stream_id_t) i,
+                                   sock_addr.sun_path,
+                                   sizeof(sock_addr.sun_path));
+                /* also reject a path that did not fit in sun_path */
+                assert(err >= 0 && (size_t)err < sizeof(sock_addr.sun_path));
+
+                /* drop a stale socket file; a missing file is not an error
+                 * and any other problem will surface in bind() below */
+                (void) unlink(sock_addr.sun_path);
+
+                err = bind(the_unix_sockets[i],
+                           (struct sockaddr *)&sock_addr,
+                           sizeof(sock_addr));
+                assert(err >= 0);
+        }
+
+        return 0;
}
///////////////////////////////////////////////////////////////////
const frsh_contract_t *contract,
fna_vres_id_t *vres)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
/**
const fna_vres_id_t vres,
const frsh_contract_t *new_contract)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
/**
frsh_signal_t signal_to_notify,
frsh_signal_info_t signal_info)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
/**
const fna_vres_id_t vres,
frsh_renegotiation_status_t *renegotiation_status)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
/**
int unix_fna_vres_destroy(const frsh_resource_id_t resource_id,
const fna_vres_id_t vres)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n"); /* stub: only logs that the call is not implemented */
+ return 0; /* always 0; resource_id and vres are not used */
}
/**
const fna_vres_id_t vres,
frsh_contract_t *contract)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
/**
const fna_vres_id_t vres,
struct timespec *usage)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
/**
const fna_vres_id_t vres,
struct timespec *remaining_budget)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
/**
struct timespec *budget,
struct timespec *period)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
///////////////////////////////////////////////////////////////////
const int importance,
uint32_t *capacity)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
/**
const int importance,
int *total_weight)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
/**
const struct timespec new_budget,
const struct timespec new_period)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
///////////////////////////////////////////////////////////////////
const void *msg,
const size_t size)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
/**
* unix_fna_send_async()
*
+ * Sends msg (size bytes) as a single datagram from the first of our sockets
+ * (the choice of source socket makes no difference) to the Unix path built
+ * from the endpoint's destination and stream_id.
+ *
+ * The endpoint must be bound; every failure trips an assert(). Returns 0.
+ *
**/
int unix_fna_send_async(const fna_endpoint_data_t *endpoint,
const void *msg,
const size_t size)
{
- return 0;
+        struct sockaddr_un dest_addr;
+        ssize_t sent_bytes;
+        int err;
+
+        DEBUG(true, "send async\n");
+
+        assert(endpoint->is_bound);
+
+        /* destination: zeroed AF_UNIX address carrying the encoded path */
+        memset(&dest_addr, 0, sizeof(dest_addr));
+        dest_addr.sun_family = AF_UNIX;
+        err = to_unix_path(endpoint->destination, endpoint->stream_id,
+                           dest_addr.sun_path, sizeof(dest_addr.sun_path));
+        assert(err >= 0);
+
+        sent_bytes = sendto(the_unix_sockets[0], msg, size, 0,
+                            (struct sockaddr *) &dest_addr, sizeof(dest_addr));
+        assert(sent_bytes >= 0);
+
+        return 0;
}
/**
* unix_fna_receive_sync()
*
+ * We call recvfrom using the socket associated to the corresponding stream_id.
+ * The "from" address is obtained by parsing the unix address path.
+ *
+ * The datagram length is stored in *received_bytes and the sender's address
+ * number in *from. The sender must be bound to a pathname socket; every
+ * failure trips an assert(). Returns 0.
+ *
+ * NOTE(review): buffer and buffer_size are used below but are not among the
+ * parameters visible here -- confirm where they are declared.
+ * NOTE(review): endpoint->stream_id is used as an index without a range
+ * check -- presumably it is always below MX_UNIX_STREAM_IDS; verify.
+ *
**/
int unix_fna_receive_sync(const fna_endpoint_data_t *endpoint,
size_t *received_bytes,
frsh_network_address_t *from)
{
- return 0;
+        int err;
+        struct sockaddr_un sender_addr;
+        ssize_t recv_bytes;
+        socklen_t from_len;
+        frsh_network_address_t addr;
+        frsh_stream_id_t stream;
+
+        DEBUG(true, "receive sync\n");
+
+        /* zero first: for an unbound sender recvfrom() reports no path, and
+         * sun_path would otherwise hold whatever was on the stack */
+        memset(&sender_addr, 0, sizeof(sender_addr));
+        from_len = sizeof(sender_addr);
+        recv_bytes = recvfrom(the_unix_sockets[endpoint->stream_id],
+                              buffer,
+                              buffer_size,
+                              0,
+                              (struct sockaddr *)&sender_addr,
+                              &from_len);
+
+        assert(recv_bytes >= 0);
+        *received_bytes = recv_bytes;
+
+        /* guarantee termination and require a pathname before parsing */
+        sender_addr.sun_path[sizeof(sender_addr.sun_path) - 1] = '\0';
+        assert(sender_addr.sun_path[0] != '\0');
+
+        err = to_addr_stream(&addr,
+                             &stream,
+                             sender_addr.sun_path,
+                             sizeof(sender_addr.sun_path));
+        assert(err == 0);
+        *from = addr;
+
+        return 0;
}
/**
size_t *received_bytes,
frsh_network_address_t *from)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
/**
frsh_endpoint_network_status_t *network_status,
frsh_protocol_status_t *protocol_status)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
/**
* unix_fna_receive_endpoint_created()
*
+ * Stub: logs that the call is not implemented and returns 0 without
+ * reading or modifying the endpoint.
+ *
**/
-int unix_fna_receive_endpoint_created(const fna_endpoint_data_t *endpoint)
+int unix_fna_receive_endpoint_created(fna_endpoint_data_t *endpoint)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
/**
frsh_endpoint_network_status_t *network_status,
frsh_protocol_status_t *protocol_status)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
//////////////////////////////////////////////////////////////////////
const frsh_network_address_t destination,
size_t *max_size)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
/**
const size_t nbytes,
struct timespec *budget)
{
- return 0;
+ DEBUG(true, "let's put 1 microsecond for all\n");
+ budget->tv_sec = 0;
+ budget->tv_nsec = 1000;
+ return 0;
}
/**
const struct timespec *budget,
size_t *nbytes)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+ return 0;
}
/**
int unix_fna_network_get_min_eff_budget(const frsh_resource_id_t resource_id,
struct timespec *budget)
{
- return 0;
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n"); /* stub: only logs that the call is not implemented */
+ return 0; /* always 0; *budget is NOT written, so callers must not rely on it */
}
// GLOBAL variable to install the network protocol in FRESCOR:
// each fna_operations_t field is set to the unix_fna_* function of the same name.
fna_operations_t unix_fna_operations = {
- .fna_init = unix_fna_init,
- .fna_contract_negotiate = unix_fna_contract_negotiate,
- .fna_contract_renegotiate_sync = unix_fna_contract_renegotiate_sync,
- .fna_contract_renegotiate_async = unix_fna_contract_renegotiate_async,
- .fna_vres_get_renegotiation_status = unix_fna_vres_get_renegotiation_status,
- .fna_vres_destroy = unix_fna_vres_destroy,
- .fna_vres_get_contract = unix_fna_vres_get_contract,
- .fna_vres_get_usage = unix_fna_vres_get_usage,
- .fna_vres_get_remaining_budget = unix_fna_vres_get_remaining_budget,
- .fna_vres_get_budget_and_period = unix_fna_vres_get_budget_and_period,
- .fna_resource_get_capacity = unix_fna_resource_get_capacity,
- .fna_resource_get_total_weight = unix_fna_resource_get_total_weight,
- .fna_vres_decrease_capacity = unix_fna_vres_decrease_capacity,
- .fna_send_sync = unix_fna_send_sync,
- .fna_send_async = unix_fna_send_async,
- .fna_receive_sync = unix_fna_receive_sync,
- .fna_receive_async = unix_fna_receive_async,
- .fna_send_endpoint_get_status = unix_fna_send_endpoint_get_status,
- .fna_receive_endpoint_created = unix_fna_receive_endpoint_created,
- .fna_receive_endpoint_get_status = unix_fna_receive_endpoint_get_status,
- .fna_network_get_max_message_size = unix_fna_network_get_max_message_size,
- .fna_network_bytes_to_budget = unix_fna_network_bytes_to_budget,
- .fna_network_budget_to_bytes = unix_fna_network_budget_to_bytes,
- .fna_network_get_min_eff_budget = unix_fna_network_get_min_eff_budget
+ .fna_init = unix_fna_init,
+ .fna_contract_negotiate = unix_fna_contract_negotiate,
+ .fna_contract_renegotiate_sync = unix_fna_contract_renegotiate_sync,
+ .fna_contract_renegotiate_async = unix_fna_contract_renegotiate_async,
+ .fna_vres_get_renegotiation_status = unix_fna_vres_get_renegotiation_status,
+ .fna_vres_destroy = unix_fna_vres_destroy,
+ .fna_vres_get_contract = unix_fna_vres_get_contract,
+ .fna_vres_get_usage = unix_fna_vres_get_usage,
+ .fna_vres_get_remaining_budget = unix_fna_vres_get_remaining_budget,
+ .fna_vres_get_budget_and_period = unix_fna_vres_get_budget_and_period,
+ .fna_resource_get_capacity = unix_fna_resource_get_capacity,
+ .fna_resource_get_total_weight = unix_fna_resource_get_total_weight,
+ .fna_vres_decrease_capacity = unix_fna_vres_decrease_capacity,
+ .fna_send_sync = unix_fna_send_sync,
+ .fna_send_async = unix_fna_send_async,
+ .fna_receive_sync = unix_fna_receive_sync,
+ .fna_receive_async = unix_fna_receive_async,
+ .fna_send_endpoint_get_status = unix_fna_send_endpoint_get_status,
+ .fna_receive_endpoint_created = unix_fna_receive_endpoint_created,
+ .fna_receive_endpoint_get_status = unix_fna_receive_endpoint_get_status,
+ .fna_network_get_max_message_size = unix_fna_network_get_max_message_size,
+ .fna_network_bytes_to_budget = unix_fna_network_bytes_to_budget,
+ .fna_network_budget_to_bytes = unix_fna_network_budget_to_bytes,
+ .fna_network_get_min_eff_budget = unix_fna_network_get_min_eff_budget
};