]> rtime.felk.cvut.cz Git - frescor/frsh.git/blob - fres/cbroker/fcb.c
Unify parameters of forb_sequence_*()
[frescor/frsh.git] / fres / cbroker / fcb.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FRSH (FRescor ScHeduler)                         */
26 /*                                                                        */
27 /* FRSH is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FRSH is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FRSH; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FRSH header files in a file,         */
39 /* instantiating FRSH generics or templates, or linking other files       */
40 /* with FRSH objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 /**
48  * @file   fcb.c
49  * @author Michal Sojka <sojkam1@fel.cvut.cz>
50  * @date   Mon Oct 20 18:00:18 2008
51  * 
52  * @brief  FRESCOR Contract Broker
53  * 
54  * 
55  */
56 #include <semaphore.h>
57 #include <getopt.h>
58 #include <forb.h>
59 #include <forb/config.h>
60 #include <fcb.h>
61 #include <fcb_contact_info.h>
62 #include <error.h>
63 #include <errno.h>
64 #include <stdio.h>
65 #include <ul_gavlcust.h>
66 #include <string.h>
67 #include <ul_log.h>
68 #include <ul_logreg.h>
69 #include <forb/server_id.h>
70 #include <fres_contract.h>
71 #include "fcb_config.h"
72 #include <fosa_clocks_and_timers.h>
73 #include "contract_log.h"
74 #if CONFIG_FCB_INET && !CONFIG_FORB_PROTO_INET_DEFAULT
75 #include <forb/proto_inet.h>
76 #endif
77
78 UL_LOG_CUST(ulogd_fcb); /* NOTE(review): presumably binds the ul_log*() calls of this file to the ulogd_fcb domain -- confirm in ul_log.h */
79 ul_log_domain_t ulogd_fcb = {UL_LOGL_MSG, "main"}; /* Log domain of the broker, initialized with UL_LOGL_MSG and "main" (presumably default level and domain name) */
80 UL_LOGREG_SINGLE_DOMAIN_INIT_FUNCTION(init_ulogd_fcb, ulogd_fcb); /* NOTE(review): presumably generates init_ulogd_fcb() registering the domain -- confirm in ul_logreg.h */
81         
82 bool opt_daemon = false; /* NOTE(review): looks like a command-line option (run as daemon); its use is not visible in this part of the file */
83 char *opt_pidfile = NULL; /* NOTE(review): looks like a command-line option (PID file path); its use is not visible in this part of the file */
84 fosa_abs_time_t start_time; /* NOTE(review): presumably the broker start time; it is neither set nor read in this part of the file */
85
86
87 /**
88  * Resource identification: the (type, id) pair under which resources are stored and looked up (ordered by res_key_cmp()).
89  */
90 struct res_key {
91         frsh_resource_type_t type; /**< Resource type; get_res_key() copies it from the contract's resource block */
92         frsh_resource_id_t id; /**< Resource id; get_res_key() copies it from the contract's resource block */
93 };
94
95 /**
96  * Registered resource; an item of the fcb_resource container rooted in struct fcb.
97  */
98 struct resource {
99         gavl_node_t node; /**< Node linking the resource into the resources tree of struct fcb */
100         struct res_key key; /**< Key of the resource in the resources tree (type and id) */
101         char *name; /**< NOTE(review): presumably a human-readable resource name; not used in the visible part of the file */
102         fres_resource_manager mng; /**< Object reference of the resource manager */
103         gavl_cust_root_field_t allocators; /**< Registered allocators for this resource (from multiple applications/nodes) */
104         ul_list_head_t sc_contracts; /**< Negotiated contracts with spare capacity for this resource */
105 };
106
107 /**
108  * Resource allocator registered in different nodes/applications; an item of the fcb_alloc container rooted in struct resource.
109  */
110 struct res_alloc {
111         gavl_node_t node; /**< Node linking the allocator into the allocators tree of its resource */
112         forb_server_id app; /**< Key in the allocators tree; prepare_fcb_contracts() looks allocators up by the requesting application's server id */
113         fres_resource_allocator ra; /**< Allocator object reference; change_vreses() passes it to fres_resource_allocator_change_vreses() */
114 };
115 /** Broker-side record of one contract together with the contract copies used while negotiating it. */
116 struct fcb_contract {
117         fres_contract_id_t id; /**< Contract id; key in the contracts tree of struct fcb */
118         struct res_alloc *ra;   /**< Allocator for this contract TODO: Remove contract if allocator is destroyed */
119         gavl_node_t node_fcb; /**< Node linking the contract into the contracts tree of struct fcb */
120         ul_list_node_t node_sc; /**< Node linking the contract into the sc_contracts list of its resource */
121         ul_list_node_t node_reservation; /**< Node linking the contract into a struct reservation_list */
122         struct {
123                 int initial; /**< Variant index the search starts from (index of the last spare capacity variant) */
124                 int try; /**< Variant index to be tried in the next round of the search */
125         } sc_variant; /**< Spare capacity variant indexes used by rebalance_spare_capacity_and_reserve() */
126         fosa_abs_time_t end_of_stability_period; /**< prepare_reservation_list() adds a spare capacity contract to the rebalancing only when this time is smaller than the current time */
127         struct fres_contract *user_contract; /* The contract sent by user. Set after successful negotiation. */
128         struct fres_contract *requested_contract; /* User request for the contract. Set by prepare_fcb_contracts() in the beginning of negotiation. */
129         struct fres_contract *to_be_reserved_contract; /* Working copy created and destroyed inside rebalance_spare_capacity_and_reserve(). */
130         struct fres_contract *schedulable_contract; /* Copy of the schedulable contract; stored by change_vreses(). */
131 };
132
133 /**
134  * Contract broker data; the o2fcb() macro obtains it from the instance data of a FORB object.
135  */
136 struct fcb {
137         gavl_cust_root_field_t resources; /**< Registered resources (struct resource items keyed by struct res_key) */
138         gavl_cust_root_field_t contracts; /**< Contracts negotiated by this FCB (struct fcb_contract items keyed by contract id) */
139 };
140
141 /** List of contracts to be reserved during spare capacity rebalancing; filled by prepare_reservation_list() */
142 struct reservation_list {
143         ul_list_head_t fcb_contracts; /**< List head; items are struct fcb_contract linked through their node_reservation field */
144         unsigned length; /**< Number of items in the list; incremented by hand on every insertion in prepare_reservation_list() */
145 };
146
147 struct fcb_contract *fcb_contract_new(fres_contract_id_t *id)
148 {
149         struct fcb_contract *fcb_contract;
150         
151         fcb_contract = malloc(sizeof(*fcb_contract));
152         if (!fcb_contract) {
153                 return NULL;
154         }
155         memset(fcb_contract, 0, sizeof(*fcb_contract));
156         fcb_contract->id = *id;
157
158         return fcb_contract;
159 }
160
161 void fcb_contract_destroy(struct fcb_contract *fcb_contract)
162 {
163         if (fcb_contract->user_contract) {
164                 fres_contract_destroy(fcb_contract->user_contract);
165         }
166         if (fcb_contract->to_be_reserved_contract) {
167                 fres_contract_destroy(fcb_contract->to_be_reserved_contract);
168         }
169         if (fcb_contract->requested_contract) {
170                 fres_contract_destroy(fcb_contract->requested_contract);
171         }
172         if (fcb_contract->schedulable_contract) {
173                 fres_contract_destroy(fcb_contract->requested_contract);
174         }
175         free(fcb_contract);
176 }
177
178 static inline int res_key_cmp(const struct res_key *a,
179                               const struct res_key *b)
180 {
181         if (a->type < b->type) {
182                 return -1;
183         } else if (a->type > b->type) {
184                 return +1;
185         } else if (a->id < b->id) {
186                 return -1;
187         } else if (a->id > b->id) {
188                 return +1;
189         } else {
190                 return 0;
191         }
192 }
193
194 /* Container for registered resources. Provides the fcb_resource_*() functions (fcb_resource_find() is used by prepare_fcb_contracts()); items are struct resource linked through their node field into the resources root of struct fcb and ordered by res_key_cmp() applied to their key field. */
195 GAVL_CUST_NODE_INT_DEC(fcb_resource   /* cust_prefix */,                \
196                        struct fcb     /* cust_root_t */,                \
197                        struct resource/* cust_item_t */,                \
198                        struct res_key /* cust_key_t */,                 \
199                        resources      /* cust_root_node */,             \
200                        node           /* cust_item_node */,             \
201                        key            /* cust_item_key */,              \
202                        res_key_cmp    /* cust_cmp_fnc */);
203 /* NOTE(review): presumably _DEC above emits the declarations and _IMP below the function bodies; both take the same parameters. */
204 GAVL_CUST_NODE_INT_IMP(fcb_resource   /* cust_prefix */,                \
205                        struct fcb     /* cust_root_t */,                \
206                        struct resource/* cust_item_t */,                \
207                        struct res_key /* cust_key_t */,                 \
208                        resources      /* cust_root_node */,             \
209                        node           /* cust_item_node */,             \
210                        key            /* cust_item_key */,              \
211                        res_key_cmp    /* cust_cmp_fnc */);
212
213 /* Container for allocators registered for a given resource */
214 GAVL_CUST_NODE_INT_DEC(fcb_alloc        /* cust_prefix */,              \
215                        struct resource  /* cust_root_t */,              \
216                        struct res_alloc /* cust_item_t */,              \
217                        forb_server_id   /* cust_key_t */,               \
218                        allocators       /* cust_root_node */,           \
219                        node             /* cust_item_node */,           \
220                        app              /* cust_item_key */,            \
221                        fres_contract_id_cmp /* cust_cmp_fnc */);
222
223 GAVL_CUST_NODE_INT_IMP(fcb_alloc        /* cust_prefix */,              \
224                        struct resource   /* cust_root_t */,             \
225                        struct res_alloc /* cust_item_t */,              \
226                        forb_server_id   /* cust_key_t */,               \
227                        allocators       /* cust_root_node */,           \
228                        node             /* cust_item_node */,           \
229                        app              /* cust_item_key */,            \
230                        forb_server_id_cmp /* cust_cmp_fnc */);
231
232 /* Container for contracts with spare capacity negotiated for a given resource. Provides the sc_contracts_*() list functions; items are struct fcb_contract linked through node_sc into the sc_contracts head of struct resource. */
233 UL_LIST_CUST_DEC(sc_contracts        /* cust_prefix */,                 \
234                  struct resource  /* cust_head_t */,                    \
235                  struct fcb_contract /* cust_item_t */,                 \
236                  sc_contracts   /* cust_head_field */,                  \
237                  node_sc        /* cust_node_field */);
238 /* List of contracts taking part in one reservation. Provides the reservation_list_*() functions; items are struct fcb_contract linked through node_reservation into the fcb_contracts head of struct reservation_list. */
239 UL_LIST_CUST_DEC(reservation_list        /* cust_prefix */,             \
240                  struct reservation_list  /* cust_head_t */,            \
241                  struct fcb_contract /* cust_item_t */,                 \
242                  fcb_contracts  /* cust_head_field */,                  \
243                  node_reservation /* cust_node_field */);
244
245 /* Container for negotiated contracts. Provides the fcb_contract_*() functions (fcb_contract_find() and fcb_contract_insert() are used in this file); items are struct fcb_contract linked through node_fcb into the contracts root of struct fcb and ordered by fres_contract_id_cmp() applied to their id field. */
246 GAVL_CUST_NODE_INT_DEC(fcb_contract         /* cust_prefix */,          \
247                        struct fcb           /* cust_root_t */,          \
248                        struct fcb_contract  /* cust_item_t */,          \
249                        fres_contract_id_t   /* cust_key_t */,           \
250                        contracts            /* cust_root_node */,       \
251                        node_fcb             /* cust_item_node */,       \
252                        id                   /* cust_item_key */,        \
253                        fres_contract_id_cmp /* cust_cmp_fnc */);
254
255 #if 1 /* The macro-generated implementation is the one compiled. */
256 GAVL_CUST_NODE_INT_IMP(fcb_contract         /* cust_prefix */,          \
257                        struct fcb           /* cust_root_t */,          \
258                        struct fcb_contract  /* cust_item_t */,          \
259                        fres_contract_id_t   /* cust_key_t */,           \
260                        contracts            /* cust_root_node */,       \
261                        node_fcb             /* cust_item_node */,       \
262                        id                   /* cust_item_key */,        \
263                        fres_contract_id_cmp /* cust_cmp_fnc */);
264 #else /* NOTE(review): disabled alternative; presumably fcb_contract_gavl.inc holds an expanded copy of the macro output -- confirm */
265 #include "fcb_contract_gavl.inc"
266 #endif
267
268
/** Returns the instance data of FORB object @a o cast to struct fcb *.
 * The whole expansion is parenthesized so that o2fcb(o)->field applies
 * the member access to the cast result. */
