summaryrefslogtreecommitdiffstats
path: root/tokio/src/task
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/task')
-rw-r--r--tokio/src/task/local.rs61
1 files changed, 35 insertions, 26 deletions
diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs
index 9af50cee..346fe437 100644
--- a/tokio/src/task/local.rs
+++ b/tokio/src/task/local.rs
@@ -454,20 +454,24 @@ impl Future for LocalSet {
// Register the waker before starting to work
self.context.shared.waker.register_by_ref(cx.waker());
- if self.with(|| self.tick()) {
- // If `tick` returns true, we need to notify the local future again:
- // there are still tasks remaining in the run queue.
- cx.waker().wake_by_ref();
- Poll::Pending
- } else if self.context.tasks.borrow().owned.is_empty() {
- // If the scheduler has no remaining futures, we're done!
- Poll::Ready(())
- } else {
- // There are still futures in the local set, but we've polled all the
- // futures in the run queue. Therefore, we can just return Pending
- // since the remaining futures will be woken from somewhere else.
- Poll::Pending
- }
+ // Reset any previous task budget while polling tasks spawned on the
+ // `LocalSet`, ensuring that each has its own separate budget.
+ crate::coop::reset(|| {
+ if self.with(|| self.tick()) {
+ // If `tick` returns true, we need to notify the local future again:
+ // there are still tasks remaining in the run queue.
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ } else if self.context.tasks.borrow().owned.is_empty() {
+ // If the scheduler has no remaining futures, we're done!
+ Poll::Ready(())
+ } else {
+ // There are still futures in the local set, but we've polled all the
+ // futures in the run queue. Therefore, we can just return Pending
+ // since the remaining futures will be woken from somewhere else.
+ Poll::Pending
+ }
+ })
}
}
@@ -521,18 +525,23 @@ impl<T: Future> Future for RunUntil<'_, T> {
.register_by_ref(cx.waker());
let _no_blocking = crate::runtime::enter::disallow_blocking();
-
- if let Poll::Ready(output) = me.future.poll(cx) {
- return Poll::Ready(output);
- }
-
- if me.local_set.tick() {
- // If `tick` returns `true`, we need to notify the local future again:
- // there are still tasks remaining in the run queue.
- cx.waker().wake_by_ref();
- }
-
- Poll::Pending
+ // Reset any previous task budget so that the future passed to
+ // `run_until` and any tasks spawned on the `LocalSet` have their
+ // own budgets.
+ crate::coop::reset(|| {
+ let f = me.future;
+ if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) {
+ return Poll::Ready(output);
+ }
+
+ if me.local_set.tick() {
+ // If `tick` returns `true`, we need to notify the local future again:
+ // there are still tasks remaining in the run queue.
+ cx.waker().wake_by_ref();
+ }
+
+ Poll::Pending
+ })
})
}
}