]> rtime.felk.cvut.cz Git - frescor/frsh.git/blob - fres/cbroker/fcb.c
fcb: Refactor resource reserve/commit
[frescor/frsh.git] / fres / cbroker / fcb.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FRSH (FRescor ScHeduler)                         */
26 /*                                                                        */
27 /* FRSH is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FRSH is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FRSH; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FRSH header files in a file,         */
39 /* instantiating FRSH generics or templates, or linking other files       */
40 /* with FRSH objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 /**
48  * @file   fcb.c
49  * @author Michal Sojka <sojkam1@fel.cvut.cz>
50  * @date   Mon Oct 20 18:00:18 2008
51  * 
52  * @brief  FRESCOR Contract Broker
53  * 
54  * 
55  */
56 #include <semaphore.h>
57 #include <getopt.h>
58 #include <forb.h>
59 #include <forb/config.h>
60 #include <fcb.h>
61 #include <fcb_contact_info.h>
62 #include <error.h>
63 #include <errno.h>
64 #include <stdio.h>
65 #include <ul_gavlcust.h>
66 #include <string.h>
67 #include <ul_log.h>
68 #include <ul_logreg.h>
69 #include <forb/server_id.h>
70 #include <fres_contract.h>
71 #include "fcb_config.h"
72 #include <fosa_clocks_and_timers.h>
73 #include "contract_log.h"
74 #if CONFIG_FCB_INET && !CONFIG_FORB_PROTO_INET_DEFAULT
75 #include <forb/proto_inet.h>
76 #endif
77
78 UL_LOG_CUST(ulogd_fcb);
79 ul_log_domain_t ulogd_fcb = {UL_LOGL_MSG, "main"};
80 UL_LOGREG_SINGLE_DOMAIN_INIT_FUNCTION(init_ulogd_fcb, ulogd_fcb);
81         
82 bool opt_daemon = false;
83 char *opt_pidfile = NULL;
84 fosa_abs_time_t start_time;
85
86 /** List of contracts to be newly reserved or changed by resource manager */
87 struct reservation_list {
88         ul_list_head_t fcb_contracts;
89         unsigned length;
90 };
91
92 /**
93  * Resource identification 
94  */
95 struct res_key {
96         frsh_resource_type_t type;
97         frsh_resource_id_t id;
98 };
99
100 /**
101  * Registered resource
102  */
103 struct resource {
104         gavl_node_t node;
105         struct res_key key;
106         char *name;
107         fres_resource_manager mng; /**< Object reference of the resource manager */
108         gavl_cust_root_field_t allocators; /**< Registered allocators for this resource (from multiple applications/nodes) */
109         ul_list_head_t sc_contracts; /**< Negotiated contracts with spare capacity for this resource */
110         struct reservation_list rl; /**< Temporary list of contracts to be reserved on this resource */
111 };
112
113 /**
114  * Resource allocator registered in different nodes/applications 
115  */
116 struct res_alloc {
117         gavl_node_t node;
118         forb_server_id app;
119         fres_resource_allocator ra;
120 };
121
122 struct fcb_contract {
123         fres_contract_id_t id;
124         struct res_alloc *ra;   /**< Allocator for this contract TODO: Remove contract if allocator is destroyed */
125         gavl_node_t node_fcb;
126         ul_list_node_t node_sc;
127         ul_list_node_t node_reservation;
128         struct {
129                 int initial;
130                 int try;
131         } sc_variant;
132         fosa_abs_time_t end_of_stability_period;
133         struct fres_contract *user_contract; /* The contract sent by user. Set after sucessfull neotiation. */
134         struct fres_contract *requested_contract; /* User request for the contract. Set by prepare_reservation_requests() in the beginning of negotiation. */
135         struct fres_contract *to_be_reserved_contract;
136         struct fres_contract *schedulable_contract;
137 };
138
139 /**
140  * Contract broker data
141  */
142 struct fcb {
143         gavl_cust_root_field_t resources; /**< Registered resources */
144         gavl_cust_root_field_t contracts; /**< Contracts negotiated by this FCB */
145 };
146
147 struct fcb_contract *fcb_contract_new(fres_contract_id_t *id)
148 {
149         struct fcb_contract *fcb_contract;
150         
151         fcb_contract = malloc(sizeof(*fcb_contract));
152         if (!fcb_contract) {
153                 return NULL;
154         }
155         memset(fcb_contract, 0, sizeof(*fcb_contract));
156         fcb_contract->id = *id;
157
158         return fcb_contract;
159 }
160
161 void fcb_contract_destroy(struct fcb_contract *fcb_contract)
162 {
163         if (fcb_contract->user_contract) {
164                 fres_contract_destroy(fcb_contract->user_contract);
165         }
166         if (fcb_contract->to_be_reserved_contract) {
167                 fres_contract_destroy(fcb_contract->to_be_reserved_contract);
168         }
169         if (fcb_contract->requested_contract) {
170                 fres_contract_destroy(fcb_contract->requested_contract);
171         }
172         if (fcb_contract->schedulable_contract) {
173                 fres_contract_destroy(fcb_contract->requested_contract);
174         }
175         free(fcb_contract);
176 }
177
178 static inline int res_key_cmp(const struct res_key *a,
179                               const struct res_key *b)
180 {
181         if (a->type < b->type) {
182                 return -1;
183         } else if (a->type > b->type) {
184                 return +1;
185         } else if (a->id < b->id) {
186                 return -1;
187         } else if (a->id > b->id) {
188                 return +1;
189         } else {
190                 return 0;
191         }
192 }
193
194 /* Container for registered resources */
195 GAVL_CUST_NODE_INT_DEC(fcb_resource   /* cust_prefix */,                \
196                        struct fcb     /* cust_root_t */,                \
197                        struct resource/* cust_item_t */,                \
198                        struct res_key /* cust_key_t */,                 \
199                        resources      /* cust_root_node */,             \
200                        node           /* cust_item_node */,             \
201                        key            /* cust_item_key */,              \
202                        res_key_cmp    /* cust_cmp_fnc */);
203
204 GAVL_CUST_NODE_INT_IMP(fcb_resource   /* cust_prefix */,                \
205                        struct fcb     /* cust_root_t */,                \
206                        struct resource/* cust_item_t */,                \
207                        struct res_key /* cust_key_t */,                 \
208                        resources      /* cust_root_node */,             \
209                        node           /* cust_item_node */,             \
210                        key            /* cust_item_key */,              \
211                        res_key_cmp    /* cust_cmp_fnc */);
212
213 /* Container for allocators registered for a given resource */
214 GAVL_CUST_NODE_INT_DEC(fcb_alloc        /* cust_prefix */,              \
215                        struct resource  /* cust_root_t */,              \
216                        struct res_alloc /* cust_item_t */,              \
217                        forb_server_id   /* cust_key_t */,               \
218                        allocators       /* cust_root_node */,           \
219                        node             /* cust_item_node */,           \
220                        app              /* cust_item_key */,            \
221                        fres_contract_id_cmp /* cust_cmp_fnc */);
222
223 GAVL_CUST_NODE_INT_IMP(fcb_alloc        /* cust_prefix */,              \
224                        struct resource   /* cust_root_t */,             \
225                        struct res_alloc /* cust_item_t */,              \
226                        forb_server_id   /* cust_key_t */,               \
227                        allocators       /* cust_root_node */,           \
228                        node             /* cust_item_node */,           \
229                        app              /* cust_item_key */,            \
230                        forb_server_id_cmp /* cust_cmp_fnc */);
231
232 /* Container for contracts with spare capacity negotiated for a given resource */
233 UL_LIST_CUST_DEC(sc_contracts        /* cust_prefix */,                 \
234                  struct resource  /* cust_head_t */,                    \
235                  struct fcb_contract /* cust_item_t */,                 \
236                  sc_contracts   /* cust_head_field */,                  \
237                  node_sc        /* cust_node_field */);
238
239 UL_LIST_CUST_DEC(reservation_list        /* cust_prefix */,             \
240                  struct reservation_list  /* cust_head_t */,            \
241                  struct fcb_contract /* cust_item_t */,                 \
242                  fcb_contracts  /* cust_head_field */,                  \
243                  node_reservation /* cust_node_field */);
244
245 /* Container for negotiated contracts */
246 GAVL_CUST_NODE_INT_DEC(fcb_contract         /* cust_prefix */,          \
247                        struct fcb           /* cust_root_t */,          \
248                        struct fcb_contract  /* cust_item_t */,          \
249                        fres_contract_id_t   /* cust_key_t */,           \
250                        contracts            /* cust_root_node */,       \
251                        node_fcb             /* cust_item_node */,       \
252                        id                   /* cust_item_key */,        \
253                        fres_contract_id_cmp /* cust_cmp_fnc */);
254
255 #if 1
256 GAVL_CUST_NODE_INT_IMP(fcb_contract         /* cust_prefix */,          \
257                        struct fcb           /* cust_root_t */,          \
258                        struct fcb_contract  /* cust_item_t */,          \
259                        fres_contract_id_t   /* cust_key_t */,           \
260                        contracts            /* cust_root_node */,       \
261                        node_fcb             /* cust_item_node */,       \
262                        id                   /* cust_item_key */,        \
263                        fres_contract_id_cmp /* cust_cmp_fnc */);
264 #else
265 #include "fcb_contract_gavl.inc"
266 #endif
267
268
/* Casts the instance data of FORB object @a o to struct fcb*. The
 * whole expansion is parenthesized so that the macro can be used
 * safely in larger expressions such as o2fcb(obj)->resources. */
