1 #include "fwp_endpoint.h"
#include <unistd.h>
7 typedef unsigned int fwp_endpoint_id_t;
9 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
10 .reliability = FWP_EPOINT_BESTEFFORT,
11 .max_connections = 20,
15 * Structure of FWP endpoint.
18 /** endpoint attributes */
19 fwp_endpoint_attr_t attr;
20 /** For a send endpoint it contains the destination address; for a
21 * receive endpoint it is filled with the message source address.
24 struct fwp_sockaddr peer;
25 /** source/destination port */
27 /** destination node */
29 /** Socket descriptor.
30 * In case of a reliable endpoint it is a listen socket.
36 unsigned int nr_connections;
37 /** specific operation options*/
44 * \return On success returns endpoint structure.
45 * On error, NULL is returned.
48 static fwp_endpoint_t* fwp_endpoint_alloc()
50 return (fwp_endpoint_t*) calloc(1,sizeof(fwp_endpoint_t));
56 * \param[in] epointd Endpoint descriptor
57 * \return On success 0 is returned.
58 * On error, negative error value is returned and errno is set appropriately.
60 /*int fwp_endpoint_destroy(fwp_endpoint_d_t epointd)
62 fwp_endpoint_t *epoint;
64 epoint = (fwp_endpoint_t*) endpoint->endpoint_protocol_info.send.body;
65 if (epoint->sockd > 0)
72 * Get endpoint parameters
74 * \param[in] epointd Endpoint descriptor
75 * \param[out] node Node identifier
76 * \param[out] port Port
77 * \param[out] attr Endpoint`s attributes
78 * \return On success 0 is returned.
79 * On error, negative error value is returned.
81 int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node,
82 unsigned int *port, fwp_endpoint_attr_t *attr)
84 fwp_endpoint_t *epoint = epointd;
86 if (node) *node = epoint->node;
87 if (port) *port = epoint->port;
88 if (attr) *attr = epoint->attr;
93 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
95 bzero(attr, sizeof(fwp_endpoint_attr_t));
96 *attr = fwp_epoint_attr_default;
102 * Creates send endpoint
104 * \param[in] node IP address of destination node
105 * \param[in] port UDP port
106 * \param[in] attr Endpoint attributes
107 * \param[out] epoint Pointer to the descriptor of newly created endpoint
109 * \return Zero on success, -1 on error and sets errno appropriately.
112 int fwp_send_endpoint_create(unsigned int node,
114 fwp_endpoint_attr_t *attr,
115 fwp_endpoint_t **epoint)
117 struct sockaddr_in *addr;
118 fwp_endpoint_t *fwp_epoint;
120 fwp_epoint = fwp_endpoint_alloc();
126 /*epoint->type = FWP_SEND_EPOINT;
127 epoint->status = FWP_EPOINT_UNBOUND;
132 fwp_epoint->attr = *attr;
134 fwp_epoint->attr = fwp_epoint_attr_default;
136 addr = (struct sockaddr_in *)&(fwp_epoint->peer.addr);
137 bzero((char*) addr, sizeof(*addr));
138 addr->sin_family = AF_INET;
139 addr->sin_addr.s_addr = node;
140 addr->sin_port = htons(port);
141 fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
143 if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
144 fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
145 if (fwp_epoint->sockd < 0) {
149 fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
150 if (fwp_epoint->sockd < 0) {
154 /* Enable broadcasts */
155 /*unsigned int yes = 1;
156 if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_BROADCAST,
157 &yes, sizeof(yes)) == -1) {
158 FWP_DEBUG("setsockopt(SO_BROADCAST): %s", strerror(errno));
164 if (connect(fwp_epoint->sockd,
165 (struct sockaddr*) &fwp_epoint->peer.addr,
166 fwp_epoint->peer.addrlen)) {
170 FWP_DEBUG("FWP Send endpoint created.\n");
171 *epoint = fwp_epoint;
174 fwp_endpoint_destroy(fwp_epoint);
179 * Creates receive endpoint
181 * \param[in] port UDP port
182 * \param[in] attr Endpoint attributes
183 * \param[out] epoint Pointer to the descriptor of newly created endpoint
185 * \return Zero on success, -1 on error and errno is set.
187 int fwp_receive_endpoint_create(unsigned int port,
188 fwp_endpoint_attr_t *attr,
189 fwp_endpoint_t **epoint)
191 struct sockaddr_in *addr;
192 fwp_endpoint_t *fwp_epoint;
194 fwp_epoint = fwp_endpoint_alloc();
200 /*epoint->type = FWP_RECV_EPOINT;
201 epoint->status = FWP_EPOINT_UNBOUND;*/
204 fwp_epoint->attr = *attr;
206 fwp_epoint->attr = fwp_epoint_attr_default;
208 addr = (struct sockaddr_in *)&(fwp_epoint->peer.addr);
209 addr->sin_family = AF_INET;
210 /* TODO: set listen interface, maybe through config struct*/
211 addr->sin_addr.s_addr = INADDR_ANY;
212 addr->sin_port = htons(port);
213 fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
215 if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
216 if ((fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM,
218 FWP_ERROR("Unable to open socket: %s", strerror(errno));
223 if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_REUSEADDR,
224 &yes, sizeof(yes)) == -1) {
225 FWP_ERROR("setsockopt(SO_REUSEADDR):%s",strerror(errno));
229 if (bind(fwp_epoint->sockd, (struct sockaddr*) &fwp_epoint->peer.addr,
230 fwp_epoint->peer.addrlen) == -1) {
231 FWP_ERROR("Bind error: %s", strerror(errno));
232 /* TODO: remove all error messages from all libraries */
236 listen(fwp_epoint->sockd, fwp_epoint->attr.max_connections);
238 FD_ZERO(&fwp_epoint->fdset);
239 /*add listen socket */
240 FD_SET(fwp_epoint->sockd, &fwp_epoint->fdset);
241 fwp_epoint->testfds = fwp_epoint->fdset;
242 fwp_epoint->c_sockd =
243 (int*)malloc(fwp_epoint->attr.max_connections);
244 bzero(fwp_epoint->c_sockd, fwp_epoint->attr.max_connections);
245 fwp_epoint->nr_connections = 0;
247 FWP_DEBUG("Receive endpoint\n");
250 if ((fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM,
252 FWP_ERROR("Unable to open socket: %s", strerror(errno));
256 if (bind(fwp_epoint->sockd,
257 (struct sockaddr*) &fwp_epoint->peer.addr,
258 fwp_epoint->peer.addrlen) == -1) {
260 FWP_ERROR("Bind error: %s", strerror(errno));
265 /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF,
266 &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
268 FWP_ERROR("Unable to set socket buffer size: %s", strerror(errno));
271 FWP_DEBUG("Receive endpoint buffer size is set.\n");
275 getsockname(fwp_epoint->sockd, (struct sockaddr*)&fwp_epoint->peer.addr,
276 &fwp_epoint->peer.addrlen);
278 FWP_DEBUG("Receive endpoint port=%d created.\n", fwp_epoint->port);
279 *epoint = fwp_epoint;
282 fwp_endpoint_destroy(fwp_epoint);
287 * Binds send endpoint to vres
289 * \param[in] epoint send endpoint to bind
290 * \param[in] vresd descriptor of the vres
292 * \return On success returns 0. On error, -1 and errno is set appropriately.
294 int fwp_send_endpoint_bind(fwp_endpoint_t *epoint, fwp_vres_d_t vresd)
297 fwp_endpoint_t *fwp_epoint = epoint;
299 rv = _fwp_vres_bind(vresd, fwp_epoint->sockd);
300 /* if send endpoint is already bound
301 if (epoint->type == FWP_EPOINT_BOUND) {
302 fwp_send_endpoint_unbind(epoint);
309 * Unbinds send endpoint from vres
311 * \param[in] epointd Send endpoint descriptor
312 * \return On success returns 0. On error, -1 is returned and errno is set appropriately.
315 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
318 fwp_endpoint_t *fwp_epoint = epointd;
320 /* unlink epoint-vres mutually */
321 if ((rv = _fwp_vres_unbind(fwp_epoint->vresd)) < 0)
327 int fwp_receive_endpoint_accept(fwp_endpoint_d_t epointd)
330 fwp_endpoint_t *fwp_epoint = epointd;
334 if (fwp_epoint->nr_connections == fwp_epoint->attr.max_connections)
337 csockd = accept(fwp_epoint->sockd, (struct sockaddr*)peer.addr,
340 FWP_DEBUG("New connection accepted\n");
341 /* find free place */
343 while ((fwp_epoint->c_sockd[i])&& (i < fwp_epoint->nr_connections))
345 fwp_epoint->c_sockd[i] = csockd;
346 FWP_DEBUG("Index = %d\n", i);
347 fwp_epoint->nr_connections++;
349 FD_SET(csockd, &fwp_epoint->fdset);
354 * Receives message from stream (TCP)
356 * \param[in] epointd Descriptor of endpoint
357 * \param[in] buffer Buffer to store message
358 * \param[in] buffer_size Size of buffer
361 * On success, it returns number of received bytes.
362 * On error, -1 is returned and errno is set appropriately.
365 int fwp_recv_conn(fwp_endpoint_d_t epointd, void *buffer,
368 fwp_endpoint_t *fwp_epoint = epointd;
369 fwp_sockaddr_t *peer = &fwp_epoint->peer;
374 for (i = 0; i < fwp_epoint->nr_connections; i++) {
375 if (!FD_ISSET(fwp_epoint->c_sockd[i], &fdset)) {
379 len = _fwp_recvfrom(fwp_epoint->c_sockd[i], buffer,
380 buffer_size,0, peer);
382 if (len < 0) /* Error */
385 FWP_DEBUG("Received tcp data\n");
389 /* tcp connection closed */
390 FWP_DEBUG("Connection closed\n");
391 FD_CLR(fwp_epoint->c_sockd[i], &fwp_epoint->fdset);
392 memcpy(fwp_epoint->c_sockd+i, fwp_epoint->c_sockd+i+1,
393 sizeof(int)*(fwp_epoint->nr_connections -i-1));
394 fwp_epoint->nr_connections--;
403 * \param[in] endpoint Descriptor of endpoint
404 * \param[in] buffer Buffer to store message
405 * \param[in] buffer_size Size of buffer
408 * On success, it returns number of received bytes.
409 * On error, -1 is returned and errno is set appropriately.
412 ssize_t fwp_recv(fwp_endpoint_t *endpoint,
413 void *buffer, const size_t buffer_size,
414 unsigned int *from, int flags)
416 fwp_sockaddr_t *peer = &endpoint->peer;
419 fwp_endpoint_t *fwp_epoint = endpoint;
421 /* if (!fwp_endpoint_is_valid(epointd)) {
426 if (fwp_epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {
427 len = _fwp_recvfrom(fwp_epoint->sockd, buffer,
428 buffer_size, 0, peer);
433 /* FIXME: What about using a loop here and continue instead of goto???? */
434 /* FWP_EPOINT_RELIABLE */
435 fdset = fwp_epoint->fdset;
436 if (select(FD_SETSIZE, &fdset, (fd_set *)0,
437 (fd_set *)0, NULL) < 0) {
439 FWP_ERROR("Error in select: %s", strerror(errno));
443 if (FD_ISSET(fwp_epoint->sockd, &fdset)) { /* is it listen socket? */
444 fwp_receive_endpoint_accept(endpoint);
448 /* Check client TCP sockets */
449 len = fwp_recv_conn(endpoint, buffer, buffer_size);
456 * Sends message through vres
458 * \param[in] fwp_epoint Endpoint descriptor
459 * \param[in] msg Message to send
460 * \param[in] size Message size
463 * On success, it returns zero.
464 * On error, -1 is returned and errno is set appropriately.
467 int fwp_send(fwp_endpoint_t *fwp_epoint,const void *msg, const size_t size, int flags)
469 struct fwp_msgb *msgb;
470 /*fwp_endpoint_t *fwp_epoint;*/
472 /* if (!fwp_endpoint_is_valid(epointd)){
476 if (!fwp_endpoint_is_bound(epointd)){
481 /*if (flags && MSG_DONTWAIT)
482 msgb = fwp_msgb_alloc(buffer_size);
484 if (!(msgb = fwp_msgb_alloc(size))) {
489 msgb->peer = &fwp_epoint->peer;
490 /*msgb->data = msg;*/
491 /*msgb->flags = epoint->flags;*/
493 /* data must be copied since msg may change while
494 * message is waiting in transmission queue
496 memcpy(msgb->data, msg, size);
497 fwp_msgb_put(msgb, size);
498 /*msgb->tail = msgb->data + size;
503 /* TODO: test whether _fwp_vres_send is successful */
504 return _fwp_vres_send(fwp_epoint->vresd, msgb);