]> rtime.felk.cvut.cz Git - sojka/libev.git/commitdiff
first round of ev_async
authorMarc Alexander Lehmann <libev@schmorp.de>
Thu, 31 Jan 2008 13:10:56 +0000 (13:10 +0000)
committerMarc Alexander Lehmann <libev@schmorp.de>
Thu, 31 Jan 2008 13:10:56 +0000 (13:10 +0000)
ev.c
ev.h
ev.pod
ev_vars.h
ev_wrap.h
event_compat.h

diff --git a/ev.c b/ev.c
index b33ad3b749722810ecbf17d7beb27df053da15ad..e7a189a70b6c61cf27ecfad60d909a3b82ceeb61 100644 (file)
--- a/ev.c
+++ b/ev.c
@@ -1,7 +1,7 @@
 /*
  * libev event processing core, watcher management
  *
- * Copyright (c) 2007 Marc Alexander Lehmann <libev@schmorp.de>
+ * Copyright (c) 2007,2008 Marc Alexander Lehmann <libev@schmorp.de>
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without modifica-
@@ -293,7 +293,7 @@ typedef ev_watcher_time *WT;
 #if EV_USE_MONOTONIC
 /* sig_atomic_t is used to avoid per-thread variables or locking but still */
 /* giving it a reasonably high chance of working on typical architetcures */
-static sig_atomic_t have_monotonic; /* did clock_gettime (CLOCK_MONOTONIC) work? */
+static EV_ATOMIC_T have_monotonic; /* did clock_gettime (CLOCK_MONOTONIC) work? */
 #endif
 
 #ifdef _WIN32
@@ -765,15 +765,13 @@ adjustheap (WT *heap, int N, int k)
 typedef struct
 {
   WL head;
-  sig_atomic_t volatile gotsig;
+  EV_ATOMIC_T gotsig;
 } ANSIG;
 
 static ANSIG *signals;
 static int signalmax;
 
-static int sigpipe [2];
-static sig_atomic_t volatile gotsig;
-static ev_io sigev;
+static EV_ATOMIC_T gotsig;
 
 void inline_size
 signals_init (ANSIG *base, int count)
@@ -787,24 +785,101 @@ signals_init (ANSIG *base, int count)
     }
 }
 
-static void
-sighandler (int signum)
+/*****************************************************************************/
+
+void inline_speed
+fd_intern (int fd)
 {
-#if _WIN32
-  signal (signum, sighandler);
+#ifdef _WIN32
+  int arg = 1;
+  ioctlsocket (_get_osfhandle (fd), FIONBIO, &arg);
+#else
+  fcntl (fd, F_SETFD, FD_CLOEXEC);
+  fcntl (fd, F_SETFL, O_NONBLOCK);
 #endif
+}
 
-  signals [signum - 1].gotsig = 1;
+static void noinline
+evpipe_init (EV_P)
+{
+  if (!ev_is_active (&pipeev))
+    {
+      while (pipe (evpipe))
+        syserr ("(libev) error creating signal/async pipe");
+
+      fd_intern (evpipe [0]);
+      fd_intern (evpipe [1]);
+
+      ev_io_set (&pipeev, evpipe [0], EV_READ);
+      ev_io_start (EV_A_ &pipeev);
+      ev_unref (EV_A); /* child watcher should not keep loop alive */
+    }
+}
 
-  if (!gotsig)
+void inline_size
+evpipe_write (EV_P_ int sig, int async)
+{
+  if (!(gotasync || gotsig))
     {
       int old_errno = errno;
-      gotsig = 1;
-      write (sigpipe [1], &signum, 1);
+
+      if (sig)   gotsig   = 1;
+      if (async) gotasync = 1;
+
+      write (evpipe [1], &old_errno, 1);
       errno = old_errno;
     }
 }
 
