]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/core/fwp_endpoint.c
bc9e7a74fdbe675cc0f5648f317152df89e18ea6
[frescor/fwp.git] / fwp / lib / core / fwp_endpoint.c
1 #include "fwp_endpoint.h"
2 #include "fwp_msgb.h"
3 #include <errno.h>
4
5 #include <pthread.h>
6
7 typedef unsigned int fwp_endpoint_id_t;
8
9 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
10         .reliability = FWP_EPOINT_BESTEFFORT, 
11         .max_connections = 20,
12 };
13
14 /**
15  * Structure of FWP endpoint.
16  */
17 struct fwp_endpoint{
18         /** endpoint attributes */
19         fwp_endpoint_attr_t     attr;
20         /** for send enpoint it contains destination address for
21          * receive endpoint it is filled with the msg source address
22          */
23         fwp_vres_d_t            vresd;
24         struct fwp_sockaddr     peer;   
25         /** source/destination port */
26         unsigned int            port;   
27         /** destination node */
28         int                     node;
29         /** Socket descriptor.
30          * In case of rebliable epoint it is a listen socket.
31          * */
32         int                     sockd; 
33         fd_set                  fdset;
34         fd_set                  testfds;
35         int                     *c_sockd;
36         unsigned int            nr_connections;
37         /** specific operation options*/
38         int                     flags;  
39 };
40
41 /**
42  * Allocates endpoint
43  *
44  * \return On success returns endpoint structure. 
45  * On error, NULL is returned. 
46  *
47  */
48 static fwp_endpoint_t* fwp_endpoint_alloc()
49 {
50         return (fwp_endpoint_t*) calloc(1,sizeof(fwp_endpoint_t));
51 }
52
53 /**
54  * Destroy endpoint
55  *
56  * \param[in] epointd Endpoint descriptor
57  * \return On success 0 is returned. 
58  * On error, negative error value is returned and errno is set appropriately. 
59  */
60 int fwp_endpoint_destroy(fwp_endpoint_d_t epointd)
61 {
62         fwp_endpoint_t *epoint;
63
64 /*      epoint = (fwp_endpoint_t*) endpoint->endpoint_protocol_info.send.body;
65         if (epoint->sockd > 0) 
66                 close(epoint->sockd);*/
67         
68         return 0;
69 }
70
71 /**
72  * Get endpoint parameters
73  *
74  * \param[in] epointd Endpoint descriptor
75  * \param[out] node Node identifier
76  * \param[out] port Port
77  * \param[out] attr Endpoint`s attributes
78  * \return On success 0 is returned. 
79  * On error, negative error value is returned. 
80  */
81 int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node, 
82                                 unsigned int *port, fwp_endpoint_attr_t *attr)
83 {
84         fwp_endpoint_t *epoint = epointd;
85
86         if (node) *node = epoint->node;
87         if (port) *port = epoint->port;
88         if (attr) *attr = epoint->attr;
89         
90         return 0;
91 }
92
93 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
94 {
95         bzero(attr, sizeof(fwp_endpoint_attr_t));
96         *attr = fwp_epoint_attr_default;
97
98         return 0;
99 }
100
101 /**
102  * Creates send endpoint
103  *
104  * \param[in] node IP address of destination node
105  * \param[in] port UDP port
106  * \param[in] attr Endpoint attributes
107  * \param[out] epointdp  Pointer to the descriptor of newly created endpoint
108  *
109  * \return Zero on success, -1 on error and sets errno appropriately. 
110  *
111  */
112 int fwp_send_endpoint_create(unsigned int node,
113                                 unsigned int port, 
114                                 fwp_endpoint_attr_t *attr,
115                                 fwp_endpoint_t **epoint)
116 {       
117         struct sockaddr_in *addr;
118         fwp_endpoint_t *fwp_epoint;
119
120         fwp_epoint = fwp_endpoint_alloc();      
121         if (!fwp_epoint) {
122                 errno = ENOMEM;
123                 return -1;
124         }
125         
126         /*epoint->type = FWP_SEND_EPOINT;
127         epoint->status = FWP_EPOINT_UNBOUND;
128         epoint->node = node;
129         epoint->port = port;
130         */
131         if (attr)
132                 fwp_epoint->attr  = *attr;
133         else
134                 fwp_epoint->attr = fwp_epoint_attr_default;
135                 
136         addr = (struct sockaddr_in *)&(fwp_epoint->peer.addr);
137         bzero((char*) addr, sizeof(*addr));
138         addr->sin_family = AF_INET;
139         addr->sin_addr.s_addr = node;
140         addr->sin_port = htons(port);
141         fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
142         
143         if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
144                 fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
145                 if (fwp_epoint->sockd < 0) {
146                         goto err;
147                 }
148         } else {
149                 fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
150                 if (fwp_epoint->sockd < 0) {
151                         goto err;
152                 }
153         
154                 /* Enable broadcasts */
155                 /*unsigned int yes = 1;
156                 if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_BROADCAST, 
157                                &yes, sizeof(yes)) == -1) {
158                         FWP_DEBUG("setsockopt(SO_BROADCAST): %s", strerror(errno));
159                         goto err;
160                 }*/
161         
162         }
163         
164         if (connect(fwp_epoint->sockd,
165                         (struct sockaddr*) &fwp_epoint->peer.addr, 
166                         fwp_epoint->peer.addrlen)) {
167                 FWP_DEBUG("FWp connect error\n"); 
168                 goto err;
169         }
170         
171         FWP_DEBUG("FWP Send endpoint created.\n"); 
172         *epoint = fwp_epoint;
173         return 0;               
174 err:
175         fwp_endpoint_destroy(fwp_epoint);
176         return -1;      
177 }
178
179 /**
180  * Creates receive endpoint
181  *
182  * \param[in] port UDP port
183  * \param[in] attr Endpoint attributes
184  * \param[out] epointdp  Pointer to the descriptor of newly created endpoint
185  *
186  * \return Zero on success, -1 on error and errno is set.
187  */
188 int fwp_receive_endpoint_create(unsigned int port,
189                                 fwp_endpoint_attr_t *attr,
190                                 fwp_endpoint_t **epoint)
191 {
192         struct sockaddr_in *addr;
193         fwp_endpoint_t *fwp_epoint;
194
195         fwp_epoint = fwp_endpoint_alloc();      
196         if (!fwp_epoint) {
197                 errno = ENOMEM;
198                 return -1;
199         }
200         
201         /*epoint->type = FWP_RECV_EPOINT;
202         epoint->status = FWP_EPOINT_UNBOUND;*/
203         
204         if (attr)
205                 fwp_epoint->attr  = *attr;
206         else
207                 fwp_epoint->attr = fwp_epoint_attr_default;
208
209         addr = (struct sockaddr_in *)&(fwp_epoint->peer.addr);
210         addr->sin_family = AF_INET;
211         /* TODO: set listen interface, maybe through config struct*/
212         addr->sin_addr.s_addr = INADDR_ANY;
213         addr->sin_port = htons(port);
214         fwp_epoint->peer.addrlen = sizeof(struct sockaddr_in);
215         
216         if (fwp_epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
217                 if ((fwp_epoint->sockd = socket(PF_INET, SOCK_STREAM, 
218                                                 IPPROTO_TCP)) < 0) {
219                         FWP_ERROR("Unable to open socket: %s", strerror(errno));
220                         goto err;
221                 }       
222                 
223                 int yes = 1;
224                 if (setsockopt(fwp_epoint->sockd,SOL_SOCKET, SO_REUSEADDR,
225                                &yes, sizeof(yes)) == -1) {
226                         FWP_ERROR("setsockopt(SO_REUSEADDR):%s",strerror(errno));
227                         goto err;
228                 }
229
230                 if (bind(fwp_epoint->sockd, (struct sockaddr*) &fwp_epoint->peer.addr, 
231                                 fwp_epoint->peer.addrlen) == -1) {
232                         FWP_ERROR("Bind error: %s", strerror(errno));
233                         /* TODO: remove all error messages from all libraries */
234                         goto err;
235                 }
236                 
237                 listen(fwp_epoint->sockd, fwp_epoint->attr.max_connections); 
238                 
239                 FD_ZERO(&fwp_epoint->fdset);
240                 /*add listen socket */
241                 FD_SET(fwp_epoint->sockd, &fwp_epoint->fdset); 
242                 fwp_epoint->testfds = fwp_epoint->fdset;
243                 fwp_epoint->c_sockd = 
244                                 (int*)malloc(fwp_epoint->attr.max_connections);
245                 bzero(fwp_epoint->c_sockd, fwp_epoint->attr.max_connections);
246                 fwp_epoint->nr_connections = 0;
247
248                 FWP_DEBUG("Receive endpoint\n");
249
250         } else {
251                 if ((fwp_epoint->sockd = socket(PF_INET, SOCK_DGRAM, 
252                                                 IPPROTO_UDP)) < 0) {
253                         FWP_ERROR("Unable to open socket: %s", strerror(errno));
254                         goto err;
255                 }
256                 
257                 if (bind(fwp_epoint->sockd, 
258                         (struct sockaddr*) &fwp_epoint->peer.addr, 
259                         fwp_epoint->peer.addrlen) == -1) {
260                         
261                         FWP_ERROR("Bind error: %s", strerror(errno));
262                         goto err;
263                 }
264         }
265                 
266         /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF, 
267                         &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
268                 
269                 FWP_ERROR("Unable to set socket buffer size: %s", strerror(errno));
270                 return -1;
271         }else {
272                 FWP_DEBUG("Receive endpoint buffer size is set.\n");
273         }
274         */
275         
276         getsockname(fwp_epoint->sockd, (struct sockaddr*)&fwp_epoint->peer.addr, 
277                         &fwp_epoint->peer.addrlen);
278         
279         FWP_DEBUG("Receive endpoint port=%d created.\n", fwp_epoint->port); 
280         *epoint = fwp_epoint;   
281         return 0;
282 err:
283         fwp_endpoint_destroy(fwp_epoint);
284         return -1;
285 }
286
287 /**
288  * Binds send endpoint to vres
289  *
290  * \param[in] vres_id identifier of vres
291  * \param[in] epoint_id send endpoint identifier
292  *
293  * \return On success returns 0. On error, -1 and errno is set appropriately.
294  */
295 int fwp_send_endpoint_bind(fwp_vres_d_t vresd, fwp_endpoint_t *epoint)
296 {
297         int rv ;
298         fwp_endpoint_t *fwp_epoint = epoint;
299         
300         fwp_epoint->vresd = vresd;      
301         rv = fwp_vres_bind(vresd, fwp_epoint->sockd);
302         /* if send endpoint is already bound 
303         if (epoint->type == FWP_EPOINT_BOUND) {  
304                 fwp_send_endpoint_unbind(epoint);
305         }*/
306         
307         return rv;
308 }
309
310 /**
311  * Unbinds send endpoint from vres
312  *
313  * \param[in] epointd Send endpoint descriptor 
314  * \return On success returns 0. On error, -1 is returned and errno is set appropriately.
315  *
316  */
317 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
318 {
319         int rv = 0;
320         fwp_endpoint_t *fwp_epoint = epointd;
321
322         /* unlink epoint-vres mutually */
323         if ((rv = fwp_vres_unbind(fwp_epoint->vresd)) < 0) 
324                 return rv;
325
326         return 0;
327 }
328
329 int fwp_receive_endpoint_accept(fwp_endpoint_d_t epointd)
330 {
331         int csockd;
332         fwp_endpoint_t *fwp_epoint = epointd;
333         fwp_sockaddr_t  peer;
334         int i;
335
336         if (fwp_epoint->nr_connections == fwp_epoint->attr.max_connections)
337                 return -1;
338
339         csockd = accept(fwp_epoint->sockd, (struct sockaddr*)peer.addr,
340                         &peer.addrlen);
341                 
342         FWP_DEBUG("New connection accepted\n");
343         /* find free place */           
344         i = 0;
345         while ((fwp_epoint->c_sockd[i])&& (i < fwp_epoint->nr_connections)) 
346                                 i++;
347         fwp_epoint->c_sockd[i] = csockd; 
348         FWP_DEBUG("Index = %d\n", i);
349         fwp_epoint->nr_connections++;
350                 
351         FD_SET(csockd, &fwp_epoint->fdset);
352         return 0;       
353
354
355 /**
356  * Receives message from stream (TCP)
357  *
358  * \param[in] epointd Descriptor of endpoint
359  * \param[in] buffer Buffer to store message
360  * \param[in] buffer_size Size of buffer
361  *
362  * \return
363  * On success, it returns number of received bytes.  
364  * On error, -1 is returned and errno is set appropriately.
365  *
366  */
367 int fwp_recv_conn(fwp_endpoint_d_t epointd, void *buffer, 
368                                 size_t buffer_size)
369 {
370         fwp_endpoint_t *fwp_epoint = epointd;
371         fwp_sockaddr_t *peer = &fwp_epoint->peer;
372         fd_set fdset;
373         ssize_t len;
374         int i;
375
376         for (i = 0; i < fwp_epoint->nr_connections; i++) {
377                 if (!FD_ISSET(fwp_epoint->c_sockd[i], &fdset)) {
378                         continue;       
379                 }       
380                         
381                 len = _fwp_recvfrom(fwp_epoint->c_sockd[i], buffer, 
382                                         buffer_size,0, peer);
383
384                 if (len < 0) /* Error */
385                         return len;
386                 
387                 FWP_DEBUG("Received tcp data\n");
388                 if (len)
389                         return len;
390         
391                 /* tcp connection closed */
392                 FWP_DEBUG("Connection closed\n");
393                 FD_CLR(fwp_epoint->c_sockd[i], &fwp_epoint->fdset);
394                 memcpy(fwp_epoint->c_sockd+i, fwp_epoint->c_sockd+i+1, 
395                         sizeof(int)*(fwp_epoint->nr_connections -i-1));
396                 fwp_epoint->nr_connections--;
397                 return 0;
398         }
399         return 0;
400 }
401
402 /**
403  * Receives message
404  *
405  * \param[in] epointd Descriptor of endpoint
406  * \param[in] buffer Buffer to store message
407  * \param[in] buffer_size Size of buffer
408  *
409  * \return
410  * On success, it returns number of received bytes.  
411  * On error, -1 is returned and errno is set appropriately.
412  *
413  */
414 ssize_t fwp_recv(fwp_endpoint_t *endpoint,
415                         void *buffer, const size_t buffer_size,
416                         unsigned int *from, int flags)
417 {
418         fwp_sockaddr_t *peer = &endpoint->peer;
419         ssize_t len;
420         fd_set fdset;
421         fwp_endpoint_t *fwp_epoint = endpoint;
422         
423 /*      if (!fwp_endpoint_is_valid(epointd)) {
424                 errno = EINVAL;
425                 return -1;
426         }*/
427         
428         if (fwp_epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {    
429                 len = _fwp_recvfrom(fwp_epoint->sockd, buffer, 
430                                         buffer_size, 0, peer);
431                 return len;
432         }
433         
434         while (1){
435         /* FIXME: What about using a loop here and continue instead of goto???? */
436                 /* FWP_EPOINT_RELIABLE */
437                 fdset = fwp_epoint->fdset;
438                 if (select(FD_SETSIZE, &fdset, (fd_set *)0, 
439                            (fd_set *)0, NULL) < 0) {
440                 
441                         FWP_ERROR("Error in select: %s", strerror(errno));
442                         return -1;
443                 }
444         
445                 if (FD_ISSET(fwp_epoint->sockd, &fdset)) { /* is it listen socket? */
446                         fwp_receive_endpoint_accept(endpoint);
447                         continue;
448                 }
449
450                 /* Check client TCP sockets */
451                 len = fwp_recv_conn(endpoint, buffer, buffer_size);
452                 if (len)
453                         return len;
454         }
455 }
456
457 /**
458  * Sends message through vres
459  *
460  * \param[in] epointd Endpoint descriptor
461  * \param[in] msg Message to sent
462  * \param[in] size Message size
463  *
464  * \return
465  * On success, it returns zero.  
466  * On error, -1 is returned and errno is set appropriately.
467  *
468  */
469 int fwp_send(fwp_endpoint_t *fwp_epoint,const void *msg, const size_t size, int flags)
470 {
471         struct fwp_msgb *msgb;
472         /*fwp_endpoint_t *fwp_epoint;*/
473         
474 /*      if (!fwp_endpoint_is_valid(epointd)){
475                 errno = EINVAL;
476                 return -1;
477         }
478         if (!fwp_endpoint_is_bound(epointd)){
479                 errno = EPERM;
480                 return -1;
481         }*/
482
483         /*if (flags && MSG_DONTWAIT) 
484                 msgb = fwp_msgb_alloc(buffer_size);
485         else {*/
486                 if (!(msgb = fwp_msgb_alloc(size))) {
487                         errno = ENOMEM;
488                         return -1;
489                 }
490
491                 /*msgb->peer = &fwp_epoint->peer;*/
492                 /*msgb->data = msg;*/
493                 /*msgb->flags = epoint->flags;*/
494                 
495                 /* data must be copied since msg may change while
496                  * message is waiting in transmission queue 
497                  * */
498                 memcpy(msgb->data, msg, size);
499                 fwp_msgb_put(msgb, size);
500                 /*msgb->tail = msgb->data + size;
501                 msgb->len = size;*/
502
503         /*}*/
504
505         /* TODO: test whether _fwp_vres_send is successful */
506         return fwp_vres_send(fwp_epoint->vresd, msgb);
507 }
508