*
* The main tricks of the implementation are the following:
*
- * - In Unix sockets the address is just a path (a string) so there is no
- * such a concept of streams.
- * - when the user negotiates a contract it must provide, as protocol
- * dependent information, the destination so we can create and bind a
- * socket for it
- * - The created socket id is stored as the 'vres'
- * - On the receiving part, when the user creates the receive endpoint, it
- * must provide the receiving path as endpoint information so we can
- * create a socket to receive from that path.
- * - Both informations are passed as an integer for a static array called
- * "the_unix_socket_paths" where we configure statically the unix paths.
- * They are passed using the void * fields as if they where integers.
+ * - We encode the address and stream in the Unix socket path as:
+ * '/tmp/unix_fna-address-stream'
+ * - At initialization we create MX_UNIX_STREAM_IDS sockets for our address
+ * (our address number is defined by FRSH_CPU_ID_DEFAULT)
+ *   - When the user SENDS we build the destination path from the
+ *     destination address and the stream, using the format shown above.
+ *   - When the user RECEIVES we use the appropriate socket, selected by
+ *     the stream_id information.
*
**/
#include <malloc.h> /* for malloc and free */
#include <assert.h>
+#include <errno.h>  /* for errno, ERANGE, ENOENT */
+#include <stdlib.h> /* for strtol */
+#include <string.h> /* for string functions: strtok, strcpy, ... */
+
+#include "frsh_distributed_types.h" /* for frsh_network_address_t, frsh_stream_id_t */
#include "unix_fna.h" /* function prototypes */
#include <sys/socket.h>
-#include <sys/un.h> /* struct sockaddr_un */
+#include <sys/un.h>
+#include <unistd.h> /* for unlink */
/* DEBUGGING FLAGS and MACROS */
#include <stdio.h>
#define DEBUG(enable,x,args...) if(enable) printf("\t>> Called %s: " x, __func__ , ##args)
#define DBG_UNIX_FNA_NOT_IMPLEMENTED true
+/**
+ * to_unix_path()
+ *
+ * Build the Unix socket path that encodes a network address and a stream
+ * id as "/tmp/unix_fna-<addr>-<stream>".
+ *
+ * @param[in]  addr     network address to encode
+ * @param[in]  stream   stream id to encode
+ * @param[out] str      buffer that receives the NUL-terminated path
+ * @param[in]  mx_size  size of str in bytes
+ *
+ * @return the number of characters written (not counting the NUL) on
+ *         success, -1 if formatting failed or the path did not fit in str
+ *
+ **/
+
+static int to_unix_path(const frsh_network_address_t addr,
+                        const frsh_stream_id_t stream,
+                        char *str,
+                        size_t mx_size)
+{
+        int len;
+
+        /* The casts keep the arguments in line with the %d conversions
+         * (assumes both types are integers that fit in an int -- TODO
+         * confirm against frsh_distributed_types.h). */
+        len = snprintf(str, mx_size, "/tmp/unix_fna-%d-%d",
+                       (int) addr, (int) stream);
+
+        /* snprintf returns the length it would have written, so a value
+         * >= mx_size means the path was truncated; a truncated path names
+         * a different socket, so report it as an error instead. */
+        if (len < 0 || (size_t) len >= mx_size)
+                return -1;
+
+        return len;
+}
+
+/**
+ * to_addr_stream()
+ *
+ * Inverse of to_unix_path(): extract the address and stream id from a
+ * path of the form "<prefix>-<addr>-<stream>".  The path is only read,
+ * never modified.
+ *
+ * @param[out] addr    receives the decoded network address
+ * @param[out] stream  receives the decoded stream id
+ * @param[in]  str     path to parse; must be NUL-terminated within size
+ * @param[in]  size    size in bytes of the buffer holding str
+ *
+ * @return 0 on success, -1 if an argument is NULL, the path is not
+ *         terminated within size, or it does not match the expected
+ *         format (in which case *addr and *stream are left untouched)
+ *
+ **/
+
+static int to_addr_stream(frsh_network_address_t *addr,
+                          frsh_stream_id_t *stream,
+                          char *str,
+                          size_t size)
+{
+        const char *field;
+        char *end;
+        long addr_value, stream_value;
+
+        if (addr == NULL || stream == NULL || str == NULL)
+                return -1;
+
+        /* never scan past the caller's buffer looking for the NUL */
+        if (memchr(str, '\0', size) == NULL)
+                return -1;
+
+        /* the address starts right after the first '-' */
+        field = strchr(str, '-');
+        if (field == NULL)
+                return -1;
+        field++;
+
+        errno = 0;
+        addr_value = strtol(field, &end, 10);
+        if (end == field || *end != '-' || errno == ERANGE)
+                return -1;
+
+        /* the stream id is the last field and must end the string */
+        field = end + 1;
+        errno = 0;
+        stream_value = strtol(field, &end, 10);
+        if (end == field || *end != '\0' || errno == ERANGE)
+                return -1;
+
+        *addr = (frsh_network_address_t) addr_value;
+        *stream = (frsh_stream_id_t) stream_value;
+
+        return 0;
+}
+
//////////////////////////////////////////////////////////////////////
// INITIALIZATION
//////////////////////////////////////////////////////////////////////
-char *the_unix_socket_paths[MX_UNIX_SOCKET_PATHS] = {
- "/tmp/path00",
- "/tmp/path01",
- "/tmp/path02",
- "/tmp/path03",
- "/tmp/path04",
- "/tmp/path05"
-};
+int the_unix_sockets[MX_UNIX_STREAM_IDS];
/**
 * unix_fna_init()
 *
+ * for each stream_id create a socket and bind it to the address obtained
+ * from "/tmp/unix_fna-addr-stream"
+ *
+ * A socket file left at that path (for instance by a previous run) is
+ * removed before binding, because bind() fails with EADDRINUSE when the
+ * path already exists.  Note that this also takes the address away from
+ * any other process currently bound to the same path.
+ *
+ * @param[in] resource_id  not used by this implementation
+ *
+ * @return 0; failures are reported through assert
+ *
 **/
