#include <unistd.h>
#include <netinet/in.h>
#include "fwp_utils.h"
+#include "fwp_vres.h"
+#include <frsh_error.h>
#include <pthread.h>
#include "fwp_debug.h"
+#include "fwp_msgq.h"
typedef unsigned int fwp_endpoint_id_t;
fd_set fdset;
/** specific operation options*/
int flags;
+ /** Forced source address. If non-zero, packets are sent over
+ * the specified interface. */
+ struct in_addr src;
};
/**
* On error, NULL is returned.
*
*/
-static fwp_endpoint_t* fwp_endpoint_alloc()
+static struct fwp_endpoint* fwp_endpoint_alloc()
{
- return (fwp_endpoint_t*) calloc(1,sizeof(fwp_endpoint_t));
+ return (struct fwp_endpoint*) calloc(1,sizeof(struct fwp_endpoint));
}
/**
* On error, NULL is returned.
*
*/
-static inline void fwp_endpoint_free(fwp_endpoint_t *endpoint)
+static inline void fwp_endpoint_free(struct fwp_endpoint *endpoint)
{
free(endpoint);
}
* \return On success 0 is returned.
* On error, negative error value is returned and errno is set appropriately.
*/
-int fwp_endpoint_destroy(fwp_endpoint_t *ep)
+int fwp_endpoint_destroy(struct fwp_endpoint *ep)
{
if (ep->sockd > 0)
close(ep->sockd);
-
+
fwp_endpoint_free(ep);
return 0;
}
* \return On success 0 is returned.
* On error, negative error value is returned.
*/
-int fwp_endpoint_get_params(fwp_endpoint_t *ep, unsigned int *node,
+int fwp_endpoint_get_params(struct fwp_endpoint *ep, unsigned int *node,
unsigned int *port, fwp_endpoint_attr_t *attr)
{
if (node) *node = ep->node;
int fwp_send_endpoint_create(unsigned int node,
unsigned int port,
fwp_endpoint_attr_t *attr,
- fwp_endpoint_t **epoint)
+ struct fwp_endpoint **epoint)
{
struct sockaddr_in *addr;
- fwp_endpoint_t *fwp_epoint;
+ struct fwp_endpoint *fwp_epoint;
fwp_epoint = fwp_endpoint_alloc();
if (!fwp_epoint) {
*/
int fwp_receive_endpoint_create(unsigned int port,
fwp_endpoint_attr_t *attr,
- fwp_endpoint_t **epp)
+ struct fwp_endpoint **epp)
{
struct sockaddr_in *addr;
- fwp_endpoint_t *fwp_epoint;
+ struct fwp_endpoint *fwp_epoint;
fwp_epoint = fwp_endpoint_alloc();
if (!fwp_epoint) {
*
* \return On success returns 0. On error, -1 and errno is set appropriately.
*/
-int fwp_send_endpoint_bind(fwp_endpoint_t *ep, fwp_vres_t *vres)
+int fwp_send_endpoint_bind(struct fwp_endpoint *ep, fwp_vres_t *vres)
{
int rv = 0;
-#ifndef FWP_WITHOUT_CONTNEGT
+
+ if (ep->vres)
+ return FRSH_ERR_ALREADY_BOUND;
ep->vres = vres;
- rv = fwp_vres_bind(vres, ep->sockd);
-#endif
- /* if send endpoint is already bound
- if (epoint->type == FWP_EPOINT_BOUND) {
- fwp_send_endpoint_unbind(epoint);
- }*/
+ rv = fwp_vres_bind(vres, ep, ep->sockd);
+
return rv;
}
* \return On success returns 0. On error, -1 is returned and errno is set appropriately.
*
*/
-int fwp_send_endpoint_unbind(fwp_endpoint_t *ep)
+int fwp_send_endpoint_unbind(struct fwp_endpoint *ep)
{
int rv = 0;
* On success, it returns zero.
*
*/
-static int fwp_receive_endpoint_accept(fwp_endpoint_t *fwp_epoint)
+static int fwp_receive_endpoint_accept(struct fwp_endpoint *fwp_epoint)
{
int csockd;
-// fwp_endpoint_t *fwp_epoint = epointd;
+// struct fwp_endpoint *fwp_epoint = epointd;
fwp_sockaddr_t peer;
int i;
* On error, -1 is returned and errno is set appropriately.
*
*/
-int fwp_recv_conn(fwp_endpoint_t *ep, void *buffer,
+int fwp_recv_conn(struct fwp_endpoint *ep, void *buffer,
size_t buffer_size)
{
fwp_sockaddr_t *peer = &ep->peer;
* On error, -1 is returned and errno is set appropriately.
*
*/
-ssize_t fwp_recv(fwp_endpoint_t *ep,
+ssize_t fwp_recv(struct fwp_endpoint *ep,
void *buffer, const size_t buffer_size,
unsigned int *from, int flags)
{
}
}
}
+/**
+ * Physically send the message.
+ *
+ * This function should be called either by fwp_send_sync()/async() of
+ * by VRES to send delayed messaged.
+ *
+ * @param ep
+ * @param data
+ * @param size
+ *
+ * @return
+ */
+ssize_t fwp_endpoint_do_send(struct fwp_endpoint *ep,
+ void *data, const size_t size)
+{
+ struct iovec iov;
+ struct msghdr msg = {0};
+ ssize_t ret;
+ char cmsg_buf[CMSG_SPACE(sizeof(struct in_pktinfo))];
+
+ iov.iov_base = data;
+ iov.iov_len = size;
+
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+
+ if (ep->src.s_addr != 0) {
+ struct cmsghdr *cmsg;
+ struct in_pktinfo *ipi;
+
+ memset(cmsg_buf, 0, sizeof(cmsg_buf));
+
+ msg.msg_control = cmsg_buf;
+ msg.msg_controllen = sizeof(cmsg_buf);
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+
+ cmsg->cmsg_level = SOL_IP;
+ cmsg->cmsg_type = IP_PKTINFO;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
+
+ ipi = (struct in_pktinfo*)CMSG_DATA(cmsg);
+ ipi->ipi_spec_dst = ep->src;
+ }
+ ret = sendmsg(ep->sockd, &msg, 0);
+ return ret;
+}
/**
* Sends message through vres
* On error, -1 is returned and errno is set appropriately.
*
*/
-int fwp_send(fwp_endpoint_t *ep,const void *msg, const size_t size)
+int fwp_send_async(struct fwp_endpoint *ep, void *msg, size_t size)
{
- struct fwp_msgb *msgb;
-
-/* if (!fwp_endpoint_is_valid(epointd)){
- errno = EINVAL;
- return -1;
- }
- if (!fwp_endpoint_is_bound(epointd)){
- errno = EPERM;
- return -1;
- }*/
+ int ret;
- /*if (flags && MSG_DONTWAIT)
- msgb = fwp_msgb_alloc(buffer_size);
- else {*/
- if (!(msgb = fwp_msgb_alloc(size))) {
- errno = ENOMEM;
- return -1;
- }
+ if (!ep->vres)
+ return FRSH_ERR_NOT_BOUND;
- /*msgb->peer = &ep->peer;*/
- /*msgb->data = msg;*/
- /*msgb->flags = epoint->flags;*/
-
- /* data must be copied since msg may change while
- * message is waiting in transmission queue
- * */
- memcpy(msgb->data, msg, size);
- fwp_msgb_put(msgb, size);
- /*msgb->tail = msgb->data + size;
- msgb->len = size;*/
-
- /*}*/
-
- /* TODO: test whether _fwp_vres_send is successful */
- return fwp_vres_send(ep->vres, msgb);
+ if (fwp_vres_consume_budget(ep->vres, size, false) == 0)
+ ret = fwp_endpoint_do_send(ep, msg, size);
+ else
+ ret = fwp_vres_enqueue(ep->vres, ep, msg, size);
+ return ret;
+}
+
+int fwp_send_sync(struct fwp_endpoint *ep, void *msg, size_t size)
+{
+ int ret;
+
+ if (!ep->vres)
+ return FRSH_ERR_NOT_BOUND;
+
+ ret = fwp_vres_consume_budget(ep->vres, size, true);
+ if (ret)
+ return ret;
+ ret = fwp_endpoint_do_send(ep, msg, size);
+ return ret;
}