+static void
+pipecb (EV_P_ ev_io *iow, int revents)
+{
+  {
+    int dummy;
+    read (evpipe [0], &dummy, 1);
+  }
+
+  if (gotsig)
+    {    
+      int signum;
+      gotsig = 0;
+
+      for (signum = signalmax; signum--; )
+        if (signals [signum].gotsig)
+          ev_feed_signal_event (EV_A_ signum + 1);
+    }
+
+  if (gotasync)
+    {
+      int i;
+      gotasync = 0;
+
+      for (i = asynccnt; i--; )
+        if (asyncs [i]->sent)
+          {
+            asyncs [i]->sent = 0;
+            ev_feed_event (EV_A_ asyncs [i], EV_ASYNC);
+          }
+    }
+}
+
+/*****************************************************************************/
+
+static void
+sighandler (int signum)
+{
+#if EV_MULTIPLICITY
+  struct ev_loop *loop = &default_loop_struct;
+#endif
+
+#if _WIN32
+  signal (signum, sighandler);
+#endif
+
+  signals [signum - 1].gotsig = 1;
+  evpipe_write (EV_A_ 1, 0);
+}
+
 void noinline
 ev_feed_signal_event (EV_P_ int signum)
 {
@@ -825,42 +900,6 @@ ev_feed_signal_event (EV_P_ int signum)
     ev_feed_event (EV_A_ (W)w, EV_SIGNAL);
 }
 
-static void
-sigcb (EV_P_ ev_io *iow, int revents)
-{
-  int signum;
-
-  read (sigpipe [0], &revents, 1);
-  gotsig = 0;
-
-  for (signum = signalmax; signum--; )
-    if (signals [signum].gotsig)
-      ev_feed_signal_event (EV_A_ signum + 1);
-}
-
-void inline_speed
-fd_intern (int fd)
-{
-#ifdef _WIN32
-  int arg = 1;
-  ioctlsocket (_get_osfhandle (fd), FIONBIO, &arg);
-#else
-  fcntl (fd, F_SETFD, FD_CLOEXEC);
-  fcntl (fd, F_SETFL, O_NONBLOCK);
-#endif
-}
-
-static void noinline
-siginit (EV_P)
-{
-  fd_intern (sigpipe [0]);
-  fd_intern (sigpipe [1]);
-
-  ev_io_set (&sigev, sigpipe [0], EV_READ);
-  ev_io_start (EV_A_ &sigev);
-  ev_unref (EV_A); /* child watcher should not keep loop alive */
-}
-
 /*****************************************************************************/
 
 static WL childs [EV_PID_HASHSIZE];
@@ -1086,8 +1125,8 @@ loop_init (EV_P_ unsigned int flags)
       if (!backend && (flags & EVBACKEND_SELECT)) backend = select_init (EV_A_ flags);
 #endif
 
-      ev_init (&sigev, sigcb);
-      ev_set_priority (&sigev, EV_MAXPRI);
+      ev_init (&pipeev, pipecb);
+      ev_set_priority (&pipeev, EV_MAXPRI);
     }
 }
 
