From 2bc9a4815259c8ff4daa5e24f128ec826970d17f Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 11 Sep 2020 15:14:45 -0700 Subject: sync: tweak `watch` API (#2814) Decouples getting the latest `watch` value from receiving the change notification. The `Receiver` async method becomes `Receiver::changed()`. The latest value is obtained from `Receiver::borrow()`. The implementation is updated to use `Notify`. This requires adding `Notify::notify_waiters`. This method is generally useful but is kept private for now. --- tokio/tests/async_send_sync.rs | 6 +- tokio/tests/sync_watch.rs | 154 +++++++++++++---------------------------- 2 files changed, 49 insertions(+), 111 deletions(-) (limited to 'tokio/tests') diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index f1eed0e4..e7011e3b 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -205,7 +205,7 @@ async_assert_fn!(tokio::sync::Mutex>::lock(_): !Send & !Sync); async_assert_fn!(tokio::sync::Mutex::lock_owned(_): Send & Sync); async_assert_fn!(tokio::sync::Mutex>::lock_owned(_): Send & Sync); async_assert_fn!(tokio::sync::Mutex>::lock_owned(_): !Send & !Sync); -async_assert_fn!(tokio::sync::Notify::notified(_): Send & !Sync); +async_assert_fn!(tokio::sync::Notify::notified(_): Send & Sync); async_assert_fn!(tokio::sync::RwLock::read(_): Send & Sync); async_assert_fn!(tokio::sync::RwLock::write(_): Send & Sync); async_assert_fn!(tokio::sync::RwLock>::read(_): !Send & !Sync); @@ -229,9 +229,7 @@ async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver::recv(_): Send & Sync) async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver>::recv(_): Send & Sync); async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver>::recv(_): !Send & !Sync); -async_assert_fn!(tokio::sync::watch::Receiver::recv(_): Send & Sync); -async_assert_fn!(tokio::sync::watch::Receiver>::recv(_): !Send & !Sync); -async_assert_fn!(tokio::sync::watch::Receiver>::recv(_): !Send & !Sync); +async_assert_fn!(tokio::sync::watch::Receiver::changed(_): Send & Sync); async_assert_fn!(tokio::sync::watch::Sender::closed(_): Send & Sync); async_assert_fn!(tokio::sync::watch::Sender>::closed(_): !Send & !Sync); async_assert_fn!(tokio::sync::watch::Sender>::closed(_): !Send & !Sync); diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index 5d550443..9dcb0c53 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -4,41 +4,41 @@ use tokio::sync::watch; use tokio_test::task::spawn; -use tokio_test::{assert_pending, assert_ready}; +use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok}; #[test] fn single_rx_recv() { let (tx, mut rx) = watch::channel("one"); { - let mut t = spawn(rx.recv()); - let v = assert_ready!(t.poll()); - assert_eq!(v, "one"); + // Not initially notified + let mut t = spawn(rx.changed()); + assert_pending!(t.poll()); } + assert_eq!(*rx.borrow(), "one"); { - let mut t = spawn(rx.recv()); - + let mut t = spawn(rx.changed()); assert_pending!(t.poll()); tx.send("two").unwrap(); assert!(t.is_woken()); - let v = assert_ready!(t.poll()); - assert_eq!(v, "two"); + assert_ready_ok!(t.poll()); } + assert_eq!(*rx.borrow(), "two"); { - let mut t = spawn(rx.recv()); - + let mut t = spawn(rx.changed()); assert_pending!(t.poll()); drop(tx); - let res = assert_ready!(t.poll()); - assert_eq!(res, "two"); + assert!(t.is_woken()); + assert_ready_err!(t.poll()); } + assert_eq!(*rx.borrow(), "two"); } #[test] @@ -47,20 +47,19 @@ fn multi_rx() { let mut rx2 = rx1.clone(); { - let mut t1 = spawn(rx1.recv()); - let mut t2 = spawn(rx2.recv()); - - let res = assert_ready!(t1.poll()); - assert_eq!(res, "one"); + let mut t1 = spawn(rx1.changed()); + let mut t2 = spawn(rx2.changed()); - let res = assert_ready!(t2.poll()); - assert_eq!(res, "one"); + assert_pending!(t1.poll()); + assert_pending!(t2.poll()); } + assert_eq!(*rx1.borrow(), "one"); + assert_eq!(*rx2.borrow(), "one"); - let mut t2 = spawn(rx2.recv()); + let mut t2 = spawn(rx2.changed()); { - let mut t1 = spawn(rx1.recv()); + let mut t1 = spawn(rx1.changed()); assert_pending!(t1.poll()); assert_pending!(t2.poll()); @@ -70,12 +69,12 @@ fn multi_rx() { assert!(t1.is_woken()); assert!(t2.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "two"); + assert_ready_ok!(t1.poll()); } + assert_eq!(*rx1.borrow(), "two"); { - let mut t1 = spawn(rx1.recv()); + let mut t1 = spawn(rx1.changed()); assert_pending!(t1.poll()); @@ -84,45 +83,29 @@ fn multi_rx() { assert!(t1.is_woken()); assert!(t2.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "three"); - - let res = assert_ready!(t2.poll()); - assert_eq!(res, "three"); + assert_ready_ok!(t1.poll()); + assert_ready_ok!(t2.poll()); } + assert_eq!(*rx1.borrow(), "three"); drop(t2); + assert_eq!(*rx2.borrow(), "three"); + { - let mut t1 = spawn(rx1.recv()); - let mut t2 = spawn(rx2.recv()); + let mut t1 = spawn(rx1.changed()); + let mut t2 = spawn(rx2.changed()); assert_pending!(t1.poll()); assert_pending!(t2.poll()); tx.send("four").unwrap(); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "four"); - drop(t1); - - let mut t1 = spawn(rx1.recv()); - assert_pending!(t1.poll()); - - drop(tx); - - assert!(t1.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "four"); - - let res = assert_ready!(t2.poll()); - assert_eq!(res, "four"); - - drop(t2); - let mut t2 = spawn(rx2.recv()); - let res = assert_ready!(t2.poll()); - assert_eq!(res, "four"); + assert_ready_ok!(t1.poll()); + assert_ready_ok!(t2.poll()); } + assert_eq!(*rx1.borrow(), "four"); + assert_eq!(*rx2.borrow(), "four"); } #[test] @@ -133,16 +116,10 @@ fn rx_observes_final_value() { drop(tx); { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "one"); - } - - { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "one"); + let mut t1 = spawn(rx.changed()); + assert_ready_err!(t1.poll()); } + assert_eq!(*rx.borrow(), "one"); // Sending a value @@ -151,13 +128,13 @@ fn rx_observes_final_value() { tx.send("two").unwrap(); { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "two"); + let mut t1 = spawn(rx.changed()); + assert_ready_ok!(t1.poll()); } + assert_eq!(*rx.borrow(), "two"); { - let mut t1 = spawn(rx.recv()); + let mut t1 = spawn(rx.changed()); assert_pending!(t1.poll()); tx.send("three").unwrap(); @@ -165,20 +142,20 @@ fn rx_observes_final_value() { assert!(t1.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "three"); + assert_ready_ok!(t1.poll()); } + assert_eq!(*rx.borrow(), "three"); { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "three"); + let mut t1 = spawn(rx.changed()); + assert_ready_err!(t1.poll()); } + assert_eq!(*rx.borrow(), "three"); } #[test] fn poll_close() { - let (mut tx, rx) = watch::channel("one"); + let (tx, rx) = watch::channel("one"); { let mut t = spawn(tx.closed()); @@ -192,40 +169,3 @@ fn poll_close() { assert!(tx.send("two").is_err()); } - -#[test] -fn stream_impl() { - use tokio::stream::StreamExt; - - let (tx, mut rx) = watch::channel("one"); - - { - let mut t = spawn(rx.next()); - let v = assert_ready!(t.poll()).unwrap(); - assert_eq!(v, "one"); - } - - { - let mut t = spawn(rx.next()); - - assert_pending!(t.poll()); - - tx.send("two").unwrap(); - - assert!(t.is_woken()); - - let v = assert_ready!(t.poll()).unwrap(); - assert_eq!(v, "two"); - } - - { - let mut t = spawn(rx.next()); - - assert_pending!(t.poll()); - - drop(tx); - - let res = assert_ready!(t.poll()); - assert_eq!(res, Some("two")); - } -} -- cgit v1.2.3