diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-12 15:23:40 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-12 15:23:40 -0800 |
commit | 27e5b41067d01c0c9fac230c5addb58034201a63 (patch) | |
tree | f9bd8333dfe1853dfe1d8710b4dc966bd8555d54 /tokio/src/runtime/thread_pool | |
parent | e3df2eafd32e6f813d08617f0e2cd7abbc05c2b1 (diff) |
reorganize modules (#1766)
This patch started as an effort to make `time::Timer` private. However, in an
effort to get the build compiling again, more and more changes were made. This
probably should have been broken up, but here we are. I will attempt to
summarize the changes here.
* Feature flags are reorganized to make clearer. `net-driver` becomes
`io-driver`. `rt-current-thread` becomes `rt-core`.
* The `Runtime` can be created without any executor. This replaces `enter`. It
also allows creating I/O / time drivers that are standalone.
* `tokio::timer` is renamed to `tokio::time`. This brings it in line with `std`.
* `tokio::timer::Timer` is renamed to `Driver` and made private.
* The `clock` module is removed. Instead, an `Instant` type is provided. This
type defaults to calling `std::time::Instant`. A `test-util` feature flag can
be used to enable hooking into time.
* The `blocking` module is moved to the top level and is cleaned up.
* The `task` module is moved to the top level.
* The thread-pool's in-place blocking implementation is cleaned up.
* `runtime::Spawner` is renamed to `runtime::Handle` and can be used to "enter"
a runtime context.
Diffstat (limited to 'tokio/src/runtime/thread_pool')
-rw-r--r-- | tokio/src/runtime/thread_pool/current.rs | 6 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/mod.rs | 69 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/owned.rs | 11 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/queue/global.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/queue/inject.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/queue/local.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/queue/worker.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/shared.rs | 34 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/slice.rs (renamed from tokio/src/runtime/thread_pool/set.rs) | 28 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/spawner.rs | 14 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/loom_pool.rs | 45 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/loom_queue.rs | 4 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/mod.rs | 3 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/pool.rs | 11 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/queue.rs | 4 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/worker.rs | 77 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/worker.rs | 691 |
17 files changed, 449 insertions, 556 deletions
diff --git a/tokio/src/runtime/thread_pool/current.rs b/tokio/src/runtime/thread_pool/current.rs index 1a8113e3..1ab83c54 100644 --- a/tokio/src/runtime/thread_pool/current.rs +++ b/tokio/src/runtime/thread_pool/current.rs @@ -1,6 +1,6 @@ use crate::loom::sync::Arc; use crate::runtime::park::Unpark; -use crate::runtime::thread_pool::{worker, Owned}; +use crate::runtime::thread_pool::{slice, Owned}; use std::cell::Cell; use std::ptr; @@ -23,7 +23,7 @@ struct Inner { // Pointer to the current worker info thread_local!(static CURRENT_WORKER: Cell<Inner> = Cell::new(Inner::new())); -pub(super) fn set<F, R, P>(pool: &Arc<worker::Set<P>>, index: usize, f: F) -> R +pub(super) fn set<F, R, P>(pool: &Arc<slice::Set<P>>, index: usize, f: F) -> R where F: FnOnce() -> R, P: Unpark, @@ -65,7 +65,7 @@ where } impl Current { - pub(super) fn as_member<'a, P>(&self, set: &'a worker::Set<P>) -> Option<&'a Owned<P>> + pub(super) fn as_member<'a, P>(&self, set: &'a slice::Set<P>) -> Option<&'a Owned<P>> where P: Unpark, { diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 314942d3..599ce548 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -13,7 +13,7 @@ mod queue; mod spawner; pub(crate) use self::spawner::Spawner; -mod set; +mod slice; mod shared; use self::shared::Shared; @@ -21,10 +21,8 @@ use self::shared::Shared; mod shutdown; mod worker; -use self::worker::Worker; -#[cfg(feature = "blocking")] -pub(crate) use worker::blocking; +pub(crate) use worker::block_in_place; /// Unit tests #[cfg(test)] @@ -39,10 +37,10 @@ const LOCAL_QUEUE_CAPACITY: usize = 256; #[cfg(loom)] const LOCAL_QUEUE_CAPACITY: usize = 2; +use crate::blocking; use crate::loom::sync::Arc; -use crate::runtime::blocking::{self, PoolWaiter}; -use crate::runtime::task::JoinHandle; use crate::runtime::Park; +use crate::task::JoinHandle; use std::fmt; use std::future::Future; @@ -53,9 +51,6 @@ pub(crate) struct ThreadPool { /// Shutdown waiter shutdown_rx: shutdown::Receiver, - - /// Shutdown valve for Pool - blocking: PoolWaiter, } // The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized @@ -66,7 +61,7 @@ type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>; impl ThreadPool { pub(crate) fn new<F, P>( pool_size: usize, - blocking_pool: Arc<blocking::Pool>, + blocking_pool: blocking::Spawner, around_worker: Callback, mut build_park: F, ) -> ThreadPool @@ -76,65 +71,24 @@ impl ThreadPool { { let (shutdown_tx, shutdown_rx) = shutdown::channel(); - let launch_worker = Arc::new(Box::new(move |worker: Worker<BoxedPark<P>>| { - // NOTE: It might seem like the shutdown_tx that's moved into this Arc is never - // dropped, and that shutdown_rx will therefore never see EOF, but that is not actually - // the case. Only `build_with_park` and each worker hold onto a copy of this Arc. - // `build_with_park` drops it immediately, and the workers drop theirs when their `run` - // method returns (and their copy of the Arc are dropped). In fact, we don't actually - // _need_ a copy of `shutdown_tx` for each worker thread; having them all hold onto - // this Arc, which in turn holds the last `shutdown_tx` would have been sufficient. - let shutdown_tx = shutdown_tx.clone(); - let around_worker = around_worker.clone(); - - Box::new(move || { - struct AbortOnPanic; - - impl Drop for AbortOnPanic { - fn drop(&mut self) { - if std::thread::panicking() { - eprintln!("[ERROR] unhandled panic in Tokio scheduler. This is a bug and should be reported."); - std::process::abort(); - } - } - } - - let _abort_on_panic = AbortOnPanic; - - let idx = worker.id(); - let mut f = Some(move || worker.run()); - around_worker(idx, &mut || { - (f.take() - .expect("around_thread callback called closure twice"))( - ) - }); - - // Dropping the handle must happen __after__ the callback - drop(shutdown_tx); - }) as Box<dyn FnOnce() + Send + 'static> - }) - as Box<dyn Fn(Worker<BoxedPark<P>>) -> Box<dyn FnOnce() + Send> + Send + Sync>); - let (pool, workers) = worker::create_set::<_, BoxedPark<P>>( pool_size, - |i| Box::new(BoxedPark::new(build_park(i))), - Arc::clone(&launch_worker), + |i| BoxedPark::new(build_park(i)), blocking_pool.clone(), + around_worker, + shutdown_tx, ); // Spawn threads for each worker for worker in workers { - crate::runtime::blocking::Pool::spawn(&blocking_pool, launch_worker(worker)) + blocking_pool.spawn_background(|| worker.run()); } let spawner = Spawner::new(pool); - let blocking = crate::runtime::blocking::PoolWaiter::from(blocking_pool); - // ThreadPool::from_parts(spawner, shutdown_rx, blocking) ThreadPool { spawner, shutdown_rx, - blocking, } } @@ -165,9 +119,7 @@ impl ThreadPool { { crate::runtime::global::with_thread_pool(self.spawner(), || { let mut enter = crate::runtime::enter(); - crate::runtime::blocking::with_pool(self.spawner.blocking_pool(), || { - enter.block_on(future) - }) + enter.block_on(future) }) } @@ -176,7 +128,6 @@ impl ThreadPool { if self.spawner.workers().close() { self.shutdown_rx.wait(); } - self.blocking.shutdown(); } } diff --git a/tokio/src/runtime/thread_pool/owned.rs b/tokio/src/runtime/thread_pool/owned.rs index 04a2f931..88284d5e 100644 --- a/tokio/src/runtime/thread_pool/owned.rs +++ b/tokio/src/runtime/thread_pool/owned.rs @@ -1,5 +1,6 @@ -use crate::runtime::task::{self, Task}; +use crate::loom::sync::atomic::AtomicUsize; use crate::runtime::thread_pool::{queue, Shared}; +use crate::task::{self, Task}; use crate::util::FastRand; use std::cell::Cell; @@ -7,6 +8,13 @@ use std::cell::Cell; /// Per-worker data accessible only by the thread driving the worker. #[derive(Debug)] pub(super) struct Owned<P: 'static> { + /// Worker generation. This guards concurrent access to the `Owned` struct. + /// When a worker starts running, it checks that the generation it has + /// assigned matches the current generation. When it does, the worker has + /// obtained unique access to the struct. When it fails, another thread has + /// gained unique access. + pub(super) generation: AtomicUsize, + /// Worker tick number. Used to schedule bookkeeping tasks every so often. pub(super) tick: Cell<u16>, @@ -40,6 +48,7 @@ where { pub(super) fn new(work_queue: queue::Worker<Shared<P>>, rand: FastRand) -> Owned<P> { Owned { + generation: AtomicUsize::new(0), tick: Cell::new(1), is_running: Cell::new(true), is_searching: Cell::new(false), diff --git a/tokio/src/runtime/thread_pool/queue/global.rs b/tokio/src/runtime/thread_pool/queue/global.rs index edac6ede..931b76a6 100644 --- a/tokio/src/runtime/thread_pool/queue/global.rs +++ b/tokio/src/runtime/thread_pool/queue/global.rs @@ -1,6 +1,6 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; -use crate::runtime::task::{Header, Task}; +use crate::task::{Header, Task}; use std::marker::PhantomData; use std::ptr::{self, NonNull}; diff --git a/tokio/src/runtime/thread_pool/queue/inject.rs b/tokio/src/runtime/thread_pool/queue/inject.rs index f0f92fb2..1a2d047c 100644 --- a/tokio/src/runtime/thread_pool/queue/inject.rs +++ b/tokio/src/runtime/thread_pool/queue/inject.rs @@ -1,6 +1,6 @@ use crate::loom::sync::Arc; -use crate::runtime::task::Task; use crate::runtime::thread_pool::queue::Cluster; +use crate::task::Task; pub(crate) struct Inject<T: 'static> { cluster: Arc<Cluster<T>>, diff --git a/tokio/src/runtime/thread_pool/queue/local.rs b/tokio/src/runtime/thread_pool/queue/local.rs index 14f34832..78b26dac 100644 --- a/tokio/src/runtime/thread_pool/queue/local.rs +++ b/tokio/src/runtime/thread_pool/queue/local.rs @@ -1,8 +1,8 @@ use crate::loom::cell::{CausalCell, CausalCheck}; use crate::loom::sync::atomic::{self, AtomicU32}; -use crate::runtime::task::Task; use crate::runtime::thread_pool::queue::global; use crate::runtime::thread_pool::LOCAL_QUEUE_CAPACITY; +use crate::task::Task; use std::fmt; use std::mem::MaybeUninit; diff --git a/tokio/src/runtime/thread_pool/queue/worker.rs b/tokio/src/runtime/thread_pool/queue/worker.rs index 67a2a1b8..f9415669 100644 --- a/tokio/src/runtime/thread_pool/queue/worker.rs +++ b/tokio/src/runtime/thread_pool/queue/worker.rs @@ -1,6 +1,6 @@ use crate::loom::sync::Arc; -use crate::runtime::task::Task; use crate::runtime::thread_pool::queue::{local, Cluster, Inject}; +use crate::task::Task; use std::cell::Cell; use std::fmt; diff --git a/tokio/src/runtime/thread_pool/shared.rs b/tokio/src/runtime/thread_pool/shared.rs index e0a1987e..99981151 100644 --- a/tokio/src/runtime/thread_pool/shared.rs +++ b/tokio/src/runtime/thread_pool/shared.rs @@ -1,6 +1,6 @@ use crate::runtime::park::Unpark; -use crate::runtime::task::{self, Schedule, Task}; -use crate::runtime::thread_pool::worker; +use crate::runtime::thread_pool::slice; +use crate::task::{self, Schedule, Task}; use std::ptr; @@ -24,13 +24,9 @@ where /// Untracked pointer to the pool. /// - /// The pool itself is tracked by an `Arc`, but this pointer is not included - /// in the ref count. - /// - /// # Safety - /// - /// `Worker` instances are stored in the `Pool` and are never removed. - set: *const worker::Set<P>, + /// The slice::Set itself is tracked by an `Arc`, but this pointer is not + /// included in the ref count. + slices: *const slice::Set<P>, } unsafe impl<P: Unpark> Send for Shared<P> {} @@ -44,24 +40,24 @@ where Shared { unpark, pending_drop: task::TransferStack::new(), - set: ptr::null(), + slices: ptr::null(), } } pub(crate) fn schedule(&self, task: Task<Self>) { - self.set().schedule(task); + self.slices().schedule(task); } pub(super) fn unpark(&self) { self.unpark.unpark(); } - pub(super) fn set_container_ptr(&mut self, set: *const worker::Set<P>) { - self.set = set; + fn slices(&self) -> &slice::Set<P> { + unsafe { &*self.slices } } - fn set(&self) -> &worker::Set<P> { - unsafe { &*self.set } + pub(super) fn set_slices_ptr(&mut self, slices: *const slice::Set<P>) { + self.slices = slices; } } @@ -73,8 +69,8 @@ where // Get access to the Owned component. This function can only be called // when on the worker. unsafe { - let index = self.set().index_of(self); - let owned = &mut *self.set().owned()[index].get(); + let index = self.slices().index_of(self); + let owned = &mut *self.slices().owned()[index].get(); owned.bind_task(task); } @@ -91,8 +87,8 @@ where // Get access to the Owned component. This function can only be called // when on the worker. unsafe { - let index = self.set().index_of(self); - let owned = &mut *self.set().owned()[index].get(); + let index = self.slices().index_of(self); + let owned = &mut *self.slices().owned()[index].get(); owned.release_task(task); } diff --git a/tokio/src/runtime/thread_pool/set.rs b/tokio/src/runtime/thread_pool/slice.rs index 73555f82..1a0bd381 100644 --- a/tokio/src/runtime/thread_pool/set.rs +++ b/tokio/src/runtime/thread_pool/slice.rs @@ -1,12 +1,11 @@ -//! Putting a worker to sleep. -//! -//! - Attempt to spin. +//! The scheduler is divided into multiple slices. Each slice is fairly +//! isolated, having its own queue. A worker is dedicated to processing a single +//! slice. use crate::loom::rand::seed; -use crate::loom::sync::Arc; use crate::runtime::park::Unpark; -use crate::runtime::task::{self, JoinHandle, Task}; use crate::runtime::thread_pool::{current, queue, Idle, Owned, Shared}; +use crate::task::{self, JoinHandle, Task}; use crate::util::{CachePadded, FastRand}; use std::cell::UnsafeCell; @@ -27,9 +26,6 @@ where /// Coordinates idle workers idle: Idle, - - /// Pool where blocking tasks should be spawned. - pub(crate) blocking: Arc<crate::runtime::blocking::Pool>, } unsafe impl<P: Unpark> Send for Set<P> {} @@ -40,11 +36,7 @@ where P: Unpark, { /// Create a new worker set using the provided queues. - pub(crate) fn new<F>( - num_workers: usize, - mut mk_unpark: F, - blocking: Arc<crate::runtime::blocking::Pool>, - ) -> Self + pub(crate) fn new<F>(num_workers: usize, mut mk_unpark: F) -> Self where F: FnMut(usize) -> P, { @@ -69,7 +61,7 @@ where owned: owned.into_boxed_slice(), inject, idle: Idle::new(num_workers), - blocking, + // blocking, } } @@ -112,10 +104,6 @@ where self.schedule(task); } - pub(super) fn blocking_pool(&self) -> &Arc<crate::runtime::blocking::Pool> { - &self.blocking - } - pub(crate) fn schedule(&self, task: Task<Shared<P>>) { current::get(|current_worker| match current_worker.as_member(self) { Some(worker) => { @@ -129,10 +117,10 @@ where }) } - pub(crate) fn set_container_ptr(&mut self) { + pub(crate) fn set_ptr(&mut self) { let ptr = self as *const _; for shared in &mut self.shared[..] { - shared.set_container_ptr(ptr); + shared.set_slices_ptr(ptr); } } diff --git a/tokio/src/runtime/thread_pool/spawner.rs b/tokio/src/runtime/thread_pool/spawner.rs index dd80f4a6..b7031c43 100644 --- a/tokio/src/runtime/thread_pool/spawner.rs +++ b/tokio/src/runtime/thread_pool/spawner.rs @@ -1,7 +1,7 @@ use crate::loom::sync::Arc; use crate::runtime::park::Unpark; -use crate::runtime::task::JoinHandle; -use crate::runtime::thread_pool::worker; +use crate::runtime::thread_pool::slice; +use crate::task::JoinHandle; use std::fmt; use std::future::Future; @@ -20,11 +20,11 @@ use std::future::Future; /// [`ThreadPool::spawner`]: struct.ThreadPool.html#method.spawner #[derive(Clone)] pub(crate) struct Spawner { - workers: Arc<worker::Set<Box<dyn Unpark>>>, + workers: Arc<slice::Set<Box<dyn Unpark>>>, } impl Spawner { - pub(super) fn new(workers: Arc<worker::Set<Box<dyn Unpark>>>) -> Spawner { + pub(super) fn new(workers: Arc<slice::Set<Box<dyn Unpark>>>) -> Spawner { Spawner { workers } } @@ -45,12 +45,8 @@ impl Spawner { self.workers.spawn_background(future); } - pub(super) fn blocking_pool(&self) -> &Arc<crate::runtime::blocking::Pool> { - self.workers.blocking_pool() - } - /// Reference to the worker set. Used by `ThreadPool` to initiate shutdown. - pub(super) fn workers(&self) -> &worker::Set<Box<dyn Unpark>> { + pub(super) fn workers(&self) -> &slice::Set<Box<dyn Unpark>> { &*self.workers } } diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs index 5eb166ce..065d515e 100644 --- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs +++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs @@ -1,5 +1,5 @@ use crate::runtime::tests::loom_oneshot as oneshot; -use crate::runtime::thread_pool::{self, ThreadPool}; +use crate::runtime::thread_pool::ThreadPool; use crate::runtime::{Park, Unpark}; use crate::spawn; @@ -50,7 +50,7 @@ fn only_blocking() { let (block_tx, block_rx) = oneshot::channel(); pool.spawn(async move { - thread_pool::blocking(move || { + crate::blocking::in_place(move || { block_tx.send(()); }) }); @@ -72,7 +72,7 @@ fn blocking_and_regular() { let done_tx = Arc::new(Mutex::new(Some(done_tx))); pool.spawn(async move { - thread_pool::blocking(move || { + crate::blocking::in_place(move || { block_tx.send(()); }) }); @@ -166,15 +166,21 @@ fn complete_block_on_under_load() { }); } -fn mk_pool(num_threads: usize) -> ThreadPool { - use crate::runtime::blocking; +fn mk_pool(num_threads: usize) -> Runtime { + use crate::blocking::BlockingPool; - ThreadPool::new( + let blocking_pool = BlockingPool::new("test".into(), None); + let executor = ThreadPool::new( num_threads, - blocking::Pool::new("test".into(), None), + blocking_pool.spawner().clone(), Arc::new(Box::new(|_, next| next())), move |_| LoomPark::new(), - ) + ); + + Runtime { + executor, + blocking_pool, + } } use futures::future::poll_fn; @@ -235,6 +241,29 @@ fn gated2(thread: bool) -> impl Future<Output = &'static str> { }) } +/// Fake runtime +struct Runtime { + executor: ThreadPool, + #[allow(dead_code)] + blocking_pool: crate::blocking::BlockingPool, +} + +use std::ops; + +impl ops::Deref for Runtime { + type Target = ThreadPool; + + fn deref(&self) -> &ThreadPool { + &self.executor + } +} + +impl ops::DerefMut for Runtime { + fn deref_mut(&mut self) -> &mut ThreadPool { + &mut self.executor + } +} + struct LoomPark { notify: Arc<Notify>, } diff --git a/tokio/src/runtime/thread_pool/tests/loom_queue.rs b/tokio/src/runtime/thread_pool/tests/loom_queue.rs index b7c86f6c..d0598c3e 100644 --- a/tokio/src/runtime/thread_pool/tests/loom_queue.rs +++ b/tokio/src/runtime/thread_pool/tests/loom_queue.rs @@ -1,6 +1,6 @@ -use crate::runtime::task::{self, Task}; -use crate::runtime::tests::mock_schedule::{Noop, NOOP_SCHEDULE}; use crate::runtime::thread_pool::queue; +use crate::task::{self, Task}; +use crate::tests::mock_schedule::{Noop, NOOP_SCHEDULE}; use loom::thread; diff --git a/tokio/src/runtime/thread_pool/tests/mod.rs b/tokio/src/runtime/thread_pool/tests/mod.rs index 24578e2f..dc1d3158 100644 --- a/tokio/src/runtime/thread_pool/tests/mod.rs +++ b/tokio/src/runtime/thread_pool/tests/mod.rs @@ -9,6 +9,3 @@ mod pool; #[cfg(not(loom))] mod queue; - -#[cfg(not(loom))] -mod worker; diff --git a/tokio/src/runtime/thread_pool/tests/pool.rs b/tokio/src/runtime/thread_pool/tests/pool.rs index 6e753b35..c11281f0 100644 --- a/tokio/src/runtime/thread_pool/tests/pool.rs +++ b/tokio/src/runtime/thread_pool/tests/pool.rs @@ -1,7 +1,8 @@ #![warn(rust_2018_idioms)] +use crate::blocking; use crate::runtime::thread_pool::ThreadPool; -use crate::runtime::{blocking, Park, Unpark}; +use crate::runtime::{Park, Unpark}; use futures_util::future::poll_fn; use std::future::Future; @@ -63,9 +64,11 @@ fn eagerly_drops_futures() { let (park_tx, park_rx) = mpsc::sync_channel(0); let (unpark_tx, unpark_rx) = mpsc::sync_channel(0); + let blocking_pool = blocking::BlockingPool::new("test".into(), None); + let pool = ThreadPool::new( 4, - blocking::Pool::new("test".into(), None), + blocking_pool.spawner().clone(), Arc::new(Box::new(|_, next| next())), move |_| { let (tx, rx) = mpsc::channel(); @@ -166,9 +169,11 @@ fn park_called_at_interval() { let (done_tx, done_rx) = mpsc::channel(); + let blocking_pool = blocking::BlockingPool::new("test".into(), None); + let pool = ThreadPool::new( 1, - blocking::Pool::new("test".into(), None), + blocking_pool.spawner().clone(), Arc::new(Box::new(|_, next| next())), move |idx| { assert_eq!(idx, 0); diff --git a/tokio/src/runtime/thread_pool/tests/queue.rs b/tokio/src/runtime/thread_pool/tests/queue.rs index 86f32ed2..7c0a65d5 100644 --- a/tokio/src/runtime/thread_pool/tests/queue.rs +++ b/tokio/src/runtime/thread_pool/tests/queue.rs @@ -1,6 +1,6 @@ -use crate::runtime::task::{self, Task}; -use crate::runtime::tests::mock_schedule::{Noop, NOOP_SCHEDULE}; use crate::runtime::thread_pool::{queue, LOCAL_QUEUE_CAPACITY}; +use crate::task::{self, Task}; +use crate::tests::mock_schedule::{Noop, NOOP_SCHEDULE}; macro_rules! assert_pop { ($q:expr, $expect:expr) => { diff --git a/tokio/src/runtime/thread_pool/tests/worker.rs b/tokio/src/runtime/thread_pool/tests/worker.rs deleted file mode 100644 index 91ec5804..00000000 --- a/tokio/src/runtime/thread_pool/tests/worker.rs +++ /dev/null @@ -1,77 +0,0 @@ -use crate::runtime::blocking; -use crate::runtime::tests::track_drop::track_drop; -use crate::runtime::thread_pool; - -use tokio_test::assert_ok; - -use std::sync::Arc; - -macro_rules! pool { - (2) => {{ - let (pool, mut w, mock_park) = pool!(!2); - (pool, w.remove(0), w.remove(0), mock_park) - }}; - (! $n:expr) => {{ - let mut mock_park = crate::runtime::tests::mock_park::MockPark::new(); - let blocking = blocking::Pool::new("test".into(), None); - let (pool, workers) = thread_pool::worker::create_set( - $n, - |index| Box::new(mock_park.mk_park(index)), - Arc::new(Box::new(|_| { - unreachable!("attempted to move worker during non-blocking test") - })), - blocking, - ); - (pool, workers, mock_park) - }}; -} - -macro_rules! enter { - ($w:expr, $expr:expr) => {{ - $w.enter(move || $expr); - }}; -} - -#[test] -fn execute_single_task() { - use std::sync::mpsc; - - let (p, mut w0, _w1, ..) = pool!(2); - let (tx, rx) = mpsc::channel(); - - enter!(w0, p.spawn_background(async move { tx.send(1).unwrap() })); - - w0.tick(); - - assert_ok!(rx.try_recv()); -} - -#[test] -fn task_migrates() { - use crate::sync::oneshot; - use std::sync::mpsc; - - let (p, mut w0, mut w1, ..) = pool!(2); - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = mpsc::channel(); - - let (task, did_drop) = track_drop(async move { - let msg = rx1.await.unwrap(); - tx2.send(msg).unwrap(); - }); - - enter!(w0, p.spawn_background(task)); - - w0.tick(); - w1.enter(|| tx1.send("hello").unwrap()); - - w1.tick(); - assert_ok!(rx2.try_recv()); - - // Future drops immediately even though the underlying task is not freed - assert!(did_drop.did_drop_future()); - assert!(did_drop.did_drop_output()); - - // Tick the spawning worker in order to free memory - w0.tick(); -} diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 5abdba24..2de2101e 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -1,23 +1,21 @@ +use crate::blocking; +use crate::loom::cell::CausalCell; use crate::loom::sync::Arc; use crate::runtime::park::{Park, Unpark}; -use crate::runtime::task::Task; -use crate::runtime::thread_pool::{current, Owned, Shared, Spawner}; +use crate::runtime::thread_pool::{current, shutdown, slice, Callback, Owned, Shared, Spawner}; +use crate::task::Task; use std::cell::Cell; -use std::ops::{Deref, DerefMut}; +use std::marker::PhantomData; +use std::sync::atomic::Ordering::Relaxed; use std::time::Duration; -// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized -// loom doesn't support that because it requires CoerceUnsized, which is unstable -type LaunchWorker<P> = Arc<Box<dyn Fn(Worker<P>) -> Box<dyn FnOnce() + Send> + Send + Sync>>; - thread_local! { - /// Thread-local tracking the current executor - static ON_BLOCK: Cell<Option<*mut dyn FnMut()>> = Cell::new(None) + /// Used to handle block_in_place + static ON_BLOCK: Cell<Option<*const dyn Fn()>> = Cell::new(None) } -#[cfg(feature = "blocking")] -pub(crate) fn blocking<F, R>(f: F) -> R +pub(crate) fn block_in_place<F, R>(f: F) -> R where F: FnOnce() -> R, { @@ -30,60 +28,100 @@ where // This is safe, because ON_BLOCK was set from an &mut dyn FnMut in the worker that wraps // the worker's operation, and is unset just prior to when the FnMut is dropped. - let allow_blocking = unsafe { &mut *allow_blocking }; + let allow_blocking = unsafe { &*allow_blocking }; allow_blocking(); f() }) } -// TODO: remove this re-export -pub(super) use crate::runtime::thread_pool::set::Set; - pub(crate) struct Worker<P: Park + 'static> { - /// Entry in the set of workers. - entry: Entry<P::Unpark>, + /// Parks the thread. Requires the calling worker to have obtained unique + /// access via the generation synchronization action. + inner: Arc<Inner<P>>, - /// Park the thread - park: Box<P>, + /// Scheduler slices + slices: Arc<slice::Set<P::Unpark>>, + + /// Slice assigned to this worker + index: usize, - /// Fn for launching another Worker should we need it - launch_worker: LaunchWorker<P>, + /// Handle to the blocking pool + blocking_pool: blocking::Spawner, + + /// Run before calling worker logic + around_worker: Callback, + + /// Worker generation. This is used to synchronize access to the internal + /// data. + generation: usize, /// To indicate that the Worker has been given away and should no longer be used gone: Cell<bool>, } +/// Internal worker state. This may be referenced from multiple threads, but the +/// generation guard protects unsafe access +struct Inner<P: Park + 'static> { + /// Used to park the thread + park: CausalCell<P>, + + /// Only held so that the scheduler can be signaled on shutdown. + shutdown_tx: shutdown::Sender, +} + +// TODO: clean up +unsafe impl<P: Park + Send + 'static> Send for Worker<P> {} + +/// Used to ensure the invariants are respected +struct GenerationGuard<'a, P: Park + 'static> { + /// Worker reference + worker: &'a Worker<P>, + + /// Prevent `Sync` access + _p: PhantomData<Cell<()>>, +} + +struct WorkerGone; + +// TODO: Move into slices pub(super) fn create_set<F, P>( pool_size: usize, mk_park: F, - launch_worker: LaunchWorker<P>, - blocking: Arc<crate::runtime::blocking::Pool>, -) -> (Arc<Set<P::Unpark>>, Vec<Worker<P>>) + blocking_pool: blocking::Spawner, + around_worker: Callback, + shutdown_tx: shutdown::Sender, +) -> (Arc<slice::Set<P::Unpark>>, Vec<Worker<P>>) where P: Send + Park, - F: FnMut(usize) -> Box<P>, + F: FnMut(usize) -> P, { // Create the parks... let parks: Vec<_> = (0..pool_size).map(mk_park).collect(); - let mut pool = Arc::new(Set::new(pool_size, |i| parks[i].unpark(), blocking)); + let mut slices = Arc::new(slice::Set::new(pool_size, |i| parks[i].unpark())); // Establish the circular link between the individual worker state // structure and the container. - Arc::get_mut(&mut pool).unwrap().set_container_ptr(); + Arc::get_mut(&mut slices).unwrap().set_ptr(); // This will contain each worker. let workers = parks .into_iter() .enumerate() |