diff options
author | Carl Lerche <me@carllerche.com> | 2019-06-24 12:34:30 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-06-24 12:34:30 -0700 |
commit | 06c473e62842d257ed275497ce906710ea3f8e19 (patch) | |
tree | 4ca6d337a892aa23266a761b35dc61e988e57954 /tokio-sync/src/watch.rs | |
parent | aa99950b9c983b842bd2107bb771c277d09d495d (diff) |
Update Tokio to use `std::future`. (#1120)
A first pass at updating Tokio to use `std::future`.
Implementations of `Future` from the futures crate are updated to implement
`Future` from std. Implementations of `Stream` are moved to a feature flag.
This commits disables a number of crates that have not yet been updated.
Diffstat (limited to 'tokio-sync/src/watch.rs')
-rw-r--r-- | tokio-sync/src/watch.rs | 108 |
1 files changed, 55 insertions, 53 deletions
diff --git a/tokio-sync/src/watch.rs b/tokio-sync/src/watch.rs index aab016b5..ce275b4e 100644 --- a/tokio-sync/src/watch.rs +++ b/tokio-sync/src/watch.rs @@ -53,14 +53,19 @@ //! [`Receiver::poll`]: struct.Receiver.html#method.poll //! [`Receiver::poll_ref`]: struct.Receiver.html#method.poll_ref +use crate::task::AtomicWaker; + +use core::task::Poll::{Pending, Ready}; +use core::task::{Context, Poll}; use fnv::FnvHashMap; -use futures::task::AtomicTask; -use futures::{try_ready, Async, AsyncSink, Poll, Sink, StartSend, Stream}; use std::ops; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak}; +#[cfg(feature = "async-traits")] +use std::pin::Pin; + /// Receives values from the associated `Sender`. /// /// Instances are created by the [`channel`](fn.channel.html) function. @@ -102,33 +107,12 @@ pub mod error { use std::fmt; - /// Error produced when receiving a value fails. - #[derive(Debug)] - pub struct RecvError { - pub(crate) _p: (), - } - /// Error produced when sending a value fails. #[derive(Debug)] pub struct SendError<T> { pub(crate) inner: T, } - // ===== impl RecvError ===== - - impl fmt::Display for RecvError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - use std::error::Error; - write!(fmt, "{}", self.description()) - } - } - - impl ::std::error::Error for RecvError { - fn description(&self) -> &str { - "channel closed" - } - } - // ===== impl SendError ===== impl<T: fmt::Debug> fmt::Display for SendError<T> { @@ -160,7 +144,7 @@ struct Shared<T> { watchers: Mutex<Watchers>, /// Task to notify when all watchers drop - cancel: AtomicTask, + cancel: AtomicWaker, } #[derive(Debug)] @@ -171,7 +155,7 @@ struct Watchers { #[derive(Debug)] struct WatchInner { - task: AtomicTask, + waker: AtomicWaker, } const CLOSED: usize = 1; @@ -216,7 +200,7 @@ pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) { next_id: INIT_ID + 1, watchers, }), - cancel: AtomicTask::new(), + cancel: AtomicWaker::new(), }); let tx = Sender { @@ -256,14 +240,14 @@ impl<T> Receiver<T> { /// Attempts to receive the latest value sent via the channel. /// /// If a new, unobserved, value has been sent, a reference to it is - /// returned. If no new value has been sent, then `NotReady` is returned and + /// returned. If no new value has been sent, then `Pending` is returned and /// the current task is notified once a new value is sent. /// /// Only the **most recent** value is returned. If the receiver is falling /// behind the sender, intermediate values are dropped. - pub fn poll_ref(&mut self) -> Poll<Option<Ref<'_, T>>, error::RecvError> { + pub fn poll_ref(&mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'_, T>>> { // Make sure the task is up to date - self.inner.task.register(); + self.inner.waker.register_by_ref(cx.waker()); let state = self.shared.version.load(SeqCst); let version = state & !CLOSED; @@ -274,25 +258,35 @@ impl<T> Receiver<T> { let inner = self.shared.value.read().unwrap(); - return Ok(Some(Ref { inner }).into()); + return Ready(Some(Ref { inner })); } if CLOSED == state & CLOSED { // The `Store` handle has been dropped. - return Ok(None.into()); + return Ready(None); } - Ok(Async::NotReady) + Pending } } -impl<T: Clone> Stream for Receiver<T> { +impl<T: Clone> Receiver<T> { + /// Attempts to clone the latest value sent via the channel. + /// + /// This is equivalent to calling `Clone` on the value returned by `poll_ref`. + pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { + let item = ready!(self.poll_ref(cx)); + Ready(item.map(|v_ref| v_ref.clone())) + } +} + +#[cfg(feature = "async-traits")] +impl<T: Clone> futures_core::Stream for Receiver<T> { type Item = T; - type Error = error::RecvError; - fn poll(&mut self) -> Poll<Option<T>, error::RecvError> { - let item = try_ready!(self.poll_ref()); - Ok(Async::Ready(item.map(|v_ref| v_ref.clone()))) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + let item = ready!(self.poll_ref(cx)); + Ready(item.map(|v_ref| v_ref.clone())) } } @@ -332,14 +326,14 @@ impl<T> Drop for Receiver<T> { impl WatchInner { fn new() -> Self { WatchInner { - task: AtomicTask::new(), + waker: AtomicWaker::new(), } } } impl<T> Sender<T> { /// Broadcast a new value via the channel, notifying all receivers. - pub fn broadcast(&mut self, value: T) -> Result<(), error::SendError<T>> { + pub fn broadcast(&self, value: T) -> Result<(), error::SendError<T>> { let shared = match self.shared.upgrade() { Some(shared) => shared, // All `Watch` handles have been canceled @@ -366,28 +360,36 @@ impl<T> Sender<T> { /// /// This allows the producer to get notified when interest in the produced /// values is canceled and immediately stop doing work. - pub fn poll_close(&mut self) -> Poll<(), ()> { + pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { match self.shared.upgrade() { Some(shared) => { - shared.cancel.register(); - Ok(Async::NotReady) + shared.cancel.register_by_ref(cx.waker()); + Pending } - None => Ok(Async::Ready(())), + None => Ready(()), } } } -impl<T> Sink for Sender<T> { - type SinkItem = T; - type SinkError = error::SendError<T>; +#[cfg(feature = "async-traits")] +impl<T> async_sink::Sink<T> for Sender<T> { + type Error = error::SendError<T>; + + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + let _ = self.as_ref().get_ref().broadcast(item)?; + Ok(()) + } - fn start_send(&mut self, item: T) -> StartSend<T, error::SendError<T>> { - let _ = self.broadcast(item)?; - Ok(AsyncSink::Ready) + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Ready(Ok(())) } - fn poll_complete(&mut self) -> Poll<(), error::SendError<T>> { - Ok(().into()) + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Ready(Ok(())) } } @@ -397,7 +399,7 @@ fn notify_all<T>(shared: &Shared<T>) { for watcher in watchers.watchers.values() { // Notify the task - watcher.task.notify(); + watcher.waker.wake(); } } @@ -424,6 +426,6 @@ impl<'a, T: 'a> ops::Deref for Ref<'a, T> { impl<T> Drop for Shared<T> { fn drop(&mut self) { - self.cancel.notify(); + self.cancel.wake(); } } |