1 #include "fwp_endpoint.h"
7 typedef unsigned int fwp_endpoint_id_t;
10 * Default fwp endpoint attributes
// Attribute values copied into a new endpoint by the create functions
// (both assign fwp_epoint_attr_default to the endpoint attr member).
// The defaults are best-effort delivery and at most 20 connections.
12 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
13 .reliability = FWP_EPOINT_BESTEFFORT,
14 .max_connections = 20,
18 * FWP endpoint structure
21 /** Fwp endpoint attributes */
22 fwp_endpoint_attr_t attr;
23 /* Vres this fwp endpoint is bound to */
25 /** For send endpoint it contains destination address for
26 * receive endpoint it is filled with the msg source address
28 struct fwp_sockaddr peer;
29 /** Source/destination port */
31 /** Destination node */
33 /** Socket descriptor.
34 * In case of reliable endpoint it is a listen tcp socket.
37 /** File descriptor array of peers connected
38 * to this fwp receive endpoint.*/
41 * Number of connections
43 unsigned int nr_connections;
47 /** specific operation options*/
54 * \return On success returns fwp endpoint structure.
55 * On error, NULL is returned.
58 static fwp_endpoint_t* fwp_endpoint_alloc()
60 return (fwp_endpoint_t*) calloc(1,sizeof(fwp_endpoint_t));
66 * \return Nothing; the function is declared void.
70 static inline void fwp_endpoint_free(fwp_endpoint_t *endpoint)
78 * \param[in] epointd Endpoint descriptor
79 * \return On success 0 is returned.
80 * On error, negative error value is returned and errno is set appropriately.
82 int fwp_endpoint_destroy(fwp_endpoint_d_t epointd)
84 if (epointd->sockd > 0)
85 close(epointd->sockd);
87 fwp_endpoint_free(epointd);
92 * Get endpoint parameters
94 * \param[in] epointd Endpoint descriptor
95 * \param[out] node Node identifier
96 * \param[out] port Port
97 * \param[out] attr Endpoint's attributes
98 * \return On success 0 is returned.
99 * On error, negative error value is returned.
// Copies the node, port and attributes stored in the endpoint into the
// output pointers supplied by the caller.
101 int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node,
102 unsigned int *port, fwp_endpoint_attr_t *attr)
// The descriptor is used directly as a pointer to the endpoint structure
// and is dereferenced below without a NULL check.
104 fwp_endpoint_t *epoint = epointd;
// Each output is optional: a NULL pointer means the value is not wanted.
// NOTE(review): fwp_receive_endpoint_create stores the port after htons(),
// so for receive endpoints the value reported here looks like network byte
// order -- confirm against callers.
106 if (node) *node = epoint->node;
107 if (port) *port = epoint->port;
108 if (attr) *attr = epoint->attr;
113 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
115 bzero(attr, sizeof(fwp_endpoint_attr_t));
116 *attr = fwp_epoint_attr_default;
122 * Creates send endpoint
124 * \param[in] node IP address of destination node
125 * \param[in] port UDP port
126 * \param[in] attr Endpoint attributes
127 * \param[out] epointd Pointer to the descriptor of newly created endpoint
129 * \return Zero on success, -1 on error and sets errno appropriately.
// Creates a send endpoint: allocates the endpoint, fills in the peer
// address, opens a TCP or UDP socket depending on the reliability attribute
// and connects it to the peer.
132 int fwp_send_endpoint_create(unsigned int node,
134 fwp_endpoint_attr_t *attr,
135 fwp_endpoint_d_t *epointd)
137 struct sockaddr_in *addr;
138 fwp_endpoint_t *fwp_epoint;
140 fwp_epoint = fwp_endpoint_alloc();
146 /*epoint->type = FWP_SEND_EPOINT;
147 epoint->status = FWP_EPOINT_UNBOUND;
// Either the caller attributes or the defaults end up in the endpoint;
// presumably the choice depends on attr being NULL -- confirm.
152 fwp_epoint->attr = *attr;
154 fwp_epoint->attr = fwp_epoint_attr_default;
// Build the IPv4 destination address inside the endpoint peer field.
156 addr = (struct sockaddr_in *)&(fwp_epoint->peer.addr);
157 bzero((char*) addr, sizeof(*addr));
158 addr->sin_family = AF_INET;
// node is stored without conversion, so it is presumably expected in
// network byte order already -- TODO confirm. The port is converted here.
159 addr->sin_addr.s_addr = node;
160 addr->sin_port = htons(port);
161 fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
// Reliable endpoints use a TCP stream socket, all others a UDP socket.
163 if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
164 fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
165 if (fwp_epoint->sockd < 0) {
169 fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
170 if (fwp_epoint->sockd < 0) {
174 /* Enable broadcasts */
175 /*unsigned int yes = 1;
176 if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_BROADCAST,
177 &yes, sizeof(yes)) == -1) {
178 FWP_DEBUG("setsockopt(SO_BROADCAST): %s", strerror(errno));
// Request SO_REUSEADDR on the socket; a failure is reported via FWP_DEBUG.
184 unsigned int yes = 1;
185 if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_REUSEADDR,
186 &yes, sizeof(yes)) == -1) {
187 FWP_DEBUG("setsockopt(SO_REUSEADDR): %s", strerror(errno));
// connect() is issued for both socket types using the peer address above.
191 if (connect(fwp_epoint->sockd,
192 (struct sockaddr*) &fwp_epoint->peer.addr,
193 fwp_epoint->peer.addrlen)) {
194 FWP_DEBUG("FWp connect error\n");
198 FWP_DEBUG("FWP Send endpoint created.\n");
// Only when FWP_WITH_CONTNEGT is not defined: a vres with default
// parameters is created and the endpoint is bound to it immediately.
200 #ifndef FWP_WITH_CONTNEGT
201 /* Create vres with default parameters */
202 FWP_DEBUG("Creating default vres\n");
203 if (fwp_vres_create(&fwp_vres_params_default, &fwp_epoint->vresd)) {
207 fwp_send_endpoint_bind(fwp_epoint, fwp_epoint->vresd);
// Hand the finished endpoint back through the output parameter.
210 *epointd = fwp_epoint;
// Presumably the error path: the partially built endpoint is destroyed.
213 fwp_endpoint_destroy(fwp_epoint);
218 * Creates receive endpoint
220 * \param[in] port UDP port
221 * \param[in] attr Endpoint attributes
222 * \param[out] epointdp Pointer to the descriptor of newly created endpoint
224 * \return Zero on success, -1 on error and errno is set.
226 int fwp_receive_endpoint_create(unsigned int port,
227 fwp_endpoint_attr_t *attr,
228 fwp_endpoint_d_t *epointd)
230 struct sockaddr_in *addr;
231 fwp_endpoint_t *fwp_epoint;
233 fwp_epoint = fwp_endpoint_alloc();
239 /*epoint->type = FWP_RECV_EPOINT;
240 epoint->status = FWP_EPOINT_UNBOUND;*/
243 fwp_epoint->attr = *attr;
245 fwp_epoint->attr = fwp_epoint_attr_default;
247 addr = (struct sockaddr_in *) &(fwp_epoint->peer.addr);
248 addr->sin_family = AF_INET;
249 /* TODO: set listen interface, maybe through config struct*/
250 addr->sin_addr.s_addr = FWP_ANY_NODE;
251 fwp_epoint->port = addr->sin_port = htons(port);
252 fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
254 if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
255 if ((fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM,
257 FWP_ERROR("Unable to open socket: %s", strerror(errno));
262 if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_REUSEADDR,
263 &yes, sizeof(yes)) == -1) {
264 FWP_ERROR("setsockopt(SO_REUSEADDR):%s",strerror(errno));
268 if (bind(fwp_epoint->sockd, (struct sockaddr*) &fwp_epoint->peer.addr,
269 fwp_epoint->peer.addrlen) == -1) {
270 FWP_ERROR("Bind error: %s", strerror(errno));
271 /* TODO: remove all error messages from all libraries */
275 if (listen(fwp_epoint->sockd, fwp_epoint->attr.max_connections)){
276 perror("Error on listen call\n");
280 FD_ZERO(&fwp_epoint->fdset);
281 /*add listen socket */
282 FD_SET(fwp_epoint->sockd, &fwp_epoint->fdset);
283 fwp_epoint->testfds = fwp_epoint->fdset;
284 fwp_epoint->c_sockd =
285 (int*)malloc(fwp_epoint->attr.max_connections);
286 bzero(fwp_epoint->c_sockd, fwp_epoint->attr.max_connections);
287 fwp_epoint->nr_connections = 0;
289 FWP_DEBUG("Reliable receive endpoint port=%d created.\n",
292 if ((fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM,
294 FWP_ERROR("Unable to open socket: %s", strerror(errno));
298 if (bind(fwp_epoint->sockd,
299 (struct sockaddr*) &fwp_epoint->peer.addr,
300 fwp_epoint->peer.addrlen) == -1) {
302 FWP_ERROR("Bind error: %s", strerror(errno));
305 FWP_DEBUG("Best-Effort receive endpoint port=%d created.\n",
309 /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF,
310 &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
312 FWP_ERROR("Unable to set socket buffer size: %s", strerror(errno));
315 FWP_DEBUG("Receive endpoint buffer size is set.\n");
319 getsockname(fwp_epoint->sockd, (struct sockaddr*)&fwp_epoint->peer.addr,
320 &fwp_epoint->peer.addrlen);
322 addr = (struct sockaddr_in*) fwp_epoint->peer.addr;
323 FWP_DEBUG("Recv port= %d\n",ntohs(addr->sin_port));
324 *epointd = fwp_epoint;
327 fwp_endpoint_destroy(fwp_epoint);
332 * Binds send endpoint to vres
334 * \param[in] epointd Send endpoint descriptor
335 * \param[in] vresd Descriptor of the vres to bind to
337 * \return On success returns 0. On error, -1 and errno is set appropriately.
// Binds a send endpoint to a vres.
339 int fwp_send_endpoint_bind(fwp_endpoint_d_t epointd, fwp_vres_d_t vresd)
// NOTE(review): the statements below are compiled only when
// FWP_WITH_CONTNEGT is defined, whereas fwp_send_endpoint_create calls this
// function only when that macro is NOT defined -- confirm the intended guard.
342 #ifdef FWP_WITH_CONTNEGT
343 fwp_endpoint_t *fwp_epoint = epointd;
// Remember the vres in the endpoint, then pass the endpoint socket to
// fwp_vres_bind(); its result is kept in rv.
345 fwp_epoint->vresd = vresd;
346 rv = fwp_vres_bind(vresd, fwp_epoint->sockd);
348 /* if send endpoint is already bound
349 if (epoint->type == FWP_EPOINT_BOUND) {
350 fwp_send_endpoint_unbind(epoint);
356 * Unbinds send endpoint from vres
358 * \param[in] epointd Send endpoint descriptor
359 * \return On success returns 0. On error, -1 is returned and errno is set appropriately.
// Unbinds a send endpoint from its vres by calling fwp_vres_unbind() on the
// vres descriptor stored in the endpoint.
362 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
365 fwp_endpoint_t *fwp_epoint = epointd;
367 /* unlink epoint-vres mutually */
// A negative result from fwp_vres_unbind() is treated as failure.
368 if ((rv = fwp_vres_unbind(fwp_epoint->vresd)) < 0)
375 * Accepts (TCP) client connection to receive endpoint
377 * \param[in] epointd Pointer to fwp endpoint
379 * On success, it returns zero.
382 static int fwp_receive_endpoint_accept(fwp_endpoint_t *fwp_epoint)
385 // fwp_endpoint_t *fwp_epoint = epointd;
389 if (fwp_epoint->nr_connections == fwp_epoint->attr.max_connections)
392 peer.addrlen = sizeof(struct sockaddr_in);
393 csockd = accept(fwp_epoint->sockd, (struct sockaddr*)peer.addr,
397 perror("Error on accept\n");
401 FWP_DEBUG("New connection accepted\n");
402 /* find free place */
404 while ((fwp_epoint->c_sockd[i])&& (i < fwp_epoint->nr_connections))
406 fwp_epoint->c_sockd[i] = csockd;
407 fwp_epoint->nr_connections++;
409 FD_SET(csockd, &fwp_epoint->fdset);
414 * Receives message from stream (TCP)
416 * \param[in] epointd Descriptor of endpoint
417 * \param[in] buffer Buffer to store message
418 * \param[in] buffer_size Size of buffer
421 * On success, it returns number of received bytes.
422 * On error, -1 is returned and errno is set appropriately.
425 int fwp_recv_conn(fwp_endpoint_d_t epointd, void *buffer,
428 fwp_endpoint_t *fwp_epoint = epointd;
429 fwp_sockaddr_t *peer = &fwp_epoint->peer;
430 fd_set fdset = fwp_epoint->fdset;
434 FWP_DEBUG("Checking for tcp data\n");
435 for (i = 0; i < fwp_epoint->nr_connections; i++) {
436 if (!FD_ISSET(fwp_epoint->c_sockd[i], &fdset)) {
440 FWP_DEBUG("Prepare to receive tcp data\n");
441 peer->addrlen = sizeof(struct sockaddr_in);
442 len = _fwp_recvfrom(fwp_epoint->c_sockd[i], buffer,
443 buffer_size,0, peer);
445 if (len < 0) /* Error */
448 FWP_DEBUG("Received tcp data\n");
452 /* tcp connection closed */
453 FWP_DEBUG("Connection closed\n");
454 FD_CLR(fwp_epoint->c_sockd[i], &fwp_epoint->fdset);
455 memcpy(fwp_epoint->c_sockd+i, fwp_epoint->c_sockd+i+1,
456 sizeof(int)*(fwp_epoint->nr_connections -i-1));
457 fwp_epoint->nr_connections--;
466 * \param[in] epointd Descriptor of endpoint
467 * \param[in] buffer Buffer to store message
468 * \param[in] buffer_size Size of buffer
471 * On success, it returns number of received bytes.
472 * On error, -1 is returned and errno is set appropriately.
// Receives one message on the endpoint. Best-effort endpoints read directly
// from the endpoint socket. Reliable endpoints wait in select() on the
// endpoint descriptor set and then either accept a new connection or read
// from the accepted client sockets.
475 ssize_t fwp_recv(fwp_endpoint_d_t epointd,
476 void *buffer, const size_t buffer_size,
477 unsigned int *from, int flags)
479 fwp_endpoint_t *fwp_epoint = epointd;
// The peer field of the endpoint receives the sender address.
480 fwp_sockaddr_t *peer = &fwp_epoint->peer;
484 /*if (!fwp_endpoint_is_valid(epointd)) {
489 if (fwp_epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {
490 len = _fwp_recvfrom(fwp_epoint->sockd, buffer,
491 buffer_size, 0, peer);
496 /* FIXME: What about using a loop here and continue instead of goto???? */
497 /* FWP_EPOINT_RELIABLE */
// select() overwrites the set it is given, so a copy of the endpoint set is
// used. The NULL timeout makes the call block until a descriptor is readable.
498 fdset = fwp_epoint->fdset;
499 if (select(FD_SETSIZE, &fdset, (fd_set *)0,
500 (fd_set *)0, NULL) < 0) {
502 FWP_ERROR("Error in select: %s", strerror(errno));
// A readable listen socket means a client is waiting to be accepted. The
// result of the accept helper is not examined here.
506 if (FD_ISSET(fwp_epoint->sockd, &fdset)) { /* is it listen socket? */
507 fwp_receive_endpoint_accept(fwp_epoint);
511 /* Check client TCP sockets */
512 len = fwp_recv_conn(fwp_epoint, buffer, buffer_size);
519 * Sends message through vres
521 * \param[in] epointd Endpoint descriptor
522 * \param[in] msg Message to send
523 * \param[in] size Message size
526 * On success, it returns zero.
527 * On error, -1 is returned and errno is set appropriately.
// Sends a message through the vres the endpoint is bound to: the payload is
// copied into a newly allocated message buffer, which is then passed to
// fwp_vres_send().
530 int fwp_send(fwp_endpoint_d_t epointd,const void *msg, const size_t size,
533 fwp_endpoint_t *fwp_epoint = epointd;
534 struct fwp_msgb *msgb;
536 /* if (!fwp_endpoint_is_valid(epointd)){
// The endpoint is checked with fwp_endpoint_is_bound() before sending.
540 if (!fwp_endpoint_is_bound(epointd)){
545 /*if (flags && MSG_DONTWAIT)
546 msgb = fwp_msgb_alloc(buffer_size);
// A falsy result from fwp_msgb_alloc() is handled as allocation failure.
548 if (!(msgb = fwp_msgb_alloc(size))) {
553 /*msgb->peer = &fwp_epoint->peer;*/
554 /*msgb->data = msg;*/
555 /*msgb->flags = epoint->flags;*/
557 /* data must be copied since msg may change while
558 * message is waiting in transmission queue
560 memcpy(msgb->data, msg, size);
// Presumably marks size bytes of the buffer as used (compare the
// commented-out tail update below) -- confirm against fwp_msgb_put().
561 fwp_msgb_put(msgb, size);
562 /*msgb->tail = msgb->data + size;
567 /* TODO: test whether _fwp_vres_send is successful */
// The result of fwp_vres_send() is returned to the caller unchanged.
568 return fwp_vres_send(fwp_epoint->vresd, msgb);