diff options
Diffstat (limited to 'tokio/src/runtime/thread_pool')
-rw-r--r-- | tokio/src/runtime/thread_pool/current.rs | 9 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/mod.rs | 95 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/owned.rs | 21 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/shared.rs | 32 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/slice.rs | 76 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/spawner.rs | 7 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/loom_pool.rs | 101 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/loom_queue.rs | 3 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/mod.rs | 3 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/pool.rs | 206 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/worker.rs | 233 |
11 files changed, 187 insertions, 599 deletions
diff --git a/tokio/src/runtime/thread_pool/current.rs b/tokio/src/runtime/thread_pool/current.rs index 1ab83c54..60a20723 100644 --- a/tokio/src/runtime/thread_pool/current.rs +++ b/tokio/src/runtime/thread_pool/current.rs @@ -1,5 +1,4 @@ use crate::loom::sync::Arc; -use crate::runtime::park::Unpark; use crate::runtime::thread_pool::{slice, Owned}; use std::cell::Cell; @@ -23,10 +22,9 @@ struct Inner { // Pointer to the current worker info thread_local!(static CURRENT_WORKER: Cell<Inner> = Cell::new(Inner::new())); -pub(super) fn set<F, R, P>(pool: &Arc<slice::Set<P>>, index: usize, f: F) -> R +pub(super) fn set<F, R>(pool: &Arc<slice::Set>, index: usize, f: F) -> R where F: FnOnce() -> R, - P: Unpark, { CURRENT_WORKER.with(|cell| { assert!(cell.get().workers.is_null()); @@ -65,10 +63,7 @@ where } impl Current { - pub(super) fn as_member<'a, P>(&self, set: &'a slice::Set<P>) -> Option<&'a Owned<P>> - where - P: Unpark, - { + pub(super) fn as_member<'a>(&self, set: &'a slice::Set) -> Option<&'a Owned> { let inner = CURRENT_WORKER.with(|cell| cell.get()); if ptr::eq(inner.workers as *const _, set.shared().as_ptr()) { diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 4b23c3b9..3d795fa4 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -18,9 +18,8 @@ mod slice; mod shared; use self::shared::Shared; -mod shutdown; - mod worker; +use worker::Worker; cfg_blocking! { pub(crate) use worker::block_in_place; @@ -39,9 +38,7 @@ const LOCAL_QUEUE_CAPACITY: usize = 256; #[cfg(loom)] const LOCAL_QUEUE_CAPACITY: usize = 2; -use crate::blocking; -use crate::loom::sync::Arc; -use crate::runtime::Park; +use crate::runtime::{self, blocking, Parker}; use crate::task::JoinHandle; use std::fmt; @@ -50,48 +47,29 @@ use std::future::Future; /// Work-stealing based thread pool for executing futures. pub(crate) struct ThreadPool { spawner: Spawner, - - /// Shutdown waiter - shutdown_rx: shutdown::Receiver, } -// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized -// loom doesn't support that because it requires CoerceUnsized, which is -// unstable -type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>; +pub(crate) struct Workers { + workers: Vec<Worker>, +} impl ThreadPool { - pub(crate) fn new<F, P>( + pub(crate) fn new( pool_size: usize, - blocking_pool: blocking::Spawner, - around_worker: Callback, - mut build_park: F, - ) -> ThreadPool - where - F: FnMut(usize) -> P, - P: Park + Send + 'static, - { - let (shutdown_tx, shutdown_rx) = shutdown::channel(); - - let (pool, workers) = worker::create_set::<_, BoxedPark<P>>( + parker: Parker, + ) -> (ThreadPool, Workers) { + let (pool, workers) = worker::create_set( pool_size, - |i| BoxedPark::new(build_park(i)), - blocking_pool.clone(), - around_worker, - shutdown_tx, + parker, ); - // Spawn threads for each worker - for worker in workers { - blocking_pool.spawn_background(|| worker.run()); - } - let spawner = Spawner::new(pool); - ThreadPool { + let pool = ThreadPool { spawner, - shutdown_rx, - } + }; + + (pool, Workers { workers }) } /// Returns reference to `Spawner`. @@ -124,13 +102,6 @@ impl ThreadPool { enter.block_on(future) }) } - - /// Shutdown the thread pool. - pub(crate) fn shutdown_now(&mut self) { - if self.spawner.workers().close() { - self.shutdown_rx.wait(); - } - } } impl fmt::Debug for ThreadPool { @@ -141,37 +112,17 @@ impl fmt::Debug for ThreadPool { impl Drop for ThreadPool { fn drop(&mut self) { - self.shutdown_now(); + self.spawner.workers().close(); } } -// TODO: delete? -pub(crate) struct BoxedPark<P> { - inner: P, -} - -impl<P> BoxedPark<P> { - pub(crate) fn new(inner: P) -> Self { - BoxedPark { inner } - } -} - -impl<P> Park for BoxedPark<P> -where - P: Park, -{ - type Unpark = Box<dyn crate::runtime::park::Unpark>; - type Error = P::Error; - - fn unpark(&self) -> Self::Unpark { - Box::new(self.inner.unpark()) - } - - fn park(&mut self) -> Result<(), Self::Error> { - self.inner.park() - } - - fn park_timeout(&mut self, duration: std::time::Duration) -> Result<(), Self::Error> { - self.inner.park_timeout(duration) +impl Workers { + pub(crate) fn spawn(self, blocking_pool: &blocking::Spawner) { + blocking_pool.enter(|| { + for worker in self.workers { + let b = blocking_pool.clone(); + runtime::spawn_blocking(move || worker.run(b)); + } + }); } } diff --git a/tokio/src/runtime/thread_pool/owned.rs b/tokio/src/runtime/thread_pool/owned.rs index 88284d5e..b60eb7f3 100644 --- a/tokio/src/runtime/thread_pool/owned.rs +++ b/tokio/src/runtime/thread_pool/owned.rs @@ -7,7 +7,7 @@ use std::cell::Cell; /// Per-worker data accessible only by the thread driving the worker. #[derive(Debug)] -pub(super) struct Owned<P: 'static> { +pub(super) struct Owned { /// Worker generation. This guards concurrent access to the `Owned` struct. /// When a worker starts running, it checks that the generation it has /// assigned matches the current generation. When it does, the worker has @@ -36,17 +36,14 @@ pub(super) struct Owned<P: 'static> { pub(super) rand: FastRand, /// Work queue - pub(super) work_queue: queue::Worker<Shared<P>>, + pub(super) work_queue: queue::Worker<Shared>, /// List of tasks owned by the worker - pub(super) owned_tasks: task::OwnedList<Shared<P>>, + pub(super) owned_tasks: task::OwnedList<Shared>, } -impl<P> Owned<P> -where - P: 'static, -{ - pub(super) fn new(work_queue: queue::Worker<Shared<P>>, rand: FastRand) -> Owned<P> { +impl Owned { + pub(super) fn new(work_queue: queue::Worker<Shared>, rand: FastRand) -> Owned { Owned { generation: AtomicUsize::new(0), tick: Cell::new(1), @@ -61,7 +58,7 @@ where } /// Returns `true` if a worker should be notified - pub(super) fn submit_local(&self, task: Task<Shared<P>>) -> bool { + pub(super) fn submit_local(&self, task: Task<Shared>) -> bool { let ret = self.work_queue.push(task); if self.defer_notification.get() { @@ -72,15 +69,15 @@ where } } - pub(super) fn submit_local_yield(&self, task: Task<Shared<P>>) { + pub(super) fn submit_local_yield(&self, task: Task<Shared>) { self.work_queue.push_yield(task); } - pub(super) fn bind_task(&mut self, task: &Task<Shared<P>>) { + pub(super) fn bind_task(&mut self, task: &Task<Shared>) { self.owned_tasks.insert(task); } - pub(super) fn release_task(&mut self, task: &Task<Shared<P>>) { + pub(super) fn release_task(&mut self, task: &Task<Shared>) { self.owned_tasks.remove(task); } } diff --git a/tokio/src/runtime/thread_pool/shared.rs b/tokio/src/runtime/thread_pool/shared.rs index 99981151..86c784ad 100644 --- a/tokio/src/runtime/thread_pool/shared.rs +++ b/tokio/src/runtime/thread_pool/shared.rs @@ -1,4 +1,5 @@ -use crate::runtime::park::Unpark; +use crate::park::Unpark; +use crate::runtime::Unparker; use crate::runtime::thread_pool::slice; use crate::task::{self, Schedule, Task}; @@ -11,12 +12,9 @@ use std::ptr; /// - other workers /// - tasks /// -pub(crate) struct Shared<P> -where - P: 'static, -{ +pub(crate) struct Shared { /// Thread unparker - unpark: P, + unpark: Unparker, /// Tasks pending drop. Any worker pushes tasks, only the "owning" worker /// pops. @@ -26,17 +24,14 @@ where /// /// The slice::Set itself is tracked by an `Arc`, but this pointer is not /// included in the ref count. - slices: *const slice::Set<P>, + slices: *const slice::Set, } -unsafe impl<P: Unpark> Send for Shared<P> {} -unsafe impl<P: Unpark> Sync for Shared<P> {} +unsafe impl Send for Shared {} +unsafe impl Sync for Shared {} -impl<P> Shared<P> -where - P: Unpark, -{ - pub(super) fn new(unpark: P) -> Shared<P> { +impl Shared { + pub(super) fn new(unpark: Unparker) -> Shared { Shared { unpark, pending_drop: task::TransferStack::new(), @@ -52,19 +47,16 @@ where self.unpark.unpark(); } - fn slices(&self) -> &slice::Set<P> { + fn slices(&self) -> &slice::Set { unsafe { &*self.slices } } - pub(super) fn set_slices_ptr(&mut self, slices: *const slice::Set<P>) { + pub(super) fn set_slices_ptr(&mut self, slices: *const slice::Set) { self.slices = slices; } } -impl<P> Schedule for Shared<P> -where - P: Unpark, -{ +impl Schedule for Shared { fn bind(&self, task: &Task<Self>) { // Get access to the Owned component. This function can only be called // when on the worker. diff --git a/tokio/src/runtime/thread_pool/slice.rs b/tokio/src/runtime/thread_pool/slice.rs index 4b3ef996..aa521a15 100644 --- a/tokio/src/runtime/thread_pool/slice.rs +++ b/tokio/src/runtime/thread_pool/slice.rs @@ -3,7 +3,8 @@ //! slice. use crate::loom::rand::seed; -use crate::runtime::park::Unpark; +use crate::park::Park; +use crate::runtime::Parker; use crate::runtime::thread_pool::{current, queue, Idle, Owned, Shared}; use crate::task::{self, JoinHandle, Task}; use crate::util::{CachePadded, FastRand}; @@ -11,48 +12,38 @@ use crate::util::{CachePadded, FastRand}; use std::cell::UnsafeCell; use std::future::Future; -pub(super) struct Set<P> -where - P: 'static, -{ +pub(super) struct Set { /// Data accessible from all workers. - shared: Box<[Shared<P>]>, + shared: Box<[Shared]>, /// Data owned by the worker. - owned: Box<[UnsafeCell<CachePadded<Owned<P>>>]>, + owned: Box<[UnsafeCell<CachePadded<Owned>>]>, /// Submit work to the pool while *not* currently on a worker thread. - inject: queue::Inject<Shared<P>>, + inject: queue::Inject<Shared>, /// Coordinates idle workers idle: Idle, } -unsafe impl<P: Unpark> Send for Set<P> {} -unsafe impl<P: Unpark> Sync for Set<P> {} +unsafe impl Send for Set {} +unsafe impl Sync for Set {} -impl<P> Set<P> -where - P: Unpark, -{ +impl Set { /// Create a new worker set using the provided queues. - pub(crate) fn new<F>(num_workers: usize, mut mk_unpark: F) -> Self - where - F: FnMut(usize) -> P, - { - assert!(num_workers > 0); + pub(crate) fn new(parkers: &[Parker]) -> Self { + assert!(!parkers.is_empty()); - let queues = queue::build(num_workers); + let queues = queue::build(parkers.len()); let inject = queues[0].injector(); let mut shared = Vec::with_capacity(queues.len()); let mut owned = Vec::with_capacity(queues.len()); for (i, queue) in queues.into_iter().enumerate() { - let unpark = mk_unpark(i); let rand = FastRand::new(seed()); - shared.push(Shared::new(unpark)); + shared.push(Shared::new(parkers[i].unpark())); owned.push(UnsafeCell::new(CachePadded::new(Owned::new(queue, rand)))); } @@ -60,12 +51,21 @@ where shared: shared.into_boxed_slice(), owned: owned.into_boxed_slice(), inject, - idle: Idle::new(num_workers), - // blocking, + idle: Idle::new(parkers.len()), } } - fn inject_task(&self, task: Task<Shared<P>>) { + pub(crate) fn spawn_typed<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let (task, handle) = task::joinable(future); + self.schedule(task); + handle + } + + fn inject_task(&self, task: Task<Shared>) { self.inject.push(task, |res| { if let Err(task) = res { task.shutdown(); @@ -95,7 +95,7 @@ where } } - pub(crate) fn schedule(&self, task: Task<Shared<P>>) { + pub(crate) fn schedule(&self, task: Task<Shared>) { current::get(|current_worker| match current_worker.as_member(self) { Some(worker) => { if worker.submit_local(task) { @@ -136,28 +136,26 @@ where self.shared.len() } - pub(super) fn index_of(&self, shared: &Shared<P>) -> usize { + pub(super) fn index_of(&self, shared: &Shared) -> usize { use std::mem; - let size = mem::size_of::<Shared<P>>(); + let size = mem::size_of::<Shared>(); ((shared as *const _ as usize) - (&self.shared[0] as *const _ as usize)) / size } - pub(super) fn shared(&self) -> &[Shared<P>] { + pub(super) fn shared(&self) -> &[Shared] { &self.shared } - pub(super) fn owned(&self) -> &[UnsafeCell<CachePadded<Owned<P>>>] { + pub(super) fn owned(&self) -> &[UnsafeCell<CachePadded<Owned>>] { &self.owned } pub(super) fn idle(&self) -> &Idle { &self.idle } -} -impl<P: 'static> Set<P> { /// Wait for all locks on the injection queue to drop. /// /// This is done by locking w/o doing anything. @@ -166,21 +164,9 @@ impl<P: 'static> Set<P> { } } -impl<P: 'static> Drop for Set<P> { +impl Drop for Set { fn drop(&mut self) { // Before proceeding, wait for all concurrent wakers to exit self.wait_for_unlocked(); } } - -impl Set<Box<dyn Unpark>> { - pub(crate) fn spawn_typed<F>(&self, future: F) -> JoinHandle<F::Output> - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let (task, handle) = task::joinable(future); - self.schedule(task); - handle - } -} diff --git a/tokio/src/runtime/thread_pool/spawner.rs b/tokio/src/runtime/thread_pool/spawner.rs index 4773ea9a..4fccad96 100644 --- a/tokio/src/runtime/thread_pool/spawner.rs +++ b/tokio/src/runtime/thread_pool/spawner.rs @@ -1,5 +1,4 @@ use crate::loom::sync::Arc; -use crate::runtime::park::Unpark; use crate::runtime::thread_pool::slice; use crate::task::JoinHandle; @@ -20,11 +19,11 @@ use std::future::Future; /// [`ThreadPool::spawner`]: struct.ThreadPool.html#method.spawner #[derive(Clone)] pub(crate) struct Spawner { - workers: Arc<slice::Set<Box<dyn Unpark>>>, + workers: Arc<slice::Set>, } impl Spawner { - pub(super) fn new(workers: Arc<slice::Set<Box<dyn Unpark>>>) -> Spawner { + pub(super) fn new(workers: Arc<slice::Set>) -> Spawner { Spawner { workers } } @@ -46,7 +45,7 @@ impl Spawner { } /// Reference to the worker set. Used by `ThreadPool` to initiate shutdown. - pub(super) fn workers(&self) -> &slice::Set<Box<dyn Unpark>> { + pub(super) fn workers(&self) -> &slice::Set { &*self.workers } } diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs index b982e24e..81e292d6 100644 --- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs +++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs @@ -1,14 +1,12 @@ +use crate::runtime::{self, Runtime}; use crate::runtime::tests::loom_oneshot as oneshot; -use crate::runtime::thread_pool::ThreadPool; -use crate::runtime::{Park, Unpark}; use crate::spawn; use loom::sync::atomic::{AtomicBool, AtomicUsize}; -use loom::sync::{Arc, Mutex, Notify}; +use loom::sync::{Arc, Mutex}; use std::future::Future; use std::sync::atomic::Ordering::{Acquire, Relaxed, Release}; -use std::time::Duration; #[test] fn pool_multi_spawn() { @@ -46,7 +44,7 @@ fn pool_multi_spawn() { #[test] fn only_blocking() { loom::model(|| { - let mut pool = mk_pool(1); + let pool = mk_pool(1); let (block_tx, block_rx) = oneshot::channel(); pool.spawn(async move { @@ -56,7 +54,7 @@ fn only_blocking() { }); block_rx.recv(); - pool.shutdown_now(); + drop(pool); }); } @@ -64,7 +62,7 @@ fn only_blocking() { fn blocking_and_regular() { const NUM: usize = 3; loom::model(|| { - let mut pool = mk_pool(1); + let pool = mk_pool(1); let cnt = Arc::new(AtomicUsize::new(0)); let (block_tx, block_rx) = oneshot::channel(); @@ -91,7 +89,7 @@ fn blocking_and_regular() { done_rx.recv(); block_rx.recv(); - pool.shutdown_now(); + drop(pool); }); } @@ -153,7 +151,7 @@ fn complete_block_on_under_load() { use futures::FutureExt; loom::model(|| { - let pool = mk_pool(2); + let mut pool = mk_pool(2); pool.block_on({ futures::future::lazy(|_| ()).then(|_| { @@ -171,20 +169,11 @@ fn complete_block_on_under_load() { } fn mk_pool(num_threads: usize) -> Runtime { - use crate::blocking::BlockingPool; - - let blocking_pool = BlockingPool::new("test".into(), None); - let executor = ThreadPool::new( - num_threads, - blocking_pool.spawner().clone(), - Arc::new(Box::new(|_, next| next())), - move |_| LoomPark::new(), - ); - - Runtime { - executor, - blocking_pool, - } + runtime::Builder::new() + .threaded_scheduler() + .num_threads(num_threads) + .build() + .unwrap() } use futures::future::poll_fn; @@ -244,69 +233,3 @@ fn gated2(thread: bool) -> impl Future<Output = &'static str> { } }) } - -/// Fake runtime -struct Runtime { - executor: ThreadPool, - #[allow(dead_code)] - blocking_pool: crate::blocking::BlockingPool, -} - -use std::ops; - -impl ops::Deref for Runtime { - type Target = ThreadPool; - - fn deref(&self) -> &ThreadPool { - &self.executor - } -} - -impl ops::DerefMut for Runtime { - fn deref_mut(&mut self) -> &mut ThreadPool { - &mut self.executor - } -} - -struct LoomPark { - notify: Arc<Notify>, -} - -struct LoomUnpark { - notify: Arc<Notify>, -} - -impl LoomPark { - fn new() -> LoomPark { - LoomPark { - notify: Arc::new(Notify::new()), - } - } -} - -impl Park for LoomPark { - type Unpark = LoomUnpark; - - type Error = (); - - fn unpark(&self) -> LoomUnpark { - let notify = self.notify.clone(); - LoomUnpark { notify } - } - - fn park(&mut self) -> Result<(), Self::Error> { - self.notify.wait(); - Ok(()) - } - - fn park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error> { - self.notify.wait(); - Ok(()) - } -} - -impl Unpark for LoomUnpark { - fn unpark(&self) { - self.notify.notify(); - } -} diff --git a/tokio/src/runtime/thread_pool/tests/loom_queue.rs b/tokio/src/runtime/thread_pool/tests/loom_queue.rs index d0598c3e..a4e10620 100644 --- a/tokio/src/runtime/thread_pool/tests/loom_queue.rs +++ b/tokio/src/runtime/thread_pool/tests/loom_queue.rs @@ -64,5 +64,6 @@ fn multi_worker() { } fn val(num: u32) -> Task<Noop> { - task::background(async move { num }) + let (task, _) = task::joinable(async move { num }); + task } diff --git a/tokio/src/runtime/thread_pool/tests/mod.rs b/tokio/src/runtime/thread_pool/tests/mod.rs index dc1d3158..6638c558 100644 --- a/tokio/src/runtime/thread_pool/tests/mod.rs +++ b/tokio/src/runtime/thread_pool/tests/mod.rs @@ -5,7 +5,4 @@ mod loom_pool; mod loom_queue; #[cfg(not(loom))] -mod pool; - -#[cfg(not(loom))] mod queue; diff --git a/tokio/src/runtime/thread_pool/tests/pool.rs b/tokio/src/runtime/thread_pool/tests/pool.rs deleted file mode 100644 index 25c11ea9..00000000 --- a/tokio/src/runtime/thread_pool/tests/pool.rs +++ /dev/null @@ -1,206 +0,0 @@ -#![warn(rust_2018_idioms)] - -use crate::blocking; -use crate::runtime::thread_pool::ThreadPool; -use crate::runtime::{Park, Unpark}; - -use futures::future::poll_fn; -use std::future::Future; -use std::pin::Pin; -use std::sync::atomic::Ordering::Relaxed; -use std::sync::atomic::*; -use std::sync::{mpsc, Arc}; -use std::task::{Context, Poll, Waker}; -use std::time::Duration; - -#[test] -fn eagerly_drops_futures() { - use std::sync::{mpsc, Mutex}; - - struct MyPark { - rx: mpsc::Receiver<()>, - tx: Mutex<mpsc::Sender<()>>, - #[allow(dead_code)] - park_tx: mpsc::SyncSender<()>, - unpark_tx: mpsc::SyncSender<()>, - } - - impl Park for MyPark { - type Unpark = MyUnpark; - type Error = (); - - fn unpark(&self) -> Self::Unpark { - MyUnpark { - tx: Mutex::new(self.tx.lock().unwrap().clone()), - unpark_tx: self.unpark_tx.clone(), - } - } - - fn park(&mut self) -> Result<(), Self::Error> { - let _ = self.rx.recv(); - Ok(()) - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - let _ = self.rx.recv_timeout(duration); - Ok(()) - } - } - - struct MyUnpark { - tx: Mutex<mpsc::Sender<()>>, - #[allow(dead_code)] - unpark_tx: mpsc::SyncSender<()>, - } - - impl Unpark for MyUnpark { - fn unpark(&self) { - let _ = self.tx.lock().unwrap().send(()); - } - } - - let (task_tx, task_rx) = mpsc::channel(); - let (drop_tx, drop_rx) = mpsc::channel(); - let (park_tx, park_rx) = mpsc::sync_channel(0); - let (unpark_tx, unpark_rx) = mpsc::sync_channel(0); - - let blocking_pool = blocking::BlockingPool::new("test".into(), None); - - let pool = ThreadPool::new( - 4, - blocking_pool.spawner().clone(), - Arc::new(Box::new(|_, next| next())), - move |_| { - let (tx, rx) = mpsc::channel(); - MyPark { - tx: Mutex::new(tx), - rx, - park_tx: park_tx.clone(), - unpark_tx: unpark_tx.clone(), - } - }, - ); - - struct MyTask { - task_tx: Option<mpsc::Sender<Waker>>, - drop_tx: mpsc::Sender<()>, - } - - impl Future for MyTask { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - if let Some(tx) = self.get_mut().task_tx.take() { - tx.send(cx.waker().clone()).unwrap(); - } - - Poll::Pending - } - } - - impl Drop for MyTask { - fn drop(&mut self) { - self.drop_tx.send(()).unwrap(); - } - } - - pool.spawn(MyTask { - task_tx: Some(task_tx), - drop_tx, - }); - - // Wait until we get the task handle. - let task = task_rx.recv().unwrap(); - - // Drop the pool, this should result in futures being forcefully dropped. - drop(pool); - - // Make sure `MyPark` and `MyUnpark` were dropped during shutdown. - assert_eq!(park_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected)); - assert_eq!(unpark_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected)); - - // If the future is forcefully dropped, then we will get a signal here. - drop_rx.recv().unwrap(); - - // Ensure `task` lives until after the test completes. - drop(task); -} - -#[test] -fn park_called_at_interval() { - struct MyPark { - park_light: Arc<AtomicBool>, - } - - struct MyUnpark {} - - impl Park for MyPark { - type Unpark = MyUnpark; - type Error = (); - - fn unpark(&self) -> Self::Unpark { - MyUnpark {} - } - - fn park(&mut self) -> Result<(), Self::Error> { - use std::thread; - use std::time::Duration; - - thread::sleep(Duration::from_millis(1)); - Ok(()) - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - if duration == Duration::from_millis(0) { - self.park_light.store(true, Relaxed); - Ok(()) - } else { - self.park() - } - } - } - - impl Unpark for MyUnpark { - fn unpark(&self) {} - } - - let park_light_1 = Arc::new(AtomicBool::new(false)); - let park_light_2 = park_light_1.clone(); - - let (done_tx, done_rx) = mpsc::channel(); - - let blocking_pool = blocking::BlockingPool::new("test".into(), None); - - let pool = ThreadPool::new( - 1, - blocking_pool.spawner().clone(), - Arc::new(Box::new(|_, next| next())), - move |idx| { - assert_eq!(idx, 0); - MyPark { - park_light: park_light_2.clone(), - } - }, - ); - - let mut cnt = 0; - - pool.spawn(poll_fn(move |cx| { - let did_park_light = park_light_1.load(Relaxed); - - if did_park_light { - // There is a bit of a race where the worker can tick a few times - // before seeing the task - assert!(cnt > 50); - done_tx.send(()).unwrap(); - return Poll::Ready(()); - } - - cnt += 1; - - cx.waker().wake_by_ref(); - Poll::Pending - })); - - done_rx.recv().unwrap(); -} diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index cf6b66d8..92f3cfbd 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -1,8 +1,9 @@ -use crate::blocking; use crate::loom::cell::CausalCell; use crate::loom::sync::Arc; -use crate::runtime::park::{Park, Unpark}; -use crate::runtime::thread_pool::{current, shutdown, slice, Callback, Owned, Shared, Spawner}; +use crate::park::Park; +use crate::runtime::{self, blocking}; +use crate::runtime::park::Parker; +use crate::runtime::thread_pool::{current, slice, Owned, Shared, Spawner}; use crate::task::Task; use std::cell::Cell; @@ -37,23 +38,17 @@ cfg_blocking! { } } -pub(crate) struct Worker<P: Park + 'static> { +pub(crate) struct Worker { /// Parks the thread. Requires the calling worker to have obtained unique /// access via the generation synchronization action. - inner: Arc<Inner<P>>, + inner: Arc<Inner>, /// Scheduler slices - slices: Arc<slice::Set<P::Unpark>>, + slices: Arc<slice::Set>, /// Slice assigned to this worker index: usize, - /// Handle to the blocking pool - blocking_pool: blocking::Spawner, - - /// Run before calling worker logic - around_worker: Callback, - /// Worker generation. This is used to synchronize access to the internal /// data. generation: usize, @@ -64,21 +59,17 @@ pub(crate) struct Worker<P: Park + 'static> { /// Internal worker state. This may be referenced from multiple threads, but the /// generation guard protects unsafe access -struct Inner<P: Park + 'static> { +struct Inner { /// Used to park the thread - park: CausalCell<P>, - - /// Only held so that the scheduler can be signaled on shutdown. - shutdown_tx: shutdown::Sender, + park: CausalCell<Parker>, } -// TODO: clean up -unsafe impl<P: Park + Send + 'static> Send for Worker<P> {} +unsafe impl Send for Worker {} /// Used to ensure the invariants are respected -struct GenerationGuard<'a, P: Park + 'static> { +struct GenerationGuard<'a> { /// Worker reference - worker: &'a Worker<P>, + worker: &'a Worker, /// Prevent `Sync` access _p: PhantomData<Cell<()>>, @@ -87,38 +78,28 @@ struct GenerationGuard<'a, P: Park + 'static> { struct WorkerGone; // TODO: Move into slices -pub(super) fn create_set<F, P>( +pub(super) fn create_set( pool_size: usize, - mk_park: F, - blocking_pool: blocking::Spawner, - around_worker: Callback, - shutdown_tx: shutdown::Sender, -) -> (Arc<slice::Set<P::Unpark>>, Vec<Worker<P>>) -where - P: Send + Park, - F: FnMut(usize) -> P, -{ + parker: Parker, +) -> (Arc<slice::Set>, Vec<Worker>) { // Create the parks... - let parks: Vec<_> = (0..pool_size).map(mk_park).collect(); + let parkers: Vec<_> = (0..pool_size).map(|_| parker.clone()).collect(); - let mut slices = Arc::new(slice::Set::new(pool_size, |i| parks[i].unpark())); + let mut slices = Arc::new(slice::Set::new(&parkers)); // Establish the circular link between the individual worker state // structure and the container. Arc::get_mut(&mut slices).unwrap().set_ptr(); // This will contain each worker. - let workers = parks + let workers = parkers .into_iter() .enumerate() - .map(|(index, park)| { + .map(|(index, parker)| { Worker::new( slices.clone(), index, - park, - blocking_pool.clone(), - around_worker.clone(), - shutdown_tx.clone(), + parker, ) }) .collect(); @@ -132,114 +113,94 @@ where /// The number is fairly arbitrary. I believe this value was copied from golang. const GLOBAL_POLL_INTERVAL: u16 = 61; -impl<P> Worker<P> -where - P: Send + Park, < |