summaryrefslogtreecommitdiffstats
path: root/tokio/src
diff options
context:
space:
mode:
authorBlas Rodriguez Irizar <rodrigblas@gmail.com>2020-09-02 05:57:48 +0200
committerGitHub <noreply@github.com>2020-09-01 20:57:48 -0700
commit5a1a6dc90c6d5a7eb5f31ae215f9ec383d6767aa (patch)
treec4ca532bc6a9a09c1defa9e4eb9527fd92d9e7d9 /tokio/src
parent827077409c8a8ef7adb4d05d522fcf6c1949c876 (diff)
sync: watch channel breaking changes (#2806)
Fixes: #2172
Diffstat (limited to 'tokio/src')
-rw-r--r--tokio/src/sync/barrier.rs4
-rw-r--r--tokio/src/sync/mod.rs6
-rw-r--r--tokio/src/sync/watch.rs34
3 files changed, 23 insertions, 21 deletions
diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs
index 62863349..23713251 100644
--- a/tokio/src/sync/barrier.rs
+++ b/tokio/src/sync/barrier.rs
@@ -96,7 +96,7 @@ impl Barrier {
// wake everyone, increment the generation, and return
state
.waker
- .broadcast(state.generation)
+ .send(state.generation)
.expect("there is at least one receiver");
state.arrived = 0;
state.generation += 1;
@@ -112,7 +112,7 @@ impl Barrier {
loop {
// note that the first time through the loop, this _will_ yield a generation
// immediately, since we cloned a receiver that has never seen any values.
- if wait.recv().await.expect("sender hasn't been closed") >= generation {
+ if wait.recv().await >= generation {
break;
}
}
diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs
index 7052976b..5d66512d 100644
--- a/tokio/src/sync/mod.rs
+++ b/tokio/src/sync/mod.rs
@@ -330,7 +330,7 @@
//! // If the configuration changed, send the new config value
//! // on the watch channel.
//! if new_config != config {
-//! tx.broadcast(new_config.clone()).unwrap();
+//! tx.send(new_config.clone()).unwrap();
//! config = new_config;
//! }
//! }
@@ -358,7 +358,7 @@
//! // Receive the **initial** configuration value. As this is the
//! // first time the config is received from the watch, it will
//! // always complete immediatedly.
-//! let mut conf = rx.recv().await.unwrap();
+//! let mut conf = rx.recv().await;
//!
//! let mut op_start = Instant::now();
//! let mut delay = time::delay_until(op_start + conf.timeout);
@@ -376,7 +376,7 @@
//! delay = time::delay_until(op_start + conf.timeout);
//! }
//! new_conf = rx.recv() => {
-//! conf = new_conf.unwrap();
+//! conf = new_conf;
//!
//! // The configuration has been updated. Update the
//! // `delay` using the new `timeout` value.
diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs
index 13033d9e..f6660b6e 100644
--- a/tokio/src/sync/watch.rs
+++ b/tokio/src/sync/watch.rs
@@ -23,12 +23,12 @@
//! let (tx, mut rx) = watch::channel("hello");
//!
//! tokio::spawn(async move {
-//! while let Some(value) = rx.recv().await {
+//! while let Some(value) = Some(rx.recv().await) {
//! println!("received = {:?}", value);
//! }
//! });
//!
-//! tx.broadcast("world")?;
+//! tx.send("world")?;
//! # Ok(())
//! # }
//! ```
@@ -162,12 +162,12 @@ const CLOSED: usize = 1;
/// let (tx, mut rx) = watch::channel("hello");
///
/// tokio::spawn(async move {
-/// while let Some(value) = rx.recv().await {
+/// while let Some(value) = Some(rx.recv().await) {
/// println!("received = {:?}", value);
/// }
/// });
///
-/// tx.broadcast("world")?;
+/// tx.send("world")?;
/// # Ok(())
/// # }
/// ```
@@ -223,7 +223,7 @@ impl<T> Receiver<T> {
// TODO: document
#[doc(hidden)]
- pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>> {
+ pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Ref<'a, T>> {
// Make sure the task is up to date
self.inner.waker.register_by_ref(cx.waker());
@@ -233,12 +233,14 @@ impl<T> Receiver<T> {
if self.inner.version.swap(version, Relaxed) != version {
let inner = self.shared.value.read().unwrap();
- return Ready(Some(Ref { inner }));
+ return Ready(Ref { inner });
}
if CLOSED == state & CLOSED {
// The `Store` handle has been dropped.
- return Ready(None);
+ let inner = self.shared.value.read().unwrap();
+
+ return Ready(Ref { inner });
}
Pending
@@ -264,25 +266,25 @@ impl<T: Clone> Receiver<T> {
/// async fn main() {
/// let (tx, mut rx) = watch::channel("hello");
///
- /// let v = rx.recv().await.unwrap();
+ /// let v = rx.recv().await;
/// assert_eq!(v, "hello");
///
/// tokio::spawn(async move {
- /// tx.broadcast("goodbye").unwrap();
+ /// tx.send("goodbye").unwrap();
/// });
///
/// // Waits for the new task to spawn and send the value.
- /// let v = rx.recv().await.unwrap();
+ /// let v = rx.recv().await;
/// assert_eq!(v, "goodbye");
///
/// let v = rx.recv().await;
- /// assert!(v.is_none());
+ /// assert_eq!(v, "goodbye");
/// }
/// ```
- pub async fn recv(&mut self) -> Option<T> {
+ pub async fn recv(&mut self) -> T {
poll_fn(|cx| {
let v_ref = ready!(self.poll_recv_ref(cx));
- Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
+ Poll::Ready((*v_ref).clone())
})
.await
}
@@ -295,7 +297,7 @@ impl<T: Clone> crate::stream::Stream for Receiver<T> {
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
let v_ref = ready!(self.poll_recv_ref(cx));
- Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
+ Poll::Ready(Some((*v_ref).clone()))
}
}
@@ -318,8 +320,8 @@ impl<T> Drop for Receiver<T> {
}
impl<T> Sender<T> {
- /// Broadcasts a new value via the channel, notifying all receivers.
- pub fn broadcast(&self, value: T) -> Result<(), error::SendError<T>> {
+ /// Sends a new value via the channel, notifying all receivers.
+ pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
let shared = match self.shared.upgrade() {
Some(shared) => shared,
// All `Watch` handles have been canceled