]> rtime.felk.cvut.cz Git - frescor/fwp.git/blob - fwp/lib/core/fwp_endpoint.c
654c19c59b834e7568ecf3e30a0cde83a75e3a08
[frescor/fwp.git] / fwp / lib / core / fwp_endpoint.c
1 #include "fwp_endpoint.h"
2
/** Direction of an endpoint: one used for sending or one used for receiving. */
typedef enum {
	FWP_SEND_EPOINT = 0,	/**< endpoint created by fwp_send_endpoint_create() */
	FWP_RECV_EPOINT = 1,	/**< endpoint created by fwp_receive_endpoint_create() */
} fwp_endpoint_type_t;
7
/**
 * Life-cycle state of a slot in the endpoint table.
 *
 * FWP_EPOINT_FREE must stay 0: the table is zero-filled on creation and
 * a zeroed slot has to read as free.
 */
typedef enum {
	FWP_EPOINT_FREE		= 0,	/**< slot is unused and may be allocated */
	FWP_EPOINT_INACTIVE	= 1,	/**< slot is reserved but not yet set up */
	FWP_EPOINT_UNBOUND	= 2,	/**< endpoint exists but is bound to no vres */
	FWP_EPOINT_BOUND	= 3,	/**< endpoint is bound to a vres */
} fwp_endpoint_status_t;
14
/**
 * Structure of FWP endpoint.
 *
 * One slot of the endpoint table. It describes a single send or receive
 * endpoint together with the socket that backs it.
 */
