]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/core/fwp_endpoint.c
WIP:bug tracking
[frescor/fwp.git] / fwp / lib / core / fwp_endpoint.c
1 #include "fwp_endpoint.h"
2
/** Direction of an endpoint: sending or receiving side. */
typedef enum {
        FWP_SEND_EPOINT,        /* value 0 */
        FWP_RECV_EPOINT,        /* value 1 */
} fwp_endpoint_type_t;
7
/**
 * Life-cycle state of a slot in the endpoint table.
 *
 * FWP_EPOINT_FREE must stay 0: the table is zero-filled on initialization
 * and the allocator treats a zero status as an available slot.
 */
typedef enum {
        FWP_EPOINT_FREE,        /* 0: slot is available for allocation */
        FWP_EPOINT_INACTIVE,    /* 1: slot reserved by the allocator */
        FWP_EPOINT_UNBOUND,     /* 2: endpoint created, not bound to a vres */
        FWP_EPOINT_BOUND,       /* 3: send endpoint bound to a vres */
} fwp_endpoint_status_t;
14
15 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
16         .reliability = FWP_EPOINT_BESTEFFORT, 
17         .max_connections = 20,
18 };
19
20 /**
21  * Structure of FWP endpoint.
22  */
23 struct fwp_endpoint{
24         fwp_endpoint_type_t     type;
25         /**< endpoint attributes */
26         fwp_endpoint_attr_t     attr;
27         /**< the vres descriptor the send endpoint is bound to */       
28         fwp_vres_d_t            vresd;
29         /**< for send enpoint it contains destination address
30          * for receive endpoint it is filled with the msg source address
31          */
32         struct fwp_sockaddr     peer;   
33         /**< source/destination port */
34         unsigned int            port;   
35         /**< destination node */
36         int                     node;
37         /**< socket descriptor
38          * in case of rebliable epoint it is a listen socket
39          * */
40         int                     sockd; 
41         fd_set                  fdset;
42         int                     *c_sockd;
43         unsigned int            nr_connections;
44         /**< specific operation options*/
45         int                     flags;  
46         fwp_endpoint_status_t   status;
47 };
48
49 typedef
50 struct fwp_endpoint_table {
51         unsigned int                    nr_endpoints;
52         fwp_endpoint_t                  *entry;
53         pthread_mutex_t                 lock;
54 } fwp_endpoint_table_t;
55
56 /* Global variable - endpoint table */
57 static fwp_endpoint_table_t  fwp_endpoint_table = {
58         .nr_endpoints = 0,
59         .entry = NULL,
60         .lock = PTHREAD_MUTEX_INITIALIZER,
61 };
62
63 int fwp_endpoint_table_init(unsigned int nr_endpoints)
64 {
65         int table_size = nr_endpoints * sizeof(fwp_endpoint_t);
66
67         fwp_endpoint_table.entry = (fwp_endpoint_t*) malloc(table_size);
68         if (!fwp_endpoint_table.entry)
69                 return -ENOMEM;
70         memset(fwp_endpoint_table.entry, 0, table_size);
71         fwp_endpoint_table.nr_endpoints = nr_endpoints;
72         return 0;
73 }
74
75 static fwp_endpoint_t* fwp_endpoint_alloc()
76 {
77         int i, nr_endpoints;
78
79         /* find free vres id */
80         pthread_mutex_lock(&fwp_endpoint_table.lock);
81         i = 0;
82         nr_endpoints = fwp_endpoint_table.nr_endpoints;
83         while ((i < nr_endpoints) && 
84                 (fwp_endpoint_table.entry[i].status != FWP_EPOINT_FREE))
85                 i++;
86         
87         if (i == nr_endpoints) {
88                 pthread_mutex_unlock(&fwp_endpoint_table.lock);
89                 return NULL;
90         }
91
92         fwp_endpoint_table.entry[i].status = FWP_EPOINT_INACTIVE;
93         pthread_mutex_unlock(&fwp_endpoint_table.lock);
94         return (&fwp_endpoint_table.entry[i]);
95 }
96
97 static inline void fwp_endpoint_free(fwp_endpoint_t *epoint)
98 {
99         epoint->status = FWP_EPOINT_FREE;
100         close(epoint->sockd);
101 }
102
103 int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node, 
104                                 unsigned int *port, fwp_endpoint_attr_t *attr)
105 {
106         fwp_endpoint_t *epoint = epointd;
107
108         *node = epoint->node;
109         *port = epoint->port;
110         *attr = epoint->attr;
111         
112         return 0;
113 }
114
115 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
116 {
117         bzero(attr, sizeof(fwp_endpoint_attr_t));
118         *attr = fwp_epoint_attr_default;
119
120         return 0;
121 }
122
123 /**
124  * Creates send endpoint
125  *
126  * \param[in] node IP address of destination node
127  * \param[in] port UDP port
128  *
129  * \return On success returns identifier of endpoint. 
130  * On error, negative error code is returned. 
131  *
132  */
133 int fwp_send_endpoint_create(unsigned int node, unsigned int port,
134                                 fwp_endpoint_attr_t *attr, 
135                                 fwp_endpoint_d_t *epointdp)
136 {       
137         struct sockaddr_in *addr;
138         fwp_endpoint_t *epoint;
139         int sockd;
140
141         epoint = fwp_endpoint_alloc();  
142         if (!epoint) {
143                 return -ENOMEM;
144         }
145         
146         epoint->type = FWP_SEND_EPOINT;
147         epoint->status = FWP_EPOINT_UNBOUND;
148         epoint->node = node;
149         epoint->port = port;
150         if (attr) 
151                 epoint->attr  = *attr;
152         else 
153                 epoint->attr = fwp_epoint_attr_default;
154                 
155         addr = (struct sockaddr_in *)&(epoint->peer.addr);
156         bzero((char*) addr, sizeof(*addr));
157         addr->sin_family = AF_INET;
158         addr->sin_addr.s_addr = node;
159         addr->sin_port = htons(port);
160         epoint->peer.addrlen = sizeof(struct sockaddr_in);
161         
162         if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
163                 sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
164
165         } else {
166                 sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
167         }
168         
169         if (sockd < 0) {
170                         perror("Unable to open socket");
171                         goto err;
172         }
173         
174         /* Enable broadcasts */
175         unsigned int yes = 1;
176         if (setsockopt(sockd,SOL_SOCKET, SO_BROADCAST/*SO_REUSEADDR*/, &yes, 
177                          sizeof(yes)) == -1) {
178                 perror("Unable to set BROADCAST option for socket");
179                 close(sockd);
180                 return (-errno);
181         }
182         
183         if (connect(sockd,(struct sockaddr*) &epoint->peer.addr, 
184                         epoint->peer.addrlen)) {
185                 perror("Connect error");
186                 goto err;
187         }
188                 
189         epoint->sockd = sockd;
190         
191         FWP_DEBUG("Send endpoint created.\n"); 
192         *epointdp = epoint;
193         return 0;               
194 err:
195         fwp_endpoint_free(epoint);
196         return (-errno);        
197 }
198
199 /**
200  * Creates receive endpoint
201  *
202  * \param[in] port UDP port
203  *
204  * \return On success returns identifier of endpoint. 
205  * On error, negative error code is returned.
206  *
207  */
208 int fwp_receive_endpoint_create(/*unsigned int node,*/ unsigned int port,
209                                 fwp_endpoint_attr_t *attr, 
210                                 fwp_endpoint_d_t *epointdp)
211 {
212         fwp_endpoint_t *epoint;
213         int sockd;
214         struct sockaddr_in *addr;
215         //int rcvbuf_size = 3000;
216
217         epoint = fwp_endpoint_alloc();  
218         if (!epoint) {
219                 return -ENOMEM;
220         }
221         
222         epoint->type = FWP_RECV_EPOINT;
223         epoint->status = FWP_EPOINT_UNBOUND;
224         if (attr) 
225                 epoint->attr  = *attr;
226         else 
227                 epoint->attr = fwp_epoint_attr_default;
228         
229         addr = (struct sockaddr_in *)&(epoint->peer.addr);
230         addr->sin_family = AF_INET;
231         /* TODO: set listen interface, maybe through config struct*/
232         addr->sin_addr.s_addr = INADDR_ANY;
233         addr->sin_port = htons(port);
234         epoint->peer.addrlen = sizeof(struct sockaddr_in);
235         
236         if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
237                 if ((sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
238                         perror("Unable to open socket");
239                         goto err;
240                 }       
241                 FD_ZERO(&epoint->fdset);
242                 FD_SET(sockd, &epoint->fdset); /*add listen socket */
243                 epoint->c_sockd = (int*) malloc(epoint->attr.max_connections);
244                 bzero(epoint->c_sockd, epoint->attr.max_connections);
245                 epoint->nr_connections = 0;
246
247                 FWP_DEBUG("Receive endpoint\n");
248
249         } else {
250                 if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
251                         perror("Unable to open socket");
252                         goto err;
253                 }
254         }
255                 
256         if (bind(sockd, (struct sockaddr*) &epoint->peer.addr, 
257                                 epoint->peer.addrlen) == -1) {
258                 perror("Bind error");
259                 goto err;
260         }
261    
262         epoint->sockd = sockd;
263         /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF, 
264                         &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
265                 
266                 perror("Unable to set socket buffer size");
267                 return -1;
268         }else {
269                 FWP_DEBUG("Receive endpoint buffer size is set.\n");
270         }
271         */
272         
273         getsockname(epoint->sockd, (struct sockaddr*)&epoint->peer.addr, 
274                         &epoint->peer.addrlen);
275         
276         epoint->port = ntohs(addr->sin_port);
277         /*TODO: set node*/
278         epoint->node =  ntohl(addr->sin_addr.s_addr);
279         FWP_DEBUG("Receive endpoint port=%d created.\n", epoint->port); 
280         
281         *epointdp = epoint; 
282         return 0;
283
284 err:    
285         fwp_endpoint_free(epoint);
286         return (-errno);        
287 }
288
289 /**
290  * Binds send endpoint to vres
291  *
292  * \param[in] vres_id identifier of vres
293  * \param[in] epoint_id send endpoint identifier
294  *
295  * \return On success returns 0. On error, negative error code is returned 
296  */
297 int fwp_send_endpoint_bind(fwp_endpoint_d_t epointd, fwp_vres_d_t vresd)
298 {
299         fwp_endpoint_t *epoint = epointd;
300         
301         if (epoint->type != FWP_SEND_EPOINT) {  
302                 return (-EINVAL);
303         }
304         
305         /* link epoint-vres mutually */
306         pthread_mutex_lock(&fwp_endpoint_table.lock);
307         if (_fwp_vres_bind(vresd, epoint->sockd) < 0) { 
308                 pthread_mutex_unlock(&fwp_endpoint_table.lock);
309                 return -EPERM;
310         }
311
312         if (epoint->type == FWP_EPOINT_BOUND) {  /* if send endpoint is already bound */
313                 fwp_send_endpoint_unbind(epoint);
314         }
315
316         epoint->vresd = vresd;
317         epoint->status = FWP_EPOINT_BOUND;
318         
319         pthread_mutex_unlock(&fwp_endpoint_table.lock);
320         return 0;
321 }
322
323 /**
324  * Unbinds send endpoint from vres
325  *
326  * \param[in] id send endpoint identifier 
327  * \return On success returns 0. On error, negative error code is returned 
328  *
329  */
330 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
331 {
332         fwp_endpoint_t *epoint = epointd;
333         
334         /* unlink epoint-vres mutually */
335         _fwp_vres_unbind(epoint->vresd);
336         epoint->status = FWP_EPOINT_UNBOUND;
337
338         return 0;
339 }
340
341 /**
342  * Receives message
343  *
344  * \param[in] epoint_id  identificator of endpoint
345  * \param[in] buffer buffer to store message
346  * \param[in] buffer_size size of buffer
347  *
348  * \return
349  * On success, it returns number of received bytes.  
350  * On error, negative error code is returned,
351  *
352  */
353 ssize_t fwp_recv(fwp_endpoint_d_t epointd, void *buffer, size_t buffer_size, 
354                         int flags)
355 {
356         fwp_endpoint_t *epoint = epointd;
357         fwp_sockaddr_t *peer = &epoint->peer;
358         ssize_t len;
359         fd_set fdset;
360         int csockd, i;
361         
362         if (epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {        
363                 FWP_DEBUG("Recv\n");
364                 _fwp_recvfrom(len, epoint->sockd, buffer, buffer_size, 0, 
365                         peer->addr, &peer->addrlen);
366                 return len;
367         }
368
369 next_connection:
370         /* FWP_EPOINT_RELIABLE */
371         memcpy(&fdset, &epoint->fdset, sizeof(fdset)); 
372         if (select(getdtablesize()+1, &fdset, (fd_set *)0, 
373                 (fd_set *)0, NULL) < 0) {
374                 perror("Error in select");
375                 return (-errno);
376         }
377         
378         if (FD_ISSET(epoint->sockd, &fdset)) { /*is it listen socket?*/ 
379                 if (epoint->nr_connections == epoint->attr.max_connections)
380                         goto next_connection;
381
382                 csockd = accept(epoint->sockd, (struct sockaddr*)peer->addr,
383                                  &peer->addrlen);
384                 
385                 FWP_DEBUG("New connection\n");
386                 /* find free place */           
387                 i = 0;
388                 while ((epoint->c_sockd[i])&& 
389                         (i < epoint->nr_connections)) 
390                         i++;
391                 epoint->c_sockd[i] = csockd; 
392                 epoint->nr_connections++;
393
394                 goto next_connection;
395         }
396
397         /* Check client TCP sockets */
398         for (i = 0; i < epoint->nr_connections; i++) {
399                 if (FD_ISSET(epoint->c_sockd[i], &fdset)) {
400                         _fwp_recvfrom(len, epoint->c_sockd[i], buffer, buffer_size,
401                                         0, peer->addr, &peer->addrlen);
402                         
403                         FWP_DEBUG("Received tcp data\n");
404                         if (len)
405                                 return len;
406                         /* tcp connection closed */
407                         FWP_DEBUG("Connection closed\n");
408                         FD_CLR(epoint->c_sockd[i], &fdset);
409                         memcpy(epoint->c_sockd+i, epoint->c_sockd+i+1, 
410                                 sizeof(int)*(epoint->nr_connections -i-1));
411                 }
412         }
413 }
414
415 /**
416  * Sends message through vres
417  *
418  * \param[in] epoint_id  identificator of endpoint
419  * \param[in] msg message to sent
420  * \param[in] size message size
421  *
422  * \return
423  * On success, it returns zero.  
424  * On error, negative error code is returned,
425  *
426  */
427 int fwp_send(fwp_endpoint_d_t epointd, void *msg, size_t size, int flags)
428 {
429         struct fwp_endpoint *epoint = epointd;
430         struct fwp_msgb *msgb;
431
432         /* TODO: Validity test of epointd */
433         if (epoint->status != FWP_EPOINT_BOUND) {
434                 return -EPERM;
435         }
436
437         /*if (flags && MSG_DONTWAIT) 
438                 msgb = fwp_msgb_alloc(buffer_size);
439         else {*/
440                 if (!(msgb = fwp_msgb_alloc(size)))
441                         return -ENOMEM;
442
443                 msgb->peer = &epoint->peer;
444                 /*msgb->data = msg;*/
445                 /*msgb->flags = epoint->flags;*/
446                 
447                 /* data must be copied since msg may change while
448                  * message is waiting in transmission queue 
449                  * */
450                 memcpy(msgb->data, msg, size);
451                 fwp_msgb_put(msgb, size);
452                 /*msgb->tail = msgb->data + size;
453                 msgb->len = size;*/
454
455         /*}*/
456
457         /* TODO: test whether _fwp_vres_send is successful */
458         return _fwp_vres_send(epoint->vresd, msgb);
459 }
460