#include "fwp_endpoint.h"
#include "fwp_msgb.h"
#include <errno.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <netinet/in.h>
+#include "fwp_utils.h"
+#include "fwp_vres.h"
+#include <frsh_error.h>
#include <pthread.h>
+#include "fwp_debug.h"
+#include "fwp_msgq.h"
typedef unsigned int fwp_endpoint_id_t;
/** Fwp endpoint attributes */
fwp_endpoint_attr_t attr;
/* Vres this fwp endpoint is bound to */
- fwp_vres_d_t vresd;
+ fwp_vres_t *vres;
/** For a send endpoint it contains the destination address; for a
* receive endpoint it is filled with the msg source address
*/
fd_set fdset;
/** specific operation options*/
int flags;
+ /** Forced source address. If non-zero, packets are sent over
+ * the specified interface. */
+ struct in_addr src;
};
/**
 * Allocates one endpoint structure with calloc(), so every field of the
 * returned structure starts out zero-filled.
 *
 * \return Pointer to the new endpoint. On error, NULL is returned.
 */
-static fwp_endpoint_t* fwp_endpoint_alloc()
+static struct fwp_endpoint* fwp_endpoint_alloc()
{
- return (fwp_endpoint_t*) calloc(1,sizeof(fwp_endpoint_t));
+	struct fwp_endpoint *fresh = calloc(1, sizeof(*fresh));
+
+	return fresh;
}
/**
 * Releases the memory of an endpoint structure with free().
 *
 * Only the structure itself is released; the endpoint's socket is not
 * closed here.
 *
 * \param[in] endpoint Endpoint structure to release
 */
-static inline void fwp_endpoint_free(fwp_endpoint_t *endpoint)
+static inline void fwp_endpoint_free(struct fwp_endpoint *endpoint)
{
free(endpoint);
}
/**
 * Destroy endpoint
 *
 * Closes every accepted connection socket, closes the endpoint's own
 * socket and releases the endpoint structure. The endpoint must not be
 * used after this call.
 *
- * \param[in] epointd Endpoint descriptor
+ * \param[in] ep Endpoint descriptor
 * \return Always 0.
 */
-int fwp_endpoint_destroy(fwp_endpoint_d_t epointd)
+int fwp_endpoint_destroy(struct fwp_endpoint *ep)
{
- if (epointd->sockd > 0)
- close(epointd->sockd);
-
- fwp_endpoint_free(epointd);
+	int i;
+
+	/* Accepted connections are kept in c_sockd[]; if they were not
+	 * closed here their descriptors would leak once the structure
+	 * is freed. */
+	for (i = 0; i < ep->nr_connections; i++)
+		close(ep->c_sockd[i]);
+
+	/* The structure is zero-filled on allocation, so presumably a
+	 * sockd of 0 stands for "no socket opened yet". */
+	if (ep->sockd > 0)
+		close(ep->sockd);
+
+	fwp_endpoint_free(ep);
return 0;
}
/**
 * Get endpoint parameters
 *
 * Copies the node, port and attributes stored in the endpoint into the
 * caller's variables. Each output pointer may be NULL, in which case the
 * corresponding value is not reported.
 *
- * \param[in] epointd Endpoint descriptor
+ * \param[in] ep Endpoint descriptor
 * \param[out] node Node identifier
 * \param[out] port Port
 * \param[out] attr Endpoint's attributes
 * \return 0 (this implementation has no error path).
 */
