]> rtime.felk.cvut.cz Git - frescor/frsh.git/blob - fres/resalloc/fra_generic.c
Implemented support for contract renegotiation
[frescor/frsh.git] / fres / resalloc / fra_generic.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FRSH (FRescor ScHeduler)                         */
26 /*                                                                        */
27 /* FRSH is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FRSH is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FRSH; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FRSH header files in a file,         */
39 /* instantiating FRSH generics or templates, or linking other files       */
40 /* with FRSH objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 /**
48  * @file   fra_generic.c
49  * @author Michal Sojka <sojkam1@fel.cvut.cz>
50  * @date   Thu Oct 23 15:26:19 2008
51  * 
52  * @brief  FORB interface of FRES Resource Allocator
53  * 
54  * 
55  */
56 #include <forb.h>
57 #include <fra.h>
58 #include <fcb.h>
59 #include <ul_log.h>
60 #include "fra_generic.h"
61
62 UL_LOG_CUST(ulogd_fra);
63 ul_log_domain_t ulogd_fra = {UL_LOGL_MSG, "fra"};
64
65 /**
66  * Global "registry" of all virtual resources in one application.
67  */
68 struct fres_vreses fres_vreses;
69
70 GAVL_CUST_NODE_INT_IMP(fres_vreses_nolock /* cust_prefix */,
71                        struct fres_vreses /* cust_root_t */,
72                        struct fres_vres   /* cust_item_t */,
73                        fres_contract_id_t /* cust_key_t */,
74                        vreses             /* cust_root_node */,
75                        node               /* cust_item_node */,
76                        id                 /* cust_item_key */,
77                        fres_contract_id_cmp /* cust_cmp_fnc */);
78
79
80 /**
81  *  * Global "registry" of all bound threads in one application.
82  *   */
83 struct fres_threads_vres fres_threads_vres;
84
85 GAVL_CUST_NODE_INT_IMP(fres_threads_vres_nolock /* cust_prefix */,
86                        struct fres_threads_vres /* cust_root_t */,
87                        struct fres_thread_vres  /* cust_item_t */,
88                        fosa_thread_id_t         /* cust_key_t */,
89                        threads                  /* cust_root_node */,
90                        node                     /* cust_item_node */,
91                        id                       /* cust_item_key */,
92                        fres_thread_vres_cmp     /* cust_cmp_fnc */);
93
94
95 /** 
96  * Inserts vres to the global vres "registry".
97  * 
98  * @param vres 
99  * 
100  * @return Positive value on success, -1 on error.
101  */
102 static int
103 fres_vreses_insert(struct fres_vres *vres)
104 {
105         int ret;
106         fosa_mutex_lock(&fres_vreses.mutex);
107         ret = fres_vreses_nolock_insert(&fres_vreses, vres);
108         fosa_mutex_unlock(&fres_vreses.mutex);
109         return ret;
110 }
111
112 static int
113 fres_vreses_delete(struct fres_vres *vres)
114 {
115         int ret;
116         fosa_mutex_lock(&fres_vreses.mutex);
117         ret = fres_vreses_nolock_delete(&fres_vreses, vres);
118         fosa_mutex_unlock(&fres_vreses.mutex);
119         return ret;
120 }
121
122 static struct fres_vres *
123 fres_vreses_find(fres_contract_id_t *id)
124 {
125         struct fres_vres *ret;
126         fosa_mutex_lock(&fres_vreses.mutex);
127         ret = fres_vreses_nolock_find(&fres_vreses, id);
128         fosa_mutex_unlock(&fres_vreses.mutex);
129         return ret;
130 }
131
132
133 #define save_errno(cmd) do { int _e = errno; cmd; errno = _e; } while(0)
134
135 CORBA_long change_vreses(fres_resource_allocator obj,
136                          const fres_contract_ptr_seq* contracts,
137                          CORBA_Environment *ev)
138 {
139         int ret;
140         unsigned len = contracts->_length;
141         unsigned i;
142         struct fres_allocator *alloc = forb_instance_data(obj);
143         struct fres_vres **vreses;
144
145         /* Prepare the vres structures */
146         vreses = malloc(len*sizeof(vreses[0]));
147         if (!vreses) {
148                 ret = errno;
149                 goto err;
150         }
151         
152         for (i=0; i<len; i++) {
153                 struct fres_contract *contract;
154                 struct fres_vres *vres;
155                 contract = fres_contract_duplicate(contracts->_buffer[i]);
156                 if (!contract) {
157                         ret = -1;
158                         goto err_free_vreses;
159                 }
160                 vres = fres_vreses_find(&contract->id);
161                 if (!vres) {
162                         vres = fres_vres_new(&contract->id);
163                         if (!vres) {
164                                 ret = errno;
165                                 goto err_free_vreses;
166                         }
167                         ret = fres_vreses_insert(vres);
168                         assert(ret > 0); /* Nobody else inserted the same vres. */
169                 }
170                 vres->new = contract;
171                 vres->allocator = alloc;
172                 vreses[i] = vres;
173         }
174
175         /* Apply the changes */
176         if (alloc->apply_vres_changes) {
177                 /* Full interface */
178                 ret = alloc->apply_vres_changes(vreses, len, alloc->priv);
179                 if (ret) {
180                         ul_logerr("apply_vres_changes failed %d\n", ret);
181                         goto err_free_vreses;
182                 }
183         } else {
184                 /* Simple interface */
185                 for (i=0; i<len; i++) {
186                         struct fres_vres *vres = vreses[i];
187                         if (fres_contract_get_num_blocks(vres->new) == 0) {
188                                 /* VRES cancleation */
189                                 ret = alloc->cancel_vres(vres, alloc->priv);
190                                 
191                                 fres_vreses_delete(vres);
192                                 fres_vres_destroy(vres);
193                                 vreses[i] = NULL;
194                         } else if (vres->allocated) {
195                                 /* VRES change */
196                                 struct fres_contract *last_perceived = vres->perceived;
197                                 ret = alloc->change_vres(vreses[i], alloc->priv);
198                                 if (last_perceived == vres->perceived) {
199                                         ul_logerr("change_vres callback did not change the perceived vres!");
200                                         vres->perceived = vres->new;
201                                 }       
202                         } else {
203                                 /* VRES creation */
204                                 ret = alloc->create_vres(vreses[i], alloc->priv);
205                                 vres->perceived = vres->new;
206                         }
207                         if (ret) {
208                                 ul_logerr("VRES apply error %d\n", ret);
209                                 goto err_free_vreses;
210                         }
211                 }
212         }
213
214         /* Update the vres structures (move new to allocated) */
215         for (i=0; i<len; i++) {
216                 struct fres_vres *vres = vreses[i];
217                 struct fres_contract *tmp;
218
219                 if (vres) {
220                         tmp = vres->allocated;
221                         vres->allocated = vres->new;
222                         if (tmp) fres_contract_destroy(tmp);
223                 }
224         }
225
226         ret = 0;
227
228 err_free_vreses:
229         free(vreses);
230 err:
231         return ret;
232 }
233
234 static const struct forb_fres_resource_allocator_impl fra_impl = {
235         .change_vreses = change_vreses,
236 };
237
238
239 /** 
240  * Creates allocator object and registeres it with the given executor.
241  * 
242  * @param orb 
243  * @param executor 
244  * @param allocator
245  * 
246  * @return Object reference on success or NULL on error.
247  */
248 fres_resource_allocator fra_new(forb_orb orb,
249                                 forb_executor_t *executor,
250                                 struct fres_allocator *allocator)
251 {
252         int ret;
253         fres_resource_allocator fra;
254         
255         fra = forb_fres_resource_allocator_new(orb, &fra_impl, allocator);
256         if (!fra) {
257                 save_errno(ul_logerr("forb_fres_resource_manager_new error"));
258                 goto err;
259         }
260
261         /* Prepare executor before we register the resource allocator
262          * with contract broker */
263         ret = forb_executor_register_object(executor, fra);
264         if (ret) goto err_release;
265
266         return fra;
267 err_release:
268         save_errno(forb_object_release(fra));
269 err:
270         return NULL;
271 }
272
273 /** 
274  * Returns pointer to VRes, which corresponds to the contract with the
275  * given id.
276  * 
277  * @param id 
278  * 
279  * @return Pointer to VRes, or NULL in the VRes doesn't exist.
280  */
281 fres_vres_t *fra_get_vres(fres_contract_id_t *id)
282 {
283         return fres_vreses_find(id);
284 }
285
286 static inline
287 int fres_threads_vres_insert(struct fres_thread_vres *th)
288 {
289         int ret;
290
291         fosa_mutex_lock(&fres_threads_vres.mutex);
292         ret = fres_threads_vres_nolock_insert(&fres_threads_vres, th);
293         fosa_mutex_unlock(&fres_threads_vres.mutex);
294
295         return ret;
296 }
297
298 static inline
299 int fres_threads_vres_delete(struct fres_thread_vres *th)
300 {
301         int ret;
302
303         fosa_mutex_lock(&fres_threads_vres.mutex);
304         ret = fres_threads_vres_nolock_delete(&fres_threads_vres, th);
305         fosa_mutex_unlock(&fres_threads_vres.mutex);
306
307         return ret;
308 }
309
310 static inline
311 struct fres_thread_vres *fres_threads_vres_find(const fosa_thread_id_t *id)
312 {
313         struct fres_thread_vres *ret;
314
315         fosa_mutex_lock(&fres_threads_vres.mutex);
316         ret = fres_threads_vres_nolock_find(&fres_threads_vres, id);
317         fosa_mutex_unlock(&fres_threads_vres.mutex);
318
319         return ret;
320 }
321
322 /** 
323  * Creates binding from a thread to VRES; if binding already exists,
324  * it is updated.
325  * 
326  * @param id Thread ID bounded to @a vres.
327  * @param vres VRES
328  * 
329  * @return Positive number when a new binding was inserted, zero when
330  * binding was changed and -1 on error.
331  */
332 int fra_insert_thread_vres(const fosa_thread_id_t *id, fres_vres_t *vres)
333 {
334         struct fres_thread_vres *th;
335
336         th = fres_threads_vres_find(id);
337         if (th) {
338                 fosa_mutex_lock(&fres_threads_vres.mutex);
339                 th->vres = vres;
340                 fosa_mutex_unlock(&fres_threads_vres.mutex);
341
342                 return 0;
343         }
344
345         th = fres_thread_vres_new(id, vres);
346         if (!th) return -1;
347
348         return fres_threads_vres_insert(th);
349 }
350
351 /** 
352  * Deletes thread to VRES binding.
353  * 
354  * @param id Thread ID to unbind.
355  * 
356  * @return Zero on success, -1 on when the thread was not bound.
357  */
358 int fra_delete_thread_vres(const fosa_thread_id_t *id)
359 {
360         struct fres_thread_vres *th;
361         int ret = 0;
362
363         fosa_mutex_lock(&fres_threads_vres.mutex);
364         th = fres_threads_vres_nolock_find(&fres_threads_vres, id);
365         if (th) {
366                 fres_threads_vres_nolock_delete(&fres_threads_vres, th);
367         } else {
368                 ret = -1;
369         }
370         fosa_mutex_unlock(&fres_threads_vres.mutex);
371         if (ret == 0) {
372                 fres_thread_vres_destroy(th);
373         }
374         return ret;
375 }
376
377 /** 
378  * Returns VRES bounded to a thread.
379  * 
380  * @param id Thread ID
381  * 
382  * @return VRES on NULL in the thread is not bound.
383  * 
384  * @todo Is this function needed if we have
385  * fra_get_vres_thread_vres()?
386  */
387 fres_thread_vres_t *fra_get_thread_vres(const fosa_thread_id_t *id)
388 {
389         return fres_threads_vres_find(id);
390 }
391
392 /** 
393  * Returns VRES bounded to a thread.
394  * 
395  * @param id Thread ID
396  * 
397  * @return VRES on NULL in the thread is not bound.
398  */
399 fres_vres_t *fra_get_vres_thread_vres(const fosa_thread_id_t *id)
400 {
401         fres_thread_vres_t *th_vres;
402
403         th_vres = fres_threads_vres_find(id);
404         if (!th_vres) return NULL;
405
406         return th_vres->vres;
407
408 }
409