]> rtime.felk.cvut.cz Git - frescor/fna.git/blob - src_frescan/frescan_acceptor_threads.c
65118b487b0f9c0592cfc7624f01ddd551d9fa39
[frescor/fna.git] / src_frescan / frescan_acceptor_threads.c
1 /*!
2  * @file frescan_negotiation_threads.h
3  *
4  * @brief FRESCAN negotiation threads
5  *
6  * This module contains the acceptor threads and the master thread for local
7  * negotiations, with functions to create them.
8  *
9  * @version 0.01
10  *
11  * @date 2-Apr-2008
12  *
13  * @author Daniel Sangorrin <daniel.sangorrin@unican.es>
14  *
15  */
16
17 #include <assert.h>
18 #include "fosa_threads_and_signals.h" // fosa_thread_attr_init...
19 #include "frescan_negotiation_threads.h"
20 #include "frescan_config.h"
21 #include "frescan_debug.h"
22 #include "frescan_data.h"
23 #include "frescan_negotiation_messages.h"
24 #include "frescan_requests_queue.h"
25 #include "frescan_servers.h"
26
27 static void *frescan_acceptor_thread(void *arg);
28 static void *frescan_master_neg_thread(void *arg);
29
30 /**
31  * frescan_acceptor_thread_create()
32  */
33
34 int frescan_acceptor_thread_create(frescan_network_t net)
35 {
36         int ret;
37         fosa_thread_attr_t attr;
38
39         ret = fosa_thread_attr_init(&attr);
40         if (ret != 0) {
41                 ERROR("could not init thread attributes\n");
42                 return ret;
43         }
44
45         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_ACCEPTOR_THREAD_PRIO);
46         if (ret != 0) {
47                 ERROR("could not set acceptor thread prio %d\n",
48                       FRESCAN_ACCEPTOR_THREAD_PRIO);
49                 return ret;
50         }
51
52         ret = fosa_thread_create(&the_networks[net].acceptor_thread_id,
53                                   &attr,
54                                   frescan_acceptor_thread,
55                                   (void *)(uint32_t)net);
56
57         if (ret != 0) {
58                 ERROR("could not create the negotiator thread\n");
59                 return ret;
60         }
61
62         ret = fosa_thread_attr_destroy(&attr);
63         if (ret != 0) {
64                 ERROR("could not destroy thread attributes\n");
65                 return ret;
66         }
67
68         return 0;
69 }
70
71 /**
72  * frescan_master_neg_thread_create()
73  */
74
75 int frescan_master_neg_thread_create(frescan_network_t net)
76 {
77         int ret;
78         fosa_thread_attr_t attr;
79
80         ret = fosa_thread_attr_init(&attr);
81         if (ret != 0) {
82                 ERROR("could not init thread attributes\n");
83                 return ret;
84         }
85
86         ret = fosa_thread_attr_set_prio(&attr, FRESCAN_NEG_THREAD_PRIO);
87         if (ret != 0) {
88                 ERROR("could not set neg thread prio %d\n",
89                       FRESCAN_NEG_THREAD_PRIO);
90                 return ret;
91         }
92
93         ret = fosa_thread_create(&the_networks[net].neg_thread_id,
94                                   &attr,
95                                   frescan_master_neg_thread,
96                                   (void *)(uint32_t)net);
97
98         if (ret != 0) {
99                 ERROR("could not create the negotiator thread\n");
100                 return ret;
101         }
102
103         ret = fosa_thread_attr_destroy(&attr);
104         if (ret != 0) {
105                 ERROR("could not destroy thread attributes\n");
106                 return ret;
107         }
108
109         return 0;
110 }
111
112 /**
113  * frescan_acceptor_thread()
114  *
115  * a loop waiting for messages on the network and parsing them.
116  */
117
118 static void *frescan_acceptor_thread(void *arg)
119 {
120         int ret;
121         frescan_recv_params_t params;
122         uint8_t msg[200];
123         size_t recv_bytes;
124         frescan_node_t from;
125         frescan_prio_t prio;
126
127         DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
128               "master acceptor thread starts\n");
129
130         params.net = (frescan_network_t)(uint32_t)arg;
131         params.channel = FRESCAN_NEG_CHANNEL;
132         params.flags = FRESCAN_SYNC;
133
134         while(1) {
135                 DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
136                       "waiting for msg, net:%u chan:%u flags:%u\n",
137                       params.net, params.channel, params.flags);
138
139                 ret = frescan_recv(&params, msg, 200,
140                                    &recv_bytes, &from, &prio);
141                 assert(ret == 0);
142
143                 DEBUG(FRESCAN_ACCEPTOR_THREAD_ENABLE_DEBUG,
144                       "msg received, from:%u size:%u prio:%u\n",
145                       from, recv_bytes, prio);
146
147                 ret = frescan_message_parse(params.net, msg, recv_bytes, from);
148                 assert(ret == 0);
149         }
150
151         return NULL;
152 }
153
154 /**
155  * frescan_master_neg_thread
156  *
157  * the thread that negotiates LOCAL contracts in the MASTER node.
158  */
159
160 static void *frescan_master_neg_thread(void *arg)
161 {
162         int err;
163         frescan_request_id_t request;
164         frescan_req_type_t   type;
165         frescan_robj_id_t    reply;
166         frescan_contract_t   *contract;
167         frescan_neg_return_info_t *neg_return_info;
168         frescan_server_params_t server_params;
169
170         DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "negotiator thread starts\n");
171
172         while(1) {
173                 DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG, "wait for a request\n");
174
175                 err = frescan_requestqueue_dequeue(&request);
176                 assert(err == 0);
177
178                 err = frescan_request_get_type(request, &type);
179                 assert(err == 0);
180
181                 switch(type) {
182                 case FRESCAN_NEGOTIATE:
183                         DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
184                                 "FRESCAN_NEGOTIATE request\n");
185                         err = frescan_request_get_contract(request, &contract);
186                         assert(err == 0);
187                         err = frescan_request_get_reply(request, &reply);
188                         assert(err == 0);
189                         err = frescan_request_get_return_info
190                                         (request, (void *)&neg_return_info);
191                         assert(err == 0);
192
193                         // TODO: sched test + add contract to table
194                         // so far always accepted witht he min values
195                         neg_return_info->error = 0;
196                         server_params.values = contract->min_values;
197                         server_params.prio   = contract->prio;
198                         err = frescan_servers_create
199                                        ((frescan_network_t)(uint32_t)arg,
200                                         &server_params,
201                                         &neg_return_info->id);
202                         assert(err == 0);
203
204                         err = frescan_replyobject_signal(reply);
205                         assert(err == 0);
206                         break;
207
208                 case FRESCAN_RENEGOTIATE:
209                         DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
210                                 "FRESCAN_RENEGOTIATE request\n");
211                         break;
212
213                 case FRESCAN_CANCEL:
214                         DEBUG(FRESCAN_NEG_THREAD_ENABLE_DEBUG,
215                                 "FRESCAN_CANCEL request\n");
216                         break;
217
218                 default:
219                         ERROR("wrong request type %d\n", type);
220                 }
221         }
222
223         return NULL;
224 }