]> rtime.felk.cvut.cz Git - frescor/frsh.git/blob - fres/cbroker/fcb.c
02e99cfefe9cf4ecd220cead20ef83789d176adb
[frescor/frsh.git] / fres / cbroker / fcb.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FRSH (FRescor ScHeduler)                         */
26 /*                                                                        */
27 /* FRSH is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FRSH is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FRSH; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FRSH header files in a file,         */
39 /* instantiating FRSH generics or templates, or linking other files       */
40 /* with FRSH objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 /**
48  * @file   fcb.c
49  * @author Michal Sojka <sojkam1@fel.cvut.cz>
50  * @date   Mon Oct 20 18:00:18 2008
51  * 
52  * @brief  FRESCOR Contract Broker
53  * 
54  * 
55  */
56 #include <getopt.h>
57 #include <forb.h>
58 #include <forb/config.h>
59 #include <fcb.h>
60 #include <fcb_contact_info.h>
61 #include <error.h>
62 #include <errno.h>
63 #include <stdio.h>
64 #include <ul_gavlcust.h>
65 #include <string.h>
66 #include <ul_log.h>
67 #include <ul_logreg.h>
68 #include <forb/server_id.h>
69 #include <fres_contract.h>
70 #include "fcb_config.h"
71 #include <fosa_clocks_and_timers.h>
72 #if CONFIG_FCB_INET && !CONFIG_FORB_PROTO_INET_DEFAULT
73 #include <forb/proto_inet.h>
74 #endif
75
76 UL_LOG_CUST(ulogd_fcb);
77 ul_log_domain_t ulogd_fcb = {UL_LOGL_MSG, "fcb"};
78
79 fosa_abs_time_t start_time;
80
81
82 /**
83  * Resource identification 
84  */
85 struct res_key {
86         frsh_resource_type_t type;
87         frsh_resource_id_t id;
88 };
89
90 /**
91  * Registered resource
92  */
93 struct resource {
94         gavl_node_t node;
95         struct res_key key;
96         char *name;
97         fres_resource_manager mng; /**< Object reference of the resource manager */
98         gavl_cust_root_field_t allocators; /**< Registered allocators for this resource (from multiple applications/nodes) */
99         ul_list_head_t sc_contracts; /**< Negotiated contracts with spare capacity for this resource */
100 };
101
102 /**
103  * Resource allocator registered in different nodes/applications 
104  */
105 struct res_alloc {
106         gavl_node_t node;
107         forb_server_id app;
108         fres_resource_allocator ra;
109 };
110
111 struct fcb_contract {
112         fres_contract_id_t id;
113         struct res_alloc *ra;   /**< Allocator for this contract TODO: Remove contract if allocator is destroyed */
114         gavl_node_t node_fcb;
115         ul_list_node_t node_sc;
116         ul_list_node_t node_reservation;
117         struct {
118                 int initial;
119                 int try;
120         } sc_variant;
121         fosa_abs_time_t end_of_stability_period;
122         struct fres_contract *user_contract; /* The contract sent by user. Set after sucessfull neotiation. */
123         struct fres_contract *requested_contract; /* User request for the contract. Set by prepare_reservation_requests() in the beginning of negotiation. */
124         struct fres_contract *to_be_reserved_contract;
125         struct fres_contract *schedulable_contract;
126 };
127
128 /**
129  * Contract broker data
130  */
131 struct fcb {
132         gavl_cust_root_field_t resources; /**< Registered resources */
133         gavl_cust_root_field_t contracts; /**< Contracts negotiated by this FCB */
134 };
135
136 /** List of contracts to be reserved during spare capacity rebalancing */
137 struct reservation_list {
138         ul_list_head_t fcb_contracts;
139         unsigned length;
140 };
141
142 struct fcb_contract *fcb_contract_new(fres_contract_id_t *id)
143 {
144         struct fcb_contract *fcb_contract;
145         
146         fcb_contract = malloc(sizeof(*fcb_contract));
147         if (!fcb_contract) {
148                 return NULL;
149         }
150         memset(fcb_contract, 0, sizeof(*fcb_contract));
151         fcb_contract->id = *id;
152
153         return fcb_contract;
154 }
155
156 void fcb_contract_destroy(struct fcb_contract *fcb_contract)
157 {
158         if (fcb_contract->user_contract) {
159                 fres_contract_destroy(fcb_contract->user_contract);
160         }
161         if (fcb_contract->to_be_reserved_contract) {
162                 fres_contract_destroy(fcb_contract->to_be_reserved_contract);
163         }
164         if (fcb_contract->requested_contract) {
165                 fres_contract_destroy(fcb_contract->requested_contract);
166         }
167         if (fcb_contract->schedulable_contract) {
168                 fres_contract_destroy(fcb_contract->requested_contract);
169         }
170         free(fcb_contract);
171 }
172
173 static inline int res_key_cmp(const struct res_key *a,
174                               const struct res_key *b)
175 {
176         if (a->type < b->type) {
177                 return -1;
178         } else if (a->type > b->type) {
179                 return +1;
180         } else if (a->id < b->id) {
181                 return -1;
182         } else if (a->id > b->id) {
183                 return +1;
184         } else {
185                 return 0;
186         }
187 }
188
189 /* Container for registered resources */
190 GAVL_CUST_NODE_INT_DEC(fcb_resource   /* cust_prefix */,                \
191                        struct fcb     /* cust_root_t */,                \
192                        struct resource/* cust_item_t */,                \
193                        struct res_key /* cust_key_t */,                 \
194                        resources      /* cust_root_node */,             \
195                        node           /* cust_item_node */,             \
196                        key            /* cust_item_key */,              \
197                        res_key_cmp    /* cust_cmp_fnc */);
198
199 GAVL_CUST_NODE_INT_IMP(fcb_resource   /* cust_prefix */,                \
200                        struct fcb     /* cust_root_t */,                \
201                        struct resource/* cust_item_t */,                \
202                        struct res_key /* cust_key_t */,                 \
203                        resources      /* cust_root_node */,             \
204                        node           /* cust_item_node */,             \
205                        key            /* cust_item_key */,              \
206                        res_key_cmp    /* cust_cmp_fnc */);
207
208 /* Container for allocators registered for a given resource */
209 GAVL_CUST_NODE_INT_DEC(fcb_alloc        /* cust_prefix */,              \
210                        struct resource  /* cust_root_t */,              \
211                        struct res_alloc /* cust_item_t */,              \
212                        forb_server_id   /* cust_key_t */,               \
213                        allocators       /* cust_root_node */,           \
214                        node             /* cust_item_node */,           \
215                        app              /* cust_item_key */,            \
216                        fres_contract_id_cmp /* cust_cmp_fnc */);
217
218 GAVL_CUST_NODE_INT_IMP(fcb_alloc        /* cust_prefix */,              \
219                        struct resource   /* cust_root_t */,             \
220                        struct res_alloc /* cust_item_t */,              \
221                        forb_server_id   /* cust_key_t */,               \
222                        allocators       /* cust_root_node */,           \
223                        node             /* cust_item_node */,           \
224                        app              /* cust_item_key */,            \
225                        forb_server_id_cmp /* cust_cmp_fnc */);
226
227 /* Container for contracts with spare capacity negotiated for a given resource */
228 UL_LIST_CUST_DEC(sc_contracts        /* cust_prefix */,                 \
229                  struct resource  /* cust_head_t */,                    \
230                  struct fcb_contract /* cust_item_t */,                 \
231                  sc_contracts   /* cust_head_field */,                  \
232                  node_sc        /* cust_node_field */);
233
234 UL_LIST_CUST_DEC(reservation_list        /* cust_prefix */,             \
235                  struct reservation_list  /* cust_head_t */,            \
236                  struct fcb_contract /* cust_item_t */,                 \
237                  fcb_contracts  /* cust_head_field */,                  \
238                  node_reservation /* cust_node_field */);
239
240 /* Container for negotiated contracts */
241 GAVL_CUST_NODE_INT_DEC(fcb_contract         /* cust_prefix */,          \
242                        struct fcb           /* cust_root_t */,          \
243                        struct fcb_contract  /* cust_item_t */,          \
244                        fres_contract_id_t   /* cust_key_t */,           \
245                        contracts            /* cust_root_node */,       \
246                        node_fcb             /* cust_item_node */,       \
247                        id                   /* cust_item_key */,        \
248                        fres_contract_id_cmp /* cust_cmp_fnc */);
249
250 #if 1
251 GAVL_CUST_NODE_INT_IMP(fcb_contract         /* cust_prefix */,          \
252                        struct fcb           /* cust_root_t */,          \
253                        struct fcb_contract  /* cust_item_t */,          \
254                        fres_contract_id_t   /* cust_key_t */,           \
255                        contracts            /* cust_root_node */,       \
256                        node_fcb             /* cust_item_node */,       \
257                        id                   /* cust_item_key */,        \
258                        fres_contract_id_cmp /* cust_cmp_fnc */);
259 #else
260 #include "fcb_contract_gavl.inc"
261 #endif
262
263
/**
 * Converts a FORB object reference to the broker data (struct fcb)
 * stored as its instance data.
 *
 * The whole expansion is parenthesized so that the macro can be used
 * inside larger expressions, e.g. o2fcb(obj)->contracts.
 */
