summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-06 21:29:10 -0800
committerGitHub <noreply@github.com>2019-11-06 21:29:10 -0800
commit4dbe6af0a1a1c8e579b92ec8ffc1d419244e0944 (patch)
tree4ee02032f615464af50aa6858440c5cecccec4cf
parent9bec094150e869caae5105d7080f0ae54757b2d9 (diff)
runtime: misc pool cleanup (#1743)
- Remove builders for internal types - Avoid duplicating the blocking pool when using the concurrent scheduler. - misc smaller cleanup
-rw-r--r--tokio/src/runtime/blocking/builder.rs58
-rw-r--r--tokio/src/runtime/blocking/mod.rs65
-rw-r--r--tokio/src/runtime/builder.rs104
-rw-r--r--tokio/src/runtime/mod.rs13
-rw-r--r--tokio/src/runtime/thread_pool/builder.rs208
-rw-r--r--tokio/src/runtime/thread_pool/mod.rs190
-rw-r--r--tokio/src/runtime/thread_pool/pool.rs84
-rw-r--r--tokio/src/runtime/thread_pool/tests/loom_pool.rs26
-rw-r--r--tokio/src/runtime/thread_pool/tests/pool.rs38
-rw-r--r--tokio/src/runtime/thread_pool/tests/worker.rs3
10 files changed, 324 insertions, 465 deletions
diff --git a/tokio/src/runtime/blocking/builder.rs b/tokio/src/runtime/blocking/builder.rs
deleted file mode 100644
index 59178f25..00000000
--- a/tokio/src/runtime/blocking/builder.rs
+++ /dev/null
@@ -1,58 +0,0 @@
-use crate::loom::thread;
-use crate::runtime::blocking::Pool;
-
-use std::usize;
-
-/// Builds a blocking thread pool with custom configuration values.
-pub(crate) struct Builder {
- /// Thread name
- name: String,
-
- /// Thread stack size
- stack_size: Option<usize>,
-}
-
-impl Default for Builder {
- fn default() -> Self {
- Builder {
- name: "tokio-blocking-thread".to_string(),
- stack_size: None,
- }
- }
-}
-
-impl Builder {
- /// Set name of threads spawned by the pool
- ///
- /// If this configuration is not set, then the thread will use the system
- /// default naming scheme.
- pub(crate) fn name<S: Into<String>>(&mut self, val: S) -> &mut Self {
- self.name = val.into();
- self
- }
-
- /// Set the stack size (in bytes) for worker threads.
- ///
- /// The actual stack size may be greater than this value if the platform
- /// specifies minimal stack size.
- ///
- /// The default stack size for spawned threads is 2 MiB, though this
- /// particular stack size is subject to change in the future.
- pub(crate) fn stack_size(&mut self, val: usize) -> &mut Self {
- self.stack_size = Some(val);
- self
- }
-
- pub(crate) fn build(self) -> Pool {
- let mut p = Pool::default();
- let Builder { stack_size, name } = self;
- p.new_thread = Box::new(move || {
- let mut b = thread::Builder::new().name(name.clone());
- if let Some(stack_size) = stack_size {
- b = b.stack_size(stack_size);
- }
- b
- });
- p
- }
-}
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs
index e634ea59..941babaa 100644
--- a/tokio/src/runtime/blocking/mod.rs
+++ b/tokio/src/runtime/blocking/mod.rs
@@ -17,12 +17,6 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
-#[cfg(feature = "rt-full")]
-mod builder;
-
-#[cfg(feature = "rt-full")]
-pub(crate) use builder::Builder;
-
#[derive(Clone, Copy)]
enum State {
Empty,
@@ -69,11 +63,22 @@ where
}
pub(crate) struct Pool {
+ /// State shared between worker threads
shared: Mutex<Shared>,
+
+ /// Pool threads wait on this.
condvar: Condvar,
- new_thread: Box<dyn Fn() -> thread::Builder + Send + Sync + 'static>,
+
+ /// Spawned threads use this name
+ thread_name: String,
+
+ /// Spawned thread stack size
+ stack_size: Option<usize>,
}
+#[derive(Debug)]
+pub(crate) struct PoolWaiter(Arc<Pool>);
+
impl fmt::Debug for Pool {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Pool").finish()
@@ -99,6 +104,21 @@ pub struct Blocking<T> {
}
impl Pool {
+ pub(crate) fn new(thread_name: String, stack_size: Option<usize>) -> Arc<Pool> {
+ Arc::new(Pool {
+ shared: Mutex::new(Shared {
+ queue: VecDeque::new(),
+ num_th: 0,
+ num_idle: 0,
+ num_notify: 0,
+ shutdown: false,
+ }),
+ condvar: Condvar::new(),
+ thread_name,
+ stack_size,
+ })
+ }
+
/// Run the provided function on an executor dedicated to blocking operations.
pub(crate) fn spawn(this: &Arc<Self>, f: Box<dyn FnOnce() + Send + 'static>) {
let should_spawn = {
@@ -135,12 +155,18 @@ impl Pool {
};
if should_spawn {
- Pool::spawn_thread(Arc::clone(this), (this.new_thread)());
+ Pool::spawn_thread(Arc::clone(this));
}
}
// NOTE: we cannot use self here w/o arbitrary_self_types since Arc is loom::Arc
- fn spawn_thread(this: Arc<Self>, builder: thread::Builder) {
+ fn spawn_thread(this: Arc<Self>) {
+ let mut builder = thread::Builder::new().name(this.thread_name.clone());
+
+ if let Some(stack_size) = this.stack_size {
+ builder = builder.stack_size(stack_size);
+ }
+
builder
.spawn(move || {
let mut shared = this.shared.lock().unwrap();
@@ -221,9 +247,6 @@ impl Pool {
}
}
-#[derive(Debug)]
-pub(crate) struct PoolWaiter(Arc<Pool>);
-
impl From<Pool> for PoolWaiter {
fn from(p: Pool) -> Self {
Self::from(Arc::new(p))
@@ -341,21 +364,3 @@ fn run_task(f: Box<dyn FnOnce() + Send>) {
let _ = catch_unwind(AssertUnwindSafe(|| f()));
}
-
-impl Default for Pool {
- fn default() -> Self {
- Pool {
- shared: Mutex::new(Shared {
- queue: VecDeque::new(),
- num_th: 0,
- num_idle: 0,
- num_notify: 0,
- shutdown: false,
- }),
- condvar: Condvar::new(),
- new_thread: Box::new(|| {
- thread::Builder::new().name("tokio-blocking-driver".to_string())
- }),
- }
- }
-}
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs
index 92fdfa1a..6081c10e 100644
--- a/tokio/src/runtime/builder.rs
+++ b/tokio/src/runtime/builder.rs
@@ -1,13 +1,9 @@
+use crate::loom::sync::Arc;
#[cfg(feature = "blocking")]
-use crate::runtime::blocking::{Pool, PoolWaiter};
-#[cfg(feature = "rt-current-thread")]
-use crate::runtime::current_thread::CurrentThread;
-#[cfg(feature = "rt-full")]
-use crate::runtime::thread_pool;
+use crate::runtime::blocking;
use crate::runtime::{io, timer, Runtime};
use std::fmt;
-use std::sync::Arc;
/// Builds Tokio Runtime with custom configuration values.
///
@@ -57,10 +53,10 @@ pub struct Builder {
thread_stack_size: Option<usize>,
/// Callback to run after each thread starts.
- after_start: Option<Arc<dyn Fn() + Send + Sync>>,
+ after_start: Option<Callback>,
/// To run before each worker thread stops
- before_stop: Option<Arc<dyn Fn() + Send + Sync>>,
+ before_stop: Option<Callback>,
/// The clock to use
clock: timer::Clock,
@@ -75,6 +71,12 @@ enum Kind {
ThreadPool,
}
+#[cfg(not(loom))]
+type Callback = Arc<dyn Fn() + Send + Sync>;
+
+#[cfg(loom)]
+type Callback = Arc<Box<dyn Fn() + Send + Sync>>;
+
impl Builder {
/// Returns a new runtime builder initialized with default configuration
/// values.
@@ -206,6 +208,7 @@ impl Builder {
/// .build();
/// # }
/// ```
+ #[cfg(not(loom))]
pub fn after_start<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
@@ -231,6 +234,7 @@ impl Builder {
/// .build();
/// # }
/// ```
+ #[cfg(not(loom))]
pub fn before_stop<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
@@ -285,13 +289,13 @@ impl Builder {
net_handles,
timer_handles,
#[cfg(feature = "blocking")]
- blocking_pool: PoolWaiter::from(Pool::default()),
+ blocking_pool: self.build_blocking_pool().into(),
})
}
#[cfg(feature = "rt-current-thread")]
fn build_current_thread(&mut self) -> io::Result<Runtime> {
- use crate::runtime::Kind;
+ use crate::runtime::{CurrentThread, Kind};
// Create network driver
let (net, handle) = io::create()?;
@@ -307,19 +311,19 @@ impl Builder {
let executor = CurrentThread::new(timer);
// Blocking pool
- let blocking_pool = PoolWaiter::from(Pool::default());
+ let blocking_pool = self.build_blocking_pool();
Ok(Runtime {
kind: Kind::CurrentThread(executor),
net_handles,
timer_handles,
- blocking_pool,
+ blocking_pool: blocking_pool.into(),
})
}
#[cfg(feature = "rt-full")]
fn build_threadpool(&mut self) -> io::Result<Runtime> {
- use crate::runtime::Kind;
+ use crate::runtime::{Kind, ThreadPool};
use crate::timer::clock;
use std::sync::Mutex;
@@ -341,8 +345,8 @@ impl Builder {
// Get a handle to the clock for the runtime.
let clock = self.clock.clone();
- // Blocking pool
- let blocking_pool = PoolWaiter::from(Pool::default());
+ // Create the blocking pool
+ let blocking_pool = self.build_blocking_pool();
let pool = {
let net_handles = net_handles.clone();
@@ -351,48 +355,52 @@ impl Builder {
let after_start = self.after_start.clone();
let before_stop = self.before_stop.clone();
- let mut builder = thread_pool::Builder::new();
- builder.num_threads(self.num_threads);
- builder.name(&self.thread_name);
-
- if let Some(stack_size) = self.thread_stack_size {
- builder.stack_size(stack_size);
- }
-
- builder
- .around_worker(move |index, next| {
- // Configure the network driver
- let _net = io::set_default(&net_handles[index]);
-
- // Configure the clock
- clock::with_default(&clock, || {
- // Configure the timer
- let _timer = timer::set_default(&timer_handles[index]);
-
- // Call the start callback
- if let Some(after_start) = after_start.as_ref() {
- after_start();
- }
-
- // Run the worker
- next();
-
- // Call the after call back
- if let Some(before_stop) = before_stop.as_ref() {
- before_stop();
- }
- })
+ let around_worker = Arc::new(Box::new(move |index, next: &mut dyn FnMut()| {
+ // Configure the network driver
+ let _net = io::set_default(&net_handles[index]);
+
+ // Configure the clock
+ clock::with_default(&clock, || {
+ // Configure the timer
+ let _timer = timer::set_default(&timer_handles[index]);
+
+ // Call the start callback
+ if let Some(after_start) = after_start.as_ref() {
+ after_start();
+ }
+
+ // Run the worker
+ next();
+
+ // Call the after call back
+ if let Some(before_stop) = before_stop.as_ref() {
+ before_stop();
+ }
})
- .build(move |index| timers[index].lock().unwrap().take().unwrap())
+ })
+ as Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>);
+
+ ThreadPool::new(
+ self.num_threads,
+ blocking_pool.clone(),
+ around_worker,
+ move |index| timers[index].lock().unwrap().take().unwrap(),
+ )
};
Ok(Runtime {
kind: Kind::ThreadPool(pool),
net_handles,
timer_handles,
- blocking_pool,
+ blocking_pool: blocking_pool.into(),
})
}
+
+ #[cfg(feature = "blocking")]
+ fn build_blocking_pool(&self) -> Arc<blocking::Pool> {
+ // Create the blocking pool
+ blocking::Pool::new(self.thread_name.clone(), self.thread_stack_size)
+ }
}
impl Default for Builder {
diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs
index 9dbc857a..c8ff71c8 100644
--- a/tokio/src/runtime/mod.rs
+++ b/tokio/src/runtime/mod.rs
@@ -137,17 +137,21 @@ mod tests;
mod blocking;
#[cfg(feature = "blocking")]
pub mod blocking;
+#[cfg(feature = "blocking")]
+use crate::runtime::blocking::PoolWaiter;
mod builder;
pub use self::builder::Builder;
#[cfg(feature = "rt-current-thread")]
mod current_thread;
+#[cfg(feature = "rt-current-thread")]
+use self::current_thread::CurrentThread;
#[cfg(feature = "blocking")]
mod enter;
#[cfg(feature = "blocking")]
-pub(crate) use self::enter::enter;
+use self::enter::enter;
mod global;
pub use self::global::spawn;
@@ -171,13 +175,8 @@ mod timer;
#[cfg(feature = "rt-full")]
pub(crate) mod thread_pool;
-
-#[cfg(feature = "blocking")]
-use crate::runtime::blocking::PoolWaiter;
-#[cfg(feature = "rt-current-thread")]
-use crate::runtime::current_thread::CurrentThread;
#[cfg(feature = "rt-full")]
-use crate::runtime::thread_pool::ThreadPool;
+use self::thread_pool::ThreadPool;
#[cfg(feature = "blocking")]
use std::future::Future;
diff --git a/tokio/src/runtime/thread_pool/builder.rs b/tokio/src/runtime/thread_pool/builder.rs
deleted file mode 100644
index 04b5fa23..00000000
--- a/tokio/src/runtime/thread_pool/builder.rs
+++ /dev/null
@@ -1,208 +0,0 @@
-use crate::loom::sync::Arc;
-use crate::loom::sys::num_cpus;
-use crate::runtime::park::Park;
-use crate::runtime::thread_pool::{shutdown, Spawner, ThreadPool, Worker};
-
-use std::{fmt, usize};
-
-/// Builds a thread pool with custom configuration values.
-pub(crate) struct Builder {
- /// Number of worker threads to spawn
- pool_size: usize,
-
- /// Thread name
- name: String,
-
- /// Thread stack size
- stack_size: Option<usize>,
-
- /// Around worker callback
- around_worker: Option<Callback>,
-}
-
-// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized
-// loom doesn't support that because it requires CoerceUnsized, which is unstable
-type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>;
-
-impl Builder {
- /// Returns a new thread pool builder initialized with default configuration
- /// values.
- pub(crate) fn new() -> Builder {
- Builder {
- pool_size: num_cpus(),
- name: "tokio-runtime-worker".to_string(),
- stack_size: None,
- around_worker: None,
- }
- }
-
- /// Set the number of threads running async tasks.
- ///
- /// This must be a number between 1 and 2,048 though it is advised to keep
- /// this value on the smaller side.
- ///
- /// The default value is the number of cores available to the system.
- pub(crate) fn num_threads(&mut self, value: usize) -> &mut Self {
- self.pool_size = value;
- self
- }
-
- /// Set name of threads spawned by the scheduler
- ///
- /// If this configuration is not set, then the thread will use the system
- /// default naming scheme.
- pub(crate) fn name<S: Into<String>>(&mut self, val: S) -> &mut Self {
- self.name = val.into();
- self
- }
-
- /// Set the stack size (in bytes) for worker threads.
- ///
- /// The actual stack size may be greater than this value if the platform
- /// specifies minimal stack size.
- ///
- /// The default stack size for spawned threads is 2 MiB, though this
- /// particular stack size is subject to change in the future.
- pub(crate) fn stack_size(&mut self, val: usize) -> &mut Self {
- self.stack_size = Some(val);
- self
- }
-
- /// Execute function `f` on each worker thread.
- ///
- /// This function is provided a function that executes the worker and is
- /// expected to call it, otherwise the worker thread will shutdown without
- /// doing any work.
- pub(crate) fn around_worker<F>(&mut self, f: F) -> &mut Self
- where
- F: Fn(usize, &mut dyn FnMut()) + Send + Sync + 'static,
- {
- self.around_worker = Some(Arc::new(Box::new(f)));
- self
- }
-
- /// Create the configured `ThreadPool` with a custom `park` instances.
- ///
- /// The provided closure `build_park` is called once per worker and returns
- /// a `Park` instance that is used by the worker to put itself to sleep.
- pub(crate) fn build<F, P>(&self, mut build_park: F) -> ThreadPool
- where
- F: FnMut(usize) -> P,
- P: Park + Send + 'static,
- {
- use crate::runtime::thread_pool::worker;
-
- let (shutdown_tx, shutdown_rx) = shutdown::channel();
-
- let around_worker = self.around_worker.as_ref().map(Arc::clone);
- let launch_worker = Arc::new(Box::new(move |worker: Worker<BoxedPark<P>>| {
- // NOTE: It might seem like the shutdown_tx that's moved into this Arc is never
- // dropped, and that shutdown_rx will therefore never see EOF, but that is not actually
- // the case. Only `build_with_park` and each worker hold onto a copy of this Arc.
- // `build_with_park` drops it immediately, and the workers drop theirs when their `run`
- // method returns (and their copy of the Arc are dropped). In fact, we don't actually
- // _need_ a copy of `shutdown_tx` for each worker thread; having them all hold onto
- // this Arc, which in turn holds the last `shutdown_tx` would have been sufficient.
- let shutdown_tx = shutdown_tx.clone();
- let around_worker = around_worker.as_ref().map(Arc::clone);
- Box::new(move || {
- struct AbortOnPanic;
-
- impl Drop for AbortOnPanic {
- fn drop(&mut self) {
- if std::thread::panicking() {
- eprintln!("[ERROR] unhandled panic in Tokio scheduler. This is a bug and should be reported.");
- std::process::abort();
- }
- }
- }
-
- let _abort_on_panic = AbortOnPanic;
- if let Some(cb) = around_worker.as_ref() {
- let idx = worker.id();
- let mut f = Some(move || worker.run());
- cb(idx, &mut || {
- (f.take()
- .expect("around_thread callback called closure twice"))(
- )
- })
- } else {
- worker.run()
- }
-
- // Dropping the handle must happen __after__ the callback
- drop(shutdown_tx);
- }) as Box<dyn FnOnce() + Send + 'static>
- })
- as Box<dyn Fn(Worker<BoxedPark<P>>) -> Box<dyn FnOnce() + Send> + Send + Sync>);
-
- let mut blocking = crate::runtime::blocking::Builder::default();
- blocking.name(self.name.clone());
- if let Some(ss) = self.stack_size {
- blocking.stack_size(ss);
- }
- let blocking = Arc::new(blocking.build());
-
- let (pool, workers) = worker::create_set::<_, BoxedPark<P>>(
- self.pool_size,
- |i| Box::new(BoxedPark::new(build_park(i))),
- Arc::clone(&launch_worker),
- blocking.clone(),
- );
-
- // Spawn threads for each worker
- for worker in workers {
- crate::runtime::blocking::Pool::spawn(&blocking, launch_worker(worker))
- }
-
- let spawner = Spawner::new(pool);
- let blocking = crate::runtime::blocking::PoolWaiter::from(blocking);
- ThreadPool::from_parts(spawner, shutdown_rx, blocking)
- }
-}
-
-impl Default for Builder {
- fn default() -> Builder {
- Builder::new()
- }
-}
-
-impl fmt::Debug for Builder {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Builder")
- .field("pool_size", &self.pool_size)
- .field("name", &self.name)
- .field("stack_size", &self.stack_size)
- .finish()
- }
-}
-
-pub(crate) struct BoxedPark<P> {
- inner: P,
-}
-
-impl<P> BoxedPark<P> {
- pub(crate) fn new(inner: P) -> Self {
- BoxedPark { inner }
- }
-}
-
-impl<P> Park for BoxedPark<P>
-where
- P: Park,
-{
- type Unpark = Box<dyn crate::runtime::park::Unpark>;
- type Error = P::Error;
-
- fn unpark(&self) -> Self::Unpark {
- Box::new(self.inner.unpark())
- }
-
- fn park(&mut self) -> Result<(), Self::Error> {
- self.inner.park()
- }
-
- fn park_timeout(&mut self, duration: std::time::Duration) -> Result<(), Self::Error> {
- self.inner.park_timeout(duration)
- }
-}
diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs
index 5dce2a10..314942d3 100644
--- a/tokio/src/runtime/thread_pool/mod.rs
+++ b/tokio/src/runtime/thread_pool/mod.rs
@@ -1,8 +1,5 @@
//! Threadpool
-mod builder;
-pub(crate) use self::builder::Builder;
-
mod current;
mod idle;
@@ -11,9 +8,6 @@ use self::idle::Idle;
mod owned;
use self::owned::Owned;
-mod pool;
-pub(crate) use self::pool::ThreadPool;
-
mod queue;
mod spawner;
@@ -44,3 +38,187 @@ const LOCAL_QUEUE_CAPACITY: usize = 256;
// time.
#[cfg(loom)]
const LOCAL_QUEUE_CAPACITY: usize = 2;
+
+use crate::loom::sync::Arc;
+use crate::runtime::blocking::{self, PoolWaiter};
+use crate::runtime::task::JoinHandle;
+use crate::runtime::Park;
+
+use std::fmt;
+use std::future::Future;
+
+/// Work-stealing based thread pool for executing futures.
+pub(crate) struct ThreadPool {
+ spawner: Spawner,
+
+ /// Shutdown waiter
+ shutdown_rx: shutdown::Receiver,
+
+ /// Shutdown valve for Pool
+ blocking: PoolWaiter,
+}
+
+// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized
+// loom doesn't support that because it requires CoerceUnsized, which is
+// unstable
+type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>;
+
+impl ThreadPool {
+ pub(crate) fn new<F, P>(
+ pool_size: usize,
+ blocking_pool: Arc<blocking::Pool>,
+ around_worker: Callback,
+ mut build_park: F,
+ ) -> ThreadPool
+ where
+ F: FnMut(usize) -> P,
+ P: Park + Send + 'static,
+ {
+ let (shutdown_tx, shutdown_rx) = shutdown::channel();
+
+ let launch_worker = Arc::new(Box::new(move |worker: Worker<BoxedPark<P>>| {
+ // NOTE: It might seem like the shutdown_tx that's moved into this Arc is never
+ // dropped, and that shutdown_rx will therefore never see EOF, but that is not actually
+ // the case. Only `build_with_park` and each worker hold onto a copy of this Arc.
+ // `build_with_park` drops it immediately, and the workers drop theirs when their `run`
+ // method returns (and their copy of the Arc are dropped). In fact, we don't actually
+ // _need_ a copy of `shutdown_tx` for each worker thread; having them all hold onto
+ // this Arc, which in turn holds the last `shutdown_tx` would have been sufficient.
+ let shutdown_tx = shutdown_tx.clone();
+ let around_worker = around_worker.clone();
+
+ Box::new(move || {
+ struct AbortOnPanic;
+
+ impl Drop for AbortOnPanic {
+ fn drop(&mut self) {
+ if std::thread::panicking() {
+ eprintln!("[ERROR] unhandled panic in Tokio scheduler. This is a bug and should be reported.");
+ std::process::abort();
+ }
+ }
+ }
+
+ let _abort_on_panic = AbortOnPanic;
+
+ let idx = worker.id();
+ let mut f = Some(move || worker.run());
+ around_worker(idx, &mut || {
+ (f.take()
+ .expect("around_thread callback called closure twice"))(
+ )
+ });
+
+ // Dropping the handle must happen __after__ the callback
+ drop(shutdown_tx);
+ }) as Box<dyn FnOnce() + Send + 'static>
+ })
+ as Box<dyn Fn(Worker<BoxedPark<P>>) -> Box<dyn FnOnce() + Send> + Send + Sync>);
+
+ let (pool, workers) = worker::create_set::<_, BoxedPark<P>>(
+ pool_size,
+ |i| Box::new(BoxedPark::new(build_park(i))),
+ Arc::clone(&launch_worker),
+ blocking_pool.clone(),
+ );
+
+ // Spawn threads for each worker
+ for worker in workers {
+ crate::runtime::blocking::Pool::spawn(&blocking_pool, launch_worker(worker))
+ }
+
+ let spawner = Spawner::new(pool);
+ let blocking = crate::runtime::blocking::PoolWaiter::from(blocking_pool);
+
+ // ThreadPool::from_parts(spawner, shutdown_rx, blocking)
+ ThreadPool {
+ spawner,
+ shutdown_rx,
+ blocking,
+ }
+ }
+
+ /// Returns reference to `Spawner`.
+ ///
+ /// The `Spawner` handle can be cloned and enables spawning tasks from other
+ /// threads.
+ pub(crate) fn spawner(&self) -> &Spawner {
+ &self.spawner
+ }
+
+ /// Spawn a task
+ pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ self.spawner.spawn(future)
+ }
+
+ /// Block the current thread waiting for the future to complete.
+ ///
+ /// The future will execute on the current thread, but all spawned tasks
+ /// will be executed on the thread pool.
+ pub(crate) fn block_on<F>(&self, future: F) -> F::Output
+ where
+ F: Future,
+ {
+ crate::runtime::global::with_thread_pool(self.spawner(), || {
+ let mut enter = crate::runtime::enter();
+ crate::runtime::blocking::with_pool(self.spawner.blocking_pool(), || {
+ enter.block_on(future)
+ })
+ })
+ }
+
+ /// Shutdown the thread pool.
+ pub(crate) fn shutdown_now(&mut self) {
+ if self.spawner.workers().close() {
+ self.shutdown_rx.wait();
+ }
+ self.blocking.shutdown();
+ }
+}
+
+impl fmt::Debug for ThreadPool {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("ThreadPool").finish()
+ }
+}
+
+impl Drop for ThreadPool {
+ fn drop(&mut self) {
+ self.shutdown_now();
+ }
+}
+
+// TODO: delete?
+pub(crate) struct BoxedPark<P> {
+ inner: P,
+}
+
+impl<P> BoxedPark<P> {
+ pub(crate) fn new(inner: P) -> Self {
+ BoxedPark { inner }
+ }
+}
+
+impl<P> Park for BoxedPark<P>
+where
+ P: Park,
+{
+ type Unpark = Box<dyn crate::runtime::park::Unpark>;
+ type Error = P::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ Box::new(self.inner.unpark())
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.inner.park()
+ }
+
+ fn park_timeout(&mut self, duration: std::time::Duration) -> Result<(), Self::Error> {
+ self.inner.park_timeout(duration)
+ }
+}
diff --git a/tokio/src/runtime/thread_pool/pool.rs b/tokio/src/runtime/thread_pool/pool.rs
deleted file mode 100644
index a6ef4346..00000000
--- a/tokio/src/runtime/thread_pool/pool.rs
+++ /dev/null
@@ -1,84 +0,0 @@
-use crate::runtime::blocking::PoolWaiter;
-use crate::runtime::task::JoinHandle;
-use crate::runtime::thread_pool::{shutdown, Spawner};
-
-use std::fmt;
-use std::future::Future;
-
-/// Work-stealing based thread pool for executing futures.
-pub(crate) struct ThreadPool {
- spawner: Spawner,
-
- /// Shutdown waiter
- shutdown_rx: shutdown::Receiver,
-
- /// Shutdown valve for Pool
- blocking: PoolWaiter,
-}
-
-impl ThreadPool {
- pub(super) fn from_parts(
- spawner: Spawner,
- shutdown_rx: shutdown::Receiver,
- blocking: PoolWaiter,
- ) -> ThreadPool {
- ThreadPool {
- spawner,
- shutdown_rx,
- blocking,
- }
- }
-
- /// Returns reference to `Spawner`.
- ///
- /// The `Spawner` handle can be cloned and enables spawning tasks from other
- /// threads.
- pub(crate) fn spawner(&self) -> &Spawner {
- &self.spawner
- }
-
- /// Spawn a task
- pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- self.spawner.spawn(future)
- }
-
- /// Block the current thread waiting for the future to complete.
- ///
- /// The future will execute on the current thread, but all spawned tasks
- /// will be executed on the thread pool.
- pub(crate) fn block_on<F>(&self, future: F) -> F::Output
- where
- F: Future,
- {
- crate::runtime::global::with_thread_pool(self.spawner(), || {
- let mut enter = crate::runtime::enter();
- crate::runtime::blocking::with_pool(self.spawner.blocking_pool(), || {
- enter.block_on(future)
- })
- })
- }
-
- /// Shutdown the thread pool.
- pub(crate) fn shutdown_now(&mut self) {
- if self.spawner.workers().close() {
- self.shutdown_rx.wait();
- }
- self.blocking.shutdown();
- }
-}
-
-impl fmt::Debug for ThreadPool {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("ThreadPool").finish()
- }
-}
-
-impl Drop for ThreadPool {
- fn drop(&mut self) {
- self.shutdown_now();
- }
-}
diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
index b7be932c..5eb166ce 100644
--- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs
+++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
@@ -1,5 +1,5 @@
use crate::runtime::tests::loom_oneshot as oneshot;
-use crate::runtime::thread_pool::{self, Builder};
+use crate::runtime::thread_pool::{self, ThreadPool};
use crate::runtime::{Park, Unpark};
use crate::spawn;
@@ -13,8 +13,7 @@ use std::time::Duration;
#[test]
fn pool_multi_spawn() {
loom::model(|| {
- let pool = Builder::new().build(|_| LoomPark::new());
-
+ let pool = mk_pool(2);
let c1 = Arc::new(AtomicUsize::new(0));
let (tx, rx) = oneshot::channel();
@@ -47,7 +46,7 @@ fn pool_multi_spawn() {
#[test]
fn only_blocking() {
loom::model(|| {
- let mut pool = Builder::new().num_threads(1).build(|_| LoomPark::new());
+ let mut pool = mk_pool(1);
let (block_tx, block_rx) = oneshot::channel();
pool.spawn(async move {
@@ -65,7 +64,7 @@ fn only_blocking() {
fn blocking_and_regular() {
const NUM: usize = 3;
loom::model(|| {
- let mut pool = Builder::new().num_threads(1).build(|_| LoomPark::new());
+ let mut pool = mk_pool(1);
let cnt = Arc::new(AtomicUsize::new(0));
let (block_tx, block_rx) = oneshot::channel();
@@ -99,7 +98,7 @@ fn blocking_and_regular() {
#[test]
fn pool_multi_notify() {
loom::model(|| {
- let pool = Builder::new().build(|_| LoomPark::new());
+ let pool = mk_pool(2);
let c1 = Arc::new(AtomicUsize::new(0));
@@ -135,7 +134,7 @@ fn pool_multi_notify() {
#[test]
fn pool_shutdown() {
loom::model(|| {
- let pool = Builder::new().build(|_| LoomPark::new());
+ let pool = mk_pool(2);
pool.spawn(async move {
gated2(true).await;
@@ -152,7 +151,7 @@ fn pool_shutdown() {
#[test]
fn complete_block_on_under_load() {
loom::model(|| {
- let pool = Builder::new().build(|_| LoomPark::new());
+ let pool = mk_pool(2);
pool.block_on(async {
// Spin hard
@@ -167,6 +166,17 @@ fn complete_block_on_under_load() {
});