]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/core/fwp_endpoint.c
b6c8f1c8ac132678406e00e5f03021b9b1f5e38b
[frescor/fwp.git] / fwp / lib / core / fwp_endpoint.c
1 #include "fwp_endpoint.h"
2
/** Direction of an endpoint. */
typedef enum {
        /** endpoint created by fwp_send_endpoint_create() */
        FWP_SEND_EPOINT = 0,
        /** endpoint created by fwp_receive_endpoint_create() */
        FWP_RECV_EPOINT = 1,
} fwp_endpoint_type_t;
7
/**
 * Life-cycle state of an endpoint table slot.
 *
 * FWP_EPOINT_FREE must stay zero: the table is zero-filled on
 * initialization and every slot is expected to start out free.
 */
typedef enum {
        /** slot is unused and may be handed out by the allocator */
        FWP_EPOINT_FREE         = 0,
        /** slot has been reserved but the endpoint is not set up yet */
        FWP_EPOINT_INACTIVE     = 1,
        /** endpoint exists but is not bound to a vres */
        FWP_EPOINT_UNBOUND      = 2,
        /** endpoint is bound to a vres */
        FWP_EPOINT_BOUND        = 3,
} fwp_endpoint_status_t;
14
15 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
16         .reliability = FWP_EPOINT_BESTEFFORT, 
17         .max_connections = 20,
18 };
19
20 /**
21  * Structure of FWP endpoint.
22  */
23 struct fwp_endpoint{
24         fwp_endpoint_type_t     type;
25         /**< endpoint attributes */
26         fwp_endpoint_attr_t     attr;
27         /**< the vres descriptor the send endpoint is bound to */       
28         fwp_vres_d_t            vresd;
29         /**< for send enpoint it contains destination address
30          * for receive endpoint it is filled with the msg source address
31          */
32         struct fwp_sockaddr     peer;   
33         /**< source/destination port */
34         unsigned int            port;   
35         /**< destination node */
36         int                     node;
37         /**< socket descriptor
38          * in case of rebliable epoint it is a listen socket
39          * */
40         int                     sockd; 
41         fd_set                  fdset;
42         fd_set                  testfds;
43         int                     *c_sockd;
44         unsigned int            nr_connections;
45         /**< specific operation options*/
46         int                     flags;  
47         fwp_endpoint_status_t   status;
48 };
49
50 typedef
51 struct fwp_endpoint_table {
52         unsigned int                    nr_endpoints;
53         fwp_endpoint_t                  *entry;
54         pthread_mutex_t                 lock;
55 } fwp_endpoint_table_t;
56
57 /* Global variable - endpoint table */
58 static fwp_endpoint_table_t  fwp_endpoint_table = {
59         .nr_endpoints = 0,
60         .entry = NULL,
61         .lock = PTHREAD_MUTEX_INITIALIZER,
62 };
63
64 int fwp_endpoint_table_init(unsigned int nr_endpoints)
65 {
66         int table_size = nr_endpoints * sizeof(fwp_endpoint_t);
67
68         fwp_endpoint_table.entry = (fwp_endpoint_t*) malloc(table_size);
69         if (!fwp_endpoint_table.entry)
70                 return -ENOMEM;
71         memset(fwp_endpoint_table.entry, 0, table_size);
72         fwp_endpoint_table.nr_endpoints = nr_endpoints;
73         return 0;
74 }
75
76 static fwp_endpoint_t* fwp_endpoint_alloc()
77 {
78         int i, nr_endpoints;
79
80         /* find free vres id */
81         pthread_mutex_lock(&fwp_endpoint_table.lock);
82         i = 0;
83         nr_endpoints = fwp_endpoint_table.nr_endpoints;
84         while ((i < nr_endpoints) && 
85                 (fwp_endpoint_table.entry[i].status != FWP_EPOINT_FREE))
86                 i++;
87         
88         if (i == nr_endpoints) {
89                 pthread_mutex_unlock(&fwp_endpoint_table.lock);
90                 return NULL;
91         }
92
93         fwp_endpoint_table.entry[i].status = FWP_EPOINT_INACTIVE;
94         pthread_mutex_unlock(&fwp_endpoint_table.lock);
95         return (&fwp_endpoint_table.entry[i]);
96 }
97
98 void fwp_endpoint_destroy(fwp_endpoint_d_t epointd)
99 {
100         fwp_endpoint_t *epoint = epointd;
101         
102         epoint->status = FWP_EPOINT_FREE;
103         if (epoint->sockd > 0) 
104                 close(epoint->sockd);
105 }
106
107 int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node, 
108                                 unsigned int *port, fwp_endpoint_attr_t *attr)
109 {
110         fwp_endpoint_t *epoint = epointd;
111
112         *node = epoint->node;
113         *port = epoint->port;
114         *attr = epoint->attr;
115         
116         return 0;
117 }
118
119 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
120 {
121         bzero(attr, sizeof(fwp_endpoint_attr_t));
122         *attr = fwp_epoint_attr_default;
123
124         return 0;
125 }
126
127 /**
128  * Creates send endpoint
129  *
130  * \param[in] node IP address of destination node
131  * \param[in] port UDP port
132  * \param[in] attr Endpoint attributes
133  * \param[out] epointdp  Pointer to the descriptor of newly created endpoint
134  *
135  * \return On success returns descriptor of endpoint. 
136  * On error, negative error code is returned. 
137  *
138  */
139 int fwp_send_endpoint_create(unsigned int node, unsigned int port,
140                                 fwp_endpoint_attr_t *attr, 
141                                 fwp_endpoint_d_t *epointdp)
142 {       
143         struct sockaddr_in *addr;
144         fwp_endpoint_t *epoint;
145
146         epoint = fwp_endpoint_alloc();  
147         if (!epoint) {
148                 return -ENOMEM;
149         }
150         
151         epoint->type = FWP_SEND_EPOINT;
152         epoint->status = FWP_EPOINT_UNBOUND;
153         epoint->node = node;
154         epoint->port = port;
155         if (attr) 
156                 epoint->attr  = *attr;
157         else 
158                 epoint->attr = fwp_epoint_attr_default;
159                 
160         addr = (struct sockaddr_in *)&(epoint->peer.addr);
161         bzero((char*) addr, sizeof(*addr));
162         addr->sin_family = AF_INET;
163         addr->sin_addr.s_addr = node;
164         addr->sin_port = htons(port);
165         epoint->peer.addrlen = sizeof(struct sockaddr_in);
166         
167         if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
168                 epoint->sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
169
170         } else {
171                 epoint->sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
172         }
173         
174         if (epoint->sockd < 0) {
175                         perror("Unable to open socket");
176                         goto err;
177         }
178         
179         /* Enable broadcasts */
180         unsigned int yes = 1;
181         if (setsockopt(epoint->sockd,SOL_SOCKET, SO_BROADCAST/*SO_REUSEADDR*/, 
182                         &yes, sizeof(yes)) == -1) {
183                 perror("Unable to set BROADCAST option for socket");
184                 return (-errno);
185         }
186         
187         if (connect(epoint->sockd,(struct sockaddr*) &epoint->peer.addr, 
188                         epoint->peer.addrlen)) {
189                 perror("Connect error");
190                 goto err;
191         }
192         
193         FWP_DEBUG("Send endpoint created.\n"); 
194         *epointdp = epoint;
195         return 0;               
196 err:
197         fwp_endpoint_destroy(epoint);
198         return (-errno);        
199 }
200
201 /**
202  * Creates receive endpoint
203  *
204  * \param[in] port UDP port
205  * \param[in] attr Endpoint attributes
206  * \param[out] epointdp  Pointer to the descriptor of newly created endpoint
207  *
208  * \return On success returns descriptor of endpoint. 
209  * On error, negative error code is returned. 
210  *
211  */
212 int fwp_receive_endpoint_create(/*unsigned int node,*/ unsigned int port,
213                                 fwp_endpoint_attr_t *attr, 
214                                 fwp_endpoint_d_t *epointdp)
215 {
216         fwp_endpoint_t *epoint;
217         struct sockaddr_in *addr;
218         //int rcvbuf_size = 3000;
219
220         epoint = fwp_endpoint_alloc();  
221         if (!epoint) {
222                 return -ENOMEM;
223         }
224         
225         epoint->type = FWP_RECV_EPOINT;
226         epoint->status = FWP_EPOINT_UNBOUND;
227         if (attr) 
228                 epoint->attr  = *attr;
229         else 
230                 epoint->attr = fwp_epoint_attr_default;
231         
232         addr = (struct sockaddr_in *)&(epoint->peer.addr);
233         addr->sin_family = AF_INET;
234         /* TODO: set listen interface, maybe through config struct*/
235         addr->sin_addr.s_addr = INADDR_ANY;
236         addr->sin_port = htons(port);
237         epoint->peer.addrlen = sizeof(struct sockaddr_in);
238         
239         if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
240                 if ((epoint->sockd = socket(PF_INET, SOCK_STREAM, 
241                                                 IPPROTO_TCP)) < 0) {
242                         perror("Unable to open socket");
243                         goto err;
244                 }       
245                 
246                 if (bind(epoint->sockd, (struct sockaddr*) &epoint->peer.addr, 
247                                 epoint->peer.addrlen) == -1) {
248                         perror("Bind error");
249                         goto err;
250                 }
251                 
252                 listen(epoint->sockd, epoint->attr.max_connections); 
253                 
254                 FD_ZERO(&epoint->fdset);
255                 FD_SET(epoint->sockd, &epoint->fdset); /*add listen socket */
256                 epoint->testfds = epoint->fdset;
257                 epoint->c_sockd = (int*) malloc(epoint->attr.max_connections);
258                 bzero(epoint->c_sockd, epoint->attr.max_connections);
259                 epoint->nr_connections = 0;
260
261                 FWP_DEBUG("Receive endpoint\n");
262
263         } else {
264                 if ((epoint->sockd = socket(PF_INET, SOCK_DGRAM, 
265                                                 IPPROTO_UDP)) < 0) {
266                         perror("Unable to open socket");
267                         goto err;
268                 }
269                 
270                 if (bind(epoint->sockd, (struct sockaddr*) &epoint->peer.addr, 
271                                 epoint->peer.addrlen) == -1) {
272                         perror("Bind error");
273                         goto err;
274                 }
275         }
276                 
277         /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF, 
278                         &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
279                 
280                 perror("Unable to set socket buffer size");
281                 return -1;
282         }else {
283                 FWP_DEBUG("Receive endpoint buffer size is set.\n");
284         }
285         */
286         
287         getsockname(epoint->sockd, (struct sockaddr*)&epoint->peer.addr, 
288                         &epoint->peer.addrlen);
289         
290         epoint->port = ntohs(addr->sin_port);
291         /*TODO: set node*/
292         epoint->node =  ntohl(addr->sin_addr.s_addr);
293         FWP_DEBUG("Receive endpoint port=%d created.\n", epoint->port); 
294         
295         *epointdp = epoint; 
296         return 0;
297
298 err:    
299         fwp_endpoint_destroy(epoint);
300         return (-errno);        
301 }
302
303 /**
304  * Binds send endpoint to vres
305  *
306  * \param[in] vres_id identifier of vres
307  * \param[in] epoint_id send endpoint identifier
308  *
309  * \return On success returns 0. On error, negative error code is returned 
310  */
311 int fwp_send_endpoint_bind(fwp_endpoint_d_t epointd, fwp_vres_d_t vresd)
312 {
313         fwp_endpoint_t *epoint = epointd;
314         
315         if (epoint->type != FWP_SEND_EPOINT) {  
316                 return (-EINVAL);
317         }
318         
319         /* link epoint-vres mutually */
320         pthread_mutex_lock(&fwp_endpoint_table.lock);
321         if (_fwp_vres_bind(vresd, epoint->sockd) < 0) { 
322                 pthread_mutex_unlock(&fwp_endpoint_table.lock);
323                 return -EPERM;
324         }
325
326         if (epoint->type == FWP_EPOINT_BOUND) {  /* if send endpoint is already bound */
327                 fwp_send_endpoint_unbind(epoint);
328         }
329
330         epoint->vresd = vresd;
331         epoint->status = FWP_EPOINT_BOUND;
332         
333         pthread_mutex_unlock(&fwp_endpoint_table.lock);
334         return 0;
335 }
336
337 /**
338  * Unbinds send endpoint from vres
339  *
340  * \param[in] id send endpoint identifier 
341  * \return On success returns 0. On error, negative error code is returned 
342  *
343  */
344 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
345 {
346         fwp_endpoint_t *epoint = epointd;
347         
348         /* unlink epoint-vres mutually */
349         _fwp_vres_unbind(epoint->vresd);
350         epoint->status = FWP_EPOINT_UNBOUND;
351
352         return 0;
353 }
354
355 /**
356  * Receives message
357  *
358  * \param[in] epointd descriptor of endpoint
359  * \param[in] buffer buffer to store message
360  * \param[in] buffer_size size of buffer
361  *
362  * \return
363  * On success, it returns number of received bytes.  
364  * On error, negative error code is returned,
365  *
366  */
367 ssize_t fwp_recv(fwp_endpoint_d_t epointd, void *buffer, size_t buffer_size, 
368                         int flags)
369 {
370         fwp_endpoint_t *epoint = epointd;
371         fwp_sockaddr_t *peer = &epoint->peer;
372         ssize_t len;
373         fd_set fdset;
374         int csockd, i;
375         
376         if (epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {        
377                 _fwp_recvfrom(len, epoint->sockd, buffer, buffer_size, 0, 
378                         peer->addr, &peer->addrlen);
379                 return len;
380         }
381
382 next_recv:
383         /* FWP_EPOINT_RELIABLE */
384         fdset = epoint->fdset;
385         FWP_DEBUG("Before select\n");
386         if (select(FD_SETSIZE, &fdset, (fd_set *)0, 
387                 (fd_set *)0, NULL) < 0) {
388                 
389                 perror("Error in select");
390                 return (-errno);
391         }
392         FWP_DEBUG("After select\n");
393         
394         if (FD_ISSET(epoint->sockd, &fdset)) { /* is it listen socket? */ 
395                 if (epoint->nr_connections == epoint->attr.max_connections)
396                         goto next_recv;
397
398                 csockd = accept(epoint->sockd, (struct sockaddr*)peer->addr,
399                                  &peer->addrlen);
400                 
401                 FWP_DEBUG("New connection accepted\n");
402                 /* find free place */           
403                 i = 0;
404                 while ((epoint->c_sockd[i])&& 
405                         (i < epoint->nr_connections)) 
406                         i++;
407                 epoint->c_sockd[i] = csockd; 
408                 FWP_DEBUG("Index = %d\n", i);
409                 epoint->nr_connections++;
410                 
411                 FD_SET(csockd, &epoint->fdset);
412
413                 goto next_recv;
414         }
415
416         /* Check client TCP sockets */
417         for (i = 0; i < epoint->nr_connections; i++) {
418                 if (FD_ISSET(epoint->c_sockd[i], &fdset)) {
419                         _fwp_recvfrom(len, epoint->c_sockd[i], buffer, buffer_size,
420                                         0, peer->addr, &peer->addrlen);
421                         
422                         FWP_DEBUG("Received tcp data\n");
423                         if (len)
424                                 return len;
425                         /* tcp connection closed */
426                         FWP_DEBUG("Connection closed\n");
427                         FD_CLR(epoint->c_sockd[i], &epoint->fdset);
428                         memcpy(epoint->c_sockd+i, epoint->c_sockd+i+1, 
429                                 sizeof(int)*(epoint->nr_connections -i-1));
430                         goto next_recv;
431                 }
432         }
433 }
434
435 /**
436  * Sends message through vres
437  *
438  * \param[in] epointd  identificator of endpoint
439  * \param[in] msg message to sent
440  * \param[in] size message size
441  *
442  * \return
443  * On success, it returns zero.  
444  * On error, negative error code is returned,
445  *
446  */
447 int fwp_send(fwp_endpoint_d_t epointd, void *msg, size_t size, int flags)
448 {
449         struct fwp_endpoint *epoint = epointd;
450         struct fwp_msgb *msgb;
451
452         /* TODO: Validity test of epointd */
453         if (epoint->status != FWP_EPOINT_BOUND) {
454                 return -EPERM;
455         }
456
457         /*if (flags && MSG_DONTWAIT) 
458                 msgb = fwp_msgb_alloc(buffer_size);
459         else {*/
460                 if (!(msgb = fwp_msgb_alloc(size)))
461                         return -ENOMEM;
462
463                 msgb->peer = &epoint->peer;
464                 /*msgb->data = msg;*/
465                 /*msgb->flags = epoint->flags;*/
466                 
467                 /* data must be copied since msg may change while
468                  * message is waiting in transmission queue 
469                  * */
470                 memcpy(msgb->data, msg, size);
471                 fwp_msgb_put(msgb, size);
472                 /*msgb->tail = msgb->data + size;
473                 msgb->len = size;*/
474
475         /*}*/
476
477         /* TODO: test whether _fwp_vres_send is successful */
478         return _fwp_vres_send(epoint->vresd, msgb);
479 }
480