diff options
Diffstat (limited to 'tokio/src/task')
-rw-r--r-- | tokio/src/task/local.rs | 61 |
1 files changed, 35 insertions, 26 deletions
diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 9af50cee..346fe437 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -454,20 +454,24 @@ impl Future for LocalSet { // Register the waker before starting to work self.context.shared.waker.register_by_ref(cx.waker()); - if self.with(|| self.tick()) { - // If `tick` returns true, we need to notify the local future again: - // there are still tasks remaining in the run queue. - cx.waker().wake_by_ref(); - Poll::Pending - } else if self.context.tasks.borrow().owned.is_empty() { - // If the scheduler has no remaining futures, we're done! - Poll::Ready(()) - } else { - // There are still futures in the local set, but we've polled all the - // futures in the run queue. Therefore, we can just return Pending - // since the remaining futures will be woken from somewhere else. - Poll::Pending - } + // Reset any previous task budget while polling tasks spawned on the + // `LocalSet`, ensuring that each has its own separate budget. + crate::coop::reset(|| { + if self.with(|| self.tick()) { + // If `tick` returns true, we need to notify the local future again: + // there are still tasks remaining in the run queue. + cx.waker().wake_by_ref(); + Poll::Pending + } else if self.context.tasks.borrow().owned.is_empty() { + // If the scheduler has no remaining futures, we're done! + Poll::Ready(()) + } else { + // There are still futures in the local set, but we've polled all the + // futures in the run queue. Therefore, we can just return Pending + // since the remaining futures will be woken from somewhere else. + Poll::Pending + } + }) } } @@ -521,18 +525,23 @@ impl<T: Future> Future for RunUntil<'_, T> { .register_by_ref(cx.waker()); let _no_blocking = crate::runtime::enter::disallow_blocking(); - - if let Poll::Ready(output) = me.future.poll(cx) { - return Poll::Ready(output); - } - - if me.local_set.tick() { - // If `tick` returns `true`, we need to notify the local future again: - // there are still tasks remaining in the run queue. - cx.waker().wake_by_ref(); - } - - Poll::Pending + // Reset any previous task budget so that the future passed to + // `run_until` and any tasks spawned on the `LocalSet` have their + // own budgets. + crate::coop::reset(|| { + let f = me.future; + if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) { + return Poll::Ready(output); + } + + if me.local_set.tick() { + // If `tick` returns `true`, we need to notify the local future again: + // there are still tasks remaining in the run queue. + cx.waker().wake_by_ref(); + } + + Poll::Pending + }) }) } } |