diff options
Diffstat (limited to 'tokio/src/sync/oneshot.rs')
-rw-r--r-- | tokio/src/sync/oneshot.rs | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 62ad484e..4b033ac3 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -197,13 +197,14 @@ impl<T> Sender<T> { #[doc(hidden)] // TODO: remove pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> { // Keep track of task budget - ready!(crate::coop::poll_proceed(cx)); + let coop = ready!(crate::coop::poll_proceed(cx)); let inner = self.inner.as_ref().unwrap(); let mut state = State::load(&inner.state, Acquire); if state.is_closed() { + coop.made_progress(); return Poll::Ready(()); } @@ -216,6 +217,7 @@ impl<T> Sender<T> { if state.is_closed() { // Set the flag again so that the waker is released in drop State::set_tx_task(&inner.state); + coop.made_progress(); return Ready(()); } else { unsafe { inner.drop_tx_task() }; @@ -233,6 +235,7 @@ impl<T> Sender<T> { state = State::set_tx_task(&inner.state); if state.is_closed() { + coop.made_progress(); return Ready(()); } } @@ -548,17 +551,19 @@ impl<T> Inner<T> { fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> { // Keep track of task budget - ready!(crate::coop::poll_proceed(cx)); + let coop = ready!(crate::coop::poll_proceed(cx)); // Load the state let mut state = State::load(&self.state, Acquire); if state.is_complete() { + coop.made_progress(); match unsafe { self.consume_value() } { Some(value) => Ready(Ok(value)), None => Ready(Err(RecvError(()))), } } else if state.is_closed() { + coop.made_progress(); Ready(Err(RecvError(()))) } else { if state.is_rx_task_set() { @@ -572,6 +577,7 @@ impl<T> Inner<T> { // Set the flag again so that the waker is released in drop State::set_rx_task(&self.state); + coop.made_progress(); return match unsafe { self.consume_value() } { Some(value) => Ready(Ok(value)), None => Ready(Err(RecvError(()))), @@ -592,6 +598,7 @@ impl<T> Inner<T> { state = State::set_rx_task(&self.state); if state.is_complete() { + coop.made_progress(); match unsafe { self.consume_value() } { Some(value) => Ready(Ok(value)), None => Ready(Err(RecvError(()))), |