1 #include "fwp_endpoint.h"
6 typedef unsigned int fwp_endpoint_id_t;
11 } fwp_endpoint_type_t;
15 FWP_EPOINT_INACTIVE = 1,
16 FWP_EPOINT_UNBOUND = 2,
18 } fwp_endpoint_status_t;
20 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
21 .reliability = FWP_EPOINT_BESTEFFORT,
22 .max_connections = 20,
26 * Structure of FWP endpoint.
30 fwp_endpoint_type_t type;
31 /** endpoint attributes */
32 fwp_endpoint_attr_t attr;
33 /** the vres descriptor the send endpoint is bound to */
35 /** for a send endpoint it contains the destination address; for a
36 * receive endpoint it is filled with the msg source address
38 struct fwp_sockaddr peer;
39 /** source/destination port */
41 /** destination node */
43 /** Socket descriptor.
44 * In case of a reliable endpoint it is a listen socket.
50 unsigned int nr_connections;
51 /** specific operation options*/
53 fwp_endpoint_status_t status;
57 struct fwp_endpoint_table {
58 unsigned int max_endpoints;
59 fwp_endpoint_t *entry;
61 } fwp_endpoint_table_t;
63 /* Global variable - endpoint table */
64 static fwp_endpoint_table_t fwp_endpoint_table = {
67 .lock = PTHREAD_MUTEX_INITIALIZER,
/* Tells whether epointd refers to an in-use slot of the global endpoint table.
 * The visible test rejects descriptors that fall outside the table and slots
 * whose status is FWP_EPOINT_FREE; callers in this file treat a zero result
 * as "invalid descriptor". */
70 static inline int fwp_endpoint_is_valid(fwp_endpoint_d_t epointd)
/* slot index obtained by pointer subtraction from the table base */
72 int id = epointd - fwp_endpoint_table.entry;
/* ">= max_endpoints" rather than "> max_endpoints - 1": the unsigned
 * subtraction wraps to UINT_MAX when the table is empty, which used to
 * disable the upper bound and let the status dereference run anyway */
74 if ((id < 0) || ((unsigned int)id >= fwp_endpoint_table.max_endpoints) ||
75 (epointd->status == FWP_EPOINT_FREE))
/* Returns non-zero when the endpoint's status equals FWP_EPOINT_BOUND.
 * NOTE(review): the descriptor is dereferenced without any validity check
 * in the visible lines; callers are expected to validate it first. */
81 int fwp_endpoint_is_bound(fwp_endpoint_d_t epointd)
83 return (epointd->status == FWP_EPOINT_BOUND);
/* Allocates the global endpoint table with room for max_endpoints entries,
 * all zero-filled, and records the table size.
 * NOTE(review): the return statements are elided in this excerpt; presumably
 * a zeroed status means FWP_EPOINT_FREE - confirm against the enum. */
86 int fwp_endpoint_table_init(unsigned int max_endpoints)
/* calloc() zero-fills the table and returns NULL if
 * max_endpoints * sizeof(entry) overflows; the former "int table_size"
 * silently truncated that product before passing it to malloc()/memset() */
90 fwp_endpoint_table.entry = calloc(max_endpoints, sizeof(*fwp_endpoint_table.entry));
91 if (!fwp_endpoint_table.entry)
94 fwp_endpoint_table.max_endpoints = max_endpoints;
101 * \return On success returns endpoint structure.
102 * On error, NULL is returned.
/* Claims the first FWP_EPOINT_FREE slot of the endpoint table while holding
 * the table lock, marks it FWP_EPOINT_INACTIVE and returns its address.
 * Declared with (void) so the definition carries a real prototype. */
105 static fwp_endpoint_t* fwp_endpoint_alloc(void)
/* unsigned to match fwp_endpoint_table.max_endpoints (unsigned int) */
107 unsigned int i, max_endpoints;
109 /* find free endpoint slot */
110 pthread_mutex_lock(&fwp_endpoint_table.lock);
112 max_endpoints = fwp_endpoint_table.max_endpoints;
113 while ((i < max_endpoints) &&
114 (fwp_endpoint_table.entry[i].status != FWP_EPOINT_FREE))
/* every slot is in use: drop the lock before leaving */
117 if (i == max_endpoints) {
118 pthread_mutex_unlock(&fwp_endpoint_table.lock);
/* reserve the slot before the lock is released */
122 fwp_endpoint_table.entry[i].status = FWP_EPOINT_INACTIVE;
123 pthread_mutex_unlock(&fwp_endpoint_table.lock);
124 return (&fwp_endpoint_table.entry[i]);
130 * \param[in] epointd Endpoint descriptor
131 * \return On success 0 is returned.
132 * On error, negative error value is returned.
/* Releases an endpoint: after the validity check the slot is marked
 * FWP_EPOINT_FREE and its socket is closed when sockd is greater than zero. */
134 int fwp_endpoint_destroy(fwp_endpoint_d_t epointd)
136 fwp_endpoint_t *epoint = epointd;
138 if (!fwp_endpoint_is_valid(epointd))
/* NOTE(review): the slot is marked free before the socket is closed and the
 * table lock is not taken in the visible lines, so a concurrent
 * fwp_endpoint_alloc() could hand the slot out while sockd is still open. */
141 epoint->status = FWP_EPOINT_FREE;
/* sockd == 0 is skipped too; presumably 0 stands for "no socket" because
 * the table starts zero-filled - TODO confirm */
142 if (epoint->sockd > 0)
143 close(epoint->sockd);
/* NOTE(review): no visible line frees c_sockd or closes the accepted
 * connections that a reliable receive endpoint collects. */
148 * Get endpoint parameters
150 * \param[in] epointd Endpoint descriptor
151 * \param[out] node Node identifier
152 * \param[out] port Port
153 * \param[out] attr Endpoint's attributes
154 * \return On success 0 is returned.
155 * On error, negative error value is returned.
/* Copies the endpoint's node, port and attribute structure into the
 * caller-supplied output arguments.
 * NOTE(review): neither a descriptor validity check nor NULL checks on the
 * output pointers are visible in this excerpt. */
157 int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node,
158 unsigned int *port, fwp_endpoint_attr_t *attr)
160 fwp_endpoint_t *epoint = epointd;
162 *node = epoint->node;
163 *port = epoint->port;
/* attributes are copied by value */
164 *attr = epoint->attr;
/* Resets *attr to the library defaults stored in fwp_epoint_attr_default. */
169 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
/* the whole-struct assignment overwrites every member, so the preceding
 * bzero() (a legacy call dropped from POSIX.1-2008) was redundant */
172 *attr = fwp_epoint_attr_default;
178 * Creates send endpoint
180 * \param[in] node IP address of destination node
181 * \param[in] port UDP port
182 * \param[in] attr Endpoint attributes
183 * \param[out] epointdp Pointer to the descriptor of newly created endpoint
185 * \return On success returns descriptor of endpoint.
186 * On error, negative error code is returned.
/* Creates a send endpoint: takes a slot from the endpoint table, stores the
 * destination (node, port) as an AF_INET peer address, opens a TCP socket
 * for FWP_EPOINT_RELIABLE endpoints or a UDP socket otherwise, and connects
 * it to the peer.
 * NOTE(review): the allocation check, the attr NULL test, the success path
 * and the error label are elided in this excerpt. */
189 int fwp_send_endpoint_create(unsigned int node, unsigned int port,
190 fwp_endpoint_attr_t *attr,
191 fwp_endpoint_d_t *epointdp)
193 struct sockaddr_in *addr;
194 fwp_endpoint_t *epoint;
196 epoint = fwp_endpoint_alloc();
201 epoint->type = FWP_SEND_EPOINT;
202 epoint->status = FWP_EPOINT_UNBOUND;
/* caller-supplied attributes, or the library defaults */
206 epoint->attr = *attr;
208 epoint->attr = fwp_epoint_attr_default;
/* build the peer address in place inside the endpoint */
210 addr = (struct sockaddr_in *)&(epoint->peer.addr);
211 bzero((char*) addr, sizeof(*addr));
212 addr->sin_family = AF_INET;
/* node is stored as given; presumably already in network byte order - TODO confirm */
213 addr->sin_addr.s_addr = node;
214 addr->sin_port = htons(port);
215 epoint->peer.addrlen = sizeof(struct sockaddr_in);
/* NOTE(review): in the visible lines setsockopt() is reached before the
 * "sockd < 0" test further down, so a failed socket() would surface as a
 * setsockopt error - confirm against the elided lines. */
217 if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
218 epoint->sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
221 if (setsockopt(epoint->sockd,SOL_SOCKET, SO_REUSEADDR,
222 &yes, sizeof(yes)) == -1) {
223 perror("setsockopt(SO_REUSEADDR)");
227 epoint->sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
229 /* Enable broadcasts */
230 unsigned int yes = 1;
231 if (setsockopt(epoint->sockd,SOL_SOCKET, SO_BROADCAST,
232 &yes, sizeof(yes)) == -1) {
233 perror("setsockopt(SO_BROADCAST)");
239 if (epoint->sockd < 0) {
240 perror("Unable to open socket");
/* connect() fixes the destination for both socket types */
244 if (connect(epoint->sockd,(struct sockaddr*) &epoint->peer.addr,
245 epoint->peer.addrlen)) {
246 perror("Connect error");
250 FWP_DEBUG("Send endpoint created.\n");
/* failure path: hand the slot (and any open socket) back */
254 fwp_endpoint_destroy(epoint);
259 * Creates receive endpoint
261 * \param[in] port UDP port
262 * \param[in] attr Endpoint attributes
263 * \param[out] epointdp Pointer to the descriptor of newly created endpoint
265 * \return On success returns descriptor of endpoint.
266 * On error, negative error code is returned.
/* Creates a receive endpoint bound to INADDR_ANY:port.
 * FWP_EPOINT_RELIABLE endpoints get a listening TCP socket plus an array
 * holding one descriptor per accepted connection; other endpoints get a
 * bound UDP socket. The port and node actually assigned are read back with
 * getsockname().
 * NOTE(review): the results of listen(), malloc() and getsockname() are not
 * checked in the visible lines; the allocation check, success path and error
 * label are elided in this excerpt. */
268 int fwp_receive_endpoint_create(/*unsigned int node,*/ unsigned int port,
269 fwp_endpoint_attr_t *attr,
270 fwp_endpoint_d_t *epointdp)
272 fwp_endpoint_t *epoint;
273 struct sockaddr_in *addr;
274 //int rcvbuf_size = 3000;
276 epoint = fwp_endpoint_alloc();
281 epoint->type = FWP_RECV_EPOINT;
282 epoint->status = FWP_EPOINT_UNBOUND;
/* caller-supplied attributes, or the library defaults */
284 epoint->attr = *attr;
286 epoint->attr = fwp_epoint_attr_default;
288 addr = (struct sockaddr_in *)&(epoint->peer.addr);
289 addr->sin_family = AF_INET;
290 /* TODO: set listen interface, maybe through config struct*/
291 addr->sin_addr.s_addr = INADDR_ANY;
292 addr->sin_port = htons(port);
293 epoint->peer.addrlen = sizeof(struct sockaddr_in);
295 if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
296 if ((epoint->sockd = socket(PF_INET, SOCK_STREAM,
298 perror("Unable to open socket");
302 if (bind(epoint->sockd, (struct sockaddr*) &epoint->peer.addr,
303 epoint->peer.addrlen) == -1) {
304 perror("Bind error");
305 /* TODO: remove all error messages from all libraries */
309 listen(epoint->sockd, epoint->attr.max_connections);
311 FD_ZERO(&epoint->fdset);
312 FD_SET(epoint->sockd, &epoint->fdset); /*add listen socket */
313 epoint->testfds = epoint->fdset;
/* one int slot per connection: malloc()/bzero() take byte counts, and the
 * former "max_connections" bytes were too small for max_connections ints */
314 epoint->c_sockd = (int*) malloc(epoint->attr.max_connections * sizeof(int));
315 bzero(epoint->c_sockd, epoint->attr.max_connections * sizeof(int));
316 epoint->nr_connections = 0;
318 FWP_DEBUG("Receive endpoint\n");
321 if ((epoint->sockd = socket(PF_INET, SOCK_DGRAM,
323 perror("Unable to open socket");
327 if (bind(epoint->sockd, (struct sockaddr*) &epoint->peer.addr,
328 epoint->peer.addrlen) == -1) {
329 perror("Bind error");
334 /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF,
335 &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
337 perror("Unable to set socket buffer size");
340 FWP_DEBUG("Receive endpoint buffer size is set.\n");
344 getsockname(epoint->sockd, (struct sockaddr*)&epoint->peer.addr,
345 &epoint->peer.addrlen);
347 epoint->port = ntohs(addr->sin_port);
349 epoint->node = ntohl(addr->sin_addr.s_addr);
350 FWP_DEBUG("Receive endpoint port=%d created.\n", epoint->port);
356 fwp_endpoint_destroy(epoint);
361 * Binds send endpoint to vres
363 * \param[in] vresd descriptor of the vres
364 * \param[in] epointd send endpoint descriptor
366 * \return On success returns 0. On error, negative error code is returned
/* Binds a send endpoint to a vres. While holding the endpoint-table lock it
 * checks that the descriptor is valid and of type FWP_SEND_EPOINT, takes a
 * separate branch for an endpoint that is already FWP_EPOINT_BOUND, calls
 * _fwp_vres_bind() with the endpoint's socket, then records vresd and marks
 * the endpoint FWP_EPOINT_BOUND before unlocking.
 * NOTE(review): the declaration of rv, the branch bodies and the return
 * statements are elided in this excerpt. */
368 int fwp_send_endpoint_bind(fwp_endpoint_d_t epointd, fwp_vres_d_t vresd)
370 fwp_endpoint_t *epoint = epointd;
373 /* link epoint-vres mutually */
374 pthread_mutex_lock(&fwp_endpoint_table.lock);
375 if ((!fwp_endpoint_is_valid(epointd))||
376 (epoint->type != FWP_SEND_EPOINT)) {
380 if (epoint->status == FWP_EPOINT_BOUND) { /* already bound*/
/* the vres side is told which socket to transmit on */
384 if ((rv = _fwp_vres_bind(vresd, epoint->sockd)) < 0) {
388 /* if send endpoint is already bound
389 if (epoint->type == FWP_EPOINT_BOUND) {
390 fwp_send_endpoint_unbind(epoint);
392 epoint->vresd = vresd;
393 epoint->status = FWP_EPOINT_BOUND;
396 pthread_mutex_unlock(&fwp_endpoint_table.lock);
401 * Unbinds send endpoint from vres
403 * \param[in] epointd Send endpoint descriptor
404 * \return On success returns 0. On error, negative error code is returned
/* Unbinds a send endpoint from its vres: after checking that the descriptor
 * is valid and currently FWP_EPOINT_BOUND it calls _fwp_vres_unbind() on the
 * recorded vres and sets the status back to FWP_EPOINT_UNBOUND.
 * NOTE(review): unlike fwp_send_endpoint_bind(), no table lock is taken in
 * the visible lines; the return statements are elided in this excerpt. */
407 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
409 fwp_endpoint_t *epoint = epointd;
412 if (!fwp_endpoint_is_valid(epointd))
414 if (epoint->status != FWP_EPOINT_BOUND)
417 /* unlink epoint-vres mutually */
418 if ((rv = _fwp_vres_unbind(epoint->vresd)) < 0)
420 epoint->status = FWP_EPOINT_UNBOUND;
428 * \param[in] epointd Descriptor of endpoint
429 * \param[in] buffer Buffer to store message
430 * \param[in] buffer_size Size of buffer
433 * On success, it returns number of received bytes.
434 * On error, negative error code is returned,
/* Receives one message on an endpoint.
 * FWP_EPOINT_BESTEFFORT endpoints read straight from the endpoint socket.
 * Reliable endpoints select() over the listen socket and all accepted
 * connections: a readable listen socket yields a new connection that is
 * appended to c_sockd, a readable connection is read, and a connection
 * reported as closed is removed from the fd set and from c_sockd.
 * NOTE(review): the local declarations, the return/goto lines and the tail
 * of the function are elided in this excerpt; no visible line checks the
 * result of accept() or close()s a finished connection. */
437 ssize_t fwp_recv(fwp_endpoint_d_t epointd, void *buffer, size_t buffer_size,
440 fwp_endpoint_t *epoint = epointd;
441 fwp_sockaddr_t *peer = &epoint->peer;
446 if (!fwp_endpoint_is_valid(epointd))
449 if (epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {
450 _fwp_recvfrom(len, epoint->sockd, buffer, buffer_size, 0,
451 peer->addr, &peer->addrlen);
456 /* FIXME: What about using a loop here and continue instead of goto???? */
457 /* FWP_EPOINT_RELIABLE */
/* select() modifies its set, so work on a copy of the endpoint's set */
458 fdset = epoint->fdset;
459 FWP_DEBUG("Before select\n");
460 if (select(FD_SETSIZE, &fdset, (fd_set *)0,
461 (fd_set *)0, NULL) < 0) {
463 perror("Error in select");
466 FWP_DEBUG("After select\n");
468 if (FD_ISSET(epoint->sockd, &fdset)) { /* is it listen socket? */
/* connection array full */
469 if (epoint->nr_connections == epoint->attr.max_connections)
472 csockd = accept(epoint->sockd, (struct sockaddr*)peer->addr,
475 FWP_DEBUG("New connection accepted\n");
476 /* find free place */
/* bound is tested first so c_sockd[i] is never read at i == nr_connections */
478 while ((i < epoint->nr_connections) &&
479 (epoint->c_sockd[i]))
481 epoint->c_sockd[i] = csockd;
482 FWP_DEBUG("Index = %d\n", i);
483 epoint->nr_connections++;
485 FD_SET(csockd, &epoint->fdset);
489 /* Check client TCP sockets */
490 for (i = 0; i < epoint->nr_connections; i++) {
491 if (FD_ISSET(epoint->c_sockd[i], &fdset)) {
492 _fwp_recvfrom(len, epoint->c_sockd[i], buffer, buffer_size,
493 0, peer->addr, &peer->addrlen);
495 FWP_DEBUG("Received tcp data\n");
498 /* tcp connection closed */
499 FWP_DEBUG("Connection closed\n");
500 FD_CLR(epoint->c_sockd[i], &epoint->fdset);
/* source and destination overlap when more than one entry is shifted down,
 * which memcpy() does not permit; memmove() does */
501 memmove(epoint->c_sockd+i, epoint->c_sockd+i+1,
502 sizeof(int)*(epoint->nr_connections -i-1));
503 epoint->nr_connections--;
512 * Sends message through vres
514 * \param[in] epointd Endpoint descriptor
515 * \param[in] msg Message to send
516 * \param[in] size Message size
519 * On success, it returns zero.
520 * On error, negative error code is returned,
/* Sends size bytes from msg over a bound endpoint: after the validity and
 * bound checks it allocates a message buffer of that size, points it at the
 * endpoint's peer address, copies the payload in and returns whatever
 * _fwp_vres_send() returns for the endpoint's vres.
 * NOTE(review): flags is referenced only by commented-out code in the visible
 * lines; the error returns are elided, and who releases msgb after
 * _fwp_vres_send() cannot be determined from this excerpt. */
523 int fwp_send(fwp_endpoint_d_t epointd, void *msg, size_t size, int flags)
525 struct fwp_endpoint *epoint = epointd;
526 struct fwp_msgb *msgb;
528 if (!fwp_endpoint_is_valid(epointd)){
531 if (!fwp_endpoint_is_bound(epointd)){
535 /*if (flags && MSG_DONTWAIT)
536 msgb = fwp_msgb_alloc(buffer_size);
538 if (!(msgb = fwp_msgb_alloc(size)))
541 msgb->peer = &epoint->peer;
542 /*msgb->data = msg;*/
543 /*msgb->flags = epoint->flags;*/
545 /* data must be copied since msg may change while
546 * message is waiting in transmission queue
548 memcpy(msgb->data, msg, size);
549 fwp_msgb_put(msgb, size);
550 /*msgb->tail = msgb->data + size;
555 /* TODO: test whether _fwp_vres_send is successful */
556 return _fwp_vres_send(epoint->vresd, msgb);