author     Carl Lerche <me@carllerche.com>    2019-11-12 15:23:40 -0800
committer  GitHub <noreply@github.com>        2019-11-12 15:23:40 -0800
commit     27e5b41067d01c0c9fac230c5addb58034201a63 (patch)
tree       f9bd8333dfe1853dfe1d8710b4dc966bd8555d54 /tokio/src/runtime/thread_pool
parent     e3df2eafd32e6f813d08617f0e2cd7abbc05c2b1 (diff)
reorganize modules (#1766)
This patch started as an effort to make `time::Timer` private. However, in an effort to get the build compiling again, more and more changes were made. This probably should have been broken up, but here we are. I will attempt to summarize the changes here.

* Feature flags are reorganized to make them clearer. `net-driver` becomes `io-driver`. `rt-current-thread` becomes `rt-core`.
* The `Runtime` can be created without any executor. This replaces `enter`. It also allows creating standalone I/O and time drivers.
* `tokio::timer` is renamed to `tokio::time`. This brings it in line with `std`.
* `tokio::timer::Timer` is renamed to `Driver` and made private.
* The `clock` module is removed. Instead, an `Instant` type is provided. This type defaults to calling `std::time::Instant`. A `test-util` feature flag can be used to enable hooking into time.
* The `blocking` module is moved to the top level and is cleaned up.
* The `task` module is moved to the top level.
* The thread-pool's in-place blocking implementation is cleaned up (a standalone sketch of the underlying pattern follows this message).
* `runtime::Spawner` is renamed to `runtime::Handle` and can be used to "enter" a runtime context.
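The mechanism behind the in-place blocking cleanup is visible in the worker.rs hunk below: a worker installs a thread-local hook around its run loop, and `block_in_place` invokes that hook (so the worker can shed its queued work) before running the caller's blocking closure. The following is a minimal, standalone sketch of that pattern in safe Rust; the names and the `RefCell`/`Box` storage are simplifications for illustration, not Tokio's actual implementation.

use std::cell::RefCell;

thread_local! {
    // Hook installed by a "worker" thread before it runs tasks. When a task
    // calls `block_in_place`, the hook runs first so the worker can hand its
    // queued work off to another thread before the closure blocks.
    static ON_BLOCK: RefCell<Option<Box<dyn Fn()>>> = RefCell::new(None);
}

// Run `f`, first giving the current worker a chance to shed its work.
fn block_in_place<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    ON_BLOCK.with(|hook| {
        if let Some(allow_blocking) = hook.borrow().as_ref() {
            allow_blocking();
        }
    });
    f()
}

fn main() {
    // A worker thread would install the hook around its run loop.
    ON_BLOCK.with(|hook| {
        *hook.borrow_mut() =
            Some(Box::new(|| println!("worker: handing queued tasks to another thread")));
    });

    let answer = block_in_place(|| {
        // Blocking here is now acceptable; the worker has already shed its tasks.
        42
    });
    println!("result: {}", answer);
}

The real implementation avoids this allocation by storing a raw pointer to the hook in a `Cell` (`Option<*const dyn Fn()>`), which is why the worker.rs hunk below needs `unsafe` where this sketch does not.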
Diffstat (limited to 'tokio/src/runtime/thread_pool')
-rw-r--r--  tokio/src/runtime/thread_pool/current.rs                 6
-rw-r--r--  tokio/src/runtime/thread_pool/mod.rs                    69
-rw-r--r--  tokio/src/runtime/thread_pool/owned.rs                  11
-rw-r--r--  tokio/src/runtime/thread_pool/queue/global.rs            2
-rw-r--r--  tokio/src/runtime/thread_pool/queue/inject.rs            2
-rw-r--r--  tokio/src/runtime/thread_pool/queue/local.rs             2
-rw-r--r--  tokio/src/runtime/thread_pool/queue/worker.rs            2
-rw-r--r--  tokio/src/runtime/thread_pool/shared.rs                 34
-rw-r--r--  tokio/src/runtime/thread_pool/slice.rs (renamed from tokio/src/runtime/thread_pool/set.rs)  28
-rw-r--r--  tokio/src/runtime/thread_pool/spawner.rs                14
-rw-r--r--  tokio/src/runtime/thread_pool/tests/loom_pool.rs        45
-rw-r--r--  tokio/src/runtime/thread_pool/tests/loom_queue.rs        4
-rw-r--r--  tokio/src/runtime/thread_pool/tests/mod.rs               3
-rw-r--r--  tokio/src/runtime/thread_pool/tests/pool.rs             11
-rw-r--r--  tokio/src/runtime/thread_pool/tests/queue.rs             4
-rw-r--r--  tokio/src/runtime/thread_pool/tests/worker.rs           77
-rw-r--r--  tokio/src/runtime/thread_pool/worker.rs                691
17 files changed, 449 insertions, 556 deletions
diff --git a/tokio/src/runtime/thread_pool/current.rs b/tokio/src/runtime/thread_pool/current.rs
index 1a8113e3..1ab83c54 100644
--- a/tokio/src/runtime/thread_pool/current.rs
+++ b/tokio/src/runtime/thread_pool/current.rs
@@ -1,6 +1,6 @@
use crate::loom::sync::Arc;
use crate::runtime::park::Unpark;
-use crate::runtime::thread_pool::{worker, Owned};
+use crate::runtime::thread_pool::{slice, Owned};
use std::cell::Cell;
use std::ptr;
@@ -23,7 +23,7 @@ struct Inner {
// Pointer to the current worker info
thread_local!(static CURRENT_WORKER: Cell<Inner> = Cell::new(Inner::new()));
-pub(super) fn set<F, R, P>(pool: &Arc<worker::Set<P>>, index: usize, f: F) -> R
+pub(super) fn set<F, R, P>(pool: &Arc<slice::Set<P>>, index: usize, f: F) -> R
where
F: FnOnce() -> R,
P: Unpark,
@@ -65,7 +65,7 @@ where
}
impl Current {
- pub(super) fn as_member<'a, P>(&self, set: &'a worker::Set<P>) -> Option<&'a Owned<P>>
+ pub(super) fn as_member<'a, P>(&self, set: &'a slice::Set<P>) -> Option<&'a Owned<P>>
where
P: Unpark,
{
diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs
index 314942d3..599ce548 100644
--- a/tokio/src/runtime/thread_pool/mod.rs
+++ b/tokio/src/runtime/thread_pool/mod.rs
@@ -13,7 +13,7 @@ mod queue;
mod spawner;
pub(crate) use self::spawner::Spawner;
-mod set;
+mod slice;
mod shared;
use self::shared::Shared;
@@ -21,10 +21,8 @@ use self::shared::Shared;
mod shutdown;
mod worker;
-use self::worker::Worker;
-#[cfg(feature = "blocking")]
-pub(crate) use worker::blocking;
+pub(crate) use worker::block_in_place;
/// Unit tests
#[cfg(test)]
@@ -39,10 +37,10 @@ const LOCAL_QUEUE_CAPACITY: usize = 256;
#[cfg(loom)]
const LOCAL_QUEUE_CAPACITY: usize = 2;
+use crate::blocking;
use crate::loom::sync::Arc;
-use crate::runtime::blocking::{self, PoolWaiter};
-use crate::runtime::task::JoinHandle;
use crate::runtime::Park;
+use crate::task::JoinHandle;
use std::fmt;
use std::future::Future;
@@ -53,9 +51,6 @@ pub(crate) struct ThreadPool {
/// Shutdown waiter
shutdown_rx: shutdown::Receiver,
-
- /// Shutdown valve for Pool
- blocking: PoolWaiter,
}
// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized
@@ -66,7 +61,7 @@ type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>;
impl ThreadPool {
pub(crate) fn new<F, P>(
pool_size: usize,
- blocking_pool: Arc<blocking::Pool>,
+ blocking_pool: blocking::Spawner,
around_worker: Callback,
mut build_park: F,
) -> ThreadPool
@@ -76,65 +71,24 @@ impl ThreadPool {
{
let (shutdown_tx, shutdown_rx) = shutdown::channel();
- let launch_worker = Arc::new(Box::new(move |worker: Worker<BoxedPark<P>>| {
- // NOTE: It might seem like the shutdown_tx that's moved into this Arc is never
- // dropped, and that shutdown_rx will therefore never see EOF, but that is not actually
- // the case. Only `build_with_park` and each worker hold onto a copy of this Arc.
- // `build_with_park` drops it immediately, and the workers drop theirs when their `run`
- // method returns (and their copy of the Arc are dropped). In fact, we don't actually
- // _need_ a copy of `shutdown_tx` for each worker thread; having them all hold onto
- // this Arc, which in turn holds the last `shutdown_tx` would have been sufficient.
- let shutdown_tx = shutdown_tx.clone();
- let around_worker = around_worker.clone();
-
- Box::new(move || {
- struct AbortOnPanic;
-
- impl Drop for AbortOnPanic {
- fn drop(&mut self) {
- if std::thread::panicking() {
- eprintln!("[ERROR] unhandled panic in Tokio scheduler. This is a bug and should be reported.");
- std::process::abort();
- }
- }
- }
-
- let _abort_on_panic = AbortOnPanic;
-
- let idx = worker.id();
- let mut f = Some(move || worker.run());
- around_worker(idx, &mut || {
- (f.take()
- .expect("around_thread callback called closure twice"))(
- )
- });
-
- // Dropping the handle must happen __after__ the callback
- drop(shutdown_tx);
- }) as Box<dyn FnOnce() + Send + 'static>
- })
- as Box<dyn Fn(Worker<BoxedPark<P>>) -> Box<dyn FnOnce() + Send> + Send + Sync>);
-
let (pool, workers) = worker::create_set::<_, BoxedPark<P>>(
pool_size,
- |i| Box::new(BoxedPark::new(build_park(i))),
- Arc::clone(&launch_worker),
+ |i| BoxedPark::new(build_park(i)),
blocking_pool.clone(),
+ around_worker,
+ shutdown_tx,
);
// Spawn threads for each worker
for worker in workers {
- crate::runtime::blocking::Pool::spawn(&blocking_pool, launch_worker(worker))
+ blocking_pool.spawn_background(|| worker.run());
}
let spawner = Spawner::new(pool);
- let blocking = crate::runtime::blocking::PoolWaiter::from(blocking_pool);
- // ThreadPool::from_parts(spawner, shutdown_rx, blocking)
ThreadPool {
spawner,
shutdown_rx,
- blocking,
}
}
@@ -165,9 +119,7 @@ impl ThreadPool {
{
crate::runtime::global::with_thread_pool(self.spawner(), || {
let mut enter = crate::runtime::enter();
- crate::runtime::blocking::with_pool(self.spawner.blocking_pool(), || {
- enter.block_on(future)
- })
+ enter.block_on(future)
})
}
@@ -176,7 +128,6 @@ impl ThreadPool {
if self.spawner.workers().close() {
self.shutdown_rx.wait();
}
- self.blocking.shutdown();
}
}
diff --git a/tokio/src/runtime/thread_pool/owned.rs b/tokio/src/runtime/thread_pool/owned.rs
index 04a2f931..88284d5e 100644
--- a/tokio/src/runtime/thread_pool/owned.rs
+++ b/tokio/src/runtime/thread_pool/owned.rs
@@ -1,5 +1,6 @@
-use crate::runtime::task::{self, Task};
+use crate::loom::sync::atomic::AtomicUsize;
use crate::runtime::thread_pool::{queue, Shared};
+use crate::task::{self, Task};
use crate::util::FastRand;
use std::cell::Cell;
@@ -7,6 +8,13 @@ use std::cell::Cell;
/// Per-worker data accessible only by the thread driving the worker.
#[derive(Debug)]
pub(super) struct Owned<P: 'static> {
+ /// Worker generation. This guards concurrent access to the `Owned` struct.
+ /// When a worker starts running, it checks that the generation it has
+ /// assigned matches the current generation. When it does, the worker has
+ /// obtained unique access to the struct. When it fails, another thread has
+ /// gained unique access.
+ pub(super) generation: AtomicUsize,
+
/// Worker tick number. Used to schedule bookkeeping tasks every so often.
pub(super) tick: Cell<u16>,
@@ -40,6 +48,7 @@ where
{
pub(super) fn new(work_queue: queue::Worker<Shared<P>>, rand: FastRand) -> Owned<P> {
Owned {
+ generation: AtomicUsize::new(0),
tick: Cell::new(1),
is_running: Cell::new(true),
is_searching: Cell::new(false),
diff --git a/tokio/src/runtime/thread_pool/queue/global.rs b/tokio/src/runtime/thread_pool/queue/global.rs
index edac6ede..931b76a6 100644
--- a/tokio/src/runtime/thread_pool/queue/global.rs
+++ b/tokio/src/runtime/thread_pool/queue/global.rs
@@ -1,6 +1,6 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
-use crate::runtime::task::{Header, Task};
+use crate::task::{Header, Task};
use std::marker::PhantomData;
use std::ptr::{self, NonNull};
diff --git a/tokio/src/runtime/thread_pool/queue/inject.rs b/tokio/src/runtime/thread_pool/queue/inject.rs
index f0f92fb2..1a2d047c 100644
--- a/tokio/src/runtime/thread_pool/queue/inject.rs
+++ b/tokio/src/runtime/thread_pool/queue/inject.rs
@@ -1,6 +1,6 @@
use crate::loom::sync::Arc;
-use crate::runtime::task::Task;
use crate::runtime::thread_pool::queue::Cluster;
+use crate::task::Task;
pub(crate) struct Inject<T: 'static> {
cluster: Arc<Cluster<T>>,
diff --git a/tokio/src/runtime/thread_pool/queue/local.rs b/tokio/src/runtime/thread_pool/queue/local.rs
index 14f34832..78b26dac 100644
--- a/tokio/src/runtime/thread_pool/queue/local.rs
+++ b/tokio/src/runtime/thread_pool/queue/local.rs
@@ -1,8 +1,8 @@
use crate::loom::cell::{CausalCell, CausalCheck};
use crate::loom::sync::atomic::{self, AtomicU32};
-use crate::runtime::task::Task;
use crate::runtime::thread_pool::queue::global;
use crate::runtime::thread_pool::LOCAL_QUEUE_CAPACITY;
+use crate::task::Task;
use std::fmt;
use std::mem::MaybeUninit;
diff --git a/tokio/src/runtime/thread_pool/queue/worker.rs b/tokio/src/runtime/thread_pool/queue/worker.rs
index 67a2a1b8..f9415669 100644
--- a/tokio/src/runtime/thread_pool/queue/worker.rs
+++ b/tokio/src/runtime/thread_pool/queue/worker.rs
@@ -1,6 +1,6 @@
use crate::loom::sync::Arc;
-use crate::runtime::task::Task;
use crate::runtime::thread_pool::queue::{local, Cluster, Inject};
+use crate::task::Task;
use std::cell::Cell;
use std::fmt;
diff --git a/tokio/src/runtime/thread_pool/shared.rs b/tokio/src/runtime/thread_pool/shared.rs
index e0a1987e..99981151 100644
--- a/tokio/src/runtime/thread_pool/shared.rs
+++ b/tokio/src/runtime/thread_pool/shared.rs
@@ -1,6 +1,6 @@
use crate::runtime::park::Unpark;
-use crate::runtime::task::{self, Schedule, Task};
-use crate::runtime::thread_pool::worker;
+use crate::runtime::thread_pool::slice;
+use crate::task::{self, Schedule, Task};
use std::ptr;
@@ -24,13 +24,9 @@ where
/// Untracked pointer to the pool.
///
- /// The pool itself is tracked by an `Arc`, but this pointer is not included
- /// in the ref count.
- ///
- /// # Safety
- ///
- /// `Worker` instances are stored in the `Pool` and are never removed.
- set: *const worker::Set<P>,
+ /// The slice::Set itself is tracked by an `Arc`, but this pointer is not
+ /// included in the ref count.
+ slices: *const slice::Set<P>,
}
unsafe impl<P: Unpark> Send for Shared<P> {}
@@ -44,24 +40,24 @@ where
Shared {
unpark,
pending_drop: task::TransferStack::new(),
- set: ptr::null(),
+ slices: ptr::null(),
}
}
pub(crate) fn schedule(&self, task: Task<Self>) {
- self.set().schedule(task);
+ self.slices().schedule(task);
}
pub(super) fn unpark(&self) {
self.unpark.unpark();
}
- pub(super) fn set_container_ptr(&mut self, set: *const worker::Set<P>) {
- self.set = set;
+ fn slices(&self) -> &slice::Set<P> {
+ unsafe { &*self.slices }
}
- fn set(&self) -> &worker::Set<P> {
- unsafe { &*self.set }
+ pub(super) fn set_slices_ptr(&mut self, slices: *const slice::Set<P>) {
+ self.slices = slices;
}
}
@@ -73,8 +69,8 @@ where
// Get access to the Owned component. This function can only be called
// when on the worker.
unsafe {
- let index = self.set().index_of(self);
- let owned = &mut *self.set().owned()[index].get();
+ let index = self.slices().index_of(self);
+ let owned = &mut *self.slices().owned()[index].get();
owned.bind_task(task);
}
@@ -91,8 +87,8 @@ where
// Get access to the Owned component. This function can only be called
// when on the worker.
unsafe {
- let index = self.set().index_of(self);
- let owned = &mut *self.set().owned()[index].get();
+ let index = self.slices().index_of(self);
+ let owned = &mut *self.slices().owned()[index].get();
owned.release_task(task);
}
diff --git a/tokio/src/runtime/thread_pool/set.rs b/tokio/src/runtime/thread_pool/slice.rs
index 73555f82..1a0bd381 100644
--- a/tokio/src/runtime/thread_pool/set.rs
+++ b/tokio/src/runtime/thread_pool/slice.rs
@@ -1,12 +1,11 @@
-//! Putting a worker to sleep.
-//!
-//! - Attempt to spin.
+//! The scheduler is divided into multiple slices. Each slice is fairly
+//! isolated, having its own queue. A worker is dedicated to processing a single
+//! slice.
use crate::loom::rand::seed;
-use crate::loom::sync::Arc;
use crate::runtime::park::Unpark;
-use crate::runtime::task::{self, JoinHandle, Task};
use crate::runtime::thread_pool::{current, queue, Idle, Owned, Shared};
+use crate::task::{self, JoinHandle, Task};
use crate::util::{CachePadded, FastRand};
use std::cell::UnsafeCell;
@@ -27,9 +26,6 @@ where
/// Coordinates idle workers
idle: Idle,
-
- /// Pool where blocking tasks should be spawned.
- pub(crate) blocking: Arc<crate::runtime::blocking::Pool>,
}
unsafe impl<P: Unpark> Send for Set<P> {}
@@ -40,11 +36,7 @@ where
P: Unpark,
{
/// Create a new worker set using the provided queues.
- pub(crate) fn new<F>(
- num_workers: usize,
- mut mk_unpark: F,
- blocking: Arc<crate::runtime::blocking::Pool>,
- ) -> Self
+ pub(crate) fn new<F>(num_workers: usize, mut mk_unpark: F) -> Self
where
F: FnMut(usize) -> P,
{
@@ -69,7 +61,7 @@ where
owned: owned.into_boxed_slice(),
inject,
idle: Idle::new(num_workers),
- blocking,
+ // blocking,
}
}
@@ -112,10 +104,6 @@ where
self.schedule(task);
}
- pub(super) fn blocking_pool(&self) -> &Arc<crate::runtime::blocking::Pool> {
- &self.blocking
- }
-
pub(crate) fn schedule(&self, task: Task<Shared<P>>) {
current::get(|current_worker| match current_worker.as_member(self) {
Some(worker) => {
@@ -129,10 +117,10 @@ where
})
}
- pub(crate) fn set_container_ptr(&mut self) {
+ pub(crate) fn set_ptr(&mut self) {
let ptr = self as *const _;
for shared in &mut self.shared[..] {
- shared.set_container_ptr(ptr);
+ shared.set_slices_ptr(ptr);
}
}
diff --git a/tokio/src/runtime/thread_pool/spawner.rs b/tokio/src/runtime/thread_pool/spawner.rs
index dd80f4a6..b7031c43 100644
--- a/tokio/src/runtime/thread_pool/spawner.rs
+++ b/tokio/src/runtime/thread_pool/spawner.rs
@@ -1,7 +1,7 @@
use crate::loom::sync::Arc;
use crate::runtime::park::Unpark;
-use crate::runtime::task::JoinHandle;
-use crate::runtime::thread_pool::worker;
+use crate::runtime::thread_pool::slice;
+use crate::task::JoinHandle;
use std::fmt;
use std::future::Future;
@@ -20,11 +20,11 @@ use std::future::Future;
/// [`ThreadPool::spawner`]: struct.ThreadPool.html#method.spawner
#[derive(Clone)]
pub(crate) struct Spawner {
- workers: Arc<worker::Set<Box<dyn Unpark>>>,
+ workers: Arc<slice::Set<Box<dyn Unpark>>>,
}
impl Spawner {
- pub(super) fn new(workers: Arc<worker::Set<Box<dyn Unpark>>>) -> Spawner {
+ pub(super) fn new(workers: Arc<slice::Set<Box<dyn Unpark>>>) -> Spawner {
Spawner { workers }
}
@@ -45,12 +45,8 @@ impl Spawner {
self.workers.spawn_background(future);
}
- pub(super) fn blocking_pool(&self) -> &Arc<crate::runtime::blocking::Pool> {
- self.workers.blocking_pool()
- }
-
/// Reference to the worker set. Used by `ThreadPool` to initiate shutdown.
- pub(super) fn workers(&self) -> &worker::Set<Box<dyn Unpark>> {
+ pub(super) fn workers(&self) -> &slice::Set<Box<dyn Unpark>> {
&*self.workers
}
}
diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
index 5eb166ce..065d515e 100644
--- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs
+++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
@@ -1,5 +1,5 @@
use crate::runtime::tests::loom_oneshot as oneshot;
-use crate::runtime::thread_pool::{self, ThreadPool};
+use crate::runtime::thread_pool::ThreadPool;
use crate::runtime::{Park, Unpark};
use crate::spawn;
@@ -50,7 +50,7 @@ fn only_blocking() {
let (block_tx, block_rx) = oneshot::channel();
pool.spawn(async move {
- thread_pool::blocking(move || {
+ crate::blocking::in_place(move || {
block_tx.send(());
})
});
@@ -72,7 +72,7 @@ fn blocking_and_regular() {
let done_tx = Arc::new(Mutex::new(Some(done_tx)));
pool.spawn(async move {
- thread_pool::blocking(move || {
+ crate::blocking::in_place(move || {
block_tx.send(());
})
});
@@ -166,15 +166,21 @@ fn complete_block_on_under_load() {
});
}
-fn mk_pool(num_threads: usize) -> ThreadPool {
- use crate::runtime::blocking;
+fn mk_pool(num_threads: usize) -> Runtime {
+ use crate::blocking::BlockingPool;
- ThreadPool::new(
+ let blocking_pool = BlockingPool::new("test".into(), None);
+ let executor = ThreadPool::new(
num_threads,
- blocking::Pool::new("test".into(), None),
+ blocking_pool.spawner().clone(),
Arc::new(Box::new(|_, next| next())),
move |_| LoomPark::new(),
- )
+ );
+
+ Runtime {
+ executor,
+ blocking_pool,
+ }
}
use futures::future::poll_fn;
@@ -235,6 +241,29 @@ fn gated2(thread: bool) -> impl Future<Output = &'static str> {
})
}
+/// Fake runtime
+struct Runtime {
+ executor: ThreadPool,
+ #[allow(dead_code)]
+ blocking_pool: crate::blocking::BlockingPool,
+}
+
+use std::ops;
+
+impl ops::Deref for Runtime {
+ type Target = ThreadPool;
+
+ fn deref(&self) -> &ThreadPool {
+ &self.executor
+ }
+}
+
+impl ops::DerefMut for Runtime {
+ fn deref_mut(&mut self) -> &mut ThreadPool {
+ &mut self.executor
+ }
+}
+
struct LoomPark {
notify: Arc<Notify>,
}
diff --git a/tokio/src/runtime/thread_pool/tests/loom_queue.rs b/tokio/src/runtime/thread_pool/tests/loom_queue.rs
index b7c86f6c..d0598c3e 100644
--- a/tokio/src/runtime/thread_pool/tests/loom_queue.rs
+++ b/tokio/src/runtime/thread_pool/tests/loom_queue.rs
@@ -1,6 +1,6 @@
-use crate::runtime::task::{self, Task};
-use crate::runtime::tests::mock_schedule::{Noop, NOOP_SCHEDULE};
use crate::runtime::thread_pool::queue;
+use crate::task::{self, Task};
+use crate::tests::mock_schedule::{Noop, NOOP_SCHEDULE};
use loom::thread;
diff --git a/tokio/src/runtime/thread_pool/tests/mod.rs b/tokio/src/runtime/thread_pool/tests/mod.rs
index 24578e2f..dc1d3158 100644
--- a/tokio/src/runtime/thread_pool/tests/mod.rs
+++ b/tokio/src/runtime/thread_pool/tests/mod.rs
@@ -9,6 +9,3 @@ mod pool;
#[cfg(not(loom))]
mod queue;
-
-#[cfg(not(loom))]
-mod worker;
diff --git a/tokio/src/runtime/thread_pool/tests/pool.rs b/tokio/src/runtime/thread_pool/tests/pool.rs
index 6e753b35..c11281f0 100644
--- a/tokio/src/runtime/thread_pool/tests/pool.rs
+++ b/tokio/src/runtime/thread_pool/tests/pool.rs
@@ -1,7 +1,8 @@
#![warn(rust_2018_idioms)]
+use crate::blocking;
use crate::runtime::thread_pool::ThreadPool;
-use crate::runtime::{blocking, Park, Unpark};
+use crate::runtime::{Park, Unpark};
use futures_util::future::poll_fn;
use std::future::Future;
@@ -63,9 +64,11 @@ fn eagerly_drops_futures() {
let (park_tx, park_rx) = mpsc::sync_channel(0);
let (unpark_tx, unpark_rx) = mpsc::sync_channel(0);
+ let blocking_pool = blocking::BlockingPool::new("test".into(), None);
+
let pool = ThreadPool::new(
4,
- blocking::Pool::new("test".into(), None),
+ blocking_pool.spawner().clone(),
Arc::new(Box::new(|_, next| next())),
move |_| {
let (tx, rx) = mpsc::channel();
@@ -166,9 +169,11 @@ fn park_called_at_interval() {
let (done_tx, done_rx) = mpsc::channel();
+ let blocking_pool = blocking::BlockingPool::new("test".into(), None);
+
let pool = ThreadPool::new(
1,
- blocking::Pool::new("test".into(), None),
+ blocking_pool.spawner().clone(),
Arc::new(Box::new(|_, next| next())),
move |idx| {
assert_eq!(idx, 0);
diff --git a/tokio/src/runtime/thread_pool/tests/queue.rs b/tokio/src/runtime/thread_pool/tests/queue.rs
index 86f32ed2..7c0a65d5 100644
--- a/tokio/src/runtime/thread_pool/tests/queue.rs
+++ b/tokio/src/runtime/thread_pool/tests/queue.rs
@@ -1,6 +1,6 @@
-use crate::runtime::task::{self, Task};
-use crate::runtime::tests::mock_schedule::{Noop, NOOP_SCHEDULE};
use crate::runtime::thread_pool::{queue, LOCAL_QUEUE_CAPACITY};
+use crate::task::{self, Task};
+use crate::tests::mock_schedule::{Noop, NOOP_SCHEDULE};
macro_rules! assert_pop {
($q:expr, $expect:expr) => {
diff --git a/tokio/src/runtime/thread_pool/tests/worker.rs b/tokio/src/runtime/thread_pool/tests/worker.rs
deleted file mode 100644
index 91ec5804..00000000
--- a/tokio/src/runtime/thread_pool/tests/worker.rs
+++ /dev/null
@@ -1,77 +0,0 @@
-use crate::runtime::blocking;
-use crate::runtime::tests::track_drop::track_drop;
-use crate::runtime::thread_pool;
-
-use tokio_test::assert_ok;
-
-use std::sync::Arc;
-
-macro_rules! pool {
- (2) => {{
- let (pool, mut w, mock_park) = pool!(!2);
- (pool, w.remove(0), w.remove(0), mock_park)
- }};
- (! $n:expr) => {{
- let mut mock_park = crate::runtime::tests::mock_park::MockPark::new();
- let blocking = blocking::Pool::new("test".into(), None);
- let (pool, workers) = thread_pool::worker::create_set(
- $n,
- |index| Box::new(mock_park.mk_park(index)),
- Arc::new(Box::new(|_| {
- unreachable!("attempted to move worker during non-blocking test")
- })),
- blocking,
- );
- (pool, workers, mock_park)
- }};
-}
-
-macro_rules! enter {
- ($w:expr, $expr:expr) => {{
- $w.enter(move || $expr);
- }};
-}
-
-#[test]
-fn execute_single_task() {
- use std::sync::mpsc;
-
- let (p, mut w0, _w1, ..) = pool!(2);
- let (tx, rx) = mpsc::channel();
-
- enter!(w0, p.spawn_background(async move { tx.send(1).unwrap() }));
-
- w0.tick();
-
- assert_ok!(rx.try_recv());
-}
-
-#[test]
-fn task_migrates() {
- use crate::sync::oneshot;
- use std::sync::mpsc;
-
- let (p, mut w0, mut w1, ..) = pool!(2);
- let (tx1, rx1) = oneshot::channel();
- let (tx2, rx2) = mpsc::channel();
-
- let (task, did_drop) = track_drop(async move {
- let msg = rx1.await.unwrap();
- tx2.send(msg).unwrap();
- });
-
- enter!(w0, p.spawn_background(task));
-
- w0.tick();
- w1.enter(|| tx1.send("hello").unwrap());
-
- w1.tick();
- assert_ok!(rx2.try_recv());
-
- // Future drops immediately even though the underlying task is not freed
- assert!(did_drop.did_drop_future());
- assert!(did_drop.did_drop_output());
-
- // Tick the spawning worker in order to free memory
- w0.tick();
-}
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index 5abdba24..2de2101e 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -1,23 +1,21 @@
+use crate::blocking;
+use crate::loom::cell::CausalCell;
use crate::loom::sync::Arc;
use crate::runtime::park::{Park, Unpark};
-use crate::runtime::task::Task;
-use crate::runtime::thread_pool::{current, Owned, Shared, Spawner};
+use crate::runtime::thread_pool::{current, shutdown, slice, Callback, Owned, Shared, Spawner};
+use crate::task::Task;
use std::cell::Cell;
-use std::ops::{Deref, DerefMut};
+use std::marker::PhantomData;
+use std::sync::atomic::Ordering::Relaxed;
use std::time::Duration;
-// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized
-// loom doesn't support that because it requires CoerceUnsized, which is unstable
-type LaunchWorker<P> = Arc<Box<dyn Fn(Worker<P>) -> Box<dyn FnOnce() + Send> + Send + Sync>>;
-
thread_local! {
- /// Thread-local tracking the current executor
- static ON_BLOCK: Cell<Option<*mut dyn FnMut()>> = Cell::new(None)
+ /// Used to handle block_in_place
+ static ON_BLOCK: Cell<Option<*const dyn Fn()>> = Cell::new(None)
}
-#[cfg(feature = "blocking")]
-pub(crate) fn blocking<F, R>(f: F) -> R
+pub(crate) fn block_in_place<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
@@ -30,60 +28,100 @@ where
// This is safe, because ON_BLOCK was set from an &mut dyn FnMut in the worker that wraps
// the worker's operation, and is unset just prior to when the FnMut is dropped.
- let allow_blocking = unsafe { &mut *allow_blocking };
+ let allow_blocking = unsafe { &*allow_blocking };
allow_blocking();
f()
})
}
-// TODO: remove this re-export
-pub(super) use crate::runtime::thread_pool::set::Set;
-
pub(crate) struct Worker<P: Park + 'static> {
- /// Entry in the set of workers.
- entry: Entry<P::Unpark>,
+ /// Parks the thread. Requires the calling worker to have obtained unique
+ /// access via the generation synchronization action.
+ inner: Arc<Inner<P>>,
- /// Park the thread
- park: Box<P>,
+ /// Scheduler slices
+ slices: Arc<slice::Set<P::Unpark>>,
+
+ /// Slice assigned to this worker
+ index: usize,
- /// Fn for launching another Worker should we need it
- launch_worker: LaunchWorker<P>,
+ /// Handle to the blocking pool
+ blocking_pool: blocking::Spawner,
+
+ /// Run before calling worker logic
+ around_worker: Callback,
+
+ /// Worker generation. This is used to synchronize access to the internal
+ /// data.
+ generation: usize,
/// To indicate that the Worker has been given away and should no longer be used
gone: Cell<bool>,
}
+/// Internal worker state. This may be referenced from multiple threads, but the
+/// generation guard protects unsafe access
+struct Inner<P: Park + 'static> {
+ /// Used to park the thread
+ park: CausalCell<P>,
+
+ /// Only held so that the scheduler can be signaled on shutdown.
+ shutdown_tx: shutdown::Sender,
+}
+
+// TODO: clean up
+unsafe impl<P: Park + Send + 'static> Send for Worker<P> {}
+
+/// Used to ensure the invariants are respected
+struct GenerationGuard<'a, P: Park + 'static> {
+ /// Worker reference
+ worker: &'a Worker<P>,
+
+ /// Prevent `Sync` access
+ _p: PhantomData<Cell<()>>,
+}
+
+struct WorkerGone;
+
+// TODO: Move into slices
pub(super) fn create_set<F, P>(
pool_size: usize,
mk_park: F,
- launch_worker: LaunchWorker<P>,
- blocking: Arc<crate::runtime::blocking::Pool>,
-) -> (Arc<Set<P::Unpark>>, Vec<Worker<P>>)
+ blocking_pool: blocking::Spawner,
+ around_worker: Callback,
+ shutdown_tx: shutdown::Sender,
+) -> (Arc<slice::Set<P::Unpark>>, Vec<Worker<P>>)
where
P: Send + Park,
- F: FnMut(usize) -> Box<P>,
+ F: FnMut(usize) -> P,
{
// Create the parks...
let parks: Vec<_> = (0..pool_size).map(mk_park).collect();
- let mut pool = Arc::new(Set::new(pool_size, |i| parks[i].unpark(), blocking));
+ let mut slices = Arc::new(slice::Set::new(pool_size, |i| parks[i].unpark()));
// Establish the circular link between the individual worker state
// structure and the container.
- Arc::get_mut(&mut pool).unwrap().set_container_ptr();
+ Arc::get_mut(&mut slices).unwrap().set_ptr();
// This will contain each worker.
let workers = parks
.into_iter()
.enumerate()