@@ -1096,6 +1135,15 @@ loop_destroy (EV_P)
 {
   int i;
 
+  if (ev_is_active (&pipeev))
+    {
+      ev_ref (EV_A); /* signal watcher */
+      ev_io_stop (EV_A_ &pipeev);
+
+      close (evpipe [0]); evpipe [0] = 0;
+      close (evpipe [1]); evpipe [1] = 0;
+    }
+
 #if EV_USE_INOTIFY
   if (fs_fd >= 0)
     close (fs_fd);
@@ -1163,20 +1211,19 @@ loop_fork (EV_P)
   infy_fork (EV_A);
 #endif
 
-  if (ev_is_active (&sigev))
+  if (ev_is_active (&pipeev))
     {
-      /* default loop */
+      /* this "locks" the handlers against writing to the pipe */
+      gotsig = gotasync = 1;
 
       ev_ref (EV_A);
-      ev_io_stop (EV_A_ &sigev);
-      close (sigpipe [0]);
-      close (sigpipe [1]);
+      ev_io_stop (EV_A_ &pipeev);
+      close (evpipe [0]);
+      close (evpipe [1]);
 
-      while (pipe (sigpipe))
-        syserr ("(libev) error creating pipe");
-
-      siginit (EV_A);
-      sigcb (EV_A_ &sigev, EV_READ);
+      evpipe_init (EV_A);
+      /* now iterate over everything */
+      evcb (EV_A_ &pipeev, EV_READ);
     }
 
   postfork = 0;
@@ -1221,10 +1268,6 @@ int
 ev_default_loop (unsigned int flags)
 #endif
 {
-  if (sigpipe [0] == sigpipe [1])
-    if (pipe (sigpipe))
-      return 0;
-
   if (!ev_default_loop_ptr)
     {
 #if EV_MULTIPLICITY
@@ -1237,8 +1280,6 @@ ev_default_loop (unsigned int flags)
 
       if (ev_backend (EV_A))
         {
-          siginit (EV_A);
-
 #ifndef _WIN32
           ev_signal_init (&childev, childcb, SIGCHLD);
           ev_set_priority (&childev, EV_MAXPRI);
@@ -1265,12 +1306,6 @@ ev_default_destroy (void)
   ev_signal_stop (EV_A_ &childev);
 #endif
 
-  ev_ref (EV_A); /* signal watcher */
-  ev_io_stop (EV_A_ &sigev);
-
-  close (sigpipe [0]); sigpipe [0] = 0;
-  close (sigpipe [1]); sigpipe [1] = 0;
-
   loop_destroy (EV_A);
 }
 
@@ -1867,6 +1902,8 @@ ev_signal_start (EV_P_ ev_signal *w)
 
   assert (("ev_signal_start called with illegal signal number", w->signum > 0));
 
+  evpipe_init (EV_A);
+
   {
 #ifndef _WIN32
     sigset_t full, prev;
@@ -2389,6 +2426,44 @@ ev_fork_stop (EV_P_ ev_fork *w)
 }
 #endif
 
+#if EV_ASYNC_ENABLE
+void
+ev_async_start (EV_P_ ev_async *w)
+{
+  if (expect_false (ev_is_active (w)))
+    return;
+
+  evpipe_init (EV_A);
+
+  ev_start (EV_A_ (W)w, ++asynccnt);
+  array_needsize (ev_async *, asyncs, asyncmax, asynccnt, EMPTY2);
+  asyncs [asynccnt - 1] = w;
+}
+
+void
+ev_async_stop (EV_P_ ev_async *w)
+{
+  clear_pending (EV_A_ (W)w);
+  if (expect_false (!ev_is_active (w)))
+    return;
+
+  {
+    int active = ((W)w)->active;
+    asyncs [active - 1] = asyncs [--asynccnt];
+    ((W)asyncs [active - 1])->active = active;
+  }
+
+  ev_stop (EV_A_ (W)w);
+}
+
+void
+ev_async_send (EV_P_ ev_async *w)
+{
+  w->sent = 1;
+  evpipe_write (EV_A_ 0, 1);
+}
+#endif
+
 /*****************************************************************************/
 
 struct ev_once
diff --git a/ev.h b/ev.h
index b1ebab5994a375607494e6d36191379591cfc590..4dab80da0b852d08a1634b2bc2cd849ffecce6ac 100644 (file)
--- a/ev.h
+++ b/ev.h
@@ -1,7 +1,7 @@
 /*
  * libev native API header
  *
- * Copyright (c) 2007 Marc Alexander Lehmann <libev@schmorp.de>
+ * Copyright (c) 2007,2008 Marc Alexander Lehmann <libev@schmorp.de>
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without modifica-
@@ -78,6 +78,15 @@ typedef double ev_tstamp;
 # define EV_EMBED_ENABLE 1
 #endif
 
+#ifndef EV_ASYNC_ENABLE
+# define EV_ASYNC_ENABLE 1
+#endif
+
+#ifndef EV_ATOMIC_T
+# include <signal.h>
+# define EV_ATOMIC_T sig_atomic_t volatile
+#endif
+
 /*****************************************************************************/
 
 #if EV_STAT_ENABLE
@@ -120,6 +129,7 @@ struct ev_loop;
 #define EV_CHECK    0x00008000L /* event loop finished poll */
 #define EV_EMBED    0x00010000L /* embedded event loop needs sweep */
 #define EV_FORK     0x00020000L /* event loop resumed in child */
+#define EV_ASYNC    0x00040000L /* async intra-loop signal */
 #define EV_ERROR    0x80000000L /* sent when an error occurs */
 
 /* can be used to add custom fields to all watchers, while losing binary compatibility */
@@ -307,6 +317,17 @@ typedef struct ev_embed
 } ev_embed;
 #endif
 
+#if EV_ASYNC_ENABLE
+/* invoked when somebody calls ev_async_send on the watcher */
+/* revent EV_ASYNC */
+typedef struct ev_async
+{
+  EV_WATCHER (ev_async)
+
+  EV_ATOMIC_T sent; /* private */
+} ev_async;
+#endif
+
 /* the presence of this union forces similar struct layout */
 union ev_any_watcher
 {
@@ -332,6 +353,9 @@ union ev_any_watcher
 #if EV_EMBED_ENABLE
   struct ev_embed embed;
 #endif
+#if EV_ASYND_ENABLE
+  struct ev_async async;
+#endif
 };
 
 /* bits for ev_default_loop and ev_loop_new */
@@ -465,6 +489,7 @@ void ev_once (EV_P_ int fd, int events, ev_tstamp timeout, void (*cb)(int revent
 #define ev_check_set(ev)                    /* nop, yes, this is a serious in-joke */
 #define ev_embed_set(ev,other_)             do { (ev)->other = (other_); } while (0)
 #define ev_fork_set(ev)                     /* nop, yes, this is a serious in-joke */
+#define ev_async_set(ev)                    do { (ev)->gotasync = 0; } while (0)
 
 #define ev_io_init(ev,cb,fd,events)         do { ev_init ((ev), (cb)); ev_io_set ((ev),(fd),(events)); } while (0)
 #define ev_timer_init(ev,cb,after,repeat)   do { ev_init ((ev), (cb)); ev_timer_set ((ev),(after),(repeat)); } while (0)
@@ -477,6 +502,7 @@ void ev_once (EV_P_ int fd, int events, ev_tstamp timeout, void (*cb)(int revent
 #define ev_check_init(ev,cb)                do { ev_init ((ev), (cb)); ev_check_set ((ev)); } while (0)
 #define ev_embed_init(ev,cb,other)          do { ev_init ((ev), (cb)); ev_embed_set ((ev),(other)); } while (0)
 #define ev_fork_init(ev,cb)                 do { ev_init ((ev), (cb)); ev_fork_set ((ev)); } while (0)
+#define ev_async_init(ev,cb)                do { ev_init ((ev), (cb)); ev_async_set ((ev)); } while (0)
 
 #define ev_is_pending(ev)                   (0 + ((ev_watcher *)(void *)(ev))->pending) /* ro, true when watcher is waiting for callback invocation */
 #define ev_is_active(ev)                    (0 + ((ev_watcher *)(void *)(ev))->active) /* ro, true when the watcher has been started */
@@ -552,6 +578,12 @@ void ev_embed_stop     (EV_P_ ev_embed *w);
 void ev_embed_sweep    (EV_P_ ev_embed *w);
 # endif
 
+# if EV_ASYNC_ENABLE
+void ev_async_start    (EV_P_ ev_async *w);
+void ev_async_stop     (EV_P_ ev_async *w);
+void ev_async_send     (EV_P_ ev_async *w);
+# endif
+
 #endif
 
 #ifdef __cplusplus
diff --git a/ev.pod b/ev.pod
index ff636964f7866548ae6db0fa65a8033ed70d762f..46bac97d2bdfae732d726f864ae7e7ba3124ec8b 100644 (file)
--- a/ev.pod
+++ b/ev.pod
@@ -776,6 +776,10 @@ The embedded event loop specified in the C<ev_embed> watcher needs attention.
 The event loop has been resumed in the child process after fork (see
 C<ev_fork>).
 
+=item C<EV_ASYNC>
+
+The given async watcher has been asynchronously notified (see C<ev_async>).
+
 =item C<EV_ERROR>
 
 An unspecified error has occured, the watcher has been stopped. This might
@@ -2048,6 +2052,51 @@ believe me.
 =back
 
 
+=head2 C<ev_async> - how to wake up another event loop
+
+In general, you cannot use an C<ev_loop> from multiple threads or other
+asynchronous sources such as signal handlers (as opposed to multiple event
+loops - those are of course safe to use in different threads).
+
+Sometimes, however, you need to wake up another event loop you do not
+control, for example because it belongs to another thread. This is what
+C<ev_async> watchers do: as long as the C<ev_async> watcher is active, you
+can signal it by calling C<ev_async_send>, which is thread- and signal
+safe.
+
+This functionality is very similar to C<ev_signal> watchers, as signals,
+too, are asynchronous in nature, and signals, too, will be compressed
+(i.e. the number of callback invocations may be less than the number of
+C<ev_async_sent> calls).
+
+Unlike C<ev_signal> watchers, C<ev_async> works with any event loop, not
+just the default loop.
+
+=head3 Watcher-Specific Functions and Data Members
+
+=over 4
+
+=item ev_async_init (ev_async *, callback)
+
+Initialises and configures the async watcher - it has no parameters of any
+kind. There is a C<ev_asynd_set> macro, but using it is utterly pointless,
+believe me.
+
+=item ev_async_send (loop, ev_async *)
+
+Sends/signals/activates the given C<ev_async> watcher, that is, feeds
+an C<EV_ASYNC> event on the watcher into the event loop. Unlike
+C<ev_feed_event>, this call is safe to do in other threads, signal or
+similar contexts (see the dicusssion of C<EV_ATOMIC_T> in the embedding
+section below on what exactly this means).
+
+This call incurs the overhead of a syscall only once per loop iteration,
+so while the overhead might be noticable, it doesn't apply to repeated
+calls to C<ev_async_send>.
+
+=back
+
+
 =head1 OTHER FUNCTIONS
 
 There are some other functions of possible interest. Described. Here. Now.
index 4ac997e7fc5cb96b3e392814382f8a46b3803976..c9637f2fee78d5010e07f1440c59fbef529c3d2c 100644 (file)
--- a/ev_vars.h
+++ b/ev_vars.h
@@ -1,7 +1,7 @@
 /*
  * loop member variable declarations
  *
- * Copyright (c) 2007 Marc Alexander Lehmann <libev@schmorp.de>
+ * Copyright (c) 2007,2008 Marc Alexander Lehmann <libev@schmorp.de>
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without modifica-
@@ -55,6 +55,9 @@ VARx(ev_tstamp, backend_fudge) /* assumed typical timer resolution */
 VAR (backend_modify, void (*backend_modify)(EV_P_ int fd, int oev, int nev))
 VAR (backend_poll  , void (*backend_poll)(EV_P_ ev_tstamp timeout))
 
+VAR (evpipe, int evpipe [2])
+VARx(ev_io, pipeev)
+
 #if !defined(_WIN32) || EV_GENWRAP
 VARx(pid_t, curpid)
 #endif
@@ -137,6 +140,13 @@ VARx(int, forkmax)
 VARx(int, forkcnt)
 #endif
 
+#if EV_ASYNC_ENABLE || EV_GENWRAP
+VARx(EV_ATOMIC_T, gotasync)
+VARx(struct ev_async **, asyncs)
+VARx(int, asyncmax)
+VARx(int, asynccnt)
+#endif
+
 #if EV_USE_INOTIFY || EV_GENWRAP
 VARx(int, fs_fd)
 VARx(ev_io, fs_w)
index f9bc61e75d0f0697aad9b60104a7622cd90e3631..fbfc802a9009cfda4d98dc1ed7fc9b868dab6883 100644 (file)
--- a/ev_wrap.h
+++ b/ev_wrap.h
@@ -13,6 +13,8 @@
 #define backend_fudge ((loop)->backend_fudge)
 #define backend_modify ((loop)->backend_modify)
 #define backend_poll ((loop)->backend_poll)
+#define evpipe ((loop)->evpipe)
+#define pipeev ((loop)->pipeev)
 #define curpid ((loop)->curpid)
 #define postfork ((loop)->postfork)
 #define vec_ri ((loop)->vec_ri)
 #define forks ((loop)->forks)
 #define forkmax ((loop)->forkmax)
 #define forkcnt ((loop)->forkcnt)
+#define gotasync ((loop)->gotasync)
+#define asyncs ((loop)->asyncs)
+#define asyncmax ((loop)->asyncmax)
+#define asynccnt ((loop)->asynccnt)
 #define fs_fd ((loop)->fs_fd)
 #define fs_w ((loop)->fs_w)
 #define fs_hash ((loop)->fs_hash)
@@ -78,6 +84,8 @@
 #undef backend_fudge
 #undef backend_modify
 #undef backend_poll
+#undef evpipe
+#undef pipeev
 #undef curpid
 #undef postfork
 #undef vec_ri
 #undef forks
 #undef forkmax
 #undef forkcnt
+#undef gotasync
+#undef asyncs
+#undef asyncmax
+#undef asynccnt
 #undef fs_fd
 #undef fs_w
 #undef fs_hash
index 481190657daddf49b315fc0a810dc182e8f0e667..d5cc1effa7c8c3713264b54cba4f03dcae58a08b 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2000-2004 Niels Provos <provos@citi.umich.edu>
+ * Copyright (c) 2008      Marc Alexander Lehmann <libev@schmorp.de>
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without