diff options
author | Jon Gjengset <jon@thesquareplanet.com> | 2020-05-21 17:07:23 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-21 17:07:23 -0400 |
commit | 9f63911adc5b809fd3df7cfbb736897a86895e0c (patch) | |
tree | e5fe08b297c137a475bd7531d61d46c14cf7636f /tokio/src/sync/oneshot.rs | |
parent | 1e54a35325fa371d61c428ead5879a6bb1b0ddf6 (diff) |
coop: Undo budget decrement on Pending (#2549)
This patch updates the coop logic so that the budget is only decremented
if a future makes progress (that is, if it returns `Ready`). This is
realized by restoring the budget to its former value after
`poll_proceed` _unless_ the caller indicates that it made progress.
The thinking here is that we always want tasks to make progress when we
poll them. With the way things were, if a task polled 128 resources that
could make no progress, and just returned `Pending`, then a 129th
resource that _could_ make progress would not be polled. Worse yet, this
could manifest as a deadlock, if the first 128 resources were all
_waiting_ for the 129th resource, since it would _never_ be polled.
The downside of this change is that `Pending` resources now do not take
up any part of the budget, even though they _do_ take up time on the
executor. If a task is particularly aggressive (or unoptimized), and
polls a large number of resources that cannot make progress whenever it
is polled, then coop will allow it to run potentially much longer before
yielding than it could before. The impact of this should be relatively
contained though, because tasks that behaved in this way in the past
probably ignored `Pending` _anyway_, so whether a resource returned
`Pending` due to coop or due to lack of progress may not make a
difference to it.
Diffstat (limited to 'tokio/src/sync/oneshot.rs')
-rw-r--r-- | tokio/src/sync/oneshot.rs | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 62ad484e..4b033ac3 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -197,13 +197,14 @@ impl<T> Sender<T> { #[doc(hidden)] // TODO: remove pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> { // Keep track of task budget - ready!(crate::coop::poll_proceed(cx)); + let coop = ready!(crate::coop::poll_proceed(cx)); let inner = self.inner.as_ref().unwrap(); let mut state = State::load(&inner.state, Acquire); if state.is_closed() { + coop.made_progress(); return Poll::Ready(()); } @@ -216,6 +217,7 @@ impl<T> Sender<T> { if state.is_closed() { // Set the flag again so that the waker is released in drop State::set_tx_task(&inner.state); + coop.made_progress(); return Ready(()); } else { unsafe { inner.drop_tx_task() }; @@ -233,6 +235,7 @@ impl<T> Sender<T> { state = State::set_tx_task(&inner.state); if state.is_closed() { + coop.made_progress(); return Ready(()); } } @@ -548,17 +551,19 @@ impl<T> Inner<T> { fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> { // Keep track of task budget - ready!(crate::coop::poll_proceed(cx)); + let coop = ready!(crate::coop::poll_proceed(cx)); // Load the state let mut state = State::load(&self.state, Acquire); if state.is_complete() { + coop.made_progress(); match unsafe { self.consume_value() } { Some(value) => Ready(Ok(value)), None => Ready(Err(RecvError(()))), } } else if state.is_closed() { + coop.made_progress(); Ready(Err(RecvError(()))) } else { if state.is_rx_task_set() { @@ -572,6 +577,7 @@ impl<T> Inner<T> { // Set the flag again so that the waker is released in drop State::set_rx_task(&self.state); + coop.made_progress(); return match unsafe { self.consume_value() } { Some(value) => Ready(Ok(value)), None => Ready(Err(RecvError(()))), @@ -592,6 +598,7 @@ impl<T> Inner<T> { state = State::set_rx_task(&self.state); if state.is_complete() { + coop.made_progress(); match unsafe { self.consume_value() } { Some(value) => Ready(Ok(value)), None => Ready(Err(RecvError(()))), |