Diffstat (limited to 'tokio/src/runtime/thread_pool')
-rw-r--r--  tokio/src/runtime/thread_pool/current.rs              9
-rw-r--r--  tokio/src/runtime/thread_pool/mod.rs                 95
-rw-r--r--  tokio/src/runtime/thread_pool/owned.rs               21
-rw-r--r--  tokio/src/runtime/thread_pool/shared.rs              32
-rw-r--r--  tokio/src/runtime/thread_pool/slice.rs               76
-rw-r--r--  tokio/src/runtime/thread_pool/spawner.rs              7
-rw-r--r--  tokio/src/runtime/thread_pool/tests/loom_pool.rs    101
-rw-r--r--  tokio/src/runtime/thread_pool/tests/loom_queue.rs     3
-rw-r--r--  tokio/src/runtime/thread_pool/tests/mod.rs            3
-rw-r--r--  tokio/src/runtime/thread_pool/tests/pool.rs         206
-rw-r--r--  tokio/src/runtime/thread_pool/worker.rs             233
11 files changed, 187 insertions, 599 deletions
diff --git a/tokio/src/runtime/thread_pool/current.rs b/tokio/src/runtime/thread_pool/current.rs
index 1ab83c54..60a20723 100644
--- a/tokio/src/runtime/thread_pool/current.rs
+++ b/tokio/src/runtime/thread_pool/current.rs
@@ -1,5 +1,4 @@
use crate::loom::sync::Arc;
-use crate::runtime::park::Unpark;
use crate::runtime::thread_pool::{slice, Owned};
use std::cell::Cell;
@@ -23,10 +22,9 @@ struct Inner {
// Pointer to the current worker info
thread_local!(static CURRENT_WORKER: Cell<Inner> = Cell::new(Inner::new()));
-pub(super) fn set<F, R, P>(pool: &Arc<slice::Set<P>>, index: usize, f: F) -> R
+pub(super) fn set<F, R>(pool: &Arc<slice::Set>, index: usize, f: F) -> R
where
F: FnOnce() -> R,
- P: Unpark,
{
CURRENT_WORKER.with(|cell| {
assert!(cell.get().workers.is_null());
@@ -65,10 +63,7 @@ where
}
impl Current {
- pub(super) fn as_member<'a, P>(&self, set: &'a slice::Set<P>) -> Option<&'a Owned<P>>
- where
- P: Unpark,
- {
+ pub(super) fn as_member<'a>(&self, set: &'a slice::Set) -> Option<&'a Owned> {
let inner = CURRENT_WORKER.with(|cell| cell.get());
if ptr::eq(inner.workers as *const _, set.shared().as_ptr()) {
diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs
index 4b23c3b9..3d795fa4 100644
--- a/tokio/src/runtime/thread_pool/mod.rs
+++ b/tokio/src/runtime/thread_pool/mod.rs
@@ -18,9 +18,8 @@ mod slice;
mod shared;
use self::shared::Shared;
-mod shutdown;
-
mod worker;
+use worker::Worker;
cfg_blocking! {
pub(crate) use worker::block_in_place;
@@ -39,9 +38,7 @@ const LOCAL_QUEUE_CAPACITY: usize = 256;
#[cfg(loom)]
const LOCAL_QUEUE_CAPACITY: usize = 2;
-use crate::blocking;
-use crate::loom::sync::Arc;
-use crate::runtime::Park;
+use crate::runtime::{self, blocking, Parker};
use crate::task::JoinHandle;
use std::fmt;
@@ -50,48 +47,29 @@ use std::future::Future;
/// Work-stealing based thread pool for executing futures.
pub(crate) struct ThreadPool {
spawner: Spawner,
-
- /// Shutdown waiter
- shutdown_rx: shutdown::Receiver,
}
-// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized
-// loom doesn't support that because it requires CoerceUnsized, which is
-// unstable
-type Callback = Arc<Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>>;
+pub(crate) struct Workers {
+ workers: Vec<Worker>,
+}
impl ThreadPool {
- pub(crate) fn new<F, P>(
+ pub(crate) fn new(
pool_size: usize,
- blocking_pool: blocking::Spawner,
- around_worker: Callback,
- mut build_park: F,
- ) -> ThreadPool
- where
- F: FnMut(usize) -> P,
- P: Park + Send + 'static,
- {
- let (shutdown_tx, shutdown_rx) = shutdown::channel();
-
- let (pool, workers) = worker::create_set::<_, BoxedPark<P>>(
+ parker: Parker,
+ ) -> (ThreadPool, Workers) {
+ let (pool, workers) = worker::create_set(
pool_size,
- |i| BoxedPark::new(build_park(i)),
- blocking_pool.clone(),
- around_worker,
- shutdown_tx,
+ parker,
);
- // Spawn threads for each worker
- for worker in workers {
- blocking_pool.spawn_background(|| worker.run());
- }
-
let spawner = Spawner::new(pool);
- ThreadPool {
+ let pool = ThreadPool {
spawner,
- shutdown_rx,
- }
+ };
+
+ (pool, Workers { workers })
}
/// Returns reference to `Spawner`.
@@ -124,13 +102,6 @@ impl ThreadPool {
enter.block_on(future)
})
}
-
- /// Shutdown the thread pool.
- pub(crate) fn shutdown_now(&mut self) {
- if self.spawner.workers().close() {
- self.shutdown_rx.wait();
- }
- }
}
impl fmt::Debug for ThreadPool {
@@ -141,37 +112,17 @@ impl fmt::Debug for ThreadPool {
impl Drop for ThreadPool {
fn drop(&mut self) {
- self.shutdown_now();
+ self.spawner.workers().close();
}
}
-// TODO: delete?
-pub(crate) struct BoxedPark<P> {
- inner: P,
-}
-
-impl<P> BoxedPark<P> {
- pub(crate) fn new(inner: P) -> Self {
- BoxedPark { inner }
- }
-}
-
-impl<P> Park for BoxedPark<P>
-where
- P: Park,
-{
- type Unpark = Box<dyn crate::runtime::park::Unpark>;
- type Error = P::Error;
-
- fn unpark(&self) -> Self::Unpark {
- Box::new(self.inner.unpark())
- }
-
- fn park(&mut self) -> Result<(), Self::Error> {
- self.inner.park()
- }
-
- fn park_timeout(&mut self, duration: std::time::Duration) -> Result<(), Self::Error> {
- self.inner.park_timeout(duration)
+impl Workers {
+ pub(crate) fn spawn(self, blocking_pool: &blocking::Spawner) {
+ blocking_pool.enter(|| {
+ for worker in self.workers {
+ let b = blocking_pool.clone();
+ runtime::spawn_blocking(move || worker.run(b));
+ }
+ });
}
}
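
Note: the mod.rs change splits construction from startup. `ThreadPool::new(pool_size, parker)` now returns the pool together with the not-yet-running `Workers`, and `Workers::spawn` hands each worker to the blocking pool via `runtime::spawn_blocking`. A minimal sketch of the new crate-internal flow, assuming a `Parker` value and a `blocking::Spawner` are already in hand (only the two calls shown in this diff are taken from the patch; `Spawner::spawn` is outside this hunk and assumed unchanged):

    // Sketch only; `parker` and `blocking_spawner` are assumed to exist.
    let (pool, workers) = ThreadPool::new(4, parker);
    workers.spawn(&blocking_spawner);       // each worker runs via runtime::spawn_blocking
    let handle = pool.spawner().spawn(async { 1 + 1 });
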
diff --git a/tokio/src/runtime/thread_pool/owned.rs b/tokio/src/runtime/thread_pool/owned.rs
index 88284d5e..b60eb7f3 100644
--- a/tokio/src/runtime/thread_pool/owned.rs
+++ b/tokio/src/runtime/thread_pool/owned.rs
@@ -7,7 +7,7 @@ use std::cell::Cell;
/// Per-worker data accessible only by the thread driving the worker.
#[derive(Debug)]
-pub(super) struct Owned<P: 'static> {
+pub(super) struct Owned {
/// Worker generation. This guards concurrent access to the `Owned` struct.
/// When a worker starts running, it checks that the generation it has
/// assigned matches the current generation. When it does, the worker has
@@ -36,17 +36,14 @@ pub(super) struct Owned<P: 'static> {
pub(super) rand: FastRand,
/// Work queue
- pub(super) work_queue: queue::Worker<Shared<P>>,
+ pub(super) work_queue: queue::Worker<Shared>,
/// List of tasks owned by the worker
- pub(super) owned_tasks: task::OwnedList<Shared<P>>,
+ pub(super) owned_tasks: task::OwnedList<Shared>,
}
-impl<P> Owned<P>
-where
- P: 'static,
-{
- pub(super) fn new(work_queue: queue::Worker<Shared<P>>, rand: FastRand) -> Owned<P> {
+impl Owned {
+ pub(super) fn new(work_queue: queue::Worker<Shared>, rand: FastRand) -> Owned {
Owned {
generation: AtomicUsize::new(0),
tick: Cell::new(1),
@@ -61,7 +58,7 @@ where
}
/// Returns `true` if a worker should be notified
- pub(super) fn submit_local(&self, task: Task<Shared<P>>) -> bool {
+ pub(super) fn submit_local(&self, task: Task<Shared>) -> bool {
let ret = self.work_queue.push(task);
if self.defer_notification.get() {
@@ -72,15 +69,15 @@ where
}
}
- pub(super) fn submit_local_yield(&self, task: Task<Shared<P>>) {
+ pub(super) fn submit_local_yield(&self, task: Task<Shared>) {
self.work_queue.push_yield(task);
}
- pub(super) fn bind_task(&mut self, task: &Task<Shared<P>>) {
+ pub(super) fn bind_task(&mut self, task: &Task<Shared>) {
self.owned_tasks.insert(task);
}
- pub(super) fn release_task(&mut self, task: &Task<Shared<P>>) {
+ pub(super) fn release_task(&mut self, task: &Task<Shared>) {
self.owned_tasks.remove(task);
}
}
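
Note: `submit_local` keeps its contract after losing the `P` parameter: it pushes onto the worker's queue and returns `true` when a sibling worker should be woken, with the `defer_notification` flag able to suppress that while the worker is already searching for work. The intended call pattern, sketched with placeholder bindings:

    // `owned` and `task` stand in for values threaded through slice.rs; the
    // notify step is schematic — the real hook lives in the slice::Set.
    if owned.submit_local(task) {
        notify_one_worker();   // hypothetical: unpark an idle sibling
    }
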
diff --git a/tokio/src/runtime/thread_pool/shared.rs b/tokio/src/runtime/thread_pool/shared.rs
index 99981151..86c784ad 100644
--- a/tokio/src/runtime/thread_pool/shared.rs
+++ b/tokio/src/runtime/thread_pool/shared.rs
@@ -1,4 +1,5 @@
-use crate::runtime::park::Unpark;
+use crate::park::Unpark;
+use crate::runtime::Unparker;
use crate::runtime::thread_pool::slice;
use crate::task::{self, Schedule, Task};
@@ -11,12 +12,9 @@ use std::ptr;
/// - other workers
/// - tasks
///
-pub(crate) struct Shared<P>
-where
- P: 'static,
-{
+pub(crate) struct Shared {
/// Thread unparker
- unpark: P,
+ unpark: Unparker,
/// Tasks pending drop. Any worker pushes tasks, only the "owning" worker
/// pops.
@@ -26,17 +24,14 @@ where
///
/// The slice::Set itself is tracked by an `Arc`, but this pointer is not
/// included in the ref count.
- slices: *const slice::Set<P>,
+ slices: *const slice::Set,
}
-unsafe impl<P: Unpark> Send for Shared<P> {}
-unsafe impl<P: Unpark> Sync for Shared<P> {}
+unsafe impl Send for Shared {}
+unsafe impl Sync for Shared {}
-impl<P> Shared<P>
-where
- P: Unpark,
-{
- pub(super) fn new(unpark: P) -> Shared<P> {
+impl Shared {
+ pub(super) fn new(unpark: Unparker) -> Shared {
Shared {
unpark,
pending_drop: task::TransferStack::new(),
@@ -52,19 +47,16 @@ where
self.unpark.unpark();
}
- fn slices(&self) -> &slice::Set<P> {
+ fn slices(&self) -> &slice::Set {
unsafe { &*self.slices }
}
- pub(super) fn set_slices_ptr(&mut self, slices: *const slice::Set<P>) {
+ pub(super) fn set_slices_ptr(&mut self, slices: *const slice::Set) {
self.slices = slices;
}
}
-impl<P> Schedule for Shared<P>
-where
- P: Unpark,
-{
+impl Schedule for Shared {
fn bind(&self, task: &Task<Self>) {
// Get access to the Owned component. This function can only be called
// when on the worker.
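
Note: each `Shared` now holds the concrete `runtime::Unparker` for its worker instead of a generic `P: Unpark`; the handle comes straight from the worker's `Parker` when the slice set is built (see slice.rs below). Sketch of the wiring, with the `Parker` value assumed in scope:

    // Per slice::Set::new in this patch: one Unparker per worker slot.
    let unparker = parker.unpark();     // `parker: Parker` assumed
    let shared = Shared::new(unparker);
    // any thread can later wake the worker; internally: self.unpark.unpark()
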
diff --git a/tokio/src/runtime/thread_pool/slice.rs b/tokio/src/runtime/thread_pool/slice.rs
index 4b3ef996..aa521a15 100644
--- a/tokio/src/runtime/thread_pool/slice.rs
+++ b/tokio/src/runtime/thread_pool/slice.rs
@@ -3,7 +3,8 @@
//! slice.
use crate::loom::rand::seed;
-use crate::runtime::park::Unpark;
+use crate::park::Park;
+use crate::runtime::Parker;
use crate::runtime::thread_pool::{current, queue, Idle, Owned, Shared};
use crate::task::{self, JoinHandle, Task};
use crate::util::{CachePadded, FastRand};
@@ -11,48 +12,38 @@ use crate::util::{CachePadded, FastRand};
use std::cell::UnsafeCell;
use std::future::Future;
-pub(super) struct Set<P>
-where
- P: 'static,
-{
+pub(super) struct Set {
/// Data accessible from all workers.
- shared: Box<[Shared<P>]>,
+ shared: Box<[Shared]>,
/// Data owned by the worker.
- owned: Box<[UnsafeCell<CachePadded<Owned<P>>>]>,
+ owned: Box<[UnsafeCell<CachePadded<Owned>>]>,
/// Submit work to the pool while *not* currently on a worker thread.
- inject: queue::Inject<Shared<P>>,
+ inject: queue::Inject<Shared>,
/// Coordinates idle workers
idle: Idle,
}
-unsafe impl<P: Unpark> Send for Set<P> {}
-unsafe impl<P: Unpark> Sync for Set<P> {}
+unsafe impl Send for Set {}
+unsafe impl Sync for Set {}
-impl<P> Set<P>
-where
- P: Unpark,
-{
+impl Set {
/// Create a new worker set using the provided queues.
- pub(crate) fn new<F>(num_workers: usize, mut mk_unpark: F) -> Self
- where
- F: FnMut(usize) -> P,
- {
- assert!(num_workers > 0);
+ pub(crate) fn new(parkers: &[Parker]) -> Self {
+ assert!(!parkers.is_empty());
- let queues = queue::build(num_workers);
+ let queues = queue::build(parkers.len());
let inject = queues[0].injector();
let mut shared = Vec::with_capacity(queues.len());
let mut owned = Vec::with_capacity(queues.len());
for (i, queue) in queues.into_iter().enumerate() {
- let unpark = mk_unpark(i);
let rand = FastRand::new(seed());
- shared.push(Shared::new(unpark));
+ shared.push(Shared::new(parkers[i].unpark()));
owned.push(UnsafeCell::new(CachePadded::new(Owned::new(queue, rand))));
}
@@ -60,12 +51,21 @@ where
shared: shared.into_boxed_slice(),
owned: owned.into_boxed_slice(),
inject,
- idle: Idle::new(num_workers),
- // blocking,
+ idle: Idle::new(parkers.len()),
}
}
- fn inject_task(&self, task: Task<Shared<P>>) {
+ pub(crate) fn spawn_typed<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ let (task, handle) = task::joinable(future);
+ self.schedule(task);
+ handle
+ }
+
+ fn inject_task(&self, task: Task<Shared>) {
self.inject.push(task, |res| {
if let Err(task) = res {
task.shutdown();
@@ -95,7 +95,7 @@ where
}
}
- pub(crate) fn schedule(&self, task: Task<Shared<P>>) {
+ pub(crate) fn schedule(&self, task: Task<Shared>) {
current::get(|current_worker| match current_worker.as_member(self) {
Some(worker) => {
if worker.submit_local(task) {
@@ -136,28 +136,26 @@ where
self.shared.len()
}
- pub(super) fn index_of(&self, shared: &Shared<P>) -> usize {
+ pub(super) fn index_of(&self, shared: &Shared) -> usize {
use std::mem;
- let size = mem::size_of::<Shared<P>>();
+ let size = mem::size_of::<Shared>();
((shared as *const _ as usize) - (&self.shared[0] as *const _ as usize)) / size
}
- pub(super) fn shared(&self) -> &[Shared<P>] {
+ pub(super) fn shared(&self) -> &[Shared] {
&self.shared
}
- pub(super) fn owned(&self) -> &[UnsafeCell<CachePadded<Owned<P>>>] {
+ pub(super) fn owned(&self) -> &[UnsafeCell<CachePadded<Owned>>] {
&self.owned
}
pub(super) fn idle(&self) -> &Idle {
&self.idle
}
-}
-impl<P: 'static> Set<P> {
/// Wait for all locks on the injection queue to drop.
///
/// This is done by locking w/o doing anything.
@@ -166,21 +164,9 @@ impl<P: 'static> Set<P> {
}
}
-impl<P: 'static> Drop for Set<P> {
+impl Drop for Set {
fn drop(&mut self) {
// Before proceeding, wait for all concurrent wakers to exit
self.wait_for_unlocked();
}
}
-
-impl Set<Box<dyn Unpark>> {
- pub(crate) fn spawn_typed<F>(&self, future: F) -> JoinHandle<F::Output>
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- let (task, handle) = task::joinable(future);
- self.schedule(task);
- handle
- }
-}
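
Note: with the `P` parameter gone there is no reason to restrict `spawn_typed` to `Set<Box<dyn Unpark>>`, so it moves into the main `impl Set`. Usage is unchanged; a sketch assuming a `set: &slice::Set` such as the one held by `Spawner`:

    // joinable() pairs the task with a JoinHandle; schedule() picks the
    // local queue when called from a worker, the inject queue otherwise.
    let handle = set.spawn_typed(async { 2 + 2 });
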
diff --git a/tokio/src/runtime/thread_pool/spawner.rs b/tokio/src/runtime/thread_pool/spawner.rs
index 4773ea9a..4fccad96 100644
--- a/tokio/src/runtime/thread_pool/spawner.rs
+++ b/tokio/src/runtime/thread_pool/spawner.rs
@@ -1,5 +1,4 @@
use crate::loom::sync::Arc;
-use crate::runtime::park::Unpark;
use crate::runtime::thread_pool::slice;
use crate::task::JoinHandle;
@@ -20,11 +19,11 @@ use std::future::Future;
/// [`ThreadPool::spawner`]: struct.ThreadPool.html#method.spawner
#[derive(Clone)]
pub(crate) struct Spawner {
- workers: Arc<slice::Set<Box<dyn Unpark>>>,
+ workers: Arc<slice::Set>,
}
impl Spawner {
- pub(super) fn new(workers: Arc<slice::Set<Box<dyn Unpark>>>) -> Spawner {
+ pub(super) fn new(workers: Arc<slice::Set>) -> Spawner {
Spawner { workers }
}
@@ -46,7 +45,7 @@ impl Spawner {
}
/// Reference to the worker set. Used by `ThreadPool` to initiate shutdown.
- pub(super) fn workers(&self) -> &slice::Set<Box<dyn Unpark>> {
+ pub(super) fn workers(&self) -> &slice::Set {
&*self.workers
}
}
diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
index b982e24e..81e292d6 100644
--- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs
+++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
@@ -1,14 +1,12 @@
+use crate::runtime::{self, Runtime};
use crate::runtime::tests::loom_oneshot as oneshot;
-use crate::runtime::thread_pool::ThreadPool;
-use crate::runtime::{Park, Unpark};
use crate::spawn;
use loom::sync::atomic::{AtomicBool, AtomicUsize};
-use loom::sync::{Arc, Mutex, Notify};
+use loom::sync::{Arc, Mutex};
use std::future::Future;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
-use std::time::Duration;
#[test]
fn pool_multi_spawn() {
@@ -46,7 +44,7 @@ fn pool_multi_spawn() {
#[test]
fn only_blocking() {
loom::model(|| {
- let mut pool = mk_pool(1);
+ let pool = mk_pool(1);
let (block_tx, block_rx) = oneshot::channel();
pool.spawn(async move {
@@ -56,7 +54,7 @@ fn only_blocking() {
});
block_rx.recv();
- pool.shutdown_now();
+ drop(pool);
});
}
@@ -64,7 +62,7 @@ fn only_blocking() {
fn blocking_and_regular() {
const NUM: usize = 3;
loom::model(|| {
- let mut pool = mk_pool(1);
+ let pool = mk_pool(1);
let cnt = Arc::new(AtomicUsize::new(0));
let (block_tx, block_rx) = oneshot::channel();
@@ -91,7 +89,7 @@ fn blocking_and_regular() {
done_rx.recv();
block_rx.recv();
- pool.shutdown_now();
+ drop(pool);
});
}
@@ -153,7 +151,7 @@ fn complete_block_on_under_load() {
use futures::FutureExt;
loom::model(|| {
- let pool = mk_pool(2);
+ let mut pool = mk_pool(2);
pool.block_on({
futures::future::lazy(|_| ()).then(|_| {
@@ -171,20 +169,11 @@ fn complete_block_on_under_load() {
}
fn mk_pool(num_threads: usize) -> Runtime {
- use crate::blocking::BlockingPool;
-
- let blocking_pool = BlockingPool::new("test".into(), None);
- let executor = ThreadPool::new(
- num_threads,
- blocking_pool.spawner().clone(),
- Arc::new(Box::new(|_, next| next())),
- move |_| LoomPark::new(),
- );
-
- Runtime {
- executor,
- blocking_pool,
- }
+ runtime::Builder::new()
+ .threaded_scheduler()
+ .num_threads(num_threads)
+ .build()
+ .unwrap()
}
use futures::future::poll_fn;
@@ -244,69 +233,3 @@ fn gated2(thread: bool) -> impl Future<Output = &'static str> {
}
})
}
-
-/// Fake runtime
-struct Runtime {
- executor: ThreadPool,
- #[allow(dead_code)]
- blocking_pool: crate::blocking::BlockingPool,
-}
-
-use std::ops;
-
-impl ops::Deref for Runtime {
- type Target = ThreadPool;
-
- fn deref(&self) -> &ThreadPool {
- &self.executor
- }
-}
-
-impl ops::DerefMut for Runtime {
- fn deref_mut(&mut self) -> &mut ThreadPool {
- &mut self.executor
- }
-}
-
-struct LoomPark {
- notify: Arc<Notify>,
-}
-
-struct LoomUnpark {
- notify: Arc<Notify>,
-}
-
-impl LoomPark {
- fn new() -> LoomPark {
- LoomPark {
- notify: Arc::new(Notify::new()),
- }
- }
-}
-
-impl Park for LoomPark {
- type Unpark = LoomUnpark;
-
- type Error = ();
-
- fn unpark(&self) -> LoomUnpark {
- let notify = self.notify.clone();
- LoomUnpark { notify }
- }
-
- fn park(&mut self) -> Result<(), Self::Error> {
- self.notify.wait();
- Ok(())
- }
-
- fn park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error> {
- self.notify.wait();
- Ok(())
- }
-}
-
-impl Unpark for LoomUnpark {
- fn unpark(&self) {
- self.notify.notify();
- }
-}
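
Note: the loom tests now build a real runtime through the public builder, and shutdown is simply dropping the pool (the `Drop` impl closes the workers, per the mod.rs hunk above). The same construction the tests use, outside of `loom::model`:

    let pool = runtime::Builder::new()
        .threaded_scheduler()
        .num_threads(2)
        .build()
        .unwrap();
    pool.spawn(async { /* work */ });
    drop(pool);   // replaces the old explicit shutdown_now()
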
diff --git a/tokio/src/runtime/thread_pool/tests/loom_queue.rs b/tokio/src/runtime/thread_pool/tests/loom_queue.rs
index d0598c3e..a4e10620 100644
--- a/tokio/src/runtime/thread_pool/tests/loom_queue.rs
+++ b/tokio/src/runtime/thread_pool/tests/loom_queue.rs
@@ -64,5 +64,6 @@ fn multi_worker() {
}
fn val(num: u32) -> Task<Noop> {
- task::background(async move { num })
+ let (task, _) = task::joinable(async move { num });
+ task
}
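
Note: `task::background` is gone; `task::joinable` is the single entry point and returns the task together with its `JoinHandle`. A detached task is just a joinable one whose handle is dropped, as `val` now does:

    let (task, handle) = task::joinable(async move { num });
    drop(handle);   // detach; the task itself is still runnable
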
diff --git a/tokio/src/runtime/thread_pool/tests/mod.rs b/tokio/src/runtime/thread_pool/tests/mod.rs
index dc1d3158..6638c558 100644
--- a/tokio/src/runtime/thread_pool/tests/mod.rs
+++ b/tokio/src/runtime/thread_pool/tests/mod.rs
@@ -5,7 +5,4 @@ mod loom_pool;
mod loom_queue;
#[cfg(not(loom))]
-mod pool;
-
-#[cfg(not(loom))]
mod queue;
diff --git a/tokio/src/runtime/thread_pool/tests/pool.rs b/tokio/src/runtime/thread_pool/tests/pool.rs
deleted file mode 100644
index 25c11ea9..00000000
--- a/tokio/src/runtime/thread_pool/tests/pool.rs
+++ /dev/null
@@ -1,206 +0,0 @@
-#![warn(rust_2018_idioms)]
-
-use crate::blocking;
-use crate::runtime::thread_pool::ThreadPool;
-use crate::runtime::{Park, Unpark};
-
-use futures::future::poll_fn;
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::atomic::Ordering::Relaxed;
-use std::sync::atomic::*;
-use std::sync::{mpsc, Arc};
-use std::task::{Context, Poll, Waker};
-use std::time::Duration;
-
-#[test]
-fn eagerly_drops_futures() {
- use std::sync::{mpsc, Mutex};
-
- struct MyPark {
- rx: mpsc::Receiver<()>,
- tx: Mutex<mpsc::Sender<()>>,
- #[allow(dead_code)]
- park_tx: mpsc::SyncSender<()>,
- unpark_tx: mpsc::SyncSender<()>,
- }
-
- impl Park for MyPark {
- type Unpark = MyUnpark;
- type Error = ();
-
- fn unpark(&self) -> Self::Unpark {
- MyUnpark {
- tx: Mutex::new(self.tx.lock().unwrap().clone()),
- unpark_tx: self.unpark_tx.clone(),
- }
- }
-
- fn park(&mut self) -> Result<(), Self::Error> {
- let _ = self.rx.recv();
- Ok(())
- }
-
- fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
- let _ = self.rx.recv_timeout(duration);
- Ok(())
- }
- }
-
- struct MyUnpark {
- tx: Mutex<mpsc::Sender<()>>,
- #[allow(dead_code)]
- unpark_tx: mpsc::SyncSender<()>,
- }
-
- impl Unpark for MyUnpark {
- fn unpark(&self) {
- let _ = self.tx.lock().unwrap().send(());
- }
- }
-
- let (task_tx, task_rx) = mpsc::channel();
- let (drop_tx, drop_rx) = mpsc::channel();
- let (park_tx, park_rx) = mpsc::sync_channel(0);
- let (unpark_tx, unpark_rx) = mpsc::sync_channel(0);
-
- let blocking_pool = blocking::BlockingPool::new("test".into(), None);
-
- let pool = ThreadPool::new(
- 4,
- blocking_pool.spawner().clone(),
- Arc::new(Box::new(|_, next| next())),
- move |_| {
- let (tx, rx) = mpsc::channel();
- MyPark {
- tx: Mutex::new(tx),
- rx,
- park_tx: park_tx.clone(),
- unpark_tx: unpark_tx.clone(),
- }
- },
- );
-
- struct MyTask {
- task_tx: Option<mpsc::Sender<Waker>>,
- drop_tx: mpsc::Sender<()>,
- }
-
- impl Future for MyTask {
- type Output = ();
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
- if let Some(tx) = self.get_mut().task_tx.take() {
- tx.send(cx.waker().clone()).unwrap();
- }
-
- Poll::Pending
- }
- }
-
- impl Drop for MyTask {
- fn drop(&mut self) {
- self.drop_tx.send(()).unwrap();
- }
- }
-
- pool.spawn(MyTask {
- task_tx: Some(task_tx),
- drop_tx,
- });
-
- // Wait until we get the task handle.
- let task = task_rx.recv().unwrap();
-
- // Drop the pool, this should result in futures being forcefully dropped.
- drop(pool);
-
- // Make sure `MyPark` and `MyUnpark` were dropped during shutdown.
- assert_eq!(park_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected));
- assert_eq!(unpark_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected));
-
- // If the future is forcefully dropped, then we will get a signal here.
- drop_rx.recv().unwrap();
-
- // Ensure `task` lives until after the test completes.
- drop(task);
-}
-
-#[test]
-fn park_called_at_interval() {
- struct MyPark {
- park_light: Arc<AtomicBool>,
- }
-
- struct MyUnpark {}
-
- impl Park for MyPark {
- type Unpark = MyUnpark;
- type Error = ();
-
- fn unpark(&self) -> Self::Unpark {
- MyUnpark {}
- }
-
- fn park(&mut self) -> Result<(), Self::Error> {
- use std::thread;
- use std::time::Duration;
-
- thread::sleep(Duration::from_millis(1));
- Ok(())
- }
-
- fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
- if duration == Duration::from_millis(0) {
- self.park_light.store(true, Relaxed);
- Ok(())
- } else {
- self.park()
- }
- }
- }
-
- impl Unpark for MyUnpark {
- fn unpark(&self) {}
- }
-
- let park_light_1 = Arc::new(AtomicBool::new(false));
- let park_light_2 = park_light_1.clone();
-
- let (done_tx, done_rx) = mpsc::channel();
-
- let blocking_pool = blocking::BlockingPool::new("test".into(), None);
-
- let pool = ThreadPool::new(
- 1,
- blocking_pool.spawner().clone(),
- Arc::new(Box::new(|_, next| next())),
- move |idx| {
- assert_eq!(idx, 0);
- MyPark {
- park_light: park_light_2.clone(),
- }
- },
- );
-
- let mut cnt = 0;
-
- pool.spawn(poll_fn(move |cx| {
- let did_park_light = park_light_1.load(Relaxed);
-
- if did_park_light {
- // There is a bit of a race where the worker can tick a few times
- // before seeing the task
- assert!(cnt > 50);
- done_tx.send(()).unwrap();
- return Poll::Ready(());
- }
-
- cnt += 1;
-
- cx.waker().wake_by_ref();
- Poll::Pending
- }));
-
- done_rx.recv().unwrap();
-}
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index cf6b66d8..92f3cfbd 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -1,8 +1,9 @@
-use crate::blocking;
use crate::loom::cell::CausalCell;
use crate::loom::sync::Arc;
-use crate::runtime::park::{Park, Unpark};
-use crate::runtime::thread_pool::{current, shutdown, slice, Callback, Owned, Shared, Spawner};
+use crate::park::Park;
+use crate::runtime::{self, blocking};
+use crate::runtime::park::Parker;
+use crate::runtime::thread_pool::{current, slice, Owned, Shared, Spawner};
use crate::task::Task;
use std::cell::Cell;
@@ -37,23 +38,17 @@ cfg_blocking! {
}
}
-pub(crate) struct Worker<P: Park + 'static> {
+pub(crate) struct Worker {
/// Parks the thread. Requires the calling worker to have obtained unique
/// access via the generation synchronization action.
- inner: Arc<Inner<P>>,
+ inner: Arc<Inner>,
/// Scheduler slices
- slices: Arc<slice::Set<P::Unpark>>,
+ slices: Arc<slice::Set>,
/// Slice assigned to this worker
index: usize,
- /// Handle to the blocking pool
- blocking_pool: blocking::Spawner,
-
- /// Run before calling worker logic
- around_worker: Callback,
-
/// Worker generation. This is used to synchronize access to the internal
/// data.
generation: usize,
@@ -64,21 +59,17 @@ pub(crate) struct Worker<P: Park + 'static> {
/// Internal worker state. This may be referenced from multiple threads, but the
/// generation guard protects unsafe access
-struct Inner<P: Park + 'static> {
+struct Inner {
/// Used to park the thread
- park: CausalCell<P>,
-
- /// Only held so that the scheduler can be signaled on shutdown.
- shutdown_tx: shutdown::Sender,
+ park: CausalCell<Parker>,
}
-// TODO: clean up
-unsafe impl<P: Park + Send + 'static> Send for Worker<P> {}
+unsafe impl Send for Worker {}
/// Used to ensure the invariants are respected
-struct GenerationGuard<'a, P: Park + 'static> {
+struct GenerationGuard<'a> {
/// Worker reference
- worker: &'a Worker<P>,
+ worker: &'a Worker,
/// Prevent `Sync` access
_p: PhantomData<Cell<()>>,
@@ -87,38 +78,28 @@ struct GenerationGuard<'a, P: Park + 'static> {
struct WorkerGone;
// TODO: Move into slices
-pub(super) fn create_set<F, P>(
+pub(super) fn create_set(
pool_size: usize,
- mk_park: F,
- blocking_pool: blocking::Spawner,
- around_worker: Callback,
- shutdown_tx: shutdown::Sender,
-) -> (Arc<slice::Set<P::Unpark>>, Vec<Worker<P>>)
-where
- P: Send + Park,
- F: FnMut(usize) -> P,
-{
+ parker: Parker,
+) -> (Arc<slice::Set>, Vec<Worker>) {
// Create the parks...
- let parks: Vec<_> = (0..pool_size).map(mk_park).collect();
+ let parkers: Vec<_> = (0..pool_size).map(|_| parker.clone()).collect();
- let mut slices = Arc::new(slice::Set::new(pool_size, |i| parks[i].unpark()));
+ let mut slices = Arc::new(slice::Set::new(&parkers));
// Establish the circular link between the individual worker state
// structure and the container.
Arc::get_mut(&mut slices).unwrap().set_ptr();
// This will contain each worker.
- let workers = parks
+ let workers = parkers
.into_iter()
.enumerate()
- .map(|(index, park)| {
+ .map(|(index, parker)| {
Worker::new(
slices.clone(),
index,
- park,
- blocking_pool.clone(),
- around_worker.clone(),
- shutdown_tx.clone(),
+ parker,
)
})
.collect();
@@ -132,114 +113,94 @@ where
/// The number is fairly arbitrary. I believe this value was copied from golang.
const GLOBAL_POLL_INTERVAL: u16 = 61;
-impl<P> Worker<P>
-where
- P: Send + Park,
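
(The worker.rs diff is truncated here.) `GLOBAL_POLL_INTERVAL` is the standard fairness knob: every 61 ticks the worker checks the injection queue before its local queue so externally submitted tasks cannot be starved by local work. The loop itself falls outside this hunk, so the following is only an illustrative sketch with hypothetical helpers:

    // Illustrative; pop_global()/pop_local() stand in for the real queue calls.
    let tick = owned.tick.get().wrapping_add(1);
    owned.tick.set(tick);
    let next = if tick % GLOBAL_POLL_INTERVAL == 0 {
        pop_global().or_else(|| pop_local())
    } else {
        pop_local().or_else(|| pop_global())
    };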