struct fwp_endpoint{
	/** whether this is a send or a receive endpoint */
	fwp_endpoint_type_t	type;
	/** the vres descriptor the send endpoint is bound to */
	fwp_vres_d_t		vresd;
	/** for a send endpoint it contains the destination address,
	 * for a receive endpoint it is filled with the msg source address
	 */
	struct fwp_sockaddr	peer;
	/** source (local) port, kept as taken from sin_port, i.e. in
	 * network byte order */
	unsigned int		sport;
	/** destination port as passed by the caller (host byte order) */
	unsigned int		dport;
	/** destination node for a send endpoint; for a receive endpoint
	 * the local address reported by getsockname() */
	int			node;
	/** socket descriptor */
	int			sockd;
	/** specific operation options */
	int			flags;
	/** life-cycle state of this table slot */
	fwp_endpoint_status_t	status;
};
40
41 typedef
42 struct fwp_endpoint_table {
43         unsigned int                    nr_endpoints;
44         fwp_endpoint_t                  *entry;
45         pthread_mutex_t                 lock;
46 } fwp_endpoint_table_t;
47
48 /* Global variable - endpoint table */
49 static fwp_endpoint_table_t  fwp_endpoint_table = {
50         .nr_endpoints = 0,
51         .entry = NULL,
52         .lock = PTHREAD_MUTEX_INITIALIZER,
53 };
54
55 int fwp_endpoint_table_init(unsigned int nr_endpoints)
56 {
57         int table_size = nr_endpoints * sizeof(fwp_endpoint_t);
58
59         fwp_endpoint_table.entry = (fwp_endpoint_t*) malloc(table_size);
60         if (!fwp_endpoint_table.entry)
61                 return -ENOMEM;
62         memset(fwp_endpoint_table.entry, 0, table_size);
63         fwp_endpoint_table.nr_endpoints = nr_endpoints;
64         return 0;
65 }
66
67 static fwp_endpoint_t* fwp_endpoint_alloc()
68 {
69         int i, nr_endpoints;
70
71         /* find free vres id */
72         pthread_mutex_lock(&fwp_endpoint_table.lock);
73         i = 0;
74         nr_endpoints = fwp_endpoint_table.nr_endpoints;
75         while ((i < nr_endpoints) && 
76                 (fwp_endpoint_table.entry[i].status != FWP_EPOINT_FREE))
77                 i++;
78         
79         if (i == nr_endpoints) {
80                 pthread_mutex_unlock(&fwp_endpoint_table.lock);
81                 return NULL;
82         }
83
84         fwp_endpoint_table.entry[i].status = FWP_EPOINT_INACTIVE;
85         pthread_mutex_unlock(&fwp_endpoint_table.lock);
86         return (&fwp_endpoint_table.entry[i]);
87 }
88
89 static inline void fwp_endpoint_free(fwp_endpoint_t *epoint)
90 {
91         epoint->status = FWP_EPOINT_FREE;
92         close(epoint->sockd);
93 }
94
95 /**
96  *
97  *
98  */
99 int fwp_endpoint_get_params(unsigned int *node, unsigned int *port, int *flags, 
100                                 fwp_endpoint_d_t epointd)
101 {
102         fwp_endpoint_t *epoint = epointd;
103
104         *node = epoint->node;
105         *port = epoint->sport;
106         *flags = epoint->flags;
107         
108         return 0;
109 }
110
111 /**
112  * Creates send endpoint
113  *
114  * \param[in] node IP address of destination node
115  * \param[in] port UDP port
116  *
117  * \return On success returns identifier of endpoint. 
118  * On error, negative error code is returned. 
119  *
120  */
121
122 int fwp_send_endpoint_create(unsigned int node, unsigned int port, int flags, 
123                                 fwp_endpoint_d_t *epointdp)
124 {       
125         struct sockaddr_in *addr;
126         fwp_endpoint_t *epoint;
127         int sockd;
128         socklen_t addrlen;
129
130         epoint = fwp_endpoint_alloc();  
131         if (!epoint) {
132                 return -ENOMEM;
133         }
134         
135         epoint->type = FWP_SEND_EPOINT;
136         epoint->status = FWP_EPOINT_UNBOUND;
137         epoint->node = node;
138         epoint->dport = port;
139         epoint->flags = flags;
140                 
141         addr = (struct sockaddr_in *)&(epoint->peer.addr);
142         bzero((char*) addr, sizeof(*addr));
143         addr->sin_family = AF_INET;
144         addr->sin_addr.s_addr = node;
145         addr->sin_port = htons(port);
146         epoint->peer.addrlen = sizeof(struct sockaddr_in);
147         
148         if (flags && FWP_EPOINT_MNGT) {
149                 if ((sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
150                         perror("Unable to open socket");
151                         goto err;
152                 }
153
154                 if (connect(sockd,(struct sockaddr*) &epoint->peer.addr, 
155                                 epoint->peer.addrlen)) {
156                         
157                         perror("Connect error");
158                         goto err;
159                 }
160
161         } else {
162                 struct sockaddr_in myaddr;
163         
164                 bzero((char*) &myaddr, sizeof(myaddr));
165                 myaddr.sin_family = AF_INET;
166                 myaddr.sin_addr.s_addr = INADDR_ANY;
167                 myaddr.sin_port = 0;
168
169                 addrlen = sizeof(myaddr);
170
171                 if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
172                         perror("Unable to open socket");
173                         goto err;
174                 }
175                 
176                 if (bind(sockd, (struct sockaddr*) &myaddr, 
177                         sizeof(myaddr)) == -1) {
178                         
179                         perror("Bind error");
180                         goto err;
181                 }
182                 
183                 getsockname(sockd, (struct sockaddr*) &myaddr, 
184                                 &addrlen);
185                 epoint->sport = myaddr.sin_port;
186   
187         }
188
189         epoint->sockd = sockd;
190         FWP_DEBUG("Send endpoint created.\n"); 
191         *epointdp = epoint;
192
193         return 0;               
194 err:
195         fwp_endpoint_free(epoint);
196         return (-errno);        
197 }
198
199 /**
200  * Creates receive endpoint
201  *
202  * \param[in] port UDP port
203  *
204  * \return On success returns identifier of endpoint. 
205  * On error, negative error code is returned.
206  *
207  */
208 int fwp_receive_endpoint_create(unsigned int port, int flags, 
209                                 fwp_endpoint_d_t *epointdp)
210 {
211         fwp_endpoint_t *epoint;
212         int sockd;
213         struct sockaddr_in *addr;
214         //int rcvbuf_size = 3000;
215
216         epoint = fwp_endpoint_alloc();  
217         if (!epoint) {
218                 return -ENOMEM;
219         }
220         
221         epoint->type = FWP_RECV_EPOINT;
222         epoint->status = FWP_EPOINT_UNBOUND;
223         epoint->flags  = flags;
224         
225         addr = (struct sockaddr_in *)&(epoint->peer.addr);
226         addr->sin_family = AF_INET;
227         addr->sin_addr.s_addr = INADDR_ANY;
228         addr->sin_port = htons(port);
229         epoint->peer.addrlen = sizeof(struct sockaddr_in);
230         
231         if (flags && FWP_EPOINT_MNGT) {
232                 if ((sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
233                         perror("Unable to open socket");
234                         goto err;
235                 }
236                 
237                 if (bind(sockd, (struct sockaddr*) &epoint->peer.addr,
238                         epoint->peer.addrlen) == -1) {
239                         
240                         perror("Bind error");
241                         goto err;
242                 }
243
244                 if (listen(sockd,0)) {
245                         perror("Connect error");
246                         goto err;
247                 }
248
249         } else {
250                 if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
251                         perror("Unable to open socket");
252                         goto err;
253                 }
254                 
255                 if (bind(sockd, (struct sockaddr*) &epoint->peer.addr,
256                         epoint->peer.addrlen) == -1) {
257                         
258                         perror("Bind error");
259                         goto err;
260                 }
261    
262         }
263
264         epoint->sockd = sockd;
265         /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF, 
266                         &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
267                 
268                 perror("Unable to set socket buffer size");
269                 return -1;
270         }else {
271                 FWP_DEBUG("Receive endpoint buffer size is set.\n");
272         }
273         */
274         
275         getsockname(epoint->sockd, (struct sockaddr*)&epoint->peer.addr, 
276                         &epoint->peer.addrlen);
277         
278         epoint->sport = addr->sin_port;
279         epoint->node = addr->sin_addr.s_addr;
280         FWP_DEBUG("Receive endpoint port=%d created.\n", ntohs(epoint->sport)); 
281         
282         *epointdp = epoint; 
283         
284         return 0;
285
286 err:    
287         fwp_endpoint_free(epoint);
288         return (-errno);        
289 }
290
291 /**
292  * Binds send endpoint to vres
293  *
294  * \param[in] vres_id identifier of vres
295  * \param[in] epoint_id send endpoint identifier
296  *
297  * \return On success returns 0. On error, negative error code is returned 
298  */
299 int fwp_send_endpoint_bind(fwp_endpoint_d_t epointd, fwp_vres_d_t vresd)
300 {
301         fwp_endpoint_t *epoint = epointd;
302         
303         if (epoint->type != FWP_SEND_EPOINT) {  
304                 return (-EINVAL);
305         }
306         
307         /* link epoint-vres mutually */
308         pthread_mutex_lock(&fwp_endpoint_table.lock);
309         if (_fwp_vres_bind(vresd, epoint) < 0) { 
310                 pthread_mutex_unlock(&fwp_endpoint_table.lock);
311                 return -EPERM;
312         }
313
314         if (epoint->type == FWP_EPOINT_BOUND) {  /* if send endpoint is already bound */
315                 fwp_send_endpoint_unbind(epoint);
316         }
317
318         epoint->vresd = vresd;
319         epoint->status = FWP_EPOINT_BOUND;
320         
321         pthread_mutex_unlock(&fwp_endpoint_table.lock);
322         return 0;
323 }
324
325 /**
326  * Unbinds send endpoint from vres
327  *
328  * \param[in] id send endpoint identifier 
329  * \return On success returns 0. On error, negative error code is returned 
330  *
331  */
332 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
333 {
334         fwp_endpoint_t *epoint = epointd;
335         
336         /* unlink epoint-vres mutually */
337         _fwp_vres_unbind(epoint->vresd);
338         epoint->status = FWP_EPOINT_UNBOUND;
339
340         return 0;
341 }
342
343 /**
344  * Receives message
345  *
346  * \param[in] epoint_id  identificator of endpoint
347  * \param[in] buffer buffer to store message
348  * \param[in] buffer_size size of buffer
349  *
350  * \return
351  * On success, it returns number of received bytes.  
352  * On error, negative error code is returned,
353  *
354  */
355 ssize_t fwp_recv(fwp_endpoint_d_t epointd, void *buffer, size_t buffer_size)
356 {
357         fwp_endpoint_t *epoint = epointd;
358         fwp_sockaddr_t *peer = &epoint->peer;
359         ssize_t len;
360         
361         _fwp_recvfrom(len, epoint->sockd, buffer, buffer_size, 0, 
362                   peer->addr, &peer->addrlen);
363         
364         return len;
365 }
366
367 /**
368  * Sends message through vres
369  *
370  * \param[in] epoint_id  identificator of endpoint
371  * \param[in] msg message to sent
372  * \param[in] size message size
373  *
374  * \return
375  * On success, it returns zero.  
376  * On error, negative error code is returned,
377  *
378  */
379 int fwp_send(fwp_endpoint_d_t epointd, void *msg, size_t size)
380 {
381         struct fwp_endpoint *epoint = epointd;
382         struct fwp_msgb *msgb;
383
384         /* TODO: Validity test of epointd */
385         if (epoint->status != FWP_EPOINT_BOUND) {
386                 return -EPERM;
387         }
388
389         /*if (flags && MSG_DONTWAIT) 
390                 msgb = fwp_msgb_alloc(buffer_size);
391         else {*/
392                 if (!(msgb = fwp_msgb_alloc(size)))
393                         return -ENOMEM;
394
395                 msgb->peer = &epoint->peer;
396                 /*msgb->data = msg;*/
397                 /*msgb->flags = epoint->flags;*/
398                 
399                 /* data must be copied since msg may change while
400                  * message is waiting in transmission queue 
401                  * */
402                 memcpy(msgb->data, msg, size);
403                 fwp_msgb_put(msgb, size);
404                 /*msgb->tail = msgb->data + size;
405                 msgb->len = size;*/
406
407         /*}*/
408
409         return _fwp_vres_send(epoint->vresd, msgb);
410 }
411