]> rtime.felk.cvut.cz Git - frescor/frsh.git/blob - fres/cbroker/fcb.c
fcb: Refactor prepare_fcb_contracts()
[frescor/frsh.git] / fres / cbroker / fcb.c
1 /**************************************************************************/
2 /* ---------------------------------------------------------------------- */
3 /* Copyright (C) 2006 - 2008 FRESCOR consortium partners:                 */
4 /*                                                                        */
5 /*   Universidad de Cantabria,              SPAIN                         */
6 /*   University of York,                    UK                            */
7 /*   Scuola Superiore Sant'Anna,            ITALY                         */
8 /*   Kaiserslautern University,             GERMANY                       */
9 /*   Univ. Politécnica  Valencia,           SPAIN                        */
10 /*   Czech Technical University in Prague,  CZECH REPUBLIC                */
11 /*   ENEA                                   SWEDEN                        */
12 /*   Thales Communication S.A.              FRANCE                        */
13 /*   Visual Tools S.A.                      SPAIN                         */
14 /*   Rapita Systems Ltd                     UK                            */
15 /*   Evidence                               ITALY                         */
16 /*                                                                        */
17 /*   See http://www.frescor.org for a link to partners' websites          */
18 /*                                                                        */
19 /*          FRESCOR project (FP6/2005/IST/5-034026) is funded             */
20 /*       in part by the European Union Sixth Framework Programme          */
21 /*       The European Union is not liable of any use that may be          */
22 /*       made of this code.                                               */
23 /*                                                                        */
24 /*                                                                        */
25 /*  This file is part of FRSH (FRescor ScHeduler)                         */
26 /*                                                                        */
27 /* FRSH is free software; you can redistribute it and/or modify it        */
28 /* under terms of the GNU General Public License as published by the      */
29 /* Free Software Foundation; either version 2, or (at your option) any    */
30 /* later version.  FRSH is distributed in the hope that it will be        */
31 /* useful, but WITHOUT ANY WARRANTY; without even the implied warranty    */
32 /* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    */
33 /* General Public License for more details. You should have received a    */
34 /* copy of the GNU General Public License along with FRSH; see file       */
35 /* COPYING. If not, write to the Free Software Foundation, 675 Mass Ave,  */
36 /* Cambridge, MA 02139, USA.                                              */
37 /*                                                                        */
38 /* As a special exception, including FRSH header files in a file,         */
39 /* instantiating FRSH generics or templates, or linking other files       */
40 /* with FRSH objects to produce an executable application, does not       */
41 /* by itself cause the resulting executable application to be covered     */
42 /* by the GNU General Public License. This exception does not             */
43 /* however invalidate any other reasons why the executable file might be  */
44 /* covered by the GNU Public License.                                     */
45 /**************************************************************************/
46
47 /**
48  * @file   fcb.c
49  * @author Michal Sojka <sojkam1@fel.cvut.cz>
50  * @date   Mon Oct 20 18:00:18 2008
51  * 
52  * @brief  FRESCOR Contract Broker
53  * 
54  * 
55  */
56 #include <semaphore.h>
57 #include <getopt.h>
58 #include <forb.h>
59 #include <forb/config.h>
60 #include <fcb.h>
61 #include <fcb_contact_info.h>
62 #include <error.h>
63 #include <errno.h>
64 #include <stdio.h>
65 #include <ul_gavlcust.h>
66 #include <string.h>
67 #include <ul_log.h>
68 #include <ul_logreg.h>
69 #include <forb/server_id.h>
70 #include <fres_contract.h>
71 #include "fcb_config.h"
72 #include <fosa_clocks_and_timers.h>
73 #include "contract_log.h"
74 #if CONFIG_FCB_INET && !CONFIG_FORB_PROTO_INET_DEFAULT
75 #include <forb/proto_inet.h>
76 #endif
77
78 UL_LOG_CUST(ulogd_fcb);
79 ul_log_domain_t ulogd_fcb = {UL_LOGL_MSG, "main"};
80 UL_LOGREG_SINGLE_DOMAIN_INIT_FUNCTION(init_ulogd_fcb, ulogd_fcb);
81         
82 bool opt_daemon = false;
83 char *opt_pidfile = NULL;
84 fosa_abs_time_t start_time;
85
86
87 /**
88  * Resource identification 
89  */
90 struct res_key {
91         frsh_resource_type_t type;
92         frsh_resource_id_t id;
93 };
94
95 /**
96  * Registered resource
97  */
98 struct resource {
99         gavl_node_t node;
100         struct res_key key;
101         char *name;
102         fres_resource_manager mng; /**< Object reference of the resource manager */
103         gavl_cust_root_field_t allocators; /**< Registered allocators for this resource (from multiple applications/nodes) */
104         ul_list_head_t sc_contracts; /**< Negotiated contracts with spare capacity for this resource */
105 };
106
107 /**
108  * Resource allocator registered in different nodes/applications 
109  */
110 struct res_alloc {
111         gavl_node_t node;
112         forb_server_id app;
113         fres_resource_allocator ra;
114 };
115
116 struct fcb_contract {
117         fres_contract_id_t id;
118         struct res_alloc *ra;   /**< Allocator for this contract TODO: Remove contract if allocator is destroyed */
119         gavl_node_t node_fcb;
120         ul_list_node_t node_sc;
121         ul_list_node_t node_reservation;
122         struct {
123                 int initial;
124                 int try;
125         } sc_variant;
126         fosa_abs_time_t end_of_stability_period;
127         struct fres_contract *user_contract; /* The contract sent by user. Set after sucessfull neotiation. */
128         struct fres_contract *requested_contract; /* User request for the contract. Set by prepare_reservation_requests() in the beginning of negotiation. */
129         struct fres_contract *to_be_reserved_contract;
130         struct fres_contract *schedulable_contract;
131 };
132
133 /**
134  * Contract broker data
135  */
136 struct fcb {
137         gavl_cust_root_field_t resources; /**< Registered resources */
138         gavl_cust_root_field_t contracts; /**< Contracts negotiated by this FCB */
139 };
140
141 /** List of contracts to be reserved during spare capacity rebalancing */
142 struct reservation_list {
143         ul_list_head_t fcb_contracts;
144         unsigned length;
145 };
146
147 struct fcb_contract *fcb_contract_new(fres_contract_id_t *id)
148 {
149         struct fcb_contract *fcb_contract;
150         
151         fcb_contract = malloc(sizeof(*fcb_contract));
152         if (!fcb_contract) {
153                 return NULL;
154         }
155         memset(fcb_contract, 0, sizeof(*fcb_contract));
156         fcb_contract->id = *id;
157
158         return fcb_contract;
159 }
160
161 void fcb_contract_destroy(struct fcb_contract *fcb_contract)
162 {
163         if (fcb_contract->user_contract) {
164                 fres_contract_destroy(fcb_contract->user_contract);
165         }
166         if (fcb_contract->to_be_reserved_contract) {
167                 fres_contract_destroy(fcb_contract->to_be_reserved_contract);
168         }
169         if (fcb_contract->requested_contract) {
170                 fres_contract_destroy(fcb_contract->requested_contract);
171         }
172         if (fcb_contract->schedulable_contract) {
173                 fres_contract_destroy(fcb_contract->requested_contract);
174         }
175         free(fcb_contract);
176 }
177
178 static inline int res_key_cmp(const struct res_key *a,
179                               const struct res_key *b)
180 {
181         if (a->type < b->type) {
182                 return -1;
183         } else if (a->type > b->type) {
184                 return +1;
185         } else if (a->id < b->id) {
186                 return -1;
187         } else if (a->id > b->id) {
188                 return +1;
189         } else {
190                 return 0;
191         }
192 }
193
194 /* Container for registered resources */
195 GAVL_CUST_NODE_INT_DEC(fcb_resource   /* cust_prefix */,                \
196                        struct fcb     /* cust_root_t */,                \
197                        struct resource/* cust_item_t */,                \
198                        struct res_key /* cust_key_t */,                 \
199                        resources      /* cust_root_node */,             \
200                        node           /* cust_item_node */,             \
201                        key            /* cust_item_key */,              \
202                        res_key_cmp    /* cust_cmp_fnc */);
203
204 GAVL_CUST_NODE_INT_IMP(fcb_resource   /* cust_prefix */,                \
205                        struct fcb     /* cust_root_t */,                \
206                        struct resource/* cust_item_t */,                \
207                        struct res_key /* cust_key_t */,                 \
208                        resources      /* cust_root_node */,             \
209                        node           /* cust_item_node */,             \
210                        key            /* cust_item_key */,              \
211                        res_key_cmp    /* cust_cmp_fnc */);
212
213 /* Container for allocators registered for a given resource */
214 GAVL_CUST_NODE_INT_DEC(fcb_alloc        /* cust_prefix */,              \
215                        struct resource  /* cust_root_t */,              \
216                        struct res_alloc /* cust_item_t */,              \
217                        forb_server_id   /* cust_key_t */,               \
218                        allocators       /* cust_root_node */,           \
219                        node             /* cust_item_node */,           \
220                        app              /* cust_item_key */,            \
221                        fres_contract_id_cmp /* cust_cmp_fnc */);
222
223 GAVL_CUST_NODE_INT_IMP(fcb_alloc        /* cust_prefix */,              \
224                        struct resource   /* cust_root_t */,             \
225                        struct res_alloc /* cust_item_t */,              \
226                        forb_server_id   /* cust_key_t */,               \
227                        allocators       /* cust_root_node */,           \
228                        node             /* cust_item_node */,           \
229                        app              /* cust_item_key */,            \
230                        forb_server_id_cmp /* cust_cmp_fnc */);
231
232 /* Container for contracts with spare capacity negotiated for a given resource */
233 UL_LIST_CUST_DEC(sc_contracts        /* cust_prefix */,                 \
234                  struct resource  /* cust_head_t */,                    \
235                  struct fcb_contract /* cust_item_t */,                 \
236                  sc_contracts   /* cust_head_field */,                  \
237                  node_sc        /* cust_node_field */);
238
239 UL_LIST_CUST_DEC(reservation_list        /* cust_prefix */,             \
240                  struct reservation_list  /* cust_head_t */,            \
241                  struct fcb_contract /* cust_item_t */,                 \
242                  fcb_contracts  /* cust_head_field */,                  \
243                  node_reservation /* cust_node_field */);
244
245 /* Container for negotiated contracts */
246 GAVL_CUST_NODE_INT_DEC(fcb_contract         /* cust_prefix */,          \
247                        struct fcb           /* cust_root_t */,          \
248                        struct fcb_contract  /* cust_item_t */,          \
249                        fres_contract_id_t   /* cust_key_t */,           \
250                        contracts            /* cust_root_node */,       \
251                        node_fcb             /* cust_item_node */,       \
252                        id                   /* cust_item_key */,        \
253                        fres_contract_id_cmp /* cust_cmp_fnc */);
254
255 #if 1
256 GAVL_CUST_NODE_INT_IMP(fcb_contract         /* cust_prefix */,          \
257                        struct fcb           /* cust_root_t */,          \
258                        struct fcb_contract  /* cust_item_t */,          \
259                        fres_contract_id_t   /* cust_key_t */,           \
260                        contracts            /* cust_root_node */,       \
261                        node_fcb             /* cust_item_node */,       \
262                        id                   /* cust_item_key */,        \
263                        fres_contract_id_cmp /* cust_cmp_fnc */);
264 #else
265 #include "fcb_contract_gavl.inc"
266 #endif
267
268
/* Broker data attached to FORB object o. The whole expansion is
 * parenthesized so that o2fcb(o)->field applies -> to the cast result. */