int unix_fna_init(const frsh_resource_id_t resource_id)
{
-        DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
+        int i, err;
+        struct sockaddr_un sock_addr;
+
+        DEBUG(true, "creating unix sockets\n");
+
+        for(i=0; i<MX_UNIX_STREAM_IDS; i++) {
+                the_unix_sockets[i] = socket(AF_UNIX, SOCK_DGRAM, 0);
+                assert(the_unix_sockets[i] >= 0);
+
+                memset(&sock_addr, 0, sizeof(sock_addr));
+                sock_addr.sun_family = AF_UNIX;
+                err = to_unix_path(FRSH_CPU_ID_DEFAULT,
+                                   (frsh_stream_id_t) i,
+                                   sock_addr.sun_path,
+                                   sizeof(sock_addr.sun_path));
+                /* a truncated path would bind to the wrong address */
+                assert(err >= 0 && (size_t) err < sizeof(sock_addr.sun_path));
+
+                /* drop a stale socket file; ENOENT is the normal case */
+                err = unlink(sock_addr.sun_path);
+                assert(err == 0 || errno == ENOENT);
+
+                err = bind(the_unix_sockets[i],
+                           (struct sockaddr *)&sock_addr,
+                           sizeof(sock_addr));
+                assert(err >= 0);
+        }
+
        return 0;
}
const frsh_contract_t *contract,
fna_vres_id_t *vres)
{
- int sock, err;
- struct sockaddr_un sender_addr;
- int unix_path_index;
-
- DEBUG(true, "Creating and binding the unix socket\n");
-
- sock = socket(AF_UNIX, SOCK_DGRAM, 0);
- assert(sock >= 0);
-
- unix_path_index = (int)contract->protocol_info.body;
-
- memset(&sender_addr, 0, sizeof(sender_addr));
- sender_addr.sun_family = AF_UNIX;
- strcpy(sender_addr.sun_path, the_unix_socket_paths[unix_path_index]);
-
- err = bind(sock, (struct sockaddr *)&sender_addr, sizeof(sender_addr));
- assert(err >= 0);
-
- *vres = (fna_vres_id_t)sock;
-
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n");
return 0;
}
/**
* unix_fna_send_async()
*
+ * To send we use one of our sockets (it doesn't matter which one so we use
+ * the first one). The destination is obtained from the endpoint values for
+ * dest and stream.
+ *
**/
int unix_fna_send_async(const fna_endpoint_data_t *endpoint,
const void *msg,
const size_t size)
{
-
- struct sockaddr_un receiver_addr;
+ int err;
+ struct sockaddr_un to;
ssize_t sent_bytes;
DEBUG(true, "send async\n");
assert(endpoint->is_bound);
- memset(&receiver_addr, 0, sizeof(receiver_addr));
- receiver_addr.sun_family = AF_UNIX;
- strcpy(receiver_addr.sun_path,
- the_unix_socket_paths[endpoint->destination]);
+ memset(&to, 0, sizeof(to));
+ to.sun_family = AF_UNIX;
+
+ err = to_unix_path(endpoint->destination,
+ endpoint->stream_id,
+ to.sun_path,
+ sizeof(to.sun_path));
+ assert(err >= 0);
- sent_bytes = sendto(endpoint->vres, /* the socket */
+ sent_bytes = sendto(the_unix_sockets[0],
msg,
size,
0,
- (struct sockaddr *) &receiver_addr,
- sizeof(receiver_addr));
-
+ (struct sockaddr *) &to,
+ sizeof(to));
assert(sent_bytes >= 0);
return 0;
/**
* unix_fna_receive_sync()
*
+ * We call recvfrom using the socket associated with the corresponding stream_id.
+ * The "from" address is obtained by parsing the Unix socket path of the sender.
+ *
**/
int unix_fna_receive_sync(const fna_endpoint_data_t *endpoint,
size_t *received_bytes,
frsh_network_address_t *from)
{
+ int err;
struct sockaddr_un sender_addr;
ssize_t recv_bytes;
socklen_t from_len;
+ frsh_network_address_t addr;
+ frsh_stream_id_t stream;
DEBUG(true, "receive sync\n");
from_len = sizeof(sender_addr);
- recv_bytes = recvfrom(endpoint->vres, /* the socket */
+ recv_bytes = recvfrom(the_unix_sockets[endpoint->stream_id],
buffer,
buffer_size,
0,
&from_len);
assert(recv_bytes >= 0);
-
*received_bytes = recv_bytes;
- /* TODO: fill from */
+
+ err = to_addr_stream(&addr,
+ &stream,
+ sender_addr.sun_path,
+ sizeof(sender_addr.sun_path));
+ assert(err == 0);
+ *from = addr;
return 0;
}
**/
int unix_fna_receive_endpoint_created(fna_endpoint_data_t *endpoint)
{
- int sock, err;
- struct sockaddr_un receiver_addr;
- int unix_path_index;
-
- DEBUG(true, "creating the socket to receive\n");
-
- sock = socket(AF_UNIX, SOCK_DGRAM, 0);
- assert(sock >= 0);
-
- unix_path_index = (int)endpoint->protocol_info.body;
-
- memset(&receiver_addr, 0, sizeof(receiver_addr));
- receiver_addr.sun_family = AF_UNIX;
- strcpy(receiver_addr.sun_path, the_unix_socket_paths[unix_path_index]);
-
- err = bind(sock, (struct sockaddr *)&receiver_addr, sizeof(receiver_addr));
- assert(err >= 0);
-
- endpoint->vres = sock;
-
+ DEBUG(DBG_UNIX_FNA_NOT_IMPLEMENTED, "NOT IMPLEMENTED\n"); /* Stub: the endpoint
+  * argument is not read and no socket is created here; this only prints
+  * the trace (when the flag is enabled) before returning 0.  The sockets
+  * used for receiving are the ones unix_fna_init() binds, one per
+  * stream id. */
return 0;
}
const size_t nbytes,
struct timespec *budget)
{
- DEBUG(true, "let's put 1 microsecond\n");
+ DEBUG(true, "let's put 1 microsecond for all\n");
budget->tv_sec = 0;
budget->tv_nsec = 1000;
return 0;
#include <assert.h>
#include <string.h> /* memset */
-#include <sys/socket.h>
-#include <sys/un.h> /* struct sockaddr_un */
-
-#include "frsh_core.h" // for frsh_contract_set_xxx
-#include "frsh_core_types.h" // for FRSH_NETWORK_ID_DEFAULT
-#include "frsh_distributed_types.h" // for frsh_network_address_t, frsh_stream_id_t
+#include "frsh.h" // for FRSH_CPU_ID_DEFAULT
#include "fna.h" // for fna_vres_id_t
#include "unix_fna.h" // for unix_fna_operations
-#define THE_SENDER_ADDR 0
-#define THE_RECEIVER_ADDR 1
-
+#define THE_SENDER_ADDR 0
+#define THE_RECEIVER_ADDR 1
+#define THE_STREAM_ID 3
void sender(void);
void receiver(void);
-int main (int argc, char **argv)
+int main ()
{
int err;
- if (argc != 2) {
- printf("please provide the node to execute:\n");
- printf(" $./mprogram 0 (sender)\n");
- printf(" $./mprogram 1 (receiver)\n");
- return -1;
- }
-
err = unix_fna_operations.fna_init(FRSH_NETWORK_ID_DEFAULT);
assert(err == 0);
- switch(atoi(argv[1])) {
+ printf("I am %d\n", FRSH_CPU_ID_DEFAULT);
+
+ switch(FRSH_CPU_ID_DEFAULT) {
case 0:
sender();
break;
return -1;
}
- unlink(the_unix_socket_paths[0]);
- unlink(the_unix_socket_paths[1]);
-
return 0;
}
int err, i;
frsh_contract_t contract;
fna_endpoint_data_t endpoint;
- frsh_network_address_t my_address, receiver_address;
char buffer[100];
size_t size;
printf("I am the sender\n");
- err = frsh_contract_init(&contract);
- assert(err == 0);
-
- my_address = THE_SENDER_ADDR;
- receiver_address = THE_RECEIVER_ADDR;
-
- contract.protocol_info.body = (void *)my_address;
-
err = unix_fna_operations.fna_contract_negotiate
(FRSH_NETWORK_ID_DEFAULT,
&contract,
endpoint.endpoint_type = FRSH_SEND_ENDPOINT_TYPE;
endpoint.is_bound = true;
- endpoint.destination = receiver_address; /* only for send_endpoints */
+ endpoint.destination = THE_RECEIVER_ADDR;
endpoint.resource_id = FRSH_NETWORK_ID_DEFAULT;
- endpoint.stream_id = 0; /* not used */
+ endpoint.stream_id = THE_STREAM_ID;
for(i=0; i<10; i++) {
sleep(1);
{
int err, i;
fna_endpoint_data_t endpoint;
- frsh_network_address_t sender_addr, my_address;
+ frsh_network_address_t from;
char buffer[100];
size_t received_bytes;
printf("I am the receiver\n");
- my_address = THE_RECEIVER_ADDR;
-
- endpoint.protocol_info.body = (void *)my_address;
endpoint.endpoint_type = FRSH_RECEIVE_ENDPOINT_TYPE;
endpoint.is_bound = false;
endpoint.resource_id = FRSH_NETWORK_ID_DEFAULT;
- endpoint.stream_id = 0; /* not used */
+ endpoint.stream_id = THE_STREAM_ID;
err = unix_fna_operations.fna_receive_endpoint_created(&endpoint);
assert (err == 0);
buffer,
100,
&received_bytes,
- &sender_addr);
+ &from);
assert (err == 0);
buffer[received_bytes] = '\0';
- printf("receiver : received %s\n", buffer);
+ printf("receiver : received %s from %d\n", buffer, from);
}
printf("Receiver done\n");