]> rtime.felk.cvut.cz Git - frescor/frsh-forb.git/blob - src/forb/src/discovery.c
forb: Split forb_port_destroy() to stop and destroy phases
[frescor/frsh-forb.git] / src / forb / src / discovery.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FORB (Frescor Object Request Broker)             */
26 /*                                                                        */
27 /* FORB is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FORB is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FORB; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FORB header files in a file,         */
39 /* instantiating FORB generics or templates, or linking other files       */
40 /* with FORB objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 /**
48  * @file   discovery.c
49  * @author Michal Sojka <sojkam1@fel.cvut.cz>
50  * @date   Wed May 27 19:28:30 2009
51  * 
52  * @brief  Discovery functions
53  * 
54  */
55
56 #include "discovery.h"
57 #include <ul_log.h>
58 #include "object.h"
59 #include <forb/config.h>
60 #include "iop.h"
61
62 extern UL_LOG_CUST(ulogd_forb_discovery);
63
64 GAVL_CUST_NODE_INT_DEC(forb_peer_nolock,/* cust_prefix */
65                        forb_t,          /* cust_root_t */
66                        forb_peer_t,     /* cust_item_t */
67                        forb_server_id,  /* cust_key_t */
68                        peers,           /* cust_root_node */
69                        gnode,           /* cust_item_node */
70                        server_id,       /* cust_item_key */
71                        forb_server_id_cmp)/* cust_cmp_fnc */
72
73 GAVL_CUST_NODE_INT_IMP(forb_peer_nolock,/* cust_prefix */
74                        forb_t,          /* cust_root_t */
75                        forb_peer_t,     /* cust_item_t */
76                        forb_server_id,  /* cust_key_t */
77                        peers,           /* cust_root_node */
78                        gnode,           /* cust_item_node */
79                        server_id,       /* cust_item_key */
80                        forb_server_id_cmp);/* cust_cmp_fnc */
81
82 int forb_discovery_init(forb_t *forb)
83 {
84         if (fosa_mutex_init(&forb->peer_mutex, 0) != 0) return -1;
85         forb_peer_nolock_init_root_field(forb);
86
87         if (fosa_mutex_init(&forb->objkey_mutex, 0) != 0) return -1;
88         forb_objects_nolock_init_root_field(forb);
89         return 0;
90 }
91
92 static inline void
93 forb_peer_insert(forb_t *forb, forb_peer_t *peer)
94 {
95         fosa_mutex_lock(&forb->peer_mutex);
96         forb_peer_get(peer);
97         forb_peer_nolock_insert(forb, peer);
98         fosa_mutex_unlock(&forb->peer_mutex);
99 }
100
101 static inline void
102 forb_peer_delete(forb_t *forb, forb_peer_t *peer)
103 {
104         fosa_mutex_lock(&forb->peer_mutex);
105         forb_peer_nolock_delete(forb, peer);
106         forb_peer_put(peer);
107         fosa_mutex_unlock(&forb->peer_mutex);
108 }
109
110 /** 
111  * Finds peer with given @a server_id.
112  * 
113  * @param forb 
114  * @param server_id 
115  * 
116  * @return The found peer or NULL if no peer is found. forb_peer_put()
117  * has to be called on the non-NULL returned value after is not
118  * needed.
119  */
120 forb_peer_t *
121 forb_peer_find(forb_t *forb, forb_server_id *server_id)
122 {
123         forb_peer_t *ret;
124         fosa_mutex_lock(&forb->peer_mutex);
125         ret = forb_peer_nolock_find(forb, server_id);
126         if (ret) {
127                 forb_peer_get(ret);
128         }
129         fosa_mutex_unlock(&forb->peer_mutex);
130         return ret;
131 }
132
133 /** 
134  * Finds the peer with given @a server_id. If the peer is not
135  * currently known (not yet discovered or not available), this
136  * function waits at maximum @a timeout for the peer to be discovered.
137  * 
138  * @param forb 
139  * @param server_id 
140  * @param timeout 
141  * 
142  * @return The peer structure or NULL if the peer is not known even
143  * after the timeout elapses.
144  * 
145  * @note After the returned peer is not needed, forb_peer_put() must
146  * called on it.
147  */
148 forb_peer_t *
149 forb_peer_find_timed(forb_t *forb, forb_server_id *server_id,
150                      fosa_abs_time_t *timeout)
151 {
152         forb_peer_t *peer;
153         bool peer_allocated = false;
154         
155         fosa_mutex_lock(&forb->peer_mutex);
156         peer = forb_peer_nolock_find(forb, server_id);
157         if (!peer && !timeout) goto unlock;
158         if (peer) {
159                 forb_peer_get(peer);
160         } else {
161                 /* We are the first who want to contact this peer */
162                 peer = forb_peer_new();
163                 if (!peer) goto unlock;
164                 peer->server_id = *server_id;
165                 peer->state = FORB_PEER_WANTED;
166                 forb_peer_nolock_insert(forb, forb_peer_get(peer));
167                 peer_allocated = true;
168         }
169         while (peer->state == FORB_PEER_WANTED) {
170                 int ret;
171                 /* Wait for the peer to be discovered. */
172                 ret = fosa_cond_timedwait(&peer->cond,
173                                           &forb->peer_mutex,
174                                           timeout);
175                 if (ret == FOSA_ETIMEDOUT) {
176                         /* No peer discovered within timeout */
177                         if (peer_allocated) {
178                                 forb_peer_nolock_delete(forb, peer);
179                                 forb_peer_put(peer);
180                         }
181                         forb_peer_put(peer);
182                         peer = NULL;
183                         break;
184                 }
185         }
186 unlock: 
187         fosa_mutex_unlock(&forb->peer_mutex);
188
189         return peer;
190 }
191
192 /** 
193  * 
194  * @param port 
195  * @param peer 
196  * @param server_id 
197  * @param addr 
198  * @param orb_id
199  * 
200  * @warning This function has to be called either from receiver thread
201  * of when the recevier thread is not running.
202  */
203 void forb_new_peer_discovered(forb_port_t *port, forb_peer_t *peer,
204                          forb_server_id server_id, void *addr,
205                          CORBA_string orb_id)
206 {
207         forb_t *forb = port->forb;
208         bool notify_waiters = false;
209         if (peer && peer->state == FORB_PEER_WANTED) {
210                 notify_waiters = true;
211         } else if (!peer) {
212                 peer = forb_peer_new();
213         }
214         if (!peer)
215                 return;
216
217         fosa_mutex_lock(&forb->peer_mutex);
218         peer->server_id = server_id;
219         peer->port = port;
220         peer->addr = addr;
221         peer->orb_id = orb_id;
222         peer->state = FORB_PEER_DISCOVERED;
223         if (notify_waiters) {
224                 fosa_cond_broadcast(&peer->cond);
225         } else {
226                 forb_peer_nolock_insert(forb, forb_peer_get(peer));
227         }
228         forb_port_peer_ins_tail(port, forb_peer_get(peer));
229         fosa_mutex_unlock(&forb->peer_mutex);
230         {
231                 char str[60];
232                 ul_logdeb("new peer discovered %s (orb_id '%s')\n",
233                           forb_server_id_to_string(str, &peer->server_id, sizeof(str)),
234                           orb_id);
235         }
236 #ifdef CONFIG_FORB_PROTO_INET_DEFAULT
237         if (forb->attr.redistribute_hellos) {
238                 forb_peer_t *p;
239                 ul_list_for_each(forb_port_peer, port, p) {
240                         if (p != peer &&
241                             forb_server_id_cmp(&p->server_id, &forb->server_id) != 0) {
242                                 forb_iop_redistribute_hello_to(p, peer); /* Introduce new peer to others */
243                                 forb_iop_redistribute_hello_to(peer, p); /* Introduce other peers to the new one */
244                         }
245                 }
246         }
247 #endif
248         if (forb->attr.peer_discovery_callback) {
249                 forb_orb peer_orb = forb_object_new(forb->orb, &peer->server_id, 0);
250                 forb->attr.peer_discovery_callback(peer_orb, orb_id);
251                 forb_object_release(peer_orb);
252         }
253         forb_peer_put(peer);
254
255         /* Broadcast our hello packet now */
256         forb_syncobj_signal(&port->hello);
257 }
258
259 /** 
260  * 
261  * 
262  * @param peer 
263  *
264  * @warning This function has to be called either from receiver thread
265  * of when the recevier thread is not running.
266  */
267 void forb_peer_disconnected(forb_peer_t *peer)
268 {
269         forb_peer_delete(peer->port->forb, peer);
270         forb_port_peer_delete(peer->port, peer);
271
272         /* This should release the peer and in case on proto_inet
273          * close the socket. */
274         forb_peer_put(peer);
275 }
276