summaryrefslogtreecommitdiffstats
path: root/tokio-sync/tests/watch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-sync/tests/watch.rs')
-rw-r--r--tokio-sync/tests/watch.rs195
1 files changed, 117 insertions, 78 deletions
diff --git a/tokio-sync/tests/watch.rs b/tokio-sync/tests/watch.rs
index 8424a181..10a6a822 100644
--- a/tokio-sync/tests/watch.rs
+++ b/tokio-sync/tests/watch.rs
@@ -1,9 +1,10 @@
#![deny(warnings, rust_2018_idioms)]
-use futures;
-use tokio_mock_task::*;
use tokio_sync::watch;
+use tokio_test::task::MockTask;
+use tokio_test::{assert_pending, assert_ready};
+/*
macro_rules! assert_ready {
($e:expr) => {{
match $e {
@@ -23,143 +24,179 @@ macro_rules! assert_not_ready {
}
}};
}
+*/
#[test]
-fn single_rx() {
- let (mut tx, mut rx) = watch::channel("one");
+fn single_rx_poll_ref() {
+ let (tx, mut rx) = watch::channel("one");
let mut task = MockTask::new();
- task.enter(|| {
- let v = assert_ready!(rx.poll_ref()).unwrap();
- assert_eq!(*v, "one");
+ task.enter(|cx| {
+ {
+ let v = assert_ready!(rx.poll_ref(cx)).unwrap();
+ assert_eq!(*v, "one");
+ }
+ assert_pending!(rx.poll_ref(cx));
});
- task.enter(|| assert_not_ready!(rx.poll_ref()));
+ tx.broadcast("two").unwrap();
- assert!(!task.is_notified());
+ assert!(task.is_woken());
- tx.broadcast("two").unwrap();
+ task.enter(|cx| {
+ {
+ let v = assert_ready!(rx.poll_ref(cx)).unwrap();
+ assert_eq!(*v, "two");
+ }
+ assert_pending!(rx.poll_ref(cx));
+ });
+
+ drop(tx);
+
+ assert!(task.is_woken());
- assert!(task.is_notified());
+ task.enter(|cx| {
+ let res = assert_ready!(rx.poll_ref(cx));
+ assert!(res.is_none());
+ });
+}
- task.enter(|| {
- let v = assert_ready!(rx.poll_ref()).unwrap();
- assert_eq!(*v, "two");
+#[test]
+fn single_rx_poll_next() {
+ let (tx, mut rx) = watch::channel("one");
+ let mut task = MockTask::new();
+
+ task.enter(|cx| {
+ let v = assert_ready!(rx.poll_next(cx)).unwrap();
+ assert_eq!(v, "one");
+ assert_pending!(rx.poll_ref(cx));
});
- task.enter(|| assert_not_ready!(rx.poll_ref()));
+ tx.broadcast("two").unwrap();
+
+ assert!(task.is_woken());
+
+ task.enter(|cx| {
+ let v = assert_ready!(rx.poll_next(cx)).unwrap();
+ assert_eq!(v, "two");
+ assert_pending!(rx.poll_ref(cx));
+ });
drop(tx);
- assert!(task.is_notified());
+ assert!(task.is_woken());
- task.enter(|| {
- let res = assert_ready!(rx.poll_ref());
+ task.enter(|cx| {
+ let res = assert_ready!(rx.poll_next(cx));
assert!(res.is_none());
});
}
#[test]
+#[cfg(feature = "async-traits")]
fn stream_impl() {
- use futures::Stream;
+ use futures_core::Stream;
+ use pin_utils::pin_mut;
- let (mut tx, mut rx) = watch::channel("one");
+ let (tx, rx) = watch::channel("one");
let mut task = MockTask::new();
- task.enter(|| {
- let v = assert_ready!(rx.poll()).unwrap();
- assert_eq!(v, "one");
- });
-
- task.enter(|| assert_not_ready!(rx.poll()));
+ pin_mut!(rx);
- assert!(!task.is_notified());
+ task.enter(|cx| {
+ {
+ let v = assert_ready!(Stream::poll_next(rx.as_mut(), cx)).unwrap();
+ assert_eq!(v, "one");
+ }
+ assert_pending!(rx.poll_ref(cx));
+ });
tx.broadcast("two").unwrap();
- assert!(task.is_notified());
+ assert!(task.is_woken());
- task.enter(|| {
- let v = assert_ready!(rx.poll()).unwrap();
- assert_eq!(v, "two");
+ task.enter(|cx| {
+ {
+ let v = assert_ready!(Stream::poll_next(rx.as_mut(), cx)).unwrap();
+ assert_eq!(v, "two");
+ }
+ assert_pending!(rx.poll_ref(cx));
});
- task.enter(|| assert_not_ready!(rx.poll()));
-
drop(tx);
- assert!(task.is_notified());
+ assert!(task.is_woken());
- task.enter(|| {
- let res = assert_ready!(rx.poll());
+ task.enter(|cx| {
+ let res = assert_ready!(Stream::poll_next(rx, cx));
assert!(res.is_none());
});
}
#[test]
fn multi_rx() {
- let (mut tx, mut rx1) = watch::channel("one");
+ let (tx, mut rx1) = watch::channel("one");
let mut rx2 = rx1.clone();
let mut task1 = MockTask::new();
let mut task2 = MockTask::new();
- task1.enter(|| {
- let res = assert_ready!(rx1.poll_ref());
+ task1.enter(|cx| {
+ let res = assert_ready!(rx1.poll_ref(cx));
assert_eq!(*res.unwrap(), "one");
});
- task2.enter(|| {
- let res = assert_ready!(rx2.poll_ref());
+ task2.enter(|cx| {
+ let res = assert_ready!(rx2.poll_ref(cx));
assert_eq!(*res.unwrap(), "one");
});
tx.broadcast("two").unwrap();
- assert!(task1.is_notified());
- assert!(task2.is_notified());
+ assert!(task1.is_woken());
+ assert!(task2.is_woken());
- task1.enter(|| {
- let res = assert_ready!(rx1.poll_ref());
+ task1.enter(|cx| {
+ let res = assert_ready!(rx1.poll_ref(cx));
assert_eq!(*res.unwrap(), "two");
});
tx.broadcast("three").unwrap();
- assert!(task1.is_notified());
- assert!(task2.is_notified());
+ assert!(task1.is_woken());
+ assert!(task2.is_woken());
- task1.enter(|| {
- let res = assert_ready!(rx1.poll_ref());
+ task1.enter(|cx| {
+ let res = assert_ready!(rx1.poll_ref(cx));
assert_eq!(*res.unwrap(), "three");
});
- task2.enter(|| {
- let res = assert_ready!(rx2.poll_ref());
+ task2.enter(|cx| {
+ let res = assert_ready!(rx2.poll_ref(cx));
assert_eq!(*res.unwrap(), "three");
});
tx.broadcast("four").unwrap();
- task1.enter(|| {
- let res = assert_ready!(rx1.poll_ref());
+ task1.enter(|cx| {
+ let res = assert_ready!(rx1.poll_ref(cx));
assert_eq!(*res.unwrap(), "four");
});
drop(tx);
- task1.enter(|| {
- let res = assert_ready!(rx1.poll_ref());
+ task1.enter(|cx| {
+ let res = assert_ready!(rx1.poll_ref(cx));
assert!(res.is_none());
});
- task2.enter(|| {
- let res = assert_ready!(rx2.poll_ref());
+ task2.enter(|cx| {
+ let res = assert_ready!(rx2.poll_ref(cx));
assert_eq!(*res.unwrap(), "four");
});
- task2.enter(|| {
- let res = assert_ready!(rx2.poll_ref());
+ task2.enter(|cx| {
+ let res = assert_ready!(rx2.poll_ref(cx));
assert!(res.is_none());
});
}
@@ -173,45 +210,47 @@ fn rx_observes_final_value() {
drop(tx);
- task.enter(|| {
- let res = assert_ready!(rx.poll_ref());
+ task.enter(|cx| {
+ let res = assert_ready!(rx.poll_ref(cx));
assert!(res.is_some());
assert_eq!(*res.unwrap(), "one");
});
- task.enter(|| {
- let res = assert_ready!(rx.poll_ref());
+ task.enter(|cx| {
+ let res = assert_ready!(rx.poll_ref(cx));
assert!(res.is_none());
});
// Sending a value
- let (mut tx, mut rx) = watch::channel("one");
+ let (tx, mut rx) = watch::channel("one");
let mut task = MockTask::new();
tx.broadcast("two").unwrap();
- task.enter(|| {
- let res = assert_ready!(rx.poll_ref());
- assert!(res.is_some());
- assert_eq!(*res.unwrap(), "two");
- });
+ task.enter(|cx| {
+ {
+ let res = assert_ready!(rx.poll_ref(cx));
+ assert!(res.is_some());
+ assert_eq!(*res.unwrap(), "two");
+ }
- task.enter(|| assert_not_ready!(rx.poll_ref()));
+ assert_pending!(rx.poll_ref(cx));
+ });
tx.broadcast("three").unwrap();
drop(tx);
- assert!(task.is_notified());
+ assert!(task.is_woken());
- task.enter(|| {
- let res = assert_ready!(rx.poll_ref());
+ task.enter(|cx| {
+ let res = assert_ready!(rx.poll_ref(cx));
assert!(res.is_some());
assert_eq!(*res.unwrap(), "three");
});
- task.enter(|| {
- let res = assert_ready!(rx.poll_ref());
+ task.enter(|cx| {
+ let res = assert_ready!(rx.poll_ref(cx));
assert!(res.is_none());
});
}
@@ -221,13 +260,13 @@ fn poll_close() {
let (mut tx, rx) = watch::channel("one");
let mut task = MockTask::new();
- task.enter(|| assert_not_ready!(tx.poll_close()));
+ assert_pending!(task.enter(|cx| tx.poll_close(cx)));
drop(rx);
- assert!(task.is_notified());
+ assert!(task.is_woken());
- task.enter(|| assert_ready!(tx.poll_close()));
+ assert_ready!(task.enter(|cx| tx.poll_close(cx)));
assert!(tx.broadcast("two").is_err());
}