diff options
author | Blas Rodriguez Irizar <rodrigblas@gmail.com> | 2020-09-02 05:57:48 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-01 20:57:48 -0700 |
commit | 5a1a6dc90c6d5a7eb5f31ae215f9ec383d6767aa (patch) | |
tree | c4ca532bc6a9a09c1defa9e4eb9527fd92d9e7d9 /tokio/src/sync/watch.rs | |
parent | 827077409c8a8ef7adb4d05d522fcf6c1949c876 (diff) |
sync: watch channel breaking changes (#2806)
Fixes: #2172
Diffstat (limited to 'tokio/src/sync/watch.rs')
-rw-r--r-- | tokio/src/sync/watch.rs | 34 |
1 files changed, 18 insertions, 16 deletions
diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 13033d9e..f6660b6e 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -23,12 +23,12 @@ //! let (tx, mut rx) = watch::channel("hello"); //! //! tokio::spawn(async move { -//! while let Some(value) = rx.recv().await { +//! while let Some(value) = Some(rx.recv().await) { //! println!("received = {:?}", value); //! } //! }); //! -//! tx.broadcast("world")?; +//! tx.send("world")?; //! # Ok(()) //! # } //! ``` @@ -162,12 +162,12 @@ const CLOSED: usize = 1; /// let (tx, mut rx) = watch::channel("hello"); /// /// tokio::spawn(async move { -/// while let Some(value) = rx.recv().await { +/// while let Some(value) = Some(rx.recv().await) { /// println!("received = {:?}", value); /// } /// }); /// -/// tx.broadcast("world")?; +/// tx.send("world")?; /// # Ok(()) /// # } /// ``` @@ -223,7 +223,7 @@ impl<T> Receiver<T> { // TODO: document #[doc(hidden)] - pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>> { + pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Ref<'a, T>> { // Make sure the task is up to date self.inner.waker.register_by_ref(cx.waker()); @@ -233,12 +233,14 @@ impl<T> Receiver<T> { if self.inner.version.swap(version, Relaxed) != version { let inner = self.shared.value.read().unwrap(); - return Ready(Some(Ref { inner })); + return Ready(Ref { inner }); } if CLOSED == state & CLOSED { // The `Store` handle has been dropped. - return Ready(None); + let inner = self.shared.value.read().unwrap(); + + return Ready(Ref { inner }); } Pending @@ -264,25 +266,25 @@ impl<T: Clone> Receiver<T> { /// async fn main() { /// let (tx, mut rx) = watch::channel("hello"); /// - /// let v = rx.recv().await.unwrap(); + /// let v = rx.recv().await; /// assert_eq!(v, "hello"); /// /// tokio::spawn(async move { - /// tx.broadcast("goodbye").unwrap(); + /// tx.send("goodbye").unwrap(); /// }); /// /// // Waits for the new task to spawn and send the value. - /// let v = rx.recv().await.unwrap(); + /// let v = rx.recv().await; /// assert_eq!(v, "goodbye"); /// /// let v = rx.recv().await; - /// assert!(v.is_none()); + /// assert_eq!(v, "goodbye"); /// } /// ``` - pub async fn recv(&mut self) -> Option<T> { + pub async fn recv(&mut self) -> T { poll_fn(|cx| { let v_ref = ready!(self.poll_recv_ref(cx)); - Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone())) + Poll::Ready((*v_ref).clone()) }) .await } @@ -295,7 +297,7 @@ impl<T: Clone> crate::stream::Stream for Receiver<T> { fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { let v_ref = ready!(self.poll_recv_ref(cx)); - Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone())) + Poll::Ready(Some((*v_ref).clone())) } } @@ -318,8 +320,8 @@ impl<T> Drop for Receiver<T> { } impl<T> Sender<T> { - /// Broadcasts a new value via the channel, notifying all receivers. - pub fn broadcast(&self, value: T) -> Result<(), error::SendError<T>> { + /// Sends a new value via the channel, notifying all receivers. + pub fn send(&self, value: T) -> Result<(), error::SendError<T>> { let shared = match self.shared.upgrade() { Some(shared) => shared, // All `Watch` handles have been canceled |