#define o2fcb(o) ((struct fcb*)forb_instance_data(o))
270
271 struct res_key*
272 get_res_key(const struct fres_contract *contract, struct res_key *key)
273 {
274         fres_block_resource *block_res;
275         
276         block_res = fres_contract_get_resource(contract);
277         if (!block_res) {
278                 ul_logerr("No resource specified\n");
279                 return NULL;
280         } else {
281                 key->type = block_res->resource_type;
282                 key->id = block_res->resource_id;
283         }
284         return key;
285 }
286
287 /** 
288  * Fills in an array of fcb_contracts according to requests submited
289  * to negotiate_contracts(). For every contract in @a contracts, there
290  * is one fcb_contract in @a fcb_contracts array.
291  * 
292  * @param fcb
293  * @param fcb_contracts Where to store pointers to fcb_contracts
294  * @param resource Resource for which negotiation is being done.
295  * @param app Application which requests negotiation
296  * @param contracts Contracts submited to negotiate_contracts() 
297  * @param num Number of contracts in contracts (which is the same as the number in fcb_contracts).
298  * 
299  * @return Zero on success, non-zero error code on error.
300  */
301 static int
302 prepare_fcb_contracts(struct fcb *fcb, struct fcb_contract *fcb_contracts[],
303                       struct resource **resource,
304                       forb_server_id *app,
305                       struct fres_contract *contracts[], int num)
306 {
307         unsigned i;
308         struct fcb_contract *fc;
309         struct res_alloc *ra;
310         struct res_key key, key2 = {-1,-1};
311
312         for (i=0; i<num; i++) {
313                 struct fres_contract *c = contracts[i];
314
315                 if (fres_contract_id_is_empty(&c->id)) {
316                         /* Normal negotiation request */
317                         forb_uuid_generate((forb_uuid_t *)&c->id);
318                         fc = fcb_contract_new(&c->id);
319                         if (!fc)
320                                 return errno;
321                         fcb_contracts[i] = fc;
322                         if (!get_res_key(c, &key)) {
323                                 return FRSH_ERR_RESOURCE_ID_INVALID;
324                         }
325                         log_contract("Negotiation request", i, c);
326                 } else {
327                         fc = fcb_contract_find(fcb, &c->id);
328                         if (!fc) {
329                                 char str[60];
330                                 fres_contract_id_to_string(str, &c->id, sizeof(str));
331                                 ul_logerr("Attempt to change unknown contract %s\n", str);
332                                 return FRES_ERR_NOTHING_TO_RENEGOTIATE;
333                         } else {
334                                 fcb_contracts[i] = fc;
335                                 if (fres_contract_get_num_blocks(c) == 0) {
336                                         /* Cancelation */
337                                         get_res_key(fc->user_contract, &key);
338                                         log_contract("Cancelation request", i, fc->user_contract);
339                                 } else {
340                                         /* Renegotiation */
341                                         if (!get_res_key(c, &key)) {
342                                                 return FRSH_ERR_RESOURCE_ID_INVALID;
343                                         }
344                                         log_contract("Renegotiation request", i, fc->user_contract);
345                                 }
346                         }
347                 }
348                 fc->requested_contract = c;
349
350                 /* Check that all contracts are for the same resource */
351                 if (i==0) {
352                         key2 = key;
353                         *resource = fcb_resource_find(fcb, &key);
354                         if (!*resource) {
355                                 ul_logerr("No resource manager for %d.%d registered\n",
356                                           key.type, key.id);
357                                 return FRES_ERR_NO_RESOURCE_MANAGER;
358                         }
359                 }
360                 else if (key.type != key2.type ||
361                          key.id   != key2.id) {
362                         ul_logerr("Contract #%d differs in resource!\n", i);
363                         return FRSH_ERR_RESOURCE_ID_INVALID;
364                 }
365
366                 /* Find allocator for this request */
367                 ra = fcb_alloc_find(*resource, app);
368                 if (!ra) {
369                         char str[60];
370                         forb_server_id_to_string(str, app, sizeof(str));
371                         ul_logerr("No resource allocator found for %d.%d and %s\n",
372                                   (*resource)->key.type, (*resource)->key.id, str);
373                         return FRES_ERR_NO_RESOURCE_ALLOCATOR;
374                 }
375                 fc->ra = ra;
376         }       
377         return 0;
378 }
379
380 /**
381  * @param resource Resource for which to rebalance capacity and negotiate new contracts
382  * @param fcb_contract New requests to negotiate
383  * @param num The number of elements in @a fcb_contract
384  * @param[out] rl List with changed contracts to be commited
385  */
386 static void
387 prepare_reservation_list(struct resource *resource,
388                          struct fcb_contract *fcb_contract[], int num,
389                          struct reservation_list *rl)
390 {
391         int i;
392         fosa_abs_time_t now;
393         struct fcb_contract *fc;
394
395         reservation_list_init_head(rl);
396         rl->length = 0;
397         for (i=0; i<num; i++) {
398                 assert(fcb_contract[i]->requested_contract != NULL);
399                 reservation_list_insert(rl, fcb_contract[i]);
400                 rl->length++;
401         }
402         fosa_clock_get_time(CLOCK_REALTIME, &now);
403         ul_list_for_each(sc_contracts, resource, fc) {
404                 if (fosa_abs_time_smaller(fc->end_of_stability_period, now) &&
405                     fc->requested_contract == NULL) /* Do not insert contract inserted above */
406                 {
407                         reservation_list_insert(rl, fc);
408                         rl->length++;
409                 }
410         }
411 }
412
413 /** 
414  * 
415  * 
416  * @param resource Resource for which to rebalance capacity and negotiate new contracts
417  * @param rl List with changed contracts to be commited
418  * 
419  * @return Zero on success, non-zero error code on error
420  */
421 static int
422 rebalance_spare_capacity_and_reserve(struct resource *resource,
423                                      struct reservation_list *rl)
424 {
425         int ret;
426         unsigned i;
427         struct fcb_contract *fc;
428         fres_block_spare_capacity *s;
429
430         fres_contract_ptr_seq contracts;
431         if (!forb_sequence_alloc_buf(&contracts, rl->length)) {
432                 return errno;
433         }
434         contracts._length = rl->length;
435         i=0;
436         /* Initialize optimization */
437         ul_list_for_each(reservation_list, rl, fc) {
438                 fc->to_be_reserved_contract =
439                         fc->requested_contract ? fres_contract_duplicate(fc->requested_contract) :
440                         fc->user_contract ? fres_contract_duplicate(fc->user_contract) :
441                         NULL;
442                 assert(fc->to_be_reserved_contract != NULL);
443
444                 forb_sequence_elem(&contracts, i) = fc->to_be_reserved_contract;
445                 i++;
446                 
447                 s = fres_contract_get_spare_capacity(fc->to_be_reserved_contract);
448                 if (s && s->granularity == FRSH_GR_DISCRETE) {
449                         fc->sc_variant.initial = s->variants._length - 1;
450                         fc->sc_variant.try = fc->sc_variant.initial;
451                 }
452         }
453
454         bool all_combinations_tried;
455         int criterion, best_criterion = -1;
456         struct fcb_contract *fcb_changed;
457         /* Exhaustive search. Try all combinations of spare capacity
458          * variants and find the one with best creterion. */
459         do {
460                 all_combinations_tried = true;
461                 fcb_changed = NULL;
462                 criterion = 0;
463                 /* Prepare spare capacity variant */
464                 ul_list_for_each(reservation_list, rl, fc) {
465                         s = fres_contract_get_spare_capacity(fc->to_be_reserved_contract);
466                         /* TODO: Simulate continuous granularity by discretization */
467                         if (s && s->granularity == FRSH_GR_DISCRETE) {
468                                 fres_container_copy(fc->to_be_reserved_contract->container,
469                                                     forb_sequence_elem(&s->variants, fc->sc_variant.try));
470                                 criterion += fc->sc_variant.try;
471
472                                 if (fcb_changed == NULL) {
473                                         /* Chnage tried variant for the next round */
474                                         fc->sc_variant.try = fc->sc_variant.try > 0 ?
475                                                 fc->sc_variant.try - 1 :
476                                                 s->variants._length - 1;
477                                         /* Do not change other
478                                          * contracts unless we have
479                                          * tried all variants */
480                                         if (fc->sc_variant.try != fc->sc_variant.initial) {
481                                                 fcb_changed = fc;
482                                         }
483                                 }
484                                 if (fc->sc_variant.try != fc->sc_variant.initial)
485                                         all_combinations_tried = false;
486                         }
487                 }
488
489                 if (criterion > best_criterion) {
490                         CORBA_Environment ev;
491                         /* Reserve contract */
492                         ret = fres_resource_manager_reserve_contracts(resource->mng, &contracts, &ev);
493                         if (forb_exception_occurred(&ev)) {
494                                 ret = fres_forbex2err(&ev);
495                                 ul_logerr("FORB exception when reserving contracts\n");
496                                 goto err;
497                         }
498                         if (ret < 0) {
499                                 ul_logerr("Contract reservation error %d\n", ret);
500                                 ret = FRES_ERR_ADMISSION_TEST;
501                                 goto err;
502                         }
503                         if (ret == 0) { /* negotiation succeeded */
504                                 best_criterion = criterion;
505                         }
506                 }
507         } while (!all_combinations_tried);
508
509         if (best_criterion == -1) {
510                 ret = FRSH_ERR_CONTRACT_REJECTED;
511         } else {
512                 /* At least some variant succeeded */
513                 ret = 0;
514                 sc_contracts_init_head(resource);
515                 ul_list_for_each(reservation_list, rl, fc) {
516                         s = fres_contract_get_spare_capacity(fc->to_be_reserved_contract);
517                         if (s && s->granularity == FRSH_GR_DISCRETE) {
518                                 sc_contracts_insert(resource, fc);
519                         }
520                 }
521         }
522 err:
523         ul_list_for_each(reservation_list, rl, fc) {
524                 fres_contract_destroy(fc->to_be_reserved_contract);
525                 fc->to_be_reserved_contract = NULL;
526         }
527         forb_sequence_free_buf(&contracts, forb_no_destructor);
528         return ret;
529 }
530
531 /**
532  * Create/change VReses according to @a schedulable_contracts.
533  *
534  * There might be more allocators for schedulable contracts, so merge
535  * adjacent vreses with the same allocator together and create/change
536  * them in one step. Also the order of schedulable contracts might be
537  * different from the initial order od ids in commit_contracts()
538  * therefore we have to search for every contract.
539  */
540 static int
541 change_vreses(struct fcb *fcb, fres_contract_ptr_seq *schedulable_contracts)
542 {
543         struct res_alloc *last_ra = NULL;
544         fres_contract_ptr_seq vreses;
545         int i, ret = 0;
546         CORBA_Environment ev;
547         
548         if (!forb_sequence_alloc_buf(&vreses, schedulable_contracts->_length)) {
549                 return errno;
550         }
551         vreses._length = 0;
552         
553         for (i=0; i<schedulable_contracts->_length; i++) {
554                 struct fcb_contract *fc;
555                 struct fres_contract *sc = schedulable_contracts->_buffer[i];
556
557                 log_contract("Changing VRES", i, sc);
558
559                 fc = fcb_contract_find(fcb, &sc->id);
560                 assert(fc != NULL);
561
562                 if (true /* TODO: if the schedulable contract is changed */) {
563                         if (last_ra != fc->ra) {
564                                 if (vreses._length) {
565                                         ret = fres_resource_allocator_change_vreses(last_ra->ra,
566                                                                                     &vreses, &ev);
567                                         if (forb_exception_occurred(&ev)) {
568                                                 ret = fres_forbex2err(&ev);
569                                                 goto err;
570                                         }
571                                         if (ret) goto err;
572                                 }
573                                 vreses._length = 0;
574                         }
575                         vreses._buffer[vreses._length] = sc;
576                         vreses._length++;
577                         fres_contract_destroy(fc->schedulable_contract);
578                         fc->schedulable_contract = fres_contract_duplicate(sc);
579                         last_ra = fc->ra;
580                 
581                 }
582         }
583         if (last_ra) {
584                 ret = fres_resource_allocator_change_vreses(last_ra->ra,
585                                                             &vreses, &ev);
586                 if (forb_exception_occurred(&ev))
587                         ret = fres_forbex2err(&ev);
588         }
589 err:
590         forb_sequence_free_buf(&vreses, forb_no_destructor);
591         return ret;
592 }
593
594 CORBA_long
595 negotiate_contracts(fres_contract_broker obj,
596                     const fres_contract_ptr_seq* contracts,
597                     fres_contract_id_seq** ids_out,
598                     CORBA_Environment *ev)
599 {
600         struct fcb *fcb = o2fcb(obj);
601         struct resource *resource = NULL;
602         int ret = 0;
603         forb_server_id app;
604         fres_contract_ptr_seq *schedulable_contracts;
605         struct fcb_contract **fcb_contracts, *fc;
606         unsigned i;
607         fres_contract_id_seq commit_ids;
608
609         /* Prepare output sequence for the case we return eariler with
610          * an error */
611         forb_sequence_alloc(*ids_out, 0);
612         if (!*ids_out) {
613                 ev->major = FORB_EX_NO_MEMORY;
614                 goto err;
615         }
616         CORBA_sequence_set_release(*ids_out, CORBA_TRUE);
617         
618         forb_get_req_source(obj, &app);
619         
620         fcb_contracts = malloc(sizeof(fcb_contracts[0])*contracts->_length);
621         if (!fcb_contracts) {
622                 ret = errno;
623                 goto err;
624         }
625         memset(fcb_contracts, 0, sizeof(fcb_contracts[0])*contracts->_length);
626
627         ret = prepare_fcb_contracts(fcb, fcb_contracts, &resource,
628                                     &app, contracts->_buffer, contracts->_length);
629         if (ret)
630                 goto err_free_fcb_contracts;
631
632         struct reservation_list rl;
633         prepare_reservation_list(resource,
634                                  fcb_contracts, contracts->_length,
635                                  &rl);
636
637         /* Allocate all the needed memory before doing reservation. If
638          * there is no enough memory, it has no sense to call resource
639          * manager. */
640         if (!forb_sequence_alloc_buf(&commit_ids, rl.length)) {
641                 ret = errno;
642                 goto err_free_fcb_contracts;
643         }
644
645         /* Reserve contracts */
646         ret = rebalance_spare_capacity_and_reserve(resource, &rl);
647         if (ret) {
648                 if (ret == FRSH_ERR_CONTRACT_REJECTED) {
649                         ul_logmsg("Contract(s) was/were rejected\n");
650                 } else {
651                         char msg[100];
652                         fres_strerror(ret, msg, sizeof(msg));
653                         ul_logerr("Reservation error: %s\n", msg);
654                 }
655                 goto err_free_fcb_contracts;
656         }
657
658         /* Commit contracts */
659         commit_ids._length = rl.length;
660         i=0;
661         ul_list_for_each(reservation_list, &rl, fc) {
662                 forb_sequence_elem(&commit_ids, i) = fc->id;
663                 i++;
664         }
665         
666         fres_resource_manager_commit_contracts(resource->mng, &commit_ids,
667                                                &schedulable_contracts, ev);
668         if (forb_exception_occurred(ev)) {
669                 ret = fres_forbex2err(ev);
670                 goto err_cancel_reservation;
671         }
672
673         /* Add new contracts to our fcb database for later
674          * reference. Canceled contracts are removed below. */
675         for (i=0; i<contracts->_length; i++) {
676                 fc =  fcb_contracts[i];
677
678                 if (fc->user_contract) {
679                         if (fres_contract_get_num_blocks(fc->requested_contract) > 0) {
680                                 /* Renegotiation */
681                                 fres_contract_destroy(fc->user_contract);
682                                 fc->user_contract = fres_contract_duplicate(fc->requested_contract);
683                                 /* Note: requested_contract is also
684                                  * pointed by contracts parameter and
685                                  * will be freed by FORB. */
686                                 fc->requested_contract = NULL;
687                         }
688                 } else {
689                         /* Insert new contracts */
690                         fcb_contract_insert(fcb, fcb_contracts[i]);
691                         fc->user_contract = fres_contract_duplicate(fc->requested_contract);
692                         fc->requested_contract = NULL;
693                         /* See the note above. */
694                 }
695         }
696
697         ret = change_vreses(fcb, schedulable_contracts);
698         if (ret)
699                 goto err_cancel_reservation;
700         
701         forb_sequence_free(schedulable_contracts, fres_contract_ptr_destroy);
702         if (ret != 0) {
703                 ul_logerr("VRes was not created\n");
704                 goto err_cancel_contracts;
705         }
706
707
708         /* Return IDs and delete canceled contracts from out database */
709         if (!forb_sequence_alloc_buf(*ids_out, contracts->_length)) {
710                 ev->major = FORB_EX_NO_MEMORY;
711                 goto err_cancel_contracts;
712         }
713         (*ids_out)->_length = contracts->_length;
714         for (i=0; i<contracts->_length; i++) {
715                 struct fcb_contract *fc =  fcb_contracts[i];
716                 forb_sequence_elem(*ids_out, i) = fc->id;
717
718                 if (fc->requested_contract &&
719                     fres_contract_get_num_blocks(fc->requested_contract) == 0) {
720                         fcb_contract_delete(fcb, fc);
721                         /* Note: requested_contract is also pointed by
722                          * contracts parameter and will be freed by FORB. */
723                         fc->requested_contract = NULL;
724                         fcb_contract_destroy(fc);
725                 }
726         }
727         return 0;
728
729 err_cancel_contracts:
730         /* TODO */
731         goto err_free_fcb_contracts;
732 err_cancel_reservation:
733         fres_resource_manager_cancel_reservations(resource->mng, &commit_ids, ev);
734 err_free_fcb_contracts:
735         for (i=0; i<contracts->_length; i++) {
736                 fc = fcb_contracts[i];
737                 if (fc && !fc->user_contract) {
738                         fcb_contracts[i]->requested_contract = NULL; /* Destroyed by FORB */
739                         fcb_contract_destroy(fcb_contracts[i]);
740                 }
741         }
742         free(fcb_contracts);
743 err:
744         return ret;
745 }
746
747 void redistribute_spare_capacity(fres_contract_broker obj,
748                                  const frsh_resource_type_t restype,
749                                  const frsh_resource_id_t resid,
750                                  CORBA_Environment *ev)
751 {
752         struct fcb *fcb = o2fcb(obj);
753         struct res_key key = {restype, resid };
754         struct resource *resource;
755         struct reservation_list rl;
756         
757         resource = fcb_resource_find(fcb, &key);
758
759         prepare_reservation_list(resource, NULL, 0, &rl);
760
761 /*      forb_sequence_alloc(ids, rl.length); */
762 /*      if (!ids || !ids->_buffer) { */
763 /*              ev->major = FORB_EX_NO_MEMORY; */
764 /*              goto err_free_fcb_contracts; */
765 /*      } */
766 /*      CORBA_sequence_set_release(ids, CORBA_TRUE); */
767 /*      *ids_out = ids;         /\* ids is freed by FORB *\/ */
768         
769         
770         rebalance_spare_capacity_and_reserve(resource, &rl);
771         /* Commit */
772 }
773
774 CORBA_long register_resource(fres_contract_broker obj,
775                             const frsh_resource_type_t restype,
776                             const frsh_resource_id_t resid,
777                             const fres_resource_desc *desc,
778                             CORBA_Environment *ev)
779 {
780         struct fcb *fcb = o2fcb(obj);
781         struct resource *res, *res2;
782
783         res = malloc(sizeof(*res));
784         if (!res) goto err;
785         memset(res, 0, sizeof(*res));
786         res->key.type = restype;
787         res->key.id = resid;
788         res2 = fcb_resource_find(fcb, &res->key);
789         if (res2) {
790                 if (forb_object_is_stale(res2->mng)) {
791                         ul_logmsg("Removing stale manager for resource %d.%d\n",
792                                   restype, resid);
793                         forb_object_release(res2->mng);
794                         fcb_resource_delete(fcb, res2);
795                         /* TODO: Delete also all allocators associated
796                          * with this stale resource manager. */
797                         free(res);
798                         res = res2;
799                 } else {
800                         ul_logerr("Resource manager %d.%d already registered\n",
801                                   restype, resid);
802                         goto free_err;
803                 }
804         }
805         res->mng = forb_object_duplicate(desc->manager);
806         res->name = desc->name;
807
808         fcb_alloc_init_root_field(res);
809         sc_contracts_init_head(res);
810         ul_logmsg("Registering manager for resource \"%s\" (%d.%d)\n",
811                   res->name, restype, resid);
812         fcb_resource_insert(fcb, res);
813         return 0;
814 free_err:
815         free(res);
816 err:
817         return -1;
818 }
819
820
821 CORBA_long register_allocator(fres_contract_broker obj,
822                               const frsh_resource_type_t restype,
823                               const frsh_resource_id_t resid,
824                               const fres_resource_allocator ra_obj,
825                               CORBA_Environment *ev)
826 {
827         struct fcb *fcb = o2fcb(obj);
828         struct resource *res;
829         struct res_alloc *ra;
830         struct res_key resource;
831         forb_server_id server_id;
832         char server_id_str[40];
833         int ret;
834
835         forb_get_server_id(ra_obj, &server_id);
836         forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
837         ul_logmsg("Registering allocator for resource %d.%d in app %s\n",
838                   restype, resid, server_id_str);
839         
840         resource.type = restype;
841         resource.id = resid;
842         res = fcb_resource_find(fcb, &resource);
843         if (!res) {
844                 ul_logerr("No manager found for %d.%d. Unable to register the allocator!\n",
845                           restype, resid);
846                 ret = FRES_ERR_NO_RESOURCE_MANAGER;
847                 goto err;
848         }
849         ra = fcb_alloc_find(res, &server_id);
850         if (ra) {
851                 char *str = forb_object_to_string(ra_obj);
852                 ul_logerr("Allocator from already registered (%s)\n",
853                           str);
854                 forb_free(str);
855                 ret = FRES_ERR_ALLOCATOR_ALREADY_REGISTERED;
856                 goto err;
857         }
858         ra = malloc(sizeof(*ra));
859         if (!ra) {
860                 ret = ENOMEM;
861                 goto err;
862         }
863         ra->app = server_id;
864         ra->ra = forb_object_duplicate(ra_obj);
865         fcb_alloc_insert(res, ra);
866         return 0;
867 err:
868         return ret;
869 }
870
871 void get_resources(fres_contract_broker obj, fres_resource_seq** resources, CORBA_Environment *ev)
872 {
873         struct fcb *fcb = o2fcb(obj);
874         fres_resource_seq *seq;
875         struct resource *res;
876         int n;
877
878         seq = malloc(sizeof(*seq));
879         if (!seq) {
880                 ev->major = FORB_EX_NO_MEMORY;
881                 return;
882         }
883         memset(seq, 0, sizeof(*seq));
884
885         n=0;
886         gavl_cust_for_each(fcb_resource, fcb, res) {
887                 n++;
888         }
889
890         seq->_buffer = CORBA_sequence_fres_resource_allocbuf(n);
891         seq->_maximum = n;
892         seq->_length = n;
893
894         n = 0;
895         gavl_cust_for_each(fcb_resource, fcb, res) {
896                 seq->_buffer[n].restype = res->key.type;
897                 seq->_buffer[n].resid = res->key.id;
898                 seq->_buffer[n].desc.manager = res->mng;
899                 seq->_buffer[n].desc.name = res->name;
900                 n++;
901         }
902
903         *resources = seq;
904 }
905
#if CONFIG_FCB_INET && !CONFIG_FORB_PROTO_INET_DEFAULT
/**
 * Creates an INET port listening on INADDR_ANY and registers it with
 * the given ORB.
 *
 * @param orb  ORB the port is registered with.
 *
 * @return 0 on success, -1 if the port cannot be allocated, otherwise
 *         the non-zero value returned by forb_inet_port_init() or
 *         forb_register_port().
 */
