summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSean McArthur <sean@seanmonstar.com>2020-10-26 08:54:25 -0700
committerGitHub <noreply@github.com>2020-10-26 08:54:25 -0700
commit6d0ba19af51015dcd80558ae768215448e285fdf (patch)
tree766424f5bc521e2c5c03a0ce7f9f3ecbc69e97df
parentcbb8fe60694aa87924fff0d1f237bc897cee6d1b (diff)
sync: make oneshot::Sender::poll_closed public again (#3032)
-rw-r--r--tokio/src/sync/oneshot.rs136
1 files changed, 88 insertions, 48 deletions
diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs
index e0a9e793..ece9abae 100644
--- a/tokio/src/sync/oneshot.rs
+++ b/tokio/src/sync/oneshot.rs
@@ -196,54 +196,6 @@ impl<T> Sender<T> {
Ok(())
}
- fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
- // Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
-
- let inner = self.inner.as_ref().unwrap();
-
- let mut state = State::load(&inner.state, Acquire);
-
- if state.is_closed() {
- coop.made_progress();
- return Poll::Ready(());
- }
-
- if state.is_tx_task_set() {
- let will_notify = unsafe { inner.with_tx_task(|w| w.will_wake(cx.waker())) };
-
- if !will_notify {
- state = State::unset_tx_task(&inner.state);
-
- if state.is_closed() {
- // Set the flag again so that the waker is released in drop
- State::set_tx_task(&inner.state);
- coop.made_progress();
- return Ready(());
- } else {
- unsafe { inner.drop_tx_task() };
- }
- }
- }
-
- if !state.is_tx_task_set() {
- // Attempt to set the task
- unsafe {
- inner.set_tx_task(cx);
- }
-
- // Update the state
- state = State::set_tx_task(&inner.state);
-
- if state.is_closed() {
- coop.made_progress();
- return Ready(());
- }
- }
-
- Pending
- }
-
/// Waits for the associated [`Receiver`] handle to close.
///
/// A [`Receiver`] is closed by either calling [`close`] explicitly or the
@@ -350,6 +302,94 @@ impl<T> Sender<T> {
let state = State::load(&inner.state, Acquire);
state.is_closed()
}
+
+ /// Check whether the oneshot channel has been closed, and if not, schedules the
+ /// `Waker` in the provided `Context` to receive a notification when the channel is
+ /// closed.
+ ///
+ /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
+ /// [`Receiver`] value is dropped.
+ ///
+ /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
+ /// to the most recent call will be scheduled to receive a wakeup.
+ ///
+ /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
+ /// [`close`]: fn@crate::sync::oneshot::Receiver::close
+ ///
+ /// # Return value
+ ///
+ /// This function returns:
+ ///
+ /// * `Poll::Pending` if the channel is still open.
+ /// * `Poll::Ready(())` if the channel is closed.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::oneshot;
+ ///
+ /// use futures::future::poll_fn;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = oneshot::channel::<()>();
+ ///
+ /// tokio::spawn(async move {
+ /// rx.close();
+ /// });
+ ///
+ /// poll_fn(|cx| tx.poll_closed(cx)).await;
+ ///
+ /// println!("the receiver dropped");
+ /// }
+ /// ```
+ pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+ // Keep track of task budget
+ let coop = ready!(crate::coop::poll_proceed(cx));
+
+ let inner = self.inner.as_ref().unwrap();
+
+ let mut state = State::load(&inner.state, Acquire);
+
+ if state.is_closed() {
+ coop.made_progress();
+ return Poll::Ready(());
+ }
+
+ if state.is_tx_task_set() {
+ let will_notify = unsafe { inner.with_tx_task(|w| w.will_wake(cx.waker())) };
+
+ if !will_notify {
+ state = State::unset_tx_task(&inner.state);
+
+ if state.is_closed() {
+ // Set the flag again so that the waker is released in drop
+ State::set_tx_task(&inner.state);
+ coop.made_progress();
+ return Ready(());
+ } else {
+ unsafe { inner.drop_tx_task() };
+ }
+ }
+ }
+
+ if !state.is_tx_task_set() {
+ // Attempt to set the task
+ unsafe {
+ inner.set_tx_task(cx);
+ }
+
+ // Update the state
+ state = State::set_tx_task(&inner.state);
+
+ if state.is_closed() {
+ coop.made_progress();
+ return Ready(());
+ }
+ }
+
+ Pending
+ }
}
impl<T> Drop for Sender<T> {