]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/core/fwp_endpoint.c
FWP_FNA compilation fixes
[frescor/fwp.git] / fwp / lib / core / fwp_endpoint.c
#include "fwp_endpoint.h"
#include "fwp_msgb.h"

#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
6
7 typedef unsigned int fwp_endpoint_id_t;
8
9 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
10         .reliability = FWP_EPOINT_BESTEFFORT, 
11         .max_connections = 20,
12 };
13
14 /**
15  * Structure of FWP endpoint.
16  */
17 struct fwp_endpoint{
18         /** endpoint attributes */
19         fwp_endpoint_attr_t     attr;
20         /** for send enpoint it contains destination address for
21          * receive endpoint it is filled with the msg source address
22          */
23         fwp_vres_d_t            vresd;
24         struct fwp_sockaddr     peer;   
25         /** source/destination port */
26         unsigned int            port;   
27         /** destination node */
28         int                     node;
29         /** Socket descriptor.
30          * In case of rebliable epoint it is a listen socket.
31          * */
32         int                     sockd; 
33         fd_set                  fdset;
34         fd_set                  testfds;
35         int                     *c_sockd;
36         unsigned int            nr_connections;
37         /** specific operation options*/
38         int                     flags;  
39 };
40
41 /**
42  * Allocates endpoint
43  *
44  * \return On success returns endpoint structure. 
45  * On error, NULL is returned. 
46  *
47  */
48 static fwp_endpoint_t* fwp_endpoint_alloc()
49 {
50         return (fwp_endpoint_t*) calloc(1,sizeof(fwp_endpoint_t));
51 }
52
53 /**
54  * Destroy endpoint
55  *
56  * \param[in] epointd Endpoint descriptor
57  * \return On success 0 is returned. 
58  * On error, negative error value is returned and errno is set appropriately. 
59  */
60 /*int fwp_endpoint_destroy(fwp_endpoint_d_t epointd)
61 {
62         fwp_endpoint_t *epoint;
63         
64         epoint = (fwp_endpoint_t*) endpoint->endpoint_protocol_info.send.body;
65         if (epoint->sockd > 0) 
66                 close(epoint->sockd);
67         
68         return 0;
69 }*/
70
71 /**
72  * Get endpoint parameters
73  *
74  * \param[in] epointd Endpoint descriptor
75  * \param[out] node Node identifier
76  * \param[out] port Port
77  * \param[out] attr Endpoint`s attributes
78  * \return On success 0 is returned. 
79  * On error, negative error value is returned. 
80  */
81 int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node, 
82                                 unsigned int *port, fwp_endpoint_attr_t *attr)
83 {
84         fwp_endpoint_t *epoint = epointd;
85
86         if (node) *node = epoint->node;
87         if (port) *port = epoint->port;
88         if (attr) *attr = epoint->attr;
89         
90         return 0;
91 }
92
93 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
94 {
95         bzero(attr, sizeof(fwp_endpoint_attr_t));
96         *attr = fwp_epoint_attr_default;
97
98         return 0;
99 }
100
101 /**
102  * Creates send endpoint
103  *
104  * \param[in] node IP address of destination node
105  * \param[in] port UDP port
106  * \param[in] attr Endpoint attributes
107  * \param[out] epointdp  Pointer to the descriptor of newly created endpoint
108  *
109  * \return Zero on success, -1 on error and sets errno appropriately. 
110  *
111  */
112 int fwp_send_endpoint_create(unsigned int node,
113                                 unsigned int port, 
114                                 fwp_endpoint_attr_t *attr,
115                                 fwp_endpoint_t **epoint)
116 {       
117         struct sockaddr_in *addr;
118         fwp_endpoint_t *fwp_epoint;
119
120         fwp_epoint = fwp_endpoint_alloc();      
121         if (!fwp_epoint) {
122                 errno = ENOMEM;
123                 return -1;
124         }
125         
126         /*epoint->type = FWP_SEND_EPOINT;
127         epoint->status = FWP_EPOINT_UNBOUND;
128         epoint->node = node;
129         epoint->port = port;
130         */
131         if (attr)
132                 fwp_epoint->attr  = *attr;
133         else
134                 fwp_epoint->attr = fwp_epoint_attr_default;
135                 
136         addr = (struct sockaddr_in *)&(fwp_epoint->peer.addr);
137         bzero((char*) addr, sizeof(*addr));
138         addr->sin_family = AF_INET;
139         addr->sin_addr.s_addr = node;
140         addr->sin_port = htons(port);
141         fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
142         
143         if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
144                 fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
145                 if (fwp_epoint->sockd < 0) {
146                         goto err;
147                 }
148         } else {
149                 fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
150                 if (fwp_epoint->sockd < 0) {
151                         goto err;
152                 }
153         
154                 /* Enable broadcasts */
155                 /*unsigned int yes = 1;
156                 if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_BROADCAST, 
157                                &yes, sizeof(yes)) == -1) {
158                         FWP_DEBUG("setsockopt(SO_BROADCAST): %s", strerror(errno));
159                         goto err;
160                 }*/
161         
162         }
163         
164         if (connect(fwp_epoint->sockd,
165                         (struct sockaddr*) &fwp_epoint->peer.addr, 
166                         fwp_epoint->peer.addrlen)) {
167                 goto err;
168         }
169         
170         FWP_DEBUG("FWP Send endpoint created.\n"); 
171         *epoint = fwp_epoint;
172         return 0;               
173 err:
174         fwp_endpoint_destroy(fwp_epoint);
175         return -1;      
176 }
177
178 /**
179  * Creates receive endpoint
180  *
181  * \param[in] port UDP port
182  * \param[in] attr Endpoint attributes
183  * \param[out] epointdp  Pointer to the descriptor of newly created endpoint
184  *
185  * \return Zero on success, -1 on error and errno is set.
186  */
187 int fwp_receive_endpoint_create(unsigned int port,
188                                 fwp_endpoint_attr_t *attr,
189                                 fwp_endpoint_t **epoint)
190 {
191         struct sockaddr_in *addr;
192         fwp_endpoint_t *fwp_epoint;
193
194         fwp_epoint = fwp_endpoint_alloc();      
195         if (!fwp_epoint) {
196                 errno = ENOMEM;
197                 return -1;
198         }
199         
200         /*epoint->type = FWP_RECV_EPOINT;
201         epoint->status = FWP_EPOINT_UNBOUND;*/
202         
203         if (attr)
204                 fwp_epoint->attr  = *attr;
205         else
206                 fwp_epoint->attr = fwp_epoint_attr_default;
207
208         addr = (struct sockaddr_in *)&(fwp_epoint->peer.addr);
209         addr->sin_family = AF_INET;
210         /* TODO: set listen interface, maybe through config struct*/
211         addr->sin_addr.s_addr = INADDR_ANY;
212         addr->sin_port = htons(port);
213         fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
214         
215         if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
216                 if ((fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM, 
217                                                 IPPROTO_TCP)) < 0) {
218                         FWP_ERROR("Unable to open socket: %s", strerror(errno));
219                         goto err;
220                 }       
221                 
222                 int yes = 1;
223                 if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_REUSEADDR,
224                                &yes, sizeof(yes)) == -1) {
225                         FWP_ERROR("setsockopt(SO_REUSEADDR):%s",strerror(errno));
226                         goto err;
227                 }
228
229                 if (bind(fwp_epoint->sockd, (struct sockaddr*) &fwp_epoint->peer.addr, 
230                                 fwp_epoint->peer.addrlen) == -1) {
231                         FWP_ERROR("Bind error: %s", strerror(errno));
232                         /* TODO: remove all error messages from all libraries */
233                         goto err;
234                 }
235                 
236                 listen(fwp_epoint->sockd, fwp_epoint->attr.max_connections); 
237                 
238                 FD_ZERO(&fwp_epoint->fdset);
239                 /*add listen socket */
240                 FD_SET(fwp_epoint->sockd, &fwp_epoint->fdset); 
241                 fwp_epoint->testfds = fwp_epoint->fdset;
242                 fwp_epoint->c_sockd = 
243                                 (int*)malloc(fwp_epoint->attr.max_connections);
244                 bzero(fwp_epoint->c_sockd, fwp_epoint->attr.max_connections);
245                 fwp_epoint->nr_connections = 0;
246
247                 FWP_DEBUG("Receive endpoint\n");
248
249         } else {
250                 if ((fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM, 
251                                                 IPPROTO_UDP)) < 0) {
252                         FWP_ERROR("Unable to open socket: %s", strerror(errno));
253                         goto err;
254                 }
255                 
256                 if (bind(fwp_epoint->sockd, 
257                         (struct sockaddr*) &fwp_epoint->peer.addr, 
258                         fwp_epoint->peer.addrlen) == -1) {
259                         
260                         FWP_ERROR("Bind error: %s", strerror(errno));
261                         goto err;
262                 }
263         }
264                 
265         /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF, 
266                         &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
267                 
268                 FWP_ERROR("Unable to set socket buffer size: %s", strerror(errno));
269                 return -1;
270         }else {
271                 FWP_DEBUG("Receive endpoint buffer size is set.\n");
272         }
273         */
274         
275         getsockname(fwp_epoint->sockd, (struct sockaddr*)&fwp_epoint->peer.addr, 
276                         &fwp_epoint->peer.addrlen);
277         
278         FWP_DEBUG("Receive endpoint port=%d created.\n", fwp_epoint->port); 
279         *epoint = fwp_epoint;   
280         return 0;
281 err:
282         fwp_endpoint_destroy(fwp_epoint);
283         return -1;
284 }
285
286 /**
287  * Binds send endpoint to vres
288  *
289  * \param[in] vres_id identifier of vres
290  * \param[in] epoint_id send endpoint identifier
291  *
292  * \return On success returns 0. On error, -1 and errno is set appropriately.
293  */
294 int fwp_send_endpoint_bind(fwp_endpoint_t *epoint, fwp_vres_d_t vresd)
295 {
296         int rv ;
297         fwp_endpoint_t *fwp_epoint = epoint;
298         
299         rv = _fwp_vres_bind(vresd, fwp_epoint->sockd);
300         /* if send endpoint is already bound 
301         if (epoint->type == FWP_EPOINT_BOUND) {  
302                 fwp_send_endpoint_unbind(epoint);
303         }*/
304         
305         return rv;
306 }
307
308 /**
309  * Unbinds send endpoint from vres
310  *
311  * \param[in] epointd Send endpoint descriptor 
312  * \return On success returns 0. On error, -1 is returned and errno is set appropriately.
313  *
314  */
315 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
316 {
317         int rv = 0;
318         fwp_endpoint_t *fwp_epoint = epointd;
319
320         /* unlink epoint-vres mutually */
321         if ((rv = _fwp_vres_unbind(fwp_epoint->vresd)) < 0) 
322                 return rv;
323
324         return 0;
325 }
326
327 int fwp_receive_endpoint_accept(fwp_endpoint_d_t epointd)
328 {
329         int csockd;
330         fwp_endpoint_t *fwp_epoint = epointd;
331         fwp_sockaddr_t  peer;
332         int i;
333
334         if (fwp_epoint->nr_connections == fwp_epoint->attr.max_connections)
335                 return -1;
336
337         csockd = accept(fwp_epoint->sockd, (struct sockaddr*)peer.addr,
338                         &peer.addrlen);
339                 
340         FWP_DEBUG("New connection accepted\n");
341         /* find free place */           
342         i = 0;
343         while ((fwp_epoint->c_sockd[i])&& (i < fwp_epoint->nr_connections)) 
344                                 i++;
345         fwp_epoint->c_sockd[i] = csockd; 
346         FWP_DEBUG("Index = %d\n", i);
347         fwp_epoint->nr_connections++;
348                 
349         FD_SET(csockd, &fwp_epoint->fdset);
350         return 0;       
351
352
353 /**
354  * Receives message from stream (TCP)
355  *
356  * \param[in] epointd Descriptor of endpoint
357  * \param[in] buffer Buffer to store message
358  * \param[in] buffer_size Size of buffer
359  *
360  * \return
361  * On success, it returns number of received bytes.  
362  * On error, -1 is returned and errno is set appropriately.
363  *
364  */
365 int fwp_recv_conn(fwp_endpoint_d_t epointd, void *buffer, 
366                                 size_t buffer_size)
367 {
368         fwp_endpoint_t *fwp_epoint = epointd;
369         fwp_sockaddr_t *peer = &fwp_epoint->peer;
370         fd_set fdset;
371         ssize_t len;
372         int i;
373
374         for (i = 0; i < fwp_epoint->nr_connections; i++) {
375                 if (!FD_ISSET(fwp_epoint->c_sockd[i], &fdset)) {
376                         continue;       
377                 }       
378                         
379                 len = _fwp_recvfrom(fwp_epoint->c_sockd[i], buffer, 
380                                         buffer_size,0, peer);
381
382                 if (len < 0) /* Error */
383                         return len;
384                 
385                 FWP_DEBUG("Received tcp data\n");
386                 if (len)
387                         return len;
388         
389                 /* tcp connection closed */
390                 FWP_DEBUG("Connection closed\n");
391                 FD_CLR(fwp_epoint->c_sockd[i], &fwp_epoint->fdset);
392                 memcpy(fwp_epoint->c_sockd+i, fwp_epoint->c_sockd+i+1, 
393                         sizeof(int)*(fwp_epoint->nr_connections -i-1));
394                 fwp_epoint->nr_connections--;
395                 return 0;
396         }
397         return 0;
398 }
399
400 /**
401  * Receives message
402  *
403  * \param[in] epointd Descriptor of endpoint
404  * \param[in] buffer Buffer to store message
405  * \param[in] buffer_size Size of buffer
406  *
407  * \return
408  * On success, it returns number of received bytes.  
409  * On error, -1 is returned and errno is set appropriately.
410  *
411  */
412 ssize_t fwp_recv(fwp_endpoint_t *endpoint,
413                         void *buffer, const size_t buffer_size,
414                         unsigned int *from, int flags)
415 {
416         fwp_sockaddr_t *peer = &endpoint->peer;
417         ssize_t len;
418         fd_set fdset;
419         fwp_endpoint_t *fwp_epoint = endpoint;
420         
421 /*      if (!fwp_endpoint_is_valid(epointd)) {
422                 errno = EINVAL;
423                 return -1;
424         }*/
425         
426         if (fwp_epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {    
427                 len = _fwp_recvfrom(fwp_epoint->sockd, buffer, 
428                                         buffer_size, 0, peer);
429                 return len;
430         }
431         
432         while (1){
433         /* FIXME: What about using a loop here and continue instead of goto???? */
434                 /* FWP_EPOINT_RELIABLE */
435                 fdset = fwp_epoint->fdset;
436                 if (select(FD_SETSIZE, &fdset, (fd_set *)0, 
437                            (fd_set *)0, NULL) < 0) {
438                 
439                         FWP_ERROR("Error in select: %s", strerror(errno));
440                         return -1;
441                 }
442         
443                 if (FD_ISSET(fwp_epoint->sockd, &fdset)) { /* is it listen socket? */
444                         fwp_receive_endpoint_accept(endpoint);
445                         continue;
446                 }
447
448                 /* Check client TCP sockets */
449                 len = fwp_recv_conn(endpoint, buffer, buffer_size);
450                 if (len)
451                         return len;
452         }
453 }
454
455 /**
456  * Sends message through vres
457  *
458  * \param[in] epointd Endpoint descriptor
459  * \param[in] msg Message to sent
460  * \param[in] size Message size
461  *
462  * \return
463  * On success, it returns zero.  
464  * On error, -1 is returned and errno is set appropriately.
465  *
466  */
467 int fwp_send(fwp_endpoint_t *fwp_epoint,const void *msg, const size_t size, int flags)
468 {
469         struct fwp_msgb *msgb;
470         /*fwp_endpoint_t *fwp_epoint;*/
471         
472 /*      if (!fwp_endpoint_is_valid(epointd)){
473                 errno = EINVAL;
474                 return -1;
475         }
476         if (!fwp_endpoint_is_bound(epointd)){
477                 errno = EPERM;
478                 return -1;
479         }*/
480
481         /*if (flags && MSG_DONTWAIT) 
482                 msgb = fwp_msgb_alloc(buffer_size);
483         else {*/
484                 if (!(msgb = fwp_msgb_alloc(size))) {
485                         errno = ENOMEM;
486                         return -1;
487                 }
488
489                 msgb->peer = &fwp_epoint->peer;
490                 /*msgb->data = msg;*/
491                 /*msgb->flags = epoint->flags;*/
492                 
493                 /* data must be copied since msg may change while
494                  * message is waiting in transmission queue 
495                  * */
496                 memcpy(msgb->data, msg, size);
497                 fwp_msgb_put(msgb, size);
498                 /*msgb->tail = msgb->data + size;
499                 msgb->len = size;*/
500
501         /*}*/
502
503         /* TODO: test whether _fwp_vres_send is successful */
504         return _fwp_vres_send(fwp_epoint->vresd, msgb);
505 }
506