static int register_inet_port(forb_orb orb)
{
        forb_port_t *port;
        struct in_addr listen_on;
        int ret;

        port = forb_malloc(sizeof(*port));
        if (!port)
                return -1;
        memset(port, 0, sizeof(*port));

        listen_on.s_addr = INADDR_ANY;
        ret = forb_inet_port_init(&port->desc, listen_on, 0);
        if (ret) {
                /* The port was never handed to the ORB, so nobody
                 * else can free it. Keep errno intact for the caller,
                 * which reports it. */
                int saved_errno = errno;
                forb_free(port);
                errno = saved_errno;
                return ret;
        }

        /* NOTE(review): when forb_register_port() fails it is not
         * visible here whether the ORB still references the port, so
         * it is not freed in that case - confirm. */
        ret = forb_register_port(orb, port);
        return ret;
}
#endif
924
925 struct forb_fres_contract_broker_impl impl = {
926         .negotiate_contracts = negotiate_contracts,
927         .register_resource = register_resource,
928         .register_allocator = register_allocator,
929         .redistribute_spare_capacity = redistribute_spare_capacity,
930         .get_resources = get_resources,
931 };
932
933 void peer_discovery_callback(const forb_orb peer_orb, const char *orb_id)
934 {
935         forb_server_id server_id;
936         char server_id_str[sizeof(forb_server_id)*2+1];
937
938         forb_get_server_id(peer_orb, &server_id);
939         forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
940 #if 0
941         ul_logmsg("peer discovered: %s (orb_id '%s')\n", server_id_str, orb_id);
942 #endif
943
944         if (orb_id == NULL)
945                 return;
946
947         if (strcmp(orb_id, "org.frescor.fcb") == 0) {
948                 fosa_abs_time_t now;
949                 fosa_rel_time_t delay;
950                 fosa_clock_get_time(CLOCK_REALTIME, &now);
951                 delay = fosa_abs_time_extract_interval(start_time, now);
952                 ul_logmsg("node joined: %s (time %ld ms)\n", server_id_str,
953                           fosa_rel_time_to_msec(delay));
954         }
955 }
956
957 void peer_dead_callback(const forb_orb peer_orb, const char *orb_id)
958 {
959 }
960
/** Long command line options accepted by main(). Each entry maps to
 * the short option character handled in main()'s option switch. */
