]> rtime.felk.cvut.cz Git - l4.git/blob - l4/pkg/ocaml/contrib/otherlibs/systhreads/event.ml
Inital import
[l4.git] / l4 / pkg / ocaml / contrib / otherlibs / systhreads / event.ml
1 (***********************************************************************)
2 (*                                                                     *)
3 (*                           Objective Caml                            *)
4 (*                                                                     *)
5 (*  David Nowak and Xavier Leroy, projet Cristal, INRIA Rocquencourt   *)
6 (*                                                                     *)
7 (*  Copyright 1996 Institut National de Recherche en Informatique et   *)
8 (*  en Automatique.  All rights reserved.  This file is distributed    *)
9 (*  under the terms of the GNU Library General Public License, with    *)
10 (*  the special exception on linking described in file ../../LICENSE.  *)
11 (*                                                                     *)
12 (***********************************************************************)
13
14 (* $Id: event.ml 4144 2001-12-07 13:41:02Z xleroy $ *)
15
16 (* Events *)
17 type 'a basic_event =
18   { poll: unit -> bool;
19       (* If communication can take place immediately, return true. *)
20     suspend: unit -> unit;
21       (* Offer the communication on the channel and get ready
22          to suspend current process. *)
23     result: unit -> 'a }
24       (* Return the result of the communication *)
25
26 type 'a behavior = int ref -> Condition.t -> int -> 'a basic_event
27
28 type 'a event =
29     Communication of 'a behavior
30   | Choose of 'a event list
31   | WrapAbort of 'a event * (unit -> unit)
32   | Guard of (unit -> 'a event)
33
34 (* Communication channels *)
35 type 'a channel =
36   { mutable writes_pending: 'a communication Queue.t;
37                         (* All offers to write on it *)
38     mutable reads_pending:  'a communication Queue.t }
39                         (* All offers to read from it *)
40
41 (* Communication offered *)
42 and 'a communication =
43   { performed: int ref;  (* -1 if not performed yet, set to the number *)
44                          (* of the matching communication after rendez-vous. *)
45     condition: Condition.t;             (* To restart the blocked thread. *)
46     mutable data: 'a option;            (* The data sent or received. *)
47     event_number: int }                 (* Event number in select *)
48
49 (* Create a channel *)
50
51 let new_channel () =
52   { writes_pending = Queue.create();
53     reads_pending = Queue.create() }
54
55 (* Basic synchronization function *)
56
57 let masterlock = Mutex.create()
58
59 let do_aborts abort_env genev performed =
60   if abort_env <> [] then begin
61     if performed >= 0 then begin
62       let ids_done = snd genev.(performed) in
63       List.iter
64         (fun (id,f) -> if not (List.mem id ids_done) then f ())
65         abort_env
66     end else begin
67       List.iter (fun (_,f) -> f ()) abort_env
68     end
69   end
70
71 let basic_sync abort_env genev =
72   let performed = ref (-1) in
73   let condition = Condition.create() in
74   let bev = Array.create (Array.length genev)
75                          (fst (genev.(0)) performed condition 0) in
76   for i = 1 to Array.length genev - 1 do
77     bev.(i) <- (fst genev.(i)) performed condition i
78   done;
79   (* See if any of the events is already activable *)
80   let rec poll_events i =
81     if i >= Array.length bev
82     then false
83     else bev.(i).poll() || poll_events (i+1) in
84   Mutex.lock masterlock;
85   if not (poll_events 0) then begin
86     (* Suspend on all events *)
87     for i = 0 to Array.length bev - 1 do bev.(i).suspend() done;
88     (* Wait until the condition is signalled *)
89     Condition.wait condition masterlock
90   end;
91   Mutex.unlock masterlock;
92   (* Extract the result *)
93   if abort_env = [] then
94     (* Preserve tail recursion *)
95     bev.(!performed).result() 
96   else begin
97     let num = !performed in
98     let result = bev.(num).result() in
99     (* Handle the aborts and return the result *)
100     do_aborts abort_env genev num;
101     result
102   end
103
104 (* Apply a random permutation on an array *)
105
106 let scramble_array a =
107   let len = Array.length a in
108   if len = 0 then invalid_arg "Event.choose";
109   for i = len - 1 downto 1 do
110     let j = Random.int (i + 1) in
111     let temp = a.(i) in a.(i) <- a.(j); a.(j) <- temp
112   done;
113   a
114
115 (* Main synchronization function *)
116
117 let gensym = let count = ref 0 in fun () -> incr count; !count
118
119 let rec flatten_event
120       (abort_list : int list)
121       (accu : ('a behavior * int list) list)
122       (accu_abort : (int * (unit -> unit)) list)
123       ev =
124   match ev with
125      Communication bev -> ((bev,abort_list) :: accu) , accu_abort
126   | WrapAbort (ev,fn) ->
127       let id = gensym () in
128       flatten_event (id :: abort_list) accu ((id,fn)::accu_abort) ev
129   | Choose evl ->
130       let rec flatten_list accu' accu_abort'= function
131          ev :: l ->
132            let (accu'',accu_abort'') =
133              flatten_event abort_list accu' accu_abort' ev in
134            flatten_list accu'' accu_abort'' l
135        | [] -> (accu',accu_abort') in
136       flatten_list accu accu_abort evl
137   | Guard fn -> flatten_event abort_list accu accu_abort (fn ())
138
139 let sync ev =
140   let (evl,abort_env) = flatten_event [] [] [] ev in
141   basic_sync abort_env (scramble_array(Array.of_list evl))
142
143 (* Event polling -- like sync, but non-blocking *)
144
145 let basic_poll abort_env genev =
146   let performed = ref (-1) in
147   let condition = Condition.create() in
148   let bev = Array.create(Array.length genev)
149                         (fst genev.(0) performed condition 0) in
150   for i = 1 to Array.length genev - 1 do
151     bev.(i) <- fst genev.(i) performed condition i
152   done;
153   (* See if any of the events is already activable *)
154   let rec poll_events i =
155     if i >= Array.length bev
156     then false
157     else bev.(i).poll() || poll_events (i+1) in
158   Mutex.lock masterlock;
159   let ready = poll_events 0 in
160   if ready then begin
161     (* Extract the result *)
162     Mutex.unlock masterlock;
163     let result = Some(bev.(!performed).result()) in
164     do_aborts abort_env genev !performed; result
165   end else begin
166     (* Cancel the communication offers *)
167     performed := 0;
168     Mutex.unlock masterlock;
169     do_aborts abort_env genev (-1);
170     None
171   end
172
173 let poll ev =
174   let (evl,abort_env) = flatten_event [] [] [] ev in
175   basic_poll abort_env (scramble_array(Array.of_list evl))
176
177 (* Remove all communication opportunities already synchronized *)
178
179 let cleanup_queue q =
180   let q' = Queue.create() in
181   Queue.iter (fun c -> if !(c.performed) = -1 then Queue.add c q') q;
182   q'
183
184 (* Event construction *)
185
186 let always data =
187   Communication(fun performed condition evnum ->
188     { poll = (fun () -> performed := evnum; true);
189       suspend = (fun () -> ());
190       result = (fun () -> data) })
191
192 let send channel data =
193   Communication(fun performed condition evnum ->
194     let wcomm =
195       { performed = performed;
196         condition = condition;
197         data = Some data;
198         event_number = evnum } in
199     { poll = (fun () ->
200         let rec poll () =
201           let rcomm = Queue.take channel.reads_pending in
202           if !(rcomm.performed) >= 0 then
203             poll ()
204           else begin
205             rcomm.data <- wcomm.data;
206             performed := evnum;
207             rcomm.performed := rcomm.event_number;
208             Condition.signal rcomm.condition
209           end in
210         try
211           poll();
212           true
213         with Queue.Empty ->
214           false);
215       suspend = (fun () ->
216         channel.writes_pending <- cleanup_queue channel.writes_pending;
217         Queue.add wcomm channel.writes_pending);
218       result = (fun () -> ()) })
219
220 let receive channel =
221   Communication(fun performed condition evnum ->
222     let rcomm =
223       { performed = performed;
224         condition = condition;
225         data = None;
226         event_number = evnum } in
227     { poll = (fun () ->
228         let rec poll () =
229           let wcomm = Queue.take channel.writes_pending in
230           if !(wcomm.performed) >= 0 then
231             poll ()
232           else begin
233             rcomm.data <- wcomm.data;
234             performed := evnum;
235             wcomm.performed := wcomm.event_number;
236             Condition.signal wcomm.condition
237           end in
238         try
239           poll();
240           true
241         with Queue.Empty ->
242           false);
243     suspend = (fun () ->
244       channel.reads_pending <- cleanup_queue channel.reads_pending;
245       Queue.add rcomm channel.reads_pending);
246     result = (fun () ->
247       match rcomm.data with
248         None -> invalid_arg "Event.receive"
249       | Some res -> res) })
250
251 let choose evl = Choose evl
252
253 let wrap_abort ev fn = WrapAbort(ev,fn)
254
255 let guard fn = Guard fn
256
257 let rec wrap ev fn =
258   match ev with
259     Communication genev ->
260       Communication(fun performed condition evnum ->
261         let bev = genev performed condition evnum in
262         { poll = bev.poll;
263           suspend = bev.suspend;
264           result = (fun () -> fn(bev.result())) })
265   | Choose evl ->
266       Choose(List.map (fun ev -> wrap ev fn) evl)
267   | WrapAbort (ev, f') ->
268       WrapAbort (wrap ev fn, f')
269   | Guard gu ->
270       Guard(fun () -> wrap (gu()) fn)
271
272 (* Convenience functions *)
273
274 let select evl = sync(Choose evl)