1 #include "fwp_endpoint.h"
10 FWP_EPOINT_INACTIVE = 1,
11 FWP_EPOINT_UNBOUND = 2,
13 } fwp_endpoint_status_t;
15 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
16 .reliability = FWP_EPOINT_BESTEFFORT,
17 .max_connections = 20,
21 * Structure of FWP endpoint.
24 fwp_endpoint_type_t type;
25 /**< endpoint attributes */
26 fwp_endpoint_attr_t attr;
27 /**< the vres descriptor the send endpoint is bound to */
29 /**< for send enpoint it contains destination address
30 * for receive endpoint it is filled with the msg source address
32 struct fwp_sockaddr peer;
33 /**< source/destination port */
35 /**< destination node */
37 /**< socket descriptor
38 * in case of rebliable epoint it is a listen socket
43 unsigned int nr_connections;
44 /**< specific operation options*/
46 fwp_endpoint_status_t status;
50 struct fwp_endpoint_table {
51 unsigned int nr_endpoints;
52 fwp_endpoint_t *entry;
54 } fwp_endpoint_table_t;
56 /* Global variable - endpoint table */
57 static fwp_endpoint_table_t fwp_endpoint_table = {
60 .lock = PTHREAD_MUTEX_INITIALIZER,
63 int fwp_endpoint_table_init(unsigned int nr_endpoints)
65 int table_size = nr_endpoints * sizeof(fwp_endpoint_t);
67 fwp_endpoint_table.entry = (fwp_endpoint_t*) malloc(table_size);
68 if (!fwp_endpoint_table.entry)
70 memset(fwp_endpoint_table.entry, 0, table_size);
71 fwp_endpoint_table.nr_endpoints = nr_endpoints;
75 static fwp_endpoint_t* fwp_endpoint_alloc()
79 /* find free vres id */
80 pthread_mutex_lock(&fwp_endpoint_table.lock);
82 nr_endpoints = fwp_endpoint_table.nr_endpoints;
83 while ((i < nr_endpoints) &&
84 (fwp_endpoint_table.entry[i].status != FWP_EPOINT_FREE))
87 if (i == nr_endpoints) {
88 pthread_mutex_unlock(&fwp_endpoint_table.lock);
92 fwp_endpoint_table.entry[i].status = FWP_EPOINT_INACTIVE;
93 pthread_mutex_unlock(&fwp_endpoint_table.lock);
94 return (&fwp_endpoint_table.entry[i]);
97 static inline void fwp_endpoint_free(fwp_endpoint_t *epoint)
99 epoint->status = FWP_EPOINT_FREE;
100 close(epoint->sockd);
103 int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node,
104 unsigned int *port, fwp_endpoint_attr_t *attr)
106 fwp_endpoint_t *epoint = epointd;
108 *node = epoint->node;
109 *port = epoint->port;
110 *attr = epoint->attr;
115 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
117 bzero(attr, sizeof(fwp_endpoint_attr_t));
118 *attr = fwp_epoint_attr_default;
124 * Creates send endpoint
126 * \param[in] node IP address of destination node
127 * \param[in] port UDP port
129 * \return On success returns identifier of endpoint.
130 * On error, negative error code is returned.
133 int fwp_send_endpoint_create(unsigned int node, unsigned int port,
134 fwp_endpoint_attr_t *attr,
135 fwp_endpoint_d_t *epointdp)
137 struct sockaddr_in *addr;
138 fwp_endpoint_t *epoint;
141 epoint = fwp_endpoint_alloc();
146 epoint->type = FWP_SEND_EPOINT;
147 epoint->status = FWP_EPOINT_UNBOUND;
151 epoint->attr = *attr;
153 epoint->attr = fwp_epoint_attr_default;
155 addr = (struct sockaddr_in *)&(epoint->peer.addr);
156 bzero((char*) addr, sizeof(*addr));
157 addr->sin_family = AF_INET;
158 addr->sin_addr.s_addr = node;
159 addr->sin_port = htons(port);
160 epoint->peer.addrlen = sizeof(struct sockaddr_in);
162 if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
163 sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
166 sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
170 perror("Unable to open socket");
174 /* Enable broadcasts */
175 unsigned int yes = 1;
176 if (setsockopt(sockd,SOL_SOCKET, SO_BROADCAST/*SO_REUSEADDR*/, &yes,
177 sizeof(yes)) == -1) {
178 perror("Unable to set BROADCAST option for socket");
183 if (connect(sockd,(struct sockaddr*) &epoint->peer.addr,
184 epoint->peer.addrlen)) {
185 perror("Connect error");
189 epoint->sockd = sockd;
191 FWP_DEBUG("Send endpoint created.\n");
195 fwp_endpoint_free(epoint);
200 * Creates receive endpoint
202 * \param[in] port UDP port
204 * \return On success returns identifier of endpoint.
205 * On error, negative error code is returned.
208 int fwp_receive_endpoint_create(/*unsigned int node,*/ unsigned int port,
209 fwp_endpoint_attr_t *attr,
210 fwp_endpoint_d_t *epointdp)
212 fwp_endpoint_t *epoint;
214 struct sockaddr_in *addr;
215 //int rcvbuf_size = 3000;
217 epoint = fwp_endpoint_alloc();
222 epoint->type = FWP_RECV_EPOINT;
223 epoint->status = FWP_EPOINT_UNBOUND;
225 epoint->attr = *attr;
227 epoint->attr = fwp_epoint_attr_default;
229 addr = (struct sockaddr_in *)&(epoint->peer.addr);
230 addr->sin_family = AF_INET;
231 /* TODO: set listen interface, maybe through config struct*/
232 addr->sin_addr.s_addr = INADDR_ANY;
233 addr->sin_port = htons(port);
234 epoint->peer.addrlen = sizeof(struct sockaddr_in);
236 if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
237 if ((sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
238 perror("Unable to open socket");
241 FD_ZERO(&epoint->fdset);
242 FD_SET(sockd, &epoint->fdset); /*add listen socket */
243 epoint->c_sockd = (int*) malloc(epoint->attr.max_connections);
244 bzero(epoint->c_sockd, epoint->attr.max_connections);
245 epoint->nr_connections = 0;
247 FWP_DEBUG("Receive endpoint\n");
250 if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
251 perror("Unable to open socket");
256 if (bind(sockd, (struct sockaddr*) &epoint->peer.addr,
257 epoint->peer.addrlen) == -1) {
258 perror("Bind error");
262 epoint->sockd = sockd;
263 /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF,
264 &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
266 perror("Unable to set socket buffer size");
269 FWP_DEBUG("Receive endpoint buffer size is set.\n");
273 getsockname(epoint->sockd, (struct sockaddr*)&epoint->peer.addr,
274 &epoint->peer.addrlen);
276 epoint->port = ntohs(addr->sin_port);
278 epoint->node = ntohl(addr->sin_addr.s_addr);
279 FWP_DEBUG("Receive endpoint port=%d created.\n", epoint->port);
285 fwp_endpoint_free(epoint);
290 * Binds send endpoint to vres
292 * \param[in] vres_id identifier of vres
293 * \param[in] epoint_id send endpoint identifier
295 * \return On success returns 0. On error, negative error code is returned
297 int fwp_send_endpoint_bind(fwp_endpoint_d_t epointd, fwp_vres_d_t vresd)
299 fwp_endpoint_t *epoint = epointd;
301 if (epoint->type != FWP_SEND_EPOINT) {
305 /* link epoint-vres mutually */
306 pthread_mutex_lock(&fwp_endpoint_table.lock);
307 if (_fwp_vres_bind(vresd, epoint->sockd) < 0) {
308 pthread_mutex_unlock(&fwp_endpoint_table.lock);
312 if (epoint->type == FWP_EPOINT_BOUND) { /* if send endpoint is already bound */
313 fwp_send_endpoint_unbind(epoint);
316 epoint->vresd = vresd;
317 epoint->status = FWP_EPOINT_BOUND;
319 pthread_mutex_unlock(&fwp_endpoint_table.lock);
324 * Unbinds send endpoint from vres
326 * \param[in] id send endpoint identifier
327 * \return On success returns 0. On error, negative error code is returned
330 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
332 fwp_endpoint_t *epoint = epointd;
334 /* unlink epoint-vres mutually */
335 _fwp_vres_unbind(epoint->vresd);
336 epoint->status = FWP_EPOINT_UNBOUND;
344 * \param[in] epoint_id identificator of endpoint
345 * \param[in] buffer buffer to store message
346 * \param[in] buffer_size size of buffer
349 * On success, it returns number of received bytes.
350 * On error, negative error code is returned,
353 ssize_t fwp_recv(fwp_endpoint_d_t epointd, void *buffer, size_t buffer_size,
356 fwp_endpoint_t *epoint = epointd;
357 fwp_sockaddr_t *peer = &epoint->peer;
362 if (epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {
364 _fwp_recvfrom(len, epoint->sockd, buffer, buffer_size, 0,
365 peer->addr, &peer->addrlen);
370 /* FWP_EPOINT_RELIABLE */
371 memcpy(&fdset, &epoint->fdset, sizeof(fdset));
372 if (select(getdtablesize()+1, &fdset, (fd_set *)0,
373 (fd_set *)0, NULL) < 0) {
374 perror("Error in select");
378 if (FD_ISSET(epoint->sockd, &fdset)) { /*is it listen socket?*/
379 if (epoint->nr_connections == epoint->attr.max_connections)
380 goto next_connection;
382 csockd = accept(epoint->sockd, (struct sockaddr*)peer->addr,
385 FWP_DEBUG("New connection\n");
386 /* find free place */
388 while ((epoint->c_sockd[i])&&
389 (i < epoint->nr_connections))
391 epoint->c_sockd[i] = csockd;
392 epoint->nr_connections++;
394 goto next_connection;
397 /* Check client TCP sockets */
398 for (i = 0; i < epoint->nr_connections; i++) {
399 if (FD_ISSET(epoint->c_sockd[i], &fdset)) {
400 _fwp_recvfrom(len, epoint->c_sockd[i], buffer, buffer_size,
401 0, peer->addr, &peer->addrlen);
403 FWP_DEBUG("Received tcp data\n");
406 /* tcp connection closed */
407 FWP_DEBUG("Connection closed\n");
408 FD_CLR(epoint->c_sockd[i], &fdset);
409 memcpy(epoint->c_sockd+i, epoint->c_sockd+i+1,
410 sizeof(int)*(epoint->nr_connections -i-1));
416 * Sends message through vres
418 * \param[in] epoint_id identificator of endpoint
419 * \param[in] msg message to sent
420 * \param[in] size message size
423 * On success, it returns zero.
424 * On error, negative error code is returned,
427 int fwp_send(fwp_endpoint_d_t epointd, void *msg, size_t size, int flags)
429 struct fwp_endpoint *epoint = epointd;
430 struct fwp_msgb *msgb;
432 /* TODO: Validity test of epointd */
433 if (epoint->status != FWP_EPOINT_BOUND) {
437 /*if (flags && MSG_DONTWAIT)
438 msgb = fwp_msgb_alloc(buffer_size);
440 if (!(msgb = fwp_msgb_alloc(size)))
443 msgb->peer = &epoint->peer;
444 /*msgb->data = msg;*/
445 /*msgb->flags = epoint->flags;*/
447 /* data must be copied since msg may change while
448 * message is waiting in transmission queue
450 memcpy(msgb->data, msg, size);
451 fwp_msgb_put(msgb, size);
452 /*msgb->tail = msgb->data + size;
457 /* TODO: test whether _fwp_vres_send is successful */
458 return _fwp_vres_send(epoint->vresd, msgb);