From d5c1119c881c9a8b511aa9000fd26b9bda014256 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 5 Nov 2019 19:12:30 -0800 Subject: runtime: combine `executor` and `runtime` mods (#1734) Now, all types are under `runtime`. `executor::util` is moved to a top level `util` module. --- tokio-test/src/clock.rs | 2 +- tokio/src/executor/blocking/builder.rs | 58 -- tokio/src/executor/blocking/mod.rs | 361 ----------- tokio/src/executor/current_thread/mod.rs | 363 ----------- tokio/src/executor/enter.rs | 141 ----- tokio/src/executor/global.rs | 152 ----- tokio/src/executor/mod.rs | 74 --- tokio/src/executor/park/mod.rs | 142 ----- tokio/src/executor/park/thread.rs | 273 --------- tokio/src/executor/task/core.rs | 156 ----- tokio/src/executor/task/error.rs | 48 -- tokio/src/executor/task/harness.rs | 559 ----------------- tokio/src/executor/task/join.rs | 85 --- tokio/src/executor/task/list.rs | 96 --- tokio/src/executor/task/mod.rs | 150 ----- tokio/src/executor/task/raw.rs | 193 ------ tokio/src/executor/task/stack.rs | 88 --- tokio/src/executor/task/state.rs | 504 ---------------- tokio/src/executor/task/tests/loom.rs | 277 --------- tokio/src/executor/task/tests/mod.rs | 5 - tokio/src/executor/task/tests/task.rs | 643 -------------------- tokio/src/executor/task/waker.rs | 107 ---- tokio/src/executor/tests/backoff.rs | 32 - tokio/src/executor/tests/loom_oneshot.rs | 49 -- tokio/src/executor/tests/loom_schedule.rs | 51 -- tokio/src/executor/tests/mock_park.rs | 66 -- tokio/src/executor/tests/mock_schedule.rs | 131 ---- tokio/src/executor/tests/mod.rs | 40 -- tokio/src/executor/tests/track_drop.rs | 57 -- tokio/src/executor/thread_pool/builder.rs | 206 ------- tokio/src/executor/thread_pool/current.rs | 89 --- tokio/src/executor/thread_pool/idle.rs | 229 ------- tokio/src/executor/thread_pool/mod.rs | 44 -- tokio/src/executor/thread_pool/owned.rs | 77 --- tokio/src/executor/thread_pool/pool.rs | 85 --- tokio/src/executor/thread_pool/queue/global.rs | 199 ------ tokio/src/executor/thread_pool/queue/inject.rs | 41 -- tokio/src/executor/thread_pool/queue/local.rs | 298 --------- tokio/src/executor/thread_pool/queue/mod.rs | 41 -- tokio/src/executor/thread_pool/queue/worker.rs | 127 ---- tokio/src/executor/thread_pool/set.rs | 207 ------- tokio/src/executor/thread_pool/shared.rs | 104 ---- tokio/src/executor/thread_pool/shutdown.rs | 47 -- tokio/src/executor/thread_pool/spawner.rs | 62 -- tokio/src/executor/thread_pool/tests/loom_pool.rs | 269 --------- tokio/src/executor/thread_pool/tests/loom_queue.rs | 68 --- tokio/src/executor/thread_pool/tests/mod.rs | 14 - tokio/src/executor/thread_pool/tests/pool.rs | 194 ------ tokio/src/executor/thread_pool/tests/queue.rs | 281 --------- tokio/src/executor/thread_pool/tests/worker.rs | 76 --- tokio/src/executor/thread_pool/worker.rs | 666 -------------------- tokio/src/executor/util/mod.rs | 5 - tokio/src/executor/util/pad.rs | 52 -- tokio/src/executor/util/rand.rs | 52 -- tokio/src/fs/mod.rs | 2 +- tokio/src/lib.rs | 11 +- tokio/src/loom/std/mod.rs | 1 + tokio/src/net/addr.rs | 4 +- tokio/src/net/driver/reactor/mod.rs | 2 +- tokio/src/runtime/blocking/builder.rs | 58 ++ tokio/src/runtime/blocking/mod.rs | 361 +++++++++++ tokio/src/runtime/builder.rs | 114 ++-- tokio/src/runtime/current_thread/mod.rs | 364 +++++++++++ tokio/src/runtime/enter.rs | 139 +++++ tokio/src/runtime/global.rs | 152 +++++ tokio/src/runtime/io.rs | 56 ++ tokio/src/runtime/mod.rs | 96 ++- tokio/src/runtime/park/mod.rs | 143 +++++ tokio/src/runtime/park/thread.rs | 279 +++++++++ 
tokio/src/runtime/spawner.rs | 20 +- tokio/src/runtime/task/core.rs | 156 +++++ tokio/src/runtime/task/error.rs | 48 ++ tokio/src/runtime/task/harness.rs | 559 +++++++++++++++++ tokio/src/runtime/task/join.rs | 85 +++ tokio/src/runtime/task/list.rs | 96 +++ tokio/src/runtime/task/mod.rs | 153 +++++ tokio/src/runtime/task/raw.rs | 193 ++++++ tokio/src/runtime/task/stack.rs | 88 +++ tokio/src/runtime/task/state.rs | 504 ++++++++++++++++ tokio/src/runtime/task/tests/loom.rs | 277 +++++++++ tokio/src/runtime/task/tests/mod.rs | 5 + tokio/src/runtime/task/tests/task.rs | 643 ++++++++++++++++++++ tokio/src/runtime/task/waker.rs | 107 ++++ tokio/src/runtime/tests/backoff.rs | 32 + tokio/src/runtime/tests/loom_oneshot.rs | 49 ++ tokio/src/runtime/tests/loom_schedule.rs | 51 ++ tokio/src/runtime/tests/mock_park.rs | 66 ++ tokio/src/runtime/tests/mock_schedule.rs | 131 ++++ tokio/src/runtime/tests/mod.rs | 40 ++ tokio/src/runtime/tests/track_drop.rs | 57 ++ tokio/src/runtime/thread_pool/builder.rs | 208 +++++++ tokio/src/runtime/thread_pool/current.rs | 89 +++ tokio/src/runtime/thread_pool/idle.rs | 229 +++++++ tokio/src/runtime/thread_pool/mod.rs | 46 ++ tokio/src/runtime/thread_pool/owned.rs | 77 +++ tokio/src/runtime/thread_pool/pool.rs | 85 +++ tokio/src/runtime/thread_pool/queue/global.rs | 199 ++++++ tokio/src/runtime/thread_pool/queue/inject.rs | 41 ++ tokio/src/runtime/thread_pool/queue/local.rs | 298 +++++++++ tokio/src/runtime/thread_pool/queue/mod.rs | 41 ++ tokio/src/runtime/thread_pool/queue/worker.rs | 127 ++++ tokio/src/runtime/thread_pool/set.rs | 207 +++++++ tokio/src/runtime/thread_pool/shared.rs | 104 ++++ tokio/src/runtime/thread_pool/shutdown.rs | 47 ++ tokio/src/runtime/thread_pool/spawner.rs | 62 ++ tokio/src/runtime/thread_pool/tests/loom_pool.rs | 269 +++++++++ tokio/src/runtime/thread_pool/tests/loom_queue.rs | 68 +++ tokio/src/runtime/thread_pool/tests/mod.rs | 14 + tokio/src/runtime/thread_pool/tests/pool.rs | 193 ++++++ tokio/src/runtime/thread_pool/tests/queue.rs | 281 +++++++++ tokio/src/runtime/thread_pool/tests/worker.rs | 76 +++ tokio/src/runtime/thread_pool/worker.rs | 667 +++++++++++++++++++++ tokio/src/runtime/timer.rs | 41 ++ tokio/src/timer/timer/mod.rs | 4 +- tokio/src/util/mod.rs | 5 + tokio/src/util/pad.rs | 52 ++ tokio/src/util/rand.rs | 52 ++ tokio/tests/clock.rs | 6 +- tokio/tests/rt_thread_pool.rs | 6 +- 119 files changed, 8660 insertions(+), 8512 deletions(-) delete mode 100644 tokio/src/executor/blocking/builder.rs delete mode 100644 tokio/src/executor/blocking/mod.rs delete mode 100644 tokio/src/executor/current_thread/mod.rs delete mode 100644 tokio/src/executor/enter.rs delete mode 100644 tokio/src/executor/global.rs delete mode 100644 tokio/src/executor/mod.rs delete mode 100644 tokio/src/executor/park/mod.rs delete mode 100644 tokio/src/executor/park/thread.rs delete mode 100644 tokio/src/executor/task/core.rs delete mode 100644 tokio/src/executor/task/error.rs delete mode 100644 tokio/src/executor/task/harness.rs delete mode 100644 tokio/src/executor/task/join.rs delete mode 100644 tokio/src/executor/task/list.rs delete mode 100644 tokio/src/executor/task/mod.rs delete mode 100644 tokio/src/executor/task/raw.rs delete mode 100644 tokio/src/executor/task/stack.rs delete mode 100644 tokio/src/executor/task/state.rs delete mode 100644 tokio/src/executor/task/tests/loom.rs delete mode 100644 tokio/src/executor/task/tests/mod.rs delete mode 100644 tokio/src/executor/task/tests/task.rs delete mode 100644 tokio/src/executor/task/waker.rs delete mode 
100644 tokio/src/executor/tests/backoff.rs delete mode 100644 tokio/src/executor/tests/loom_oneshot.rs delete mode 100644 tokio/src/executor/tests/loom_schedule.rs delete mode 100644 tokio/src/executor/tests/mock_park.rs delete mode 100644 tokio/src/executor/tests/mock_schedule.rs delete mode 100644 tokio/src/executor/tests/mod.rs delete mode 100644 tokio/src/executor/tests/track_drop.rs delete mode 100644 tokio/src/executor/thread_pool/builder.rs delete mode 100644 tokio/src/executor/thread_pool/current.rs delete mode 100644 tokio/src/executor/thread_pool/idle.rs delete mode 100644 tokio/src/executor/thread_pool/mod.rs delete mode 100644 tokio/src/executor/thread_pool/owned.rs delete mode 100644 tokio/src/executor/thread_pool/pool.rs delete mode 100644 tokio/src/executor/thread_pool/queue/global.rs delete mode 100644 tokio/src/executor/thread_pool/queue/inject.rs delete mode 100644 tokio/src/executor/thread_pool/queue/local.rs delete mode 100644 tokio/src/executor/thread_pool/queue/mod.rs delete mode 100644 tokio/src/executor/thread_pool/queue/worker.rs delete mode 100644 tokio/src/executor/thread_pool/set.rs delete mode 100644 tokio/src/executor/thread_pool/shared.rs delete mode 100644 tokio/src/executor/thread_pool/shutdown.rs delete mode 100644 tokio/src/executor/thread_pool/spawner.rs delete mode 100644 tokio/src/executor/thread_pool/tests/loom_pool.rs delete mode 100644 tokio/src/executor/thread_pool/tests/loom_queue.rs delete mode 100644 tokio/src/executor/thread_pool/tests/mod.rs delete mode 100644 tokio/src/executor/thread_pool/tests/pool.rs delete mode 100644 tokio/src/executor/thread_pool/tests/queue.rs delete mode 100644 tokio/src/executor/thread_pool/tests/worker.rs delete mode 100644 tokio/src/executor/thread_pool/worker.rs delete mode 100644 tokio/src/executor/util/mod.rs delete mode 100644 tokio/src/executor/util/pad.rs delete mode 100644 tokio/src/executor/util/rand.rs create mode 100644 tokio/src/runtime/blocking/builder.rs create mode 100644 tokio/src/runtime/blocking/mod.rs create mode 100644 tokio/src/runtime/current_thread/mod.rs create mode 100644 tokio/src/runtime/enter.rs create mode 100644 tokio/src/runtime/global.rs create mode 100644 tokio/src/runtime/io.rs create mode 100644 tokio/src/runtime/park/mod.rs create mode 100644 tokio/src/runtime/park/thread.rs create mode 100644 tokio/src/runtime/task/core.rs create mode 100644 tokio/src/runtime/task/error.rs create mode 100644 tokio/src/runtime/task/harness.rs create mode 100644 tokio/src/runtime/task/join.rs create mode 100644 tokio/src/runtime/task/list.rs create mode 100644 tokio/src/runtime/task/mod.rs create mode 100644 tokio/src/runtime/task/raw.rs create mode 100644 tokio/src/runtime/task/stack.rs create mode 100644 tokio/src/runtime/task/state.rs create mode 100644 tokio/src/runtime/task/tests/loom.rs create mode 100644 tokio/src/runtime/task/tests/mod.rs create mode 100644 tokio/src/runtime/task/tests/task.rs create mode 100644 tokio/src/runtime/task/waker.rs create mode 100644 tokio/src/runtime/tests/backoff.rs create mode 100644 tokio/src/runtime/tests/loom_oneshot.rs create mode 100644 tokio/src/runtime/tests/loom_schedule.rs create mode 100644 tokio/src/runtime/tests/mock_park.rs create mode 100644 tokio/src/runtime/tests/mock_schedule.rs create mode 100644 tokio/src/runtime/tests/mod.rs create mode 100644 tokio/src/runtime/tests/track_drop.rs create mode 100644 tokio/src/runtime/thread_pool/builder.rs create mode 100644 tokio/src/runtime/thread_pool/current.rs create mode 100644 
tokio/src/runtime/thread_pool/idle.rs create mode 100644 tokio/src/runtime/thread_pool/mod.rs create mode 100644 tokio/src/runtime/thread_pool/owned.rs create mode 100644 tokio/src/runtime/thread_pool/pool.rs create mode 100644 tokio/src/runtime/thread_pool/queue/global.rs create mode 100644 tokio/src/runtime/thread_pool/queue/inject.rs create mode 100644 tokio/src/runtime/thread_pool/queue/local.rs create mode 100644 tokio/src/runtime/thread_pool/queue/mod.rs create mode 100644 tokio/src/runtime/thread_pool/queue/worker.rs create mode 100644 tokio/src/runtime/thread_pool/set.rs create mode 100644 tokio/src/runtime/thread_pool/shared.rs create mode 100644 tokio/src/runtime/thread_pool/shutdown.rs create mode 100644 tokio/src/runtime/thread_pool/spawner.rs create mode 100644 tokio/src/runtime/thread_pool/tests/loom_pool.rs create mode 100644 tokio/src/runtime/thread_pool/tests/loom_queue.rs create mode 100644 tokio/src/runtime/thread_pool/tests/mod.rs create mode 100644 tokio/src/runtime/thread_pool/tests/pool.rs create mode 100644 tokio/src/runtime/thread_pool/tests/queue.rs create mode 100644 tokio/src/runtime/thread_pool/tests/worker.rs create mode 100644 tokio/src/runtime/thread_pool/worker.rs create mode 100644 tokio/src/runtime/timer.rs create mode 100644 tokio/src/util/mod.rs create mode 100644 tokio/src/util/pad.rs create mode 100644 tokio/src/util/rand.rs diff --git a/tokio-test/src/clock.rs b/tokio-test/src/clock.rs index c5fac787..38417539 100644 --- a/tokio-test/src/clock.rs +++ b/tokio-test/src/clock.rs @@ -22,7 +22,7 @@ //! }); //! ``` -use tokio::executor::park::{Park, Unpark}; +use tokio::runtime::{Park, Unpark}; use tokio::timer::clock::{Clock, Now}; use tokio::timer::Timer; diff --git a/tokio/src/executor/blocking/builder.rs b/tokio/src/executor/blocking/builder.rs deleted file mode 100644 index e755ae23..00000000 --- a/tokio/src/executor/blocking/builder.rs +++ /dev/null @@ -1,58 +0,0 @@ -use crate::executor::blocking::Pool; -use crate::loom::thread; - -use std::usize; - -/// Builds a blocking thread pool with custom configuration values. -pub(crate) struct Builder { - /// Thread name - name: String, - - /// Thread stack size - stack_size: Option, -} - -impl Default for Builder { - fn default() -> Self { - Builder { - name: "tokio-blocking-thread".to_string(), - stack_size: None, - } - } -} - -impl Builder { - /// Set name of threads spawned by the pool - /// - /// If this configuration is not set, then the thread will use the system - /// default naming scheme. - pub(crate) fn name>(&mut self, val: S) -> &mut Self { - self.name = val.into(); - self - } - - /// Set the stack size (in bytes) for worker threads. - /// - /// The actual stack size may be greater than this value if the platform - /// specifies minimal stack size. - /// - /// The default stack size for spawned threads is 2 MiB, though this - /// particular stack size is subject to change in the future. 
- pub(crate) fn stack_size(&mut self, val: usize) -> &mut Self { - self.stack_size = Some(val); - self - } - - pub(crate) fn build(self) -> Pool { - let mut p = Pool::default(); - let Builder { stack_size, name } = self; - p.new_thread = Box::new(move || { - let mut b = thread::Builder::new().name(name.clone()); - if let Some(stack_size) = stack_size { - b = b.stack_size(stack_size); - } - b - }); - p - } -} diff --git a/tokio/src/executor/blocking/mod.rs b/tokio/src/executor/blocking/mod.rs deleted file mode 100644 index d105fe22..00000000 --- a/tokio/src/executor/blocking/mod.rs +++ /dev/null @@ -1,361 +0,0 @@ -//! Thread pool for blocking operations - -use crate::loom::sync::{Arc, Condvar, Mutex}; -use crate::loom::thread; -#[cfg(feature = "blocking")] -use crate::sync::oneshot; - -use std::cell::Cell; -use std::collections::VecDeque; -use std::fmt; -#[cfg(feature = "blocking")] -use std::future::Future; -use std::ops::Deref; -#[cfg(feature = "blocking")] -use std::pin::Pin; -#[cfg(feature = "blocking")] -use std::task::{Context, Poll}; -use std::time::Duration; - -#[cfg(feature = "rt-full")] -mod builder; - -#[cfg(feature = "rt-full")] -pub(crate) use builder::Builder; - -#[derive(Clone, Copy)] -enum State { - Empty, - Ready(*const Arc), -} - -thread_local! { - /// Thread-local tracking the current executor - static BLOCKING: Cell = Cell::new(State::Empty) -} - -/// Set the blocking pool for the duration of the closure -/// -/// If a blocking pool is already set, it will be restored when the closure returns or if it -/// panics. -#[allow(dead_code)] // we allow dead code since this won't be called if no executors are enabled -pub(crate) fn with_pool(pool: &Arc, f: F) -> R -where - F: FnOnce() -> R, -{ - // While scary, this is safe. The function takes a `&Pool`, which guarantees - // that the reference lives for the duration of `with_pool`. - // - // Because we are always clearing the TLS value at the end of the - // function, we can cast the reference to 'static which thread-local - // cells require. - BLOCKING.with(|cell| { - let was = cell.replace(State::Empty); - - // Ensure that the pool is removed from the thread-local context - // when leaving the scope. This handles cases that involve panicking. - struct Reset<'a>(&'a Cell, State); - - impl Drop for Reset<'_> { - fn drop(&mut self) { - self.0.set(self.1); - } - } - - let _reset = Reset(cell, was); - cell.set(State::Ready(pool as *const _)); - f() - }) -} - -pub(crate) struct Pool { - shared: Mutex, - condvar: Condvar, - new_thread: Box thread::Builder + Send + Sync + 'static>, -} - -impl fmt::Debug for Pool { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Pool").finish() - } -} - -struct Shared { - queue: VecDeque>, - num_th: u32, - num_idle: u32, - num_notify: u32, - shutdown: bool, -} - -const MAX_THREADS: u32 = 1_000; -const KEEP_ALIVE: Duration = Duration::from_secs(10); - -/// Result of a blocking operation running on the blocking thread pool. -#[cfg(feature = "blocking")] -#[derive(Debug)] -pub struct Blocking { - rx: oneshot::Receiver, -} - -impl Pool { - /// Run the provided function on an executor dedicated to blocking operations. - pub(crate) fn spawn(this: &Arc, f: Box) { - let should_spawn = { - let mut shared = this.shared.lock().unwrap(); - - if shared.shutdown { - // no need to even push this task; it would never get picked up - return; - } - - shared.queue.push_back(f); - - if shared.num_idle == 0 { - // No threads are able to process the task. 
- - if shared.num_th == MAX_THREADS { - // At max number of threads - false - } else { - shared.num_th += 1; - true - } - } else { - // Notify an idle worker thread. The notification counter - // is used to count the needed amount of notifications - // exactly. Thread libraries may generate spurious - // wakeups, this counter is used to keep us in a - // consistent state. - shared.num_idle -= 1; - shared.num_notify += 1; - this.condvar.notify_one(); - false - } - }; - - if should_spawn { - Pool::spawn_thread(Arc::clone(this), (this.new_thread)()); - } - } - - // NOTE: we cannot use self here w/o arbitrary_self_types since Arc is loom::Arc - fn spawn_thread(this: Arc, builder: thread::Builder) { - builder - .spawn(move || { - let mut shared = this.shared.lock().unwrap(); - 'main: loop { - // BUSY - while let Some(task) = shared.queue.pop_front() { - drop(shared); - run_task(task); - shared = this.shared.lock().unwrap(); - if shared.shutdown { - break; // Need to increment idle before we exit - } - } - - // IDLE - shared.num_idle += 1; - - while !shared.shutdown { - let lock_result = this.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap(); - shared = lock_result.0; - let timeout_result = lock_result.1; - - if shared.num_notify != 0 { - // We have received a legitimate wakeup, - // acknowledge it by decrementing the counter - // and transition to the BUSY state. - shared.num_notify -= 1; - break; - } - - if timeout_result.timed_out() { - break 'main; - } - - // Spurious wakeup detected, go back to sleep. - } - - if shared.shutdown { - // Work was produced, and we "took" it (by decrementing num_notify). - // This means that num_idle was decremented once for our wakeup. - // But, since we are exiting, we need to "undo" that, as we'll stay idle. - shared.num_idle += 1; - // NOTE: Technically we should also do num_notify++ and notify again, - // but since we're shutting down anyway, that won't be necessary. - break; - } - } - - // Thread exit - shared.num_th -= 1; - - // num_idle should now be tracked exactly, panic - // with a descriptive message if it is not the - // case. - shared.num_idle = shared - .num_idle - .checked_sub(1) - .expect("num_idle underflowed on thread exit"); - - if shared.shutdown && shared.num_th == 0 { - this.condvar.notify_one(); - } - }) - .unwrap(); - } - - /// Shut down all workers in the pool the next time they are idle. - /// - /// Blocks until all threads have exited. - pub(crate) fn shutdown(&self) { - let mut shared = self.shared.lock().unwrap(); - shared.shutdown = true; - self.condvar.notify_all(); - - while shared.num_th > 0 { - shared = self.condvar.wait(shared).unwrap(); - } - } -} - -#[derive(Debug)] -pub(crate) struct PoolWaiter(Arc); - -impl From for PoolWaiter { - fn from(p: Pool) -> Self { - Self::from(Arc::new(p)) - } -} - -impl From> for PoolWaiter { - fn from(p: Arc) -> Self { - Self(p) - } -} - -impl Deref for PoolWaiter { - type Target = Arc; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl Drop for PoolWaiter { - fn drop(&mut self) { - self.0.shutdown(); - } -} - -/// Run the provided blocking function without blocking the executor. -/// -/// In general, issuing a blocking call or performing a lot of compute in a -/// future without yielding is not okay, as it may prevent the executor from -/// driving other futures forward. If you run a closure through this method, -/// the current executor thread will relegate all its executor duties to another -/// (possibly new) thread, and only then poll the task. 
Note that this requires -/// additional synchronization. -/// -/// # Examples -/// -/// ``` -/// # async fn docs() { -/// tokio::executor::blocking::in_place(move || { -/// // do some compute-heavy work or call synchronous code -/// }); -/// # } -/// ``` -#[cfg(feature = "rt-full")] -pub fn in_place(f: F) -> R -where - F: FnOnce() -> R, -{ - use crate::executor; - - executor::enter::exit(|| executor::thread_pool::blocking(f)) -} - -/// Run the provided closure on a thread where blocking is acceptable. -/// -/// In general, issuing a blocking call or performing a lot of compute in a future without -/// yielding is not okay, as it may prevent the executor from driving other futures forward. -/// A closure that is run through this method will instead be run on a dedicated thread pool for -/// such blocking tasks without holding up the main futures executor. -/// -/// # Examples -/// -/// ``` -/// # async fn docs() { -/// tokio::executor::blocking::run(move || { -/// // do some compute-heavy work or call synchronous code -/// }).await; -/// # } -/// ``` -#[cfg(feature = "blocking")] -pub fn run(f: F) -> Blocking -where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, -{ - let (tx, rx) = oneshot::channel(); - - BLOCKING.with(|current_pool| match current_pool.get() { - State::Ready(pool) => { - let pool = unsafe { &*pool }; - Pool::spawn( - pool, - Box::new(move || { - // receiver may have gone away - let _ = tx.send(f()); - }), - ); - } - State::Empty => panic!("must be called from the context of Tokio runtime"), - }); - - Blocking { rx } -} - -#[cfg(feature = "blocking")] -impl Future for Blocking { - type Output = T; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use std::task::Poll::*; - - match Pin::new(&mut self.rx).poll(cx) { - Ready(Ok(v)) => Ready(v), - Ready(Err(_)) => panic!( - "the blocking operation has been dropped before completing. \ - This should not happen and is a bug." - ), - Pending => Pending, - } - } -} - -fn run_task(f: Box) { - use std::panic::{catch_unwind, AssertUnwindSafe}; - - let _ = catch_unwind(AssertUnwindSafe(|| f())); -} - -impl Default for Pool { - fn default() -> Self { - Pool { - shared: Mutex::new(Shared { - queue: VecDeque::new(), - num_th: 0, - num_idle: 0, - num_notify: 0, - shutdown: false, - }), - condvar: Condvar::new(), - new_thread: Box::new(|| { - thread::Builder::new().name("tokio-blocking-driver".to_string()) - }), - } - } -} diff --git a/tokio/src/executor/current_thread/mod.rs b/tokio/src/executor/current_thread/mod.rs deleted file mode 100644 index 24fee3a8..00000000 --- a/tokio/src/executor/current_thread/mod.rs +++ /dev/null @@ -1,363 +0,0 @@ -use crate::executor::park::{Park, Unpark}; -use crate::executor::task::{self, JoinHandle, Schedule, Task}; - -use std::cell::UnsafeCell; -use std::collections::VecDeque; -use std::fmt; -use std::future::Future; -use std::mem::ManuallyDrop; -use std::sync::{Arc, Mutex}; -use std::task::{RawWaker, RawWakerVTable, Waker}; -use std::time::Duration; - -/// Executes tasks on the current thread -#[derive(Debug)] -pub(crate) struct CurrentThread

<P> -where - P: Park, -{ - /// Scheduler component - scheduler: Arc<Scheduler>, - - /// Local state - local: Local<P>

, -} - -#[derive(Debug, Clone)] -pub(crate) struct Spawner { - scheduler: Arc, -} - -/// The scheduler component. -pub(super) struct Scheduler { - /// List of all active tasks spawned onto this executor. - /// - /// # Safety - /// - /// Must only be accessed from the primary thread - owned_tasks: UnsafeCell>, - - /// Local run queue. - /// - /// Tasks notified from the current thread are pushed into this queue. - /// - /// # Safety - /// - /// References should not be handed out. Only call `push` / `pop` functions. - /// Only call from the owning thread. - local_queue: UnsafeCell>>, - - /// Remote run queue. - /// - /// Tasks notified from another thread are pushed into this queue. - remote_queue: Mutex, - - /// Tasks pending drop - pending_drop: task::TransferStack, - - /// Unpark the blocked thread - unpark: Box, -} - -unsafe impl Send for Scheduler {} -unsafe impl Sync for Scheduler {} - -/// Local state -#[derive(Debug)] -struct Local

<P> { - /// Current tick - tick: u8, - - /// Thread park handle - park: P, -} - -#[derive(Debug)] -struct RemoteQueue { - /// FIFO list of tasks - queue: VecDeque<Task<Scheduler>>, - - /// `true` when a task can be pushed into the queue, false otherwise. - open: bool, -} - -/// Max number of tasks to poll per tick. -const MAX_TASKS_PER_TICK: usize = 61; - -/// How often to check the remote queue first -const CHECK_REMOTE_INTERVAL: u8 = 13; - -impl<P>

CurrentThread<P>

-where - P: Park, -{ - pub(crate) fn new(park: P) -> CurrentThread<P>

{ - let unpark = park.unpark(); - - CurrentThread { - scheduler: Arc::new(Scheduler { - owned_tasks: UnsafeCell::new(task::OwnedList::new()), - local_queue: UnsafeCell::new(VecDeque::with_capacity(64)), - remote_queue: Mutex::new(RemoteQueue { - queue: VecDeque::with_capacity(64), - open: true, - }), - pending_drop: task::TransferStack::new(), - unpark: Box::new(unpark), - }), - local: Local { tick: 0, park }, - } - } - - pub(crate) fn spawner(&self) -> Spawner { - Spawner { - scheduler: self.scheduler.clone(), - } - } - - /// Spawn a future onto the thread pool - pub(crate) fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let (task, handle) = task::joinable(future); - self.scheduler.schedule(task); - handle - } - - pub(crate) fn block_on(&mut self, mut future: F) -> F::Output - where - F: Future, - { - use std::pin::Pin; - use std::task::Context; - use std::task::Poll::Ready; - - let local = &mut self.local; - let scheduler = &*self.scheduler; - - crate::executor::global::with_current_thread(scheduler, || { - let mut _enter = - crate::executor::enter().expect("attempting to block while on a Tokio executor"); - - let raw_waker = RawWaker::new( - scheduler as *const Scheduler as *const (), - &RawWakerVTable::new(sched_clone_waker, sched_noop, sched_wake_by_ref, sched_noop), - ); - - let waker = ManuallyDrop::new(unsafe { Waker::from_raw(raw_waker) }); - let mut cx = Context::from_waker(&waker); - - // `block_on` takes ownership of `f`. Once it is pinned here, the - // original `f` binding can no longer be accessed, making the - // pinning safe. - let mut future = unsafe { Pin::new_unchecked(&mut future) }; - - loop { - if let Ready(v) = future.as_mut().poll(&mut cx) { - return v; - } - - scheduler.tick(local); - - // Maintenance work - scheduler.drain_pending_drop(); - } - }) - } -} - -impl Spawner { - /// Spawn a future onto the thread pool - pub(crate) fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let (task, handle) = task::joinable(future); - self.scheduler.schedule(task); - handle - } -} - -impl Scheduler { - fn tick(&self, local: &mut Local) { - for _ in 0..MAX_TASKS_PER_TICK { - // Get the current tick - let tick = local.tick; - - // Increment the tick - local.tick = tick.wrapping_add(1); - - let task = match self.next_task(tick) { - Some(task) => task, - None => { - local.park.park().ok().expect("failed to park"); - return; - } - }; - - if let Some(task) = task.run(&mut || Some(self.into())) { - unsafe { - self.schedule_local(task); - } - } - } - - local - .park - .park_timeout(Duration::from_millis(0)) - .ok() - .expect("failed to park"); - } - - fn drain_pending_drop(&self) { - for task in self.pending_drop.drain() { - unsafe { - (*self.owned_tasks.get()).remove(&task); - } - drop(task); - } - } - - /// # Safety - /// - /// Must be called from the same thread that holds the `CurrentThread` - /// value. 
- pub(super) unsafe fn spawn_background(&self, future: F) - where - F: Future + Send + 'static, - { - let task = task::background(future); - self.schedule_local(task); - } - - unsafe fn schedule_local(&self, task: Task) { - (*self.local_queue.get()).push_front(task); - } - - fn next_task(&self, tick: u8) -> Option> { - if 0 == tick % CHECK_REMOTE_INTERVAL { - self.next_remote_task().or_else(|| self.next_local_task()) - } else { - self.next_local_task().or_else(|| self.next_remote_task()) - } - } - - fn next_local_task(&self) -> Option> { - unsafe { (*self.local_queue.get()).pop_front() } - } - - fn next_remote_task(&self) -> Option> { - self.remote_queue.lock().unwrap().queue.pop_front() - } -} - -impl Schedule for Scheduler { - fn bind(&self, task: &Task) { - unsafe { - (*self.owned_tasks.get()).insert(task); - } - } - - fn release(&self, task: Task) { - self.pending_drop.push(task); - } - - fn release_local(&self, task: &Task) { - unsafe { - (*self.owned_tasks.get()).remove(task); - } - } - - fn schedule(&self, task: Task) { - use crate::executor::global; - - if global::current_thread_is_current(self) { - unsafe { self.schedule_local(task) }; - } else { - let mut lock = self.remote_queue.lock().unwrap(); - - if lock.open { - lock.queue.push_back(task); - } else { - task.shutdown(); - } - - // while locked, call unpark - self.unpark.unpark(); - - drop(lock); - } - } -} - -impl

<P> Drop for CurrentThread<P>

-where - P: Park, -{ - fn drop(&mut self) { - // Close the remote queue - let mut lock = self.scheduler.remote_queue.lock().unwrap(); - lock.open = false; - - while let Some(task) = lock.queue.pop_front() { - task.shutdown(); - } - - drop(lock); - - // Drain all local tasks - while let Some(task) = self.scheduler.next_local_task() { - task.shutdown(); - } - - // Release owned tasks - unsafe { - (*self.scheduler.owned_tasks.get()).shutdown(); - } - - self.scheduler.drain_pending_drop(); - - // Wait until all tasks have been released. - while unsafe { !(*self.scheduler.owned_tasks.get()).is_empty() } { - self.local.park.park().ok().expect("park failed"); - self.scheduler.drain_pending_drop(); - } - } -} - -impl fmt::Debug for Scheduler { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Scheduler").finish() - } -} - -unsafe fn sched_clone_waker(ptr: *const ()) -> RawWaker { - let s1 = ManuallyDrop::new(Arc::from_raw(ptr as *const Scheduler)); - let s2 = s1.clone(); - - RawWaker::new( - &**s2 as *const Scheduler as *const (), - &RawWakerVTable::new(sched_clone_waker, sched_wake, sched_wake_by_ref, sched_drop), - ) -} - -unsafe fn sched_wake(ptr: *const ()) { - let scheduler = Arc::from_raw(ptr as *const Scheduler); - scheduler.unpark.unpark(); -} - -unsafe fn sched_wake_by_ref(ptr: *const ()) { - let scheduler = ManuallyDrop::new(Arc::from_raw(ptr as *const Scheduler)); - scheduler.unpark.unpark(); -} - -unsafe fn sched_drop(ptr: *const ()) { - let _ = Arc::from_raw(ptr as *const Scheduler); -} - -unsafe fn sched_noop(_ptr: *const ()) { - unreachable!(); -} diff --git a/tokio/src/executor/enter.rs b/tokio/src/executor/enter.rs deleted file mode 100644 index 19ae26e0..00000000 --- a/tokio/src/executor/enter.rs +++ /dev/null @@ -1,141 +0,0 @@ -use std::cell::{Cell, RefCell}; -use std::error::Error; -use std::fmt; -#[cfg(feature = "rt-full")] -use std::future::Future; -use std::marker::PhantomData; - -thread_local!(static ENTERED: Cell = Cell::new(false)); - -/// Represents an executor context. -/// -/// For more details, see [`enter` documentation](fn.enter.html) -pub(crate) struct Enter { - _p: PhantomData>, -} - -/// An error returned by `enter` if an execution scope has already been -/// entered. -pub(crate) struct EnterError { - _a: (), -} - -impl fmt::Debug for EnterError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("EnterError") - .field("reason", &format!("{}", self)) - .finish() - } -} - -impl fmt::Display for EnterError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - fmt, - "attempted to run an executor while another executor is already running" - ) - } -} - -impl Error for EnterError {} - -/// Marks the current thread as being within the dynamic extent of an -/// executor. -/// -/// Executor implementations should call this function before blocking the -/// thread. If `None` is returned, the executor should fail by panicking or -/// taking some other action without blocking the current thread. This prevents -/// deadlocks due to multiple executors competing for the same thread. -/// -/// # Error -/// -/// Returns an error if the current thread is already marked -pub(crate) fn enter() -> Result { - ENTERED.with(|c| { - if c.get() { - Err(EnterError { _a: () }) - } else { - c.set(true); - - Ok(Enter { _p: PhantomData }) - } - }) -} - -// Forces the current "entered" state to be cleared while the closure -// is executed. -// -// # Warning -// -// This is hidden for a reason. 
Do not use without fully understanding -// executors. Misuing can easily cause your program to deadlock. -#[cfg(feature = "rt-full")] -pub(crate) fn exit R, R>(f: F) -> R { - // Reset in case the closure panics - struct Reset; - impl Drop for Reset { - fn drop(&mut self) { - ENTERED.with(|c| { - c.set(true); - }); - } - } - - ENTERED.with(|c| { - debug_assert!(c.get()); - c.set(false); - }); - - let reset = Reset; - let ret = f(); - ::std::mem::forget(reset); - - ENTERED.with(|c| { - assert!(!c.get(), "closure claimed permanent executor"); - c.set(true); - }); - - ret -} - -impl Enter { - /// Blocks the thread on the specified future, returning the value with - /// which that future completes. - #[cfg(feature = "rt-full")] - pub(crate) fn block_on(&mut self, mut f: F) -> F::Output { - use crate::executor::park::{Park, ParkThread}; - use std::pin::Pin; - use std::task::Context; - use std::task::Poll::Ready; - - let mut park = ParkThread::new(); - let waker = park.unpark().into_waker(); - let mut cx = Context::from_waker(&waker); - - // `block_on` takes ownership of `f`. Once it is pinned here, the original `f` binding can - // no longer be accessed, making the pinning safe. - let mut f = unsafe { Pin::new_unchecked(&mut f) }; - - loop { - if let Ready(v) = f.as_mut().poll(&mut cx) { - return v; - } - park.park().unwrap(); - } - } -} - -impl fmt::Debug for Enter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Enter").finish() - } -} - -impl Drop for Enter { - fn drop(&mut self) { - ENTERED.with(|c| { - assert!(c.get()); - c.set(false); - }); - } -} diff --git a/tokio/src/executor/global.rs b/tokio/src/executor/global.rs deleted file mode 100644 index ddb9b1ab..00000000 --- a/tokio/src/executor/global.rs +++ /dev/null @@ -1,152 +0,0 @@ -#[cfg(feature = "rt-current-thread")] -use crate::executor::current_thread; - -#[cfg(feature = "rt-full")] -use crate::executor::thread_pool; - -use std::cell::Cell; -use std::future::Future; - -#[derive(Clone, Copy)] -enum State { - // default executor not defined - Empty, - - // default executor is a thread pool instance. - #[cfg(feature = "rt-full")] - ThreadPool(*const thread_pool::Spawner), - - // Current-thread executor - #[cfg(feature = "rt-current-thread")] - CurrentThread(*const current_thread::Scheduler), -} - -thread_local! { - /// Thread-local tracking the current executor - static EXECUTOR: Cell = Cell::new(State::Empty) -} - -// ===== global spawn fns ===== - -/// Spawns a future on the default executor. -/// -/// In order for a future to do work, it must be spawned on an executor. The -/// `spawn` function is the easiest way to do this. It spawns a future on the -/// [default executor] for the current execution context (tracked using a -/// thread-local variable). -/// -/// The default executor is **usually** a thread pool. -/// -/// # Examples -/// -/// In this example, a server is started and `spawn` is used to start a new task -/// that processes each received connection. -/// -/// ``` -/// use tokio::net::TcpListener; -/// -/// # async fn process(_t: T) {} -/// # async fn dox() -> Result<(), Box> { -/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; -/// -/// loop { -/// let (socket, _) = listener.accept().await?; -/// -/// tokio::spawn(async move { -/// // Process each socket concurrently. 
-/// process(socket).await -/// }); -/// } -/// # } -/// ``` -/// -/// [default executor]: struct.DefaultExecutor.html -/// -/// # Panics -/// -/// This function will panic if the default executor is not set or if spawning -/// onto the default executor returns an error. To avoid the panic, use -/// [`DefaultExecutor`]. -/// -/// [`DefaultExecutor`]: struct.DefaultExecutor.html -pub fn spawn(future: T) -where - T: Future + Send + 'static, -{ - EXECUTOR.with(|current_executor| match current_executor.get() { - #[cfg(feature = "rt-full")] - State::ThreadPool(threadpool_ptr) => { - let thread_pool = unsafe { &*threadpool_ptr }; - thread_pool.spawn_background(future); - } - #[cfg(feature = "rt-current-thread")] - State::CurrentThread(current_thread_ptr) => { - let current_thread = unsafe { &*current_thread_ptr }; - - // Safety: The `CurrentThread` value set the thread-local (same - // thread). - unsafe { - current_thread.spawn_background(future); - } - } - State::Empty => { - // Explicit drop of `future` silences the warning that `future` is - // not used when neither rt-* feature flags are enabled. - drop(future); - panic!("must be called from the context of Tokio runtime"); - } - }) -} - -#[cfg(feature = "rt-current-thread")] -pub(super) fn with_current_thread(current_thread: ¤t_thread::Scheduler, f: F) -> R -where - F: FnOnce() -> R, -{ - with_state( - State::CurrentThread(current_thread as *const current_thread::Scheduler), - f, - ) -} - -#[cfg(feature = "rt-current-thread")] -pub(super) fn current_thread_is_current(current_thread: ¤t_thread::Scheduler) -> bool { - EXECUTOR.with(|current_executor| match current_executor.get() { - State::CurrentThread(ptr) => ptr == current_thread as *const _, - _ => false, - }) -} - -#[cfg(feature = "rt-full")] -pub(super) fn with_thread_pool(thread_pool: &thread_pool::Spawner, f: F) -> R -where - F: FnOnce() -> R, -{ - with_state(State::ThreadPool(thread_pool as *const _), f) -} - -#[cfg(feature = "rt-current-thread")] -fn with_state(state: State, f: F) -> R -where - F: FnOnce() -> R, -{ - EXECUTOR.with(|cell| { - let was = cell.replace(State::Empty); - - // Ensure that the executor is removed from the thread-local context - // when leaving the scope. This handles cases that involve panicking. - struct Reset<'a>(&'a Cell, State); - - impl Drop for Reset<'_> { - fn drop(&mut self) { - self.0.set(self.1); - } - } - - let _reset = Reset(cell, was); - - cell.set(state); - - f() - }) -} diff --git a/tokio/src/executor/mod.rs b/tokio/src/executor/mod.rs deleted file mode 100644 index 6134ea3b..00000000 --- a/tokio/src/executor/mod.rs +++ /dev/null @@ -1,74 +0,0 @@ -//! Task execution related traits and utilities. -//! -//! In the Tokio execution model, futures are lazy. When a future is created, no -//! work is performed. In order for the work defined by the future to happen, -//! the future must be submitted to an executor. A future that is submitted to -//! an executor is called a "task". -//! -//! The executor is responsible for ensuring that [`Future::poll`] is called -//! whenever the task is notified. Notification happens when the internal -//! state of a task transitions from *not ready* to *ready*. For example, a -//! socket might have received data and a call to `read` will now be able to -//! succeed. -//! -//! The specific strategy used to manage the tasks is left up to the -//! executor. There are two main flavors of executors: single-threaded and -//! multi-threaded. Tokio provides implementation for both of these in the -//! [`runtime`] module. 
-//! -//! # `Executor` trait. -//! -//! This module provides the [`Executor`] trait (re-exported from -//! [`tokio-executor`]), which describes the API that all executors must -//! implement. -//! -//! A free [`spawn`] function is provided that allows spawning futures onto the -//! default executor (tracked via a thread-local variable) without referencing a -//! handle. It is expected that all executors will set a value for the default -//! executor. This value will often be set to the executor itself, but it is -//! possible that the default executor might be set to a different executor. -//! -//! For example, a single threaded executor might set the default executor to a -//! thread pool instead of itself, allowing futures to spawn new tasks onto the -//! thread pool when those tasks are `Send`. -//! -//! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll -//! [notified]: https://docs.rs/futures/0.1/futures/executor/trait.Notify.html#tymethod.notify -//! [`runtime`]: ../runtime/index.html -//! [`tokio-executor`]: https://docs.rs/tokio-executor/0.1 -//! [`Executor`]: trait.Executor.html -//! [`spawn`]: fn.spawn.html#[cfg(all(test, loom))] - -// At the top due to macros -#[cfg(test)] -#[macro_use] -mod tests; - -#[cfg(feature = "rt-current-thread")] -mod enter; -#[cfg(feature = "rt-current-thread")] -pub(crate) use self::enter::enter; - -mod global; -pub use self::global::spawn; - -pub mod park; - -#[cfg(feature = "rt-current-thread")] -mod task; -#[cfg(feature = "rt-current-thread")] -pub use self::task::{JoinError, JoinHandle}; - -#[cfg(feature = "rt-full")] -mod util; - -#[cfg(all(not(feature = "blocking"), feature = "rt-full"))] -mod blocking; -#[cfg(feature = "blocking")] -pub mod blocking; - -#[cfg(feature = "rt-current-thread")] -pub(crate) mod current_thread; - -#[cfg(feature = "rt-full")] -pub(crate) mod thread_pool; diff --git a/tokio/src/executor/park/mod.rs b/tokio/src/executor/park/mod.rs deleted file mode 100644 index 9d6d508f..00000000 --- a/tokio/src/executor/park/mod.rs +++ /dev/null @@ -1,142 +0,0 @@ -//! Abstraction over blocking and unblocking the current thread. -//! -//! Provides an abstraction over blocking the current thread. This is similar to -//! the park / unpark constructs provided by [`std`] but made generic. This -//! allows embedding custom functionality to perform when the thread is blocked. -//! -//! A blocked [`Park`][p] instance is unblocked by calling [`unpark`] on its -//! [`Unpark`][up] handle. -//! -//! The [`ParkThread`] struct implements [`Park`][p] using -//! [`thread::park`][`std`] to put the thread to sleep. The Tokio reactor also -//! implements park, but uses [`mio::Poll`][mio] to block the thread instead. -//! -//! The [`Park`][p] trait is composable. A timer implementation might decorate a -//! [`Park`][p] implementation by checking if any timeouts have elapsed after -//! the inner [`Park`][p] implementation unblocks. -//! -//! # Model -//! -//! Conceptually, each [`Park`][p] instance has an associated token, which is -//! initially not present: -//! -//! * The [`park`] method blocks the current thread unless or until the token -//! is available, at which point it atomically consumes the token. -//! * The [`unpark`] method atomically makes the token available if it wasn't -//! already. -//! -//! Some things to note: -//! -//! * If [`unpark`] is called before [`park`], the next call to [`park`] will -//! **not** block the thread. -//! 
* **Spurious** wakeups are permitted, i.e., the [`park`] method may unblock -//! even if [`unpark`] was not called. -//! * [`park_timeout`] does the same as [`park`] but allows specifying a maximum -//! time to block the thread for. -//! -//! [`std`]: https://doc.rust-lang.org/std/thread/fn.park.html -//! [`thread::park`]: https://doc.rust-lang.org/std/thread/fn.park.html -//! [`ParkThread`]: struct.ParkThread.html -//! [p]: trait.Park.html -//! [`park`]: trait.Park.html#tymethod.park -//! [`park_timeout`]: trait.Park.html#tymethod.park_timeout -//! [`unpark`]: trait.Unpark.html#tymethod.unpark -//! [up]: trait.Unpark.html -//! [mio]: https://docs.rs/mio/0.6/mio/struct.Poll.html - -#[cfg(feature = "rt-full")] -mod thread; -#[cfg(feature = "rt-full")] -pub(crate) use self::thread::ParkThread; - -use std::sync::Arc; -use std::time::Duration; - -/// Block the current thread. -/// -/// See [module documentation][mod] for more details. -/// -/// [mod]: ../index.html -pub trait Park { - /// Unpark handle type for the `Park` implementation. - type Unpark: Unpark; - - /// Error returned by `park` - type Error; - - /// Get a new `Unpark` handle associated with this `Park` instance. - fn unpark(&self) -> Self::Unpark; - - /// Block the current thread unless or until the token is available. - /// - /// A call to `park` does not guarantee that the thread will remain blocked - /// forever, and callers should be prepared for this possibility. This - /// function may wakeup spuriously for any reason. - /// - /// See [module documentation][mod] for more details. - /// - /// # Panics - /// - /// This function **should** not panic, but ultimately, panics are left as - /// an implementation detail. Refer to the documentation for the specific - /// `Park` implementation - /// - /// [mod]: ../index.html - fn park(&mut self) -> Result<(), Self::Error>; - - /// Park the current thread for at most `duration`. - /// - /// This function is the same as `park` but allows specifying a maximum time - /// to block the thread for. - /// - /// Same as `park`, there is no guarantee that the thread will remain - /// blocked for any amount of time. Spurious wakeups are permitted for any - /// reason. - /// - /// See [module documentation][mod] for more details. - /// - /// # Panics - /// - /// This function **should** not panic, but ultimately, panics are left as - /// an implementation detail. Refer to the documentation for the specific - /// `Park` implementation - /// - /// [mod]: ../index.html - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>; -} - -/// Unblock a thread blocked by the associated [`Park`] instance. -/// -/// See [module documentation][mod] for more details. -/// -/// [mod]: ../index.html -/// [`Park`]: trait.Park.html -pub trait Unpark: Sync + Send + 'static { - /// Unblock a thread that is blocked by the associated `Park` handle. - /// - /// Calling `unpark` atomically makes available the unpark token, if it is - /// not already available. - /// - /// See [module documentation][mod] for more details. - /// - /// # Panics - /// - /// This function **should** not panic, but ultimately, panics are left as - /// an implementation detail. 
Refer to the documentation for the specific - /// `Unpark` implementation - /// - /// [mod]: ../index.html - fn unpark(&self); -} - -impl Unpark for Box { - fn unpark(&self) { - (**self).unpark() - } -} - -impl Unpark for Arc { - fn unpark(&self) { - (**self).unpark() - } -} diff --git a/tokio/src/executor/park/thread.rs b/tokio/src/executor/park/thread.rs deleted file mode 100644 index 880dfb49..00000000 --- a/tokio/src/executor/park/thread.rs +++ /dev/null @@ -1,273 +0,0 @@ -use crate::executor::park::{Park, Unpark}; -use crate::loom::sync::atomic::AtomicUsize; -use crate::loom::sync::{Arc, Condvar, Mutex}; - -use std::marker::PhantomData; -use std::rc::Rc; -use std::sync::atomic::Ordering; -use std::time::Duration; - -/// Blocks the current thread using a condition variable. -/// -/// Implements the [`Park`] functionality by using a condition variable. An -/// atomic variable is also used to avoid using the condition variable if -/// possible. -/// -/// The condition variable is cached in a thread-local variable and is shared -/// across all `ParkThread` instances created on the same thread. This also -/// means that an instance of `ParkThread` might be unblocked by a handle -/// associated with a different `ParkThread` instance. -#[derive(Debug)] -pub(crate) struct ParkThread { - _anchor: PhantomData>, -} - -/// Error returned by [`ParkThread`] -/// -/// This currently is never returned, but might at some point in the future. -/// -/// [`ParkThread`]: struct.ParkThread.html -#[derive(Debug)] -pub(crate) struct ParkError { - _p: (), -} - -struct Parker { - unparker: Arc, -} - -/// Unblocks a thread that was blocked by `ParkThread`. -#[derive(Clone, Debug)] -pub(crate) struct UnparkThread { - inner: Arc, -} - -#[derive(Debug)] -struct Inner { - state: AtomicUsize, - mutex: Mutex<()>, - condvar: Condvar, -} - -const IDLE: usize = 0; -const NOTIFY: usize = 1; -const SLEEP: usize = 2; - -thread_local! { - static CURRENT_PARKER: Parker = Parker::new(); -} - -// ==== impl Parker ==== - -impl Parker { - fn new() -> Self { - Self { - unparker: Arc::new(Inner { - state: AtomicUsize::new(IDLE), - mutex: Mutex::new(()), - condvar: Condvar::new(), - }), - } - } - - fn unparker(&self) -> &Arc { - &self.unparker - } - - fn park(&self) -> Result<(), ParkError> { - self.unparker.park(None) - } - - fn park_timeout(&self, timeout: Duration) -> Result<(), ParkError> { - self.unparker.park(Some(timeout)) - } -} - -// ==== impl Inner ==== - -impl Inner { - /// Park the current thread for at most `dur`. - fn park(&self, timeout: Option) -> Result<(), ParkError> { - // If currently notified, then we skip sleeping. This is checked outside - // of the lock to avoid acquiring a mutex if not necessary. - match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) { - NOTIFY => return Ok(()), - IDLE => {} - _ => unreachable!(), - } - - // The state is currently idle, so obtain the lock and then try to - // transition to a sleeping state. - let mut m = self.mutex.lock().unwrap(); - - // Transition to sleeping - match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) { - NOTIFY => { - // Notified before we could sleep, consume the notification and - // exit - self.state.store(IDLE, Ordering::SeqCst); - return Ok(()); - } - IDLE => {} - _ => unreachable!(), - } - - m = match timeout { - Some(timeout) => self.condvar.wait_timeout(m, timeout).unwrap().0, - None => self.condvar.wait(m).unwrap(), - }; - - // Transition back to idle. 
If the state has transitioned to `NOTIFY`, - // this will consume that notification - self.state.store(IDLE, Ordering::SeqCst); - - // Explicitly drop the mutex guard. There is no real point in doing it - // except that I find it helpful to make it explicit where we want the - // mutex to unlock. - drop(m); - - Ok(()) - } - - fn unpark(&self) { - // First, try transitioning from IDLE -> NOTIFY, this does not require a - // lock. - match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) { - IDLE | NOTIFY => return, - SLEEP => {} - _ => unreachable!(), - } - - // The other half is sleeping, this requires a lock - let _m = self.mutex.lock().unwrap(); - - // Transition to NOTIFY - match self.state.swap(NOTIFY, Ordering::SeqCst) { - SLEEP => {} - NOTIFY => return, - IDLE => return, - _ => unreachable!(), - } - - // Wakeup the sleeper - self.condvar.notify_one(); - } -} - -// ===== impl ParkThread ===== - -impl ParkThread { - /// Create a new `ParkThread` handle for the current thread. - /// - /// This type cannot be moved to other threads, so it should be created on - /// the thread that the caller intends to park. - pub(crate) fn new() -> ParkThread { - ParkThread { - _anchor: PhantomData, - } - } - - /// Get a reference to the `ParkThread` handle for this thread. - fn with_current(&self, f: F) -> R - where - F: FnOnce(&Parker) -> R, - { - CURRENT_PARKER.with(|inner| f(inner)) - } -} - -impl Park for ParkThread { - type Unpark = UnparkThread; - type Error = ParkError; - - fn unpark(&self) -> Self::Unpark { - let inner = self.with_current(|inner| inner.unparker().clone()); - UnparkThread { inner } - } - - fn park(&mut self) -> Result<(), Self::Error> { - self.with_current(|inner| inner.park())?; - Ok(()) - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.with_current(|inner| inner.park_timeout(duration))?; - Ok(()) - } -} - -impl Default for ParkThread { - fn default() -> Self { - Self::new() - } -} - -// ===== impl UnparkThread ===== - -impl Unpark for UnparkThread { - fn unpark(&self) { - self.inner.unpark(); - } -} - -#[cfg(feature = "rt-full")] -mod waker { - use super::{Inner, UnparkThread}; - use crate::loom::sync::Arc; - - use std::mem; - use std::task::{RawWaker, RawWakerVTable, Waker}; - - impl UnparkThread { - pub(crate) fn into_waker(self) -> Waker { - unsafe { - let raw = unparker_to_raw_waker(self.inner); - Waker::from_raw(raw) - } - } - } - - impl Inner { - #[allow(clippy::wrong_self_convention)] - fn into_raw(this: Arc) -> *const () { - Arc::into_raw(this) as *const () - } - - unsafe fn from_raw(ptr: *const ()) -> Arc { - Arc::from_raw(ptr as *const Inner) - } - } - - unsafe fn unparker_to_raw_waker(unparker: Arc) -> RawWaker { - RawWaker::new( - Inner::into_raw(unparker), - &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker), - ) - } - - unsafe fn clone(raw: *const ()) -> RawWaker { - let unparker = Inner::from_raw(raw); - - // Increment the ref count - mem::forget(unparker.clone()); - - unparker_to_raw_waker(unparker) - } - - unsafe fn drop_waker(raw: *const ()) { - let _ = Inner::from_raw(raw); - } - - unsafe fn wake(raw: *const ()) { - let unparker = Inner::from_raw(raw); - unparker.unpark(); - } - - unsafe fn wake_by_ref(raw: *const ()) { - let unparker = Inner::from_raw(raw); - unparker.unpark(); - - // We don't actually own a reference to the unparker - mem::forget(unparker); - } -} diff --git a/tokio/src/executor/task/core.rs b/tokio/src/executor/task/core.rs deleted file mode 100644 index 7a3016ee..00000000 --- 
a/tokio/src/executor/task/core.rs +++ /dev/null @@ -1,156 +0,0 @@ -use crate::executor::task::raw::{self, Vtable}; -use crate::executor::task::state::State; -use crate::executor::task::waker::waker_ref; -use crate::executor::task::Schedule; -use crate::loom::alloc::Track; -use crate::loom::cell::CausalCell; - -use std::cell::UnsafeCell; -use std::future::Future; -use std::mem::MaybeUninit; -use std::pin::Pin; -use std::ptr::{self, NonNull}; -use std::task::{Context, Poll, Waker}; - -/// The task cell. Contains the components of the task. -/// -/// It is critical for `Header` to be the first field as the task structure will -/// be referenced by both *mut Cell and *mut Header. -#[repr(C)] -pub(super) struct Cell { - /// Hot task state data - pub(super) header: Header, - - /// Either the future or output, depending on the execution stage. - pub(super) core: Core, - - /// Cold data - pub(super) trailer: Trailer, -} - -/// The core of the task. -/// -/// Holds the future or output, depending on the stage of execution. -pub(super) struct Core { - stage: Stage, -} - -/// Crate public as this is also needed by the pool. -#[repr(C)] -pub(crate) struct Header { - /// Task state - pub(super) state: State, - - /// Pointer to the executor owned by the task - pub(super) executor: CausalCell>>, - - /// Pointer to next task, used for misc task linked lists. - pub(crate) queue_next: UnsafeCell<*const Header>, - - /// Pointer to the next task in the ownership list. - pub(crate) owned_next: UnsafeCell>>, - - /// Pointer to the previous task in the ownership list. - pub(crate) owned_prev: UnsafeCell>>, - - /// Table of function pointers for executing actions on the task. - pub(super) vtable: &'static Vtable, - - /// Used by loom to track the causality of the future. Without loom, this is - /// unit. - pub(super) future_causality: CausalCell<()>, -} - -/// Cold data is stored after the future. -pub(super) struct Trailer { - /// Consumer task waiting on completion of this ta
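For code outside the crate, the visible effect of this reorganization is the import path; the `tokio-test/src/clock.rs` hunk near the top of the diff shows `Park` and `Unpark` now re-exported from `tokio::runtime`. Below is a minimal before/after sketch of that caller-side change; the `park_once` helper is hypothetical and not part of the patch, added only to show that the trait surface itself is unchanged.

// Before this commit:
//
//     use tokio::executor::park::{Park, Unpark};
//
// After this commit (path taken from the tokio-test/src/clock.rs hunk above):
use tokio::runtime::{Park, Unpark};

// Hypothetical helper: generic code over `Park` keeps compiling unchanged once
// the import is updated.
fn park_once<P>(mut park: P) -> Result<(), P::Error>
where
    P: Park,
    P::Unpark: Unpark, // redundant bound, spelled out only to exercise the re-exported name
{
    let _handle = park.unpark(); // obtain the `Unpark` handle before blocking
    park.park()
}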