summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/oneshot.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/sync/oneshot.rs')
-rw-r--r--tokio/src/sync/oneshot.rs11
1 files changed, 9 insertions, 2 deletions
diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs
index 62ad484e..4b033ac3 100644
--- a/tokio/src/sync/oneshot.rs
+++ b/tokio/src/sync/oneshot.rs
@@ -197,13 +197,14 @@ impl<T> Sender<T> {
#[doc(hidden)] // TODO: remove
pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// Keep track of task budget
- ready!(crate::coop::poll_proceed(cx));
+ let coop = ready!(crate::coop::poll_proceed(cx));
let inner = self.inner.as_ref().unwrap();
let mut state = State::load(&inner.state, Acquire);
if state.is_closed() {
+ coop.made_progress();
return Poll::Ready(());
}
@@ -216,6 +217,7 @@ impl<T> Sender<T> {
if state.is_closed() {
// Set the flag again so that the waker is released in drop
State::set_tx_task(&inner.state);
+ coop.made_progress();
return Ready(());
} else {
unsafe { inner.drop_tx_task() };
@@ -233,6 +235,7 @@ impl<T> Sender<T> {
state = State::set_tx_task(&inner.state);
if state.is_closed() {
+ coop.made_progress();
return Ready(());
}
}
@@ -548,17 +551,19 @@ impl<T> Inner<T> {
fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
// Keep track of task budget
- ready!(crate::coop::poll_proceed(cx));
+ let coop = ready!(crate::coop::poll_proceed(cx));
// Load the state
let mut state = State::load(&self.state, Acquire);
if state.is_complete() {
+ coop.made_progress();
match unsafe { self.consume_value() } {
Some(value) => Ready(Ok(value)),
None => Ready(Err(RecvError(()))),
}
} else if state.is_closed() {
+ coop.made_progress();
Ready(Err(RecvError(())))
} else {
if state.is_rx_task_set() {
@@ -572,6 +577,7 @@ impl<T> Inner<T> {
// Set the flag again so that the waker is released in drop
State::set_rx_task(&self.state);
+ coop.made_progress();
return match unsafe { self.consume_value() } {
Some(value) => Ready(Ok(value)),
None => Ready(Err(RecvError(()))),
@@ -592,6 +598,7 @@ impl<T> Inner<T> {
state = State::set_rx_task(&self.state);
if state.is_complete() {
+ coop.made_progress();
match unsafe { self.consume_value() } {
Some(value) => Ready(Ok(value)),
None => Ready(Err(RecvError(()))),