#define o2fcb(o) ((struct fcb *)forb_instance_data(o))
265
266 struct res_key*
267 get_res_key(const struct fres_contract *contract, struct res_key *key)
268 {
269         fres_block_resource *block_res;
270         
271         block_res = fres_contract_get_resource(contract);
272         if (!block_res) {
273                 ul_logerr("No resource specified\n");
274                 return NULL;
275         } else {
276                 key->type = block_res->resource_type;
277                 key->id = block_res->resource_id;
278         }
279         return key;
280 }
281
282 /** 
283  * Fills in an array of fcb_contracts according to requests submited
284  * to negotiate_contracts(). For every contract in @a contracts, there
285  * is one fcb_contract in @a fcb_contracts array.
286  * 
287  * @param fcb
288  * @param fcb_contracts Where to store pointers to fcb_contracts
289  * @param resource Resource for which negotiation is being done.
290  * @param app Application which requests negotiation
291  * @param contracts Contracts submited to negotiate_contracts() 
292  * @param num Number of contracts in contracts (which is the same as the number in fcb_contracts).
293  * 
294  * @return Zero on success, non-zero error code on error.
295  */
296 static int
297 prepare_fcb_contracts(struct fcb *fcb, struct fcb_contract *fcb_contracts[],
298                       struct resource **resource,
299                       forb_server_id *app,
300                       struct fres_contract *contracts[], int num)
301 {
302         unsigned i;
303         struct fcb_contract *fc;
304         struct res_alloc *ra;
305         struct res_key key, key2 = {-1,-1};
306
307         for (i=0; i<num; i++) {
308                 struct fres_contract *c = contracts[i];
309
310                 if (fres_contract_id_is_empty(&c->id)) {
311                         /* Normal negotiation request */
312                         forb_uuid_generate((forb_uuid_t *)&c->id);
313                         fc = fcb_contract_new(&c->id);
314                         if (!fc)
315                                 return errno;
316                         fcb_contracts[i] = fc;
317                         if (!get_res_key(c, &key)) {
318                                 return FRSH_ERR_RESOURCE_ID_INVALID;
319                         }
320                 } else {
321                         fc = fcb_contract_find(fcb, &c->id);
322                         if (!fc) {
323                                 char str[60];
324                                 fres_contract_id_to_string(str, &c->id, sizeof(str));
325                                 ul_logerr("Attempt to change unknown contract %s\n", str);
326                                 return FRES_ERR_NOTHING_TO_RENEGOTIATE;
327                         } else {
328                                 fcb_contracts[i] = fc;
329                                 if (fres_contract_get_num_blocks(c) == 0) {
330                                         /* Cancelation */
331                                         get_res_key(fc->user_contract, &key);
332                                 } else {
333                                         /* Renegotiation */
334                                         if (!get_res_key(c, &key)) {
335                                                 return FRSH_ERR_RESOURCE_ID_INVALID;
336                                         }
337                                 }
338                         }
339                 }
340                 fc->requested_contract = c;
341
342                 /* Check that all contracts are for the same resource */
343                 if (i==0) {
344                         key2 = key;
345                         *resource = fcb_resource_find(fcb, &key);
346                         if (!*resource) {
347                                 ul_logerr("No resource manager for %d.%d registered\n",
348                                           key.type, key.id);
349                                 return FRSH_ERR_RESOURCE_ID_INVALID;
350                         }
351                 }
352                 else if (key.type != key2.type ||
353                          key.id   != key2.id) {
354                         ul_logerr("Contract #%d differs in resource!\n", i);
355                         return FRSH_ERR_RESOURCE_ID_INVALID;
356                 }
357
358                 /* Find allocator for this request */
359                 ra = fcb_alloc_find(*resource, app);
360                 if (!ra) {
361                         char str[60];
362                         forb_server_id_to_string(str, app, sizeof(str));
363                         ul_logerr("No resource allocator found for %d.%d and %s\n",
364                                   (*resource)->key.type, (*resource)->key.id, str);
365                         return FRES_ERR_NO_RESOURCE_ALLOCATOR;
366                 }
367                 fc->ra = ra;
368         }       
369         return 0;
370 }
371
372 /**
373  * @param resource Resource for which to rebalance capacity and negotiate new contracts
374  * @param fcb_contract New requests to negotiate
375  * @param num The number of elements in @a fcb_contract
376  * @param[out] rl List with changed contracts to be commited
377  */
378 static void
379 prepare_reservation_list(struct resource *resource,
380                          struct fcb_contract *fcb_contract[], int num,
381                          struct reservation_list *rl)
382 {
383         int i;
384         fosa_abs_time_t now;
385         struct fcb_contract *fc;
386
387         reservation_list_init_head(rl);
388         rl->length = 0;
389         for (i=0; i<num; i++) {
390                 assert(fcb_contract[i]->requested_contract != NULL);
391                 reservation_list_insert(rl, fcb_contract[i]);
392                 rl->length++;
393         }
394         fosa_clock_get_time(CLOCK_REALTIME, &now);
395         ul_list_for_each(sc_contracts, resource, fc) {
396                 if (fosa_abs_time_smaller(fc->end_of_stability_period, now) &&
397                     fc->requested_contract == NULL) /* Do not insert contract inserted above */
398                 {
399                         reservation_list_insert(rl, fc);
400                         rl->length++;
401                 }
402         }
403 }
404
405 /** 
406  * 
407  * 
408  * @param resource Resource for which to rebalance capacity and negotiate new contracts
409  * @param rl List with changed contracts to be commited
410  * 
411  * @return Zero on success, non-zero error code on error
412  */
413 static int
414 rebalance_spare_capacity_and_reserve(struct resource *resource,
415                                      struct reservation_list *rl)
416 {
417         int ret;
418         unsigned i;
419         struct fcb_contract *fc;
420         fres_block_spare_capacity *s;
421
422         fres_contract_ptr_seq contracts;
423         if (!forb_sequence_alloc_buf(contracts, rl->length)) {
424                 return errno;
425         }
426         contracts._length = rl->length;
427         i=0;
428         /* Initialize optimization */
429         ul_list_for_each(reservation_list, rl, fc) {
430                 fc->to_be_reserved_contract =
431                         fc->requested_contract ? fres_contract_duplicate(fc->requested_contract) :
432                         fc->user_contract ? fres_contract_duplicate(fc->user_contract) :
433                         NULL;
434                 assert(fc->to_be_reserved_contract != NULL);
435
436                 forb_sequence_elem(contracts, i) = fc->to_be_reserved_contract;
437                 i++;
438                 
439                 s = fres_contract_get_spare_capacity(fc->to_be_reserved_contract);
440                 if (s && s->granularity == FRSH_GR_DISCRETE) {
441                         fc->sc_variant.initial = s->variants._length - 1;
442                         fc->sc_variant.try = fc->sc_variant.initial;
443                 }
444         }
445
446         bool all_combinations_tried;
447         int criterion, best_criterion = -1;
448         struct fcb_contract *fcb_changed;
449         /* Exhaustive search. Try all combinations of spare capacity
450          * variants and find the one with best creterion. */
451         do {
452                 all_combinations_tried = true;
453                 fcb_changed = NULL;
454                 criterion = 0;
455                 /* Prepare spare capacity variant */
456                 ul_list_for_each(reservation_list, rl, fc) {
457                         s = fres_contract_get_spare_capacity(fc->to_be_reserved_contract);
458                         /* TODO: Simulate continuous granularity by discretization */
459                         if (s && s->granularity == FRSH_GR_DISCRETE) {
460                                 fres_container_copy(fc->to_be_reserved_contract->container,
461                                                     forb_sequence_elem(s->variants, fc->sc_variant.try));
462                                 criterion += fc->sc_variant.try;
463
464                                 if (fcb_changed == NULL) {
465                                         /* Chnage tried variant for the next round */
466                                         fc->sc_variant.try = fc->sc_variant.try > 0 ?
467                                                 fc->sc_variant.try - 1 :
468                                                 s->variants._length - 1;
469                                         /* Do not change other
470                                          * contracts unless we have
471                                          * tried all variants */
472                                         if (fc->sc_variant.try != fc->sc_variant.initial) {
473                                                 fcb_changed = fc;
474                                         }
475                                 }
476                                 if (fc->sc_variant.try != fc->sc_variant.initial)
477                                         all_combinations_tried = false;
478                         }
479                 }
480
481                 if (criterion > best_criterion) {
482                         CORBA_Environment ev;
483                         /* Reserve contract */
484                         ret = fres_resource_manager_reserve_contracts(resource->mng, &contracts, &ev);
485                         if (forb_exception_occurred(&ev)) {
486                                 ret = fres_forbex2err(&ev);
487                                 ul_logerr("FORB exception when reserving contracts\n");
488                                 goto err;
489                         }
490                         if (ret < 0) {
491                                 ul_logerr("Contract reservation error %d\n", ret);
492                                 ret = FRES_ERR_ADMISSION_TEST;
493                                 goto err;
494                         }
495                         if (ret == 0) { /* negotiation succeeded */
496                                 best_criterion = criterion;
497                         }
498                 }
499         } while (!all_combinations_tried);
500
501         if (best_criterion == -1) {
502                 ret = FRSH_ERR_CONTRACT_REJECTED;
503         } else {
504                 /* At least some variant succeeded */
505                 ret = 0;
506                 sc_contracts_init_head(resource);
507                 ul_list_for_each(reservation_list, rl, fc) {
508                         s = fres_contract_get_spare_capacity(fc->to_be_reserved_contract);
509                         if (s && s->granularity == FRSH_GR_DISCRETE) {
510                                 sc_contracts_insert(resource, fc);
511                         }
512                 }
513         }
514 err:
515         ul_list_for_each(reservation_list, rl, fc) {
516                 fres_contract_destroy(fc->to_be_reserved_contract);
517                 fc->to_be_reserved_contract = NULL;
518         }
519         forb_sequence_free_buf(contracts, forb_no_destructor);
520         return ret;
521 }
522
523 /**
524  * Create/change VReses according to @a schedulable_contracts.
525  *
526  * There might be more allocators for schedulable contracts, so merge
527  * adjacent vreses with the same allocator together and create/change
528  * them in one step. Also the order of schedulable contracts might be
529  * different from the initial order od ids in commit_contracts()
530  * therefore we have to search for every contract.
531  */
532 static int
533 change_vreses(struct fcb *fcb, fres_contract_ptr_seq *schedulable_contracts)
534 {
535         struct res_alloc *last_ra = NULL;
536         fres_contract_ptr_seq vreses;
537         int i, ret;
538         CORBA_Environment ev;
539         
540         if (!forb_sequence_alloc_buf(vreses, schedulable_contracts->_length)) {
541                 return errno;
542         }
543         vreses._length = 0;
544         
545         for (i=0; i<schedulable_contracts->_length; i++) {
546                 struct fcb_contract *fc;
547                 struct fres_contract *sc = schedulable_contracts->_buffer[i];
548
549                 fc = fcb_contract_find(fcb, &sc->id);
550                 assert(fc != NULL);
551
552                 if (true /* TODO: if the schedulable contract is changed */) {
553                         if (last_ra != fc->ra) {
554                                 if (vreses._length) {
555                                         ret = fres_resource_allocator_change_vreses(last_ra->ra,
556                                                                                     &vreses, &ev);
557                                         if (forb_exception_occurred(&ev)) {
558                                                 ret = fres_forbex2err(&ev);
559                                                 goto err;
560                                         }
561                                         if (ret) goto err;
562                                 }
563                                 vreses._length = 0;
564                         }
565                         vreses._buffer[vreses._length] = sc;
566                         vreses._length++;
567                         fres_contract_destroy(fc->schedulable_contract);
568                         fc->schedulable_contract = fres_contract_duplicate(sc);
569                         last_ra = fc->ra;
570                 
571                 }
572         }
573         ret = fres_resource_allocator_change_vreses(last_ra->ra,
574                                                     &vreses, &ev);
575         if (forb_exception_occurred(&ev))
576                 ret = fres_forbex2err(&ev);
577 err:
578         forb_sequence_free_buf(vreses, forb_no_destructor);
579         return ret;
580 }
581
582 CORBA_long
583 negotiate_contracts(fres_contract_broker obj,
584                     const fres_contract_ptr_seq* contracts,
585                     fres_contract_id_seq** ids_out,
586                     CORBA_Environment *ev)
587 {
588         struct fcb *fcb = o2fcb(obj);
589         struct resource *resource = NULL;
590         int ret = 0;
591         forb_server_id app;
592         fres_contract_ptr_seq *schedulable_contracts;
593         struct fcb_contract **fcb_contracts, *fc;
594         unsigned i;
595         fres_contract_id_seq commit_ids;
596
597         /* Prepare output sequence for the case we return eariler with
598          * an error */
599         forb_sequence_alloc(*ids_out, 0);
600         if (!*ids_out) {
601                 ev->major = FORB_EX_NO_MEMORY;
602                 goto err;
603         }
604         CORBA_sequence_set_release(*ids_out, CORBA_TRUE);
605         
606         forb_get_req_source(obj, &app);
607         
608         fcb_contracts = malloc(sizeof(fcb_contracts[0])*contracts->_length);
609         if (!fcb_contracts) {
610                 ret = errno;
611                 goto err;
612         }
613         memset(fcb_contracts, 0, sizeof(fcb_contracts[0])*contracts->_length);
614
615         ret = prepare_fcb_contracts(fcb, fcb_contracts, &resource,
616                                     &app, contracts->_buffer, contracts->_length);
617         if (ret)
618                 goto err_free_fcb_contracts;
619
620         struct reservation_list rl;
621         prepare_reservation_list(resource,
622                                  fcb_contracts, contracts->_length,
623                                  &rl);
624
625         /* Allocate all the needed memory before doing reservation. If
626          * there is no enough memory, it has no sense to call resource
627          * manager. */
628         if (!forb_sequence_alloc_buf(commit_ids, rl.length)) {
629                 ret = errno;
630                 goto err_free_fcb_contracts;
631         }
632
633         /* Reserve contracts */
634         ret = rebalance_spare_capacity_and_reserve(resource, &rl);
635         if (ret) {
636                 if (ret == FRSH_ERR_CONTRACT_REJECTED) {
637                         ul_logmsg("Contract(s) was/were rejected\n");
638                 } else {
639                         char msg[100];
640                         fres_strerror(ret, msg, sizeof(msg));
641                         ul_logerr("Reservation error: %s\n", msg);
642                 }
643                 goto err_free_fcb_contracts;
644         }
645
646         /* Commit contracts */
647         commit_ids._length = rl.length;
648         i=0;
649         ul_list_for_each(reservation_list, &rl, fc) {
650                 forb_sequence_elem(commit_ids, i) = fc->id;
651                 i++;
652         }
653         
654         fres_resource_manager_commit_contracts(resource->mng, &commit_ids,
655                                                &schedulable_contracts, ev);
656         if (forb_exception_occurred(ev)) {
657                 ret = fres_forbex2err(ev);
658                 goto err_cancel_reservation;
659         }
660
661         /* Add new contracts to our fcb database for later
662          * reference. Canceled contracts are removed below. */
663         for (i=0; i<contracts->_length; i++) {
664                 fc =  fcb_contracts[i];
665
666                 if (fc->user_contract) {
667                         if (fres_contract_get_num_blocks(fc->requested_contract) > 0) {
668                                 /* Renegotiation */
669                                 fres_contract_destroy(fc->user_contract);
670                                 fc->user_contract = fres_contract_duplicate(fc->requested_contract);
671                                 /* Note: requested_contract is also
672                                  * pointed by contracts parameter and
673                                  * will be freed by FORB. */
674                                 fc->requested_contract = NULL;
675                         }
676                 } else {
677                         /* Insert new contracts */
678                         fcb_contract_insert(fcb, fcb_contracts[i]);
679                         fc->user_contract = fres_contract_duplicate(fc->requested_contract);
680                         fc->requested_contract = NULL;
681                         /* See the note above. */
682                 }
683         }
684
685         ret = change_vreses(fcb, schedulable_contracts);
686         if (ret)
687                 goto err_cancel_reservation;
688         
689         forb_sequence_free(schedulable_contracts, fres_contract_ptr_destroy);
690         if (ret != 0) {
691                 ul_logerr("VRes was not created\n");
692                 goto err_cancel_contracts;
693         }
694
695
696         /* Return IDs and delete canceled contracts from out database */
697         if (!forb_sequence_alloc_buf(**ids_out, contracts->_length)) {
698                 ev->major = FORB_EX_NO_MEMORY;
699                 goto err_cancel_contracts;
700         }
701         (*ids_out)->_length = contracts->_length;
702         for (i=0; i<contracts->_length; i++) {
703                 struct fcb_contract *fc =  fcb_contracts[i];
704                 forb_sequence_elem(**ids_out, i) = fc->id;
705
706                 if (fc->requested_contract &&
707                     fres_contract_get_num_blocks(fc->requested_contract) == 0) {
708                         fcb_contract_delete(fcb, fc);
709                         /* Note: requested_contract is also pointed by
710                          * contracts parameter and will be freed by FORB. */
711                         fc->requested_contract = NULL;
712                         fcb_contract_destroy(fc);
713                 }
714         }
715         return 0;
716
717 err_cancel_contracts:
718         /* TODO */
719         goto err_free_fcb_contracts;
720 err_cancel_reservation:
721         fres_resource_manager_cancel_reservations(resource->mng, &commit_ids, ev);
722 err_free_fcb_contracts:
723         for (i=0; i<contracts->_length; i++) {
724                 fc = fcb_contracts[i];
725                 if (fc && !fc->user_contract) {
726                         fcb_contracts[i]->requested_contract = NULL; /* Destroyed by FORB */
727                         fcb_contract_destroy(fcb_contracts[i]);
728                 }
729         }
730         free(fcb_contracts);
731 err:
732         return ret;
733 }
734
735 void redistribute_spare_capacity(fres_contract_broker obj,
736                                  const frsh_resource_type_t restype,
737                                  const frsh_resource_id_t resid,
738                                  CORBA_Environment *ev)
739 {
740         struct fcb *fcb = o2fcb(obj);
741         struct res_key key = {restype, resid };
742         struct resource *resource;
743         struct reservation_list rl;
744         
745         resource = fcb_resource_find(fcb, &key);
746
747         prepare_reservation_list(resource, NULL, 0, &rl);
748
749 /*      forb_sequence_alloc(ids, rl.length); */
750 /*      if (!ids || !ids->_buffer) { */
751 /*              ev->major = FORB_EX_NO_MEMORY; */
752 /*              goto err_free_fcb_contracts; */
753 /*      } */
754 /*      CORBA_sequence_set_release(ids, CORBA_TRUE); */
755 /*      *ids_out = ids;         /\* ids is freed by FORB *\/ */
756         
757         
758         rebalance_spare_capacity_and_reserve(resource, &rl);
759         /* Commit */
760 }
761
762 CORBA_long register_resource(fres_contract_broker obj,
763                             const frsh_resource_type_t restype,
764                             const frsh_resource_id_t resid,
765                             const fres_resource_desc *desc,
766                             CORBA_Environment *ev)
767 {
768         struct fcb *fcb = o2fcb(obj);
769         struct resource *res, *res2;
770
771         res = malloc(sizeof(*res));
772         if (!res) goto err;
773         memset(res, 0, sizeof(*res));
774         res->key.type = restype;
775         res->key.id = resid;
776         res2 = fcb_resource_find(fcb, &res->key);
777         if (res2) {
778                 if (forb_object_is_stale(res2->mng)) {
779                         ul_logmsg("Removing stale manager for resource %d.%d\n",
780                                   restype, resid);
781                         forb_object_release(res2->mng);
782                         fcb_resource_delete(fcb, res2);
783                         /* TODO: Delete also all allocators associated
784                          * with this stale resource manager. */
785                         free(res);
786                         res = res2;
787                 } else {
788                         ul_logerr("Resource manager %d.%d already registered\n",
789                                   restype, resid);
790                         goto free_err;
791                 }
792         }
793         res->mng = forb_object_duplicate(desc->manager);
794         res->name = desc->name;
795
796         fcb_alloc_init_root_field(res);
797         sc_contracts_init_head(res);
798         ul_logmsg("Registering manager for resource \"%s\" (%d.%d)\n",
799                   res->name, restype, resid);
800         fcb_resource_insert(fcb, res);
801         return 0;
802 free_err:
803         free(res);
804 err:
805         return -1;
806 }
807
808
809 CORBA_long register_allocator(fres_contract_broker obj,
810                               const frsh_resource_type_t restype,
811                               const frsh_resource_id_t resid,
812                               const fres_resource_allocator ra_obj,
813                               CORBA_Environment *ev)
814 {
815         struct fcb *fcb = o2fcb(obj);
816         struct resource *res;
817         struct res_alloc *ra;
818         struct res_key resource;
819         forb_server_id server_id;
820         char server_id_str[40];
821
822         forb_get_server_id(ra_obj, &server_id);
823         forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
824         ul_logmsg("Registering allocator for resource %d.%d in app %s\n",
825                   restype, resid, server_id_str);
826         
827         resource.type = restype;
828         resource.id = resid;
829         res = fcb_resource_find(fcb, &resource);
830         if (!res) {
831                 ul_logerr("No manager found for %d.%d. Unable to register the allocator!\n",
832                           restype, resid);
833                 goto err;
834         }
835         ra = fcb_alloc_find(res, &server_id);
836         if (ra) {
837                 char *str = forb_object_to_string(ra_obj);
838                 ul_logerr("Allocator from already registered (%s)\n",
839                           str);
840                 forb_free(str);
841                 goto err;
842         }
843         ra = malloc(sizeof(*ra));
844         if (!ra) {
845                 goto err;
846         }
847         ra->app = server_id;
848         ra->ra = forb_object_duplicate(ra_obj);
849         fcb_alloc_insert(res, ra);
850         return 0;
851 err:
852         return FRSH_ERR_RESOURCE_ID_INVALID;
853 }
854
855 void get_resources(fres_contract_broker obj, fres_resource_seq** resources, CORBA_Environment *ev)
856 {
857         struct fcb *fcb = o2fcb(obj);
858         fres_resource_seq *seq;
859         struct resource *res;
860         int n;
861
862         seq = malloc(sizeof(*seq));
863         if (!seq) {
864                 ev->major = FORB_EX_NO_MEMORY;
865                 return;
866         }
867         memset(seq, 0, sizeof(*seq));
868
869         n=0;
870         gavl_cust_for_each(fcb_resource, fcb, res) {
871                 n++;
872         }
873
874         seq->_buffer = CORBA_sequence_fres_resource_allocbuf(n);
875         seq->_maximum = n;
876         seq->_length = n;
877
878         n = 0;
879         gavl_cust_for_each(fcb_resource, fcb, res) {
880                 seq->_buffer[n].restype = res->key.type;
881                 seq->_buffer[n].resid = res->key.id;
882                 seq->_buffer[n].desc.manager = res->mng;
883                 seq->_buffer[n].desc.name = res->name;
884                 n++;
885         }
886
887         *resources = seq;
888 }
889
#if CONFIG_FCB_INET && !CONFIG_FORB_PROTO_INET_DEFAULT
/**
 * Creates an INET port listening on INADDR_ANY and registers it with
 * the given ORB.
 *
 * @param orb  ORB the port is registered with.
 *
 * @return 0 on success, -1 when the port cannot be allocated, otherwise
 *         the non-zero value returned by forb_inet_port_init() or
 *         forb_register_port().
 */
