1 #include "fwp_endpoint.h"
/* Life-cycle states of an endpoint slot (only part of the enum is visible here). */
10 FWP_EPOINT_INACTIVE = 1, /* slot handed out by fwp_endpoint_alloc(), not yet set up */
11 FWP_EPOINT_UNBOUND = 2, /* endpoint created, or unbound again; no vres attached */
13 } fwp_endpoint_status_t;
/*
 * Default endpoint attributes: best-effort delivery, at most 20 connections.
 * fwp_endpoint_attr_init() copies this into caller storage, and both create
 * functions assign it to new endpoints (the selecting condition is not
 * visible here; presumably when the caller passes no attr).
 */
15 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
16 .reliability = FWP_EPOINT_BESTEFFORT,
17 .max_connections = 20, /* used as listen() backlog and as connection limit of reliable receive endpoints */
21 * Structure of FWP endpoint.
24 fwp_endpoint_type_t type; /* FWP_SEND_EPOINT or FWP_RECV_EPOINT, set by the create functions */
25 /**< endpoint attributes */
26 fwp_endpoint_attr_t attr;
27 /**< the vres descriptor the send endpoint is bound to */
29 /**< for send endpoint it contains the destination address,
30 * for receive endpoint it is filled with the msg source address
32 struct fwp_sockaddr peer;
33 /**< source/destination port */
35 /**< destination node */
37 /**< socket descriptor
38 * in case of reliable epoint it is a listen socket
44 unsigned int nr_connections;
45 /**< specific operation options*/
47 fwp_endpoint_status_t status; /* life-cycle state; fwp_send() requires FWP_EPOINT_BOUND */
/* Table of endpoint slots; slots are handed out by fwp_endpoint_alloc(). */
51 struct fwp_endpoint_table {
52 unsigned int nr_endpoints; /* number of slots in entry[] */
53 fwp_endpoint_t *entry; /* heap array allocated by fwp_endpoint_table_init() */
55 } fwp_endpoint_table_t;
57 /* Global variable - endpoint table (populated by fwp_endpoint_table_init()) */
58 static fwp_endpoint_table_t fwp_endpoint_table = {
61 .lock = PTHREAD_MUTEX_INITIALIZER, /* taken by fwp_endpoint_alloc() and fwp_send_endpoint_bind() */
/*
 * Allocates the global endpoint table with nr_endpoints slots and zero-fills
 * it. Zeroed slots are presumably FWP_EPOINT_FREE (its value is not visible
 * here), which is what fwp_endpoint_alloc() searches for.
 *
 * The value returned on success and on allocation failure is not visible in
 * this view.
 *
 * NOTE(review): a second call overwrites fwp_endpoint_table.entry without
 * freeing the previous array, and the table lock is not taken here.
 */
64 int fwp_endpoint_table_init(unsigned int nr_endpoints)
66 int table_size = nr_endpoints * sizeof(fwp_endpoint_t); /* NOTE(review): product is narrowed to int and can overflow for large nr_endpoints; calloc(nr_endpoints, sizeof(fwp_endpoint_t)) would check the multiplication and zero the memory */
68 fwp_endpoint_table.entry = (fwp_endpoint_t*) malloc(table_size);
69 if (!fwp_endpoint_table.entry)
71 memset(fwp_endpoint_table.entry, 0, table_size);
72 fwp_endpoint_table.nr_endpoints = nr_endpoints;
/*
 * Reserves the first free slot of the endpoint table.
 *
 * The scan and the status change both happen under fwp_endpoint_table.lock.
 * The chosen slot is marked FWP_EPOINT_INACTIVE and its address is returned.
 * When no slot is free the lock is released; the value returned on that path
 * is not visible here.
 */
76 static fwp_endpoint_t* fwp_endpoint_alloc()
80 /* find a free endpoint slot */
81 pthread_mutex_lock(&fwp_endpoint_table.lock);
83 nr_endpoints = fwp_endpoint_table.nr_endpoints;
84 while ((i < nr_endpoints) &&
85 (fwp_endpoint_table.entry[i].status != FWP_EPOINT_FREE))
88 if (i == nr_endpoints) { /* no free slot found */
89 pthread_mutex_unlock(&fwp_endpoint_table.lock);
93 fwp_endpoint_table.entry[i].status = FWP_EPOINT_INACTIVE; /* claim the slot before dropping the lock */
94 pthread_mutex_unlock(&fwp_endpoint_table.lock);
95 return (&fwp_endpoint_table.entry[i]);
/*
 * Returns an endpoint slot to the table and closes its socket.
 *
 * NOTE(review): the slot is marked free before close() and without taking
 * fwp_endpoint_table.lock, so fwp_endpoint_alloc() running in another thread
 * can hand the slot out while the old descriptor is still being closed.
 * NOTE(review): epoint->sockd is closed unconditionally. The create functions
 * store sockd in the endpoint only after their socket setup succeeded, so if
 * this runs for a failed create it closes whatever value the slot held before
 * (0 for a never-used, zero-filled slot) - confirm against the error paths.
 * NOTE(review): the c_sockd array that fwp_receive_endpoint_create()
 * allocates for reliable endpoints is not released by the visible code.
 */
98 static inline void fwp_endpoint_free(fwp_endpoint_t *epoint)
100 epoint->status = FWP_EPOINT_FREE;
101 close(epoint->sockd);
/*
 * Copies the node, port and attributes stored in an endpoint to the caller.
 *
 * epointd is used directly as a pointer to the endpoint structure.
 * The return value is not visible here.
 *
 * NOTE(review): the visible code has no NULL checks for epointd, node, port
 * or attr.
 */
104 int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node,
105 unsigned int *port, fwp_endpoint_attr_t *attr)
107 fwp_endpoint_t *epoint = epointd;
109 *node = epoint->node;
110 *port = epoint->port;
111 *attr = epoint->attr;
/*
 * Initialises *attr with the library defaults (fwp_epoint_attr_default).
 * The return value is not visible here.
 */
116 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
118 bzero(attr, sizeof(fwp_endpoint_attr_t)); /* NOTE(review): redundant - the struct assignment below overwrites every member */
119 *attr = fwp_epoint_attr_default;
125 * Creates send endpoint
127 * \param[in] node IP address of destination node
128 * \param[in] port UDP port
130 * \return On success returns identifier of endpoint.
131 * On error, negative error code is returned.
/*
 * Allocates an endpoint slot, records the destination (node, port) in
 * epoint->peer and opens a socket connected to it: a TCP socket when the
 * endpoint attributes request FWP_EPOINT_RELIABLE, a UDP socket otherwise.
 * The endpoint attributes come from *attr or from fwp_epoint_attr_default
 * (the selecting condition is not visible here).
 *
 * node is stored into sin_addr.s_addr as given while port goes through
 * htons() - assumes callers pass node already in network byte order;
 * TODO confirm.
 */
134 int fwp_send_endpoint_create(unsigned int node, unsigned int port,
135 fwp_endpoint_attr_t *attr,
136 fwp_endpoint_d_t *epointdp)
138 struct sockaddr_in *addr;
139 fwp_endpoint_t *epoint;
142 epoint = fwp_endpoint_alloc();
147 epoint->type = FWP_SEND_EPOINT;
148 epoint->status = FWP_EPOINT_UNBOUND;
152 epoint->attr = *attr;
154 epoint->attr = fwp_epoint_attr_default;
156 addr = (struct sockaddr_in *)&(epoint->peer.addr);
157 bzero((char*) addr, sizeof(*addr)); /* the slot may have been used before; start from a clean address */
158 addr->sin_family = AF_INET;
159 addr->sin_addr.s_addr = node;
160 addr->sin_port = htons(port);
161 epoint->peer.addrlen = sizeof(struct sockaddr_in);
163 if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
164 sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
167 sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
171 perror("Unable to open socket");
175 /* Enable broadcasts */
176 unsigned int yes = 1;
177 if (setsockopt(sockd,SOL_SOCKET, SO_BROADCAST/*SO_REUSEADDR*/, &yes,
178 sizeof(yes)) == -1) {
179 perror("Unable to set BROADCAST option for socket");
184 if (connect(sockd,(struct sockaddr*) &epoint->peer.addr,
185 epoint->peer.addrlen)) {
186 perror("Connect error");
190 epoint->sockd = sockd; /* the descriptor is stored in the endpoint only after connect() succeeded */
192 FWP_DEBUG("Send endpoint created.\n");
196 fwp_endpoint_free(epoint); /* NOTE(review): presumably the error path; it closes epoint->sockd, not the local sockd opened above, so the new descriptor would leak */
201 * Creates receive endpoint
203 * \param[in] port UDP port
205 * \return On success returns identifier of endpoint.
206 * On error, negative error code is returned.
/*
 * Allocates an endpoint slot and opens a socket bound to the given local
 * port on INADDR_ANY. A reliable endpoint gets a listening TCP socket plus
 * the fd sets and the client-socket array used by fwp_recv(); otherwise a
 * UDP socket is bound. Afterwards getsockname() refreshes epoint->peer, and
 * the resulting port and address are stored in epoint->port / epoint->node
 * after ntohs()/ntohl().
 *
 * NOTE(review): the results of listen() and getsockname() are not checked.
 * NOTE(review): unlike fwp_send_endpoint_create(), the address structure is
 * not cleared before it is filled in, so a reused slot keeps stale bytes.
 * NOTE(review): the trailing fwp_endpoint_free() call (presumably the error
 * path) closes epoint->sockd, which is assigned only after both bind()
 * branches succeeded; a locally opened sockd would leak on failure.
 */
209 int fwp_receive_endpoint_create(/*unsigned int node,*/ unsigned int port,
210 fwp_endpoint_attr_t *attr,
211 fwp_endpoint_d_t *epointdp)
213 fwp_endpoint_t *epoint;
215 struct sockaddr_in *addr;
216 //int rcvbuf_size = 3000;
218 epoint = fwp_endpoint_alloc();
223 epoint->type = FWP_RECV_EPOINT;
224 epoint->status = FWP_EPOINT_UNBOUND;
226 epoint->attr = *attr;
228 epoint->attr = fwp_epoint_attr_default;
230 addr = (struct sockaddr_in *)&(epoint->peer.addr);
231 addr->sin_family = AF_INET;
232 /* TODO: set listen interface, maybe through config struct*/
233 addr->sin_addr.s_addr = INADDR_ANY;
234 addr->sin_port = htons(port);
235 epoint->peer.addrlen = sizeof(struct sockaddr_in);
237 if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
238 if ((sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
239 perror("Unable to open socket");
243 if (bind(sockd, (struct sockaddr*) &epoint->peer.addr,
244 epoint->peer.addrlen) == -1) {
245 perror("Bind error");
249 listen(sockd, epoint->attr.max_connections); /* NOTE(review): return value ignored */
251 FD_ZERO(&epoint->fdset);
252 FD_SET(sockd, &epoint->fdset); /*add listen socket */
253 epoint->testfds = epoint->fdset;
254 epoint->c_sockd = (int*) malloc(epoint->attr.max_connections); /* NOTE(review): size is in bytes, but c_sockd is used as an int array with up to max_connections entries - should be max_connections * sizeof(int); the result is also not checked for NULL */
255 bzero(epoint->c_sockd, epoint->attr.max_connections); /* NOTE(review): same problem - clears max_connections bytes, not max_connections ints */
256 epoint->nr_connections = 0;
258 FWP_DEBUG("Receive endpoint\n");
261 if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
262 perror("Unable to open socket");
266 if (bind(sockd, (struct sockaddr*) &epoint->peer.addr,
267 epoint->peer.addrlen) == -1) {
268 perror("Bind error");
273 epoint->sockd = sockd;
274 /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF,
275 &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
277 perror("Unable to set socket buffer size");
280 FWP_DEBUG("Receive endpoint buffer size is set.\n");
284 getsockname(epoint->sockd, (struct sockaddr*)&epoint->peer.addr,
285 &epoint->peer.addrlen);
287 epoint->port = ntohs(addr->sin_port);
289 epoint->node = ntohl(addr->sin_addr.s_addr);
290 FWP_DEBUG("Receive endpoint port=%d created.\n", epoint->port);
296 fwp_endpoint_free(epoint);
301 * Binds send endpoint to vres
303 * \param[in] vresd descriptor of the vres to bind to
304 * \param[in] epointd descriptor of the send endpoint
306 * \return On success returns 0. On error, negative error code is returned
/*
 * Attaches a send endpoint to a vres: hands the endpoint's socket to
 * _fwp_vres_bind(), records vresd in the endpoint and marks it
 * FWP_EPOINT_BOUND. The linking runs under fwp_endpoint_table.lock.
 *
 * Endpoints whose type is not FWP_SEND_EPOINT take an early branch whose
 * body is not visible here; the same holds for the branch taken when
 * _fwp_vres_bind() reports a negative result (only its unlock is visible).
 */
308 int fwp_send_endpoint_bind(fwp_endpoint_d_t epointd, fwp_vres_d_t vresd)
310 fwp_endpoint_t *epoint = epointd;
312 if (epoint->type != FWP_SEND_EPOINT) {
316 /* link epoint-vres mutually */
317 pthread_mutex_lock(&fwp_endpoint_table.lock);
318 if (_fwp_vres_bind(vresd, epoint->sockd) < 0) {
319 pthread_mutex_unlock(&fwp_endpoint_table.lock);
/* NOTE(review): the next test compares epoint->type with a status constant;
 * it almost certainly means epoint->status, so as written the old vres is
 * released only if the two enums happen to share that numeric value. */
323 if (epoint->type == FWP_EPOINT_BOUND) { /* if send endpoint is already bound */
324 fwp_send_endpoint_unbind(epoint); /* releases the previous vres; epoint->vresd still holds the old descriptor here */
327 epoint->vresd = vresd;
328 epoint->status = FWP_EPOINT_BOUND;
330 pthread_mutex_unlock(&fwp_endpoint_table.lock);
335 * Unbinds send endpoint from vres
337 * \param[in] epointd send endpoint descriptor
338 * \return On success returns 0. On error, negative error code is returned
/*
 * Detaches a send endpoint from its vres via _fwp_vres_unbind() and marks
 * the endpoint FWP_EPOINT_UNBOUND. The return value is not visible here.
 *
 * NOTE(review): no lock is taken here; fwp_send_endpoint_bind() calls this
 * with fwp_endpoint_table.lock already held, direct callers run unlocked.
 * NOTE(review): the visible code neither checks that the endpoint is
 * currently bound before using epoint->vresd nor looks at the result of
 * _fwp_vres_unbind().
 */
341 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
343 fwp_endpoint_t *epoint = epointd;
345 /* unlink epoint-vres mutually */
346 _fwp_vres_unbind(epoint->vresd);
347 epoint->status = FWP_EPOINT_UNBOUND;
355 * \param[in] epointd descriptor of the receive endpoint
356 * \param[out] buffer buffer to store the received message
357 * \param[in] buffer_size size of buffer in bytes
360 * On success, it returns number of received bytes.
361 * On error, negative error code is returned,
/*
 * Receives one message on a receive endpoint.
 *
 * Best-effort endpoints read straight from the endpoint socket with
 * _fwp_recvfrom(), passing epoint->peer as the address buffer. Reliable
 * endpoints block in select() (no timeout) on the listen socket and the
 * accepted client sockets: a readable listen socket leads to accept() and
 * registration of the new client socket in c_sockd and the fd set; a
 * readable client socket is read with _fwp_recvfrom(); a closed connection
 * is removed from the fd set and from c_sockd.
 *
 * _fwp_recvfrom() is defined elsewhere; it presumably stores the byte count
 * in its first argument (len) - confirm against its definition.
 */
364 ssize_t fwp_recv(fwp_endpoint_d_t epointd, void *buffer, size_t buffer_size,
367 fwp_endpoint_t *epoint = epointd;
368 fwp_sockaddr_t *peer = &epoint->peer;
373 if (epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {
374 _fwp_recvfrom(len, epoint->sockd, buffer, buffer_size, 0,
375 peer->addr, &peer->addrlen);
380 /* FWP_EPOINT_RELIABLE */
381 //fdset = epoint->fdset;
382 FWP_DEBUG("Before selct\n");
/* NOTE(review): select() rewrites the set it is given, so after this call
 * epoint->fdset holds only the ready descriptors and the list of watched
 * sockets is lost for later calls. The commented-out copy above and the
 * testfds member suggest a scratch copy was intended. */
383 if (select(FD_SETSIZE, &epoint->fdset, (fd_set *)0,
384 (fd_set *)0, NULL) < 0) {
386 perror("Error in select");
389 FWP_DEBUG("After select\n");
391 if (FD_ISSET(epoint->sockd, &epoint->fdset)) { /* is it listen socket? */
392 if (epoint->nr_connections == epoint->attr.max_connections)
393 goto next_connection;
395 csockd = accept(epoint->sockd, (struct sockaddr*)peer->addr,
398 FWP_DEBUG("New connection accepted\n");
399 /* find free place */
401 while ((epoint->c_sockd[i])&&
402 (i < epoint->nr_connections)) /* NOTE(review): c_sockd[i] is read before i is range-checked; the bound should be tested first */
404 epoint->c_sockd[i] = csockd;
405 FWP_DEBUG("Index = %d\n", i);
406 epoint->nr_connections++;
408 FD_SET(csockd, &epoint->fdset);
410 goto next_connection;
413 /* Check client TCP sockets */
414 for (i = 0; i < epoint->nr_connections; i++) {
415 if (FD_ISSET(epoint->c_sockd[i], &epoint->fdset)) {
416 _fwp_recvfrom(len, epoint->c_sockd[i], buffer, buffer_size,
417 0, peer->addr, &peer->addrlen);
419 FWP_DEBUG("Received tcp data\n");
422 /* tcp connection closed */
423 FWP_DEBUG("Connection closed\n");
424 FD_CLR(epoint->c_sockd[i], &epoint->fdset);
425 memcpy(epoint->c_sockd+i, epoint->c_sockd+i+1,
426 sizeof(int)*(epoint->nr_connections -i-1)); /* NOTE(review): source and destination overlap when more than one entry is moved - memmove() is required here */
432 * Sends message through vres
434 * \param[in] epointd descriptor of the send endpoint
435 * \param[in] msg message to send
436 * \param[in] size message size
439 * On success, it returns zero.
440 * On error, negative error code is returned,
/*
 * Sends a message through the vres the endpoint is bound to.
 *
 * The payload is copied into a message buffer obtained from
 * fwp_msgb_alloc(size), whose peer pointer is set to epoint->peer; the
 * buffer is then handed to _fwp_vres_send() and that call's result is
 * returned.
 *
 * Two branches have bodies that are not visible here: the one taken when
 * the endpoint status is not FWP_EPOINT_BOUND, and the one taken when
 * fwp_msgb_alloc() fails. flags is not used by the visible code.
 */
443 int fwp_send(fwp_endpoint_d_t epointd, void *msg, size_t size, int flags)
445 struct fwp_endpoint *epoint = epointd;
446 struct fwp_msgb *msgb;
448 /* TODO: Validity test of epointd */
449 if (epoint->status != FWP_EPOINT_BOUND) {
453 /*if (flags && MSG_DONTWAIT)
454 msgb = fwp_msgb_alloc(buffer_size);
456 if (!(msgb = fwp_msgb_alloc(size)))
459 msgb->peer = &epoint->peer;
460 /*msgb->data = msg;*/
461 /*msgb->flags = epoint->flags;*/
463 /* data must be copied since msg may change while
464 * message is waiting in transmission queue
466 memcpy(msgb->data, msg, size);
467 fwp_msgb_put(msgb, size);
468 /*msgb->tail = msgb->data + size;
473 /* TODO: test whether _fwp_vres_send is successful */
474 return _fwp_vres_send(epoint->vresd, msgb);