1 #include "fwp_endpoint.h"
10 FWP_EPOINT_INACTIVE = 1,
11 FWP_EPOINT_UNBOUND = 2,
13 } fwp_endpoint_status_t;
15 static fwp_endpoint_attr_t fwp_epoint_attr_default ={
16 .reliability = FWP_EPOINT_BESTEFFORT,
17 .max_connections = 20,
21 * Structure of FWP endpoint.
24 fwp_endpoint_type_t type;
25 /**< endpoint attributes */
26 fwp_endpoint_attr_t attr;
27 /**< the vres descriptor the send endpoint is bound to */
29 /**< for send enpoint it contains destination address
30 * for receive endpoint it is filled with the msg source address
32 struct fwp_sockaddr peer;
33 /**< source/destination port */
35 /**< destination node */
37 /**< socket descriptor
38 * in case of rebliable epoint it is a listen socket
43 unsigned int nr_connections;
44 /**< specific operation options*/
46 fwp_endpoint_status_t status;
50 struct fwp_endpoint_table {
51 unsigned int nr_endpoints;
52 fwp_endpoint_t *entry;
54 } fwp_endpoint_table_t;
56 /* Global variable - endpoint table */
57 static fwp_endpoint_table_t fwp_endpoint_table = {
60 .lock = PTHREAD_MUTEX_INITIALIZER,
63 int fwp_endpoint_table_init(unsigned int nr_endpoints)
65 int table_size = nr_endpoints * sizeof(fwp_endpoint_t);
67 fwp_endpoint_table.entry = (fwp_endpoint_t*) malloc(table_size);
68 if (!fwp_endpoint_table.entry)
70 memset(fwp_endpoint_table.entry, 0, table_size);
71 fwp_endpoint_table.nr_endpoints = nr_endpoints;
75 static fwp_endpoint_t* fwp_endpoint_alloc()
79 /* find free vres id */
80 pthread_mutex_lock(&fwp_endpoint_table.lock);
82 nr_endpoints = fwp_endpoint_table.nr_endpoints;
83 while ((i < nr_endpoints) &&
84 (fwp_endpoint_table.entry[i].status != FWP_EPOINT_FREE))
87 if (i == nr_endpoints) {
88 pthread_mutex_unlock(&fwp_endpoint_table.lock);
92 fwp_endpoint_table.entry[i].status = FWP_EPOINT_INACTIVE;
93 pthread_mutex_unlock(&fwp_endpoint_table.lock);
94 return (&fwp_endpoint_table.entry[i]);
97 static inline void fwp_endpoint_free(fwp_endpoint_t *epoint)
99 epoint->status = FWP_EPOINT_FREE;
100 close(epoint->sockd);
103 int fwp_endpoint_get_params(fwp_endpoint_d_t epointd, unsigned int *node,
104 unsigned int *port, fwp_endpoint_attr_t *attr)
106 fwp_endpoint_t *epoint = epointd;
108 *node = epoint->node;
109 *port = epoint->port;
110 *attr = epoint->attr;
115 int fwp_endpoint_attr_init(fwp_endpoint_attr_t *attr)
117 bzero(attr, sizeof(fwp_endpoint_attr_t));
118 *attr = fwp_epoint_attr_default;
124 * Creates send endpoint
126 * \param[in] node IP address of destination node
127 * \param[in] port UDP port
129 * \return On success returns identifier of endpoint.
130 * On error, negative error code is returned.
133 int fwp_send_endpoint_create(unsigned int node, unsigned int port,
134 fwp_endpoint_attr_t *attr,
135 fwp_endpoint_d_t *epointdp)
137 struct sockaddr_in *addr;
138 fwp_endpoint_t *epoint;
141 epoint = fwp_endpoint_alloc();
146 epoint->type = FWP_SEND_EPOINT;
147 epoint->status = FWP_EPOINT_UNBOUND;
151 epoint->attr = *attr;
153 epoint->attr = fwp_epoint_attr_default;
155 addr = (struct sockaddr_in *)&(epoint->peer.addr);
156 bzero((char*) addr, sizeof(*addr));
157 addr->sin_family = AF_INET;
158 addr->sin_addr.s_addr = node;
159 addr->sin_port = htons(port);
160 epoint->peer.addrlen = sizeof(struct sockaddr_in);
162 if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
163 sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
166 sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
170 perror("Unable to open socket");
174 if (connect(sockd,(struct sockaddr*) &epoint->peer.addr,
175 epoint->peer.addrlen)) {
176 perror("Connect error");
180 epoint->sockd = sockd;
182 FWP_DEBUG("Send endpoint created.\n");
186 fwp_endpoint_free(epoint);
191 * Creates receive endpoint
193 * \param[in] port UDP port
195 * \return On success returns identifier of endpoint.
196 * On error, negative error code is returned.
199 int fwp_receive_endpoint_create(/*unsigned int node,*/ unsigned int port,
200 fwp_endpoint_attr_t *attr,
201 fwp_endpoint_d_t *epointdp)
203 fwp_endpoint_t *epoint;
205 struct sockaddr_in *addr;
206 //int rcvbuf_size = 3000;
208 epoint = fwp_endpoint_alloc();
213 epoint->type = FWP_RECV_EPOINT;
214 epoint->status = FWP_EPOINT_UNBOUND;
216 epoint->attr = *attr;
218 epoint->attr = fwp_epoint_attr_default;
220 addr = (struct sockaddr_in *)&(epoint->peer.addr);
221 addr->sin_family = AF_INET;
222 /* TODO: set listen interface, maybe through config struct*/
223 addr->sin_addr.s_addr = INADDR_ANY;
224 addr->sin_port = htons(port);
225 epoint->peer.addrlen = sizeof(struct sockaddr_in);
227 if (epoint->attr.reliability == FWP_EPOINT_RELIABLE) {
228 if ((sockd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
229 perror("Unable to open socket");
232 FD_ZERO(&epoint->fdset);
233 FD_SET(sockd, &epoint->fdset); /*add listen socket */
234 epoint->c_sockd = (int*) malloc(epoint->attr.max_connections);
235 bzero(epoint->c_sockd, epoint->attr.max_connections);
236 epoint->nr_connections = 0;
239 if ((sockd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
240 perror("Unable to open socket");
245 if (bind(sockd, (struct sockaddr*) &epoint->peer.addr,
246 epoint->peer.addrlen) == -1) {
247 perror("Bind error");
251 epoint->sockd = sockd;
252 /*if (setsockopt(epoint->sockd, SOL_SOCKET, SO_RCVBUF,
253 &rcvbuf_size, sizeof(rcvbuf_size)) == -1) {
255 perror("Unable to set socket buffer size");
258 FWP_DEBUG("Receive endpoint buffer size is set.\n");
262 getsockname(epoint->sockd, (struct sockaddr*)&epoint->peer.addr,
263 &epoint->peer.addrlen);
265 epoint->port = ntohs(addr->sin_port);
267 epoint->node = ntohl(addr->sin_addr.s_addr);
268 FWP_DEBUG("Receive endpoint port=%d created.\n", ntohs(epoint->port));
274 fwp_endpoint_free(epoint);
279 * Binds send endpoint to vres
281 * \param[in] vres_id identifier of vres
282 * \param[in] epoint_id send endpoint identifier
284 * \return On success returns 0. On error, negative error code is returned
286 int fwp_send_endpoint_bind(fwp_endpoint_d_t epointd, fwp_vres_d_t vresd)
288 fwp_endpoint_t *epoint = epointd;
290 if (epoint->type != FWP_SEND_EPOINT) {
294 /* link epoint-vres mutually */
295 pthread_mutex_lock(&fwp_endpoint_table.lock);
296 if (_fwp_vres_bind(vresd, epoint) < 0) {
297 pthread_mutex_unlock(&fwp_endpoint_table.lock);
301 if (epoint->type == FWP_EPOINT_BOUND) { /* if send endpoint is already bound */
302 fwp_send_endpoint_unbind(epoint);
305 epoint->vresd = vresd;
306 epoint->status = FWP_EPOINT_BOUND;
308 pthread_mutex_unlock(&fwp_endpoint_table.lock);
313 * Unbinds send endpoint from vres
315 * \param[in] id send endpoint identifier
316 * \return On success returns 0. On error, negative error code is returned
319 int fwp_send_endpoint_unbind(fwp_endpoint_d_t epointd)
321 fwp_endpoint_t *epoint = epointd;
323 /* unlink epoint-vres mutually */
324 _fwp_vres_unbind(epoint->vresd);
325 epoint->status = FWP_EPOINT_UNBOUND;
333 * \param[in] epoint_id identificator of endpoint
334 * \param[in] buffer buffer to store message
335 * \param[in] buffer_size size of buffer
338 * On success, it returns number of received bytes.
339 * On error, negative error code is returned,
342 ssize_t fwp_recv(fwp_endpoint_d_t epointd, void *buffer, size_t buffer_size,
345 fwp_endpoint_t *epoint = epointd;
346 fwp_sockaddr_t *peer = &epoint->peer;
351 if (epoint->attr.reliability == FWP_EPOINT_BESTEFFORT) {
352 _fwp_recvfrom(len, epoint->sockd, buffer, buffer_size, 0,
353 peer->addr, &peer->addrlen);
358 /* FWP_EPOINT_RELIABLE */
359 memcpy(&fdset, &epoint->fdset, sizeof(fdset));
360 if (select(getdtablesize()+1, &fdset, (fd_set *)0,
361 (fd_set *)0, NULL) < 0) {
362 perror("Error in select");
366 if (FD_ISSET(epoint->sockd, &fdset)) { /*is it listen socket?*/
367 if (epoint->nr_connections == epoint->attr.max_connections)
368 goto next_connection;
370 csockd = accept(epoint->sockd, (struct sockaddr*)peer->addr,
372 /* find free place */
374 while ((epoint->c_sockd[i])&&
375 (i < epoint->nr_connections))
377 epoint->c_sockd[i] = csockd;
378 epoint->nr_connections++;
380 goto next_connection;
383 /* Check client TCP sockets */
384 for (i = 0; i < epoint->nr_connections; i++) {
385 if (FD_ISSET(epoint->c_sockd[i], &fdset)) {
386 _fwp_recvfrom(len, epoint->c_sockd[i], buffer, buffer_size,
387 0, peer->addr, &peer->addrlen);
390 /* tcp connection closed */
391 FD_CLR(epoint->c_sockd[i], &fdset);
392 memcpy(epoint->c_sockd+i, epoint->c_sockd+i+1,
393 sizeof(int)*(epoint->nr_connections -i-1));
399 * Sends message through vres
401 * \param[in] epoint_id identificator of endpoint
402 * \param[in] msg message to sent
403 * \param[in] size message size
406 * On success, it returns zero.
407 * On error, negative error code is returned,
410 int fwp_send(fwp_endpoint_d_t epointd, void *msg, size_t size, int flags)
412 struct fwp_endpoint *epoint = epointd;
413 struct fwp_msgb *msgb;
415 /* TODO: Validity test of epointd */
416 if (epoint->status != FWP_EPOINT_BOUND) {
420 /*if (flags && MSG_DONTWAIT)
421 msgb = fwp_msgb_alloc(buffer_size);
423 if (!(msgb = fwp_msgb_alloc(size)))
426 msgb->peer = &epoint->peer;
427 /*msgb->data = msg;*/
428 /*msgb->flags = epoint->flags;*/
430 /* data must be copied since msg may change while
431 * message is waiting in transmission queue
433 memcpy(msgb->data, msg, size);
434 fwp_msgb_put(msgb, size);
435 /*msgb->tail = msgb->data + size;
440 return _fwp_vres_send(epoint->vresd, msgb);