#define o2fcb(o) ((struct fcb*)forb_instance_data(o))
270
271 static struct res_key*
272 get_res_key(const struct fres_contract *contract, struct res_key *key)
273 {
274         fres_block_resource *block_res;
275         
276         block_res = fres_contract_get_resource(contract);
277         if (!block_res) {
278                 ul_logerr("No resource specified\n");
279                 return NULL;
280         } else {
281                 key->type = block_res->resource_type;
282                 key->id = block_res->resource_id;
283         }
284         return key;
285 }
286
287 static struct res_key*
288 get_fc_res_key(const struct fcb_contract *fc, struct res_key *key)
289 {
290         if (fc->user_contract)
291                 get_res_key(fc->user_contract, key);
292         else
293                 get_res_key(fc->requested_contract, key);
294         return key;
295 }
296
297 /** 
298  * Fills in an array of fcb_contracts according to requests submited
299  * by an application through negotiate_contracts() or
300  * negotiate_transaction(). For every contract in @a contracts, there
301  * is allocated one fcb_contract in @a fcb_contracts array.
302  * 
303  * @param fcb
304  * @param fcb_contracts Where to store pointers to fcb_contracts
305  * @param contracts Contracts submited for negotiation 
306  * @param num Number of contracts in contracts (which is the same as the number in fcb_contracts).
307  * 
308  * @return Zero on success, non-zero error code on error.
309  */
310 static int
311 prepare_fcb_contracts(struct fcb *fcb, struct fcb_contract *fcb_contracts[],
312                       struct fres_contract *contracts[], int num)
313 {
314         unsigned i;
315         struct fcb_contract *fc;
316
317         for (i=0; i<num; i++) {
318                 struct fres_contract *c = contracts[i];
319
320                 if (fres_contract_id_is_empty(&c->id)) {
321                         /* Normal negotiation request */
322                         forb_uuid_generate((forb_uuid_t *)&c->id);
323                         fc = fcb_contract_new(&c->id);
324                         if (!fc)
325                                 return errno;
326                         fcb_contracts[i] = fc;
327                         log_contract("Negotiation request", i, c);
328                 } else {
329                         fc = fcb_contract_find(fcb, &c->id);
330                         if (!fc) {
331                                 char str[60];
332                                 fres_contract_id_to_string(str, &c->id, sizeof(str));
333                                 ul_logerr("Attempt to change unknown contract %s\n", str);
334                                 return FRES_ERR_NOTHING_TO_RENEGOTIATE;
335                         } else {
336                                 fcb_contracts[i] = fc;
337                                 if (fres_contract_get_num_blocks(c) == 0) {
338                                         /* Cancelation */
339                                         log_contract("Cancelation request", i, fc->user_contract);
340                                 } else {
341                                         /* Renegotiation */
342                                         log_contract("Renegotiation request", i, fc->user_contract);
343                                 }
344                         }
345                 }
346                 fc->requested_contract = c;
347         }
348         return 0;
349 }
350
351 /**
352  * Ensure that all contracts belong to the same resource and find
353  * resource allocators for the contracts.
354  *
355  * @param fcb 
356  * @param fcb_contracts Array of fcb_contracts prepared by prepare_fcb_contracts()
357  * @param num Number of contracts in @a fcb_contracts
358  * @param resource Resource for which negotiation is being done.
359  * @param app Application which requests negotiation
360  */
361 static int
362 check_and_setup_resource(struct fcb *fcb, struct fcb_contract *fcb_contracts[],
363                          struct resource **resource,
364                          forb_server_id *app,
365                          int num)
366 {
367         unsigned i;
368         struct res_alloc *ra;
369         struct res_key key, key2 = {-1,-1};
370
371         for (i=0; i<num; i++) {
372                 struct fcb_contract *fc = fcb_contracts[i];
373
374                 if (!get_fc_res_key(fc, &key))
375                         return FRSH_ERR_RESOURCE_ID_INVALID;
376
377                 /* Check that all contracts are for the same resource */
378                 if (i==0) {
379                         key2 = key;
380                         *resource = fcb_resource_find(fcb, &key);
381                         if (!*resource) {
382                                 ul_logerr("No resource manager for %d.%d registered\n",
383                                           key.type, key.id);
384                                 return FRES_ERR_NO_RESOURCE_MANAGER;
385                         }
386                         /* Find allocator for this request */
387                         ra = fcb_alloc_find(*resource, app);
388                         if (!ra) {
389                                 char str[60];
390                                 forb_server_id_to_string(str, app, sizeof(str));
391                                 ul_logerr("No resource allocator found for %d.%d and %s\n",
392                                           (*resource)->key.type, (*resource)->key.id, str);
393                                 return FRES_ERR_NO_RESOURCE_ALLOCATOR;
394                         }
395                 }
396                 else if (key.type != key2.type ||
397                          key.id   != key2.id) {
398                         ul_logerr("Contract #%d differs in resource!\n", i);
399                         return FRSH_ERR_RESOURCE_ID_INVALID;
400                 }
401                 fc->ra = ra;
402         }       
403         return 0;
404 }
405
406 /**
407  * Prepares a list of contracts to pass to resource manager for (re)reserving.
408  *
409  * @todo Needs to be changed for compatibility with transactions.
410  * 
411  * @param resource Resource for which to rebalance capacity and negotiate new contracts
412  * @param fcb_contract New requests to negotiate
413  * @param num The number of elements in @a fcb_contract
414  * @param[out] rl List with changed contracts to be commited
415  */
416 static void
417 prepare_reservation_list(struct resource *resource,
418                          struct fcb_contract *fcb_contract[], int num)
419 {
420         int i;
421         fosa_abs_time_t now;
422         struct fcb_contract *fc;
423
424         reservation_list_init_head(&resource->rl);
425         resource->rl.length = 0;
426         for (i=0; i<num; i++) {
427                 assert(fcb_contract[i]->requested_contract != NULL);
428                 reservation_list_insert(&resource->rl, fcb_contract[i]);
429                 resource->rl.length++;
430         }
431         fosa_clock_get_time(CLOCK_REALTIME, &now);
432         ul_list_for_each(sc_contracts, resource, fc) {
433                 if (fosa_abs_time_smaller(fc->end_of_stability_period, now) &&
434                     fc->requested_contract == NULL) /* Do not insert contract inserted above */
435                 {
436                         reservation_list_insert(&resource->rl, fc);
437                         resource->rl.length++;
438                 }
439         }
440 }
441
442 /** 
443  * 
444  * 
445  * @param resource Resource for which to rebalance capacity and negotiate new contracts
446  * @param rl List with changed contracts to be commited
447  * 
448  * @return Zero on success, non-zero error code on error
449  */
450 static int
451 rebalance_spare_capacity_and_reserve(struct resource *resource)
452 {
453         int ret;
454         unsigned i;
455         struct reservation_list *rl = &resource->rl;
456         struct fcb_contract *fc;
457         fres_block_spare_capacity *s;
458
459         fres_contract_ptr_seq contracts;
460         if (!forb_sequence_alloc_buf(&contracts, rl->length)) {
461                 return errno;
462         }
463         contracts._length = rl->length;
464         i=0;
465         /* Initialize optimization */
466         ul_list_for_each(reservation_list, rl, fc) {
467                 fc->to_be_reserved_contract =
468                         fc->requested_contract ? fres_contract_duplicate(fc->requested_contract) :
469                         fc->user_contract ? fres_contract_duplicate(fc->user_contract) :
470                         NULL;
471                 assert(fc->to_be_reserved_contract != NULL);
472
473                 forb_sequence_elem(&contracts, i) = fc->to_be_reserved_contract;
474                 i++;
475                 
476                 s = fres_contract_get_spare_capacity(fc->to_be_reserved_contract);
477                 if (s && s->granularity == FRSH_GR_DISCRETE) {
478                         fc->sc_variant.initial = s->variants._length - 1;
479                         fc->sc_variant.try = fc->sc_variant.initial;
480                 }
481         }
482
483         bool all_combinations_tried;
484         int criterion, best_criterion = -1;
485         struct fcb_contract *fcb_changed;
486         /* Exhaustive search. Try all combinations of spare capacity
487          * variants and find the one with best creterion. */
488         do {
489                 all_combinations_tried = true;
490                 fcb_changed = NULL;
491                 criterion = 0;
492                 /* Prepare spare capacity variant */
493                 ul_list_for_each(reservation_list, rl, fc) {
494                         s = fres_contract_get_spare_capacity(fc->to_be_reserved_contract);
495                         /* TODO: Simulate continuous granularity by discretization */
496                         if (s && s->granularity == FRSH_GR_DISCRETE) {
497                                 fres_container_copy(fc->to_be_reserved_contract->container,
498                                                     forb_sequence_elem(&s->variants, fc->sc_variant.try));
499                                 criterion += fc->sc_variant.try;
500
501                                 if (fcb_changed == NULL) {
502                                         /* Chnage tried variant for the next round */
503                                         fc->sc_variant.try = fc->sc_variant.try > 0 ?
504                                                 fc->sc_variant.try - 1 :
505                                                 s->variants._length - 1;
506                                         /* Do not change other
507                                          * contracts unless we have
508                                          * tried all variants */
509                                         if (fc->sc_variant.try != fc->sc_variant.initial) {
510                                                 fcb_changed = fc;
511                                         }
512                                 }
513                                 if (fc->sc_variant.try != fc->sc_variant.initial)
514                                         all_combinations_tried = false;
515                         }
516                 }
517
518                 if (criterion > best_criterion) {
519                         CORBA_Environment ev;
520                         /* Reserve contract */
521                         ret = fres_resource_manager_reserve_contracts(resource->mng, &contracts, &ev);
522                         if (forb_exception_occurred(&ev)) {
523                                 ret = fres_forbex2err(&ev);
524                                 ul_logerr("FORB exception when reserving contracts\n");
525                                 goto err;
526                         }
527                         if (ret < 0) {
528                                 ul_logerr("Contract reservation error %d\n", ret);
529                                 ret = FRES_ERR_ADMISSION_TEST;
530                                 goto err;
531                         }
532                         if (ret == 0) { /* negotiation succeeded */
533                                 best_criterion = criterion;
534                         }
535                 }
536         } while (!all_combinations_tried);
537
538         if (best_criterion == -1) {
539                 ret = FRSH_ERR_CONTRACT_REJECTED;
540         } else {
541                 /* At least some variant succeeded */
542                 ret = 0;
543                 sc_contracts_init_head(resource);
544                 ul_list_for_each(reservation_list, rl, fc) {
545                         s = fres_contract_get_spare_capacity(fc->to_be_reserved_contract);
546                         if (s && s->granularity == FRSH_GR_DISCRETE) {
547                                 sc_contracts_insert(resource, fc);
548                         }
549                 }
550         }
551 err:
552         ul_list_for_each(reservation_list, rl, fc) {
553                 fres_contract_destroy(fc->to_be_reserved_contract);
554                 fc->to_be_reserved_contract = NULL;
555         }
556         forb_sequence_free_buf(&contracts, forb_no_destructor);
557         return ret;
558 }
559
560 /**
561  * Create/change VReses according to @a schedulable_contracts.
562  *
563  * There might be more allocators for schedulable contracts, so merge
564  * adjacent vreses with the same allocator together and create/change
565  * them in one step. Also the order of schedulable contracts might be
566  * different from the initial order od ids in commit_contracts()
567  * therefore we have to search for every contract.
568  */
569 static int
570 change_vreses(struct fcb *fcb, fres_contract_ptr_seq *schedulable_contracts)
571 {
572         struct res_alloc *last_ra = NULL;
573         fres_contract_ptr_seq vreses;
574         int i, ret = 0;
575         CORBA_Environment ev;
576         
577         if (!forb_sequence_alloc_buf(&vreses, schedulable_contracts->_length)) {
578                 return errno;
579         }
580         vreses._length = 0;
581         
582         for (i=0; i<schedulable_contracts->_length; i++) {
583                 struct fcb_contract *fc;
584                 struct fres_contract *sc = schedulable_contracts->_buffer[i];
585
586                 log_contract("Changing VRES", i, sc);
587
588                 fc = fcb_contract_find(fcb, &sc->id);
589                 assert(fc != NULL);
590
591                 if (true /* TODO: if the schedulable contract is changed */) {
592                         if (last_ra != fc->ra) {
593                                 if (vreses._length) {
594                                         ret = fres_resource_allocator_change_vreses(last_ra->ra,
595                                                                                     &vreses, &ev);
596                                         if (forb_exception_occurred(&ev)) {
597                                                 ret = fres_forbex2err(&ev);
598                                                 goto err;
599                                         }
600                                         if (ret) goto err;
601                                 }
602                                 vreses._length = 0;
603                         }
604                         vreses._buffer[vreses._length] = sc;
605                         vreses._length++;
606                         fres_contract_destroy(fc->schedulable_contract);
607                         fc->schedulable_contract = fres_contract_duplicate(sc);
608                         last_ra = fc->ra;
609                 
610                 }
611         }
612         if (last_ra) {
613                 ret = fres_resource_allocator_change_vreses(last_ra->ra,
614                                                             &vreses, &ev);
615                 if (forb_exception_occurred(&ev))
616                         ret = fres_forbex2err(&ev);
617         }
618 err:
619         forb_sequence_free_buf(&vreses, forb_no_destructor);
620         return ret;
621 }
622
623 void
624 free_fcb_contracts(struct fcb_contract *fcb_contracts[], int num)
625 {
626         int i;
627         struct fcb_contract *fc;
628         for (i=0; i<num; i++) {
629                 fc = fcb_contracts[i];
630                 if (fc && !fc->user_contract) {
631                         fc->requested_contract = NULL; /* Destroyed by FORB */
632                         fcb_contract_destroy(fc);
633                 }
634         }
635         free(fcb_contracts);
636 }
637
638 int
639 commit_resource(struct resource *resource,
640                 fres_contract_ptr_seq **schedulable_contracts)
641 {
642         int ret;
643         fres_contract_id_seq commit_ids;
644         int i;
645         struct fcb_contract *fc;
646         CORBA_Environment ev;
647         
648         if (!forb_sequence_alloc_buf(&commit_ids, resource->rl.length)) {
649                 ret = errno;
650                 goto err;
651         }
652
653         commit_ids._length = resource->rl.length;
654         i=0;
655         ul_list_for_each(reservation_list, &resource->rl, fc) {
656                 forb_sequence_elem(&commit_ids, i) = fc->id;
657                 i++;
658         }
659         
660         fres_resource_manager_commit_contracts(resource->mng, &commit_ids,
661                                                schedulable_contracts, &ev);
662         if (forb_exception_occurred(&ev)) {
663                 ret = fres_forbex2err(&ev);
664                 goto err_free;
665         }
666         return 0;
667 err_free:
668         forb_sequence_free_buf(&commit_ids, forb_no_destructor);
669 err:
670         return ret;
671 }
672
673 int cancel_reservations(struct resource *resource)
674 {
675         int ret;
676         fres_contract_id_seq commit_ids;
677         int i;
678         struct fcb_contract *fc;
679         CORBA_Environment ev;
680         
681         if (!forb_sequence_alloc_buf(&commit_ids, resource->rl.length)) {
682                 ret = errno;
683                 goto err;
684         }
685
686         commit_ids._length = resource->rl.length;
687         i=0;
688         ul_list_for_each(reservation_list, &resource->rl, fc) {
689                 forb_sequence_elem(&commit_ids, i) = fc->id;
690                 i++;
691         }
692         
693         fres_resource_manager_cancel_reservations(resource->mng, &commit_ids, &ev);
694         if (forb_exception_occurred(&ev)) {
695                 ret = fres_forbex2err(&ev);
696                 goto err_free;
697         }
698         return 0;
699 err_free:
700         forb_sequence_free_buf(&commit_ids, forb_no_destructor);
701 err:
702         return ret;
703 }
704
705
706 CORBA_long
707 negotiate_contracts(fres_contract_broker obj,
708                     const fres_contract_ptr_seq* contracts,
709                     fres_contract_id_seq** ids_out,
710                     CORBA_Environment *ev)
711 {
712         struct fcb *fcb = o2fcb(obj);
713         struct resource *resource = NULL;
714         int ret = 0;
715         forb_server_id app;
716         fres_contract_ptr_seq *schedulable_contracts;
717         struct fcb_contract **fcb_contracts, *fc;
718         unsigned i;
719         int num = contracts->_length;
720
721         /* Prepare output sequence for the case we return eariler with
722          * an error */
723         forb_sequence_alloc(*ids_out, 0);
724         if (!*ids_out) {
725                 ev->major = FORB_EX_NO_MEMORY;
726                 goto err;
727         }
728         CORBA_sequence_set_release(*ids_out, CORBA_TRUE);
729         
730         forb_get_req_source(obj, &app);
731         
732         fcb_contracts = malloc(sizeof(fcb_contracts[0])*num);
733         if (!fcb_contracts) {
734                 ret = errno;
735                 goto err;
736         }
737         memset(fcb_contracts, 0, sizeof(fcb_contracts[0])*num);
738
739         ret = prepare_fcb_contracts(fcb, fcb_contracts,
740                                     contracts->_buffer, num);
741         if (ret)
742                 goto err_free_fcb_contracts;
743         ret = check_and_setup_resource(fcb, fcb_contracts, &resource,
744                                        &app, num);
745         if (ret)
746                 goto err_free_fcb_contracts;
747
748         prepare_reservation_list(resource, fcb_contracts, num);
749
750         /* Reserve contracts */
751         ret = rebalance_spare_capacity_and_reserve(resource);
752         if (ret) {
753                 if (ret == FRSH_ERR_CONTRACT_REJECTED) {
754                         ul_logmsg("Contract(s) was/were rejected\n");
755                 } else {
756                         char msg[100];
757                         fres_strerror(ret, msg, sizeof(msg));
758                         ul_logerr("Reservation error: %s\n", msg);
759                 }
760                 goto err_free_fcb_contracts;
761         }
762
763         /* Commit contracts */
764         ret = commit_resource(resource, &schedulable_contracts);
765         if (ret)
766                 goto err_cancel_reservation;
767
768         /* Add new contracts to our fcb database for later
769          * reference. Canceled contracts are removed below. */
770         for (i=0; i<num; i++) {
771                 fc =  fcb_contracts[i];
772
773                 if (fc->user_contract) {
774                         if (fres_contract_get_num_blocks(fc->requested_contract) > 0) {
775                                 /* Renegotiation */
776                                 fres_contract_destroy(fc->user_contract);
777                                 fc->user_contract = fres_contract_duplicate(fc->requested_contract);
778                                 /* Note: requested_contract is also
779                                  * pointed by contracts parameter and
780                                  * will be freed by FORB. */
781                                 fc->requested_contract = NULL;
782                         }
783                 } else {
784                         /* Insert new contracts */
785                         fcb_contract_insert(fcb, fcb_contracts[i]);
786                         fc->user_contract = fres_contract_duplicate(fc->requested_contract);
787                         fc->requested_contract = NULL;
788                         /* See the note above. */
789                 }
790         }
791
792         ret = change_vreses(fcb, schedulable_contracts);
793         if (ret)
794                 goto err_cancel_reservation;
795         
796         forb_sequence_free(schedulable_contracts, fres_contract_ptr_destroy);
797         if (ret != 0) {
798                 ul_logerr("VRes was not created\n");
799                 goto err_cancel_contracts;
800         }
801
802
803         /* Return IDs and delete canceled contracts from out database */
804         if (!forb_sequence_alloc_buf(*ids_out, num)) {
805                 ev->major = FORB_EX_NO_MEMORY;
806                 goto err_cancel_contracts;
807         }
808         (*ids_out)->_length = num;
809         for (i=0; i<num; i++) {
810                 struct fcb_contract *fc =  fcb_contracts[i];
811                 forb_sequence_elem(*ids_out, i) = fc->id;
812
813                 if (fc->requested_contract &&
814                     fres_contract_get_num_blocks(fc->requested_contract) == 0) {
815                         fcb_contract_delete(fcb, fc);
816                         /* Note: requested_contract is also pointed by
817                          * contracts parameter and will be freed by FORB. */
818                         fc->requested_contract = NULL;
819                         fcb_contract_destroy(fc);
820                 }
821         }
822         return 0;
823
824 err_cancel_contracts:
825         /* TODO */
826         goto err_free_fcb_contracts;
827 err_cancel_reservation:
828         cancel_reservations(resource);
829 err_free_fcb_contracts:
830         free_fcb_contracts(fcb_contracts, num);
831 err:
832         return ret;
833 }
834
835 void redistribute_spare_capacity(fres_contract_broker obj,
836                                  const frsh_resource_type_t restype,
837                                  const frsh_resource_id_t resid,
838                                  CORBA_Environment *ev)
839 {
840         struct fcb *fcb = o2fcb(obj);
841         struct res_key key = {restype, resid };
842         struct resource *resource;
843         
844         resource = fcb_resource_find(fcb, &key);
845
846         prepare_reservation_list(resource, NULL, 0);
847
848 /*      forb_sequence_alloc(ids, rl.length); */
849 /*      if (!ids || !ids->_buffer) { */
850 /*              ev->major = FORB_EX_NO_MEMORY; */
851 /*              goto err_free_fcb_contracts; */
852 /*      } */
853 /*      CORBA_sequence_set_release(ids, CORBA_TRUE); */
854 /*      *ids_out = ids;         /\* ids is freed by FORB *\/ */
855         
856         
857         rebalance_spare_capacity_and_reserve(resource);
858         /* Commit */
859 }
860
861 CORBA_long register_resource(fres_contract_broker obj,
862                             const frsh_resource_type_t restype,
863                             const frsh_resource_id_t resid,
864                             const fres_resource_desc *desc,
865                             CORBA_Environment *ev)
866 {
867         struct fcb *fcb = o2fcb(obj);
868         struct resource *res, *res2;
869
870         res = malloc(sizeof(*res));
871         if (!res) goto err;
872         memset(res, 0, sizeof(*res));
873         res->key.type = restype;
874         res->key.id = resid;
875         res2 = fcb_resource_find(fcb, &res->key);
876         if (res2) {
877                 if (forb_object_is_stale(res2->mng)) {
878                         ul_logmsg("Removing stale manager for resource %d.%d\n",
879                                   restype, resid);
880                         forb_object_release(res2->mng);
881                         fcb_resource_delete(fcb, res2);
882                         /* TODO: Delete also all allocators associated
883                          * with this stale resource manager. */
884                         free(res);
885                         res = res2;
886                 } else {
887                         ul_logerr("Resource manager %d.%d already registered\n",
888                                   restype, resid);
889                         goto free_err;
890                 }
891         }
892         res->mng = forb_object_duplicate(desc->manager);
893         res->name = desc->name;
894
895         fcb_alloc_init_root_field(res);
896         sc_contracts_init_head(res);
897         ul_logmsg("Registering manager for resource \"%s\" (%d.%d)\n",
898                   res->name, restype, resid);
899         fcb_resource_insert(fcb, res);
900         return 0;
901 free_err:
902         free(res);
903 err:
904         return -1;
905 }
906
907
908 CORBA_long register_allocator(fres_contract_broker obj,
909                               const frsh_resource_type_t restype,
910                               const frsh_resource_id_t resid,
911                               const fres_resource_allocator ra_obj,
912                               CORBA_Environment *ev)
913 {
914         struct fcb *fcb = o2fcb(obj);
915         struct resource *res;
916         struct res_alloc *ra;
917         struct res_key resource;
918         forb_server_id server_id;
919         char server_id_str[40];
920         int ret;
921
922         forb_get_server_id(ra_obj, &server_id);
923         forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
924         ul_logmsg("Registering allocator for resource %d.%d in app %s\n",
925                   restype, resid, server_id_str);
926         
927         resource.type = restype;
928         resource.id = resid;
929         res = fcb_resource_find(fcb, &resource);
930         if (!res) {
931                 ul_logerr("No manager found for %d.%d. Unable to register the allocator!\n",
932                           restype, resid);
933                 ret = FRES_ERR_NO_RESOURCE_MANAGER;
934                 goto err;
935         }
936         ra = fcb_alloc_find(res, &server_id);
937         if (ra) {
938                 char *str = forb_object_to_string(ra_obj);
939                 ul_logerr("Allocator from already registered (%s)\n",
940                           str);
941                 forb_free(str);
942                 ret = FRES_ERR_ALLOCATOR_ALREADY_REGISTERED;
943                 goto err;
944         }
945         ra = malloc(sizeof(*ra));
946         if (!ra) {
947                 ret = ENOMEM;
948                 goto err;
949         }
950         ra->app = server_id;
951         ra->ra = forb_object_duplicate(ra_obj);
952         fcb_alloc_insert(res, ra);
953         return 0;
954 err:
955         return ret;
956 }
957
958 void get_resources(fres_contract_broker obj, fres_resource_seq** resources, CORBA_Environment *ev)
959 {
960         struct fcb *fcb = o2fcb(obj);
961         fres_resource_seq *seq;
962         struct resource *res;
963         int n;
964
965         seq = malloc(sizeof(*seq));
966         if (!seq) {
967                 ev->major = FORB_EX_NO_MEMORY;
968                 return;
969         }
970         memset(seq, 0, sizeof(*seq));
971
972         n=0;
973         gavl_cust_for_each(fcb_resource, fcb, res) {
974                 n++;
975         }
976
977         seq->_buffer = CORBA_sequence_fres_resource_allocbuf(n);
978         seq->_maximum = n;
979         seq->_length = n;
980
981         n = 0;
982         gavl_cust_for_each(fcb_resource, fcb, res) {
983                 seq->_buffer[n].restype = res->key.type;
984                 seq->_buffer[n].resid = res->key.id;
985                 seq->_buffer[n].desc.manager = res->mng;
986                 seq->_buffer[n].desc.name = res->name;
987                 n++;
988         }
989
990         *resources = seq;
991 }
992
993 static bool
994 transaction_has_spare_capacity(const fres_transaction_t* transaction)
995 {
996         struct fres_contract **c;
997         forb_sequence_foreach(&transaction->contracts, c)
998                 if (fres_contract_get_spare_capacity(*c))
999                         return true;
1000         return false;
1001 }
1002
1003 static int
1004 transaction_get_resources(struct fcb *fcb, struct fcb_contract *fc[],
1005                           int num, struct resource **resources[])
1006 {
1007         int i, j, r, alloc;
1008         struct resource **res;
1009         struct resource *resource;
1010         struct res_key key;
1011
1012         alloc = 10;
1013         res = malloc(alloc*sizeof(*res));
1014         if (!res) 
1015                 return errno;
1016         res[0] = NULL;
1017         for (r = 0, i = 0; i < num; i++) {
1018                 get_fc_res_key(fc[i], &key);
1019                 resource = fcb_resource_find(fcb, &key);
1020                 /* TODO: Check whether to use GSA for resources. */
1021         }
1022         res[r] = NULL;
1023         return 0;
1024 }
1025                                  
1026
1027
1028 CORBA_long
1029 negotiate_transaction(fres_contract_broker _obj,
1030                       const fres_transaction_t* transaction,
1031                       CORBA_Environment *ev)
1032 {
1033         struct fcb *fcb = o2fcb(_obj);
1034         struct fcb_contract **fcb_contracts;
1035         const fres_contract_ptr_seq* contracts = &transaction->contracts;
1036         int num = contracts->_length;
1037         int ret;
1038
1039         if (transaction_has_spare_capacity(transaction)) {
1040                 ret =  FRES_ERR_SPARE_CAPACITY_NOT_SUPPORTED;
1041                 goto err;
1042         }
1043
1044         fcb_contracts = malloc(sizeof(*fcb_contracts)*num);
1045         if (!fcb_contracts) {
1046                 ret = errno;
1047                 goto err;
1048         }
1049         memset(fcb_contracts, 0, sizeof(*fcb_contracts)*num);
1050
1051         ret = prepare_fcb_contracts(fcb, fcb_contracts,
1052                                     contracts->_buffer, num);
1053         if (ret)
1054                 goto err_free_fcb_contracts;
1055
1056         //ret = transaction_get_resources(fcb_contracts, num, &resources);
1057         
1058         return FRSH_ERR_NOT_IMPLEMENTED;
1059 err_free_fcb_contracts:
1060         free_fcb_contracts(fcb_contracts, num);
1061 err:
1062         return ret;
1063 }
1064
1065 CORBA_long
1066 wait_transaction(fres_contract_broker _obj,
1067                  const CORBA_char * name,
1068                  fres_transaction_t** transaction,
1069                  CORBA_Environment *ev)
1070 {
1071         return FRSH_ERR_NOT_IMPLEMENTED;
1072 }
1073
1074 CORBA_long
1075 allocate_transaction_vres(fres_contract_broker _obj,
1076                           const CORBA_long id,
1077                           CORBA_Environment *ev)
1078 {
1079         return FRSH_ERR_NOT_IMPLEMENTED;
1080 }
1081
1082
1083 #if CONFIG_FCB_INET && !CONFIG_FORB_PROTO_INET_DEFAULT
1084 static int register_inet_port(forb_orb orb)
1085 {
1086         forb_port_t *port = forb_malloc(sizeof(*port));
1087         int ret;
1088         struct in_addr listen_on;
1089         
1090         if (!port)
1091                 return -1;
1092         memset(port, 0, sizeof(*port));
1093         listen_on.s_addr = INADDR_ANY;
1094         ret = forb_inet_port_init(&port->desc, listen_on, 0);
1095         if (ret)
1096                 return ret;
1097         ret = forb_register_port(orb, port);
1098         return ret;
1099 }
1100 #endif
1101
1102 struct forb_fres_contract_broker_impl impl = {
1103         .negotiate_contracts = negotiate_contracts,
1104         .register_resource = register_resource,
1105         .register_allocator = register_allocator,
1106         .redistribute_spare_capacity = redistribute_spare_capacity,
1107         .get_resources = get_resources,
1108         .negotiate_transaction = negotiate_transaction,
1109         .wait_transaction = wait_transaction,
1110         .allocate_transaction_vres = allocate_transaction_vres,
1111 };
1112
1113 void peer_discovery_callback(const forb_orb peer_orb, const char *orb_id)
1114 {
1115         forb_server_id server_id;
1116         char server_id_str[sizeof(forb_server_id)*2+1];
1117
1118         forb_get_server_id(peer_orb, &server_id);
1119         forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
1120 #if 0
1121         ul_logmsg("peer discovered: %s (orb_id '%s')\n", server_id_str, orb_id);
1122 #endif
1123
1124         if (orb_id == NULL)
1125                 return;
1126
1127         if (strcmp(orb_id, "org.frescor.fcb") == 0) {
1128                 fosa_abs_time_t now;
1129                 fosa_rel_time_t delay;
1130                 fosa_clock_get_time(CLOCK_REALTIME, &now);
1131                 delay = fosa_abs_time_extract_interval(start_time, now);
1132                 ul_logmsg("node joined: %s (time %ld ms)\n", server_id_str,
1133                           fosa_rel_time_to_msec(delay));
1134         }
1135 }
1136
1137 void peer_dead_callback(const forb_orb peer_orb, const char *orb_id)
1138 {
1139 }
1140
/** Long command line options accepted by main(). "help" maps to the
 * 'h' case that main()'s option switch already handles. */