static int register_inet_port(forb_orb orb)
{
        forb_port_t *port = forb_malloc(sizeof(*port));
        int ret;
        struct in_addr listen_on;

        if (!port)
                return -1;
        memset(port, 0, sizeof(*port));
        listen_on.s_addr = INADDR_ANY;
        ret = forb_inet_port_init(&port->desc, listen_on, 0);
        if (ret) {
                /* The port was never handed over to the ORB, so it is
                 * released here. errno is preserved because main()
                 * reports it after this function fails. */
                int saved_errno = errno;
                forb_free(port);
                errno = saved_errno;
                return ret;
        }
        /* NOTE(review): the port is not freed when forb_register_port()
         * fails -- it is unclear whether the ORB keeps a reference to
         * it in that case; confirm before adding cleanup. */
        ret = forb_register_port(orb, port);
        return ret;
}
#endif
908
909 struct forb_fres_contract_broker_impl impl = {
910         .negotiate_contracts = negotiate_contracts,
911         .register_resource = register_resource,
912         .register_allocator = register_allocator,
913         .redistribute_spare_capacity = redistribute_spare_capacity,
914         .get_resources = get_resources,
915 };
916
917 void peer_discovery_callback(const forb_orb peer_orb, const char *orb_id)
918 {
919         forb_server_id server_id;
920         char server_id_str[sizeof(forb_server_id)*2+1];
921
922         forb_get_server_id(peer_orb, &server_id);
923         forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
924 #if 0
925         ul_logmsg("peer discovered: %s (orb_id '%s')\n", server_id_str, orb_id);
926 #endif
927
928         if (orb_id == NULL)
929                 return;
930
931         if (strcmp(orb_id, "org.frescor.fcb") == 0) {
932                 fosa_abs_time_t now;
933                 fosa_rel_time_t delay;
934                 fosa_clock_get_time(CLOCK_REALTIME, &now);
935                 delay = fosa_abs_time_extract_interval(start_time, now);
936                 ul_logmsg("node joined: %s (time %ld ms)\n", server_id_str,
937                           fosa_rel_time_to_msec(delay));
938         }
939 }
940
941 void peer_dead_callback(const forb_orb peer_orb, const char *orb_id)
942 {
943 }
944
945 static struct option long_opts[] = {
946     { "loglevel", 1, 0, 'l' },
947     { 0, 0, 0, 0}
948 };
949
950 static void
951 usage(void)
952 {
953         printf("usage: fcb [ options ]\n");
954         printf("  -l, --loglevel <number>|<domain>=<number>,...\n");
955 }
956
957 int main(int argc, char *argv[])
958 {
959         forb_orb orb;
960         struct fcb fcb_data;
961         fres_contract_broker fcb;
962         forb_executor_t executor;
963         int ret;
964         forb_init_attr_t attr = {
965                 .orb_id = "org.frescor.fcb",
966                 .peer_discovery_callback = peer_discovery_callback,
967                 .peer_dead_callback = peer_dead_callback,
968                 .fixed_tcp_port = FCB_TCP_PORT,
969 #ifdef CONFIG_FORB_PROTO_INET_DEFAULT           
970                 .fixed_server_id = FCB_SERVER_ID,
971 #endif
972         };
973         int  opt;
974
975         ul_logreg_domain(&ulogd_fcb);
976
977         while ((opt = getopt_long(argc, argv, "l:", &long_opts[0], NULL)) != EOF) {
978                 switch (opt) {
979                         case 'l':
980                                 ul_log_domain_arg2levels(optarg);
981                                 break;
982                         case 'h':
983                         /*default:*/
984                                 usage();
985                                 exit(opt == 'h' ? 0 : 1);
986                 }
987         }
988
989         fosa_clock_get_time(CLOCK_REALTIME, &start_time);
990
991         orb = forb_init(&argc, &argv, &attr);
992         if (!orb) error(1, errno, "FORB initialization failed");
993
994 #if CONFIG_FCB_INET && !CONFIG_FORB_PROTO_INET_DEFAULT
995         ret = register_inet_port(orb);
996         if (ret) error(0, errno, "INET port registration failed");
997 #endif
998
999         fcb_resource_init_root_field(&fcb_data);
1000         fcb_contract_init_root_field(&fcb_data);
1001
1002         fcb = forb_fres_contract_broker_new(orb, &impl, &fcb_data);
1003         if (!fcb) error(1, errno, "forb_fres_contract_broker_new failed");
1004
1005         /* Prepare executor before we register the fcb reference,
1006          * so that no reuqests are lost */
1007         ret = forb_executor_init(&executor);
1008         if (ret) error(1, errno, "forb_executor_init failed");
1009         
1010         ret = forb_executor_register_object(&executor, fcb);
1011         if (ret) error(1, errno, "forb_executor_register_object failed");
1012
1013         ret = forb_register_reference(fcb, fres_contract_broker_reg_name);
1014         if (ret) error(1, errno, "forb_register_reference() failed");
1015
1016         ul_logmsg("Waiting for requests\n");
1017         ret = forb_executor_run(&executor);
1018         if (ret) error(1, errno, "forb_executor_run failed");
1019         
1020         return 0;
1021 }