summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorJon Gjengset <jon@thesquareplanet.com>2019-10-30 11:58:49 -0400
committerCarl Lerche <me@carllerche.com>2019-10-30 08:58:49 -0700
commit109fd3086b97896c422d9565e7d1ddbe4b6f300b (patch)
treeae2b1cf8e5b2a90fdb677b25ca0b1294126bdc5e /tokio
parente3261440e56ebf576790e4c6f7f125a6c8759659 (diff)
thread-pool: in-place blocking with new scheduler (#1681)
The initial new scheduler PR omitted in-place blocking support. This patch brings it back.
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/executor/task/harness.rs10
-rw-r--r--tokio/src/executor/task/mod.rs2
-rw-r--r--tokio/src/executor/task/raw.rs9
-rw-r--r--tokio/src/executor/task/tests/loom.rs16
-rw-r--r--tokio/src/executor/task/tests/task.rs64
-rw-r--r--tokio/src/executor/thread_pool/builder.rs15
-rw-r--r--tokio/src/executor/thread_pool/current.rs4
-rw-r--r--tokio/src/executor/thread_pool/mod.rs3
-rw-r--r--tokio/src/executor/thread_pool/queue/inject.rs5
-rw-r--r--tokio/src/executor/thread_pool/set.rs15
-rw-r--r--tokio/src/executor/thread_pool/tests/loom_pool.rs56
-rw-r--r--tokio/src/executor/thread_pool/tests/loom_queue.rs4
-rw-r--r--tokio/src/executor/thread_pool/tests/queue.rs2
-rw-r--r--tokio/src/executor/thread_pool/tests/worker.rs14
-rw-r--r--tokio/src/executor/thread_pool/worker.rs314
-rw-r--r--tokio/tests/thread_pool.rs48
16 files changed, 495 insertions, 86 deletions
diff --git a/tokio/src/executor/task/harness.rs b/tokio/src/executor/task/harness.rs
index ef2978dc..e5355e4f 100644
--- a/tokio/src/executor/task/harness.rs
+++ b/tokio/src/executor/task/harness.rs
@@ -51,7 +51,7 @@ where
/// Panics raised while polling the future are handled.
///
/// Returns `true` if the task needs to be scheduled again
- pub(super) fn poll(mut self, executor: NonNull<S>) -> bool {
+ pub(super) fn poll(mut self, executor: &mut dyn FnMut() -> Option<NonNull<S>>) -> bool {
use std::panic;
// Transition the task to the running state.
@@ -81,6 +81,7 @@ where
// own the task here.
let task = ManuallyDrop::new(Task::from_raw(header.into()));
// Call the scheduler's bind callback
+ let executor = executor().expect("first poll must happen from an executor");
executor.as_ref().bind(&task);
header.executor.with_mut(|ptr| *ptr = Some(executor));
}
@@ -393,7 +394,7 @@ where
fn complete(
mut self,
- executor: NonNull<S>,
+ executor: &mut dyn FnMut() -> Option<NonNull<S>>,
join_interest: bool,
output: super::Result<T::Output>,
) {
@@ -402,15 +403,16 @@ where
self.core().store_output(output);
}
+ let executor = executor();
let bound_executor = unsafe { self.header().executor.with(|ptr| *ptr) };
// Handle releasing the task. First, check if the current
// executor is the one that is bound to the task:
- if Some(executor) == bound_executor {
+ if executor.is_some() && executor == bound_executor {
unsafe {
// perform a local release
let task = ManuallyDrop::new(self.to_task());
- executor.as_ref().release_local(&task);
+ executor.as_ref().unwrap().as_ref().release_local(&task);
if self.transition_to_released(join_interest).is_final_ref() {
self.dealloc();
diff --git a/tokio/src/executor/task/mod.rs b/tokio/src/executor/task/mod.rs
index dfc6628e..3e4e500b 100644
--- a/tokio/src/executor/task/mod.rs
+++ b/tokio/src/executor/task/mod.rs
@@ -99,7 +99,7 @@ impl<S: 'static> Task<S> {
impl<S: Schedule> Task<S> {
/// Returns `self` when the task needs to be immediately re-scheduled
- pub(crate) fn run(self, executor: NonNull<S>) -> Option<Self> {
+ pub(crate) fn run(self, executor: &mut dyn FnMut() -> Option<NonNull<S>>) -> Option<Self> {
if unsafe { self.raw.poll(executor) } {
Some(self)
} else {
diff --git a/tokio/src/executor/task/raw.rs b/tokio/src/executor/task/raw.rs
index a9b048fc..d0ffac3a 100644
--- a/tokio/src/executor/task/raw.rs
+++ b/tokio/src/executor/task/raw.rs
@@ -15,7 +15,7 @@ pub(super) struct RawTask<S: 'static> {
pub(super) struct Vtable<S: 'static> {
/// Poll the future
- pub(super) poll: unsafe fn(*mut (), NonNull<S>) -> bool,
+ pub(super) poll: unsafe fn(*mut (), &mut dyn FnMut() -> Option<NonNull<S>>) -> bool,
/// The task handle has been dropped and the join waker needs to be dropped
/// or the task struct needs to be deallocated
@@ -101,7 +101,7 @@ impl<S> RawTask<S> {
/// Safety: mutual exclusion is required to call this function.
///
/// Returns `true` if the task needs to be scheduled again.
- pub(super) unsafe fn poll(self, executor: NonNull<S>) -> bool {
+ pub(super) unsafe fn poll(self, executor: &mut dyn FnMut() -> Option<NonNull<S>>) -> bool {
// Get the vtable without holding a ref to the meta struct. This is done
// because a mutable reference to the task is passed into the poll fn.
let vtable = self.header().vtable;
@@ -150,7 +150,10 @@ impl<S: 'static> Clone for RawTask<S> {
impl<S: 'static> Copy for RawTask<S> {}
-unsafe fn poll<T: Future, S: Schedule>(ptr: *mut (), executor: NonNull<S>) -> bool {
+unsafe fn poll<T: Future, S: Schedule>(
+ ptr: *mut (),
+ executor: &mut dyn FnMut() -> Option<NonNull<S>>,
+) -> bool {
let harness = Harness::<T, S>::from_raw(ptr);
harness.poll(executor)
}
diff --git a/tokio/src/executor/task/tests/loom.rs b/tokio/src/executor/task/tests/loom.rs
index 53539987..b377aeb9 100644
--- a/tokio/src/executor/task/tests/loom.rs
+++ b/tokio/src/executor/task/tests/loom.rs
@@ -15,7 +15,7 @@ fn create_drop_join_handle() {
let (task, join_handle) = task::joinable(async { "hello" });
let schedule = LoomSchedule::new();
- let schedule = From::from(&schedule);
+ let schedule = &mut || Some(From::from(&schedule));
let th = thread::spawn(move || {
drop(join_handle);
@@ -37,7 +37,7 @@ fn poll_drop_handle_then_drop() {
let (task, mut join_handle) = task::joinable(async { "hello" });
let schedule = LoomSchedule::new();
- let schedule = From::from(&schedule);
+ let schedule = &mut || Some(From::from(&schedule));
let th = thread::spawn(move || {
block_on(poll_fn(|cx| {
@@ -58,7 +58,7 @@ fn join_output() {
let (task, join_handle) = task::joinable(async { "hello world" });
let schedule = LoomSchedule::new();
- let schedule = From::from(&schedule);
+ let schedule = &mut || Some(From::from(&schedule));
let th = thread::spawn(move || {
let out = assert_ok!(block_on(join_handle));
@@ -115,12 +115,12 @@ fn release_remote() {
// Join handle
let th = join_one_task(join_handle);
- let task = match task.run(From::from(&s1)) {
+ let task = match task.run(&mut || Some(From::from(&s1))) {
Some(task) => task,
None => s1.recv().expect("released!"),
};
- assert_none!(task.run(From::from(&s2)));
+ assert_none!(task.run(&mut || Some(From::from(&s2))));
assert_none!(s1.recv());
assert_ok!(th.join().unwrap());
@@ -152,7 +152,7 @@ fn shutdown_from_list_after_poll() {
// Join handle
let th = join_two_tasks(join_handle);
- match task.run(From::from(&s1)) {
+ match task.run(&mut || Some(From::from(&s1))) {
Some(task) => {
// always drain the list before calling shutdown on tasks
list.shutdown();
@@ -184,7 +184,7 @@ fn shutdown_from_queue_after_poll() {
// Join handle
let th = join_two_tasks(join_handle);
- let task = match task.run(From::from(&s1)) {
+ let task = match task.run(&mut || Some(From::from(&s1))) {
Some(task) => task,
None => assert_some!(s1.recv()),
};
@@ -239,7 +239,7 @@ fn work(schedule: &LoomSchedule) {
let mut task = Some(task);
while let Some(t) = task.take() {
- task = t.run(From::from(schedule));
+ task = t.run(&mut || Some(From::from(schedule)));
}
}
}
diff --git a/tokio/src/executor/task/tests/task.rs b/tokio/src/executor/task/tests/task.rs
index 76319b8d..66fbae59 100644
--- a/tokio/src/executor/task/tests/task.rs
+++ b/tokio/src/executor/task/tests/task.rs
@@ -28,7 +28,7 @@ fn create_complete_drop() {
let task = task::background(task);
let mock = mock().bind(&task).release_local();
- let mock = From::from(&mock);
+ let mock = &mut || Some(From::from(&mock));
// Nothing is returned
assert!(task.run(mock).is_none());
@@ -53,7 +53,7 @@ fn create_yield_complete_drop() {
let task = task::background(task);
let mock = mock().bind(&task).release_local();
- let mock = From::from(&mock);
+ let mock = &mut || Some(From::from(&mock));
// Task is returned
let task = assert_some!(task.run(mock));
@@ -83,7 +83,7 @@ fn create_clone_yield_complete_drop() {
let task = task::background(task);
let mock = mock().bind(&task).release_local();
- let mock = From::from(&mock);
+ let mock = &mut || Some(From::from(&mock));
// Task is returned
let task = assert_some!(task.run(mock));
@@ -111,7 +111,7 @@ fn create_wake_drop() {
let mock = mock().bind(&task).schedule().release_local();
- assert_none!(task.run(From::from(&mock)));
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
assert_none!(mock.next_pending_run());
// The future was **not** dropped.
@@ -121,7 +121,7 @@ fn create_wake_drop() {
let task = assert_some!(mock.next_pending_run());
- assert_none!(task.run(From::from(&mock)));
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
// The future was dropped.
assert!(did_drop.did_drop_future());
@@ -143,7 +143,7 @@ fn notify_complete() {
let task = task::background(task);
let mock = mock().bind(&task).release_local();
- let mock = From::from(&mock);
+ let mock = &mut || Some(From::from(&mock));
assert_none!(task.run(mock));
assert!(did_drop.did_drop_future());
@@ -165,9 +165,9 @@ fn complete_on_second_schedule_obj() {
let mock2 = mock().bind(&task).release();
// Task is returned
- let task = assert_some!(task.run(From::from(&mock2)));
+ let task = assert_some!(task.run(&mut || Some(From::from(&mock2))));
- assert_none!(task.run(From::from(&mock1)));
+ assert_none!(task.run(&mut || Some(From::from(&mock1))));
// The message was sent
assert!(rx.try_recv().is_ok());
@@ -187,7 +187,7 @@ fn join_task_immediate_drop_handle() {
let mock = mock().bind(&task).release_local();
- assert!(task.run(From::from(&mock)).is_none());
+ assert!(task.run(&mut || Some(From::from(&mock))).is_none());
assert!(did_drop.did_drop_future());
assert!(did_drop.did_drop_output());
@@ -202,7 +202,7 @@ fn join_task_immediate_complete_1() {
let mock = mock().bind(&task).release_local();
- assert!(task.run(From::from(&mock)).is_none());
+ assert!(task.run(&mut || Some(From::from(&mock))).is_none());
assert!(did_drop.did_drop_future());
assert!(!did_drop.did_drop_output());
@@ -227,7 +227,7 @@ fn join_task_immediate_complete_2() {
assert_pending!(handle.poll());
- assert!(task.run(From::from(&mock)).is_none());
+ assert!(task.run(&mut || Some(From::from(&mock))).is_none());
assert!(did_drop.did_drop_future());
assert!(!did_drop.did_drop_output());
@@ -253,14 +253,14 @@ fn join_task_complete_later() {
let mock = mock().bind(&task).release_local();
- let task = assert_some!(task.run(From::from(&mock)));
+ let task = assert_some!(task.run(&mut || Some(From::from(&mock))));
assert!(!did_drop.did_drop_future());
assert!(!did_drop.did_drop_output());
assert_pending!(handle.poll());
- assert_none!(task.run(From::from(&mock)));
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
assert!(handle.is_woken());
let out = assert_ready_ok!(handle.poll());
@@ -288,12 +288,12 @@ fn drop_join_after_poll() {
assert_pending!(handle.poll());
drop(handle);
- let task = assert_some!(task.run(From::from(&mock)));
+ let task = assert_some!(task.run(&mut || Some(From::from(&mock))));
assert!(!did_drop.did_drop_future());
assert!(!did_drop.did_drop_output());
- assert_none!(task.run(From::from(&mock)));
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
assert!(did_drop.did_drop_future());
assert!(did_drop.did_drop_output());
@@ -317,7 +317,7 @@ fn join_handle_change_task_complete() {
assert_pending!(t1.poll());
drop(t1);
- let task = assert_some!(task.run(From::from(&mock)));
+ let task = assert_some!(task.run(&mut || Some(From::from(&mock))));
let mut t2 = spawn(poll_fn(|cx| Pin::new(&mut handle).poll(cx)));
assert_pending!(t2.poll());
@@ -325,7 +325,7 @@ fn join_handle_change_task_complete() {
assert!(!did_drop.did_drop_future());
assert!(!did_drop.did_drop_output());
- assert_none!(task.run(From::from(&mock)));
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
assert!(t2.is_woken());
@@ -347,7 +347,7 @@ fn drop_handle_after_complete() {
let mock = mock().bind(&task).release_local();
- assert!(task.run(From::from(&mock)).is_none());
+ assert!(task.run(&mut || Some(From::from(&mock))).is_none());
assert!(did_drop.did_drop_future());
assert!(!did_drop.did_drop_output());
@@ -370,7 +370,7 @@ fn non_initial_task_state_drop_join_handle_without_polling() {
let mock = mock().bind(&task).schedule().release_local();
- assert_none!(task.run(From::from(&mock)));
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
drop(handle);
@@ -380,7 +380,7 @@ fn non_initial_task_state_drop_join_handle_without_polling() {
tx.send(()).unwrap();
let task = assert_some!(mock.next_pending_run());
- assert!(task.run(From::from(&mock)).is_none());
+ assert!(task.run(&mut || Some(From::from(&mock))).is_none());
assert!(did_drop.did_drop_future());
assert!(did_drop.did_drop_output());
@@ -400,7 +400,7 @@ fn task_panic_background() {
let mock = mock().bind(&task).release_local();
- assert!(task.run(From::from(&mock)).is_none());
+ assert!(task.run(&mut || Some(From::from(&mock))).is_none());
assert!(did_drop.did_drop_future());
}
@@ -422,7 +422,7 @@ fn task_panic_join() {
assert_pending!(handle.poll());
- assert!(task.run(From::from(&mock)).is_none());
+ assert!(task.run(&mut || Some(From::from(&mock))).is_none());
assert!(did_drop.did_drop_future());
assert!(handle.is_woken());
@@ -443,12 +443,12 @@ fn complete_second_schedule_obj_before_join() {
assert_pending!(handle.poll());
- assert_none!(task.run(From::from(&mock2)));
+ assert_none!(task.run(&mut || Some(From::from(&mock2))));
tx.send("hello").unwrap();
let task = assert_some!(mock2.next_pending_run());
- assert_none!(task.run(From::from(&mock1)));
+ assert_none!(task.run(&mut || Some(From::from(&mock1))));
assert!(did_drop.did_drop_future());
// The join handle was notified
@@ -476,12 +476,12 @@ fn complete_second_schedule_obj_after_join() {
assert_pending!(handle.poll());
- assert_none!(task.run(From::from(&mock2)));
+ assert_none!(task.run(&mut || Some(From::from(&mock2))));
tx.send("hello").unwrap();
let task = assert_some!(mock2.next_pending_run());
- assert_none!(task.run(From::from(&mock1)));
+ assert_none!(task.run(&mut || Some(From::from(&mock1))));
assert!(did_drop.did_drop_future());
// The join handle was notified
@@ -512,7 +512,7 @@ fn shutdown_from_list_before_notified() {
let mock = mock().bind(&task).release();
assert_pending!(handle.poll());
- assert_none!(task.run(From::from(&mock)));
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
list.shutdown();
assert!(did_drop.did_drop_future());
@@ -542,7 +542,7 @@ fn shutdown_from_list_after_notified() {
let mock = mock().bind(&task).schedule().release();
assert_pending!(handle.poll());
- assert_none!(task.run(From::from(&mock)));
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
tx.send(()).unwrap();
@@ -552,7 +552,7 @@ fn shutdown_from_list_after_notified() {
assert_none!(mock.next_pending_drop());
- assert_none!(task.run(From::from(&mock)));
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
assert!(did_drop.did_drop_future());
assert!(handle.is_woken());
@@ -580,8 +580,8 @@ fn shutdown_from_list_after_complete() {
let m2 = mock();
assert_pending!(handle.poll());
- let task = assert_some!(task.run(From::from(&m1)));
- assert_none!(task.run(From::from(&m2)));
+ let task = assert_some!(task.run(&mut || Some(From::from(&m1))));
+ assert_none!(task.run(&mut || Some(From::from(&m2))));
assert!(did_drop.did_drop_future());
assert!(handle.is_woken());
@@ -626,7 +626,7 @@ fn shutdown_from_task_after_notified() {
let mock = mock().bind(&task).schedule().release();
assert_pending!(handle.poll());
- assert_none!(task.run(From::from(&mock)));
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
tx.send(()).unwrap();
diff --git a/tokio/src/executor/thread_pool/builder.rs b/tokio/src/executor/thread_pool/builder.rs
index 7955e286..c1298a5a 100644
--- a/tokio/src/executor/thread_pool/builder.rs
+++ b/tokio/src/executor/thread_pool/builder.rs
@@ -155,7 +155,14 @@ impl Builder {
let (shutdown_tx, shutdown_rx) = shutdown::channel();
let around_worker = self.around_worker.as_ref().map(Arc::clone);
- let launch_worker = move |worker: Worker<BoxedPark<P>>| {
+ let launch_worker = Arc::new(Box::new(move |worker: Worker<BoxedPark<P>>| {
+ // NOTE: It might seem like the shutdown_tx that's moved into this Arc is never
+ // dropped, and that shutdown_rx will therefore never see EOF, but that is not actually
+ // the case. Only `build_with_park` and each worker hold onto a copy of this Arc.
+ // `build_with_park` drops it immediately, and the workers drop theirs when their `run`
+ // method returns (and their copy of the Arc are dropped). In fact, we don't actually
+ // _need_ a copy of `shutdown_tx` for each worker thread; having them all hold onto
+ // this Arc, which in turn holds the last `shutdown_tx` would have been sufficient.
let shutdown_tx = shutdown_tx.clone();
let around_worker = around_worker.as_ref().map(Arc::clone);
Box::new(move || {
@@ -186,7 +193,8 @@ impl Builder {
// Dropping the handle must happen __after__ the callback
drop(shutdown_tx);
}) as Box<dyn FnOnce() + Send + 'static>
- };
+ })
+ as Box<dyn Fn(Worker<BoxedPark<P>>) -> Box<dyn FnOnce() + Send> + Send + Sync>);
let mut blocking = crate::executor::blocking::Builder::default();
blocking.name(self.name.clone());
@@ -197,7 +205,8 @@ impl Builder {
let (pool, workers) = worker::create_set::<_, BoxedPark<P>>(
self.pool_size,
- |i| BoxedPark::new(build_park(i)),
+ |i| Box::new(BoxedPark::new(build_park(i))),
+ Arc::clone(&launch_worker),
blocking.clone(),
);
diff --git a/tokio/src/executor/thread_pool/current.rs b/tokio/src/executor/thread_pool/current.rs
index 6910dca1..f02be101 100644
--- a/tokio/src/executor/thread_pool/current.rs
+++ b/tokio/src/executor/thread_pool/current.rs
@@ -50,6 +50,10 @@ where
})
}
+pub(super) fn clear() {
+ CURRENT_WORKER.with(|cell| cell.set(Inner::new()))
+}
+
pub(super) fn get<F, R>(f: F) -> R
where
F: FnOnce(&Current) -> R,
diff --git a/tokio/src/executor/thread_pool/mod.rs b/tokio/src/executor/thread_pool/mod.rs
index bab594ef..09179245 100644
--- a/tokio/src/executor/thread_pool/mod.rs
+++ b/tokio/src/executor/thread_pool/mod.rs
@@ -37,6 +37,9 @@ mod worker;
#[cfg(test)]
mod tests;
+#[cfg(feature = "blocking")]
+pub use worker::blocking;
+
// Re-export `task::Error`
pub use crate::executor::task::Error;
diff --git a/tokio/src/executor/thread_pool/queue/inject.rs b/tokio/src/executor/thread_pool/queue/inject.rs
index dbad65fd..5f8caf1a 100644
--- a/tokio/src/executor/thread_pool/queue/inject.rs
+++ b/tokio/src/executor/thread_pool/queue/inject.rs
@@ -19,6 +19,11 @@ impl<T: 'static> Inject<T> {
self.cluster.global.push(task, f)
}
+ /// Check if the queue has been closed
+ pub(crate) fn is_closed(&self) -> bool {
+ self.cluster.global.is_closed()
+ }
+
/// Close the queue
///
/// Returns `true` if the channel was closed. `false` indicates the pool was
diff --git a/tokio/src/executor/thread_pool/set.rs b/tokio/src/executor/thread_pool/set.rs
index dddd50e8..5ee2f544 100644
--- a/tokio/src/executor/thread_pool/set.rs
+++ b/tokio/src/executor/thread_pool/set.rs
@@ -150,6 +150,10 @@ where
}
}
+ pub(crate) fn is_closed(&self) -> bool {
+ self.inject.is_closed()
+ }
+
pub(crate) fn len(&self) -> usize {
self.shared.len()
}
@@ -175,10 +179,19 @@ where
}
}
+impl<P: 'static> Set<P> {
+ /// Wait for all locks on the injection queue to drop.
+ ///
+ /// This is done by locking w/o doing anything.
+ pub(super) fn wait_for_unlocked(&self) {
+ self.inject.wait_for_unlocked();
+ }
+}
+
impl<P: 'static> Drop for Set<P> {
fn drop(&mut self) {
// Before proceeding, wait for all concurrent wakers to exit
- self.inject.wait_for_unlocked();
+ self.wait_for_unlocked();
}
}
diff --git a/tokio/src/executor/thread_pool/tests/loom_pool.rs b/tokio/src/executor/thread_pool/tests/loom_pool.rs
index 55da54d0..df754070 100644
--- a/tokio/src/executor/thread_pool/tests/loom_pool.rs
+++ b/tokio/src/executor/thread_pool/tests/loom_pool.rs
@@ -1,9 +1,9 @@
-use crate::spawn;
use crate::executor::loom::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::executor::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::executor::loom::sync::{Arc, Mutex};
use crate::executor::tests::loom_oneshot as oneshot;
-use crate::executor::thread_pool::ThreadPool;
+use crate::executor::thread_pool::{self, Builder, ThreadPool};
+use crate::spawn;
use std::future::Future;
@@ -42,6 +42,58 @@ fn pool_multi_spawn() {
}
#[test]
+fn only_blocking() {
+ loom::model(|| {
+ let mut pool = Builder::new().num_threads(1).build();
+ let (block_tx, block_rx) = oneshot::channel();
+
+ pool.spawn(async move {
+ thread_pool::blocking(move || {
+ block_tx.send(());
+ })
+ });
+
+ block_rx.recv();
+ pool.shutdown_now();
+ });
+}
+
+#[test]
+fn blocking_and_regular() {
+ const NUM: usize = 3;
+ loom::model(|| {
+ let mut pool = Builder::new().num_threads(1).build();
+ let cnt = Arc::new(AtomicUsize::new(0));
+
+ let (block_tx, block_rx) = oneshot::channel();
+ let (done_tx, done_rx) = oneshot::channel();
+ let done_tx = Arc::new(Mutex::new(Some(done_tx)));
+
+ pool.spawn(async move {
+ thread_pool::blocking(move || {
+ block_tx.send(());
+ })
+ });
+
+ for _ in 0..NUM {
+ let cnt = cnt.clone();
+ let done_tx = done_tx.clone();
+
+ pool.spawn(async move {
+ if NUM == cnt.fetch_add(1, Relaxed) + 1 {
+ done_tx.lock().unwrap().take().unwrap().send(());
+ }
+ });
+ }
+
+ done_rx.recv();
+ block_rx.recv();
+
+ pool.shutdown_now();
+ });
+}
+
+#[test]
fn pool_multi_notify() {
loom::model(|| {
let pool = ThreadPool::new();
diff --git a/tokio/src/executor/thread_pool/tests/loom_queue.rs b/tokio/src/executor/thread_pool/tests/loom_queue.rs
index 8b0214a3..cc9ae449 100644
--- a/tokio/src/executor/thread_pool/tests/loom_queue.rs
+++ b/tokio/src/executor/thread_pool/tests/loom_queue.rs
@@ -25,7 +25,7 @@ fn multi_worker() {
// Try to work
while let Some(task) = q.pop_local_first() {
- assert!(task.run(From::from(&NOOP_SCHEDULE)).is_none());
+ assert!(task.run(&mut || Some(From::from(&NOOP_SCHEDULE))).is_none());
let r = rem.get();
assert!(r > 0);
rem.set(r - 1);
@@ -33,7 +33,7 @@ fn multi_worker() {
// Try to steal
if let Some(task) = q.steal(0) {
- assert!(task.run(From::from(&NOOP_SCHEDULE)).is_none());
+ assert!(task.run(&mut || Some(From::from(&NOOP_SCHEDULE))).is_none());
let r = rem.get();
assert!(r > 0);
rem.set(r - 1);
diff --git a/tokio/src/executor/thread_pool/tests/queue.rs b/tokio/src/executor/thread_pool/tests/queue.rs
index be89d94d..94a3c7a1 100644
--- a/tokio/src/executor/thread_pool/tests/queue.rs
+++ b/tokio/src/executor/thread_pool/tests/queue.rs
@@ -252,7 +252,7 @@ fn num(task: Task<Noop>) -> u32 {
use std::task::Context;
use std::task::Poll::*;
- assert!(task.run(From::from(&NOOP_SCHEDULE)).is_none());
+ assert!(task.run(&mut || Some(From::from(&NOOP_SCHEDULE))).is_none());
// Find the task that completed
TASKS.with(|c| {
diff --git a/tokio/src/executor/thread_pool/tests/worker.rs b/tokio/src/executor/thread_pool/tests/worker.rs
index dc132ab7..f5f9bace 100644
--- a/tokio/src/executor/thread_pool/tests/worker.rs
+++ b/tokio/src/executor/thread_pool/tests/worker.rs
@@ -3,6 +3,8 @@ use crate::executor::thread_pool;
use tokio_test::assert_ok;
+use std::sync::Arc;
+
macro_rules! pool {
(2) => {{
let (pool, mut w, mock_park) = pool!(!2);
@@ -11,8 +13,14 @@ macro_rules! pool {
(! $n:expr) => {{
let mut mock_park = crate::executor::tests::mock_park::MockPark::new();
let blocking = std::sync::Arc::new(crate::executor::blocking::Pool::default());
- let (pool, workers) =
- thread_pool::create_pool($n, |index| mock_park.mk_park(index), blocking);
+ let (pool, workers) = thread_pool::create_pool(
+ $n,
+ |index| Box::new(mock_park.mk_park(index)),
+ Arc::new(Box::new(|_| {
+ unreachable!("attempted to move worker during non-blocking test")
+ })),
+ blocking,
+ );
(pool, workers, mock_park)
}};
}
@@ -39,8 +47,8 @@ fn execute_single_task() {
#[test]
fn task_migrates() {
- use std::sync::mpsc;
use crate::sync::oneshot;
+ use std::sync::mpsc;
let (p, mut w0, mut w1, ..) = pool!(2);
let (tx1, rx1) = oneshot::channel();
diff --git a/tokio/src/executor/thread_pool/worker.rs b/tokio/src/executor/thread_pool/worker.rs
index 7fbe6589..b78c214a 100644
--- a/tokio/src/executor/thread_pool/worker.rs
+++ b/tokio/src/executor/thread_pool/worker.rs
@@ -3,8 +3,57 @@ use crate::executor::park::{Park, Unpark};
use crate::executor::task::Task;
use crate::executor::thread_pool::{current, Owned, Shared};
+use std::cell::Cell;
+use std::ops::{Deref, DerefMut};
use std::time::Duration;
+// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized
+// loom doesn't support that because it requires CoerceUnsized, which is unstable
+type LaunchWorker<P> = Arc<Box<dyn Fn(Worker<P>) -> Box<dyn FnOnce() + Send> + Send + Sync>>;
+
+thread_local! {
+ /// Thread-local tracking the current executor
+ static ON_BLOCK: Cell<Option<*mut dyn FnMut()>> = Cell::new(None)
+}
+
+/// Run the provided blocking function without blocking the executor.
+///
+/// In general, issuing a blocking call or performing a lot of compute in a future without
+/// yielding is not okay, as it may prevent the executor from driving other futures forward.
+/// If you run a closure through this method, the current executor thread will relegate all its
+/// executor duties to another (possibly new) thread, and only then poll the task. Note that this
+/// requires additional synchronization.
+///
+/// # Examples
+///
+/// ```
+/// # async fn docs() {
+/// tokio::executor::thread_pool::blocking(move || {
+/// // do some compute-heavy work or call synchronous code
+/// });
+/// # }
+/// ```
+#[cfg(feature = "blocking")]
+pub fn blocking<F, R>(f: F) -> R
+where
+ F: FnOnce() -> R,
+{
+ // Make the current worker give away its Worker to another thread so that we can safely block
+ // this one without preventing progress on other futures the worker owns.
+ ON_BLOCK.with(|ob| {
+ let allow_blocking = ob
+ .get()
+ .expect("can only call blocking when on Tokio runtime");
+
+ // This is safe, because ON_BLOCK was set from an &mut dyn FnMut in the worker that wraps
+ // the worker's operation, and is unset just prior to when the FnMut is dropped.
+ let allow_blocking = unsafe { &mut *allow_blocking };
+
+ allow_blocking();
+ f()
+ })
+}
+
// TODO: remove this re-export
pub(super) use crate::executor::thread_pool::set::Set;
@@ -13,17 +62,24 @@ pub(crate) struct Worker<P: Park + 'static> {
entry: Entry<P::Unpark>,
/// Park the thread
- park: P,
+ park: Box<P>,
+
+ /// Fn for launching another Worker should we need it
+ launch_worker: LaunchWorker<P>,
+
+ /// To indicate that the Worker has been given away and should no longer be used
+ gone: Cell<bool>,
}
pub(crate) fn create_set<F, P>(
pool_size: usize,
mk_park: F,
+ launch_worker: LaunchWorker<P>,
blocking: Arc<crate::executor::blocking::Pool>,
) -> (Arc<Set<P::Unpark>>, Vec<Worker<P>>)
where
P: Send + Park,
- F: FnMut(usize) -> P,
+ F: FnMut(usize) -> Box<P>,
{
// Create the parks...
let parks: Vec<_> = (0..pool_size).map(mk_park).collect();
@@ -40,7 +96,7 @@ where
.enumerate()
.map(|(index, park)| {
// unsafe is safe because we call Worker::new only once with each index in the pool
- unsafe { Worker::new(pool.clone(), index, park) }
+ unsafe { Worker::new(pool.clone(), index, park, Arc::clone(&launch_worker)) }
})
.collect();
@@ -58,10 +114,17 @@ where
P: Send + Park,
{
// unsafe because new may only be called once for each index in pool's set
- pub(super) unsafe fn new(pool: Arc<Set<P::Unpark>>, index: usize, park: P) -> Self {
+ pub(super) unsafe fn new(
+ pool: Arc<Set<P::Unpark>>,
+ index: usize,
+ park: Box<P>,
+ launch_worker: LaunchWorker<P>,
+ ) -> Self {
Worker {
entry: Entry::new(pool, index),
park,
+ launch_worker,
+ gone: Cell::new(false),
}
}
@@ -72,18 +135,149 @@ where
let mut executor = &**pool;
let entry = &mut self.entry;
- let park = &mut self.park;
+ let launch_worker = &self.launch_worker;
let blocking = &executor.blocking;
+ let gone = &self.gone;
+
+ let mut park = DropNotGone::