#define o2fcb(o) ((struct fcb*)forb_instance_data(o))
270
271 struct res_key*
272 get_res_key(const struct fres_contract *contract, struct res_key *key)
273 {
274         fres_block_resource *block_res;
275         
276         block_res = fres_contract_get_resource(contract);
277         if (!block_res) {
278                 ul_logerr("No resource specified\n");
279                 return NULL;
280         } else {
281                 key->type = block_res->resource_type;
282                 key->id = block_res->resource_id;
283         }
284         return key;
285 }
286
287 /** 
288  * Fills in an array of fcb_contracts according to requests submited
289  * by an application through negotiate_contracts() or
290  * negotiate_transaction(). For every contract in @a contracts, there
291  * is allocated one fcb_contract in @a fcb_contracts array.
292  * 
293  * @param fcb
294  * @param fcb_contracts Where to store pointers to fcb_contracts
295  * @param contracts Contracts submited for negotiation 
296  * @param num Number of contracts in contracts (which is the same as the number in fcb_contracts).
297  * 
298  * @return Zero on success, non-zero error code on error.
299  */
300 static int
301 prepare_fcb_contracts(struct fcb *fcb, struct fcb_contract *fcb_contracts[],
302                       struct fres_contract *contracts[], int num)
303 {
304         unsigned i;
305         struct fcb_contract *fc;
306         struct res_key key;
307
308         for (i=0; i<num; i++) {
309                 struct fres_contract *c = contracts[i];
310
311                 if (fres_contract_id_is_empty(&c->id)) {
312                         /* Normal negotiation request */
313                         forb_uuid_generate((forb_uuid_t *)&c->id);
314                         fc = fcb_contract_new(&c->id);
315                         if (!fc)
316                                 return errno;
317                         fcb_contracts[i] = fc;
318                         if (!get_res_key(c, &key)) {
319                                 return FRSH_ERR_RESOURCE_ID_INVALID;
320                         }
321                         log_contract("Negotiation request", i, c);
322                 } else {
323                         fc = fcb_contract_find(fcb, &c->id);
324                         if (!fc) {
325                                 char str[60];
326                                 fres_contract_id_to_string(str, &c->id, sizeof(str));
327                                 ul_logerr("Attempt to change unknown contract %s\n", str);
328                                 return FRES_ERR_NOTHING_TO_RENEGOTIATE;
329                         } else {
330                                 fcb_contracts[i] = fc;
331                                 if (fres_contract_get_num_blocks(c) == 0) {
332                                         /* Cancelation */
333                                         get_res_key(fc->user_contract, &key);
334                                         log_contract("Cancelation request", i, fc->user_contract);
335                                 } else {
336                                         /* Renegotiation */
337                                         if (!get_res_key(c, &key)) {
338                                                 return FRSH_ERR_RESOURCE_ID_INVALID;
339                                         }
340                                         log_contract("Renegotiation request", i, fc->user_contract);
341                                 }
342                         }
343                 }
344                 fc->requested_contract = c;
345         }
346         return 0;
347 }
348
349 /**
350  * Ensure that all contracts belong to the same resource and find
351  * resource allocators for the contracts.
352  *
353  * @param fcb 
354  * @param fcb_contracts Array of fcb_contracts prepared by prepare_fcb_contracts()
355  * @param num Number of contracts in @a fcb_contracts
356  * @param resource Resource for which negotiation is being done.
357  * @param app Application which requests negotiation
358  */
359 static int
360 check_and_setup_resource(struct fcb *fcb, struct fcb_contract *fcb_contracts[],
361                          struct resource **resource,
362                          forb_server_id *app,
363                          int num)
364 {
365         unsigned i;
366         struct res_alloc *ra;
367         struct res_key key, key2 = {-1,-1};
368
369         for (i=0; i<num; i++) {
370                 struct fcb_contract *fc = fcb_contracts[i];
371                 struct fres_contract *c = fc->requested_contract;
372
373                 if (fres_contract_get_num_blocks(c) == 0) {
374                         /* Cancelation */
375                         get_res_key(fc->user_contract, &key);
376                         log_contract("Cancelation request", i, fc->user_contract);
377                 } else {
378                         /* (Re)Negotiation */
379                         if (!get_res_key(c, &key)) {
380                                 return FRSH_ERR_RESOURCE_ID_INVALID;
381                         }
382                 }
383                 /* Check that all contracts are for the same resource */
384                 if (i==0) {
385                         key2 = key;
386                         *resource = fcb_resource_find(fcb, &key);
387                         if (!*resource) {
388                                 ul_logerr("No resource manager for %d.%d registered\n",
389                                           key.type, key.id);
390                                 return FRES_ERR_NO_RESOURCE_MANAGER;
391                         }
392                         /* Find allocator for this request */
393                         ra = fcb_alloc_find(*resource, app);
394                         if (!ra) {
395                                 char str[60];
396                                 forb_server_id_to_string(str, app, sizeof(str));
397                                 ul_logerr("No resource allocator found for %d.%d and %s\n",
398                                           (*resource)->key.type, (*resource)->key.id, str);
399                                 return FRES_ERR_NO_RESOURCE_ALLOCATOR;
400                         }
401                 }
402                 else if (key.type != key2.type ||
403                          key.id   != key2.id) {
404                         ul_logerr("Contract #%d differs in resource!\n", i);
405                         return FRSH_ERR_RESOURCE_ID_INVALID;
406                 }
407                 fc->ra = ra;
408         }       
409         return 0;
410 }
411
412 /**
413  * @param resource Resource for which to rebalance capacity and negotiate new contracts
414  * @param fcb_contract New requests to negotiate
415  * @param num The number of elements in @a fcb_contract
416  * @param[out] rl List with changed contracts to be commited
417  */
418 static void
419 prepare_reservation_list(struct resource *resource,
420                          struct fcb_contract *fcb_contract[], int num,
421                          struct reservation_list *rl)
422 {
423         int i;
424         fosa_abs_time_t now;
425         struct fcb_contract *fc;
426
427         reservation_list_init_head(rl);
428         rl->length = 0;
429         for (i=0; i<num; i++) {
430                 assert(fcb_contract[i]->requested_contract != NULL);
431                 reservation_list_insert(rl, fcb_contract[i]);
432                 rl->length++;
433         }
434         fosa_clock_get_time(CLOCK_REALTIME, &now);
435         ul_list_for_each(sc_contracts, resource, fc) {
436                 if (fosa_abs_time_smaller(fc->end_of_stability_period, now) &&
437                     fc->requested_contract == NULL) /* Do not insert contract inserted above */
438                 {
439                         reservation_list_insert(rl, fc);
440                         rl->length++;
441                 }
442         }
443 }
444
445 /** 
446  * 
447  * 
448  * @param resource Resource for which to rebalance capacity and negotiate new contracts
449  * @param rl List with changed contracts to be commited
450  * 
451  * @return Zero on success, non-zero error code on error
452  */
453 static int
454 rebalance_spare_capacity_and_reserve(struct resource *resource,
455                                      struct reservation_list *rl)
456 {
457         int ret;
458         unsigned i;
459         struct fcb_contract *fc;
460         fres_block_spare_capacity *s;
461
462         fres_contract_ptr_seq contracts;
463         if (!forb_sequence_alloc_buf(&contracts, rl->length)) {
464                 return errno;
465         }
466         contracts._length = rl->length;
467         i=0;
468         /* Initialize optimization */
469         ul_list_for_each(reservation_list, rl, fc) {
470                 fc->to_be_reserved_contract =
471                         fc->requested_contract ? fres_contract_duplicate(fc->requested_contract) :
472                         fc->user_contract ? fres_contract_duplicate(fc->user_contract) :
473                         NULL;
474                 assert(fc->to_be_reserved_contract != NULL);
475
476                 forb_sequence_elem(&contracts, i) = fc->to_be_reserved_contract;
477                 i++;
478                 
479                 s = fres_contract_get_spare_capacity(fc->to_be_reserved_contract);
480                 if (s && s->granularity == FRSH_GR_DISCRETE) {
481                         fc->sc_variant.initial = s->variants._length - 1;
482                         fc->sc_variant.try = fc->sc_variant.initial;
483                 }
484         }
485
486         bool all_combinations_tried;
487         int criterion, best_criterion = -1;
488         struct fcb_contract *fcb_changed;
489         /* Exhaustive search. Try all combinations of spare capacity
490          * variants and find the one with best creterion. */
491         do {
492                 all_combinations_tried = true;
493                 fcb_changed = NULL;
494                 criterion = 0;
495                 /* Prepare spare capacity variant */
496                 ul_list_for_each(reservation_list, rl, fc) {
497                         s = fres_contract_get_spare_capacity(fc->to_be_reserved_contract);
498                         /* TODO: Simulate continuous granularity by discretization */
499                         if (s && s->granularity == FRSH_GR_DISCRETE) {
500                                 fres_container_copy(fc->to_be_reserved_contract->container,
501                                                     forb_sequence_elem(&s->variants, fc->sc_variant.try));
502                                 criterion += fc->sc_variant.try;
503
504                                 if (fcb_changed == NULL) {
505                                         /* Chnage tried variant for the next round */
506                                         fc->sc_variant.try = fc->sc_variant.try > 0 ?
507                                                 fc->sc_variant.try - 1 :
508                                                 s->variants._length - 1;
509                                         /* Do not change other
510                                          * contracts unless we have
511                                          * tried all variants */
512                                         if (fc->sc_variant.try != fc->sc_variant.initial) {
513                                                 fcb_changed = fc;
514                                         }
515                                 }
516                                 if (fc->sc_variant.try != fc->sc_variant.initial)
517                                         all_combinations_tried = false;
518                         }
519                 }
520
521                 if (criterion > best_criterion) {
522                         CORBA_Environment ev;
523                         /* Reserve contract */
524                         ret = fres_resource_manager_reserve_contracts(resource->mng, &contracts, &ev);
525                         if (forb_exception_occurred(&ev)) {
526                                 ret = fres_forbex2err(&ev);
527                                 ul_logerr("FORB exception when reserving contracts\n");
528                                 goto err;
529                         }
530                         if (ret < 0) {
531                                 ul_logerr("Contract reservation error %d\n", ret);
532                                 ret = FRES_ERR_ADMISSION_TEST;
533                                 goto err;
534                         }
535                         if (ret == 0) { /* negotiation succeeded */
536                                 best_criterion = criterion;
537                         }
538                 }
539         } while (!all_combinations_tried);
540
541         if (best_criterion == -1) {
542                 ret = FRSH_ERR_CONTRACT_REJECTED;
543         } else {
544                 /* At least some variant succeeded */
545                 ret = 0;
546                 sc_contracts_init_head(resource);
547                 ul_list_for_each(reservation_list, rl, fc) {
548                         s = fres_contract_get_spare_capacity(fc->to_be_reserved_contract);
549                         if (s && s->granularity == FRSH_GR_DISCRETE) {
550                                 sc_contracts_insert(resource, fc);
551                         }
552                 }
553         }
554 err:
555         ul_list_for_each(reservation_list, rl, fc) {
556                 fres_contract_destroy(fc->to_be_reserved_contract);
557                 fc->to_be_reserved_contract = NULL;
558         }
559         forb_sequence_free_buf(&contracts, forb_no_destructor);
560         return ret;
561 }
562
563 /**
564  * Create/change VReses according to @a schedulable_contracts.
565  *
566  * There might be more allocators for schedulable contracts, so merge
567  * adjacent vreses with the same allocator together and create/change
568  * them in one step. Also the order of schedulable contracts might be
569  * different from the initial order od ids in commit_contracts()
570  * therefore we have to search for every contract.
571  */
572 static int
573 change_vreses(struct fcb *fcb, fres_contract_ptr_seq *schedulable_contracts)
574 {
575         struct res_alloc *last_ra = NULL;
576         fres_contract_ptr_seq vreses;
577         int i, ret = 0;
578         CORBA_Environment ev;
579         
580         if (!forb_sequence_alloc_buf(&vreses, schedulable_contracts->_length)) {
581                 return errno;
582         }
583         vreses._length = 0;
584         
585         for (i=0; i<schedulable_contracts->_length; i++) {
586                 struct fcb_contract *fc;
587                 struct fres_contract *sc = schedulable_contracts->_buffer[i];
588
589                 log_contract("Changing VRES", i, sc);
590
591                 fc = fcb_contract_find(fcb, &sc->id);
592                 assert(fc != NULL);
593
594                 if (true /* TODO: if the schedulable contract is changed */) {
595                         if (last_ra != fc->ra) {
596                                 if (vreses._length) {
597                                         ret = fres_resource_allocator_change_vreses(last_ra->ra,
598                                                                                     &vreses, &ev);
599                                         if (forb_exception_occurred(&ev)) {
600                                                 ret = fres_forbex2err(&ev);
601                                                 goto err;
602                                         }
603                                         if (ret) goto err;
604                                 }
605                                 vreses._length = 0;
606                         }
607                         vreses._buffer[vreses._length] = sc;
608                         vreses._length++;
609                         fres_contract_destroy(fc->schedulable_contract);
610                         fc->schedulable_contract = fres_contract_duplicate(sc);
611                         last_ra = fc->ra;
612                 
613                 }
614         }
615         if (last_ra) {
616                 ret = fres_resource_allocator_change_vreses(last_ra->ra,
617                                                             &vreses, &ev);
618                 if (forb_exception_occurred(&ev))
619                         ret = fres_forbex2err(&ev);
620         }
621 err:
622         forb_sequence_free_buf(&vreses, forb_no_destructor);
623         return ret;
624 }
625
626 void
627 free_fcb_contracts(struct fcb_contract *fcb_contracts[], int num)
628 {
629         int i;
630         struct fcb_contract *fc;
631         for (i=0; i<num; i++) {
632                 fc = fcb_contracts[i];
633                 if (fc && !fc->user_contract) {
634                         fc->requested_contract = NULL; /* Destroyed by FORB */
635                         fcb_contract_destroy(fc);
636                 }
637         }
638         free(fcb_contracts);
639 }
640
641 CORBA_long
642 negotiate_contracts(fres_contract_broker obj,
643                     const fres_contract_ptr_seq* contracts,
644                     fres_contract_id_seq** ids_out,
645                     CORBA_Environment *ev)
646 {
647         struct fcb *fcb = o2fcb(obj);
648         struct resource *resource = NULL;
649         int ret = 0;
650         forb_server_id app;
651         fres_contract_ptr_seq *schedulable_contracts;
652         struct fcb_contract **fcb_contracts, *fc;
653         unsigned i;
654         fres_contract_id_seq commit_ids;
655         int num = contracts->_length;
656
657         /* Prepare output sequence for the case we return eariler with
658          * an error */
659         forb_sequence_alloc(*ids_out, 0);
660         if (!*ids_out) {
661                 ev->major = FORB_EX_NO_MEMORY;
662                 goto err;
663         }
664         CORBA_sequence_set_release(*ids_out, CORBA_TRUE);
665         
666         forb_get_req_source(obj, &app);
667         
668         fcb_contracts = malloc(sizeof(fcb_contracts[0])*num);
669         if (!fcb_contracts) {
670                 ret = errno;
671                 goto err;
672         }
673         memset(fcb_contracts, 0, sizeof(fcb_contracts[0])*num);
674
675         ret = prepare_fcb_contracts(fcb, fcb_contracts,
676                                     contracts->_buffer, num);
677         if (ret)
678                 goto err_free_fcb_contracts;
679         ret = check_and_setup_resource(fcb, fcb_contracts, &resource,
680                                        &app, num);
681         if (ret)
682                 goto err_free_fcb_contracts;
683
684         struct reservation_list rl;
685         prepare_reservation_list(resource,
686                                  fcb_contracts, num,
687                                  &rl);
688
689         /* Allocate all the needed memory before doing reservation. If
690          * there is not enough memory, it has no sense to call resource
691          * manager. */
692         if (!forb_sequence_alloc_buf(&commit_ids, rl.length)) {
693                 ret = errno;
694                 goto err_free_fcb_contracts;
695         }
696
697         /* Reserve contracts */
698         ret = rebalance_spare_capacity_and_reserve(resource, &rl);
699         if (ret) {
700                 if (ret == FRSH_ERR_CONTRACT_REJECTED) {
701                         ul_logmsg("Contract(s) was/were rejected\n");
702                 } else {
703                         char msg[100];
704                         fres_strerror(ret, msg, sizeof(msg));
705                         ul_logerr("Reservation error: %s\n", msg);
706                 }
707                 goto err_free_fcb_contracts;
708         }
709
710         /* Commit contracts */
711         commit_ids._length = rl.length;
712         i=0;
713         ul_list_for_each(reservation_list, &rl, fc) {
714                 forb_sequence_elem(&commit_ids, i) = fc->id;
715                 i++;
716         }
717         
718         fres_resource_manager_commit_contracts(resource->mng, &commit_ids,
719                                                &schedulable_contracts, ev);
720         if (forb_exception_occurred(ev)) {
721                 ret = fres_forbex2err(ev);
722                 goto err_cancel_reservation;
723         }
724
725         /* Add new contracts to our fcb database for later
726          * reference. Canceled contracts are removed below. */
727         for (i=0; i<num; i++) {
728                 fc =  fcb_contracts[i];
729
730                 if (fc->user_contract) {
731                         if (fres_contract_get_num_blocks(fc->requested_contract) > 0) {
732                                 /* Renegotiation */
733                                 fres_contract_destroy(fc->user_contract);
734                                 fc->user_contract = fres_contract_duplicate(fc->requested_contract);
735                                 /* Note: requested_contract is also
736                                  * pointed by contracts parameter and
737                                  * will be freed by FORB. */
738                                 fc->requested_contract = NULL;
739                         }
740                 } else {
741                         /* Insert new contracts */
742                         fcb_contract_insert(fcb, fcb_contracts[i]);
743                         fc->user_contract = fres_contract_duplicate(fc->requested_contract);
744                         fc->requested_contract = NULL;
745                         /* See the note above. */
746                 }
747         }
748
749         ret = change_vreses(fcb, schedulable_contracts);
750         if (ret)
751                 goto err_cancel_reservation;
752         
753         forb_sequence_free(schedulable_contracts, fres_contract_ptr_destroy);
754         if (ret != 0) {
755                 ul_logerr("VRes was not created\n");
756                 goto err_cancel_contracts;
757         }
758
759
760         /* Return IDs and delete canceled contracts from out database */
761         if (!forb_sequence_alloc_buf(*ids_out, num)) {
762                 ev->major = FORB_EX_NO_MEMORY;
763                 goto err_cancel_contracts;
764         }
765         (*ids_out)->_length = num;
766         for (i=0; i<num; i++) {
767                 struct fcb_contract *fc =  fcb_contracts[i];
768                 forb_sequence_elem(*ids_out, i) = fc->id;
769
770                 if (fc->requested_contract &&
771                     fres_contract_get_num_blocks(fc->requested_contract) == 0) {
772                         fcb_contract_delete(fcb, fc);
773                         /* Note: requested_contract is also pointed by
774                          * contracts parameter and will be freed by FORB. */
775                         fc->requested_contract = NULL;
776                         fcb_contract_destroy(fc);
777                 }
778         }
779         return 0;
780
781 err_cancel_contracts:
782         /* TODO */
783         goto err_free_fcb_contracts;
784 err_cancel_reservation:
785         fres_resource_manager_cancel_reservations(resource->mng, &commit_ids, ev);
786 err_free_fcb_contracts:
787         free_fcb_contracts(fcb_contracts, num);
788 err:
789         return ret;
790 }
791
792 void redistribute_spare_capacity(fres_contract_broker obj,
793                                  const frsh_resource_type_t restype,
794                                  const frsh_resource_id_t resid,
795                                  CORBA_Environment *ev)
796 {
797         struct fcb *fcb = o2fcb(obj);
798         struct res_key key = {restype, resid };
799         struct resource *resource;
800         struct reservation_list rl;
801         
802         resource = fcb_resource_find(fcb, &key);
803
804         prepare_reservation_list(resource, NULL, 0, &rl);
805
806 /*      forb_sequence_alloc(ids, rl.length); */
807 /*      if (!ids || !ids->_buffer) { */
808 /*              ev->major = FORB_EX_NO_MEMORY; */
809 /*              goto err_free_fcb_contracts; */
810 /*      } */
811 /*      CORBA_sequence_set_release(ids, CORBA_TRUE); */
812 /*      *ids_out = ids;         /\* ids is freed by FORB *\/ */
813         
814         
815         rebalance_spare_capacity_and_reserve(resource, &rl);
816         /* Commit */
817 }
818
819 CORBA_long register_resource(fres_contract_broker obj,
820                             const frsh_resource_type_t restype,
821                             const frsh_resource_id_t resid,
822                             const fres_resource_desc *desc,
823                             CORBA_Environment *ev)
824 {
825         struct fcb *fcb = o2fcb(obj);
826         struct resource *res, *res2;
827
828         res = malloc(sizeof(*res));
829         if (!res) goto err;
830         memset(res, 0, sizeof(*res));
831         res->key.type = restype;
832         res->key.id = resid;
833         res2 = fcb_resource_find(fcb, &res->key);
834         if (res2) {
835                 if (forb_object_is_stale(res2->mng)) {
836                         ul_logmsg("Removing stale manager for resource %d.%d\n",
837                                   restype, resid);
838                         forb_object_release(res2->mng);
839                         fcb_resource_delete(fcb, res2);
840                         /* TODO: Delete also all allocators associated
841                          * with this stale resource manager. */
842                         free(res);
843                         res = res2;
844                 } else {
845                         ul_logerr("Resource manager %d.%d already registered\n",
846                                   restype, resid);
847                         goto free_err;
848                 }
849         }
850         res->mng = forb_object_duplicate(desc->manager);
851         res->name = desc->name;
852
853         fcb_alloc_init_root_field(res);
854         sc_contracts_init_head(res);
855         ul_logmsg("Registering manager for resource \"%s\" (%d.%d)\n",
856                   res->name, restype, resid);
857         fcb_resource_insert(fcb, res);
858         return 0;
859 free_err:
860         free(res);
861 err:
862         return -1;
863 }
864
865
866 CORBA_long register_allocator(fres_contract_broker obj,
867                               const frsh_resource_type_t restype,
868                               const frsh_resource_id_t resid,
869                               const fres_resource_allocator ra_obj,
870                               CORBA_Environment *ev)
871 {
872         struct fcb *fcb = o2fcb(obj);
873         struct resource *res;
874         struct res_alloc *ra;
875         struct res_key resource;
876         forb_server_id server_id;
877         char server_id_str[40];
878         int ret;
879
880         forb_get_server_id(ra_obj, &server_id);
881         forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
882         ul_logmsg("Registering allocator for resource %d.%d in app %s\n",
883                   restype, resid, server_id_str);
884         
885         resource.type = restype;
886         resource.id = resid;
887         res = fcb_resource_find(fcb, &resource);
888         if (!res) {
889                 ul_logerr("No manager found for %d.%d. Unable to register the allocator!\n",
890                           restype, resid);
891                 ret = FRES_ERR_NO_RESOURCE_MANAGER;
892                 goto err;
893         }
894         ra = fcb_alloc_find(res, &server_id);
895         if (ra) {
896                 char *str = forb_object_to_string(ra_obj);
897                 ul_logerr("Allocator from already registered (%s)\n",
898                           str);
899                 forb_free(str);
900                 ret = FRES_ERR_ALLOCATOR_ALREADY_REGISTERED;
901                 goto err;
902         }
903         ra = malloc(sizeof(*ra));
904         if (!ra) {
905                 ret = ENOMEM;
906                 goto err;
907         }
908         ra->app = server_id;
909         ra->ra = forb_object_duplicate(ra_obj);
910         fcb_alloc_insert(res, ra);
911         return 0;
912 err:
913         return ret;
914 }
915
916 void get_resources(fres_contract_broker obj, fres_resource_seq** resources, CORBA_Environment *ev)
917 {
918         struct fcb *fcb = o2fcb(obj);
919         fres_resource_seq *seq;
920         struct resource *res;
921         int n;
922
923         seq = malloc(sizeof(*seq));
924         if (!seq) {
925                 ev->major = FORB_EX_NO_MEMORY;
926                 return;
927         }
928         memset(seq, 0, sizeof(*seq));
929
930         n=0;
931         gavl_cust_for_each(fcb_resource, fcb, res) {
932                 n++;
933         }
934
935         seq->_buffer = CORBA_sequence_fres_resource_allocbuf(n);
936         seq->_maximum = n;
937         seq->_length = n;
938
939         n = 0;
940         gavl_cust_for_each(fcb_resource, fcb, res) {
941                 seq->_buffer[n].restype = res->key.type;
942                 seq->_buffer[n].resid = res->key.id;
943                 seq->_buffer[n].desc.manager = res->mng;
944                 seq->_buffer[n].desc.name = res->name;
945                 n++;
946         }
947
948         *resources = seq;
949 }
950
951 CORBA_long
952 negotiate_transaction(fres_contract_broker _obj,
953                       const fres_transaction_t* transaction,
954                       CORBA_Environment *ev)
955 {
956         struct fcb *fcb = o2fcb(_obj);
957         struct fcb_contract **fcb_contracts;
958         const fres_contract_ptr_seq* contracts = &transaction->contracts;
959         int num = contracts->_length;
960         int ret;
961
962         fcb_contracts = malloc(sizeof(*fcb_contracts)*num);
963         if (!fcb_contracts) {
964                 ret = errno;
965                 goto err;
966         }
967         memset(fcb_contracts, 0, sizeof(*fcb_contracts)*num);
968
969         ret = prepare_fcb_contracts(fcb, fcb_contracts,
970                                     contracts->_buffer, num);
971         if (ret)
972                 goto err_free_fcb_contracts;
973         
974         return FRSH_ERR_NOT_IMPLEMENTED;
975 err_free_fcb_contracts:
976         free_fcb_contracts(fcb_contracts, num);
977 err:
978         return ret;
979 }
980
981 CORBA_long
982 wait_transaction(fres_contract_broker _obj,
983                  const CORBA_char * name,
984                  fres_transaction_t** transaction,
985                  CORBA_Environment *ev)
986 {
987         return FRSH_ERR_NOT_IMPLEMENTED;
988 }
989
990 CORBA_long
991 allocate_transaction_vres(fres_contract_broker _obj,
992                           const CORBA_long id,
993                           CORBA_Environment *ev)
994 {
995         return FRSH_ERR_NOT_IMPLEMENTED;
996 }
997
998
#if CONFIG_FCB_INET && !CONFIG_FORB_PROTO_INET_DEFAULT
/**
 * Creates an INET port listening on INADDR_ANY and registers it
 * with the given ORB.
 *
 * @param orb  ORB to register the port with.
 *
 * @return Zero on success, -1 if the port could not be allocated,
 *         otherwise the error returned by forb_inet_port_init() or
 *         forb_register_port().
 */
