]> rtime.felk.cvut.cz Git - frescor/fwp.git/blobdiff - fwp/lib/fwp/fwp_endpoint.c
Implemented synchronous and asynchronous sending
[frescor/fwp.git] / fwp / lib / fwp / fwp_endpoint.c
index 0531284cbfe763c0e39f39b5a40ac4beb8848195..56964aadb8745ac0edb7247d858de500324c6acc 100644 (file)
 #include <unistd.h>
 #include <netinet/in.h>
 #include "fwp_utils.h"
+#include "fwp_vres.h"
+#include <frsh_error.h>
 
 #include <pthread.h>
 #include "fwp_debug.h"
+#include "fwp_msgq.h"
 
 typedef unsigned int fwp_endpoint_id_t;
 
@@ -95,6 +98,9 @@ struct fwp_endpoint{
        fd_set                  fdset;
        /** specific operation options*/
        int                     flags;  
+       /** Forced source address. If non-zero, packets are sent over
+        * the specified interface. */
+       struct in_addr  src;
 };
 
 /**
@@ -104,9 +110,9 @@ struct fwp_endpoint{
  * On error, NULL is returned. 
  *
  */
-static fwp_endpoint_t* fwp_endpoint_alloc()
+static struct fwp_endpoint* fwp_endpoint_alloc()
 {
-       return (fwp_endpoint_t*) calloc(1,sizeof(fwp_endpoint_t));
+       return (struct fwp_endpoint*) calloc(1,sizeof(struct fwp_endpoint));
 }
 
 /**
@@ -116,7 +122,7 @@ static fwp_endpoint_t* fwp_endpoint_alloc()
  * On error, NULL is returned. 
  *
  */
-static inline void fwp_endpoint_free(fwp_endpoint_t *endpoint)
+static inline void fwp_endpoint_free(struct fwp_endpoint *endpoint)
 {
        free(endpoint);
 }
@@ -128,11 +134,11 @@ static inline void fwp_endpoint_free(fwp_endpoint_t *endpoint)
  * \return On success 0 is returned. 
  * On error, negative error value is returned and errno is set appropriately. 
  */
-int fwp_endpoint_destroy(fwp_endpoint_t *ep)
+int fwp_endpoint_destroy(struct fwp_endpoint *ep)
 {
        if (ep->sockd > 0) 
                close(ep->sockd);
-       
+
        fwp_endpoint_free(ep);  
        return 0;
 }
@@ -147,7 +153,7 @@ int fwp_endpoint_destroy(fwp_endpoint_t *ep)
  * \return On success 0 is returned. 
  * On error, negative error value is returned. 
  */
