summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEliza Weisman <eliza@buoyant.io>2020-04-30 15:19:17 -0700
committerGitHub <noreply@github.com>2020-04-30 15:19:17 -0700
commit20b5df90372ac97d817d2e3666773dd9561f057f (patch)
tree67a9189a93dabdffb3f51735d8380a516238219b
parentfa9743f0d4bff0da7790c2ad9b4c00f3efe17dc8 (diff)
task: fix LocalSet having a single shared task budget (#2462)
## Motivation Currently, an issue exists where a `LocalSet` has a single cooperative task budget that's shared across all futures spawned on the `LocalSet` _and_ by any future passed to `LocalSet::run_until` or `LocalSet::block_on`. Because these methods will poll the `run_until` future before polling spawned tasks, it is possible for that task to _always_ deterministically starve the entire `LocalSet` so that no local tasks can proceed. When the completion of that future _itself_ depends on other tasks on the `LocalSet`, this will then result in a deadlock, as in issue #2460. A detailed description of why this is the case, taken from [this comment][1]: `LocalSet` wraps each time a local task is run in `budget`: https://github.com/tokio-rs/tokio/blob/947045b9445f15fb9314ba0892efa2251076ae73/tokio/src/task/local.rs#L406 This is identical to what tokio's other schedulers do when running tasks, and in theory should give each task its own budget every time it's polled. _However_, `LocalSet` is different from other schedulers. Unlike the runtime schedulers, a `LocalSet` is itself a future that's run on another scheduler, in `block_on`. `block_on` _also_ sets a budget: https://github.com/tokio-rs/tokio/blob/947045b9445f15fb9314ba0892efa2251076ae73/tokio/src/runtime/basic_scheduler.rs#L131 The docs for `budget` state that: https://github.com/tokio-rs/tokio/blob/947045b9445f15fb9314ba0892efa2251076ae73/tokio/src/coop.rs#L73 This means that inside of a `LocalSet`, the calls to `budget` are no-ops. Instead, each future polled by the `LocalSet` is subtracting from a single global budget. `LocalSet`'s `RunUntil` future polls the provided future before polling any other tasks spawned on the local set: https://github.com/tokio-rs/tokio/blob/947045b9445f15fb9314ba0892efa2251076ae73/tokio/src/task/local.rs#L525-L535 In this case, the provided future is `JoinAll`. Unfortunately, every time a `JoinAll` is polled, it polls _every_ joined future that has not yet completed. When the number of futures in the `JoinAll` is >= 128, this means that the `JoinAll` immediately exhausts the task budget. This would, in theory, be a _good_ thing --- if the `JoinAll` had a huge number of `JoinHandle`s in it and none of them are ready, it would limit the time we spend polling those join handles. However, because the `LocalSet` _actually_ has a single shared task budget, this means polling the `JoinAll` _always_ exhausts the entire budget. There is now no budget remaining to poll any other tasks spawned on the `LocalSet`, and they are never able to complete. [1]: https://github.com/tokio-rs/tokio/issues/2460#issuecomment-621403122 ## Solution This branch solves this issue by resetting the task budget when polling a `LocalSet`. I've added a new function to `coop` for resetting the task budget to `UNCONSTRAINED` for the duration of a closure, and thus allowing the `budget` calls in `LocalSet` to _actually_ create a new budget for each spawned local task. Additionally, I've changed `LocalSet` to _also_ ensure that a separate task budget is applied to any future passed to `block_on`/`run_until`. Additionally, I've added a test reproducing the issue described in #2460. This test fails prior to this change, and passes after it. Fixes #2460 Signed-off-by: Eliza Weisman <eliza@buoyant.io>
-rw-r--r--tokio/src/coop.rs49
-rw-r--r--tokio/src/macros/cfg.rs17
-rw-r--r--tokio/src/task/local.rs61
-rw-r--r--tokio/tests/task_local_set.rs75
4 files changed, 147 insertions, 55 deletions
diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs
index 1d624591..606ba3a7 100644
--- a/tokio/src/coop.rs
+++ b/tokio/src/coop.rs
@@ -85,15 +85,11 @@ where
return f();
}
- struct Guard<'a>(&'a Cell<usize>);
- impl<'a> Drop for Guard<'a> {
- fn drop(&mut self) {
- self.0.set(UNCONSTRAINED);
- }
- }
-
hits.set(BUDGET);
- let _guard = Guard(hits);
+ let _guard = ResetGuard {
+ hits,
+ prev: UNCONSTRAINED,
+ };
f()
})
}
@@ -114,6 +110,32 @@ cfg_blocking_impl! {
}
}
+cfg_rt_core! {
+ cfg_rt_util! {
+ /// Run the given closure with a new task budget, resetting the previous
+ /// budget when the closure finishes.
+ ///
+ /// This is intended for internal use by `LocalSet` and (potentially) other
+ /// similar schedulers which are themselves futures, and need a fresh budget
+ /// for each of their children.
+ #[inline(always)]
+ pub(crate) fn reset<F, R>(f: F) -> R
+ where
+ F: FnOnce() -> R,
+ {
+ HITS.with(move |hits| {
+ let prev = hits.get();
+ hits.set(UNCONSTRAINED);
+ let _guard = ResetGuard {
+ hits,
+ prev,
+ };
+ f()
+ })
+ }
+ }
+}
+
/// Invoke `f` with a subset of the remaining budget.
///
/// This is useful if you have sub-futures that you need to poll, but that you want to restrict
@@ -289,6 +311,11 @@ pin_project_lite::pin_project! {
}
}
+struct ResetGuard<'a> {
+ hits: &'a Cell<usize>,
+ prev: usize,
+}
+
impl<F: Future> Future for CoopFuture<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -327,6 +354,12 @@ cfg_sync! {
impl<F> CoopFutureExt for F where F: Future {}
}
+impl<'a> Drop for ResetGuard<'a> {
+ fn drop(&mut self) {
+ self.hits.set(self.prev);
+ }
+}
+
#[cfg(all(test, not(loom)))]
mod test {
use super::*;
diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs
index 0679aa73..85f95cbd 100644
--- a/tokio/src/macros/cfg.rs
+++ b/tokio/src/macros/cfg.rs
@@ -35,6 +35,23 @@ macro_rules! cfg_blocking_impl {
}
}
+/// Enables blocking API internals
+macro_rules! cfg_blocking_impl_or_task {
+ ($($item:item)*) => {
+ $(
+ #[cfg(any(
+ feature = "blocking",
+ feature = "fs",
+ feature = "dns",
+ feature = "io-std",
+ feature = "rt-threaded",
+ feature = "task",
+ ))]
+ $item
+ )*
+ }
+}
+
/// Enables enter::block_on
macro_rules! cfg_block_on {
($($item:item)*) => {
diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs
index 9af50cee..346fe437 100644
--- a/tokio/src/task/local.rs
+++ b/tokio/src/task/local.rs
@@ -454,20 +454,24 @@ impl Future for LocalSet {
// Register the waker before starting to work
self.context.shared.waker.register_by_ref(cx.waker());
- if self.with(|| self.tick()) {
- // If `tick` returns true, we need to notify the local future again:
- // there are still tasks remaining in the run queue.
- cx.waker().wake_by_ref();
- Poll::Pending
- } else if self.context.tasks.borrow().owned.is_empty() {
- // If the scheduler has no remaining futures, we're done!
- Poll::Ready(())
- } else {
- // There are still futures in the local set, but we've polled all the
- // futures in the run queue. Therefore, we can just return Pending
- // since the remaining futures will be woken from somewhere else.
- Poll::Pending
- }
+ // Reset any previous task budget while polling tasks spawned on the
+ // `LocalSet`, ensuring that each has its own separate budget.
+ crate::coop::reset(|| {
+ if self.with(|| self.tick()) {
+ // If `tick` returns true, we need to notify the local future again:
+ // there are still tasks remaining in the run queue.
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ } else if self.context.tasks.borrow().owned.is_empty() {
+ // If the scheduler has no remaining futures, we're done!
+ Poll::Ready(())
+ } else {
+ // There are still futures in the local set, but we've polled all the
+ // futures in the run queue. Therefore, we can just return Pending
+ // since the remaining futures will be woken from somewhere else.
+ Poll::Pending
+ }
+ })
}
}
@@ -521,18 +525,23 @@ impl<T: Future> Future for RunUntil<'_, T> {
.register_by_ref(cx.waker());
let _no_blocking = crate::runtime::enter::disallow_blocking();
-
- if let Poll::Ready(output) = me.future.poll(cx) {
- return Poll::Ready(output);
- }
-
- if me.local_set.tick() {
- // If `tick` returns `true`, we need to notify the local future again:
- // there are still tasks remaining in the run queue.
- cx.waker().wake_by_ref();
- }
-
- Poll::Pending
+ // Reset any previous task budget so that the future passed to
+ // `run_until` and any tasks spawned on the `LocalSet` have their
+ // own budgets.
+ crate::coop::reset(|| {
+ let f = me.future;
+ if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) {
+ return Poll::Ready(output);
+ }
+
+ if me.local_set.tick() {
+ // If `tick` returns `true`, we need to notify the local future again:
+ // there are still tasks remaining in the run queue.
+ cx.waker().wake_by_ref();
+ }
+
+ Poll::Pending
+ })
})
}
}
diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs
index 1a10fefa..38c7c939 100644
--- a/tokio/tests/task_local_set.rs
+++ b/tokio/tests/task_local_set.rs
@@ -312,28 +312,17 @@ fn drop_cancels_tasks() {
assert_eq!(1, Rc::strong_count(&rc1));
}
-#[test]
-fn drop_cancels_remote_tasks() {
- // This test reproduces issue #1885.
+/// Runs a test function in a separate thread, and panics if the test does not
+/// complete within the specified timeout, or if the test function panics.
+///
+/// This is intended for running tests whose failure mode is a hang or infinite
+/// loop that cannot be detected otherwise.
+fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
use std::sync::mpsc::RecvTimeoutError;
let (done_tx, done_rx) = std::sync::mpsc::channel();
let thread = std::thread::spawn(move || {
- let (tx, mut rx) = mpsc::channel::<()>(1024);
-
- let mut rt = rt();
-
- let local = LocalSet::new();
- local.spawn_local(async move { while let Some(_) = rx.recv().await {} });
- local.block_on(&mut rt, async {
- time::delay_for(Duration::from_millis(1)).await;
- });
-
- drop(tx);
-
- // This enters an infinite loop if the remote notified tasks are not
- // properly cancelled.
- drop(local);
+ f();
// Send a message on the channel so that the test thread can
// determine if we have entered an infinite loop:
@@ -349,10 +338,11 @@ fn drop_cancels_remote_tasks() {
//
// Note that it should definitely complete in under a minute, but just
// in case CI is slow, we'll give it a long timeout.
- match done_rx.recv_timeout(Duration::from_secs(60)) {
+ match done_rx.recv_timeout(timeout) {
Err(RecvTimeoutError::Timeout) => panic!(
- "test did not complete within 60 seconds, \
- we have (probably) entered an infinite loop!"
+ "test did not complete within {:?} seconds, \
+ we have (probably) entered an infinite loop!",
+ timeout,
),
// Did the test thread panic? We'll find out for sure when we `join`
// with it.
@@ -366,6 +356,49 @@ fn drop_cancels_remote_tasks() {
thread.join().expect("test thread should not panic!")
}
+#[test]
+fn drop_cancels_remote_tasks() {
+ // This test reproduces issue #1885.
+ with_timeout(Duration::from_secs(60), || {
+ let (tx, mut rx) = mpsc::channel::<()>(1024);
+
+ let mut rt = rt();
+
+ let local = LocalSet::new();
+ local.spawn_local(async move { while let Some(_) = rx.recv().await {} });
+ local.block_on(&mut rt, async {
+ time::delay_for(Duration::from_millis(1)).await;
+ });
+
+ drop(tx);
+
+ // This enters an infinite loop if the remote notified tasks are not
+ // properly cancelled.
+ drop(local);
+ });
+}
+
+#[test]
+fn local_tasks_wake_join_all() {
+ // This test reproduces issue #2460.
+ with_timeout(Duration::from_secs(60), || {
+ use futures::future::join_all;
+ use tokio::task::LocalSet;
+
+ let mut rt = rt();
+ let set = LocalSet::new();
+ let mut handles = Vec::new();
+
+ for _ in 1..=128 {
+ handles.push(set.spawn_local(async move {
+ tokio::task::spawn_local(async move {}).await.unwrap();
+ }));
+ }
+
+ rt.block_on(set.run_until(join_all(handles)));
+ });
+}
+
#[tokio::test]
async fn local_tasks_are_polled_after_tick() {
// Reproduces issues #1899 and #1900