1 (***********************************************************************)
5 (* David Nowak and Xavier Leroy, projet Cristal, INRIA Rocquencourt *)
7 (* Copyright 1996 Institut National de Recherche en Informatique et *)
8 (* en Automatique. All rights reserved. This file is distributed *)
9 (* under the terms of the GNU Library General Public License, with *)
10 (* the special exception on linking described in file ../../LICENSE. *)
12 (***********************************************************************)
14 (* $Id: event.ml 4144 2001-12-07 13:41:02Z xleroy $ *)
19 (* If communication can take place immediately, return true. *)
20 suspend: unit -> unit;
21 (* Offer the communication on the channel and get ready
22 to suspend current process. *)
24 (* Return the result of the communication *)
26 type 'a behavior = int ref -> Condition.t -> int -> 'a basic_event
29 Communication of 'a behavior
30 | Choose of 'a event list
31 | WrapAbort of 'a event * (unit -> unit)
32 | Guard of (unit -> 'a event)
34 (* Communication channels *)
36 { mutable writes_pending: 'a communication Queue.t;
37 (* All offers to write on it *)
38 mutable reads_pending: 'a communication Queue.t }
39 (* All offers to read from it *)
41 (* Communication offered *)
42 and 'a communication =
43 { performed: int ref; (* -1 if not performed yet, set to the number *)
44 (* of the matching communication after rendez-vous. *)
45 condition: Condition.t; (* To restart the blocked thread. *)
46 mutable data: 'a option; (* The data sent or received. *)
47 event_number: int } (* Event number in select *)
49 (* Create a channel *)
52 { writes_pending = Queue.create();
53 reads_pending = Queue.create() }
55 (* Basic synchronization function *)
57 let masterlock = Mutex.create()
59 let do_aborts abort_env genev performed =
60 if abort_env <> [] then begin
61 if performed >= 0 then begin
62 let ids_done = snd genev.(performed) in
64 (fun (id,f) -> if not (List.mem id ids_done) then f ())
67 List.iter (fun (_,f) -> f ()) abort_env
71 let basic_sync abort_env genev =
72 let performed = ref (-1) in
73 let condition = Condition.create() in
74 let bev = Array.create (Array.length genev)
75 (fst (genev.(0)) performed condition 0) in
76 for i = 1 to Array.length genev - 1 do
77 bev.(i) <- (fst genev.(i)) performed condition i
79 (* See if any of the events is already activable *)
80 let rec poll_events i =
81 if i >= Array.length bev
83 else bev.(i).poll() || poll_events (i+1) in
84 Mutex.lock masterlock;
85 if not (poll_events 0) then begin
86 (* Suspend on all events *)
87 for i = 0 to Array.length bev - 1 do bev.(i).suspend() done;
88 (* Wait until the condition is signalled *)
89 Condition.wait condition masterlock
91 Mutex.unlock masterlock;
92 (* Extract the result *)
93 if abort_env = [] then
94 (* Preserve tail recursion *)
95 bev.(!performed).result()
97 let num = !performed in
98 let result = bev.(num).result() in
99 (* Handle the aborts and return the result *)
100 do_aborts abort_env genev num;
104 (* Apply a random permutation on an array *)
106 let scramble_array a =
107 let len = Array.length a in
108 if len = 0 then invalid_arg "Event.choose";
109 for i = len - 1 downto 1 do
110 let j = Random.int (i + 1) in
111 let temp = a.(i) in a.(i) <- a.(j); a.(j) <- temp
115 (* Main synchronization function *)
117 let gensym = let count = ref 0 in fun () -> incr count; !count
119 let rec flatten_event
120 (abort_list : int list)
121 (accu : ('a behavior * int list) list)
122 (accu_abort : (int * (unit -> unit)) list)
125 Communication bev -> ((bev,abort_list) :: accu) , accu_abort
126 | WrapAbort (ev,fn) ->
127 let id = gensym () in
128 flatten_event (id :: abort_list) accu ((id,fn)::accu_abort) ev
130 let rec flatten_list accu' accu_abort'= function
132 let (accu'',accu_abort'') =
133 flatten_event abort_list accu' accu_abort' ev in
134 flatten_list accu'' accu_abort'' l
135 | [] -> (accu',accu_abort') in
136 flatten_list accu accu_abort evl
137 | Guard fn -> flatten_event abort_list accu accu_abort (fn ())
140 let (evl,abort_env) = flatten_event [] [] [] ev in
141 basic_sync abort_env (scramble_array(Array.of_list evl))
143 (* Event polling -- like sync, but non-blocking *)
145 let basic_poll abort_env genev =
146 let performed = ref (-1) in
147 let condition = Condition.create() in
148 let bev = Array.create(Array.length genev)
149 (fst genev.(0) performed condition 0) in
150 for i = 1 to Array.length genev - 1 do
151 bev.(i) <- fst genev.(i) performed condition i
153 (* See if any of the events is already activable *)
154 let rec poll_events i =
155 if i >= Array.length bev
157 else bev.(i).poll() || poll_events (i+1) in
158 Mutex.lock masterlock;
159 let ready = poll_events 0 in
161 (* Extract the result *)
162 Mutex.unlock masterlock;
163 let result = Some(bev.(!performed).result()) in
164 do_aborts abort_env genev !performed; result
166 (* Cancel the communication offers *)
168 Mutex.unlock masterlock;
169 do_aborts abort_env genev (-1);
174 let (evl,abort_env) = flatten_event [] [] [] ev in
175 basic_poll abort_env (scramble_array(Array.of_list evl))
177 (* Remove all communication opportunities already synchronized *)
179 let cleanup_queue q =
180 let q' = Queue.create() in
181 Queue.iter (fun c -> if !(c.performed) = -1 then Queue.add c q') q;
184 (* Event construction *)
187 Communication(fun performed condition evnum ->
188 { poll = (fun () -> performed := evnum; true);
189 suspend = (fun () -> ());
190 result = (fun () -> data) })
192 let send channel data =
193 Communication(fun performed condition evnum ->
195 { performed = performed;
196 condition = condition;
198 event_number = evnum } in
201 let rcomm = Queue.take channel.reads_pending in
202 if !(rcomm.performed) >= 0 then
205 rcomm.data <- wcomm.data;
207 rcomm.performed := rcomm.event_number;
208 Condition.signal rcomm.condition
216 channel.writes_pending <- cleanup_queue channel.writes_pending;
217 Queue.add wcomm channel.writes_pending);
218 result = (fun () -> ()) })
220 let receive channel =
221 Communication(fun performed condition evnum ->
223 { performed = performed;
224 condition = condition;
226 event_number = evnum } in
229 let wcomm = Queue.take channel.writes_pending in
230 if !(wcomm.performed) >= 0 then
233 rcomm.data <- wcomm.data;
235 wcomm.performed := wcomm.event_number;
236 Condition.signal wcomm.condition
244 channel.reads_pending <- cleanup_queue channel.reads_pending;
245 Queue.add rcomm channel.reads_pending);
247 match rcomm.data with
248 None -> invalid_arg "Event.receive"
249 | Some res -> res) })
251 let choose evl = Choose evl
253 let wrap_abort ev fn = WrapAbort(ev,fn)
255 let guard fn = Guard fn
259 Communication genev ->
260 Communication(fun performed condition evnum ->
261 let bev = genev performed condition evnum in
263 suspend = bev.suspend;
264 result = (fun () -> fn(bev.result())) })
266 Choose(List.map (fun ev -> wrap ev fn) evl)
267 | WrapAbort (ev, f') ->
268 WrapAbort (wrap ev fn, f')
270 Guard(fun () -> wrap (gu()) fn)
272 (* Convenience functions *)
274 let select evl = sync(Choose evl)