]> rtime.felk.cvut.cz Git - frescor/fna.git/blob - src_frescan/frescan_negotiation_threads.c
i break the svn for one day to change the name of files, redo the negotiation message...
[frescor/fna.git] / src_frescan / frescan_negotiation_threads.c
1 /*!
2  * @file frescan_negotiation_threads.c
3  *
4  * @brief FRESCAN negotiation threads
5  *
6  * This module contains the acceptor threads and the master thread for local
7  * negotiations, with functions to create them.
8  *
9  * @version 0.01
10  *
11  * @date 2-Apr-2008
12  *
13  * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
14  *
15  */
16
#include <assert.h>
#include <stdint.h> // uintptr_t
#include "fosa_threads_and_signals.h" // fosa_thread_attr_init...
#include "frescan_negotiation_threads.h"
#include "frescan_config.h"
#include "frescan_debug.h"
#include "frescan_data.h"
#include "frescan_negotiation_messages.h"
#include "frescan_requests_queue.h"
#include "frescan_servers.h"
26
27 static void *frescan_acceptor_thread(void *arg);
28 static void *frescan_master_neg_thread(void *arg);
29
30 /**
31  * frescan_acceptor_thread_create()
32  */
33
34 int frescan_acceptor_thread_create(frescan_network_t net)
35 {
36         int ret;
37         fosa_thread_attr_t attr;
38
39         ret = fosa_thread_attr_init(&attr);
40         if (ret != 0) {
41                 ERROR("could not init thread attributes\n");
42                 return ret;
43         }
44
45         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO);
46         if (ret != 0) {
47                 ERROR("could not set acceptor thread prio %d\n",
48                       FRESCAN_ACCEPTOR_THREAD_PRIO);
49                 return ret;
50         }
51
52         ret = fosa_thread_create(&the_networks[net].acceptor_thread_id,
53                                   &attr,
54                                   frescan_acceptor_thread,
55                                   (void *)(uint32_t)net);
56
57         if (ret != 0) {
58                 ERROR("could not create the negotiator thread\n");
59                 return ret;
60         }
61
62         ret = fosa_thread_attr_destroy(&attr);
63         if (ret != 0) {
64                 ERROR("could not destroy thread attributes\n");
65                 return ret;
66         }
67
68         return 0;
69 }
70
71 /**
72  * frescan_master_neg_thread_create()
73  */
74
75 int frescan_master_neg_thread_create(frescan_network_t net)
76 {
77         int ret;
78         fosa_thread_attr_t attr;
79
80         ret = fosa_thread_attr_init(&attr);
81         if (ret != 0) {
82                 ERROR("could not init thread attributes\n");
83                 return ret;
84         }
85
86         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
87         if (ret != 0) {
88                 ERROR("could not set neg thread prio %d\n",
89                       FRESCAN_NEG_THREAD_PRIO);
90                 return ret;
91         }
92
93         ret = fosa_thread_create(&the_networks[net].neg_thread_id,
94                                   &attr,
95                                   frescan_master_neg_thread,
96                                   (void *)(uint32_t)net);
97
98         if (ret != 0) {
99                 ERROR("could not create the negotiator thread\n");
100                 return ret;
101         }
102
103         ret = fosa_thread_attr_destroy(&attr);
104         if (ret != 0) {
105                 ERROR("could not destroy thread attributes\n");
106                 return ret;
107         }
108
109         return 0;
110 }
111
112 /**
113  * frescan_acceptor_thread()
114  *
115  * a loop waiting for messages on the network and parsing them.
116  */
117
118 static void *frescan_acceptor_thread(void *arg)
119 {
120         int ret;
121         frescan_recv_params_t params;
122         uint8_t msg[200];
123         size_t recv_bytes;
124         frescan_node_t from;
125         frescan_prio_t prio;
126
127         DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
128               "master acceptor thread starts\n");
129
130         params.net = (frescan_network_t)(uint32_t)arg;
131         params.channel = FRESCAN_NEG_CHANNEL;
132         params.flags = FRESCAN_SYNC;
133
134         while(1) {
135                 DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
136                       "waiting for msg, net:%u chan:%u flags:%u\n",
137                       params.net, params.channel, params.flags);
138
139                 ret = frescan_recv(&params, msg, 200,
140                                    &recv_bytes, &from, &prio);
141                 assert(ret == 0);
142
143                 DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
144                       "msg received, from:%u size:%u prio:%u\n",
145                       from, recv_bytes, prio);
146
147                 ret = frescan_message_parse(params.net, msg, recv_bytes, from);
148                 assert(ret == 0);
149         }
150
151         return NULL;
152 }
153
154 /**
155  * frescan_master_neg_thread
156  *
157  * the thread that negotiates LOCAL contracts in the MASTER node.
158  */
159
160 static void *frescan_master_neg_thread(void *arg)
161 {
162         int err;
163         frescan_request_id_t request;
164         frescan_req_type_t   type;
165         frescan_robj_id_t    reply;
166         frescan_contract_t   *contract;
167         frescan_neg_return_info_t *neg_return_info;
168         frescan_server_params_t server_params;
169         frescan_network_t net = (uint32_t)arg;
170
171         DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "negotiator thread starts\n");
172
173         while(1) {
174                 DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "wait for a request\n");
175
176                 err = frescan_requestqueue_dequeue(&request);
177                 assert(err == 0);
178
179                 err = frescan_request_get_type(request, &type);
180                 assert(err == 0);
181
182                 switch(type) {
183                 case FRESCAN_NEGOTIATE:
184                         DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
185                                 "FRESCAN_NEGOTIATE request\n");
186                         err = frescan_request_get_contract(request, &contract);
187                         assert(err == 0);
188                         err = frescan_request_get_reply(request, &reply);
189                         assert(err == 0);
190                         err = frescan_request_get_return_info
191                                         (request, (void *)&neg_return_info);
192                         assert(err == 0);
193
194                         // TODO: sched test + add contract to table
195                         // so far always accepted witht he min values
196                         neg_return_info->error = 0;
197                         server_params.values = contract->min_values;
198                         server_params.prio   = contract->prio;
199
200                         err = frescan_servers_create
201                                        (net,
202                                         &server_params,
203                                         &neg_return_info->id);
204                         assert(err == 0);
205
206                         err = frescan_replyobject_signal(reply);
207                         assert(err == 0);
208                         break;
209
210                 case FRESCAN_RENEGOTIATE:
211                         DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
212                                 "FRESCAN_RENEGOTIATE request\n");
213                         break;
214
215                 case FRESCAN_CANCEL:
216                         DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
217                                 "FRESCAN_CANCEL request\n");
218                         break;
219
220                 default:
221                         ERROR("wrong request type %d\n", type);
222                 }
223         }
224
225         return NULL;
226 }