diff options
Diffstat (limited to 'tokio-sync/tests/watch.rs')
-rw-r--r-- | tokio-sync/tests/watch.rs | 195 |
1 files changed, 117 insertions, 78 deletions
diff --git a/tokio-sync/tests/watch.rs b/tokio-sync/tests/watch.rs index 8424a181..10a6a822 100644 --- a/tokio-sync/tests/watch.rs +++ b/tokio-sync/tests/watch.rs @@ -1,9 +1,10 @@ #![deny(warnings, rust_2018_idioms)] -use futures; -use tokio_mock_task::*; use tokio_sync::watch; +use tokio_test::task::MockTask; +use tokio_test::{assert_pending, assert_ready}; +/* macro_rules! assert_ready { ($e:expr) => {{ match $e { @@ -23,143 +24,179 @@ macro_rules! assert_not_ready { } }}; } +*/ #[test] -fn single_rx() { - let (mut tx, mut rx) = watch::channel("one"); +fn single_rx_poll_ref() { + let (tx, mut rx) = watch::channel("one"); let mut task = MockTask::new(); - task.enter(|| { - let v = assert_ready!(rx.poll_ref()).unwrap(); - assert_eq!(*v, "one"); + task.enter(|cx| { + { + let v = assert_ready!(rx.poll_ref(cx)).unwrap(); + assert_eq!(*v, "one"); + } + assert_pending!(rx.poll_ref(cx)); }); - task.enter(|| assert_not_ready!(rx.poll_ref())); + tx.broadcast("two").unwrap(); - assert!(!task.is_notified()); + assert!(task.is_woken()); - tx.broadcast("two").unwrap(); + task.enter(|cx| { + { + let v = assert_ready!(rx.poll_ref(cx)).unwrap(); + assert_eq!(*v, "two"); + } + assert_pending!(rx.poll_ref(cx)); + }); + + drop(tx); + + assert!(task.is_woken()); - assert!(task.is_notified()); + task.enter(|cx| { + let res = assert_ready!(rx.poll_ref(cx)); + assert!(res.is_none()); + }); +} - task.enter(|| { - let v = assert_ready!(rx.poll_ref()).unwrap(); - assert_eq!(*v, "two"); +#[test] +fn single_rx_poll_next() { + let (tx, mut rx) = watch::channel("one"); + let mut task = MockTask::new(); + + task.enter(|cx| { + let v = assert_ready!(rx.poll_next(cx)).unwrap(); + assert_eq!(v, "one"); + assert_pending!(rx.poll_ref(cx)); }); - task.enter(|| assert_not_ready!(rx.poll_ref())); + tx.broadcast("two").unwrap(); + + assert!(task.is_woken()); + + task.enter(|cx| { + let v = assert_ready!(rx.poll_next(cx)).unwrap(); + assert_eq!(v, "two"); + assert_pending!(rx.poll_ref(cx)); + }); drop(tx); - assert!(task.is_notified()); + assert!(task.is_woken()); - task.enter(|| { - let res = assert_ready!(rx.poll_ref()); + task.enter(|cx| { + let res = assert_ready!(rx.poll_next(cx)); assert!(res.is_none()); }); } #[test] +#[cfg(feature = "async-traits")] fn stream_impl() { - use futures::Stream; + use futures_core::Stream; + use pin_utils::pin_mut; - let (mut tx, mut rx) = watch::channel("one"); + let (tx, rx) = watch::channel("one"); let mut task = MockTask::new(); - task.enter(|| { - let v = assert_ready!(rx.poll()).unwrap(); - assert_eq!(v, "one"); - }); - - task.enter(|| assert_not_ready!(rx.poll())); + pin_mut!(rx); - assert!(!task.is_notified()); + task.enter(|cx| { + { + let v = assert_ready!(Stream::poll_next(rx.as_mut(), cx)).unwrap(); + assert_eq!(v, "one"); + } + assert_pending!(rx.poll_ref(cx)); + }); tx.broadcast("two").unwrap(); - assert!(task.is_notified()); + assert!(task.is_woken()); - task.enter(|| { - let v = assert_ready!(rx.poll()).unwrap(); - assert_eq!(v, "two"); + task.enter(|cx| { + { + let v = assert_ready!(Stream::poll_next(rx.as_mut(), cx)).unwrap(); + assert_eq!(v, "two"); + } + assert_pending!(rx.poll_ref(cx)); }); - task.enter(|| assert_not_ready!(rx.poll())); - drop(tx); - assert!(task.is_notified()); + assert!(task.is_woken()); - task.enter(|| { - let res = assert_ready!(rx.poll()); + task.enter(|cx| { + let res = assert_ready!(Stream::poll_next(rx, cx)); assert!(res.is_none()); }); } #[test] fn multi_rx() { - let (mut tx, mut rx1) = watch::channel("one"); + let (tx, mut rx1) = watch::channel("one"); let mut rx2 = rx1.clone(); let mut task1 = MockTask::new(); let mut task2 = MockTask::new(); - task1.enter(|| { - let res = assert_ready!(rx1.poll_ref()); + task1.enter(|cx| { + let res = assert_ready!(rx1.poll_ref(cx)); assert_eq!(*res.unwrap(), "one"); }); - task2.enter(|| { - let res = assert_ready!(rx2.poll_ref()); + task2.enter(|cx| { + let res = assert_ready!(rx2.poll_ref(cx)); assert_eq!(*res.unwrap(), "one"); }); tx.broadcast("two").unwrap(); - assert!(task1.is_notified()); - assert!(task2.is_notified()); + assert!(task1.is_woken()); + assert!(task2.is_woken()); - task1.enter(|| { - let res = assert_ready!(rx1.poll_ref()); + task1.enter(|cx| { + let res = assert_ready!(rx1.poll_ref(cx)); assert_eq!(*res.unwrap(), "two"); }); tx.broadcast("three").unwrap(); - assert!(task1.is_notified()); - assert!(task2.is_notified()); + assert!(task1.is_woken()); + assert!(task2.is_woken()); - task1.enter(|| { - let res = assert_ready!(rx1.poll_ref()); + task1.enter(|cx| { + let res = assert_ready!(rx1.poll_ref(cx)); assert_eq!(*res.unwrap(), "three"); }); - task2.enter(|| { - let res = assert_ready!(rx2.poll_ref()); + task2.enter(|cx| { + let res = assert_ready!(rx2.poll_ref(cx)); assert_eq!(*res.unwrap(), "three"); }); tx.broadcast("four").unwrap(); - task1.enter(|| { - let res = assert_ready!(rx1.poll_ref()); + task1.enter(|cx| { + let res = assert_ready!(rx1.poll_ref(cx)); assert_eq!(*res.unwrap(), "four"); }); drop(tx); - task1.enter(|| { - let res = assert_ready!(rx1.poll_ref()); + task1.enter(|cx| { + let res = assert_ready!(rx1.poll_ref(cx)); assert!(res.is_none()); }); - task2.enter(|| { - let res = assert_ready!(rx2.poll_ref()); + task2.enter(|cx| { + let res = assert_ready!(rx2.poll_ref(cx)); assert_eq!(*res.unwrap(), "four"); }); - task2.enter(|| { - let res = assert_ready!(rx2.poll_ref()); + task2.enter(|cx| { + let res = assert_ready!(rx2.poll_ref(cx)); assert!(res.is_none()); }); } @@ -173,45 +210,47 @@ fn rx_observes_final_value() { drop(tx); - task.enter(|| { - let res = assert_ready!(rx.poll_ref()); + task.enter(|cx| { + let res = assert_ready!(rx.poll_ref(cx)); assert!(res.is_some()); assert_eq!(*res.unwrap(), "one"); }); - task.enter(|| { - let res = assert_ready!(rx.poll_ref()); + task.enter(|cx| { + let res = assert_ready!(rx.poll_ref(cx)); assert!(res.is_none()); }); // Sending a value - let (mut tx, mut rx) = watch::channel("one"); + let (tx, mut rx) = watch::channel("one"); let mut task = MockTask::new(); tx.broadcast("two").unwrap(); - task.enter(|| { - let res = assert_ready!(rx.poll_ref()); - assert!(res.is_some()); - assert_eq!(*res.unwrap(), "two"); - }); + task.enter(|cx| { + { + let res = assert_ready!(rx.poll_ref(cx)); + assert!(res.is_some()); + assert_eq!(*res.unwrap(), "two"); + } - task.enter(|| assert_not_ready!(rx.poll_ref())); + assert_pending!(rx.poll_ref(cx)); + }); tx.broadcast("three").unwrap(); drop(tx); - assert!(task.is_notified()); + assert!(task.is_woken()); - task.enter(|| { - let res = assert_ready!(rx.poll_ref()); + task.enter(|cx| { + let res = assert_ready!(rx.poll_ref(cx)); assert!(res.is_some()); assert_eq!(*res.unwrap(), "three"); }); - task.enter(|| { - let res = assert_ready!(rx.poll_ref()); + task.enter(|cx| { + let res = assert_ready!(rx.poll_ref(cx)); assert!(res.is_none()); }); } @@ -221,13 +260,13 @@ fn poll_close() { let (mut tx, rx) = watch::channel("one"); let mut task = MockTask::new(); - task.enter(|| assert_not_ready!(tx.poll_close())); + assert_pending!(task.enter(|cx| tx.poll_close(cx))); drop(rx); - assert!(task.is_notified()); + assert!(task.is_woken()); - task.enter(|| assert_ready!(tx.poll_close())); + assert_ready!(task.enter(|cx| tx.poll_close(cx))); assert!(tx.broadcast("two").is_err()); } |