1 #include "fwp_endpoint.h"
7 typedef unsigned int fwp_endpoint_id_t;
9 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
10 .reliability = FWP_EPOINT_BESTEFFORT,
11 .max_connections = 20,
15 * Structure of FWP endpoint.
18 /** endpoint attributes */
19 fwp_endpoint_attr_t attr;
20 /** for send enpoint it contains destination address for
21 * receive endpoint it is filled with the msg source address
24 struct fwp_sockaddr peer;
25 /** source/destination port */
27 /** destination node */
29 /** Socket descriptor.
30 * In case of rebliable epoint it is a listen socket.
36 unsigned int nr_connections;
37 /** specific operation options*/
44 * \return On success returns endpoint structure.
45 * On error, NULL is returned.
48 static fwp_endpoint_t* fwp_endpoint_alloc()
50 return (fwp_endpoint_t*) calloc(1,sizeof(fwp_endpoint_t));
56 * \param[in] epointd Endpoint descriptor
57 * \return On success 0 is returned.
58 * On error, negative error value is returned and errno is set appropriately.
60 int fwp_endpoint_destroy(fwp_endpoint_d_t epointd)
62 fwp_endpoint_t *epoint;
64 /* epoint = (fwp_endpoint_t*) endpoint->endpoint_protocol_info.send.body;
65 if (epoint->sockd > 0)
66 close(epoint->sockd);*/
72 * Get endpoint parameters
74 * \param[in] epointd Endpoint descriptor
75 * \param[out] node Node identifier
76 * \param[out] port Port
77 * \param[out] attr Endpoint`s attributes
78 * \return On success 0 is returned.
79 * On error, negative error value is returned.
81 int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node,
82 unsigned int *port, fwp_endpoint_attr_t *attr)
84 fwp_endpoint_t *epoint = epointd;
86 if (node) *node = epoint->node;
87 if (port) *port = epoint->port;
88 if (attr) *attr = epoint->attr;
93 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
95 bzero(attr, sizeof(fwp_endpoint_attr_t));
96 *attr = fwp_epoint_attr_default;
102 * Creates send endpoint
104 * \param[in] node IP address of destination node
105 * \param[in] port UDP port
106 * \param[in] attr Endpoint attributes
107 * \param[out] epointdp Pointer to the descriptor of newly created endpoint
109 * \return Zero on success, -1 on error and sets errno appropriately.
112 int fwp_send_endpoint_create(unsigned int node,
114 fwp_endpoint_attr_t *attr,
115 fwp_endpoint_t **epoint)
117 struct sockaddr_in *addr;
118 fwp_endpoint_t *fwp_epoint;
120 fwp_epoint = fwp_endpoint_alloc();
126 /*epoint->type = FWP_SEND_EPOINT;
127 epoint->status = FWP_EPOINT_UNBOUND;
132 fwp_epoint->attr = *attr;
134 fwp_epoint->attr = fwp_epoint_attr_default;
136 addr = (struct sockaddr_in *)&(fwp_epoint->peer.addr);
137 bzero((char*) addr, sizeof(*addr));
138 addr->sin_family = AF_INET;
139 addr->sin_addr.s_addr = node;
140 addr->sin_port = htons(port);
141 fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
143 if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
144 fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
145 if (fwp_epoint->sockd < 0) {
149 fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
150 if (fwp_epoint->sockd < 0) {
154 /* Enable broadcasts */
155 /*unsigned int yes = 1;
156 if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_BROADCAST,
157 &yes, sizeof(yes)) == -1) {
158 FWP_DEBUG("setsockopt(SO_BROADCAST): %s", strerror(errno));
164 if (connect(fwp_epoint->sockd,
165 (struct sockaddr*) &fwp_epoint->peer.addr,
166 fwp_epoint->peer.addrlen)) {
167 FWP_DEBUG("FWp connect error\n");
171 FWP_DEBUG("FWP Send endpoint created.\n");
172 *epoint = fwp_epoint;
175 fwp_endpoint_destroy(fwp_epoint);
180 * Creates receive endpoint
182 * \param[in] port UDP port
183 * \param[in] attr Endpoint attributes
184 * \param[out] epointdp Pointer to the descriptor of newly created endpoint
186 * \return Zero on success, -1 on error and errno is set.
188 int fwp_receive_endpoint_create(unsigned int port,
189 fwp_endpoint_attr_t *attr,
190 fwp_endpoint_t **epoint)
192 struct sockaddr_in *addr;
193 fwp_endpoint_t *fwp_epoint;
195 fwp_epoint = fwp_endpoint_alloc();
201 /*epoint->type = FWP_RECV_EPOINT;
202 epoint->status = FWP_EPOINT_UNBOUND;*/
205 fwp_epoint->attr = *attr;
207 fwp_epoint->attr = fwp_epoint_attr_default;
209 addr = (struct sockaddr_in *)&(fwp_epoint->peer.addr);
210 addr->sin_family = AF_INET;
211 /* TODO: set listen interface, maybe through config struct*/
212 addr->sin_addr.s_addr = INADDR_ANY;
213 addr->sin_port = htons(port);
214 fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
216 if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
217 if ((fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM,
219 FWP_ERROR("Unable to open socket: %s", strerror(errno));
224 if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_REUSEADDR,
225 &yes, sizeof(yes)) == -1) {
226 FWP_ERROR("setsockopt(SO_REUSEADDR):%s",strerror(errno));
230 if (bind(fwp_epoint->sockd, (struct sockaddr*) &fwp_epoint->peer.addr,
231 fwp_epoint->peer.addrlen) == -1) {
232 FWP_ERROR("Bind error: %s", strerror(errno));
233 /* TODO: remove all error messages from all libraries */
237 listen(fwp_epoint->sockd, fwp_epoint->attr.max_connections);
239 FD_ZERO(&fwp_epoint->fdset);
240 /*add listen socket */
241 FD_SET(fwp_epoint->sockd, &fwp_epoint->fdset);
242 fwp_epoint->testfds = fwp_epoint->fdset;
243 fwp_epoint->c_sockd =
244 (int*)malloc(fwp_epoint->attr.max_connections);
245 bzero(fwp_epoint->c_sockd, fwp_epoint->attr.max_connections);
246 fwp_epoint->nr_connections = 0;
248 FWP_DEBUG("Receive endpoint\n");
251 if ((fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM,
253 FWP_ERROR("Unable to open socket: %s", strerror(errno));
257 if (bind(fwp_epoint->sockd,
258 (struct sockaddr*) &fwp_epoint->peer.addr,
259 fwp_epoint->peer.addrlen) == -1) {
261 FWP_ERROR("Bind error: %s", strerror(errno));
266 /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF,
267 &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
269 FWP_ERROR("Unable to set socket buffer size: %s", strerror(errno));
272 FWP_DEBUG("Receive endpoint buffer size is set.\n");
276 getsockname(fwp_epoint->sockd, (struct sockaddr*)&fwp_epoint->peer.addr,
277 &fwp_epoint->peer.addrlen);
279 FWP_DEBUG("Receive endpoint port=%d created.\n", fwp_epoint->port);
280 *epoint = fwp_epoint;
283 fwp_endpoint_destroy(fwp_epoint);
288 * Binds send endpoint to vres
290 * \param[in] vres_id identifier of vres
291 * \param[in] epoint_id send endpoint identifier
293 * \return On success returns 0. On error, -1 and errno is set appropriately.
295 int fwp_send_endpoint_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)
298 fwp_endpoint_t *fwp_epoint = epoint;
300 fwp_epoint->vresd = vresd;
301 rv = fwp_vres_bind(vresd, fwp_epoint->sockd);
302 /* if send endpoint is already bound
303 if (epoint->type == FWP_EPOINT_BOUND) {
304 fwp_send_endpoint_unbind(epoint);
311 * Unbinds send endpoint from vres
313 * \param[in] epointd Send endpoint descriptor
314 * \return On success returns 0. On error, -1 is returned and errno is set appropriately.
317 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
320 fwp_endpoint_t *fwp_epoint = epointd;
322 /* unlink epoint-vres mutually */
323 if ((rv = fwp_vres_unbind(fwp_epoint->vresd)) < 0)
329 int fwp_receive_endpoint_accept(fwp_endpoint_d_t epointd)
332 fwp_endpoint_t *fwp_epoint = epointd;
336 if (fwp_epoint->nr_connections == fwp_epoint->attr.max_connections)
339 csockd = accept(fwp_epoint->sockd, (struct sockaddr*)peer.addr,
342 FWP_DEBUG("New connection accepted\n");
343 /* find free place */
345 while ((fwp_epoint->c_sockd[i])&& (i < fwp_epoint->nr_connections))
347 fwp_epoint->c_sockd[i] = csockd;
348 FWP_DEBUG("Index = %d\n", i);
349 fwp_epoint->nr_connections++;
351 FD_SET(csockd, &fwp_epoint->fdset);
356 * Receives message from stream (TCP)
358 * \param[in] epointd Descriptor of endpoint
359 * \param[in] buffer Buffer to store message
360 * \param[in] buffer_size Size of buffer
363 * On success, it returns number of received bytes.
364 * On error, -1 is returned and errno is set appropriately.
367 int fwp_recv_conn(fwp_endpoint_d_t epointd, void *buffer,
370 fwp_endpoint_t *fwp_epoint = epointd;
371 fwp_sockaddr_t *peer = &fwp_epoint->peer;
376 for (i = 0; i < fwp_epoint->nr_connections; i++) {
377 if (!FD_ISSET(fwp_epoint->c_sockd[i], &fdset)) {
381 len = _fwp_recvfrom(fwp_epoint->c_sockd[i], buffer,
382 buffer_size,0, peer);
384 if (len < 0) /* Error */
387 FWP_DEBUG("Received tcp data\n");
391 /* tcp connection closed */
392 FWP_DEBUG("Connection closed\n");
393 FD_CLR(fwp_epoint->c_sockd[i], &fwp_epoint->fdset);
394 memcpy(fwp_epoint->c_sockd+i, fwp_epoint->c_sockd+i+1,
395 sizeof(int)*(fwp_epoint->nr_connections -i-1));
396 fwp_epoint->nr_connections--;
405 * \param[in] epointd Descriptor of endpoint
406 * \param[in] buffer Buffer to store message
407 * \param[in] buffer_size Size of buffer
410 * On success, it returns number of received bytes.
411 * On error, -1 is returned and errno is set appropriately.
414 ssize_t fwp_recv(fwp_endpoint_t *endpoint,
415 void *buffer, const size_t buffer_size,
416 unsigned int *from, int flags)
418 fwp_sockaddr_t *peer = &endpoint->peer;
421 fwp_endpoint_t *fwp_epoint = endpoint;
423 /* if (!fwp_endpoint_is_valid(epointd)) {
428 if (fwp_epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {
429 len = _fwp_recvfrom(fwp_epoint->sockd, buffer,
430 buffer_size, 0, peer);
435 /* FIXME: What about using a loop here and continue instead of goto???? */
436 /* FWP_EPOINT_RELIABLE */
437 fdset = fwp_epoint->fdset;
438 if (select(FD_SETSIZE, &fdset, (fd_set *)0,
439 (fd_set *)0, NULL) < 0) {
441 FWP_ERROR("Error in select: %s", strerror(errno));
445 if (FD_ISSET(fwp_epoint->sockd, &fdset)) { /* is it listen socket? */
446 fwp_receive_endpoint_accept(endpoint);
450 /* Check client TCP sockets */
451 len = fwp_recv_conn(endpoint, buffer, buffer_size);
458 * Sends message through vres
460 * \param[in] epointd Endpoint descriptor
461 * \param[in] msg Message to sent
462 * \param[in] size Message size
465 * On success, it returns zero.
466 * On error, -1 is returned and errno is set appropriately.
469 int fwp_send(fwp_endpoint_t *fwp_epoint,const void *msg, const size_t size, int flags)
471 struct fwp_msgb *msgb;
472 /*fwp_endpoint_t *fwp_epoint;*/
474 /* if (!fwp_endpoint_is_valid(epointd)){
478 if (!fwp_endpoint_is_bound(epointd)){
483 /*if (flags && MSG_DONTWAIT)
484 msgb = fwp_msgb_alloc(buffer_size);
486 if (!(msgb = fwp_msgb_alloc(size))) {
491 /*msgb->peer = &fwp_epoint->peer;*/
492 /*msgb->data = msg;*/
493 /*msgb->flags = epoint->flags;*/
495 /* data must be copied since msg may change while
496 * message is waiting in transmission queue
498 memcpy(msgb->data, msg, size);
499 fwp_msgb_put(msgb, size);
500 /*msgb->tail = msgb->data + size;
505 /* TODO: test whether _fwp_vres_send is successful */
506 return fwp_vres_send(fwp_epoint->vresd, msgb);