1 #include "fwp_endpoint.h"
10 FWP_EPOINT_INACTIVE = 1,
11 FWP_EPOINT_UNBOUND = 2,
13 } fwp_endpoint_status_t;
15 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
16 .reliability = FWP_EPOINT_BESTEFFORT,
17 .max_connections = 20,
21 * Structure of FWP endpoint.
24 fwp_endpoint_type_t type;
25 /**< endpoint attributes */
26 fwp_endpoint_attr_t attr;
27 /**< the vres descriptor the send endpoint is bound to */
29 /**< for send enpoint it contains destination address
30 * for receive endpoint it is filled with the msg source address
32 struct fwp_sockaddr peer;
33 /**< source/destination port */
35 /**< destination node */
37 /**< socket descriptor
38 * in case of rebliable epoint it is a listen socket
44 unsigned int nr_connections;
45 /**< specific operation options*/
47 fwp_endpoint_status_t status;
51 struct fwp_endpoint_table {
52 unsigned int nr_endpoints;
53 fwp_endpoint_t *entry;
55 } fwp_endpoint_table_t;
57 /* Global variable - endpoint table */
58 static fwp_endpoint_table_t fwp_endpoint_table = {
61 .lock = PTHREAD_MUTEX_INITIALIZER,
64 int fwp_endpoint_table_init(unsigned int nr_endpoints)
66 int table_size = nr_endpoints * sizeof(fwp_endpoint_t);
68 fwp_endpoint_table.entry = (fwp_endpoint_t*) malloc(table_size);
69 if (!fwp_endpoint_table.entry)
71 memset(fwp_endpoint_table.entry, 0, table_size);
72 fwp_endpoint_table.nr_endpoints = nr_endpoints;
76 static fwp_endpoint_t* fwp_endpoint_alloc()
80 /* find free vres id */
81 pthread_mutex_lock(&fwp_endpoint_table.lock);
83 nr_endpoints = fwp_endpoint_table.nr_endpoints;
84 while ((i < nr_endpoints) &&
85 (fwp_endpoint_table.entry[i].status != FWP_EPOINT_FREE))
88 if (i == nr_endpoints) {
89 pthread_mutex_unlock(&fwp_endpoint_table.lock);
93 fwp_endpoint_table.entry[i].status = FWP_EPOINT_INACTIVE;
94 pthread_mutex_unlock(&fwp_endpoint_table.lock);
95 return (&fwp_endpoint_table.entry[i]);
98 void fwp_endpoint_destroy(fwp_endpoint_d_t epointd)
100 fwp_endpoint_t *epoint = epointd;
102 epoint->status = FWP_EPOINT_FREE;
103 if (epoint->sockd > 0)
104 close(epoint->sockd);
107 int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node,
108 unsigned int *port, fwp_endpoint_attr_t *attr)
110 fwp_endpoint_t *epoint = epointd;
112 *node = epoint->node;
113 *port = epoint->port;
114 *attr = epoint->attr;
119 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
121 bzero(attr, sizeof(fwp_endpoint_attr_t));
122 *attr = fwp_epoint_attr_default;
128 * Creates send endpoint
130 * \param[in] node IP address of destination node
131 * \param[in] port UDP port
132 * \param[in] attr Endpoint attributes
133 * \param[out] epointdp Pointer to the descriptor of newly created endpoint
135 * \return On success returns descriptor of endpoint.
136 * On error, negative error code is returned.
139 int fwp_send_endpoint_create(unsigned int node, unsigned int port,
140 fwp_endpoint_attr_t *attr,
141 fwp_endpoint_d_t *epointdp)
143 struct sockaddr_in *addr;
144 fwp_endpoint_t *epoint;
146 epoint = fwp_endpoint_alloc();
151 epoint->type = FWP_SEND_EPOINT;
152 epoint->status = FWP_EPOINT_UNBOUND;
156 epoint->attr = *attr;
158 epoint->attr = fwp_epoint_attr_default;
160 addr = (struct sockaddr_in *)&(epoint->peer.addr);
161 bzero((char*) addr, sizeof(*addr));
162 addr->sin_family = AF_INET;
163 addr->sin_addr.s_addr = node;
164 addr->sin_port = htons(port);
165 epoint->peer.addrlen = sizeof(struct sockaddr_in);
167 if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
168 epoint->sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
171 epoint->sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
174 if (epoint->sockd < 0) {
175 perror("Unable to open socket");
179 /* Enable broadcasts */
180 unsigned int yes = 1;
181 if (setsockopt(epoint->sockd,SOL_SOCKET, SO_BROADCAST/*SO_REUSEADDR*/,
182 &yes, sizeof(yes)) == -1) {
183 perror("Unable to set BROADCAST option for socket");
187 if (connect(epoint->sockd,(struct sockaddr*) &epoint->peer.addr,
188 epoint->peer.addrlen)) {
189 perror("Connect error");
193 FWP_DEBUG("Send endpoint created.\n");
197 fwp_endpoint_destroy(epoint);
202 * Creates receive endpoint
204 * \param[in] port UDP port
205 * \param[in] attr Endpoint attributes
206 * \param[out] epointdp Pointer to the descriptor of newly created endpoint
208 * \return On success returns descriptor of endpoint.
209 * On error, negative error code is returned.
212 int fwp_receive_endpoint_create(/*unsigned int node,*/ unsigned int port,
213 fwp_endpoint_attr_t *attr,
214 fwp_endpoint_d_t *epointdp)
216 fwp_endpoint_t *epoint;
217 struct sockaddr_in *addr;
218 //int rcvbuf_size = 3000;
220 epoint = fwp_endpoint_alloc();
225 epoint->type = FWP_RECV_EPOINT;
226 epoint->status = FWP_EPOINT_UNBOUND;
228 epoint->attr = *attr;
230 epoint->attr = fwp_epoint_attr_default;
232 addr = (struct sockaddr_in *)&(epoint->peer.addr);
233 addr->sin_family = AF_INET;
234 /* TODO: set listen interface, maybe through config struct*/
235 addr->sin_addr.s_addr = INADDR_ANY;
236 addr->sin_port = htons(port);
237 epoint->peer.addrlen = sizeof(struct sockaddr_in);
239 if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
240 if ((epoint->sockd = socket(PF_INET, SOCK_STREAM,
242 perror("Unable to open socket");
246 if (bind(epoint->sockd, (struct sockaddr*) &epoint->peer.addr,
247 epoint->peer.addrlen) == -1) {
248 perror("Bind error");
252 listen(epoint->sockd, epoint->attr.max_connections);
254 FD_ZERO(&epoint->fdset);
255 FD_SET(epoint->sockd, &epoint->fdset); /*add listen socket */
256 epoint->testfds = epoint->fdset;
257 epoint->c_sockd = (int*) malloc(epoint->attr.max_connections);
258 bzero(epoint->c_sockd, epoint->attr.max_connections);
259 epoint->nr_connections = 0;
261 FWP_DEBUG("Receive endpoint\n");
264 if ((epoint->sockd = socket(PF_INET, SOCK_DGRAM,
266 perror("Unable to open socket");
270 if (bind(epoint->sockd, (struct sockaddr*) &epoint->peer.addr,
271 epoint->peer.addrlen) == -1) {
272 perror("Bind error");
277 /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF,
278 &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
280 perror("Unable to set socket buffer size");
283 FWP_DEBUG("Receive endpoint buffer size is set.\n");
287 getsockname(epoint->sockd, (struct sockaddr*)&epoint->peer.addr,
288 &epoint->peer.addrlen);
290 epoint->port = ntohs(addr->sin_port);
292 epoint->node = ntohl(addr->sin_addr.s_addr);
293 FWP_DEBUG("Receive endpoint port=%d created.\n", epoint->port);
299 fwp_endpoint_destroy(epoint);
304 * Binds send endpoint to vres
306 * \param[in] vres_id identifier of vres
307 * \param[in] epoint_id send endpoint identifier
309 * \return On success returns 0. On error, negative error code is returned
311 int fwp_send_endpoint_bind(fwp_endpoint_d_t epointd, fwp_vres_d_t vresd)
313 fwp_endpoint_t *epoint = epointd;
315 if (epoint->type != FWP_SEND_EPOINT) {
319 /* link epoint-vres mutually */
320 pthread_mutex_lock(&fwp_endpoint_table.lock);
321 if (_fwp_vres_bind(vresd, epoint->sockd) < 0) {
322 pthread_mutex_unlock(&fwp_endpoint_table.lock);
326 if (epoint->type == FWP_EPOINT_BOUND) { /* if send endpoint is already bound */
327 fwp_send_endpoint_unbind(epoint);
330 epoint->vresd = vresd;
331 epoint->status = FWP_EPOINT_BOUND;
333 pthread_mutex_unlock(&fwp_endpoint_table.lock);
338 * Unbinds send endpoint from vres
340 * \param[in] id send endpoint identifier
341 * \return On success returns 0. On error, negative error code is returned
344 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
346 fwp_endpoint_t *epoint = epointd;
348 /* unlink epoint-vres mutually */
349 _fwp_vres_unbind(epoint->vresd);
350 epoint->status = FWP_EPOINT_UNBOUND;
358 * \param[in] epointd descriptor of endpoint
359 * \param[in] buffer buffer to store message
360 * \param[in] buffer_size size of buffer
363 * On success, it returns number of received bytes.
364 * On error, negative error code is returned,
367 ssize_t fwp_recv(fwp_endpoint_d_t epointd, void *buffer, size_t buffer_size,
370 fwp_endpoint_t *epoint = epointd;
371 fwp_sockaddr_t *peer = &epoint->peer;
376 if (epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {
377 _fwp_recvfrom(len, epoint->sockd, buffer, buffer_size, 0,
378 peer->addr, &peer->addrlen);
383 /* FWP_EPOINT_RELIABLE */
384 fdset = epoint->fdset;
385 FWP_DEBUG("Before select\n");
386 if (select(FD_SETSIZE, &fdset, (fd_set *)0,
387 (fd_set *)0, NULL) < 0) {
389 perror("Error in select");
392 FWP_DEBUG("After select\n");
394 if (FD_ISSET(epoint->sockd, &fdset)) { /* is it listen socket? */
395 if (epoint->nr_connections == epoint->attr.max_connections)
398 csockd = accept(epoint->sockd, (struct sockaddr*)peer->addr,
401 FWP_DEBUG("New connection accepted\n");
402 /* find free place */
404 while ((epoint->c_sockd[i])&&
405 (i < epoint->nr_connections))
407 epoint->c_sockd[i] = csockd;
408 FWP_DEBUG("Index = %d\n", i);
409 epoint->nr_connections++;
411 FD_SET(csockd, &epoint->fdset);
416 /* Check client TCP sockets */
417 for (i = 0; i < epoint->nr_connections; i++) {
418 if (FD_ISSET(epoint->c_sockd[i], &fdset)) {
419 _fwp_recvfrom(len, epoint->c_sockd[i], buffer, buffer_size,
420 0, peer->addr, &peer->addrlen);
422 FWP_DEBUG("Received tcp data\n");
425 /* tcp connection closed */
426 FWP_DEBUG("Connection closed\n");
427 FD_CLR(epoint->c_sockd[i], &epoint->fdset);
428 memcpy(epoint->c_sockd+i, epoint->c_sockd+i+1,
429 sizeof(int)*(epoint->nr_connections -i-1));
436 * Sends message through vres
438 * \param[in] epointd identificator of endpoint
439 * \param[in] msg message to sent
440 * \param[in] size message size
443 * On success, it returns zero.
444 * On error, negative error code is returned,
447 int fwp_send(fwp_endpoint_d_t epointd, void *msg, size_t size, int flags)
449 struct fwp_endpoint *epoint = epointd;
450 struct fwp_msgb *msgb;
452 /* TODO: Validity test of epointd */
453 if (epoint->status != FWP_EPOINT_BOUND) {
457 /*if (flags && MSG_DONTWAIT)
458 msgb = fwp_msgb_alloc(buffer_size);
460 if (!(msgb = fwp_msgb_alloc(size)))
463 msgb->peer = &epoint->peer;
464 /*msgb->data = msg;*/
465 /*msgb->flags = epoint->flags;*/
467 /* data must be copied since msg may change while
468 * message is waiting in transmission queue
470 memcpy(msgb->data, msg, size);
471 fwp_msgb_put(msgb, size);
472 /*msgb->tail = msgb->data + size;
477 /* TODO: test whether _fwp_vres_send is successful */
478 return _fwp_vres_send(epoint->vresd, msgb);