]> rtime.felk.cvut.cz Git - frescor/fna.git/blob - src_frescan/frescan_bwres_threads.c
9d7aefe06edbcca0235f50b88ec234982cfa1636
[frescor/fna.git] / src_frescan / frescan_bwres_threads.c
1 /*!
2  * @file frescan_bwres_threads.c
3  *
4  * @brief FRESCAN bandwidth reservation layer: negotiation threads
5  *
6  * This module contains the acceptor threads and the master thread for local
7  * negotiations, with functions to create them.
8  *
9  * @version 0.01
10  *
11  * @date 2-Apr-2008
12  *
13  * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
14  *
15  */
16
17 #include <assert.h>
18 #include "fosa_threads_and_signals.h" // fosa_thread_attr_init...
19 #include "frescan_bwres_threads.h"
20 #include "frescan_bwres_messages.h"
21 #include "frescan_bwres_requests.h"
22 #include "frescan_config.h"
23 #include "frescan_debug.h"
24 #include "frescan_data.h"
25 #include "frescan_servers.h"
26
27 static void *frescan_manager_thread(void *arg);
28 static void *frescan_acceptor_thread(void *arg);
29
30 /**
31  * frescan_manager_thread_create()
32  *
33  * This call creates the manager thread at each node which will be waiting
34  * in a request queue for LOCAL or EXTERNAL requests.
35  */
36
37 int frescan_manager_thread_create(frescan_network_t net)
38 {
39         int ret;
40         fosa_thread_attr_t attr;
41
42         ret = fosa_thread_attr_init(&attr);
43         if (ret != 0) {
44                 ERROR("could not init thread attributes\n");
45                 return ret;
46         }
47
48         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
49         if (ret != 0) {
50                 ERROR("could not set neg thread prio %d\n",
51                       FRESCAN_NEG_THREAD_PRIO);
52                 return ret;
53         }
54
55         ret = fosa_thread_create(&the_networks[net].neg_thread_id,
56                                   &attr,
57                                   frescan_master_neg_thread,
58                                   (void *)(uint32_t)net);
59
60         if (ret != 0) {
61                 ERROR("could not create the negotiator thread\n");
62                 return ret;
63         }
64
65         ret = fosa_thread_attr_destroy(&attr);
66         if (ret != 0) {
67                 ERROR("could not destroy thread attributes\n");
68                 return ret;
69         }
70
71         return 0;
72 }
73
74 /**
75  * frescan_acceptor_thread_create()
76  */
77
78 int frescan_acceptor_thread_create(frescan_network_t net)
79 {
80         int ret;
81         fosa_thread_attr_t attr;
82
83         ret = fosa_thread_attr_init(&attr);
84         if (ret != 0) {
85                 ERROR("could not init thread attributes\n");
86                 return ret;
87         }
88
89         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO);
90         if (ret != 0) {
91                 ERROR("could not set acceptor thread prio %d\n",
92                       FRESCAN_ACCEPTOR_THREAD_PRIO);
93                 return ret;
94         }
95
96         ret = fosa_thread_create(&the_networks[net].acceptor_thread_id,
97                                   &attr,
98                                   frescan_acceptor_thread,
99                                   (void *)(uint32_t)net);
100
101         if (ret != 0) {
102                 ERROR("could not create the negotiator thread\n");
103                 return ret;
104         }
105
106         ret = fosa_thread_attr_destroy(&attr);
107         if (ret != 0) {
108                 ERROR("could not destroy thread attributes\n");
109                 return ret;
110         }
111
112         return 0;
113 }
114
115 /**
116  * frescan_manager_thread
117  */
118
119 static void *frescan_manager_thread(void *arg)
120 {
121         int err;
122         frescan_request_id_t request;
123         frescan_req_type_t   type;
124         frescan_robj_id_t    reply;
125         frescan_contract_t   *contract;
126         frescan_neg_return_info_t *neg_return_info;
127         frescan_server_params_t server_params;
128         frescan_network_t net = (uint32_t)arg;
129
130         DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "negotiator thread starts\n");
131
132         while(1) {
133                 DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "wait for a request\n");
134
135                 err = frescan_requestqueue_dequeue(&request);
136                 assert(err == 0);
137
138                 err = frescan_request_get_type(request, &type);
139                 assert(err == 0);
140
141                 switch(type) {
142                         case FRESCAN_NEGOTIATE:
143                                 DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
144                                       "FRESCAN_NEGOTIATE request\n");
145                                 err = frescan_request_get_contract(request, &contract);
146                                 assert(err == 0);
147                                 err = frescan_request_get_reply(request, &reply);
148                                 assert(err == 0);
149                                 err = frescan_request_get_return_info
150                                                 (request, (void *)&neg_return_info);
151                                 assert(err == 0);
152
153                         // TODO: sched test + add contract to table
154                         // so far always accepted witht he min values
155                                 neg_return_info->error = 0;
156                                 server_params.values = contract->min_values;
157                                 server_params.prio   = contract->prio;
158
159                                 err = frescan_servers_create
160                                                 (net,
161                                                 &server_params,
162                                                 &neg_return_info->id);
163                                 assert(err == 0);
164
165                                 err = frescan_replyobject_signal(reply);
166                                 assert(err == 0);
167                                 break;
168
169                         case FRESCAN_RENEGOTIATE:
170                                 DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
171                                       "FRESCAN_RENEGOTIATE request\n");
172                                 break;
173
174                         case FRESCAN_CANCEL:
175                                 DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
176                                       "FRESCAN_CANCEL request\n");
177                                 break;
178
179                         default:
180                                 ERROR("wrong request type %d\n", type);
181                 }
182         }
183
184         return NULL;
185 }
186
187 /**
188  * frescan_acceptor_thread()
189  */
190
191 static void *frescan_acceptor_thread(void *arg)
192 {
193         int ret;
194         frescan_recv_params_t params;
195         uint8_t msg[200];
196         size_t recv_bytes;
197         frescan_node_t from;
198         frescan_prio_t prio;
199
200         DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
201               "master acceptor thread starts\n");
202
203         params.net = (frescan_network_t)(uint32_t)arg;
204         params.channel = FRESCAN_NEG_CHANNEL;
205         params.flags = FRESCAN_SYNC;
206
207         while(1) {
208                 DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
209                       "waiting for msg, net:%u chan:%u flags:%u\n",
210                       params.net, params.channel, params.flags);
211
212                 ret = frescan_recv(&params, msg, 200,
213                                    &recv_bytes, &from, &prio);
214                 assert(ret == 0);
215
216                 DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
217                       "msg received, from:%u size:%u prio:%u\n",
218                       from, recv_bytes, prio);
219
220                 ret = frescan_message_parse(params.net, msg, recv_bytes, from);
221                 assert(ret == 0);
222         }
223
224         return NULL;
225 }