-int fwp_endpoint_get_params(fwp_endpoint_t *ep, unsigned int *node, 
+int fwp_endpoint_get_params(struct fwp_endpoint *ep, unsigned int *node, 
                                unsigned int *port, fwp_endpoint_attr_t *attr)
 {
        if (node) *node = ep->node;
@@ -179,10 +185,10 @@ int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
 int fwp_send_endpoint_create(unsigned int node,
                                unsigned int port, 
                                fwp_endpoint_attr_t *attr,
-                               fwp_endpoint_t **epoint)
+                               struct fwp_endpoint **epoint)
 {      
        struct sockaddr_in *addr;
-       fwp_endpoint_t *fwp_epoint;
+       struct fwp_endpoint *fwp_epoint;
 
        fwp_epoint = fwp_endpoint_alloc();      
         if (!fwp_epoint) {
@@ -272,10 +278,10 @@ err:
  */
 int fwp_receive_endpoint_create(unsigned int port,
                                fwp_endpoint_attr_t *attr,
-                               fwp_endpoint_t **epp)
+                               struct fwp_endpoint **epp)
 {
        struct sockaddr_in *addr;
-       fwp_endpoint_t *fwp_epoint;
+       struct fwp_endpoint *fwp_epoint;
 
        fwp_epoint = fwp_endpoint_alloc();      
         if (!fwp_epoint) {
@@ -383,18 +389,16 @@ err:
  *
  * \return On success returns 0. On error, -1 and errno is set appropriately.
  */
-int fwp_send_endpoint_bind(fwp_endpoint_t *ep, fwp_vres_t *vres)
+int fwp_send_endpoint_bind(struct fwp_endpoint *ep, fwp_vres_t *vres)
 {
        int rv = 0;
-#ifndef FWP_WITHOUT_CONTNEGT
+
+       if (ep->vres)
+               return FRSH_ERR_ALREADY_BOUND;
        
        ep->vres = vres;
-       rv = fwp_vres_bind(vres, ep->sockd);
-#endif
-       /* if send endpoint is already bound 
-       if (epoint->type == FWP_EPOINT_BOUND) {  
-               fwp_send_endpoint_unbind(epoint);
-       }*/
+       rv = fwp_vres_bind(vres, ep, ep->sockd);
+
        return rv;
 }
 
@@ -405,7 +409,7 @@ int fwp_send_endpoint_bind(fwp_endpoint_t *ep, fwp_vres_t *vres)
  * \return On success returns 0. On error, -1 is returned and errno is set appropriately.
  *
  */
-int fwp_send_endpoint_unbind(fwp_endpoint_t *ep)
+int fwp_send_endpoint_unbind(struct fwp_endpoint *ep)
 {
        int rv = 0;
 
@@ -424,10 +428,10 @@ int fwp_send_endpoint_unbind(fwp_endpoint_t *ep)
  * On success, it returns zero.  
  *
  */
-static int fwp_receive_endpoint_accept(fwp_endpoint_t *fwp_epoint)
+static int fwp_receive_endpoint_accept(struct fwp_endpoint *fwp_epoint)
 {
        int csockd;
-//     fwp_endpoint_t *fwp_epoint = epointd;
+//     struct fwp_endpoint *fwp_epoint = epointd;
        fwp_sockaddr_t  peer;
        int i;
 
@@ -467,7 +471,7 @@ static int fwp_receive_endpoint_accept(fwp_endpoint_t *fwp_epoint)
  * On error, -1 is returned and errno is set appropriately.
  *
  */
-int fwp_recv_conn(fwp_endpoint_t *ep, void *buffer, 
+int fwp_recv_conn(struct fwp_endpoint *ep, void *buffer, 
                        size_t buffer_size)
 {
        fwp_sockaddr_t *peer = &ep->peer;
@@ -516,7 +520,7 @@ int fwp_recv_conn(fwp_endpoint_t *ep, void *buffer,
  * On error, -1 is returned and errno is set appropriately.
  *
  */
-ssize_t fwp_recv(fwp_endpoint_t *ep,
+ssize_t fwp_recv(struct fwp_endpoint *ep,
                        void *buffer, const size_t buffer_size,
                        unsigned int *from, int flags)
 {
@@ -561,6 +565,53 @@ ssize_t fwp_recv(fwp_endpoint_t *ep,
                }
        }
 }
+/**
+ * Physically send the message.
+ *
+ * This function should be called either by fwp_send_sync()/fwp_send_async()
+ * or by VRES to send delayed messages.
+ *
+ * The payload is handed to sendmsg() as a single-element iovec on the
+ * endpoint's socket. No destination address is placed in the message
+ * header here (NOTE(review): presumably the socket is already connected
+ * to the peer when the send endpoint is created -- confirm).
+ *
+ * If ep->src is non-zero, an IP_PKTINFO control message carrying ep->src
+ * in its ipi_spec_dst field is attached to the message.
+ *
+ * @param ep Endpoint whose socket (ep->sockd) and forced source
+ *           address (ep->src) are used.
+ * @param data Start of the payload to send.
+ * @param size Length of the payload in bytes.
+ *
+ * @return The value returned by sendmsg().
+ */
+ssize_t fwp_endpoint_do_send(struct fwp_endpoint *ep,
+                            void *data, const size_t size)
+{
+       struct iovec  iov;
+       struct msghdr msg = {0}; /* zeroed: no address and no control data unless set below */
+       ssize_t ret;
+       char cmsg_buf[CMSG_SPACE(sizeof(struct in_pktinfo))]; /* room for exactly one in_pktinfo control message */
+
+       iov.iov_base = data;
+       iov.iov_len = size;
+
+       msg.msg_iov = &iov;
+       msg.msg_iovlen = 1;
+
+       if (ep->src.s_addr != 0) { /* a forced source address is configured */
+               struct cmsghdr *cmsg;
+               struct in_pktinfo *ipi;
+
+               memset(cmsg_buf, 0, sizeof(cmsg_buf)); /* leaves the other in_pktinfo fields zero */
+
+               msg.msg_control = cmsg_buf;
+               msg.msg_controllen = sizeof(cmsg_buf);
+
+               cmsg = CMSG_FIRSTHDR(&msg);
+
+               cmsg->cmsg_level = SOL_IP;
+               cmsg->cmsg_type = IP_PKTINFO;
+               cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
+
+               ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
+               ipi->ipi_spec_dst = ep->src;
+       }
+       ret = sendmsg(ep->sockd, &msg, 0);
+       return ret;
+}
 
 /**
  * Sends message through vres
@@ -574,42 +625,31 @@ ssize_t fwp_recv(fwp_endpoint_t *ep,
  * On error, -1 is returned and errno is set appropriately.
  *
  */
-int fwp_send(fwp_endpoint_t *ep,const void *msg, const size_t size)
+int fwp_send_async(struct fwp_endpoint *ep, void *msg, size_t size)
 {
-       struct fwp_msgb *msgb;
-       
-/*     if (!fwp_endpoint_is_valid(epointd)){
-               errno = EINVAL;
-               return -1;
-       }
-       if (!fwp_endpoint_is_bound(epointd)){
-               errno = EPERM;
-               return -1;
-       }*/
+       int ret;
 
-       /*if (flags && MSG_DONTWAIT) 
-               msgb = fwp_msgb_alloc(buffer_size);
-       else {*/
-               if (!(msgb = fwp_msgb_alloc(size))) {
-                       errno = ENOMEM;
-                       return -1;
-               }
+       if (!ep->vres)
+               return FRSH_ERR_NOT_BOUND;
 
-               /*msgb->peer = &ep->peer;*/
-               /*msgb->data = msg;*/
-               /*msgb->flags = epoint->flags;*/
-               
-               /* data must be copied since msg may change while
-                * message is waiting in transmission queue 
-                * */
-               memcpy(msgb->data, msg, size);
-               fwp_msgb_put(msgb, size);
-               /*msgb->tail = msgb->data + size;
-               msgb->len = size;*/
-
-       /*}*/
-
-       /* TODO: test whether _fwp_vres_send is successful */
-       return fwp_vres_send(ep->vres, msgb);
+       if (fwp_vres_consume_budget(ep->vres, size, false) == 0)
+               ret = fwp_endpoint_do_send(ep, msg, size);
+       else
+               ret = fwp_vres_enqueue(ep->vres, ep, msg, size);
+       return ret;
+}
+/**
+ * Sends a message through the vres the endpoint is bound to.
+ *
+ * Budget for size bytes is requested from the vres first; the message
+ * is handed to fwp_endpoint_do_send() only when that request returns 0.
+ * NOTE(review): the third argument of fwp_vres_consume_budget() is true
+ * here and false in fwp_send_async(); presumably it selects waiting for
+ * the budget instead of failing immediately -- confirm in fwp_vres.
+ *
+ * @param ep Endpoint to send through; must have a vres attached.
+ * @param msg Start of the message data.
+ * @param size Length of the message in bytes.
+ *
+ * @return FRSH_ERR_NOT_BOUND when ep->vres is not set, the non-zero
+ * result of fwp_vres_consume_budget() when the budget request fails,
+ * otherwise the value returned by fwp_endpoint_do_send().
+ */
+int fwp_send_sync(struct fwp_endpoint *ep, void *msg, size_t size)
+{
+       int ret;
+
+       if (!ep->vres)
+               return FRSH_ERR_NOT_BOUND;
+
+       ret = fwp_vres_consume_budget(ep->vres, size, true);
+       if (ret) /* budget not granted: report the error without sending */
+               return ret;
+       ret = fwp_endpoint_do_send(ep, msg, size);
+       return ret;
 }