static int register_inet_port(forb_orb orb)
{
        forb_port_t *port = forb_malloc(sizeof(*port));
        int ret;
        struct in_addr listen_on;

        if (!port)
                return -1;
        memset(port, 0, sizeof(*port));
        listen_on.s_addr = INADDR_ANY;
        ret = forb_inet_port_init(&port->desc, listen_on, 0);
        if (ret) {
                /* The port was never handed over to the ORB, so it has
                 * to be released here. errno is preserved because the
                 * caller reports it. */
                int saved_errno = errno;
                forb_free(port);
                errno = saved_errno;
                return ret;
        }
        /* NOTE(review): it is not visible here who owns the port when
         * forb_register_port() fails, so it is not freed in that
         * case - confirm. */
        ret = forb_register_port(orb, port);
        return ret;
}
#endif
1017
1018 struct forb_fres_contract_broker_impl impl = {
1019         .negotiate_contracts = negotiate_contracts,
1020         .register_resource = register_resource,
1021         .register_allocator = register_allocator,
1022         .redistribute_spare_capacity = redistribute_spare_capacity,
1023         .get_resources = get_resources,
1024         .negotiate_transaction = negotiate_transaction,
1025         .wait_transaction = wait_transaction,
1026         .allocate_transaction_vres = allocate_transaction_vres,
1027 };
1028
1029 void peer_discovery_callback(const forb_orb peer_orb, const char *orb_id)
1030 {
1031         forb_server_id server_id;
1032         char server_id_str[sizeof(forb_server_id)*2+1];
1033
1034         forb_get_server_id(peer_orb, &server_id);
1035         forb_server_id_to_string(server_id_str, &server_id, sizeof(server_id_str));
1036 #if 0
1037         ul_logmsg("peer discovered: %s (orb_id '%s')\n", server_id_str, orb_id);
1038 #endif
1039
1040         if (orb_id == NULL)
1041                 return;
1042
1043         if (strcmp(orb_id, "org.frescor.fcb") == 0) {
1044                 fosa_abs_time_t now;
1045                 fosa_rel_time_t delay;
1046                 fosa_clock_get_time(CLOCK_REALTIME, &now);
1047                 delay = fosa_abs_time_extract_interval(start_time, now);
1048                 ul_logmsg("node joined: %s (time %ld ms)\n", server_id_str,
1049                           fosa_rel_time_to_msec(delay));
1050         }
1051 }
1052
1053 void peer_dead_callback(const forb_orb peer_orb, const char *orb_id)
1054 {
1055 }
1056
/* Long command line options; each one maps to the short option
 * handled in main(). The "help" entry makes the existing 'h' case in
 * main() reachable via --help. */