static struct option long_opts[] = {
    { "daemon",   optional_argument, NULL, 'd' },
    { "loglevel", required_argument, NULL, 'l' },
    { "help",     no_argument,       NULL, 'h' },
    { 0, 0, 0, 0}
};
1146
/**
 * Prints a short summary of the command line options to standard
 * output.
 */
static void
usage(void)
{
        /* None of the lines contains a conversion, so they can be
         * written out as they are. */
        fputs("usage: fcb [ options ]\n", stdout);
        fputs("  -d, --daemon [pid-file]   go to background after FORB initialization\n", stdout);
        fputs("  -l, --loglevel <number>|<domain>=<number>,...\n", stdout);
}
1154
1155 int print_log_domain(ul_log_domain_t *domain, void *context)
1156 {
1157         printf("%s = %d\n", domain->name, domain->level);
1158         return 0;
1159 }
1160
1161 int main(int argc, char *argv[])
1162 {
1163         forb_orb orb;
1164         struct fcb fcb_data;
1165         fres_contract_broker fcb;
1166         forb_executor_t executor;
1167         int ret;
1168         forb_init_attr_t attr = {
1169                 .orb_id = "org.frescor.fcb",
1170                 .peer_discovery_callback = peer_discovery_callback,
1171                 .peer_dead_callback = peer_dead_callback,
1172                 .fixed_tcp_port = FCB_TCP_PORT,
1173 #ifdef CONFIG_FORB_PROTO_INET_DEFAULT           
1174                 .fixed_server_id = FCB_SERVER_ID,
1175                 .redistribute_hellos = true,
1176 #endif
1177         };
1178         int  opt;
1179
1180         while ((opt = getopt_long(argc, argv, "d::l:", &long_opts[0], NULL)) != EOF) {
1181                 switch (opt) {
1182                         case 'l':
1183                                 if (*optarg == '?') {
1184                                         ul_logreg_for_each_domain(print_log_domain, NULL);
1185                                         exit(0);
1186                                 }
1187                                 {
1188                                         int ret;
1189                                         ret = ul_log_domain_arg2levels(optarg);
1190                                         if (ret) 
1191                                                 error(1, EINVAL, "Error parsing -l argument at char %d\n", ret);
1192                                 }
1193                                 break;
1194                         case 'd':
1195                                 opt_daemon = true;
1196                                 opt_pidfile = optarg;
1197                                 break;
1198                         case 'h':
1199                         /*default:*/
1200                                 usage();
1201                                 exit(opt == 'h' ? 0 : 1);
1202                 }
1203         }
1204
1205         fosa_clock_get_time(CLOCK_REALTIME, &start_time);
1206
1207         if (opt_daemon)
1208                 forb_daemon_prepare(opt_pidfile);
1209
1210         orb = forb_init(&argc, &argv, &attr);
1211         if (!orb) error(1, errno, "FORB initialization failed");
1212
1213 #if CONFIG_FCB_INET && !CONFIG_FORB_PROTO_INET_DEFAULT
1214         ret = register_inet_port(orb);
1215         if (ret) error(0, errno, "INET port registration failed");
1216 #endif
1217
1218         fcb_resource_init_root_field(&fcb_data);
1219         fcb_contract_init_root_field(&fcb_data);
1220
1221         fcb = forb_fres_contract_broker_new(orb, &impl, &fcb_data);
1222         if (!fcb) error(1, errno, "forb_fres_contract_broker_new failed");
1223
1224         /* Prepare executor before we register the fcb reference,
1225          * so that no reuqests are lost */
1226         ret = forb_executor_init(&executor);
1227         if (ret) error(1, errno, "forb_executor_init failed");
1228         
1229         ret = forb_executor_register_object(&executor, fcb);
1230         if (ret) error(1, errno, "forb_executor_register_object failed");
1231
1232         ret = forb_register_reference(fcb, fres_contract_broker_reg_name);
1233         if (ret) error(1, errno, "forb_register_reference() failed");
1234
1235         ul_logmsg("Waiting for requests\n");
1236         if (opt_daemon)
1237                 forb_daemon_ready();
1238
1239         ret = forb_executor_run(&executor);
1240         if (ret) error(1, errno, "forb_executor_run failed");
1241         
1242         return 0;
1243 }