author    Carl Lerche <me@carllerche.com>  2020-05-06 19:02:07 -0700
committer GitHub <noreply@github.com>      2020-05-06 19:02:07 -0700
commit    4748b2571fc02d5ebbfe59e457f0e8d8ef0eb5f3
tree      73da1e3baba02bad5411a222ff62490304932fe7
parent    66fef4a9bcccd944e3b72b1e83f789e4131d4e52
rt: simplify coop implementation (#2498)
Simplifies the coop implementation: prunes unused code and creates a `Budget` type to track the current budget.
-rw-r--r--  tokio/src/coop.rs                  | 467
-rw-r--r--  tokio/src/macros/cfg.rs            |  20
-rw-r--r--  tokio/src/sync/batch_semaphore.rs  |   4
-rw-r--r--  tokio/src/sync/mutex.rs            |   3
-rw-r--r--  tokio/src/sync/rwlock.rs           |   3
-rw-r--r--  tokio/src/sync/semaphore.rs        |   5
-rw-r--r--  tokio/src/task/local.rs            |  62
7 files changed, 196 insertions, 368 deletions
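For orientation before the diff itself: the patch replaces the old `usize`-based `HITS`/`UNCONSTRAINED` thread local with a small `Budget(Option<u8>)` type, and makes `budget()` restore whatever budget was previously installed. The sketch below is a self-contained, standalone model of that design, not the tokio-internal code (which is crate-private and feature-gated); the `PartialEq` derive and the demo `main` are illustrative additions.

```rust
// Standalone model of the simplified coop design in this patch: a Budget
// newtype over Option<u8>, a thread-local CURRENT cell, a `budget` wrapper
// that installs a fresh budget and restores the previous one on exit, and a
// `poll_proceed` that spends one unit or yields. Illustrative only.
use std::cell::Cell;
use std::task::{Context, Poll};

#[derive(Debug, Copy, Clone, PartialEq)]
struct Budget(Option<u8>);

impl Budget {
    const fn initial() -> Budget {
        Budget(Some(128)) // same starting value the patch keeps
    }

    const fn unconstrained() -> Budget {
        Budget(None) // `None` means "no limit", replacing usize::MAX
    }

    fn decrement(&mut self) -> bool {
        match &mut self.0 {
            Some(0) => false, // budget exhausted
            Some(n) => {
                *n -= 1;
                true
            }
            None => true, // unconstrained: always succeeds
        }
    }
}

thread_local! {
    static CURRENT: Cell<Budget> = Cell::new(Budget::unconstrained());
}

/// Run `f` with a fresh budget, restoring the previous budget afterwards
/// (also on panic, via the drop guard). This restore-on-exit behaviour is
/// what lets the patch delete the separate `reset` helper.
fn budget<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    struct Reset(Budget);
    impl Drop for Reset {
        fn drop(&mut self) {
            CURRENT.with(|cell| cell.set(self.0));
        }
    }

    let _reset = CURRENT.with(|cell| {
        let prev = cell.get();
        cell.set(Budget::initial());
        Reset(prev)
    });

    f()
}

/// Spend one unit of budget, or wake the task and yield if none is left.
#[allow(dead_code)]
fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> {
    CURRENT.with(|cell| {
        let mut budget = cell.get();
        if budget.decrement() {
            cell.set(budget);
            Poll::Ready(())
        } else {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
}

fn main() {
    // Outside `budget` the thread is unconstrained.
    assert_eq!(CURRENT.with(|c| c.get()), Budget::unconstrained());

    budget(|| {
        assert_eq!(CURRENT.with(|c| c.get()), Budget::initial());

        // Nested calls each get a fresh budget and restore the outer one,
        // mirroring the nested-`budget` case in the new test.
        budget(|| assert_eq!(CURRENT.with(|c| c.get()), Budget::initial()));
        assert_eq!(CURRENT.with(|c| c.get()), Budget::initial());
    });

    // The previous (unconstrained) budget is back once `budget` returns.
    assert_eq!(CURRENT.with(|c| c.get()), Budget::unconstrained());
}
```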
diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs
index 606ba3a7..4d57161f 100644
--- a/tokio/src/coop.rs
+++ b/tokio/src/coop.rs
@@ -1,11 +1,12 @@
//! Opt-in yield points for improved cooperative scheduling.
//!
-//! A single call to [`poll`] on a top-level task may potentially do a lot of work before it
-//! returns `Poll::Pending`. If a task runs for a long period of time without yielding back to the
-//! executor, it can starve other tasks waiting on that executor to execute them, or drive
-//! underlying resources. Since Rust does not have a runtime, it is difficult to forcibly preempt a
-//! long-running task. Instead, this module provides an opt-in mechanism for futures to collaborate
-//! with the executor to avoid starvation.
+//! A single call to [`poll`] on a top-level task may potentially do a lot of
+//! work before it returns `Poll::Pending`. If a task runs for a long period of
+//! time without yielding back to the executor, it can starve other tasks
+//! waiting on that executor to execute them, or drive underlying resources.
+//! Since Rust does not have a runtime, it is difficult to forcibly preempt a
+//! long-running task. Instead, this module provides an opt-in mechanism for
+//! futures to collaborate with the executor to avoid starvation.
//!
//! Consider a future like this one:
//!
@@ -16,9 +17,10 @@
//! }
//! ```
//!
-//! It may look harmless, but consider what happens under heavy load if the input stream is
-//! _always_ ready. If we spawn `drop_all`, the task will never yield, and will starve other tasks
-//! and resources on the same executor. With opt-in yield points, this problem is alleviated:
+//! It may look harmless, but consider what happens under heavy load if the
+//! input stream is _always_ ready. If we spawn `drop_all`, the task will never
+//! yield, and will starve other tasks and resources on the same executor. With
+//! opt-in yield points, this problem is alleviated:
//!
//! ```ignore
//! # use tokio::stream::{Stream, StreamExt};
@@ -29,67 +31,89 @@
//! }
//! ```
//!
-//! The `proceed` future will coordinate with the executor to make sure that every so often control
-//! is yielded back to the executor so it can run other tasks.
+//! The `proceed` future will coordinate with the executor to make sure that
+//! every so often control is yielded back to the executor so it can run other
+//! tasks.
//!
//! # Placing yield points
//!
-//! Voluntary yield points should be placed _after_ at least some work has been done. If they are
-//! not, a future sufficiently deep in the task hierarchy may end up _never_ getting to run because
-//! of the number of yield points that inevitably appear before it is reached. In general, you will
-//! want yield points to only appear in "leaf" futures -- those that do not themselves poll other
-//! futures. By doing this, you avoid double-counting each iteration of the outer future against
-//! the cooperating budget.
+//! Voluntary yield points should be placed _after_ at least some work has been
+//! done. If they are not, a future sufficiently deep in the task hierarchy may
+//! end up _never_ getting to run because of the number of yield points that
+//! inevitably appear before it is reached. In general, you will want yield
+//! points to only appear in "leaf" futures -- those that do not themselves poll
+//! other futures. By doing this, you avoid double-counting each iteration of
+//! the outer future against the cooperating budget.
//!
-//! [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll
+//! [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll
// NOTE: The doctests in this module are ignored since the whole module is (currently) private.
use std::cell::Cell;
-use std::future::Future;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-/// Constant used to determine how much "work" a task is allowed to do without yielding.
-///
-/// The value itself is chosen somewhat arbitrarily. It needs to be high enough to amortize wakeup
-/// and scheduling costs, but low enough that we do not starve other tasks for too long. The value
-/// also needs to be high enough that particularly deep tasks are able to do at least some useful
-/// work at all.
-///
-/// Note that as more yield points are added in the ecosystem, this value will probably also have
-/// to be raised.
-const BUDGET: usize = 128;
-
-/// Constant used to determine if budgeting has been disabled.
-const UNCONSTRAINED: usize = usize::max_value();
thread_local! {
- static HITS: Cell<usize> = Cell::new(UNCONSTRAINED);
+ static CURRENT: Cell<Budget> = Cell::new(Budget::unconstrained());
}
-/// Run the given closure with a cooperative task budget.
-///
-/// Enabling budgeting when it is already enabled is a no-op.
+/// Opaque type tracking the amount of "work" a task may still do before
+/// yielding back to the scheduler.
+#[derive(Debug, Copy, Clone)]
+pub(crate) struct Budget(Option<u8>);
+
+impl Budget {
+ /// Budget assigned to a task on each poll.
+ ///
+ /// The value itself is chosen somewhat arbitrarily. It needs to be high
+ /// enough to amortize wakeup and scheduling costs, but low enough that we
+ /// do not starve other tasks for too long. The value also needs to be high
+ /// enough that particularly deep tasks are able to do at least some useful
+ /// work at all.
+ ///
+ /// Note that as more yield points are added in the ecosystem, this value
+ /// will probably also have to be raised.
+ const fn initial() -> Budget {
+ Budget(Some(128))
+ }
+
+ /// Returns an unconstrained budget. Operations will not be limited.
+ const fn unconstrained() -> Budget {
+ Budget(None)
+ }
+}
+
+cfg_rt_threaded! {
+ impl Budget {
+ fn has_remaining(self) -> bool {
+ self.0.map(|budget| budget > 0).unwrap_or(true)
+ }
+ }
+}
+
+/// Run the given closure with a cooperative task budget. When the function
+/// returns, the budget is reset to the value prior to calling the function.
#[inline(always)]
pub(crate) fn budget<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
- HITS.with(move |hits| {
- if hits.get() != UNCONSTRAINED {
- // We are already being budgeted.
- //
- // Arguably this should be an error, but it can happen "correctly"
- // such as with block_on + LocalSet, so we make it a no-op.
- return f();
+ struct ResetGuard<'a> {
+ cell: &'a Cell<Budget>,
+ prev: Budget,
+ }
+
+ impl<'a> Drop for ResetGuard<'a> {
+ fn drop(&mut self) {
+ self.cell.set(self.prev);
}
+ }
+
+ CURRENT.with(move |cell| {
+ let prev = cell.get();
+
+ cell.set(Budget::initial());
+
+ let _guard = ResetGuard { cell, prev };
- hits.set(BUDGET);
- let _guard = ResetGuard {
- hits,
- prev: UNCONSTRAINED,
- };
f()
})
}
@@ -97,266 +121,53 @@ where
cfg_rt_threaded! {
#[inline(always)]
pub(crate) fn has_budget_remaining() -> bool {
- HITS.with(|hits| hits.get() > 0)
+ CURRENT.with(|cell| cell.get().has_remaining())
}
}
cfg_blocking_impl! {
/// Forcibly remove the budgeting constraints early.
pub(crate) fn stop() {
- HITS.with(|hits| {
- hits.set(UNCONSTRAINED);
+ CURRENT.with(|cell| {
+ cell.set(Budget::unconstrained());
});
}
}
-cfg_rt_core! {
- cfg_rt_util! {
- /// Run the given closure with a new task budget, resetting the previous
- /// budget when the closure finishes.
- ///
- /// This is intended for internal use by `LocalSet` and (potentially) other
- /// similar schedulers which are themselves futures, and need a fresh budget
- /// for each of their children.
- #[inline(always)]
- pub(crate) fn reset<F, R>(f: F) -> R
- where
- F: FnOnce() -> R,
- {
- HITS.with(move |hits| {
- let prev = hits.get();
- hits.set(UNCONSTRAINED);
- let _guard = ResetGuard {
- hits,
- prev,
- };
- f()
- })
- }
- }
-}
-
-/// Invoke `f` with a subset of the remaining budget.
-///
-/// This is useful if you have sub-futures that you need to poll, but that you want to restrict
-/// from using up your entire budget. For example, imagine the following future:
-///
-/// ```rust
-/// # use std::{future::Future, pin::Pin, task::{Context, Poll}};
-/// use futures::stream::FuturesUnordered;
-/// struct MyFuture<F1, F2> {
-/// big: FuturesUnordered<F1>,
-/// small: F2,
-/// }
-///
-/// use tokio::stream::Stream;
-/// impl<F1, F2> Future for MyFuture<F1, F2>
-/// where F1: Future, F2: Future
-/// # , F1: Unpin, F2: Unpin
-/// {
-/// type Output = F2::Output;
-///
-/// // fn poll(...)
-/// # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F2::Output> {
-/// # let this = &mut *self;
-/// let mut big = // something to pin self.big
-/// # Pin::new(&mut this.big);
-/// let small = // something to pin self.small
-/// # Pin::new(&mut this.small);
-///
-/// // see if any of the big futures have finished
-/// while let Some(e) = futures::ready!(big.as_mut().poll_next(cx)) {
-/// // do something with e
-/// # let _ = e;
-/// }
-///
-/// // see if the small future has finished
-/// small.poll(cx)
-/// }
-/// # }
-/// ```
-///
-/// It could be that every time `poll` gets called, `big` ends up spending the entire budget, and
-/// `small` never gets polled. That would be sad. If you want to stick up for the little future,
-/// that's what `limit` is for. It lets you portion out a smaller part of the yield budget to a
-/// particular segment of your code. In the code above, you would write
-///
-/// ```rust,ignore
-/// # use std::{future::Future, pin::Pin, task::{Context, Poll}};
-/// # use futures::stream::FuturesUnordered;
-/// # struct MyFuture<F1, F2> {
-/// # big: FuturesUnordered<F1>,
-/// # small: F2,
-/// # }
-/// #
-/// # use tokio::stream::Stream;
-/// # impl<F1, F2> Future for MyFuture<F1, F2>
-/// # where F1: Future, F2: Future
-/// # , F1: Unpin, F2: Unpin
-/// # {
-/// # type Output = F2::Output;
-/// # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F2::Output> {
-/// # let this = &mut *self;
-/// # let mut big = Pin::new(&mut this.big);
-/// # let small = Pin::new(&mut this.small);
-/// #
-/// // see if any of the big futures have finished
-/// while let Some(e) = futures::ready!(tokio::coop::limit(64, || big.as_mut().poll_next(cx))) {
-/// # // do something with e
-/// # let _ = e;
-/// # }
-/// # small.poll(cx)
-/// # }
-/// # }
-/// ```
-///
-/// Now, even if `big` spends its entire budget, `small` will likely be left with some budget left
-/// to also do useful work. In particular, if the remaining budget was `N` at the start of `poll`,
-/// `small` will have at least a budget of `N - 64`. It may be more if `big` did not spend its
-/// entire budget.
-///
-/// Note that you cannot _increase_ your budget by calling `limit`. The budget provided to the code
-/// inside the buget is the _minimum_ of the _current_ budget and the bound.
-///
-#[allow(unreachable_pub, dead_code)]
-pub fn limit<R>(bound: usize, f: impl FnOnce() -> R) -> R {
- HITS.with(|hits| {
- let budget = hits.get();
- // with_bound cannot _increase_ the remaining budget
- let bound = std::cmp::min(budget, bound);
- // When f() exits, how much should we add to what is left?
- let floor = budget.saturating_sub(bound);
- // Make sure we restore the remaining budget even on panic
- struct RestoreBudget<'a>(&'a Cell<usize>, usize);
- impl<'a> Drop for RestoreBudget<'a> {
- fn drop(&mut self) {
- let left = self.0.get();
- self.0.set(self.1 + left);
+cfg_coop! {
+ use std::task::{Context, Poll};
+
+ /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
+ #[inline]
+ pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> {
+ CURRENT.with(|cell| {
+ let mut budget = cell.get();
+
+ if budget.decrement() {
+ cell.set(budget);
+ Poll::Ready(())
+ } else {
+ cx.waker().wake_by_ref();
+ Poll::Pending
}
- }
- // Time to restrict!
- hits.set(bound);
- let _restore = RestoreBudget(&hits, floor);
- f()
- })
-}
-
-/// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
-#[allow(unreachable_pub, dead_code)]
-#[inline]
-pub fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> {
- HITS.with(|hits| {
- let n = hits.get();
- if n == UNCONSTRAINED {
- // opted out of budgeting
- Poll::Ready(())
- } else if n == 0 {
- cx.waker().wake_by_ref();
- Poll::Pending
- } else {
- hits.set(n.saturating_sub(1));
- Poll::Ready(())
- }
- })
-}
-
-/// Resolves immediately unless the current task has already exceeded its budget.
-///
-/// This should be placed after at least some work has been done. Otherwise a future sufficiently
-/// deep in the task hierarchy may end up never getting to run because of the number of yield
-/// points that inevitably appear before it is even reached. For example:
-///
-/// ```ignore
-/// # use tokio::stream::{Stream, StreamExt};
-/// async fn drop_all<I: Stream + Unpin>(mut input: I) {
-/// while let Some(_) = input.next().await {
-/// tokio::coop::proceed().await;
-/// }
-/// }
-/// ```
-#[allow(unreachable_pub, dead_code)]
-#[inline]
-pub async fn proceed() {
- use crate::future::poll_fn;
- poll_fn(|cx| poll_proceed(cx)).await;
-}
-
-pin_project_lite::pin_project! {
- /// A future that cooperatively yields to the task scheduler when polling,
- /// if the task's budget is exhausted.
- ///
- /// Internally, this is simply a future combinator which calls
- /// [`poll_proceed`] in its `poll` implementation before polling the wrapped
- /// future.
- ///
- /// # Examples
- ///
- /// ```rust,ignore
- /// # #[tokio::main]
- /// # async fn main() {
- /// use tokio::coop::CoopFutureExt;
- ///
- /// async { /* ... */ }
- /// .cooperate()
- /// .await;
- /// # }
- /// ```
- ///
- /// [`poll_proceed`]: fn@poll_proceed
- #[derive(Debug)]
- #[allow(unreachable_pub, dead_code)]
- pub struct CoopFuture<F> {
- #[pin]
- future: F,
+ })
}
-}
-
-struct ResetGuard<'a> {
- hits: &'a Cell<usize>,
- prev: usize,
-}
-
-impl<F: Future> Future for CoopFuture<F> {
- type Output = F::Output;
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- ready!(poll_proceed(cx));
- self.project().future.poll(cx)
- }
-}
-impl<F: Future> CoopFuture<F> {
- /// Returns a new `CoopFuture` wrapping the given future.
- ///
- #[allow(unreachable_pub, dead_code)]
- pub fn new(future: F) -> Self {
- Self { future }
+ impl Budget {
+ /// Decrement the budget. Returns `true` if successful. Decrementing fails
+ /// when there is not enough remaining budget.
+ fn decrement(&mut self) -> bool {
+ if let Some(num) = &mut self.0 {
+ if *num > 0 {
+ *num -= 1;
+ true
+ } else {
+ false
+ }
+ } else {
+ true
+ }
}
-}
-
-// Currently only used by `tokio::sync`; and if we make this combinator public,
-// it should probably be on the `FutureExt` trait instead.
-cfg_sync! {
- /// Extension trait providing `Future::cooperate` extension method.
- ///
- /// Note: if/when the co-op API becomes public, this method should probably be
- /// provided by `FutureExt`, instead.
- pub(crate) trait CoopFutureExt: Future {
- /// Wrap `self` to cooperatively yield to the scheduler when polling, if the
- /// task's budget is exhausted.
- fn cooperate(self) -> CoopFuture<Self>
- where
- Self: Sized,
- {
- CoopFuture::new(self)
- }
- }
-
- impl<F> CoopFutureExt for F where F: Future {}
-}
-
-impl<'a> Drop for ResetGuard<'a> {
- fn drop(&mut self) {
- self.hits.set(self.prev);
}
}
@@ -364,49 +175,53 @@ impl<'a> Drop for ResetGuard<'a> {
mod test {
use super::*;
- fn get() -> usize {
- HITS.with(|hits| hits.get())
+ fn get() -> Budget {
+ CURRENT.with(|cell| cell.get())
}
#[test]
fn bugeting() {
+ use futures::future::poll_fn;
use tokio_test::*;
- assert_eq!(get(), UNCONSTRAINED);
+ assert!(get().0.is_none());
+
assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
- assert_eq!(get(), UNCONSTRAINED);
+
+ assert!(get().0.is_none());
+
budget(|| {
- assert_eq!(get(), BUDGET);
+ assert_eq!(get().0, Budget::initial().0);
assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
- assert_eq!(get(), BUDGET - 1);
+ assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
- assert_eq!(get(), BUDGET - 2);
+ assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
+
+ budget(|| {
+ assert_eq!(get().0, Budget::initial().0);
+
+ assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
+ assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
+ });
+
+ assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
});
- assert_eq!(get(), UNCONSTRAINED);
+
+ assert!(get().0.is_none());
budget(|| {
- limit(3, || {
- assert_eq!(get(), 3);
- assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
- assert_eq!(get(), 2);
- limit(4, || {
- assert_eq!(get(), 2);
- assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
- assert_eq!(get(), 1);
- });
- assert_eq!(get(), 1);
+ let n = get().0.unwrap();
+
+ for _ in 0..n {
assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
- assert_eq!(get(), 0);
- assert_pending!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
- assert_eq!(get(), 0);
- assert_pending!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
- assert_eq!(get(), 0);
- });
- assert_eq!(get(), BUDGET - 3);
- assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
- assert_eq!(get(), BUDGET - 4);
- assert_ready!(task::spawn(proceed()).poll());
- assert_eq!(get(), BUDGET - 5);
+ }
+
+ let mut task = task::spawn(poll_fn(|cx| {
+ ready!(poll_proceed(cx));
+ Poll::Ready(())
+ }));
+
+ assert_pending!(task.poll());
});
}
}
diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs
index 618b8f66..288f58d2 100644
--- a/tokio/src/macros/cfg.rs
+++ b/tokio/src/macros/cfg.rs
@@ -363,3 +363,23 @@ macro_rules! cfg_unstable {
)*
}
}
+
+macro_rules! cfg_coop {
+ ($($item:item)*) => {
+ $(
+ #[cfg(any(
+ feature = "blocking",
+ feature = "dns",
+ feature = "fs",
+ feature = "io-driver",
+ feature = "io-std",
+ feature = "process",
+ feature = "rt-core",
+ feature = "sync",
+ feature = "stream",
+ feature = "time"
+ ))]
+ $item
+ )*
+ }
+}
diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs
index 436737a6..698e908e 100644
--- a/tokio/src/sync/batch_semaphore.rs
+++ b/tokio/src/sync/batch_semaphore.rs
@@ -386,7 +386,11 @@ impl Future for Acquire<'_> {
type Output = Result<(), AcquireError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ // First, ensure the current task has enough budget to proceed.
+ ready!(crate::coop::poll_proceed(cx));
+
let (node, semaphore, needed, queued) = self.project();
+
match semaphore.poll_acquire(cx, needed, node, *queued) {
Pending => {
*queued = true;
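The comment added above ("First, ensure the current task has enough budget to proceed") is the whole pattern: a leaf future now spends budget at the top of its `poll` instead of being wrapped in the deleted `CoopFuture`. Below is a self-contained miniature of that shape; the 4-unit thread-local counter and the empty `Acquire` type are made up for the demo and stand in for the crate-private `crate::coop::poll_proceed` and the real semaphore future.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

// Made-up stand-in for `crate::coop::poll_proceed`; tokio's real budget is
// 128 per task poll and lives behind `cfg_coop!`.
thread_local! {
    static BUDGET: Cell<u8> = Cell::new(4);
}

fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> {
    BUDGET.with(|b| match b.get() {
        0 => {
            // Budget exhausted: schedule ourselves again and yield so other
            // tasks on the executor get a turn.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
        n => {
            b.set(n - 1);
            Poll::Ready(())
        }
    })
}

// A leaf future shaped like the patched `Acquire`: check the budget first,
// then do the real work (trivial here).
struct Acquire;

impl Future for Acquire {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // First, ensure the current task has enough budget to proceed.
        ready!(poll_proceed(cx));

        // ... the actual permit acquisition would happen here ...
        Poll::Ready(())
    }
}
```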
diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs
index e0618a5d..539488a3 100644
--- a/tokio/src/sync/mutex.rs
+++ b/tokio/src/sync/mutex.rs
@@ -1,4 +1,3 @@
-use crate::coop::CoopFutureExt;
use crate::sync::batch_semaphore as semaphore;
use std::cell::UnsafeCell;
@@ -255,7 +254,7 @@ impl<T> Mutex<T> {
}
async fn acquire(&self) {
- self.s.acquire(1).cooperate().await.unwrap_or_else(|_| {
+ self.s.acquire(1).await.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and
// we own it exclusively, which means that this can never happen.
unreachable!()
diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs
index 68cf710e..4e2fb74d 100644
--- a/tokio/src/sync/rwlock.rs
+++ b/tokio/src/sync/rwlock.rs
@@ -1,4 +1,3 @@
-use crate::coop::CoopFutureExt;
use crate::sync::batch_semaphore::{AcquireError, Semaphore};
use std::cell::UnsafeCell;
use std::ops;
@@ -116,7 +115,7 @@ impl<'a, T> ReleasingPermit<'a, T> {
lock: &'a RwLock<T>,
num_permits: u16,
) -> Result<ReleasingPermit<'a, T>, AcquireError> {
- lock.s.acquire(num_permits).cooperate().await?;
+ lock.s.acquire(num_permits).await?;
Ok(Self { num_permits, lock })
}
}
diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs
index c1dd975f..1bfeaebc 100644
--- a/tokio/src/sync/semaphore.rs
+++ b/tokio/src/sync/semaphore.rs
@@ -1,5 +1,4 @@
use super::batch_semaphore as ll; // low level implementation
-use crate::coop::CoopFutureExt;
use std::sync::Arc;
/// Counting semaphore performing asynchronous permit aquisition.
@@ -87,7 +86,7 @@ impl Semaphore {
/// Acquires permit from the semaphore.
pub async fn acquire(&self) -> SemaphorePermit<'_> {
- self.ll_sem.acquire(1).cooperate().await.unwrap();
+ self.ll_sem.acquire(1).await.unwrap();
SemaphorePermit {
sem: &self,
permits: 1,
@@ -111,7 +110,7 @@ impl Semaphore {
///
/// [`Arc`]: std::sync::Arc
pub async fn acquire_owned(self: Arc<Self>) -> OwnedSemaphorePermit {
- self.ll_sem.acquire(1).cooperate().await.unwrap();
+ self.ll_sem.acquire(1).await.unwrap();
OwnedSemaphorePermit {
sem: self.clone(),
permits: 1,
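With the `cooperate()` wrapper gone from `Mutex`, `RwLock`, and `Semaphore`, callers need nothing special: the budget check happens inside the semaphore's own `Acquire` future. A hedged usage sketch against the 0.2-era signatures shown in this diff, where `acquire` returns a permit directly (later tokio versions return a `Result`); it assumes a tokio build with the macros and multi-threaded runtime features enabled.

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let sem = Arc::new(Semaphore::new(2));
    let mut handles = Vec::new();

    for i in 0..4 {
        let sem = sem.clone();
        handles.push(tokio::spawn(async move {
            // The coop budget is consumed inside `Acquire::poll`, so a plain
            // `.await` already yields back to the scheduler when the budget
            // runs out; no coop-aware wrapper is needed here.
            let _permit = sem.acquire().await;
            println!("task {} holds a permit", i);
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
}
```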
diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs
index 346fe437..374671fb 100644
--- a/tokio/src/task/local.rs
+++ b/tokio/src/task/local.rs
@@ -454,24 +454,20 @@ impl Future for LocalSet {
// Register the waker before starting to work
self.context.shared.waker.register_by_ref(cx.waker());
- // Reset any previous task budget while polling tasks spawned on the
- // `LocalSet`, ensuring that each has its own separate budget.
- crate::coop::reset(|| {
- if self.with(|| self.tick()) {
- // If `tick` returns true, we need to notify the local future again:
- // there are still tasks remaining in the run queue.
- cx.waker().wake_by_ref();
- Poll::Pending
- } else if self.context.tasks.borrow().owned.is_empty() {
- // If the scheduler has no remaining futures, we're done!
- Poll::Ready(())
- } else {
- // There are still futures in the local set, but we've polled all the
- // futures in the run queue. Therefore, we can just return Pending
- // since the remaining futures will be woken from somewhere else.
- Poll::Pending
- }
- })
+ if self.with(|| self.tick()) {
+ // If `tick` returns true, we need to notify the local future again:
+ // there are still tasks remaining in the run queue.
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ } else if self.context.tasks.borrow().owned.is_empty() {
+ // If the scheduler has no remaining futures, we're done!
+ Poll::Ready(())
+ } else {
+ // There are still futures in the local set, but we've polled all the
+ // futures in the run queue. Therefore, we can just return Pending
+ // since the remaining futures will be woken from somewhere else.
+ Poll::Pending
+ }
}
}
@@ -525,23 +521,19 @@ impl<T: Future> Future for RunUntil<'_, T> {
.register_by_ref(cx.waker());
let _no_blocking = crate::runtime::enter::disallow_blocking();
- // Reset any previous task budget so that the future passed to
- // `run_until` and any tasks spawned on the `LocalSet` have their
- // own budgets.
- crate::coop::reset(|| {
- let f = me.future;
- if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) {
- return Poll::Ready(output);
- }
-
- if me.local_set.tick() {
- // If `tick` returns `true`, we need to notify the local future again:
- // there are still tasks remaining in the run queue.
- cx.waker().wake_by_ref();
- }
-
- Poll::Pending
- })
+ let f = me.future;
+
+ if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) {
+ return Poll::Ready(output);
+ }
+
+ if me.local_set.tick() {
+ // If `tick` returns `true`, we need to notify the local future again:
+ // there are still tasks remaining in the run queue.
+ cx.waker().wake_by_ref();
+ }
+
+ Poll::Pending
})
}
}
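At the bottom of the patch, `LocalSet` and `RunUntil` drop their `coop::reset` wrappers because `coop::budget` now restores the previous budget itself when it returns. From a user's point of view nothing changes; the small hedged sketch below shows the unchanged surface behaviour (it assumes a tokio build with the macros, rt-core, and rt-util features enabled).

```rust
use tokio::task::LocalSet;

#[tokio::main]
async fn main() {
    let local = LocalSet::new();

    // The future handed to `run_until` is polled inside `coop::budget`
    // (see the RunUntil hunk above). Since `budget` now restores whatever
    // budget was active before it was entered, the surrounding
    // `coop::reset` call could simply be deleted.
    let out = local
        .run_until(async {
            let handle = tokio::task::spawn_local(async { 42 });
            handle.await.unwrap()
        })
        .await;

    assert_eq!(out, 42);
}
```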