-int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node,
+int fwp_endpoint_get_params(struct fwp_endpoint *ep, unsigned int *node,
unsigned int *port, fwp_endpoint_attr_t *attr)
{
- fwp_endpoint_t *epoint = epointd;
-
- if (node) *node = epoint->node;
- if (port) *port = epoint->port;
- if (attr) *attr = epoint->attr;
+	if (node != NULL)
+		*node = ep->node;
+	if (port != NULL)
+		*port = ep->port;
+	if (attr != NULL)
+		*attr = ep->attr;
return 0;
}
* \param[in] node IP address of destination node
* \param[in] port UDP port
* \param[in] attr Endpoint attributes
- * \param[out] epointdp Pointer to the descriptor of newly created endpoint
+ * \param[out] epoint Pointer to the descriptor of newly created endpoint
*
* \return Zero on success, -1 on error and sets errno appropriately.
*
int fwp_send_endpoint_create(unsigned int node,
unsigned int port,
fwp_endpoint_attr_t *attr,
- fwp_endpoint_d_t *epointd)
+ struct fwp_endpoint **epoint)
{
struct sockaddr_in *addr;
- fwp_endpoint_t *fwp_epoint;
+ struct fwp_endpoint *fwp_epoint;
fwp_epoint = fwp_endpoint_alloc();
if (!fwp_epoint) {
fwp_send_endpoint_bind(fwp_epoint, fwp_epoint->vresd);
#endif
- *epointd = fwp_epoint;
+ *epoint = fwp_epoint;
return fwp_epoint->sockd;
err:
fwp_endpoint_destroy(fwp_epoint);
*/
int fwp_receive_endpoint_create(unsigned int port,
fwp_endpoint_attr_t *attr,
- fwp_endpoint_d_t *epointd)
+ struct fwp_endpoint **epp)
{
struct sockaddr_in *addr;
- fwp_endpoint_t *fwp_epoint;
+ struct fwp_endpoint *fwp_epoint;
fwp_epoint = fwp_endpoint_alloc();
if (!fwp_epoint) {
}
if (listen(fwp_epoint->sockd, fwp_epoint->attr.max_connections)){
- perror("Error on listen call\n");
+ FWP_ERROR("Error on listen call: %s\n", strerror(errno));
goto err;
}
addr = (struct sockaddr_in*) fwp_epoint->peer.addr;
fwp_epoint->port = ntohs(addr->sin_port);
FWP_DEBUG("Recv port= %d\n",ntohs(addr->sin_port));
- *epointd = fwp_epoint;
+ *epp = fwp_epoint;
return 0;
err:
fwp_endpoint_destroy(fwp_epoint);
/**
 * Binds send endpoint to vres
 *
 * An endpoint can be bound to one vres only. If the bind fails, the
 * endpoint is left unbound so that a later bind can be attempted.
 *
- * \param[in] vres_id identifier of vres
- * \param[in] epoint_id send endpoint identifier
+ * \param[in] ep send endpoint to bind
+ * \param[in] vres vres the endpoint is bound to
 *
 * \return On success returns 0. FRSH_ERR_ALREADY_BOUND is returned when the
 * endpoint already has a vres; otherwise the value returned by
 * fwp_vres_bind() is passed through.
 */
-int fwp_send_endpoint_bind(fwp_endpoint_d_t epointd, fwp_vres_d_t vresd)
+int fwp_send_endpoint_bind(struct fwp_endpoint *ep, fwp_vres_t *vres)
{
int rv = 0;
-#ifndef FWP_WITHOUT_CONTNEGT
- fwp_endpoint_t *fwp_epoint = epointd;
- fwp_epoint->vresd = vresd;
- rv = fwp_vres_bind(vresd, fwp_epoint->sockd);
-#endif
- /* if send endpoint is already bound
- if (epoint->type == FWP_EPOINT_BOUND) {
- fwp_send_endpoint_unbind(epoint);
- }*/
+
+	if (ep->vres)
+		return FRSH_ERR_ALREADY_BOUND;
+
+	ep->vres = vres;
+	rv = fwp_vres_bind(vres, ep, ep->sockd, &ep->src);
+	if (rv != 0) {
+		/* Forget the vres again: a stale pointer would make the
+		 * endpoint look bound, so sends would use it and every
+		 * further bind would fail with FRSH_ERR_ALREADY_BOUND. */
+		ep->vres = NULL;
+	}
+
return rv;
}
* \return On success returns 0. On error, -1 is returned and errno is set appropriately.
*
*/
-int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
+int fwp_send_endpoint_unbind(struct fwp_endpoint *ep)
{
int rv = 0;
- fwp_endpoint_t *fwp_epoint = epointd;
/* unlink epoint-vres mutually */
- if ((rv = fwp_vres_unbind(fwp_epoint->vresd)) < 0)
+ if ((rv = fwp_vres_unbind(ep->vres)) < 0)
return rv;
return 0;
* On success, it returns zero.
*
*/
-static int fwp_receive_endpoint_accept(fwp_endpoint_t *fwp_epoint)
+static int fwp_receive_endpoint_accept(struct fwp_endpoint *fwp_epoint)
{
int csockd;
-// fwp_endpoint_t *fwp_epoint = epointd;
+// struct fwp_endpoint *fwp_epoint = epointd;
fwp_sockaddr_t peer;
int i;
&peer.addrlen);
if (csockd < 0) {
- perror("Error on accept\n");
+ FWP_ERROR("Error on accept: %s\n", strerror(errno));
return errno;
}
* On error, -1 is returned and errno is set appropriately.
*
*/
-int fwp_recv_conn(fwp_endpoint_d_t epointd, void *buffer,
+int fwp_recv_conn(struct fwp_endpoint *ep, void *buffer,
size_t buffer_size)
{
- fwp_endpoint_t *fwp_epoint = epointd;
- fwp_sockaddr_t *peer = &fwp_epoint->peer;
- fd_set fdset = fwp_epoint->fdset;
+ fwp_sockaddr_t *peer = &ep->peer;
+ fd_set fdset = ep->fdset;
ssize_t len;
int i;
FWP_DEBUG("Checking for tcp data\n");
- for (i = 0; i < fwp_epoint->nr_connections; i++) {
- if (!FD_ISSET(fwp_epoint->c_sockd[i], &fdset)) {
+ for (i = 0; i < ep->nr_connections; i++) {
+ if (!FD_ISSET(ep->c_sockd[i], &fdset)) {
continue;
}
FWP_DEBUG("Prepare to receive tcp data\n");
peer->addrlen = sizeof(struct sockaddr_in);
- len = _fwp_recvfrom(fwp_epoint->c_sockd[i], buffer,
- buffer_size,0, peer);
+ len = recvfrom(ep->c_sockd[i], buffer, buffer_size, 0,
+ (struct sockaddr*)&peer->addr, &peer->addrlen);
if (len < 0) /* Error */
return len;
/* tcp connection closed */
FWP_DEBUG("Connection closed\n");
- FD_CLR(fwp_epoint->c_sockd[i], &fwp_epoint->fdset);
- memcpy(fwp_epoint->c_sockd+i, fwp_epoint->c_sockd+i+1,
- sizeof(int)*(fwp_epoint->nr_connections -i-1));
- fwp_epoint->nr_connections--;
+ FD_CLR(ep->c_sockd[i], &ep->fdset);
+ memcpy(ep->c_sockd+i, ep->c_sockd+i+1,
+ sizeof(int)*(ep->nr_connections -i-1));
+ ep->nr_connections--;
return 0;
}
return 0;
* On error, -1 is returned and errno is set appropriately.
*
*/
-ssize_t fwp_recv(fwp_endpoint_d_t epointd,
+ssize_t fwp_recv(struct fwp_endpoint *ep,
void *buffer, const size_t buffer_size,
unsigned int *from, int flags)
{
- fwp_endpoint_t *fwp_epoint = epointd;
- fwp_sockaddr_t *peer = &fwp_epoint->peer;
- struct sockaddr_in *addr = (struct sockaddr_in*) fwp_epoint->peer.addr;
+ fwp_sockaddr_t *peer = &ep->peer;
+ struct sockaddr_in *addr = (struct sockaddr_in*) ep->peer.addr;
ssize_t len;
fd_set fdset;
return -1;
}*/
- if (fwp_epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {
- len = _fwp_recvfrom(fwp_epoint->sockd, buffer,
- buffer_size, 0, peer);
-
+ if (ep->attr.reliability == FWP_EPOINT_BESTEFFORT) {
+ len = recvfrom(ep->sockd, buffer, buffer_size, 0,
+ (struct sockaddr*)&peer->addr, &peer->addrlen);
*from = addr->sin_addr.s_addr;
return len;
}
while (1){
/* FWP_EPOINT_RELIABLE */
- fdset = fwp_epoint->fdset;
+ fdset = ep->fdset;
if (select(FD_SETSIZE, &fdset, (fd_set *)0,
(fd_set *)0, NULL) < 0) {
return -1;
}
- if (FD_ISSET(fwp_epoint->sockd, &fdset)) { /* is it listen socket? */
- fwp_receive_endpoint_accept(fwp_epoint);
+ if (FD_ISSET(ep->sockd, &fdset)) { /* is it listen socket? */
+ fwp_receive_endpoint_accept(ep);
continue;
}
/* Check client TCP sockets */
- len = fwp_recv_conn(fwp_epoint, buffer, buffer_size);
+ len = fwp_recv_conn(ep, buffer, buffer_size);
if (len) {
*from = addr->sin_addr.s_addr;
return len;
}
}
}
+/**
+ * Physically send the message.
+ *
+ * This function should be called either by fwp_send_sync()/async() or
+ * by VRES to send delayed messages.
+ *
+ * The payload is passed to sendmsg() on the endpoint's socket as a single
+ * buffer. If the endpoint has a forced source address (ep->src is
+ * non-zero), an IP_PKTINFO control message carrying that address in
+ * ipi_spec_dst is attached to the message.
+ *
+ * @param ep Endpoint whose socket and optional source address are used
+ * @param data Message payload
+ * @param size Length of the payload in bytes
+ *
+ * @return The value returned by sendmsg().
+ */
+ssize_t fwp_endpoint_do_send(struct fwp_endpoint *ep,
+			     const void *data, const size_t size)
+{
+	char pktinfo_buf[CMSG_SPACE(sizeof(struct in_pktinfo))];
+	struct iovec payload = {
+		.iov_base = (void*)data,	/* iov_base is a plain void * */
+		.iov_len = size,
+	};
+	struct msghdr hdr = {
+		.msg_iov = &payload,
+		.msg_iovlen = 1,
+	};
+
+	if (ep->src.s_addr != 0) {
+		struct cmsghdr *ctl;
+		struct in_pktinfo *pktinfo;
+
+		/* Zero the control buffer so that all pktinfo fields
+		 * other than ipi_spec_dst stay 0. */
+		memset(pktinfo_buf, 0, sizeof(pktinfo_buf));
+		hdr.msg_control = pktinfo_buf;
+		hdr.msg_controllen = sizeof(pktinfo_buf);
+
+		ctl = CMSG_FIRSTHDR(&hdr);
+		ctl->cmsg_level = SOL_IP;
+		ctl->cmsg_type = IP_PKTINFO;
+		ctl->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
+
+		pktinfo = (struct in_pktinfo*)CMSG_DATA(ctl);
+		pktinfo->ipi_spec_dst = ep->src;
+	}
+
+	return sendmsg(ep->sockd, &hdr, 0);
+}
/**
* Sends message through vres
* On error, -1 is returned and errno is set appropriately.
*
*/
-int fwp_send(fwp_endpoint_d_t epointd,const void *msg, const size_t size,
- int flags)
+int fwp_send_async(struct fwp_endpoint *ep, const void *msg, size_t size)
{
- fwp_endpoint_t *fwp_epoint = epointd;
- struct fwp_msgb *msgb;
-
-/* if (!fwp_endpoint_is_valid(epointd)){
- errno = EINVAL;
- return -1;
- }
- if (!fwp_endpoint_is_bound(epointd)){
- errno = EPERM;
- return -1;
- }*/
- /*if (flags && MSG_DONTWAIT)
- msgb = fwp_msgb_alloc(buffer_size);
- else {*/
- if (!(msgb = fwp_msgb_alloc(size))) {
- errno = ENOMEM;
- return -1;
- }
- /*msgb->peer = &fwp_epoint->peer;*/
- /*msgb->data = msg;*/
- /*msgb->flags = epoint->flags;*/
-
- /* data must be copied since msg may change while
- * message is waiting in transmission queue
- * */
- memcpy(msgb->data, msg, size);
- fwp_msgb_put(msgb, size);
- /*msgb->tail = msgb->data + size;
- msgb->len = size;*/
-
- /*}*/
-
- /* TODO: test whether _fwp_vres_send is successful */
- return fwp_vres_send(fwp_epoint->vresd, msgb);
+	/* Sending requires a vres bound to the endpoint. */
+	if (ep->vres == NULL)
+		return FRSH_ERR_NOT_BOUND;
+
+	/* A non-zero result of the budget check (called with "false")
+	 * hands the message over to fwp_vres_enqueue(); a zero result
+	 * means it is sent right away. */
+	if (fwp_vres_consume_budget(ep->vres, size, false) != 0)
+		return fwp_vres_enqueue(ep->vres, ep, msg, size);
+
+	return fwp_endpoint_do_send(ep, msg, size);
+}
+
+/**
+ * Sends a message through the vres bound to the endpoint.
+ *
+ * fwp_vres_consume_budget() is called with "true" as its last argument;
+ * the message is sent with fwp_endpoint_do_send() only if that call
+ * returns 0.
+ *
+ * \param[in] ep Send endpoint
+ * \param[in] msg Message payload
+ * \param[in] size Length of the payload in bytes
+ *
+ * \return FRSH_ERR_NOT_BOUND if the endpoint has no vres, the non-zero
+ * value of fwp_vres_consume_budget() if it fails, otherwise the result of
+ * fwp_endpoint_do_send().
+ */
+int fwp_send_sync(struct fwp_endpoint *ep, const void *msg, size_t size)
+{
+	int budget_rv;
+
+	if (ep->vres == NULL)
+		return FRSH_ERR_NOT_BOUND;
+
+	budget_rv = fwp_vres_consume_budget(ep->vres, size, true);
+	if (budget_rv != 0)
+		return budget_rv;
+
+	return fwp_endpoint_do_send(ep, msg, size);
}