static struct option long_opts[] = {
    { "daemon",   optional_argument, NULL, 'd' },
    { "help",     no_argument,       NULL, 'h' },
    { "loglevel", required_argument, NULL, 'l' },
    { 0, 0, 0, 0}
};
966
/** Prints a short summary of the command line options to standard
 * output. */
static void usage(void)
{
        fputs("usage: fcb [ options ]\n", stdout);
        fputs("  -d, --daemon [pid-file]   go to background after FORB initialization\n", stdout);
        fputs("  -l, --loglevel <number>|<domain>=<number>,...\n", stdout);
}
974
975 int print_log_domain(ul_log_domain_t *domain, void *context)
976 {
977         printf("%s = %d\n", domain->name, domain->level);
978         return 0;
979 }
980
981 int main(int argc, char *argv[])
982 {
983         forb_orb orb;
984         struct fcb fcb_data;
985         fres_contract_broker fcb;
986         forb_executor_t executor;
987         int ret;
988         forb_init_attr_t attr = {
989                 .orb_id = "org.frescor.fcb",
990                 .peer_discovery_callback = peer_discovery_callback,
991                 .peer_dead_callback = peer_dead_callback,
992                 .fixed_tcp_port = FCB_TCP_PORT,
993 #ifdef CONFIG_FORB_PROTO_INET_DEFAULT           
994                 .fixed_server_id = FCB_SERVER_ID,
995                 .redistribute_hellos = true,
996 #endif
997         };
998         int  opt;
999
1000         while ((opt = getopt_long(argc, argv, "d::l:", &long_opts[0], NULL)) != EOF) {
1001                 switch (opt) {
1002                         case 'l':
1003                                 if (*optarg == '?') {
1004                                         ul_logreg_for_each_domain(print_log_domain, NULL);
1005                                         exit(0);
1006                                 }
1007                                 {
1008                                         int ret;
1009                                         ret = ul_log_domain_arg2levels(optarg);
1010                                         if (ret) 
1011                                                 error(1, EINVAL, "Error parsing -l argument at char %d\n", ret);
1012                                 }
1013                                 break;
1014                         case 'd':
1015                                 opt_daemon = true;
1016                                 opt_pidfile = optarg;
1017                                 break;
1018                         case 'h':
1019                         /*default:*/
1020                                 usage();
1021                                 exit(opt == 'h' ? 0 : 1);
1022                 }
1023         }
1024
1025         fosa_clock_get_time(CLOCK_REALTIME, &start_time);
1026
1027         if (opt_daemon)
1028                 forb_daemon_prepare(opt_pidfile);
1029
1030         orb = forb_init(&argc, &argv, &attr);
1031         if (!orb) error(1, errno, "FORB initialization failed");
1032
1033 #if CONFIG_FCB_INET && !CONFIG_FORB_PROTO_INET_DEFAULT
1034         ret = register_inet_port(orb);
1035         if (ret) error(0, errno, "INET port registration failed");
1036 #endif
1037
1038         fcb_resource_init_root_field(&fcb_data);
1039         fcb_contract_init_root_field(&fcb_data);
1040
1041         fcb = forb_fres_contract_broker_new(orb, &impl, &fcb_data);
1042         if (!fcb) error(1, errno, "forb_fres_contract_broker_new failed");
1043
1044         /* Prepare executor before we register the fcb reference,
1045          * so that no reuqests are lost */
1046         ret = forb_executor_init(&executor);
1047         if (ret) error(1, errno, "forb_executor_init failed");
1048         
1049         ret = forb_executor_register_object(&executor, fcb);
1050         if (ret) error(1, errno, "forb_executor_register_object failed");
1051
1052         ret = forb_register_reference(fcb, fres_contract_broker_reg_name);
1053         if (ret) error(1, errno, "forb_register_reference() failed");
1054
1055         ul_logmsg("Waiting for requests\n");
1056         if (opt_daemon)
1057                 forb_daemon_ready();
1058
1059         ret = forb_executor_run(&executor);
1060         if (ret) error(1, errno, "forb_executor_run failed");
1061         
1062         return 0;
1063 }