summaryrefslogtreecommitdiffstats
path: root/tokio-reactor/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-reactor/src/lib.rs')
-rw-r--r--tokio-reactor/src/lib.rs206
1 files changed, 28 insertions, 178 deletions
diff --git a/tokio-reactor/src/lib.rs b/tokio-reactor/src/lib.rs
index fb8f3531..ff2ab686 100644
--- a/tokio-reactor/src/lib.rs
+++ b/tokio-reactor/src/lib.rs
@@ -32,38 +32,43 @@
//! [`PollEvented`]: struct.PollEvented.html
//! [reactor module]: https://docs.rs/tokio/0.1/tokio/reactor/index.html
-pub(crate) mod background;
+macro_rules! ready {
+ ($e:expr) => {
+ match $e {
+ ::std::task::Poll::Ready(v) => v,
+ ::std::task::Poll::Pending => return ::std::task::Poll::Pending,
+ }
+ };
+}
+
mod poll_evented;
mod registration;
mod sharded_rwlock;
// ===== Public re-exports =====
-pub use self::background::{Background, Shutdown};
pub use self::poll_evented::PollEvented;
pub use self::registration::Registration;
// ===== Private imports =====
use crate::sharded_rwlock::RwLock;
-use futures::task::Task;
use log::{debug, log_enabled, trace, Level};
use mio::event::Evented;
use slab::Slab;
use std::cell::RefCell;
-use std::error::Error;
use std::io;
-use std::mem;
#[cfg(all(unix, not(target_os = "fuchsia")))]
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::{Arc, Weak};
+use std::task::Waker;
use std::time::{Duration, Instant};
use std::{fmt, usize};
use tokio_executor::park::{Park, Unpark};
use tokio_executor::Enter;
-use tokio_sync::task::AtomicTask;
+use tokio_sync::task::AtomicWaker;
/// The core reactor, or event loop.
///
@@ -109,14 +114,6 @@ pub struct Turn {
_priv: (),
}
-/// Error returned from `Handle::set_fallback`.
-#[derive(Clone, Debug)]
-pub struct SetFallbackError(());
-
-#[deprecated(since = "0.1.2", note = "use SetFallbackError instead")]
-#[doc(hidden)]
-pub type SetDefaultError = SetFallbackError;
-
#[test]
fn test_handle_size() {
use std::mem;
@@ -140,8 +137,8 @@ struct Inner {
struct ScheduledIo {
aba_guard: usize,
readiness: AtomicUsize,
- reader: AtomicTask,
- writer: AtomicTask,
+ reader: AtomicWaker,
+ writer: AtomicWaker,
}
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
@@ -150,9 +147,6 @@ pub(crate) enum Direction {
Write,
}
-/// The global fallback reactor.
-static HANDLE_FALLBACK: AtomicUsize = AtomicUsize::new(0);
-
thread_local! {
/// Tracks the reactor for the current execution context.
static CURRENT_REACTOR: RefCell<Option<HandlePriv>> = RefCell::new(None)
@@ -262,33 +256,6 @@ impl Reactor {
}
}
- /// Configures the fallback handle to be returned from `Handle::default`.
- ///
- /// The `Handle::default()` function will by default lazily spin up a global
- /// thread and run a reactor on this global thread. This behavior is not
- /// always desirable in all applications, however, and sometimes a different
- /// fallback reactor is desired.
- ///
- /// This function will attempt to globally alter the return value of
- /// `Handle::default()` to return the `handle` specified rather than a
- /// lazily initialized global thread. If successful then all future calls to
- /// `Handle::default()` which would otherwise fall back to the global thread
- /// will instead return a clone of the handle specified.
- ///
- /// # Errors
- ///
- /// This function may not always succeed in configuring the fallback handle.
- /// If this function was previously called (or perhaps concurrently called
- /// on many threads) only the *first* invocation of this function will
- /// succeed. All other invocations will return an error.
- ///
- /// Additionally if the global reactor thread has already been initialized
- /// then this function will also return an error. (aka if `Handle::default`
- /// has been called previously in this program).
- pub fn set_fallback(&self) -> Result<(), SetFallbackError> {
- set_fallback(self.handle().into_priv().unwrap())
- }
-
/// Performs one iteration of the event loop, blocking on waiting for events
/// for at most `max_wait` (forever if `None`).
///
@@ -328,16 +295,6 @@ impl Reactor {
self.inner.io_dispatch.read().is_empty()
}
- /// Run this reactor on a background thread.
- ///
- /// This function takes ownership, spawns a new thread, and moves the
- /// reactor to this new thread. It then runs the reactor, driving all
- /// associated I/O resources, until the `Background` handle is dropped or
- /// explicitly shutdown.
- pub fn background(self) -> io::Result<Background> {
- Background::new(self)
- }
-
fn poll(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
// Block waiting for an event to happen, peeling out how many events
// happened.
@@ -406,20 +363,20 @@ impl Reactor {
io.readiness.fetch_or(ready.as_usize(), Relaxed);
if ready.is_writable() || platform::is_hup(&ready) {
- wr = io.writer.take_task();
+ wr = io.writer.take_waker();
}
if !(ready & (!mio::Ready::writable())).is_empty() {
- rd = io.reader.take_task();
+ rd = io.reader.take_waker();
}
}
- if let Some(task) = rd {
- task.notify();
+ if let Some(w) = rd {
+ w.wake();
}
- if let Some(task) = wr {
- task.notify();
+ if let Some(w) = wr {
+ w.wake();
}
}
}
@@ -475,16 +432,6 @@ impl Handle {
fn as_priv(&self) -> Option<&HandlePriv> {
self.inner.as_ref()
}
-
- fn into_priv(self) -> Option<HandlePriv> {
- self.inner
- }
-
- fn wakeup(&self) {
- if let Some(handle) = self.as_priv() {
- handle.wakeup();
- }
- }
}
impl Unpark for Handle {
@@ -508,19 +455,6 @@ impl fmt::Debug for Handle {
}
}
-fn set_fallback(handle: HandlePriv) -> Result<(), SetFallbackError> {
- unsafe {
- let val = handle.into_usize();
- match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) {
- Ok(_) => Ok(()),
- Err(_) => {
- drop(HandlePriv::from_usize(val));
- Err(SetFallbackError(()))
- }
- }
- }
-}
-
// ===== impl HandlePriv =====
impl HandlePriv {
@@ -530,71 +464,10 @@ impl HandlePriv {
pub(crate) fn try_current() -> io::Result<HandlePriv> {
CURRENT_REACTOR.with(|current| match *current.borrow() {
Some(ref handle) => Ok(handle.clone()),
- None => HandlePriv::fallback(),
+ None => Err(io::Error::new(io::ErrorKind::Other, "no current reactor")),
})
}
- /// Returns a handle to the fallback reactor.
- fn fallback() -> io::Result<HandlePriv> {
- let mut fallback = HANDLE_FALLBACK.load(SeqCst);
-
- // If the fallback hasn't been previously initialized then let's spin
- // up a helper thread and try to initialize with that. If we can't
- // actually create a helper thread then we'll just return a "defunct"
- // handle which will return errors when I/O objects are attempted to be
- // associated.
- if fallback == 0 {
- let reactor = match Reactor::new() {
- Ok(reactor) => reactor,
- Err(_) => {
- return Err(io::Error::new(
- io::ErrorKind::Other,
- "failed to create reactor",
- ));
- }
- };
-
- // If we successfully set ourselves as the actual fallback then we
- // want to `forget` the helper thread to ensure that it persists
- // globally. If we fail to set ourselves as the fallback that means
- // that someone was racing with this call to `Handle::default`.
- // They ended up winning so we'll destroy our helper thread (which
- // shuts down the thread) and reload the fallback.
- if set_fallback(reactor.handle().into_priv().unwrap()).is_ok() {
- let ret = reactor.handle().into_priv().unwrap();
-
- match reactor.background() {
- Ok(bg) => bg.forget(),
- // The global handle is fubar, but y'all probably got bigger
- // problems if a thread can't spawn.
- Err(_) => {}
- }
-
- return Ok(ret);
- }
-
- fallback = HANDLE_FALLBACK.load(SeqCst);
- }
-
- // At this point our fallback handle global was configured so we use
- // its value to reify a handle, clone it, and then forget our reified
- // handle as we don't actually have an owning reference to it.
- assert!(fallback != 0);
-
- let ret = unsafe {
- let handle = HandlePriv::from_usize(fallback);
- let ret = handle.clone();
-
- // This prevents `handle` from being dropped and having the ref
- // count decremented.
- drop(handle.into_usize());
-
- ret
- };
-
- Ok(ret)
- }
-
/// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
/// makes the next call to `turn` return immediately.
///
@@ -610,15 +483,6 @@ impl HandlePriv {
}
}
- fn into_usize(self) -> usize {
- unsafe { mem::transmute::<Weak<Inner>, usize>(self.inner) }
- }
-
- unsafe fn from_usize(val: usize) -> HandlePriv {
- let inner = mem::transmute::<usize, Weak<Inner>>(val);;
- HandlePriv { inner }
- }
-
fn inner(&self) -> Option<Arc<Inner>> {
self.inner.upgrade()
}
@@ -655,8 +519,8 @@ impl Inner {
io_dispatch.insert(ScheduledIo {
aba_guard,
readiness: AtomicUsize::new(0),
- reader: AtomicTask::new(),
- writer: AtomicTask::new(),
+ reader: AtomicWaker::new(),
+ writer: AtomicWaker::new(),
})
};
@@ -684,20 +548,20 @@ impl Inner {
}
/// Registers interest in the I/O resource associated with `token`.
- fn register(&self, token: usize, dir: Direction, t: Task) {
+ fn register(&self, token: usize, dir: Direction, w: Waker) {
debug!("scheduling {:?} for: {}", dir, token);
let io_dispatch = self.io_dispatch.read();
let sched = io_dispatch.get(token).unwrap();
- let (task, ready) = match dir {
+ let (waker, ready) = match dir {
Direction::Read => (&sched.reader, !mio::Ready::writable()),
Direction::Write => (&sched.writer, mio::Ready::writable()),
};
- task.register_task(t);
+ waker.register(w);
if sched.readiness.load(SeqCst) & ready.as_usize() != 0 {
- task.notify();
+ waker.wake();
}
}
}
@@ -709,8 +573,8 @@ impl Drop for Inner {
// will start returning errors pretty quickly.
let io = self.io_dispatch.read();
for (_, io) in io.iter() {
- io.writer.notify();
- io.reader.notify();
+ io.writer.wake();
+ io.reader.wake();
}
}
}
@@ -753,17 +617,3 @@ mod platform {
false
}
}
-
-// ===== impl SetFallbackError =====
-
-impl fmt::Display for SetFallbackError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "{}", self.description())
- }
-}
-
-impl Error for SetFallbackError {
- fn description(&self) -> &str {
- "attempted to set fallback reactor while already configured"
- }
-}