diff options
Diffstat (limited to 'tokio-sync/src/oneshot.rs')
-rw-r--r-- | tokio-sync/src/oneshot.rs | 90 |
1 files changed, 41 insertions, 49 deletions
diff --git a/tokio-sync/src/oneshot.rs b/tokio-sync/src/oneshot.rs index d3531bd9..c38bb0ce 100644 --- a/tokio-sync/src/oneshot.rs +++ b/tokio-sync/src/oneshot.rs @@ -1,15 +1,15 @@ //! A channel for sending a single message between asynchronous tasks. -use crate::loom::{ - futures::task::{self, Task}, - sync::atomic::AtomicUsize, - sync::CausalCell, -}; -use futures::{Async, Future, Poll}; +use crate::loom::{sync::atomic::AtomicUsize, sync::CausalCell}; + use std::fmt; +use std::future::Future; use std::mem::{self, ManuallyDrop}; +use std::pin::Pin; use std::sync::atomic::Ordering::{self, AcqRel, Acquire}; use std::sync::Arc; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll, Waker}; /// Sends a value to the associated `Receiver`. /// @@ -82,10 +82,10 @@ struct Inner<T> { value: CausalCell<Option<T>>, /// The task to notify when the receiver drops without consuming the value. - tx_task: CausalCell<ManuallyDrop<Task>>, + tx_task: CausalCell<ManuallyDrop<Waker>>, /// The task to notify when the value is sent. - rx_task: CausalCell<ManuallyDrop<Task>>, + rx_task: CausalCell<ManuallyDrop<Waker>>, } #[derive(Clone, Copy)] @@ -167,33 +167,33 @@ impl<T> Sender<T> { /// /// # Return values /// - /// If `Ok(Ready)` is returned then the associated `Receiver` has been + /// If `Ready(Ok(_))` is returned then the associated `Receiver` has been /// dropped, which means any work required for sending should be canceled. /// - /// If `Ok(NotReady)` is returned then the associated `Receiver` is still + /// If `Pending` is returned then the associated `Receiver` is still /// alive and may be able to receive a message if sent. The current task is /// registered to receive a notification if the `Receiver` handle goes away. /// /// [`Receiver`]: struct.Receiver.html - pub fn poll_close(&mut self) -> Poll<(), ()> { + pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { let inner = self.inner.as_ref().unwrap(); let mut state = State::load(&inner.state, Acquire); if state.is_closed() { - return Ok(Async::Ready(())); + return Poll::Ready(()); } if state.is_tx_task_set() { let will_notify = inner .tx_task - .with(|ptr| unsafe { (&*ptr).will_notify_current() }); + .with(|ptr| unsafe { (&*ptr).will_wake(cx.waker()) }); if !will_notify { state = State::unset_tx_task(&inner.state); if state.is_closed() { - return Ok(Async::Ready(())); + return Ready(()); } else { unsafe { inner.drop_tx_task() }; } @@ -203,18 +203,18 @@ impl<T> Sender<T> { if !state.is_tx_task_set() { // Attempt to set the task unsafe { - inner.set_tx_task(); + inner.set_tx_task(cx); } // Update the state state = State::set_tx_task(&inner.state); if state.is_closed() { - return Ok(Async::Ready(())); + return Ready(()); } } - Ok(Async::NotReady) + Pending } /// Check if the associated [`Receiver`] handle has been dropped. @@ -297,25 +297,18 @@ impl<T> Drop for Receiver<T> { } impl<T> Future for Receiver<T> { - type Item = T; - type Error = RecvError; - - fn poll(&mut self) -> Poll<T, RecvError> { - use futures::Async::{NotReady, Ready}; + type Output = Result<T, RecvError>; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // If `inner` is `None`, then `poll()` has already completed. - let ret = if let Some(inner) = self.inner.as_ref() { - match inner.poll_recv() { - Ok(Ready(v)) => Ok(Ready(v)), - Ok(NotReady) => return Ok(NotReady), - Err(e) => Err(e), - } + let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() { + ready!(inner.poll_recv(cx))? } else { panic!("called after complete"); }; self.inner = None; - ret + Ready(Ok(ret)) } } @@ -328,30 +321,29 @@ impl<T> Inner<T> { } if prev.is_rx_task_set() { - self.rx_task.with(|ptr| unsafe { (&*ptr).notify() }); + // TODO: Consume waker? + self.rx_task.with(|ptr| unsafe { (&*ptr).wake_by_ref() }); } true } - fn poll_recv(&self) -> Poll<T, RecvError> { - use futures::Async::{NotReady, Ready}; - + fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> { // Load the state let mut state = State::load(&self.state, Acquire); if state.is_complete() { match unsafe { self.consume_value() } { - Some(value) => Ok(Ready(value)), - None => Err(RecvError(())), + Some(value) => Ready(Ok(value)), + None => Ready(Err(RecvError(()))), } } else if state.is_closed() { - Err(RecvError(())) + Ready(Err(RecvError(()))) } else { if state.is_rx_task_set() { let will_notify = self .rx_task - .with(|ptr| unsafe { (&*ptr).will_notify_current() }); + .with(|ptr| unsafe { (&*ptr).will_wake(cx.waker()) }); // Check if the task is still the same if !will_notify { @@ -359,8 +351,8 @@ impl<T> Inner<T> { state = State::unset_rx_task(&self.state); if state.is_complete() { return match unsafe { self.consume_value() } { - Some(value) => Ok(Ready(value)), - None => Err(RecvError(())), + Some(value) => Ready(Ok(value)), + None => Ready(Err(RecvError(()))), }; } else { unsafe { self.drop_rx_task() }; @@ -371,7 +363,7 @@ impl<T> Inner<T> { if !state.is_rx_task_set() { // Attempt to set the task unsafe { - self.set_rx_task(); + self.set_rx_task(cx); } // Update the state @@ -379,14 +371,14 @@ impl<T> Inner<T> { if state.is_complete() { match unsafe { self.consume_value() } { - Some(value) => Ok(Ready(value)), - None => Err(RecvError(())), + Some(value) => Ready(Ok(value)), + None => Ready(Err(RecvError(()))), } } else { - return Ok(NotReady); + return Pending; } } else { - return Ok(NotReady); + return Pending; } } } @@ -396,7 +388,7 @@ impl<T> Inner<T> { let prev = State::set_closed(&self.state); if prev.is_tx_task_set() && !prev.is_complete() { - self.tx_task.with(|ptr| unsafe { (&*ptr).notify() }); + self.tx_task.with(|ptr| unsafe { (&*ptr).wake_by_ref() }); } } @@ -413,14 +405,14 @@ impl<T> Inner<T> { self.tx_task.with_mut(|ptr| ManuallyDrop::drop(&mut *ptr)) } - unsafe fn set_rx_task(&self) { + unsafe fn set_rx_task(&self, cx: &mut Context<'_>) { self.rx_task - .with_mut(|ptr| *ptr = ManuallyDrop::new(task::current())); + .with_mut(|ptr| *ptr = ManuallyDrop::new(cx.waker().clone())); } - unsafe fn set_tx_task(&self) { + unsafe fn set_tx_task(&self, cx: &mut Context<'_>) { self.tx_task - .with_mut(|ptr| *ptr = ManuallyDrop::new(task::current())); + .with_mut(|ptr| *ptr = ManuallyDrop::new(cx.waker().clone())); } } |