]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/core/fwp_endpoint.c
985b642de9d624cf203527426b34bfe661ca49e1
[frescor/fwp.git] / fwp / lib / core / fwp_endpoint.c
1 #include "fwp_endpoint.h"
2
/** Direction of an endpoint: it either sends or receives messages. */
typedef enum
{
        FWP_SEND_EPOINT = 0,    /* endpoint used for sending */
        FWP_RECV_EPOINT = 1     /* endpoint used for receiving */
} fwp_endpoint_type_t;
7
/**
 * Life-cycle state of a slot in the endpoint table.
 *
 * FWP_EPOINT_FREE is deliberately 0: the table is zero-filled when it is
 * allocated, which makes every slot start out free.
 */
typedef enum
{
        FWP_EPOINT_FREE     = 0,        /* slot is not in use */
        FWP_EPOINT_INACTIVE = 1,        /* slot reserved by the allocator */
        FWP_EPOINT_UNBOUND  = 2,        /* endpoint created, not bound to a vres */
        FWP_EPOINT_BOUND    = 3         /* send endpoint bound to a vres */
} fwp_endpoint_status_t;
14
15 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
16         .reliability = FWP_EPOINT_BESTEFFORT, 
17         .max_connections = 20,
18 };
19
20 /**
21  * Structure of FWP endpoint.
22  */
23 struct fwp_endpoint{
24         fwp_endpoint_type_t     type;
25         /**< endpoint attributes */
26         fwp_endpoint_attr_t     attr;
27         /**< the vres descriptor the send endpoint is bound to */       
28         fwp_vres_d_t            vresd;
29         /**< for send enpoint it contains destination address
30          * for receive endpoint it is filled with the msg source address
31          */
32         struct fwp_sockaddr     peer;   
33         /**< source/destination port */
34         unsigned int            port;   
35         /**< destination node */
36         int                     node;
37         /**< socket descriptor
38          * in case of rebliable epoint it is a listen socket
39          * */
40         int                     sockd; 
41         fd_set                  fdset;
42         int                     *c_sockd;
43         unsigned int            nr_connections;
44         /**< specific operation options*/
45         int                     flags;  
46         fwp_endpoint_status_t   status;
47 };
48
49 typedef
50 struct fwp_endpoint_table {
51         unsigned int                    nr_endpoints;
52         fwp_endpoint_t                  *entry;
53         pthread_mutex_t                 lock;
54 } fwp_endpoint_table_t;
55
56 /* Global variable - endpoint table */
57 static fwp_endpoint_table_t  fwp_endpoint_table = {
58         .nr_endpoints = 0,
59         .entry = NULL,
60         .lock = PTHREAD_MUTEX_INITIALIZER,
61 };
62
63 int fwp_endpoint_table_init(unsigned int nr_endpoints)
64 {
65         int table_size = nr_endpoints * sizeof(fwp_endpoint_t);
66
67         fwp_endpoint_table.entry = (fwp_endpoint_t*) malloc(table_size);
68         if (!fwp_endpoint_table.entry)
69                 return -ENOMEM;
70         memset(fwp_endpoint_table.entry, 0, table_size);
71         fwp_endpoint_table.nr_endpoints = nr_endpoints;
72         return 0;
73 }
74
75 static fwp_endpoint_t* fwp_endpoint_alloc()
76 {
77         int i, nr_endpoints;
78
79         /* find free vres id */
80         pthread_mutex_lock(&fwp_endpoint_table.lock);
81         i = 0;
82         nr_endpoints = fwp_endpoint_table.nr_endpoints;
83         while ((i < nr_endpoints) && 
84                 (fwp_endpoint_table.entry[i].status != FWP_EPOINT_FREE))
85                 i++;
86         
87         if (i == nr_endpoints) {
88                 pthread_mutex_unlock(&fwp_endpoint_table.lock);
89                 return NULL;
90         }
91
92         fwp_endpoint_table.entry[i].status = FWP_EPOINT_INACTIVE;
93         pthread_mutex_unlock(&fwp_endpoint_table.lock);
94         return (&fwp_endpoint_table.entry[i]);
95 }
96
97 static inline void fwp_endpoint_free(fwp_endpoint_t *epoint)
98 {
99         epoint->status = FWP_EPOINT_FREE;
100         close(epoint->sockd);
101 }
102
103 int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node, 
104                                 unsigned int *port, fwp_endpoint_attr_t *attr)
105 {
106         fwp_endpoint_t *epoint = epointd;
107
108         *node = epoint->node;
109         *port = epoint->port;
110         *attr = epoint->attr;
111         
112         return 0;
113 }
114
115 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
116 {
117         bzero(attr, sizeof(fwp_endpoint_attr_t));
118         *attr = fwp_epoint_attr_default;
119
120         return 0;
121 }
122
123 /**
124  * Creates send endpoint
125  *
126  * \param[in] node IP address of destination node
127  * \param[in] port UDP port
128  *
129  * \return On success returns identifier of endpoint. 
130  * On error, negative error code is returned. 
131  *
132  */
133 int fwp_send_endpoint_create(unsigned int node, unsigned int port,
134                                 fwp_endpoint_attr_t *attr, 
135                                 fwp_endpoint_d_t *epointdp)
136 {       
137         struct sockaddr_in *addr;
138         fwp_endpoint_t *epoint;
139         int sockd;
140
141         epoint = fwp_endpoint_alloc();  
142         if (!epoint) {
143                 return -ENOMEM;
144         }
145         
146         epoint->type = FWP_SEND_EPOINT;
147         epoint->status = FWP_EPOINT_UNBOUND;
148         epoint->node = node;
149         epoint->port = port;
150         if (attr) 
151                 epoint->attr  = *attr;
152         else 
153                 epoint->attr = fwp_epoint_attr_default;
154                 
155         addr = (struct sockaddr_in *)&(epoint->peer.addr);
156         bzero((char*) addr, sizeof(*addr));
157         addr->sin_family = AF_INET;
158         addr->sin_addr.s_addr = node;
159         addr->sin_port = htons(port);
160         epoint->peer.addrlen = sizeof(struct sockaddr_in);
161         
162         if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
163                 sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
164
165         } else {
166                 sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
167         }
168         
169         if (sockd < 0) {
170                         perror("Unable to open socket");
171                         goto err;
172         }
173         
174         if (connect(sockd,(struct sockaddr*) &epoint->peer.addr, 
175                         epoint->peer.addrlen)) {
176                 perror("Connect error");
177                 goto err;
178         }
179                 
180         epoint->sockd = sockd;
181         
182         FWP_DEBUG("Send endpoint created.\n"); 
183         *epointdp = epoint;
184         return 0;               
185 err:
186         fwp_endpoint_free(epoint);
187         return (-errno);        
188 }
189
190 /**
191  * Creates receive endpoint
192  *
193  * \param[in] port UDP port
194  *
195  * \return On success returns identifier of endpoint. 
196  * On error, negative error code is returned.
197  *
198  */
199 int fwp_receive_endpoint_create(/*unsigned int node,*/ unsigned int port,
200                                 fwp_endpoint_attr_t *attr, 
201                                 fwp_endpoint_d_t *epointdp)
202 {
203         fwp_endpoint_t *epoint;
204         int sockd;
205         struct sockaddr_in *addr;
206         //int rcvbuf_size = 3000;
207
208         epoint = fwp_endpoint_alloc();  
209         if (!epoint) {
210                 return -ENOMEM;
211         }
212         
213         epoint->type = FWP_RECV_EPOINT;
214         epoint->status = FWP_EPOINT_UNBOUND;
215         if (attr) 
216                 epoint->attr  = *attr;
217         else 
218                 epoint->attr = fwp_epoint_attr_default;
219         
220         addr = (struct sockaddr_in *)&(epoint->peer.addr);
221         addr->sin_family = AF_INET;
222         /* TODO: set listen interface, maybe through config struct*/
223         addr->sin_addr.s_addr = INADDR_ANY;
224         addr->sin_port = htons(port);
225         epoint->peer.addrlen = sizeof(struct sockaddr_in);
226         
227         if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
228                 if ((sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
229                         perror("Unable to open socket");
230                         goto err;
231                 }       
232                 FD_ZERO(&epoint->fdset);
233                 FD_SET(sockd, &epoint->fdset); /*add listen socket */
234                 epoint->c_sockd = (int*) malloc(epoint->attr.max_connections);
235                 bzero(epoint->c_sockd, epoint->attr.max_connections);
236                 epoint->nr_connections = 0;
237
238         } else {
239                 if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
240                         perror("Unable to open socket");
241                         goto err;
242                 }
243         }
244                 
245         if (bind(sockd, (struct sockaddr*) &epoint->peer.addr, 
246                                 epoint->peer.addrlen) == -1) {
247                 perror("Bind error");
248                 goto err;
249         }
250    
251         epoint->sockd = sockd;
252         /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF, 
253                         &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
254                 
255                 perror("Unable to set socket buffer size");
256                 return -1;
257         }else {
258                 FWP_DEBUG("Receive endpoint buffer size is set.\n");
259         }
260         */
261         
262         getsockname(epoint->sockd, (struct sockaddr*)&epoint->peer.addr, 
263                         &epoint->peer.addrlen);
264         
265         epoint->port = ntohs(addr->sin_port);
266         /*TODO: set node*/
267         epoint->node =  ntohl(addr->sin_addr.s_addr);
268         FWP_DEBUG("Receive endpoint port=%d created.\n", ntohs(epoint->port)); 
269         
270         *epointdp = epoint; 
271         return 0;
272
273 err:    
274         fwp_endpoint_free(epoint);
275         return (-errno);        
276 }
277
278 /**
279  * Binds send endpoint to vres
280  *
281  * \param[in] vres_id identifier of vres
282  * \param[in] epoint_id send endpoint identifier
283  *
284  * \return On success returns 0. On error, negative error code is returned 
285  */
286 int fwp_send_endpoint_bind(fwp_endpoint_d_t epointd, fwp_vres_d_t vresd)
287 {
288         fwp_endpoint_t *epoint = epointd;
289         
290         if (epoint->type != FWP_SEND_EPOINT) {  
291                 return (-EINVAL);
292         }
293         
294         /* link epoint-vres mutually */
295         pthread_mutex_lock(&fwp_endpoint_table.lock);
296         if (_fwp_vres_bind(vresd, epoint) < 0) { 
297                 pthread_mutex_unlock(&fwp_endpoint_table.lock);
298                 return -EPERM;
299         }
300
301         if (epoint->type == FWP_EPOINT_BOUND) {  /* if send endpoint is already bound */
302                 fwp_send_endpoint_unbind(epoint);
303         }
304
305         epoint->vresd = vresd;
306         epoint->status = FWP_EPOINT_BOUND;
307         
308         pthread_mutex_unlock(&fwp_endpoint_table.lock);
309         return 0;
310 }
311
312 /**
313  * Unbinds send endpoint from vres
314  *
315  * \param[in] id send endpoint identifier 
316  * \return On success returns 0. On error, negative error code is returned 
317  *
318  */
319 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
320 {
321         fwp_endpoint_t *epoint = epointd;
322         
323         /* unlink epoint-vres mutually */
324         _fwp_vres_unbind(epoint->vresd);
325         epoint->status = FWP_EPOINT_UNBOUND;
326
327         return 0;
328 }
329
330 /**
331  * Receives message
332  *
333  * \param[in] epoint_id  identificator of endpoint
334  * \param[in] buffer buffer to store message
335  * \param[in] buffer_size size of buffer
336  *
337  * \return
338  * On success, it returns number of received bytes.  
339  * On error, negative error code is returned,
340  *
341  */
342 ssize_t fwp_recv(fwp_endpoint_d_t epointd, void *buffer, size_t buffer_size, 
343                         int flags)
344 {
345         fwp_endpoint_t *epoint = epointd;
346         fwp_sockaddr_t *peer = &epoint->peer;
347         ssize_t len;
348         fd_set fdset;
349         int csockd, i;
350         
351         if (epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {        
352                 _fwp_recvfrom(len, epoint->sockd, buffer, buffer_size, 0, 
353                         peer->addr, &peer->addrlen);
354                 return len;
355         }
356
357 next_connection:
358         /* FWP_EPOINT_RELIABLE */
359         memcpy(&fdset, &epoint->fdset, sizeof(fdset)); 
360         if (select(getdtablesize()+1, &fdset, (fd_set *)0, 
361                 (fd_set *)0, NULL) < 0) {
362                 perror("Error in select");
363                 return (-errno);
364         }
365         
366         if (FD_ISSET(epoint->sockd, &fdset)) { /*is it listen socket?*/ 
367                 if (epoint->nr_connections == epoint->attr.max_connections)
368                         goto next_connection;
369
370                 csockd = accept(epoint->sockd, (struct sockaddr*)peer->addr,
371                                  &peer->addrlen);
372                 /* find free place */           
373                 i = 0;
374                 while ((epoint->c_sockd[i])&& 
375                         (i < epoint->nr_connections)) 
376                         i++;
377                 epoint->c_sockd[i] = csockd; 
378                 epoint->nr_connections++;
379
380                 goto next_connection;
381         }
382
383         /* Check client TCP sockets */
384         for (i = 0; i < epoint->nr_connections; i++) {
385                 if (FD_ISSET(epoint->c_sockd[i], &fdset)) {
386                         _fwp_recvfrom(len, epoint->c_sockd[i], buffer, buffer_size,
387                                         0, peer->addr, &peer->addrlen);
388                         if (len)
389                                 return len;
390                         /* tcp connection closed */
391                         FD_CLR(epoint->c_sockd[i], &fdset);
392                         memcpy(epoint->c_sockd+i, epoint->c_sockd+i+1, 
393                                 sizeof(int)*(epoint->nr_connections -i-1));
394                 }
395         }
396 }
397
398 /**
399  * Sends message through vres
400  *
401  * \param[in] epoint_id  identificator of endpoint
402  * \param[in] msg message to sent
403  * \param[in] size message size
404  *
405  * \return
406  * On success, it returns zero.  
407  * On error, negative error code is returned,
408  *
409  */
410 int fwp_send(fwp_endpoint_d_t epointd, void *msg, size_t size, int flags)
411 {
412         struct fwp_endpoint *epoint = epointd;
413         struct fwp_msgb *msgb;
414
415         /* TODO: Validity test of epointd */
416         if (epoint->status != FWP_EPOINT_BOUND) {
417                 return -EPERM;
418         }
419
420         /*if (flags && MSG_DONTWAIT) 
421                 msgb = fwp_msgb_alloc(buffer_size);
422         else {*/
423                 if (!(msgb = fwp_msgb_alloc(size)))
424                         return -ENOMEM;
425
426                 msgb->peer = &epoint->peer;
427                 /*msgb->data = msg;*/
428                 /*msgb->flags = epoint->flags;*/
429                 
430                 /* data must be copied since msg may change while
431                  * message is waiting in transmission queue 
432                  * */
433                 memcpy(msgb->data, msg, size);
434                 fwp_msgb_put(msgb, size);
435                 /*msgb->tail = msgb->data + size;
436                 msgb->len = size;*/
437
438         /*}*/
439
440         return _fwp_vres_send(epoint->vresd, msgb);
441 }
442