]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/core/fwp_endpoint.c
FWP tests got to the functional state
[frescor/fwp.git] / fwp / lib / core / fwp_endpoint.c
1 #include "fwp_endpoint.h"
2
3 typedef enum {
4         FWP_SEND_EPOINT = 0,
5         FWP_RECV_EPOINT = 1,
6 } fwp_endpoint_type_t;
7
8 typedef enum {
9         FWP_EPOINT_FREE         = 0,
10         FWP_EPOINT_INACTIVE     = 1,
11         FWP_EPOINT_UNBOUND      = 2,
12         FWP_EPOINT_BOUND        = 3,
13 } fwp_endpoint_status_t;
14
15 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
16         .reliability = FWP_EPOINT_BESTEFFORT, 
17         .max_connections = 20,
18 };
19
20 /**
21  * Structure of FWP endpoint.
22  */
23 struct fwp_endpoint{
24         fwp_endpoint_type_t     type;
25         /**< endpoint attributes */
26         fwp_endpoint_attr_t     attr;
27         /**< the vres descriptor the send endpoint is bound to */       
28         fwp_vres_d_t            vresd;
29         /**< for send enpoint it contains destination address
30          * for receive endpoint it is filled with the msg source address
31          */
32         struct fwp_sockaddr     peer;   
33         /**< source/destination port */
34         unsigned int            port;   
35         /**< destination node */
36         int                     node;
37         /**< socket descriptor
38          * in case of rebliable epoint it is a listen socket
39          * */
40         int                     sockd; 
41         fd_set                  fdset;
42         fd_set                  testfds;
43         int                     *c_sockd;
44         unsigned int            nr_connections;
45         /**< specific operation options*/
46         int                     flags;  
47         fwp_endpoint_status_t   status;
48 };
49
50 typedef
51 struct fwp_endpoint_table {
52         unsigned int                    nr_endpoints;
53         fwp_endpoint_t                  *entry;
54         pthread_mutex_t                 lock;
55 } fwp_endpoint_table_t;
56
57 /* Global variable - endpoint table */
58 static fwp_endpoint_table_t  fwp_endpoint_table = {
59         .nr_endpoints = 0,
60         .entry = NULL,
61         .lock = PTHREAD_MUTEX_INITIALIZER,
62 };
63
64 int fwp_endpoint_table_init(unsigned int nr_endpoints)
65 {
66         int table_size = nr_endpoints * sizeof(fwp_endpoint_t);
67
68         fwp_endpoint_table.entry = (fwp_endpoint_t*) malloc(table_size);
69         if (!fwp_endpoint_table.entry)
70                 return -ENOMEM;
71         memset(fwp_endpoint_table.entry, 0, table_size);
72         fwp_endpoint_table.nr_endpoints = nr_endpoints;
73         return 0;
74 }
75
76 static fwp_endpoint_t* fwp_endpoint_alloc()
77 {
78         int i, nr_endpoints;
79
80         /* find free vres id */
81         pthread_mutex_lock(&fwp_endpoint_table.lock);
82         i = 0;
83         nr_endpoints = fwp_endpoint_table.nr_endpoints;
84         while ((i < nr_endpoints) && 
85                 (fwp_endpoint_table.entry[i].status != FWP_EPOINT_FREE))
86                 i++;
87         
88         if (i == nr_endpoints) {
89                 pthread_mutex_unlock(&fwp_endpoint_table.lock);
90                 return NULL;
91         }
92
93         fwp_endpoint_table.entry[i].status = FWP_EPOINT_INACTIVE;
94         pthread_mutex_unlock(&fwp_endpoint_table.lock);
95         return (&fwp_endpoint_table.entry[i]);
96 }
97
98 static inline void fwp_endpoint_free(fwp_endpoint_t *epoint)
99 {
100         epoint->status = FWP_EPOINT_FREE;
101         close(epoint->sockd);
102 }
103
104 int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node, 
105                                 unsigned int *port, fwp_endpoint_attr_t *attr)
106 {
107         fwp_endpoint_t *epoint = epointd;
108
109         *node = epoint->node;
110         *port = epoint->port;
111         *attr = epoint->attr;
112         
113         return 0;
114 }
115
116 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
117 {
118         bzero(attr, sizeof(fwp_endpoint_attr_t));
119         *attr = fwp_epoint_attr_default;
120
121         return 0;
122 }
123
124 /**
125  * Creates send endpoint
126  *
127  * \param[in] node IP address of destination node
128  * \param[in] port UDP port
129  *
130  * \return On success returns identifier of endpoint. 
131  * On error, negative error code is returned. 
132  *
133  */
134 int fwp_send_endpoint_create(unsigned int node, unsigned int port,
135                                 fwp_endpoint_attr_t *attr, 
136                                 fwp_endpoint_d_t *epointdp)
137 {       
138         struct sockaddr_in *addr;
139         fwp_endpoint_t *epoint;
140         int sockd;
141
142         epoint = fwp_endpoint_alloc();  
143         if (!epoint) {
144                 return -ENOMEM;
145         }
146         
147         epoint->type = FWP_SEND_EPOINT;
148         epoint->status = FWP_EPOINT_UNBOUND;
149         epoint->node = node;
150         epoint->port = port;
151         if (attr) 
152                 epoint->attr  = *attr;
153         else 
154                 epoint->attr = fwp_epoint_attr_default;
155                 
156         addr = (struct sockaddr_in *)&(epoint->peer.addr);
157         bzero((char*) addr, sizeof(*addr));
158         addr->sin_family = AF_INET;
159         addr->sin_addr.s_addr = node;
160         addr->sin_port = htons(port);
161         epoint->peer.addrlen = sizeof(struct sockaddr_in);
162         
163         if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
164                 sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
165
166         } else {
167                 sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
168         }
169         
170         if (sockd < 0) {
171                         perror("Unable to open socket");
172                         goto err;
173         }
174         
175         /* Enable broadcasts */
176         unsigned int yes = 1;
177         if (setsockopt(sockd,SOL_SOCKET, SO_BROADCAST/*SO_REUSEADDR*/, &yes, 
178                          sizeof(yes)) == -1) {
179                 perror("Unable to set BROADCAST option for socket");
180                 close(sockd);
181                 return (-errno);
182         }
183         
184         if (connect(sockd,(struct sockaddr*) &epoint->peer.addr, 
185                         epoint->peer.addrlen)) {
186                 perror("Connect error");
187                 goto err;
188         }
189                 
190         epoint->sockd = sockd;
191         
192         FWP_DEBUG("Send endpoint created.\n"); 
193         *epointdp = epoint;
194         return 0;               
195 err:
196         fwp_endpoint_free(epoint);
197         return (-errno);        
198 }
199
200 /**
201  * Creates receive endpoint
202  *
203  * \param[in] port UDP port
204  *
205  * \return On success returns identifier of endpoint. 
206  * On error, negative error code is returned.
207  *
208  */
209 int fwp_receive_endpoint_create(/*unsigned int node,*/ unsigned int port,
210                                 fwp_endpoint_attr_t *attr, 
211                                 fwp_endpoint_d_t *epointdp)
212 {
213         fwp_endpoint_t *epoint;
214         int sockd;
215         struct sockaddr_in *addr;
216         //int rcvbuf_size = 3000;
217
218         epoint = fwp_endpoint_alloc();  
219         if (!epoint) {
220                 return -ENOMEM;
221         }
222         
223         epoint->type = FWP_RECV_EPOINT;
224         epoint->status = FWP_EPOINT_UNBOUND;
225         if (attr) 
226                 epoint->attr  = *attr;
227         else 
228                 epoint->attr = fwp_epoint_attr_default;
229         
230         addr = (struct sockaddr_in *)&(epoint->peer.addr);
231         addr->sin_family = AF_INET;
232         /* TODO: set listen interface, maybe through config struct*/
233         addr->sin_addr.s_addr = INADDR_ANY;
234         addr->sin_port = htons(port);
235         epoint->peer.addrlen = sizeof(struct sockaddr_in);
236         
237         if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
238                 if ((sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
239                         perror("Unable to open socket");
240                         goto err;
241                 }       
242                 
243                 if (bind(sockd, (struct sockaddr*) &epoint->peer.addr, 
244                                 epoint->peer.addrlen) == -1) {
245                         perror("Bind error");
246                         goto err;
247                 }
248                 
249                 listen(sockd, epoint->attr.max_connections); 
250                 
251                 FD_ZERO(&epoint->fdset);
252                 FD_SET(sockd, &epoint->fdset); /*add listen socket */
253                 epoint->testfds = epoint->fdset;
254                 epoint->c_sockd = (int*) malloc(epoint->attr.max_connections);
255                 bzero(epoint->c_sockd, epoint->attr.max_connections);
256                 epoint->nr_connections = 0;
257
258                 FWP_DEBUG("Receive endpoint\n");
259
260         } else {
261                 if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
262                         perror("Unable to open socket");
263                         goto err;
264                 }
265                 
266                 if (bind(sockd, (struct sockaddr*) &epoint->peer.addr, 
267                                 epoint->peer.addrlen) == -1) {
268                         perror("Bind error");
269                         goto err;
270                 }
271         }
272                 
273         epoint->sockd = sockd;
274         /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF, 
275                         &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
276                 
277                 perror("Unable to set socket buffer size");
278                 return -1;
279         }else {
280                 FWP_DEBUG("Receive endpoint buffer size is set.\n");
281         }
282         */
283         
284         getsockname(epoint->sockd, (struct sockaddr*)&epoint->peer.addr, 
285                         &epoint->peer.addrlen);
286         
287         epoint->port = ntohs(addr->sin_port);
288         /*TODO: set node*/
289         epoint->node =  ntohl(addr->sin_addr.s_addr);
290         FWP_DEBUG("Receive endpoint port=%d created.\n", epoint->port); 
291         
292         *epointdp = epoint; 
293         return 0;
294
295 err:    
296         fwp_endpoint_free(epoint);
297         return (-errno);        
298 }
299
300 /**
301  * Binds send endpoint to vres
302  *
303  * \param[in] vres_id identifier of vres
304  * \param[in] epoint_id send endpoint identifier
305  *
306  * \return On success returns 0. On error, negative error code is returned 
307  */
308 int fwp_send_endpoint_bind(fwp_endpoint_d_t epointd, fwp_vres_d_t vresd)
309 {
310         fwp_endpoint_t *epoint = epointd;
311         
312         if (epoint->type != FWP_SEND_EPOINT) {  
313                 return (-EINVAL);
314         }
315         
316         /* link epoint-vres mutually */
317         pthread_mutex_lock(&fwp_endpoint_table.lock);
318         if (_fwp_vres_bind(vresd, epoint->sockd) < 0) { 
319                 pthread_mutex_unlock(&fwp_endpoint_table.lock);
320                 return -EPERM;
321         }
322
323         if (epoint->type == FWP_EPOINT_BOUND) {  /* if send endpoint is already bound */
324                 fwp_send_endpoint_unbind(epoint);
325         }
326
327         epoint->vresd = vresd;
328         epoint->status = FWP_EPOINT_BOUND;
329         
330         pthread_mutex_unlock(&fwp_endpoint_table.lock);
331         return 0;
332 }
333
334 /**
335  * Unbinds send endpoint from vres
336  *
337  * \param[in] id send endpoint identifier 
338  * \return On success returns 0. On error, negative error code is returned 
339  *
340  */
341 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
342 {
343         fwp_endpoint_t *epoint = epointd;
344         
345         /* unlink epoint-vres mutually */
346         _fwp_vres_unbind(epoint->vresd);
347         epoint->status = FWP_EPOINT_UNBOUND;
348
349         return 0;
350 }
351
352 /**
353  * Receives message
354  *
355  * \param[in] epoint_id  identificator of endpoint
356  * \param[in] buffer buffer to store message
357  * \param[in] buffer_size size of buffer
358  *
359  * \return
360  * On success, it returns number of received bytes.  
361  * On error, negative error code is returned,
362  *
363  */
364 ssize_t fwp_recv(fwp_endpoint_d_t epointd, void *buffer, size_t buffer_size, 
365                         int flags)
366 {
367         fwp_endpoint_t *epoint = epointd;
368         fwp_sockaddr_t *peer = &epoint->peer;
369         ssize_t len;
370         //fd_set fdset;
371         int csockd, i;
372         
373         if (epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {        
374                 _fwp_recvfrom(len, epoint->sockd, buffer, buffer_size, 0, 
375                         peer->addr, &peer->addrlen);
376                 return len;
377         }
378
379 next_connection:
380         /* FWP_EPOINT_RELIABLE */
381         //fdset = epoint->fdset;
382         FWP_DEBUG("Before selct\n");
383         if (select(FD_SETSIZE, &epoint->fdset, (fd_set *)0, 
384                 (fd_set *)0, NULL) < 0) {
385                 
386                 perror("Error in select");
387                 return (-errno);
388         }
389         FWP_DEBUG("After select\n");
390         
391         if (FD_ISSET(epoint->sockd, &epoint->fdset)) { /* is it listen socket? */ 
392                 if (epoint->nr_connections == epoint->attr.max_connections)
393                         goto next_connection;
394
395                 csockd = accept(epoint->sockd, (struct sockaddr*)peer->addr,
396                                  &peer->addrlen);
397                 
398                 FWP_DEBUG("New connection accepted\n");
399                 /* find free place */           
400                 i = 0;
401                 while ((epoint->c_sockd[i])&& 
402                         (i < epoint->nr_connections)) 
403                         i++;
404                 epoint->c_sockd[i] = csockd; 
405                 FWP_DEBUG("Index = %d\n", i);
406                 epoint->nr_connections++;
407                 
408                 FD_SET(csockd, &epoint->fdset);
409
410                 goto next_connection;
411         }
412
413         /* Check client TCP sockets */
414         for (i = 0; i < epoint->nr_connections; i++) {
415                 if (FD_ISSET(epoint->c_sockd[i], &epoint->fdset)) {
416                         _fwp_recvfrom(len, epoint->c_sockd[i], buffer, buffer_size,
417                                         0, peer->addr, &peer->addrlen);
418                         
419                         FWP_DEBUG("Received tcp data\n");
420                         if (len)
421                                 return len;
422                         /* tcp connection closed */
423                         FWP_DEBUG("Connection closed\n");
424                         FD_CLR(epoint->c_sockd[i], &epoint->fdset);
425                         memcpy(epoint->c_sockd+i, epoint->c_sockd+i+1, 
426                                 sizeof(int)*(epoint->nr_connections -i-1));
427                 }
428         }
429 }
430
431 /**
432  * Sends message through vres
433  *
434  * \param[in] epoint_id  identificator of endpoint
435  * \param[in] msg message to sent
436  * \param[in] size message size
437  *
438  * \return
439  * On success, it returns zero.  
440  * On error, negative error code is returned,
441  *
442  */
443 int fwp_send(fwp_endpoint_d_t epointd, void *msg, size_t size, int flags)
444 {
445         struct fwp_endpoint *epoint = epointd;
446         struct fwp_msgb *msgb;
447
448         /* TODO: Validity test of epointd */
449         if (epoint->status != FWP_EPOINT_BOUND) {
450                 return -EPERM;
451         }
452
453         /*if (flags && MSG_DONTWAIT) 
454                 msgb = fwp_msgb_alloc(buffer_size);
455         else {*/
456                 if (!(msgb = fwp_msgb_alloc(size)))
457                         return -ENOMEM;
458
459                 msgb->peer = &epoint->peer;
460                 /*msgb->data = msg;*/
461                 /*msgb->flags = epoint->flags;*/
462                 
463                 /* data must be copied since msg may change while
464                  * message is waiting in transmission queue 
465                  * */
466                 memcpy(msgb->data, msg, size);
467                 fwp_msgb_put(msgb, size);
468                 /*msgb->tail = msgb->data + size;
469                 msgb->len = size;*/
470
471         /*}*/
472
473         /* TODO: test whether _fwp_vres_send is successful */
474         return _fwp_vres_send(epoint->vresd, msgb);
475 }
476