diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-15 22:11:13 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-15 22:11:13 -0800 |
commit | 8a7e57786a5dca139f5b4261685e22991ded0859 (patch) | |
tree | b69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/tests | |
parent | 930679587ae42e4df3113159ccf33fb5923dd73a (diff) |
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its
_public_ dependencies on crates that are either a) do not provide a
stable (1.0+) release with longevity guarantees or b) match the `tokio`
release cadence. Of course, implementing `std` traits fits the
requirements.
The on exception, for now, is the `Stream` trait found in `futures_core`.
It is expected that this trait will not change much and be moved into `std.
Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain
a dependency on this trait given how foundational it is.
Since the `Stream` implementation is optional, types that are logically
streams provide `async fn next_*` functions to obtain the next value.
Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`.
Additionally, some misc cleanup is also done:
- `tokio::io::io` -> `tokio::io::util`.
- `delay` -> `delay_until`.
- `Timeout::new` -> `timeout(...)`.
- `signal::ctrl_c()` returns a future instead of a stream.
- `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait).
- `time::Throttle` is removed (due to lack of `Stream` trait).
- Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/tests')
-rw-r--r-- | tokio/tests/fs_dir.rs | 51 | ||||
-rw-r--r-- | tokio/tests/fs_file_mocked.rs | 11 | ||||
-rw-r--r-- | tokio/tests/io_lines.rs | 18 | ||||
-rw-r--r-- | tokio/tests/net_driver.rs | 2 | ||||
-rw-r--r-- | tokio/tests/process_issue_42.rs | 4 | ||||
-rw-r--r-- | tokio/tests/rt_common.rs | 6 | ||||
-rw-r--r-- | tokio/tests/signal_ctrl_c.rs | 7 | ||||
-rw-r--r-- | tokio/tests/signal_drop_recv.rs | 5 | ||||
-rw-r--r-- | tokio/tests/signal_drop_rt.rs | 5 | ||||
-rw-r--r-- | tokio/tests/signal_drop_signal.rs | 5 | ||||
-rw-r--r-- | tokio/tests/signal_multi_rt.rs | 5 | ||||
-rw-r--r-- | tokio/tests/signal_notify_both.rs | 11 | ||||
-rw-r--r-- | tokio/tests/signal_twice.rs | 6 | ||||
-rw-r--r-- | tokio/tests/signal_usr1.rs | 5 | ||||
-rw-r--r-- | tokio/tests/sync_errors.rs | 5 | ||||
-rw-r--r-- | tokio/tests/sync_mpsc.rs | 113 | ||||
-rw-r--r-- | tokio/tests/sync_watch.rs | 157 | ||||
-rw-r--r-- | tokio/tests/time_interval.rs | 46 | ||||
-rw-r--r-- | tokio/tests/time_rt.rs | 11 | ||||
-rw-r--r-- | tokio/tests/time_throttle.rs | 68 | ||||
-rw-r--r-- | tokio/tests/time_timeout.rs | 71 |
21 files changed, 241 insertions, 371 deletions
diff --git a/tokio/tests/fs_dir.rs b/tokio/tests/fs_dir.rs index 7ef2db6f..40e20bdb 100644 --- a/tokio/tests/fs_dir.rs +++ b/tokio/tests/fs_dir.rs @@ -3,8 +3,6 @@ use tokio::fs; use tokio_test::assert_ok; -use futures_util::future; -use futures_util::stream::TryStreamExt; use std::sync::{Arc, Mutex}; use tempfile::tempdir; @@ -42,7 +40,7 @@ async fn remove() { } #[tokio::test] -async fn read() { +async fn read_inherent() { let base_dir = tempdir().unwrap(); let p = base_dir.path(); @@ -55,15 +53,44 @@ async fn read() { let f = files.clone(); let p = p.to_path_buf(); - let read_dir_fut = fs::read_dir(p).await.unwrap(); - read_dir_fut - .try_for_each(move |e| { - let s = e.file_name().to_str().unwrap().to_string(); - f.lock().unwrap().push(s); - future::ok(()) - }) - .await - .unwrap(); + let mut entries = fs::read_dir(p).await.unwrap(); + + while let Some(e) = assert_ok!(entries.next_entry().await) { + let s = e.file_name().to_str().unwrap().to_string(); + f.lock().unwrap().push(s); + } + + let mut files = files.lock().unwrap(); + files.sort(); // because the order is not guaranteed + assert_eq!( + *files, + vec!["aa".to_string(), "bb".to_string(), "cc".to_string()] + ); +} + +#[tokio::test] +async fn read_stream() { + use futures::StreamExt; + + let base_dir = tempdir().unwrap(); + + let p = base_dir.path(); + std::fs::create_dir(p.join("aa")).unwrap(); + std::fs::create_dir(p.join("bb")).unwrap(); + std::fs::create_dir(p.join("cc")).unwrap(); + + let files = Arc::new(Mutex::new(Vec::new())); + + let f = files.clone(); + let p = p.to_path_buf(); + + let mut entries = fs::read_dir(p).await.unwrap(); + + while let Some(res) = entries.next().await { + let e = assert_ok!(res); + let s = e.file_name().to_str().unwrap().to_string(); + f.lock().unwrap().push(s); + } let mut files = files.lock().unwrap(); files.sort(); // because the order is not guaranteed diff --git a/tokio/tests/fs_file_mocked.rs b/tokio/tests/fs_file_mocked.rs index 4697814c..d2eaadde 100644 --- a/tokio/tests/fs_file_mocked.rs +++ b/tokio/tests/fs_file_mocked.rs @@ -1,5 +1,16 @@ #![warn(rust_2018_idioms)] +macro_rules! ready { + ($e:expr $(,)?) => { + match $e { + std::task::Poll::Ready(t) => t, + std::task::Poll::Pending => return std::task::Poll::Pending, + } + }; +} + +use futures::future; + // Load source #[allow(warnings)] #[path = "../src/fs/file.rs"] diff --git a/tokio/tests/io_lines.rs b/tokio/tests/io_lines.rs index e85fbff7..83240d62 100644 --- a/tokio/tests/io_lines.rs +++ b/tokio/tests/io_lines.rs @@ -3,10 +3,24 @@ use tokio::io::AsyncBufReadExt; use tokio_test::assert_ok; -use futures_util::StreamExt; +#[tokio::test] +async fn lines_inherent() { + let rd: &[u8] = b"hello\r\nworld\n\n"; + let mut st = rd.lines(); + + let b = assert_ok!(st.next_line().await).unwrap(); + assert_eq!(b, "hello"); + let b = assert_ok!(st.next_line().await).unwrap(); + assert_eq!(b, "world"); + let b = assert_ok!(st.next_line().await).unwrap(); + assert_eq!(b, ""); + assert!(assert_ok!(st.next_line().await).is_none()); +} #[tokio::test] -async fn lines() { +async fn lines_stream() { + use futures::StreamExt; + let rd: &[u8] = b"hello\r\nworld\n\n"; let mut st = rd.lines(); diff --git a/tokio/tests/net_driver.rs b/tokio/tests/net_driver.rs index 5285fd13..5baa4eda 100644 --- a/tokio/tests/net_driver.rs +++ b/tokio/tests/net_driver.rs @@ -4,7 +4,7 @@ use tokio::net::driver::Reactor; use tokio::net::TcpListener; use tokio_test::{assert_ok, assert_pending}; -use futures_util::task::{waker_ref, ArcWake}; +use futures::task::{waker_ref, ArcWake}; use std::future::Future; use std::net::TcpStream; use std::pin::Pin; diff --git a/tokio/tests/process_issue_42.rs b/tokio/tests/process_issue_42.rs index 9de9d0bf..21651ac8 100644 --- a/tokio/tests/process_issue_42.rs +++ b/tokio/tests/process_issue_42.rs @@ -5,8 +5,8 @@ use tokio::process::Command; use tokio::runtime; -use futures_util::future::FutureExt; -use futures_util::stream::FuturesOrdered; +use futures::future::FutureExt; +use futures::stream::FuturesOrdered; use std::process::Stdio; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 73982ced..2637793a 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -41,7 +41,7 @@ rt_test! { use tokio::time; use tokio_test::{assert_err, assert_ok}; - use futures_util::future::poll_fn; + use futures::future::poll_fn; use std::future::Future; use std::pin::Pin; use std::sync::{mpsc, Arc}; @@ -133,12 +133,12 @@ rt_test! { let mut txs = (0..ITER) .map(|i| { let (tx, rx) = oneshot::channel(); - let mut done_tx = done_tx.clone(); + let done_tx = done_tx.clone(); tokio::spawn(async move { let msg = assert_ok!(rx.await); assert_eq!(i, msg); - assert_ok!(done_tx.try_send(msg)); + assert_ok!(done_tx.send(msg)); }); tx diff --git a/tokio/tests/signal_ctrl_c.rs b/tokio/tests/signal_ctrl_c.rs index ea4efaa2..13eeaa81 100644 --- a/tokio/tests/signal_ctrl_c.rs +++ b/tokio/tests/signal_ctrl_c.rs @@ -6,13 +6,13 @@ mod support { } use support::signal::send_signal; -use tokio::prelude::*; use tokio::signal; use tokio::sync::oneshot; +use tokio_test::assert_ok; #[tokio::test] async fn ctrl_c() { - let ctrl_c = signal::ctrl_c().expect("failed to init ctrl_c"); + let ctrl_c = signal::ctrl_c(); let (fire, wait) = oneshot::channel(); @@ -24,5 +24,6 @@ async fn ctrl_c() { }); let _ = fire.send(()); - let _ = ctrl_c.into_future().await; + + assert_ok!(ctrl_c.await); } diff --git a/tokio/tests/signal_drop_recv.rs b/tokio/tests/signal_drop_recv.rs index 2a5c047f..06dffe12 100644 --- a/tokio/tests/signal_drop_recv.rs +++ b/tokio/tests/signal_drop_recv.rs @@ -6,7 +6,6 @@ mod support { } use support::signal::send_signal; -use tokio::prelude::*; use tokio::signal::unix::{signal, SignalKind}; #[tokio::test] @@ -16,7 +15,7 @@ async fn drop_then_get_a_signal() { drop(sig); send_signal(libc::SIGUSR1); - let sig = signal(kind).expect("failed to create second signal"); + let mut sig = signal(kind).expect("failed to create second signal"); - let _ = sig.into_future().await; + let _ = sig.recv().await; } diff --git a/tokio/tests/signal_drop_rt.rs b/tokio/tests/signal_drop_rt.rs index 1af8c0a7..7387e312 100644 --- a/tokio/tests/signal_drop_rt.rs +++ b/tokio/tests/signal_drop_rt.rs @@ -6,7 +6,6 @@ mod support { } use support::signal::send_signal; -use tokio::prelude::*; use tokio::runtime::Runtime; use tokio::signal::unix::{signal, SignalKind}; @@ -25,7 +24,7 @@ fn dropping_loops_does_not_cause_starvation() { send_signal(libc::SIGUSR1); first_rt - .block_on(first_signal.next()) + .block_on(first_signal.recv()) .expect("failed to await first signal"); drop(first_rt); @@ -33,7 +32,7 @@ fn dropping_loops_does_not_cause_starvation() { send_signal(libc::SIGUSR1); - second_rt.block_on(second_signal.next()); + second_rt.block_on(second_signal.recv()); } fn rt() -> Runtime { diff --git a/tokio/tests/signal_drop_signal.rs b/tokio/tests/signal_drop_signal.rs index 3cf5611f..b5bc7dd8 100644 --- a/tokio/tests/signal_drop_signal.rs +++ b/tokio/tests/signal_drop_signal.rs @@ -6,7 +6,6 @@ mod support { } use support::signal::send_signal; -use tokio::prelude::*; use tokio::signal::unix::{signal, SignalKind}; #[tokio::test] @@ -15,12 +14,12 @@ async fn dropping_signal_does_not_deregister_any_other_instances() { // Signals should not starve based on ordering let first_duplicate_signal = signal(kind).expect("failed to register first duplicate signal"); - let sig = signal(kind).expect("failed to register signal"); + let mut sig = signal(kind).expect("failed to register signal"); let second_duplicate_signal = signal(kind).expect("failed to register second duplicate signal"); drop(first_duplicate_signal); drop(second_duplicate_signal); send_signal(libc::SIGUSR1); - let _ = sig.into_future().await; + let _ = sig.recv().await; } diff --git a/tokio/tests/signal_multi_rt.rs b/tokio/tests/signal_multi_rt.rs index 6a16dd88..fb5449f0 100644 --- a/tokio/tests/signal_multi_rt.rs +++ b/tokio/tests/signal_multi_rt.rs @@ -6,7 +6,6 @@ mod support { } use support::signal::send_signal; -use tokio::prelude::*; use tokio::runtime::Runtime; use tokio::signal::unix::{signal, SignalKind}; @@ -26,9 +25,9 @@ fn multi_loop() { thread::spawn(move || { let mut rt = rt(); let _ = rt.block_on(async { - let signal = signal(SignalKind::hangup()).unwrap(); + let mut signal = signal(SignalKind::hangup()).unwrap(); sender.send(()).unwrap(); - signal.into_future().await + signal.recv().await }); }) }) diff --git a/tokio/tests/signal_notify_both.rs b/tokio/tests/signal_notify_both.rs index 00385478..7d830686 100644 --- a/tokio/tests/signal_notify_both.rs +++ b/tokio/tests/signal_notify_both.rs @@ -6,18 +6,17 @@ mod support { } use support::signal::send_signal; -use tokio::prelude::*; use tokio::signal::unix::{signal, SignalKind}; -use futures::future; - #[tokio::test] async fn notify_both() { let kind = SignalKind::user_defined2(); - let signal1 = signal(kind).expect("failed to create signal1"); - let signal2 = signal(kind).expect("failed to create signal2"); + let mut signal1 = signal(kind).expect("failed to create signal1"); + let mut signal2 = signal(kind).expect("failed to create signal2"); send_signal(libc::SIGUSR2); - let _ = future::join(signal1.into_future(), signal2.into_future()).await; + + signal1.recv().await; + signal2.recv().await; } diff --git a/tokio/tests/signal_twice.rs b/tokio/tests/signal_twice.rs index d8e0facc..171d18e6 100644 --- a/tokio/tests/signal_twice.rs +++ b/tokio/tests/signal_twice.rs @@ -6,7 +6,6 @@ mod support { } use support::signal::send_signal; -use tokio::prelude::*; use tokio::signal::unix::{signal, SignalKind}; #[tokio::test] @@ -17,9 +16,6 @@ async fn twice() { for _ in 0..2 { send_signal(libc::SIGUSR1); - let (item, sig_next) = sig.into_future().await; - assert_eq!(item, Some(())); - - sig = sig_next; + assert!(sig.recv().await.is_some()); } } diff --git a/tokio/tests/signal_usr1.rs b/tokio/tests/signal_usr1.rs index 9b6a0dec..95fc6c10 100644 --- a/tokio/tests/signal_usr1.rs +++ b/tokio/tests/signal_usr1.rs @@ -6,18 +6,17 @@ mod support { } use support::signal::send_signal; -use tokio::prelude::*; use tokio::signal::unix::{signal, SignalKind}; use tokio_test::assert_ok; #[tokio::test] async fn signal_usr1() { - let signal = assert_ok!( + let mut signal = assert_ok!( signal(SignalKind::user_defined1()), "failed to create signal" ); send_signal(libc::SIGUSR1); - let _ = signal.into_future().await; + signal.recv().await; } diff --git a/tokio/tests/sync_errors.rs b/tokio/tests/sync_errors.rs index e68fe081..8cc0c0cd 100644 --- a/tokio/tests/sync_errors.rs +++ b/tokio/tests/sync_errors.rs @@ -6,11 +6,8 @@ fn is_error<T: ::std::error::Error + Send + Sync>() {} fn mpsc_error_bound() { use tokio::sync::mpsc::error; - is_error::<error::SendError>(); + is_error::<error::SendError<()>>(); is_error::<error::TrySendError<()>>(); - is_error::<error::UnboundedRecvError>(); - is_error::<error::UnboundedSendError>(); - is_error::<error::UnboundedTrySendError<()>>(); } #[test] diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index f724c564..040904e4 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1,6 +1,7 @@ #![warn(rust_2018_idioms)] use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; use tokio_test::task; use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, @@ -38,47 +39,33 @@ fn send_recv_with_buffer() { } #[tokio::test] -async fn async_send_recv_with_buffer() { - let (mut tx, mut rx) = mpsc::channel(16); +async fn send_recv_stream_with_buffer() { + use futures::StreamExt; + + let (mut tx, mut rx) = mpsc::channel::<i32>(16); tokio::spawn(async move { assert_ok!(tx.send(1).await); assert_ok!(tx.send(2).await); }); - assert_eq!(Some(1), rx.recv().await); - assert_eq!(Some(2), rx.recv().await); - assert_eq!(None, rx.recv().await); + assert_eq!(Some(1), rx.next().await); + assert_eq!(Some(2), rx.next().await); + assert_eq!(None, rx.next().await); } -#[test] -fn send_sink_recv_with_buffer() { - use futures_core::Stream; - use futures_sink::Sink; - - let (tx, rx) = mpsc::channel::<i32>(16); - - task::spawn(tx).enter(|cx, mut tx| { - assert_ready_ok!(tx.as_mut().poll_ready(cx)); - assert_ok!(tx.as_mut().start_send(1)); - - assert_ready_ok!(tx.as_mut().poll_ready(cx)); - assert_ok!(tx.as_mut().start_send(2)); +#[tokio::test] +async fn async_send_recv_with_buffer() { + let (mut tx, mut rx) = mpsc::channel(16); - assert_ready_ok!(tx.as_mut().poll_flush(cx)); - assert_ready_ok!(tx.as_mut().poll_close(cx)); + tokio::spawn(async move { + assert_ok!(tx.send(1).await); + assert_ok!(tx.send(2).await); }); - task::spawn(rx).enter(|cx, mut rx| { - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert_eq!(val, Some(1)); - - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert_eq!(val, Some(2)); - - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert!(val.is_none()); - }); + assert_eq!(Some(1), rx.recv().await); + assert_eq!(Some(2), rx.recv().await); + assert_eq!(None, rx.recv().await); } #[test] @@ -124,11 +111,11 @@ fn buffer_gteq_one() { fn send_recv_unbounded() { let mut t1 = task::spawn(()); - let (mut tx, mut rx) = mpsc::unbounded_channel::<i32>(); + let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); // Using `try_send` - assert_ok!(tx.try_send(1)); - assert_ok!(tx.try_send(2)); + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert_eq!(val, Some(1)); @@ -144,11 +131,11 @@ fn send_recv_unbounded() { #[tokio::test] async fn async_send_recv_unbounded() { - let (mut tx, mut rx) = mpsc::unbounded_channel(); + let (tx, mut rx) = mpsc::unbounded_channel(); tokio::spawn(async move { - assert_ok!(tx.try_send(1)); - assert_ok!(tx.try_send(2)); + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); }); assert_eq!(Some(1), rx.recv().await); @@ -156,41 +143,20 @@ async fn async_send_recv_unbounded() { assert_eq!(None, rx.recv().await); } -#[test] -fn sink_send_recv_unbounded() { - use futures_core::Stream; - use futures_sink::Sink; - use futures_util::pin_mut; - - let mut t1 = task::spawn(()); - - let (tx, rx) = mpsc::unbounded_channel::<i32>(); - - t1.enter(|cx, _| { - pin_mut!(tx); - - assert_ready_ok!(tx.as_mut().poll_ready(cx)); - assert_ok!(tx.as_mut().start_send(1)); +#[tokio::test] +async fn send_recv_stream_unbounded() { + use futures::StreamExt; - assert_ready_ok!(tx.as_mut().poll_ready(cx)); - assert_ok!(tx.as_mut().start_send(2)); + let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); - assert_ready_ok!(tx.as_mut().poll_flush(cx)); - assert_ready_ok!(tx.as_mut().poll_close(cx)); + tokio::spawn(async move { + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); }); - t1.enter(|cx, _| { - pin_mut!(rx); - - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert_eq!(val, Some(1)); - - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert_eq!(val, Some(2)); - - let val = assert_ready!(rx.as_mut().poll_next(cx)); - assert!(val.is_none()); - }); + assert_eq!(Some(1), rx.next().await); + assert_eq!(Some(2), rx.next().await); + assert_eq!(None, rx.next().await); } #[test] @@ -223,7 +189,7 @@ fn no_t_bounds_unbounded() { // same with Receiver println!("{:?}", rx); // and sender should be Clone even though T isn't Clone - assert!(tx.clone().try_send(NoImpls).is_ok()); + assert!(tx.clone().send(NoImpls).is_ok()); let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert!(val.is_some()); @@ -356,8 +322,10 @@ fn try_send_fail() { tx.try_send("hello").unwrap(); // This should fail - let err = assert_err!(tx.try_send("fail")); - assert!(err.is_full()); + match assert_err!(tx.try_send("fail")) { + TrySendError::Full(..) => {} + _ => panic!(), + } let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert_eq!(val, Some("hello")); @@ -421,7 +389,10 @@ fn dropping_rx_closes_channel_for_try() { { let err = assert_err!(tx.try_send(msg.clone())); - assert!(err.is_closed()); + match err { + TrySendError::Closed(..) => {} + _ => panic!(), + } } assert_eq!(1, Arc::strong_count(&msg)); diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index 4d73bc81..7ccad5c2 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -5,41 +5,6 @@ use tokio_test::task::spawn; use tokio_test::{assert_pending, assert_ready}; #[test] -fn single_rx_recv_ref() { - let (tx, mut rx) = watch::channel("one"); - - { - let mut t = spawn(rx.recv_ref()); - let v = assert_ready!(t.poll()).unwrap(); - assert_eq!(*v, "one"); - } - - { - let mut t = spawn(rx.recv_ref()); - - assert_pending!(t.poll()); - - tx.broadcast("two").unwrap(); - - assert!(t.is_woken()); - - let v = assert_ready!(t.poll()).unwrap(); - assert_eq!(*v, "two"); - } - - { - let mut t = spawn(rx.recv_ref()); - - assert_pending!(t.poll()); - - drop(tx); - - let res = assert_ready!(t.poll()); - assert!(res.is_none()); - } -} - -#[test] fn single_rx_recv() { let (tx, mut rx) = watch::channel("one"); @@ -75,62 +40,25 @@ fn single_rx_recv() { } #[test] -fn stream_impl() { - use tokio::prelude::*; - - let (tx, mut rx) = watch::channel("one"); - - { - let mut t = spawn(rx.next()); - let v = assert_ready!(t.poll()).unwrap(); - assert_eq!(v, "one"); - } - - { - let mut t = spawn(rx.next()); - - assert_pending!(t.poll()); - - tx.broadcast("two").unwrap(); - - assert!(t.is_woken()); - - let v = assert_ready!(t.poll()).unwrap(); - assert_eq!(v, "two"); - } - - { - let mut t = spawn(rx.next()); - - assert_pending!(t.poll()); - - drop(tx); - - let res = assert_ready!(t.poll()); - assert!(res.is_none()); - } -} - -#[test] fn multi_rx() { let (tx, mut rx1) = watch::channel("one"); let mut rx2 = rx1.clone(); { - let mut t1 = spawn(rx1.recv_ref()); - let mut t2 = spawn(rx2.recv_ref()); + let mut t1 = spawn(rx1.recv()); + let mut t2 = spawn(rx2.recv()); let res = assert_ready!(t1.poll()); - assert_eq!(*res.unwrap(), "one"); + assert_eq!(res.unwrap(), "one"); let res = assert_ready!(t2.poll()); - assert_eq!(*res.unwrap(), "one"); + assert_eq!(res.unwrap(), "one"); } - let mut t2 = spawn(rx2.recv_ref()); + let mut t2 = spawn(rx2.recv()); { - let mut t1 = spawn(rx1.recv_ref()); + let mut t1 = spawn(rx1.recv()); assert_pending!(t1.poll()); assert_pending!(t2.poll()); @@ -141,11 +69,11 @@ fn multi_rx() { assert!(t2.is_woken()); let res = assert_ready!(t1.poll()); - assert_eq!(*res.unwrap(), "two"); + assert_eq!(res.unwrap(), "two"); } { - let mut t1 = spawn(rx1.recv_ref()); + let mut t1 = spawn(rx1.recv()); assert_pending!(t1.poll()); @@ -155,17 +83,17 @@ fn multi_rx() { assert!(t2.is_woken()); let res = assert_ready!(t1.poll()); - assert_eq!(*res.unwrap(), "three"); + assert_eq!(res.unwrap(), "three"); let res = assert_ready!(t2.poll()); - assert_eq!(*res.unwrap(), "three"); + assert_eq!(res.unwrap(), "three"); } drop(t2); { - let mut t1 = spawn(rx1.recv_ref()); - let mut t2 = spawn(rx2.recv_ref()); + let mut t1 = spawn(rx1.recv()); + let mut t2 = spawn(rx2.recv()); assert_pending!(t1.poll()); assert_pending!(t2.poll()); @@ -173,10 +101,10 @@ fn multi_rx() { tx.broadcast("four").unwrap(); let res = assert_ready!(t1.poll()); - assert_eq!(*res.unwrap(), "four"); + assert_eq!(res.unwrap(), "four"); drop(t1); - let mut t1 = spawn(rx1.recv_ref()); + let mut t1 = spawn(rx1.recv()); assert_pending!(t1.poll()); drop(tx); @@ -186,10 +114,10 @@ fn multi_rx() { assert!(res.is_none()); let res = assert_ready!(t2.poll()); - assert_eq!(*res.unwrap(), "four"); + assert_eq!(res.unwrap(), "four"); drop(t2); - let mut t2 = spawn(rx2.recv_ref()); + let mut t2 = spawn(rx2.recv()); let res = assert_ready!(t2.poll()); assert!(res.is_none()); } @@ -203,13 +131,13 @@ fn rx_observes_final_value() { drop(tx); { - let mut t1 = spawn(rx.recv_ref()); + let mut t1 = spawn(rx.recv()); let res = assert_ready!(t1.poll()); - assert_eq!(*res.unwrap(), "one"); + assert_eq!(res.unwrap(), "one"); } { - let mut t1 = spawn(rx.recv_ref()); + let mut t1 = spawn(rx.recv()); let res = assert_ready!(t1.poll()); assert!(res.is_none()); } @@ -221,13 +149,13 @@ fn rx_observes_final_value() { tx.broadcast("two").unwrap(); { - let mut t1 = spawn(rx.recv_ref()); + let mut t1 = spawn(rx.recv()); let res = assert_ready!(t1.poll()); - assert_eq!(*res.unwrap(), "two"); + assert_eq!(res.unwrap(), "two"); } { - let mut t1 = spawn(rx.recv_ref()); + let mut t1 = spawn(rx.recv()); assert_pending!(t1.poll()); tx.broadcast("three").unwrap(); @@ -236,11 +164,11 @@ fn rx_observes_final_value() { assert!(t1.is_woken()); let res = assert_ready!(t1.poll()); - assert_eq!(*res.unwrap(), "three"); + assert_eq!(res.unwrap(), "three"); } { - let mut t1 = spawn(rx.recv_ref()); + let mut t1 = spawn(rx.recv()); let res = assert_ready!(t1.poll()); assert!(res.is_none()); } @@ -262,3 +190,40 @@ fn poll_close() { assert!(tx.broadcast("two").is_err()); } + +#[test] +fn stream_impl() { + use futures::StreamExt; + + let (tx, mut rx) = watch::channel("one"); + + { + let mut t = spawn(rx.next()); + let v = assert_ready!(t.poll()).unwrap(); + assert_eq!(v, "one"); + } + + { + let mut t = spawn(rx.next()); + + assert_pending!(t.poll()); + + tx.broadcast("two").unwrap(); + + assert!(t.is_woken()); + + let v = assert_ready!(t.poll()).unwrap(); + assert_eq!(v, "two"); + } + + { + let mut t = spawn(rx.next()); + + assert_pending!(t.poll()); + + drop(tx); + + let res = assert_ready!(t.poll()); + assert!(res.is_none()); + } +} diff --git a/tokio/tests/time_interval.rs b/tokio/tests/time_interval.rs index c884ca8e..70709f4a 100644 --- a/tokio/tests/time_interval.rs +++ b/tokio/tests/time_interval.rs @@ -1,12 +1,14 @@ #![warn(rust_2018_idioms)] -use tokio::time::{self, Duration, Instant, Interval}; +use tokio::time::{self, Duration, Instant}; use tokio_test::{assert_pending, assert_ready_eq, task}; +use std::task::Poll; + #[tokio::test] #[should_panic] async fn interval_zero_duration() { - let _ = Interval::new(Instant::now(), ms(0)); + let _ = time::interval_at(Instant::now(), ms(0)); } #[tokio::test] @@ -18,26 +20,44 @@ async fn usage() { // TODO: Skip this time::advance(ms(1)).await; - let mut int = task::spawn(Interval::new(start, ms(300))); + let mut i = task::spawn(time::interval_at(start, ms(300))); - assert_ready_eq!(int.poll_next(), Some(start)); - assert_pending!(int.poll_next()); + assert_ready_eq!(poll_next(&mut i), start); + assert_pending!(poll_next(&mut i)); time::advance(ms(100)).await; - assert_pending!(int.poll_next()); + assert_pending!(poll_next(&mut i)); time::advance(ms(200)).await; - assert_ready_eq!(int.poll_next(), Some(start + ms(300))); - assert_pending!(int.poll_next()); + assert_ready_eq!(poll_next(&mut i), start + ms(300)); + assert_pending!(poll_next(&mut i)); time::advance(ms(400)).await; - assert_ready_eq!(int.poll_next(), Some(start + ms(600))); - assert_pending!(int.poll_next()); + assert_ready_eq!(poll_next(&mut i), start + ms(600)); + assert_pending!(poll_next(&mut i)); time::advance(ms(500)).await; - assert_ready_eq!(int.poll_next(), Some(start + ms(900))); - assert_ready_eq!(int.poll_next(), Some(start + ms(1200))); - assert_pending!(int.poll_next()); + assert_ready_eq!(poll_next(&mut i), start + ms(900)); + assert_ready_eq!(poll_next(&mut i), start + ms(1200)); + assert_pending!(poll_next(&mut i)); +} + +#[tokio::test] +async fn usage_stream() { |