diff options
Diffstat (limited to 'tokio-reactor/src/lib.rs')
-rw-r--r-- | tokio-reactor/src/lib.rs | 206 |
1 files changed, 28 insertions, 178 deletions
diff --git a/tokio-reactor/src/lib.rs b/tokio-reactor/src/lib.rs index fb8f3531..ff2ab686 100644 --- a/tokio-reactor/src/lib.rs +++ b/tokio-reactor/src/lib.rs @@ -32,38 +32,43 @@ //! [`PollEvented`]: struct.PollEvented.html //! [reactor module]: https://docs.rs/tokio/0.1/tokio/reactor/index.html -pub(crate) mod background; +macro_rules! ready { + ($e:expr) => { + match $e { + ::std::task::Poll::Ready(v) => v, + ::std::task::Poll::Pending => return ::std::task::Poll::Pending, + } + }; +} + mod poll_evented; mod registration; mod sharded_rwlock; // ===== Public re-exports ===== -pub use self::background::{Background, Shutdown}; pub use self::poll_evented::PollEvented; pub use self::registration::Registration; // ===== Private imports ===== use crate::sharded_rwlock::RwLock; -use futures::task::Task; use log::{debug, log_enabled, trace, Level}; use mio::event::Evented; use slab::Slab; use std::cell::RefCell; -use std::error::Error; use std::io; -use std::mem; #[cfg(all(unix, not(target_os = "fuchsia")))] use std::os::unix::io::{AsRawFd, RawFd}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; use std::sync::{Arc, Weak}; +use std::task::Waker; use std::time::{Duration, Instant}; use std::{fmt, usize}; use tokio_executor::park::{Park, Unpark}; use tokio_executor::Enter; -use tokio_sync::task::AtomicTask; +use tokio_sync::task::AtomicWaker; /// The core reactor, or event loop. /// @@ -109,14 +114,6 @@ pub struct Turn { _priv: (), } -/// Error returned from `Handle::set_fallback`. -#[derive(Clone, Debug)] -pub struct SetFallbackError(()); - -#[deprecated(since = "0.1.2", note = "use SetFallbackError instead")] -#[doc(hidden)] -pub type SetDefaultError = SetFallbackError; - #[test] fn test_handle_size() { use std::mem; @@ -140,8 +137,8 @@ struct Inner { struct ScheduledIo { aba_guard: usize, readiness: AtomicUsize, - reader: AtomicTask, - writer: AtomicTask, + reader: AtomicWaker, + writer: AtomicWaker, } #[derive(Debug, Eq, PartialEq, Clone, Copy)] @@ -150,9 +147,6 @@ pub(crate) enum Direction { Write, } -/// The global fallback reactor. -static HANDLE_FALLBACK: AtomicUsize = AtomicUsize::new(0); - thread_local! { /// Tracks the reactor for the current execution context. static CURRENT_REACTOR: RefCell<Option<HandlePriv>> = RefCell::new(None) @@ -262,33 +256,6 @@ impl Reactor { } } - /// Configures the fallback handle to be returned from `Handle::default`. - /// - /// The `Handle::default()` function will by default lazily spin up a global - /// thread and run a reactor on this global thread. This behavior is not - /// always desirable in all applications, however, and sometimes a different - /// fallback reactor is desired. - /// - /// This function will attempt to globally alter the return value of - /// `Handle::default()` to return the `handle` specified rather than a - /// lazily initialized global thread. If successful then all future calls to - /// `Handle::default()` which would otherwise fall back to the global thread - /// will instead return a clone of the handle specified. - /// - /// # Errors - /// - /// This function may not always succeed in configuring the fallback handle. - /// If this function was previously called (or perhaps concurrently called - /// on many threads) only the *first* invocation of this function will - /// succeed. All other invocations will return an error. - /// - /// Additionally if the global reactor thread has already been initialized - /// then this function will also return an error. (aka if `Handle::default` - /// has been called previously in this program). - pub fn set_fallback(&self) -> Result<(), SetFallbackError> { - set_fallback(self.handle().into_priv().unwrap()) - } - /// Performs one iteration of the event loop, blocking on waiting for events /// for at most `max_wait` (forever if `None`). /// @@ -328,16 +295,6 @@ impl Reactor { self.inner.io_dispatch.read().is_empty() } - /// Run this reactor on a background thread. - /// - /// This function takes ownership, spawns a new thread, and moves the - /// reactor to this new thread. It then runs the reactor, driving all - /// associated I/O resources, until the `Background` handle is dropped or - /// explicitly shutdown. - pub fn background(self) -> io::Result<Background> { - Background::new(self) - } - fn poll(&mut self, max_wait: Option<Duration>) -> io::Result<()> { // Block waiting for an event to happen, peeling out how many events // happened. @@ -406,20 +363,20 @@ impl Reactor { io.readiness.fetch_or(ready.as_usize(), Relaxed); if ready.is_writable() || platform::is_hup(&ready) { - wr = io.writer.take_task(); + wr = io.writer.take_waker(); } if !(ready & (!mio::Ready::writable())).is_empty() { - rd = io.reader.take_task(); + rd = io.reader.take_waker(); } } - if let Some(task) = rd { - task.notify(); + if let Some(w) = rd { + w.wake(); } - if let Some(task) = wr { - task.notify(); + if let Some(w) = wr { + w.wake(); } } } @@ -475,16 +432,6 @@ impl Handle { fn as_priv(&self) -> Option<&HandlePriv> { self.inner.as_ref() } - - fn into_priv(self) -> Option<HandlePriv> { - self.inner - } - - fn wakeup(&self) { - if let Some(handle) = self.as_priv() { - handle.wakeup(); - } - } } impl Unpark for Handle { @@ -508,19 +455,6 @@ impl fmt::Debug for Handle { } } -fn set_fallback(handle: HandlePriv) -> Result<(), SetFallbackError> { - unsafe { - let val = handle.into_usize(); - match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) { - Ok(_) => Ok(()), - Err(_) => { - drop(HandlePriv::from_usize(val)); - Err(SetFallbackError(())) - } - } - } -} - // ===== impl HandlePriv ===== impl HandlePriv { @@ -530,71 +464,10 @@ impl HandlePriv { pub(crate) fn try_current() -> io::Result<HandlePriv> { CURRENT_REACTOR.with(|current| match *current.borrow() { Some(ref handle) => Ok(handle.clone()), - None => HandlePriv::fallback(), + None => Err(io::Error::new(io::ErrorKind::Other, "no current reactor")), }) } - /// Returns a handle to the fallback reactor. - fn fallback() -> io::Result<HandlePriv> { - let mut fallback = HANDLE_FALLBACK.load(SeqCst); - - // If the fallback hasn't been previously initialized then let's spin - // up a helper thread and try to initialize with that. If we can't - // actually create a helper thread then we'll just return a "defunct" - // handle which will return errors when I/O objects are attempted to be - // associated. - if fallback == 0 { - let reactor = match Reactor::new() { - Ok(reactor) => reactor, - Err(_) => { - return Err(io::Error::new( - io::ErrorKind::Other, - "failed to create reactor", - )); - } - }; - - // If we successfully set ourselves as the actual fallback then we - // want to `forget` the helper thread to ensure that it persists - // globally. If we fail to set ourselves as the fallback that means - // that someone was racing with this call to `Handle::default`. - // They ended up winning so we'll destroy our helper thread (which - // shuts down the thread) and reload the fallback. - if set_fallback(reactor.handle().into_priv().unwrap()).is_ok() { - let ret = reactor.handle().into_priv().unwrap(); - - match reactor.background() { - Ok(bg) => bg.forget(), - // The global handle is fubar, but y'all probably got bigger - // problems if a thread can't spawn. - Err(_) => {} - } - - return Ok(ret); - } - - fallback = HANDLE_FALLBACK.load(SeqCst); - } - - // At this point our fallback handle global was configured so we use - // its value to reify a handle, clone it, and then forget our reified - // handle as we don't actually have an owning reference to it. - assert!(fallback != 0); - - let ret = unsafe { - let handle = HandlePriv::from_usize(fallback); - let ret = handle.clone(); - - // This prevents `handle` from being dropped and having the ref - // count decremented. - drop(handle.into_usize()); - - ret - }; - - Ok(ret) - } - /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise /// makes the next call to `turn` return immediately. /// @@ -610,15 +483,6 @@ impl HandlePriv { } } - fn into_usize(self) -> usize { - unsafe { mem::transmute::<Weak<Inner>, usize>(self.inner) } - } - - unsafe fn from_usize(val: usize) -> HandlePriv { - let inner = mem::transmute::<usize, Weak<Inner>>(val);; - HandlePriv { inner } - } - fn inner(&self) -> Option<Arc<Inner>> { self.inner.upgrade() } @@ -655,8 +519,8 @@ impl Inner { io_dispatch.insert(ScheduledIo { aba_guard, readiness: AtomicUsize::new(0), - reader: AtomicTask::new(), - writer: AtomicTask::new(), + reader: AtomicWaker::new(), + writer: AtomicWaker::new(), }) }; @@ -684,20 +548,20 @@ impl Inner { } /// Registers interest in the I/O resource associated with `token`. - fn register(&self, token: usize, dir: Direction, t: Task) { + fn register(&self, token: usize, dir: Direction, w: Waker) { debug!("scheduling {:?} for: {}", dir, token); let io_dispatch = self.io_dispatch.read(); let sched = io_dispatch.get(token).unwrap(); - let (task, ready) = match dir { + let (waker, ready) = match dir { Direction::Read => (&sched.reader, !mio::Ready::writable()), Direction::Write => (&sched.writer, mio::Ready::writable()), }; - task.register_task(t); + waker.register(w); if sched.readiness.load(SeqCst) & ready.as_usize() != 0 { - task.notify(); + waker.wake(); } } } @@ -709,8 +573,8 @@ impl Drop for Inner { // will start returning errors pretty quickly. let io = self.io_dispatch.read(); for (_, io) in io.iter() { - io.writer.notify(); - io.reader.notify(); + io.writer.wake(); + io.reader.wake(); } } } @@ -753,17 +617,3 @@ mod platform { false } } - -// ===== impl SetFallbackError ===== - -impl fmt::Display for SetFallbackError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "{}", self.description()) - } -} - -impl Error for SetFallbackError { - fn description(&self) -> &str { - "attempted to set fallback reactor while already configured" - } -} |