diff options
author | Carl Lerche <me@carllerche.com> | 2020-09-11 15:14:45 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-11 15:14:45 -0700 |
commit | 2bc9a4815259c8ff4daa5e24f128ec826970d17f (patch) | |
tree | c075e4d97a145ce104cfc8ee39d8d06acece5c13 /tokio/tests | |
parent | c5a9ede157691ac5ca15283735bd666c6b016188 (diff) |
sync: tweak `watch` API (#2814)
Decouples getting the latest `watch` value from receiving the change
notification. The `Receiver` async method becomes
`Receiver::changed()`. The latest value is obtained from
`Receiver::borrow()`.
The implementation is updated to use `Notify`. This requires adding
`Notify::notify_waiters`. This method is generally useful but is kept
private for now.
Diffstat (limited to 'tokio/tests')
-rw-r--r-- | tokio/tests/async_send_sync.rs | 6 | ||||
-rw-r--r-- | tokio/tests/sync_watch.rs | 154 |
2 files changed, 49 insertions, 111 deletions
diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index f1eed0e4..e7011e3b 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -205,7 +205,7 @@ async_assert_fn!(tokio::sync::Mutex<Rc<u8>>::lock(_): !Send & !Sync); async_assert_fn!(tokio::sync::Mutex<u8>::lock_owned(_): Send & Sync); async_assert_fn!(tokio::sync::Mutex<Cell<u8>>::lock_owned(_): Send & Sync); async_assert_fn!(tokio::sync::Mutex<Rc<u8>>::lock_owned(_): !Send & !Sync); -async_assert_fn!(tokio::sync::Notify::notified(_): Send & !Sync); +async_assert_fn!(tokio::sync::Notify::notified(_): Send & Sync); async_assert_fn!(tokio::sync::RwLock<u8>::read(_): Send & Sync); async_assert_fn!(tokio::sync::RwLock<u8>::write(_): Send & Sync); async_assert_fn!(tokio::sync::RwLock<Cell<u8>>::read(_): !Send & !Sync); @@ -229,9 +229,7 @@ async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<u8>::recv(_): Send & Sync) async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<Cell<u8>>::recv(_): Send & Sync); async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<Rc<u8>>::recv(_): !Send & !Sync); -async_assert_fn!(tokio::sync::watch::Receiver<u8>::recv(_): Send & Sync); -async_assert_fn!(tokio::sync::watch::Receiver<Cell<u8>>::recv(_): !Send & !Sync); -async_assert_fn!(tokio::sync::watch::Receiver<Rc<u8>>::recv(_): !Send & !Sync); +async_assert_fn!(tokio::sync::watch::Receiver<u8>::changed(_): Send & Sync); async_assert_fn!(tokio::sync::watch::Sender<u8>::closed(_): Send & Sync); async_assert_fn!(tokio::sync::watch::Sender<Cell<u8>>::closed(_): !Send & !Sync); async_assert_fn!(tokio::sync::watch::Sender<Rc<u8>>::closed(_): !Send & !Sync); diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index 5d550443..9dcb0c53 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -4,41 +4,41 @@ use tokio::sync::watch; use tokio_test::task::spawn; -use tokio_test::{assert_pending, assert_ready}; +use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok}; #[test] fn single_rx_recv() { let (tx, mut rx) = watch::channel("one"); { - let mut t = spawn(rx.recv()); - let v = assert_ready!(t.poll()); - assert_eq!(v, "one"); + // Not initially notified + let mut t = spawn(rx.changed()); + assert_pending!(t.poll()); } + assert_eq!(*rx.borrow(), "one"); { - let mut t = spawn(rx.recv()); - + let mut t = spawn(rx.changed()); assert_pending!(t.poll()); tx.send("two").unwrap(); assert!(t.is_woken()); - let v = assert_ready!(t.poll()); - assert_eq!(v, "two"); + assert_ready_ok!(t.poll()); } + assert_eq!(*rx.borrow(), "two"); { - let mut t = spawn(rx.recv()); - + let mut t = spawn(rx.changed()); assert_pending!(t.poll()); drop(tx); - let res = assert_ready!(t.poll()); - assert_eq!(res, "two"); + assert!(t.is_woken()); + assert_ready_err!(t.poll()); } + assert_eq!(*rx.borrow(), "two"); } #[test] @@ -47,20 +47,19 @@ fn multi_rx() { let mut rx2 = rx1.clone(); { - let mut t1 = spawn(rx1.recv()); - let mut t2 = spawn(rx2.recv()); - - let res = assert_ready!(t1.poll()); - assert_eq!(res, "one"); + let mut t1 = spawn(rx1.changed()); + let mut t2 = spawn(rx2.changed()); - let res = assert_ready!(t2.poll()); - assert_eq!(res, "one"); + assert_pending!(t1.poll()); + assert_pending!(t2.poll()); } + assert_eq!(*rx1.borrow(), "one"); + assert_eq!(*rx2.borrow(), "one"); - let mut t2 = spawn(rx2.recv()); + let mut t2 = spawn(rx2.changed()); { - let mut t1 = spawn(rx1.recv()); + let mut t1 = spawn(rx1.changed()); assert_pending!(t1.poll()); assert_pending!(t2.poll()); @@ -70,12 +69,12 @@ fn multi_rx() { assert!(t1.is_woken()); assert!(t2.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "two"); + assert_ready_ok!(t1.poll()); } + assert_eq!(*rx1.borrow(), "two"); { - let mut t1 = spawn(rx1.recv()); + let mut t1 = spawn(rx1.changed()); assert_pending!(t1.poll()); @@ -84,45 +83,29 @@ fn multi_rx() { assert!(t1.is_woken()); assert!(t2.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "three"); - - let res = assert_ready!(t2.poll()); - assert_eq!(res, "three"); + assert_ready_ok!(t1.poll()); + assert_ready_ok!(t2.poll()); } + assert_eq!(*rx1.borrow(), "three"); drop(t2); + assert_eq!(*rx2.borrow(), "three"); + { - let mut t1 = spawn(rx1.recv()); - let mut t2 = spawn(rx2.recv()); + let mut t1 = spawn(rx1.changed()); + let mut t2 = spawn(rx2.changed()); assert_pending!(t1.poll()); assert_pending!(t2.poll()); tx.send("four").unwrap(); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "four"); - drop(t1); - - let mut t1 = spawn(rx1.recv()); - assert_pending!(t1.poll()); - - drop(tx); - - assert!(t1.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "four"); - - let res = assert_ready!(t2.poll()); - assert_eq!(res, "four"); - - drop(t2); - let mut t2 = spawn(rx2.recv()); - let res = assert_ready!(t2.poll()); - assert_eq!(res, "four"); + assert_ready_ok!(t1.poll()); + assert_ready_ok!(t2.poll()); } + assert_eq!(*rx1.borrow(), "four"); + assert_eq!(*rx2.borrow(), "four"); } #[test] @@ -133,16 +116,10 @@ fn rx_observes_final_value() { drop(tx); { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "one"); - } - - { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "one"); + let mut t1 = spawn(rx.changed()); + assert_ready_err!(t1.poll()); } + assert_eq!(*rx.borrow(), "one"); // Sending a value @@ -151,13 +128,13 @@ fn rx_observes_final_value() { tx.send("two").unwrap(); { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "two"); + let mut t1 = spawn(rx.changed()); + assert_ready_ok!(t1.poll()); } + assert_eq!(*rx.borrow(), "two"); { - let mut t1 = spawn(rx.recv()); + let mut t1 = spawn(rx.changed()); assert_pending!(t1.poll()); tx.send("three").unwrap(); @@ -165,20 +142,20 @@ fn rx_observes_final_value() { assert!(t1.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "three"); + assert_ready_ok!(t1.poll()); } + assert_eq!(*rx.borrow(), "three"); { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "three"); + let mut t1 = spawn(rx.changed()); + assert_ready_err!(t1.poll()); } + assert_eq!(*rx.borrow(), "three"); } #[test] fn poll_close() { - let (mut tx, rx) = watch::channel("one"); + let (tx, rx) = watch::channel("one"); { let mut t = spawn(tx.closed()); @@ -192,40 +169,3 @@ fn poll_close() { assert!(tx.send("two").is_err()); } - -#[test] -fn stream_impl() { - use tokio::stream::StreamExt; - - let (tx, mut rx) = watch::channel("one"); - - { - let mut t = spawn(rx.next()); - let v = assert_ready!(t.poll()).unwrap(); - assert_eq!(v, "one"); - } - - { - let mut t = spawn(rx.next()); - - assert_pending!(t.poll()); - - tx.send("two").unwrap(); - - assert!(t.is_woken()); - - let v = assert_ready!(t.poll()).unwrap(); - assert_eq!(v, "two"); - } - - { - let mut t = spawn(rx.next()); - - assert_pending!(t.poll()); - - drop(tx); - - let res = assert_ready!(t.poll()); - assert_eq!(res, Some("two")); - } -} |