summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-19 11:09:40 -0700
committerGitHub <noreply@github.com>2019-10-19 11:09:40 -0700
commited5a94eb2db95b7cc142045fbbd5d68c6276e04e (patch)
tree0c731fc9696760feb37fe841790d9eebdbeb50a8 /tokio
parent2a181320b708f7d03dfb00502b1d6cc783ce0927 (diff)
executor: rewrite the work-stealing thread pool (#1657)
This patch is a ground up rewrite of the existing work-stealing thread pool. The goal is to reduce overhead while simplifying code when possible. At a high level, the following architectural changes were made: - The local run queues were switched for bounded circle buffer queues. - Reduce cross-thread synchronization. - Refactor task constructs to use a single allocation and always include a join handle (#887). - Simplify logic around putting workers to sleep and waking them up. **Local run queues** Move away from crossbeam's implementation of the Chase-Lev deque. This implementation included unnecessary overhead as it supported capabilities that are not needed for the work-stealing thread pool. Instead, a fixed size circle buffer is used for the local queue. When the local queue is full, half of the tasks contained in it are moved to the global run queue. **Reduce cross-thread synchronization** This is done via many small improvements. Primarily, an upper bound is placed on the number of concurrent stealers. Limiting the number of stealers results in lower contention. Secondly, the rate at which workers are notified and woken up is throttled. This also reduces contention by preventing many threads from racing to steal work. **Refactor task structure** Now that Tokio is able to target a rust version that supports `std::alloc` as well as `std::task`, the pool is able to optimize how the task structure is laid out. Now, a single allocation per task is required and a join handle is always provided enabling the spawner to retrieve the result of the task (#887). **Simplifying logic** When possible, complexity is reduced in the implementation. This is done by using locks and other simpler constructs in cold paths. The set of sleeping workers is now represented as a `Mutex<VecDeque<usize>>`. Instead of optimizing access to this structure, we reduce the amount the pool must access this structure. Secondly, we have (temporarily) removed `threadpool::blocking`. This capability will come back later, but the original implementation was way more complicated than necessary. **Results** The thread pool benchmarks have improved significantly: Old thread pool: ``` test chained_spawn ... bench: 2,019,796 ns/iter (+/- 302,168) test ping_pong ... bench: 1,279,948 ns/iter (+/- 154,365) test spawn_many ... bench: 10,283,608 ns/iter (+/- 1,284,275) test yield_many ... bench: 21,450,748 ns/iter (+/- 1,201,337) ``` New thread pool: ``` test chained_spawn ... bench: 147,943 ns/iter (+/- 6,673) test ping_pong ... bench: 537,744 ns/iter (+/- 20,928) test spawn_many ... bench: 7,454,898 ns/iter (+/- 283,449) test yield_many ... bench: 16,771,113 ns/iter (+/- 733,424) ``` Real-world benchmarks improve significantly as well. This is testing the hyper hello world server using: `wrk -t1 -c50 -d10`: Old scheduler: ``` Running 10s test @ http://127.0.0.1:3000 1 threads and 50 connections Thread Stats Avg Stdev Max +/- Stdev Latency 371.53us 99.05us 1.97ms 60.53% Req/Sec 114.61k 8.45k 133.85k 67.00% 1139307 requests in 10.00s, 95.61MB read Requests/sec: 113923.19 Transfer/sec: 9.56MB ``` New scheduler: ``` Running 10s test @ http://127.0.0.1:3000 1 threads and 50 connections Thread Stats Avg Stdev Max +/- Stdev Latency 275.05us 69.81us 1.09ms 73.57% Req/Sec 153.17k 10.68k 171.51k 71.00% 1522671 requests in 10.00s, 127.79MB read Requests/sec: 152258.70 Transfer/sec: 12.78MB ```
Diffstat (limited to 'tokio')
-rw-r--r--tokio/Cargo.toml2
-rw-r--r--tokio/src/runtime/mod.rs7
-rw-r--r--tokio/src/runtime/threadpool/background.rs61
-rw-r--r--tokio/src/runtime/threadpool/builder.rs195
-rw-r--r--tokio/src/runtime/threadpool/mod.rs104
-rw-r--r--tokio/src/runtime/threadpool/spawner.rs (renamed from tokio/src/runtime/threadpool/task_executor.rs)45
-rw-r--r--tokio/tests/runtime_threaded.rs48
-rw-r--r--tokio/tests/timer.rs2
8 files changed, 124 insertions, 340 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index ce6b3b63..6ed8002c 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -53,7 +53,7 @@ rt-full = [
"sync",
"timer",
"tokio-executor/current-thread",
- "tokio-executor/threadpool",
+ "tokio-executor/thread-pool",
"tracing-core",
]
signal = ["tokio-net/signal"]
diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs
index aa183cbd..6e42a486 100644
--- a/tokio/src/runtime/mod.rs
+++ b/tokio/src/runtime/mod.rs
@@ -70,9 +70,6 @@
//! }
//! ```
//!
-//! In this function, the `run` function blocks until the runtime becomes idle.
-//! See [`shutdown_on_idle`][idle] for more shutdown details.
-//!
//! From within the context of the runtime, additional tasks are spawned using
//! the [`tokio::spawn`] function. Futures spawned using this function will be
//! executed on the same thread pool used by the [`Runtime`].
@@ -129,7 +126,6 @@
//! [`Reactor`]: ../reactor/struct.Reactor.html
//! [`ThreadPool`]: https://docs.rs/tokio-executor/0.2.0-alpha.2/tokio_executor/threadpool/struct.ThreadPool.html
//! [`run`]: fn.run.html
-//! [idle]: struct.Runtime.html#method.shutdown_on_idle
//! [`tokio::spawn`]: ../executor/fn.spawn.html
//! [`Timer`]: https://docs.rs/tokio-timer/0.2/tokio_timer/timer/struct.Timer.html
//! [`tokio::main`]: ../../tokio_macros/attr.main.html
@@ -141,8 +137,9 @@ mod threadpool;
#[cfg(feature = "rt-full")]
pub use self::threadpool::{
Builder,
+ JoinHandle,
Runtime,
- TaskExecutor,
+ Spawner,
};
// Internal export, don't use.
diff --git a/tokio/src/runtime/threadpool/background.rs b/tokio/src/runtime/threadpool/background.rs
deleted file mode 100644
index 3d884118..00000000
--- a/tokio/src/runtime/threadpool/background.rs
+++ /dev/null
@@ -1,61 +0,0 @@
-//! Temporary reactor + timer that runs on a background thread. This it to make
-//! `block_on` work.
-
-use tokio_executor::current_thread::CurrentThread;
-use tokio_net::driver::{self, Reactor};
-use tokio_sync::oneshot;
-use tokio_timer::clock::Clock;
-use tokio_timer::timer::{self, Timer};
-
-use std::{io, thread};
-
-#[derive(Debug)]
-pub(crate) struct Background {
- reactor_handle: driver::Handle,
- timer_handle: timer::Handle,
- shutdown_tx: Option<oneshot::Sender<()>>,
- thread: Option<thread::JoinHandle<()>>,
-}
-
-pub(crate) fn spawn(clock: &Clock) -> io::Result<Background> {
- let clock = clock.clone();
-
- let reactor = Reactor::new()?;
- let reactor_handle = reactor.handle();
-
- let timer = Timer::new_with_now(reactor, clock);
- let timer_handle = timer.handle();
-
- let (shutdown_tx, shutdown_rx) = oneshot::channel();
- let shutdown_tx = Some(shutdown_tx);
-
- let thread = thread::spawn(move || {
- let mut rt = CurrentThread::new_with_park(timer);
- let _ = rt.block_on(shutdown_rx);
- });
- let thread = Some(thread);
-
- Ok(Background {
- reactor_handle,
- timer_handle,
- shutdown_tx,
- thread,
- })
-}
-
-impl Background {
- pub(super) fn reactor(&self) -> &driver::Handle {
- &self.reactor_handle
- }
-
- pub(super) fn timer(&self) -> &timer::Handle {
- &self.timer_handle
- }
-}
-
-impl Drop for Background {
- fn drop(&mut self) {
- let _ = self.shutdown_tx.take().unwrap().send(());
- let _ = self.thread.take().unwrap().join();
- }
-}
diff --git a/tokio/src/runtime/threadpool/builder.rs b/tokio/src/runtime/threadpool/builder.rs
index 13173095..a45abdbc 100644
--- a/tokio/src/runtime/threadpool/builder.rs
+++ b/tokio/src/runtime/threadpool/builder.rs
@@ -1,16 +1,13 @@
-use super::{background, Inner, Runtime};
+use super::{Inner, Runtime};
-use tokio_executor::threadpool;
+use tokio_executor::thread_pool;
use tokio_net::driver::{self, Reactor};
use tokio_timer::clock::{self, Clock};
use tokio_timer::timer::{self, Timer};
-use num_cpus;
use tracing_core as trace;
-use std::io;
-use std::sync::Mutex;
-use std::time::Duration;
-use std::any::Any;
+use std::{fmt, io};
+use std::sync::{Arc, Mutex};
/// Builds Tokio Runtime with custom configuration values.
///
@@ -28,18 +25,14 @@ use std::any::Any;
/// # Examples
///
/// ```
-/// use std::time::Duration;
-///
/// use tokio::runtime::Builder;
/// use tokio_timer::clock::Clock;
///
/// fn main() {
/// // build Runtime
/// let runtime = Builder::new()
-/// .blocking_threads(4)
/// .clock(Clock::system())
-/// .core_threads(4)
-/// .keep_alive(Some(Duration::from_secs(60)))
+/// .num_threads(4)
/// .name_prefix("my-custom-name-")
/// .stack_size(3 * 1024 * 1024)
/// .build()
@@ -48,13 +41,18 @@ use std::any::Any;
/// // use runtime ...
/// }
/// ```
-#[derive(Debug)]
pub struct Builder {
/// Thread pool specific builder
- threadpool_builder: threadpool::Builder,
+ thread_pool_builder: thread_pool::Builder,
/// The number of worker threads
- core_threads: usize,
+ num_threads: usize,
+
+ /// To run after each worker thread starts
+ after_start: Option<Arc<dyn Fn() + Send + Sync>>,
+
+ /// To run before each worker thread stops
+ before_stop: Option<Arc<dyn Fn() + Send + Sync>>,
/// The clock to use
clock: Clock,
@@ -66,15 +64,18 @@ impl Builder {
///
/// Configuration methods can be chained on the return value.
pub fn new() -> Builder {
- let core_threads = num_cpus::get().max(1);
+ let num_threads = num_cpus::get().max(1);
- let mut threadpool_builder = threadpool::Builder::new();
- threadpool_builder.name_prefix("tokio-runtime-worker-");
- threadpool_builder.pool_size(core_threads);
+ let mut thread_pool_builder = thread_pool::Builder::new();
+ thread_pool_builder
+ .name_prefix("tokio-runtime-worker-")
+ .num_threads(num_threads);
Builder {
- threadpool_builder,
- core_threads,
+ thread_pool_builder,
+ num_threads,
+ after_start: None,
+ before_stop: None,
clock: Clock::new(),
}
}
@@ -85,35 +86,6 @@ impl Builder {
self
}
- /// Sets a callback to handle panics in futures.
- ///
- /// The callback is triggered when a panic during a future bubbles up to
- /// Tokio. By default Tokio catches these panics, and they will be ignored.
- /// The parameter passed to this callback is the same error value returned
- /// from `std::panic::catch_unwind()`. To abort the process on panics, use
- /// `std::panic::resume_unwind()` in this callback as shown below.
- ///
- /// # Examples
- ///
- /// ```
- /// # use tokio::runtime;
- ///
- /// # pub fn main() {
- /// let rt = runtime::Builder::new()
- /// .panic_handler(|err| std::panic::resume_unwind(err))
- /// .build()
- /// .unwrap();
- /// # }
- /// ```
- pub fn panic_handler<F>(&mut self, f: F) -> &mut Self
- where
- F: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
- {
- self.threadpool_builder.panic_handler(f);
- self
- }
-
-
/// Set the maximum number of worker threads for the `Runtime`'s thread pool.
///
/// This must be a number between 1 and 32,768 though it is advised to keep
@@ -128,71 +100,14 @@ impl Builder {
///
/// # pub fn main() {
/// let rt = runtime::Builder::new()
- /// .core_threads(4)
+ /// .num_threads(4)
/// .build()
/// .unwrap();
/// # }
/// ```
- pub fn core_threads(&mut self, val: usize) -> &mut Self {
- self.core_threads = val;
- self.threadpool_builder.pool_size(val);
- self
- }
-
- /// Set the maximum number of concurrent blocking sections in the `Runtime`'s
- /// thread pool.
- ///
- /// When the maximum concurrent `blocking` calls is reached, any further
- /// calls to `blocking` will return `NotReady` and the task is notified once
- /// previously in-flight calls to `blocking` return.
- ///
- /// This must be a number between 1 and 32,768 though it is advised to keep
- /// this value on the smaller side.
- ///
- /// The default value is 100.
- ///
- /// # Examples
- ///
- /// ```
- /// # use tokio::runtime;
- ///
- /// # pub fn main() {
- /// let rt = runtime::Builder::new()
- /// .blocking_threads(200)
- /// .build();
- /// # }
- /// ```
- pub fn blocking_threads(&mut self, val: usize) -> &mut Self {
- self.threadpool_builder.max_blocking(val);
- self
- }
-
- /// Set the worker thread keep alive duration for threads in the `Runtime`'s
- /// thread pool.
- ///
- /// If set, a worker thread will wait for up to the specified duration for
- /// work, at which point the thread will shutdown. When work becomes
- /// available, a new thread will eventually be spawned to replace the one
- /// that shut down.
- ///
- /// When the value is `None`, the thread will wait for work forever.
- ///
- /// The default value is `None`.
- ///
- /// # Examples
- ///
- /// ```
- /// # use tokio::runtime;
- /// use std::time::Duration;
- ///
- /// # pub fn main() {
- /// let rt = runtime::Builder::new()
- /// .keep_alive(Some(Duration::from_secs(30)))
- /// .build();
- /// # }
- /// ```
- pub fn keep_alive(&mut self, val: Option<Duration>) -> &mut Self {
- self.threadpool_builder.keep_alive(val);
+ pub fn num_threads(&mut self, val: usize) -> &mut Self {
+ self.num_threads = val;
+ self.thread_pool_builder.num_threads(val);
self
}
@@ -216,7 +131,7 @@ impl Builder {
/// # }
/// ```
pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self {
- self.threadpool_builder.name_prefix(val);
+ self.thread_pool_builder.name_prefix(val);
self
}
@@ -240,7 +155,7 @@ impl Builder {
/// # }
/// ```
pub fn stack_size(&mut self, val: usize) -> &mut Self {
- self.threadpool_builder.stack_size(val);
+ self.thread_pool_builder.stack_size(val);
self
}
@@ -265,7 +180,7 @@ impl Builder {
pub fn after_start<F>(&mut self, f: F) -> &mut Self
where F: Fn() + Send + Sync + 'static
{
- self.threadpool_builder.after_start(f);
+ self.after_start = Some(Arc::new(f));
self
}
@@ -289,7 +204,7 @@ impl Builder {
pub fn before_stop<F>(&mut self, f: F) -> &mut Self
where F: Fn() + Send + Sync + 'static
{
- self.threadpool_builder.before_stop(f);
+ self.before_stop = Some(Arc::new(f));
self
}
@@ -308,14 +223,11 @@ impl Builder {
/// # }
/// ```
pub fn build(&mut self) -> io::Result<Runtime> {
- // TODO(stjepang): Once we remove the `threadpool_builder` method, remove this line too.
- self.threadpool_builder.pool_size(self.core_threads);
-
let mut reactor_handles = Vec::new();
let mut timer_handles = Vec::new();
let mut timers = Vec::new();
- for _ in 0..self.core_threads {
+ for _ in 0..self.num_threads {
// Create a new reactor.
let reactor = Reactor::new()?;
reactor_handles.push(reactor.handle());
@@ -336,36 +248,44 @@ impl Builder {
let dispatch = trace::dispatcher::get_default(trace::Dispatch::clone);
let trace = dispatch.clone();
- let background = background::spawn(&clock)?;
+ let around_reactor_handles = reactor_handles.clone();
+ let around_timer_handles = timer_handles.clone();
- let pool = self
- .threadpool_builder
- .around_worker(move |w| {
- let index = w.id().to_usize();
+ let after_start = self.after_start.clone();
+ let before_stop = self.before_stop.clone();
- let _reactor = driver::set_default(&reactor_handles[index]);
+ let pool = self
+ .thread_pool_builder
+ .around_worker(move |index, next| {
+ let _reactor = driver::set_default(&around_reactor_handles[index]);
clock::with_default(&clock, || {
- let _timer = timer::set_default(&timer_handles[index]);
+ let _timer = timer::set_default(&around_timer_handles[index]);
trace::dispatcher::with_default(&dispatch, || {
- w.run();
+ if let Some(after_start) = after_start.as_ref() {
+ after_start();
+ }
+
+ next();
+
+ if let Some(before_stop) = before_stop.as_ref() {
+ before_stop();
+ }
})
})
})
- .custom_park(move |worker_id| {
- let index = worker_id.to_usize();
-
+ .build_with_park(move |index| {
timers[index]
.lock()
.unwrap()
.take()
.unwrap()
- })
- .build();
+ });
Ok(Runtime {
inner: Some(Inner {
pool,
- background,
+ reactor_handles,
+ timer_handles,
trace,
}),
})
@@ -377,3 +297,12 @@ impl Default for Builder {
Self::new()
}
}
+
+impl fmt::Debug for Builder {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Builder")
+ .field("thread_pool_builder", &self.thread_pool_builder)
+ .field("after_start", &self.after_start.as_ref().map(|_| "..."))
+ .finish()
+ }
+}
diff --git a/tokio/src/runtime/threadpool/mod.rs b/tokio/src/runtime/threadpool/mod.rs
index 47009cc4..0717589c 100644
--- a/tokio/src/runtime/threadpool/mod.rs
+++ b/tokio/src/runtime/threadpool/mod.rs
@@ -1,15 +1,15 @@
-mod background;
mod builder;
-mod task_executor;
-
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::builder::Builder;
+
+mod spawner;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::spawner::Spawner;
+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::task_executor::TaskExecutor;
-use background::Background;
+pub use tokio_executor::thread_pool::JoinHandle;
-use tokio_executor::enter;
-use tokio_executor::threadpool::ThreadPool;
+use tokio_executor::thread_pool::ThreadPool;
use tokio_net::driver;
use tokio_timer::timer;
@@ -41,18 +41,14 @@ struct Inner {
/// Task execution pool.
pool: ThreadPool,
+ /// Reactor handles
+ reactor_handles: Vec<tokio_net::driver::Handle>,
+
+ /// Timer handles
+ timer_handles: Vec<timer::Handle>,
+
/// Tracing dispatcher
trace: trace::Dispatch,
-
- /// Maintains a reactor and timer that are always running on a background
- /// thread. This is to support `runtime.block_on` w/o requiring the future
- /// to be `Send`.
- ///
- /// A dedicated background thread is required as the threadpool threads
- /// might not be running. However, this is a temporary work around.
- ///
- /// TODO: Delete this
- background: Background,
}
// ===== impl Runtime =====
@@ -80,9 +76,6 @@ impl Runtime {
/// .unwrap();
///
/// // Use the runtime...
- ///
- /// // Shutdown the runtime
- /// rt.shutdown_now();
/// ```
///
/// [mod]: index.html
@@ -90,27 +83,6 @@ impl Runtime {
Builder::new().build()
}
- /// Return a handle to the runtime's executor.
- ///
- /// The returned handle can be used to spawn tasks that run on this runtime.
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::runtime::Runtime;
- ///
- /// let rt = Runtime::new()
- /// .unwrap();
- ///
- /// let executor_handle = rt.executor();
- ///
- /// // use `executor_handle`
- /// ```
- pub fn executor(&self) -> TaskExecutor {
- let inner = self.inner().pool.sender().clone();
- TaskExecutor { inner }
- }
-
/// Spawn a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
@@ -134,8 +106,6 @@ impl Runtime {
/// rt.spawn(async {
/// println!("now running on a worker thread");
/// });
- ///
- /// rt.shutdown_on_idle();
/// }
/// ```
///
@@ -166,32 +136,19 @@ impl Runtime {
where
F: Future,
{
- let mut entered = enter().expect("nested block_on");
-
- let bg = &self.inner().background;
let trace = &self.inner().trace;
- tokio_executor::with_default(&mut self.inner().pool.sender(), || {
- let _reactor = driver::set_default(bg.reactor());
- let _timer = timer::set_default(bg.timer());
- trace::dispatcher::with_default(trace, || {
- entered.block_on(future)
- })
+ let _reactor = driver::set_default(&self.inner().reactor_handles[0]);
+ let _timer = timer::set_default(&self.inner().timer_handles[0]);
+
+ trace::dispatcher::with_default(trace, || {
+ self.inner().pool.block_on(future)
})
}
- /// Signals the runtime to shutdown once it becomes idle.
- ///
- /// Blocks the current thread until the shutdown operation has completed.
- /// This function can be used to perform a graceful shutdown of the runtime.
+ /// Return a handle to the runtime's spawner.
///
- /// The runtime enters an idle state once **all** of the following occur.
- ///
- /// * The thread pool has no tasks to execute, i.e., all tasks that were
- /// spawned have completed.
- /// * The reactor is not managing any I/O resources.
- ///
- /// See [module level][mod] documentation for more details.
+ /// The returned handle can be used to spawn tasks that run on this runtime.
///
/// # Examples
///
@@ -201,18 +158,13 @@ impl Runtime {
/// let rt = Runtime::new()
/// .unwrap();
///
- /// // Use the runtime...
+ /// let spawner = rt.spawner();
///
- /// // Shutdown the runtime
- /// rt.shutdown_on_idle();
+ /// spawner.spawn(async { println!("hello"); });
/// ```
- ///
- /// [mod]: index.html
- pub fn shutdown_on_idle(mut self) {
- let mut e = tokio_executor::enter().unwrap();
-
- let inner = self.inner.take().unwrap();
- e.block_on(inner.pool.shutdown_on_idle());
+ pub fn spawner(&self) -> Spawner {
+ let inner = self.inner().pool.spawner().clone();
+ Spawner::new(inner)
}
/// Signals the runtime to shutdown immediately.
@@ -248,11 +200,9 @@ impl Runtime {
/// ```
///
/// [mod]: index.html
+ #[allow(warnings)]
pub fn shutdown_now(mut self) {
- let mut e = tokio_executor::enter().unwrap();
- let inner = self.inner.take().unwrap();
-
- e.block_on(inner.pool.shutdown_now());
+ self.inner.unwrap().pool.shutdown_now();
}
fn inner(&self) -> &Inner {
diff --git a/tokio/src/runtime/threadpool/task_executor.rs b/tokio/src/runtime/threadpool/spawner.rs
index 3107ac11..204bcc0e 100644
--- a/tokio/src/runtime/threadpool/task_executor.rs
+++ b/tokio/src/runtime/threadpool/spawner.rs
@@ -1,21 +1,25 @@
-use tokio_executor::SpawnError;
-use tokio_executor::threadpool::Sender;
+use crate::runtime::JoinHandle;
+
+use tokio_executor::thread_pool;
use std::future::Future;
-use std::pin::Pin;
-/// Executes futures on the runtime
+/// Spawns futures on the runtime
///
/// All futures spawned using this executor will be submitted to the associated
/// Runtime's executor. This executor is usually a thread pool.
///
/// For more details, see the [module level](index.html) documentation.
#[derive(Debug, Clone)]
-pub struct TaskExecutor {
- pub(super) inner: Sender,
+pub struct Spawner {
+ inner: thread_pool::Spawner,
}
-impl TaskExecutor {
+impl Spawner {
+ pub(super) fn new(inner: thread_pool::Spawner) -> Spawner {
+ Spawner { inner }
+ }
+
/// Spawn a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
@@ -34,10 +38,10 @@ impl TaskExecutor {
/// # fn dox() {
/// // Create the runtime
/// let rt = Runtime::new().unwrap();
- /// let executor = rt.executor();
+ /// let spawner = rt.spawner();
///
/// // Spawn a future onto the runtime
- /// executor.spawn(async {
+ /// spawner.spawn(async {
/// println!("now running on a worker thread");
/// });
/// # }
@@ -47,27 +51,10 @@ impl TaskExecutor {
///
/// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future.
- pub fn spawn<F>(&self, future: F)
- where F: Future<Output = ()> + Send + 'static,
+ pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future<Output = ()> + Send + 'static,
{
- self.inner.spawn(future).unwrap();
- }
-}
-
-impl tokio_executor::Executor for TaskExecutor {
- fn spawn(
- &mut self,
- future: Pin<Box<dyn Future<Output = ()> + Send>>,
- ) -> Result<(), SpawnError> {
self.inner.spawn(future)
}
}
-
-impl<T> tokio_executor::TypedExecutor<T> for TaskExecutor
-where
- T: Future<Output = ()> + Send + 'static,
-{
- fn spawn(&mut self, future: T) -> Result<(), crate::executor::SpawnError> {
- crate::executor::Executor::spawn(self, Box::pin(future))
- }
-}
diff --git a/tokio/tests/runtime_threaded.rs b/tokio/tests/runtime_threaded.rs
index 36f652dd..96f34b98 100644
--- a/tokio/tests/runtime_threaded.rs
+++ b/tokio/tests/runtime_threaded.rs
@@ -47,14 +47,13 @@ fn spawn_shutdown() {
rt.spawn(client_server(tx.clone()));
- // Use executor trait
- let f = Box::pin(client_server(tx));
- tokio_executor::Executor::spawn(&mut rt.executor(), f).unwrap();
+ // Use spawner
+ rt.spawner().spawn(client_server(tx));
- rt.shutdown_on_idle();
+ assert_ok!(rx.recv());
+ assert_ok!(rx.recv());
- assert_ok!(rx.try_recv());
- assert_ok!(rx.try_recv());
+ rt.shutdown_now();
assert_err!(rx.try_recv());
}
@@ -68,8 +67,6 @@ fn block_on_timer() {
});
assert_eq!(v, 42);
-
- rt.shutdown_on_idle();
}
#[test]
@@ -111,30 +108,35 @@ fn block_waits() {
});
assert_ok!(b_rx.try_recv());
-
- rt.shutdown_on_idle();
}
#[test]
fn spawn_many() {
const ITER: usize = 200;
- let cnt = Arc::new(Mutex::new(0));
let rt = Runtime::new().unwrap();
+ let cnt = Arc::new(Mutex::new(0));
+ let (tx, rx) = mpsc::channel();
+ let tx = Arc::new(Mutex::new(tx));
+
let c = cnt.clone();
rt.block_on(async move {
for _ in 0..ITER {
let c = c.clone();
+ let tx = tx.clone();
tokio::spawn(async move {
let mut x = c.lock().unwrap();
*x = 1 + *x;
+
+ if *x == ITER {
+ tx.lock().unwrap().send(()).unwrap();
+ }
});
}
});
- rt.shutdown_on_idle();
-
+ rx.recv().unwrap();
assert_eq!(ITER, *cnt.lock().unwrap());
}
@@ -146,30 +148,12 @@ fn nested_enter() {
rt.block_on(async {
assert_err!(tokio_executor::enter());
- // Since this is testing panics in other threads, printing about panics
- // is noisy and can give the impression that the test is ignoring panics.
- //
- // It *is* ignoring them, but on purpose.
- let prev_hook = panic::take_hook();
- panic::set_hook(Box::new(|info| {
- let s = info.to_string();
- if s.starts_with("panicked at 'nested ")
- || s.starts_with("panicked at 'Multiple executors at once")
- {
- // expected, noop
- } else {
- println!("{}", s);
- }
- }));
-
let res = panic::catch_unwind(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async {});
});
assert_err!(res);
-
- panic::set_hook(prev_hook);
});
}
@@ -198,7 +182,7 @@ fn after_start_and_before_stop_is_called() {
rt.block_on(client_server(tx));
- rt.shutdown_on_idle();
+ drop(rt);
assert_ok!(rx.try_recv());
diff --git a/tokio/tests/timer.rs b/tokio/tests/timer.rs
index 138e8a2f..ca716ccb 100644
--- a/tokio/tests/timer.rs
+++ b/tokio/tests/timer.rs
@@ -23,8 +23,6 @@ fn timer_with_threaded_runtime() {
tx.send(()).unwrap();
});
- rt.shutdown_on_idle();
-
rx.recv().unwrap();
}