static struct option long_opts[] = {
    { "daemon",   optional_argument, NULL, 'd' },
    { "help",     no_argument,       NULL, 'h' },
    { "loglevel", required_argument, NULL, 'l' },
    { 0, 0, 0, 0}
};
1062
/* Prints a short description of the command line options to the
 * standard output. */
static void
usage(void)
{
        fputs("usage: fcb [ options ]\n"
              "  -d, --daemon [pid-file]   go to background after FORB initialization\n"
              "  -l, --loglevel <number>|<domain>=<number>,...\n",
              stdout);
}
1070
1071 int print_log_domain(ul_log_domain_t *domain, void *context)
1072 {
1073         printf("%s = %d\n", domain->name, domain->level);
1074         return 0;
1075 }
1076
1077 int main(int argc, char *argv[])
1078 {
1079         forb_orb orb;
1080         struct fcb fcb_data;
1081         fres_contract_broker fcb;
1082         forb_executor_t executor;
1083         int ret;
1084         forb_init_attr_t attr = {
1085                 .orb_id = "org.frescor.fcb",
1086                 .peer_discovery_callback = peer_discovery_callback,
1087                 .peer_dead_callback = peer_dead_callback,
1088                 .fixed_tcp_port = FCB_TCP_PORT,
1089 #ifdef CONFIG_FORB_PROTO_INET_DEFAULT           
1090                 .fixed_server_id = FCB_SERVER_ID,
1091                 .redistribute_hellos = true,
1092 #endif
1093         };
1094         int  opt;
1095
1096         while ((opt = getopt_long(argc, argv, "d::l:", &long_opts[0], NULL)) != EOF) {
1097                 switch (opt) {
1098                         case 'l':
1099                                 if (*optarg == '?') {
1100                                         ul_logreg_for_each_domain(print_log_domain, NULL);
1101                                         exit(0);
1102                                 }
1103                                 {
1104                                         int ret;
1105                                         ret = ul_log_domain_arg2levels(optarg);
1106                                         if (ret) 
1107                                                 error(1, EINVAL, "Error parsing -l argument at char %d\n", ret);
1108                                 }
1109                                 break;
1110                         case 'd':
1111                                 opt_daemon = true;
1112                                 opt_pidfile = optarg;
1113                                 break;
1114                         case 'h':
1115                         /*default:*/
1116                                 usage();
1117                                 exit(opt == 'h' ? 0 : 1);
1118                 }
1119         }
1120
1121         fosa_clock_get_time(CLOCK_REALTIME, &start_time);
1122
1123         if (opt_daemon)
1124                 forb_daemon_prepare(opt_pidfile);
1125
1126         orb = forb_init(&argc, &argv, &attr);
1127         if (!orb) error(1, errno, "FORB initialization failed");
1128
1129 #if CONFIG_FCB_INET && !CONFIG_FORB_PROTO_INET_DEFAULT
1130         ret = register_inet_port(orb);
1131         if (ret) error(0, errno, "INET port registration failed");
1132 #endif
1133
1134         fcb_resource_init_root_field(&fcb_data);
1135         fcb_contract_init_root_field(&fcb_data);
1136
1137         fcb = forb_fres_contract_broker_new(orb, &impl, &fcb_data);
1138         if (!fcb) error(1, errno, "forb_fres_contract_broker_new failed");
1139
1140         /* Prepare executor before we register the fcb reference,
1141          * so that no reuqests are lost */
1142         ret = forb_executor_init(&executor);
1143         if (ret) error(1, errno, "forb_executor_init failed");
1144         
1145         ret = forb_executor_register_object(&executor, fcb);
1146         if (ret) error(1, errno, "forb_executor_register_object failed");
1147
1148         ret = forb_register_reference(fcb, fres_contract_broker_reg_name);
1149         if (ret) error(1, errno, "forb_register_reference() failed");
1150
1151         ul_logmsg("Waiting for requests\n");
1152         if (opt_daemon)
1153                 forb_daemon_ready();
1154
1155         ret = forb_executor_run(&executor);
1156         if (ret) error(1, errno, "forb_executor_run failed");
1157         
1158         return 0;
1159 }