summaryrefslogtreecommitdiffstats
path: root/tokio-sync/src/watch.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-06-24 12:34:30 -0700
committerGitHub <noreply@github.com>2019-06-24 12:34:30 -0700
commit06c473e62842d257ed275497ce906710ea3f8e19 (patch)
tree4ca6d337a892aa23266a761b35dc61e988e57954 /tokio-sync/src/watch.rs
parentaa99950b9c983b842bd2107bb771c277d09d495d (diff)
Update Tokio to use `std::future`. (#1120)
A first pass at updating Tokio to use `std::future`. Implementations of `Future` from the futures crate are updated to implement `Future` from std. Implementations of `Stream` are moved to a feature flag. This commits disables a number of crates that have not yet been updated.
Diffstat (limited to 'tokio-sync/src/watch.rs')
-rw-r--r--tokio-sync/src/watch.rs108
1 files changed, 55 insertions, 53 deletions
diff --git a/tokio-sync/src/watch.rs b/tokio-sync/src/watch.rs
index aab016b5..ce275b4e 100644
--- a/tokio-sync/src/watch.rs
+++ b/tokio-sync/src/watch.rs
@@ -53,14 +53,19 @@
//! [`Receiver::poll`]: struct.Receiver.html#method.poll
//! [`Receiver::poll_ref`]: struct.Receiver.html#method.poll_ref
+use crate::task::AtomicWaker;
+
+use core::task::Poll::{Pending, Ready};
+use core::task::{Context, Poll};
use fnv::FnvHashMap;
-use futures::task::AtomicTask;
-use futures::{try_ready, Async, AsyncSink, Poll, Sink, StartSend, Stream};
use std::ops;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
+#[cfg(feature = "async-traits")]
+use std::pin::Pin;
+
/// Receives values from the associated `Sender`.
///
/// Instances are created by the [`channel`](fn.channel.html) function.
@@ -102,33 +107,12 @@ pub mod error {
use std::fmt;
- /// Error produced when receiving a value fails.
- #[derive(Debug)]
- pub struct RecvError {
- pub(crate) _p: (),
- }
-
/// Error produced when sending a value fails.
#[derive(Debug)]
pub struct SendError<T> {
pub(crate) inner: T,
}
- // ===== impl RecvError =====
-
- impl fmt::Display for RecvError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- use std::error::Error;
- write!(fmt, "{}", self.description())
- }
- }
-
- impl ::std::error::Error for RecvError {
- fn description(&self) -> &str {
- "channel closed"
- }
- }
-
// ===== impl SendError =====
impl<T: fmt::Debug> fmt::Display for SendError<T> {
@@ -160,7 +144,7 @@ struct Shared<T> {
watchers: Mutex<Watchers>,
/// Task to notify when all watchers drop
- cancel: AtomicTask,
+ cancel: AtomicWaker,
}
#[derive(Debug)]
@@ -171,7 +155,7 @@ struct Watchers {
#[derive(Debug)]
struct WatchInner {
- task: AtomicTask,
+ waker: AtomicWaker,
}
const CLOSED: usize = 1;
@@ -216,7 +200,7 @@ pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
next_id: INIT_ID + 1,
watchers,
}),
- cancel: AtomicTask::new(),
+ cancel: AtomicWaker::new(),
});
let tx = Sender {
@@ -256,14 +240,14 @@ impl<T> Receiver<T> {
/// Attempts to receive the latest value sent via the channel.
///
/// If a new, unobserved, value has been sent, a reference to it is
- /// returned. If no new value has been sent, then `NotReady` is returned and
+ /// returned. If no new value has been sent, then `Pending` is returned and
/// the current task is notified once a new value is sent.
///
/// Only the **most recent** value is returned. If the receiver is falling
/// behind the sender, intermediate values are dropped.
- pub fn poll_ref(&mut self) -> Poll<Option<Ref<'_, T>>, error::RecvError> {
+ pub fn poll_ref(&mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'_, T>>> {
// Make sure the task is up to date
- self.inner.task.register();
+ self.inner.waker.register_by_ref(cx.waker());
let state = self.shared.version.load(SeqCst);
let version = state & !CLOSED;
@@ -274,25 +258,35 @@ impl<T> Receiver<T> {
let inner = self.shared.value.read().unwrap();
- return Ok(Some(Ref { inner }).into());
+ return Ready(Some(Ref { inner }));
}
if CLOSED == state & CLOSED {
// The `Store` handle has been dropped.
- return Ok(None.into());
+ return Ready(None);
}
- Ok(Async::NotReady)
+ Pending
}
}
-impl<T: Clone> Stream for Receiver<T> {
+impl<T: Clone> Receiver<T> {
+ /// Attempts to clone the latest value sent via the channel.
+ ///
+ /// This is equivalent to calling `Clone` on the value returned by `poll_ref`.
+ pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ let item = ready!(self.poll_ref(cx));
+ Ready(item.map(|v_ref| v_ref.clone()))
+ }
+}
+
+#[cfg(feature = "async-traits")]
+impl<T: Clone> futures_core::Stream for Receiver<T> {
type Item = T;
- type Error = error::RecvError;
- fn poll(&mut self) -> Poll<Option<T>, error::RecvError> {
- let item = try_ready!(self.poll_ref());
- Ok(Async::Ready(item.map(|v_ref| v_ref.clone())))
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ let item = ready!(self.poll_ref(cx));
+ Ready(item.map(|v_ref| v_ref.clone()))
}
}
@@ -332,14 +326,14 @@ impl<T> Drop for Receiver<T> {
impl WatchInner {
fn new() -> Self {
WatchInner {
- task: AtomicTask::new(),
+ waker: AtomicWaker::new(),
}
}
}
impl<T> Sender<T> {
/// Broadcast a new value via the channel, notifying all receivers.
- pub fn broadcast(&mut self, value: T) -> Result<(), error::SendError<T>> {
+ pub fn broadcast(&self, value: T) -> Result<(), error::SendError<T>> {
let shared = match self.shared.upgrade() {
Some(shared) => shared,
// All `Watch` handles have been canceled
@@ -366,28 +360,36 @@ impl<T> Sender<T> {
///
/// This allows the producer to get notified when interest in the produced
/// values is canceled and immediately stop doing work.
- pub fn poll_close(&mut self) -> Poll<(), ()> {
+ pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
match self.shared.upgrade() {
Some(shared) => {
- shared.cancel.register();
- Ok(Async::NotReady)
+ shared.cancel.register_by_ref(cx.waker());
+ Pending
}
- None => Ok(Async::Ready(())),
+ None => Ready(()),
}
}
}
-impl<T> Sink for Sender<T> {
- type SinkItem = T;
- type SinkError = error::SendError<T>;
+#[cfg(feature = "async-traits")]
+impl<T> async_sink::Sink<T> for Sender<T> {
+ type Error = error::SendError<T>;
+
+ fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Ready(Ok(()))
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
+ let _ = self.as_ref().get_ref().broadcast(item)?;
+ Ok(())
+ }
- fn start_send(&mut self, item: T) -> StartSend<T, error::SendError<T>> {
- let _ = self.broadcast(item)?;
- Ok(AsyncSink::Ready)
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Ready(Ok(()))
}
- fn poll_complete(&mut self) -> Poll<(), error::SendError<T>> {
- Ok(().into())
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Ready(Ok(()))
}
}
@@ -397,7 +399,7 @@ fn notify_all<T>(shared: &Shared<T>) {
for watcher in watchers.watchers.values() {
// Notify the task
- watcher.task.notify();
+ watcher.waker.wake();
}
}
@@ -424,6 +426,6 @@ impl<'a, T: 'a> ops::Deref for Ref<'a, T> {
impl<T> Drop for Shared<T> {
fn drop(&mut self) {
- self.cancel.notify();
+ self.cancel.wake();
}
}