summaryrefslogtreecommitdiffstats
path: root/tokio/tests
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-15 22:11:13 -0800
committerGitHub <noreply@github.com>2019-11-15 22:11:13 -0800
commit8a7e57786a5dca139f5b4261685e22991ded0859 (patch)
treeb69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/tests
parent930679587ae42e4df3113159ccf33fb5923dd73a (diff)
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its _public_ dependencies on crates that are either a) do not provide a stable (1.0+) release with longevity guarantees or b) match the `tokio` release cadence. Of course, implementing `std` traits fits the requirements. The on exception, for now, is the `Stream` trait found in `futures_core`. It is expected that this trait will not change much and be moved into `std. Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain a dependency on this trait given how foundational it is. Since the `Stream` implementation is optional, types that are logically streams provide `async fn next_*` functions to obtain the next value. Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`. Additionally, some misc cleanup is also done: - `tokio::io::io` -> `tokio::io::util`. - `delay` -> `delay_until`. - `Timeout::new` -> `timeout(...)`. - `signal::ctrl_c()` returns a future instead of a stream. - `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait). - `time::Throttle` is removed (due to lack of `Stream` trait). - Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/tests')
-rw-r--r--tokio/tests/fs_dir.rs51
-rw-r--r--tokio/tests/fs_file_mocked.rs11
-rw-r--r--tokio/tests/io_lines.rs18
-rw-r--r--tokio/tests/net_driver.rs2
-rw-r--r--tokio/tests/process_issue_42.rs4
-rw-r--r--tokio/tests/rt_common.rs6
-rw-r--r--tokio/tests/signal_ctrl_c.rs7
-rw-r--r--tokio/tests/signal_drop_recv.rs5
-rw-r--r--tokio/tests/signal_drop_rt.rs5
-rw-r--r--tokio/tests/signal_drop_signal.rs5
-rw-r--r--tokio/tests/signal_multi_rt.rs5
-rw-r--r--tokio/tests/signal_notify_both.rs11
-rw-r--r--tokio/tests/signal_twice.rs6
-rw-r--r--tokio/tests/signal_usr1.rs5
-rw-r--r--tokio/tests/sync_errors.rs5
-rw-r--r--tokio/tests/sync_mpsc.rs113
-rw-r--r--tokio/tests/sync_watch.rs157
-rw-r--r--tokio/tests/time_interval.rs46
-rw-r--r--tokio/tests/time_rt.rs11
-rw-r--r--tokio/tests/time_throttle.rs68
-rw-r--r--tokio/tests/time_timeout.rs71
21 files changed, 241 insertions, 371 deletions
diff --git a/tokio/tests/fs_dir.rs b/tokio/tests/fs_dir.rs
index 7ef2db6f..40e20bdb 100644
--- a/tokio/tests/fs_dir.rs
+++ b/tokio/tests/fs_dir.rs
@@ -3,8 +3,6 @@
use tokio::fs;
use tokio_test::assert_ok;
-use futures_util::future;
-use futures_util::stream::TryStreamExt;
use std::sync::{Arc, Mutex};
use tempfile::tempdir;
@@ -42,7 +40,7 @@ async fn remove() {
}
#[tokio::test]
-async fn read() {
+async fn read_inherent() {
let base_dir = tempdir().unwrap();
let p = base_dir.path();
@@ -55,15 +53,44 @@ async fn read() {
let f = files.clone();
let p = p.to_path_buf();
- let read_dir_fut = fs::read_dir(p).await.unwrap();
- read_dir_fut
- .try_for_each(move |e| {
- let s = e.file_name().to_str().unwrap().to_string();
- f.lock().unwrap().push(s);
- future::ok(())
- })
- .await
- .unwrap();
+ let mut entries = fs::read_dir(p).await.unwrap();
+
+ while let Some(e) = assert_ok!(entries.next_entry().await) {
+ let s = e.file_name().to_str().unwrap().to_string();
+ f.lock().unwrap().push(s);
+ }
+
+ let mut files = files.lock().unwrap();
+ files.sort(); // because the order is not guaranteed
+ assert_eq!(
+ *files,
+ vec!["aa".to_string(), "bb".to_string(), "cc".to_string()]
+ );
+}
+
+#[tokio::test]
+async fn read_stream() {
+ use futures::StreamExt;
+
+ let base_dir = tempdir().unwrap();
+
+ let p = base_dir.path();
+ std::fs::create_dir(p.join("aa")).unwrap();
+ std::fs::create_dir(p.join("bb")).unwrap();
+ std::fs::create_dir(p.join("cc")).unwrap();
+
+ let files = Arc::new(Mutex::new(Vec::new()));
+
+ let f = files.clone();
+ let p = p.to_path_buf();
+
+ let mut entries = fs::read_dir(p).await.unwrap();
+
+ while let Some(res) = entries.next().await {
+ let e = assert_ok!(res);
+ let s = e.file_name().to_str().unwrap().to_string();
+ f.lock().unwrap().push(s);
+ }
let mut files = files.lock().unwrap();
files.sort(); // because the order is not guaranteed
diff --git a/tokio/tests/fs_file_mocked.rs b/tokio/tests/fs_file_mocked.rs
index 4697814c..d2eaadde 100644
--- a/tokio/tests/fs_file_mocked.rs
+++ b/tokio/tests/fs_file_mocked.rs
@@ -1,5 +1,16 @@
#![warn(rust_2018_idioms)]
+macro_rules! ready {
+ ($e:expr $(,)?) => {
+ match $e {
+ std::task::Poll::Ready(t) => t,
+ std::task::Poll::Pending => return std::task::Poll::Pending,
+ }
+ };
+}
+
+use futures::future;
+
// Load source
#[allow(warnings)]
#[path = "../src/fs/file.rs"]
diff --git a/tokio/tests/io_lines.rs b/tokio/tests/io_lines.rs
index e85fbff7..83240d62 100644
--- a/tokio/tests/io_lines.rs
+++ b/tokio/tests/io_lines.rs
@@ -3,10 +3,24 @@
use tokio::io::AsyncBufReadExt;
use tokio_test::assert_ok;
-use futures_util::StreamExt;
+#[tokio::test]
+async fn lines_inherent() {
+ let rd: &[u8] = b"hello\r\nworld\n\n";
+ let mut st = rd.lines();
+
+ let b = assert_ok!(st.next_line().await).unwrap();
+ assert_eq!(b, "hello");
+ let b = assert_ok!(st.next_line().await).unwrap();
+ assert_eq!(b, "world");
+ let b = assert_ok!(st.next_line().await).unwrap();
+ assert_eq!(b, "");
+ assert!(assert_ok!(st.next_line().await).is_none());
+}
#[tokio::test]
-async fn lines() {
+async fn lines_stream() {
+ use futures::StreamExt;
+
let rd: &[u8] = b"hello\r\nworld\n\n";
let mut st = rd.lines();
diff --git a/tokio/tests/net_driver.rs b/tokio/tests/net_driver.rs
index 5285fd13..5baa4eda 100644
--- a/tokio/tests/net_driver.rs
+++ b/tokio/tests/net_driver.rs
@@ -4,7 +4,7 @@ use tokio::net::driver::Reactor;
use tokio::net::TcpListener;
use tokio_test::{assert_ok, assert_pending};
-use futures_util::task::{waker_ref, ArcWake};
+use futures::task::{waker_ref, ArcWake};
use std::future::Future;
use std::net::TcpStream;
use std::pin::Pin;
diff --git a/tokio/tests/process_issue_42.rs b/tokio/tests/process_issue_42.rs
index 9de9d0bf..21651ac8 100644
--- a/tokio/tests/process_issue_42.rs
+++ b/tokio/tests/process_issue_42.rs
@@ -5,8 +5,8 @@
use tokio::process::Command;
use tokio::runtime;
-use futures_util::future::FutureExt;
-use futures_util::stream::FuturesOrdered;
+use futures::future::FutureExt;
+use futures::stream::FuturesOrdered;
use std::process::Stdio;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs
index 73982ced..2637793a 100644
--- a/tokio/tests/rt_common.rs
+++ b/tokio/tests/rt_common.rs
@@ -41,7 +41,7 @@ rt_test! {
use tokio::time;
use tokio_test::{assert_err, assert_ok};
- use futures_util::future::poll_fn;
+ use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
use std::sync::{mpsc, Arc};
@@ -133,12 +133,12 @@ rt_test! {
let mut txs = (0..ITER)
.map(|i| {
let (tx, rx) = oneshot::channel();
- let mut done_tx = done_tx.clone();
+ let done_tx = done_tx.clone();
tokio::spawn(async move {
let msg = assert_ok!(rx.await);
assert_eq!(i, msg);
- assert_ok!(done_tx.try_send(msg));
+ assert_ok!(done_tx.send(msg));
});
tx
diff --git a/tokio/tests/signal_ctrl_c.rs b/tokio/tests/signal_ctrl_c.rs
index ea4efaa2..13eeaa81 100644
--- a/tokio/tests/signal_ctrl_c.rs
+++ b/tokio/tests/signal_ctrl_c.rs
@@ -6,13 +6,13 @@ mod support {
}
use support::signal::send_signal;
-use tokio::prelude::*;
use tokio::signal;
use tokio::sync::oneshot;
+use tokio_test::assert_ok;
#[tokio::test]
async fn ctrl_c() {
- let ctrl_c = signal::ctrl_c().expect("failed to init ctrl_c");
+ let ctrl_c = signal::ctrl_c();
let (fire, wait) = oneshot::channel();
@@ -24,5 +24,6 @@ async fn ctrl_c() {
});
let _ = fire.send(());
- let _ = ctrl_c.into_future().await;
+
+ assert_ok!(ctrl_c.await);
}
diff --git a/tokio/tests/signal_drop_recv.rs b/tokio/tests/signal_drop_recv.rs
index 2a5c047f..06dffe12 100644
--- a/tokio/tests/signal_drop_recv.rs
+++ b/tokio/tests/signal_drop_recv.rs
@@ -6,7 +6,6 @@ mod support {
}
use support::signal::send_signal;
-use tokio::prelude::*;
use tokio::signal::unix::{signal, SignalKind};
#[tokio::test]
@@ -16,7 +15,7 @@ async fn drop_then_get_a_signal() {
drop(sig);
send_signal(libc::SIGUSR1);
- let sig = signal(kind).expect("failed to create second signal");
+ let mut sig = signal(kind).expect("failed to create second signal");
- let _ = sig.into_future().await;
+ let _ = sig.recv().await;
}
diff --git a/tokio/tests/signal_drop_rt.rs b/tokio/tests/signal_drop_rt.rs
index 1af8c0a7..7387e312 100644
--- a/tokio/tests/signal_drop_rt.rs
+++ b/tokio/tests/signal_drop_rt.rs
@@ -6,7 +6,6 @@ mod support {
}
use support::signal::send_signal;
-use tokio::prelude::*;
use tokio::runtime::Runtime;
use tokio::signal::unix::{signal, SignalKind};
@@ -25,7 +24,7 @@ fn dropping_loops_does_not_cause_starvation() {
send_signal(libc::SIGUSR1);
first_rt
- .block_on(first_signal.next())
+ .block_on(first_signal.recv())
.expect("failed to await first signal");
drop(first_rt);
@@ -33,7 +32,7 @@ fn dropping_loops_does_not_cause_starvation() {
send_signal(libc::SIGUSR1);
- second_rt.block_on(second_signal.next());
+ second_rt.block_on(second_signal.recv());
}
fn rt() -> Runtime {
diff --git a/tokio/tests/signal_drop_signal.rs b/tokio/tests/signal_drop_signal.rs
index 3cf5611f..b5bc7dd8 100644
--- a/tokio/tests/signal_drop_signal.rs
+++ b/tokio/tests/signal_drop_signal.rs
@@ -6,7 +6,6 @@ mod support {
}
use support::signal::send_signal;
-use tokio::prelude::*;
use tokio::signal::unix::{signal, SignalKind};
#[tokio::test]
@@ -15,12 +14,12 @@ async fn dropping_signal_does_not_deregister_any_other_instances() {
// Signals should not starve based on ordering
let first_duplicate_signal = signal(kind).expect("failed to register first duplicate signal");
- let sig = signal(kind).expect("failed to register signal");
+ let mut sig = signal(kind).expect("failed to register signal");
let second_duplicate_signal = signal(kind).expect("failed to register second duplicate signal");
drop(first_duplicate_signal);
drop(second_duplicate_signal);
send_signal(libc::SIGUSR1);
- let _ = sig.into_future().await;
+ let _ = sig.recv().await;
}
diff --git a/tokio/tests/signal_multi_rt.rs b/tokio/tests/signal_multi_rt.rs
index 6a16dd88..fb5449f0 100644
--- a/tokio/tests/signal_multi_rt.rs
+++ b/tokio/tests/signal_multi_rt.rs
@@ -6,7 +6,6 @@ mod support {
}
use support::signal::send_signal;
-use tokio::prelude::*;
use tokio::runtime::Runtime;
use tokio::signal::unix::{signal, SignalKind};
@@ -26,9 +25,9 @@ fn multi_loop() {
thread::spawn(move || {
let mut rt = rt();
let _ = rt.block_on(async {
- let signal = signal(SignalKind::hangup()).unwrap();
+ let mut signal = signal(SignalKind::hangup()).unwrap();
sender.send(()).unwrap();
- signal.into_future().await
+ signal.recv().await
});
})
})
diff --git a/tokio/tests/signal_notify_both.rs b/tokio/tests/signal_notify_both.rs
index 00385478..7d830686 100644
--- a/tokio/tests/signal_notify_both.rs
+++ b/tokio/tests/signal_notify_both.rs
@@ -6,18 +6,17 @@ mod support {
}
use support::signal::send_signal;
-use tokio::prelude::*;
use tokio::signal::unix::{signal, SignalKind};
-use futures::future;
-
#[tokio::test]
async fn notify_both() {
let kind = SignalKind::user_defined2();
- let signal1 = signal(kind).expect("failed to create signal1");
- let signal2 = signal(kind).expect("failed to create signal2");
+ let mut signal1 = signal(kind).expect("failed to create signal1");
+ let mut signal2 = signal(kind).expect("failed to create signal2");
send_signal(libc::SIGUSR2);
- let _ = future::join(signal1.into_future(), signal2.into_future()).await;
+
+ signal1.recv().await;
+ signal2.recv().await;
}
diff --git a/tokio/tests/signal_twice.rs b/tokio/tests/signal_twice.rs
index d8e0facc..171d18e6 100644
--- a/tokio/tests/signal_twice.rs
+++ b/tokio/tests/signal_twice.rs
@@ -6,7 +6,6 @@ mod support {
}
use support::signal::send_signal;
-use tokio::prelude::*;
use tokio::signal::unix::{signal, SignalKind};
#[tokio::test]
@@ -17,9 +16,6 @@ async fn twice() {
for _ in 0..2 {
send_signal(libc::SIGUSR1);
- let (item, sig_next) = sig.into_future().await;
- assert_eq!(item, Some(()));
-
- sig = sig_next;
+ assert!(sig.recv().await.is_some());
}
}
diff --git a/tokio/tests/signal_usr1.rs b/tokio/tests/signal_usr1.rs
index 9b6a0dec..95fc6c10 100644
--- a/tokio/tests/signal_usr1.rs
+++ b/tokio/tests/signal_usr1.rs
@@ -6,18 +6,17 @@ mod support {
}
use support::signal::send_signal;
-use tokio::prelude::*;
use tokio::signal::unix::{signal, SignalKind};
use tokio_test::assert_ok;
#[tokio::test]
async fn signal_usr1() {
- let signal = assert_ok!(
+ let mut signal = assert_ok!(
signal(SignalKind::user_defined1()),
"failed to create signal"
);
send_signal(libc::SIGUSR1);
- let _ = signal.into_future().await;
+ signal.recv().await;
}
diff --git a/tokio/tests/sync_errors.rs b/tokio/tests/sync_errors.rs
index e68fe081..8cc0c0cd 100644
--- a/tokio/tests/sync_errors.rs
+++ b/tokio/tests/sync_errors.rs
@@ -6,11 +6,8 @@ fn is_error<T: ::std::error::Error + Send + Sync>() {}
fn mpsc_error_bound() {
use tokio::sync::mpsc::error;
- is_error::<error::SendError>();
+ is_error::<error::SendError<()>>();
is_error::<error::TrySendError<()>>();
- is_error::<error::UnboundedRecvError>();
- is_error::<error::UnboundedSendError>();
- is_error::<error::UnboundedTrySendError<()>>();
}
#[test]
diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs
index f724c564..040904e4 100644
--- a/tokio/tests/sync_mpsc.rs
+++ b/tokio/tests/sync_mpsc.rs
@@ -1,6 +1,7 @@
#![warn(rust_2018_idioms)]
use tokio::sync::mpsc;
+use tokio::sync::mpsc::error::TrySendError;
use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
@@ -38,47 +39,33 @@ fn send_recv_with_buffer() {
}
#[tokio::test]
-async fn async_send_recv_with_buffer() {
- let (mut tx, mut rx) = mpsc::channel(16);
+async fn send_recv_stream_with_buffer() {
+ use futures::StreamExt;
+
+ let (mut tx, mut rx) = mpsc::channel::<i32>(16);
tokio::spawn(async move {
assert_ok!(tx.send(1).await);
assert_ok!(tx.send(2).await);
});
- assert_eq!(Some(1), rx.recv().await);
- assert_eq!(Some(2), rx.recv().await);
- assert_eq!(None, rx.recv().await);
+ assert_eq!(Some(1), rx.next().await);
+ assert_eq!(Some(2), rx.next().await);
+ assert_eq!(None, rx.next().await);
}
-#[test]
-fn send_sink_recv_with_buffer() {
- use futures_core::Stream;
- use futures_sink::Sink;
-
- let (tx, rx) = mpsc::channel::<i32>(16);
-
- task::spawn(tx).enter(|cx, mut tx| {
- assert_ready_ok!(tx.as_mut().poll_ready(cx));
- assert_ok!(tx.as_mut().start_send(1));
-
- assert_ready_ok!(tx.as_mut().poll_ready(cx));
- assert_ok!(tx.as_mut().start_send(2));
+#[tokio::test]
+async fn async_send_recv_with_buffer() {
+ let (mut tx, mut rx) = mpsc::channel(16);
- assert_ready_ok!(tx.as_mut().poll_flush(cx));
- assert_ready_ok!(tx.as_mut().poll_close(cx));
+ tokio::spawn(async move {
+ assert_ok!(tx.send(1).await);
+ assert_ok!(tx.send(2).await);
});
- task::spawn(rx).enter(|cx, mut rx| {
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert_eq!(val, Some(1));
-
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert_eq!(val, Some(2));
-
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert!(val.is_none());
- });
+ assert_eq!(Some(1), rx.recv().await);
+ assert_eq!(Some(2), rx.recv().await);
+ assert_eq!(None, rx.recv().await);
}
#[test]
@@ -124,11 +111,11 @@ fn buffer_gteq_one() {
fn send_recv_unbounded() {
let mut t1 = task::spawn(());
- let (mut tx, mut rx) = mpsc::unbounded_channel::<i32>();
+ let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
// Using `try_send`
- assert_ok!(tx.try_send(1));
- assert_ok!(tx.try_send(2));
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some(1));
@@ -144,11 +131,11 @@ fn send_recv_unbounded() {
#[tokio::test]
async fn async_send_recv_unbounded() {
- let (mut tx, mut rx) = mpsc::unbounded_channel();
+ let (tx, mut rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
- assert_ok!(tx.try_send(1));
- assert_ok!(tx.try_send(2));
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
});
assert_eq!(Some(1), rx.recv().await);
@@ -156,41 +143,20 @@ async fn async_send_recv_unbounded() {
assert_eq!(None, rx.recv().await);
}
-#[test]
-fn sink_send_recv_unbounded() {
- use futures_core::Stream;
- use futures_sink::Sink;
- use futures_util::pin_mut;
-
- let mut t1 = task::spawn(());
-
- let (tx, rx) = mpsc::unbounded_channel::<i32>();
-
- t1.enter(|cx, _| {
- pin_mut!(tx);
-
- assert_ready_ok!(tx.as_mut().poll_ready(cx));
- assert_ok!(tx.as_mut().start_send(1));
+#[tokio::test]
+async fn send_recv_stream_unbounded() {
+ use futures::StreamExt;
- assert_ready_ok!(tx.as_mut().poll_ready(cx));
- assert_ok!(tx.as_mut().start_send(2));
+ let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
- assert_ready_ok!(tx.as_mut().poll_flush(cx));
- assert_ready_ok!(tx.as_mut().poll_close(cx));
+ tokio::spawn(async move {
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
});
- t1.enter(|cx, _| {
- pin_mut!(rx);
-
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert_eq!(val, Some(1));
-
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert_eq!(val, Some(2));
-
- let val = assert_ready!(rx.as_mut().poll_next(cx));
- assert!(val.is_none());
- });
+ assert_eq!(Some(1), rx.next().await);
+ assert_eq!(Some(2), rx.next().await);
+ assert_eq!(None, rx.next().await);
}
#[test]
@@ -223,7 +189,7 @@ fn no_t_bounds_unbounded() {
// same with Receiver
println!("{:?}", rx);
// and sender should be Clone even though T isn't Clone
- assert!(tx.clone().try_send(NoImpls).is_ok());
+ assert!(tx.clone().send(NoImpls).is_ok());
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert!(val.is_some());
@@ -356,8 +322,10 @@ fn try_send_fail() {
tx.try_send("hello").unwrap();
// This should fail
- let err = assert_err!(tx.try_send("fail"));
- assert!(err.is_full());
+ match assert_err!(tx.try_send("fail")) {
+ TrySendError::Full(..) => {}
+ _ => panic!(),
+ }
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some("hello"));
@@ -421,7 +389,10 @@ fn dropping_rx_closes_channel_for_try() {
{
let err = assert_err!(tx.try_send(msg.clone()));
- assert!(err.is_closed());
+ match err {
+ TrySendError::Closed(..) => {}
+ _ => panic!(),
+ }
}
assert_eq!(1, Arc::strong_count(&msg));
diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs
index 4d73bc81..7ccad5c2 100644
--- a/tokio/tests/sync_watch.rs
+++ b/tokio/tests/sync_watch.rs
@@ -5,41 +5,6 @@ use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};
#[test]
-fn single_rx_recv_ref() {
- let (tx, mut rx) = watch::channel("one");
-
- {
- let mut t = spawn(rx.recv_ref());
- let v = assert_ready!(t.poll()).unwrap();
- assert_eq!(*v, "one");
- }
-
- {
- let mut t = spawn(rx.recv_ref());
-
- assert_pending!(t.poll());
-
- tx.broadcast("two").unwrap();
-
- assert!(t.is_woken());
-
- let v = assert_ready!(t.poll()).unwrap();
- assert_eq!(*v, "two");
- }
-
- {
- let mut t = spawn(rx.recv_ref());
-
- assert_pending!(t.poll());
-
- drop(tx);
-
- let res = assert_ready!(t.poll());
- assert!(res.is_none());
- }
-}
-
-#[test]
fn single_rx_recv() {
let (tx, mut rx) = watch::channel("one");
@@ -75,62 +40,25 @@ fn single_rx_recv() {
}
#[test]
-fn stream_impl() {
- use tokio::prelude::*;
-
- let (tx, mut rx) = watch::channel("one");
-
- {
- let mut t = spawn(rx.next());
- let v = assert_ready!(t.poll()).unwrap();
- assert_eq!(v, "one");
- }
-
- {
- let mut t = spawn(rx.next());
-
- assert_pending!(t.poll());
-
- tx.broadcast("two").unwrap();
-
- assert!(t.is_woken());
-
- let v = assert_ready!(t.poll()).unwrap();
- assert_eq!(v, "two");
- }
-
- {
- let mut t = spawn(rx.next());
-
- assert_pending!(t.poll());
-
- drop(tx);
-
- let res = assert_ready!(t.poll());
- assert!(res.is_none());
- }
-}
-
-#[test]
fn multi_rx() {
let (tx, mut rx1) = watch::channel("one");
let mut rx2 = rx1.clone();
{
- let mut t1 = spawn(rx1.recv_ref());
- let mut t2 = spawn(rx2.recv_ref());
+ let mut t1 = spawn(rx1.recv());
+ let mut t2 = spawn(rx2.recv());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "one");
+ assert_eq!(res.unwrap(), "one");
let res = assert_ready!(t2.poll());
- assert_eq!(*res.unwrap(), "one");
+ assert_eq!(res.unwrap(), "one");
}
- let mut t2 = spawn(rx2.recv_ref());
+ let mut t2 = spawn(rx2.recv());
{
- let mut t1 = spawn(rx1.recv_ref());
+ let mut t1 = spawn(rx1.recv());
assert_pending!(t1.poll());
assert_pending!(t2.poll());
@@ -141,11 +69,11 @@ fn multi_rx() {
assert!(t2.is_woken());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "two");
+ assert_eq!(res.unwrap(), "two");
}
{
- let mut t1 = spawn(rx1.recv_ref());
+ let mut t1 = spawn(rx1.recv());
assert_pending!(t1.poll());
@@ -155,17 +83,17 @@ fn multi_rx() {
assert!(t2.is_woken());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "three");
+ assert_eq!(res.unwrap(), "three");
let res = assert_ready!(t2.poll());
- assert_eq!(*res.unwrap(), "three");
+ assert_eq!(res.unwrap(), "three");
}
drop(t2);
{
- let mut t1 = spawn(rx1.recv_ref());
- let mut t2 = spawn(rx2.recv_ref());
+ let mut t1 = spawn(rx1.recv());
+ let mut t2 = spawn(rx2.recv());
assert_pending!(t1.poll());
assert_pending!(t2.poll());
@@ -173,10 +101,10 @@ fn multi_rx() {
tx.broadcast("four").unwrap();
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "four");
+ assert_eq!(res.unwrap(), "four");
drop(t1);
- let mut t1 = spawn(rx1.recv_ref());
+ let mut t1 = spawn(rx1.recv());
assert_pending!(t1.poll());
drop(tx);
@@ -186,10 +114,10 @@ fn multi_rx() {
assert!(res.is_none());
let res = assert_ready!(t2.poll());
- assert_eq!(*res.unwrap(), "four");
+ assert_eq!(res.unwrap(), "four");
drop(t2);
- let mut t2 = spawn(rx2.recv_ref());
+ let mut t2 = spawn(rx2.recv());
let res = assert_ready!(t2.poll());
assert!(res.is_none());
}
@@ -203,13 +131,13 @@ fn rx_observes_final_value() {
drop(tx);
{
- let mut t1 = spawn(rx.recv_ref());
+ let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "one");
+ assert_eq!(res.unwrap(), "one");
}
{
- let mut t1 = spawn(rx.recv_ref());
+ let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
assert!(res.is_none());
}
@@ -221,13 +149,13 @@ fn rx_observes_final_value() {
tx.broadcast("two").unwrap();
{
- let mut t1 = spawn(rx.recv_ref());
+ let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "two");
+ assert_eq!(res.unwrap(), "two");
}
{
- let mut t1 = spawn(rx.recv_ref());
+ let mut t1 = spawn(rx.recv());
assert_pending!(t1.poll());
tx.broadcast("three").unwrap();
@@ -236,11 +164,11 @@ fn rx_observes_final_value() {
assert!(t1.is_woken());
let res = assert_ready!(t1.poll());
- assert_eq!(*res.unwrap(), "three");
+ assert_eq!(res.unwrap(), "three");
}
{
- let mut t1 = spawn(rx.recv_ref());
+ let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
assert!(res.is_none());
}
@@ -262,3 +190,40 @@ fn poll_close() {
assert!(tx.broadcast("two").is_err());
}
+
+#[test]
+fn stream_impl() {
+ use futures::StreamExt;
+
+ let (tx, mut rx) = watch::channel("one");
+
+ {
+ let mut t = spawn(rx.next());
+ let v = assert_ready!(t.poll()).unwrap();
+ assert_eq!(v, "one");
+ }
+
+ {
+ let mut t = spawn(rx.next());
+
+ assert_pending!(t.poll());
+
+ tx.broadcast("two").unwrap();
+
+ assert!(t.is_woken());
+
+ let v = assert_ready!(t.poll()).unwrap();
+ assert_eq!(v, "two");
+ }
+
+ {
+ let mut t = spawn(rx.next());
+
+ assert_pending!(t.poll());
+
+ drop(tx);
+
+ let res = assert_ready!(t.poll());
+ assert!(res.is_none());
+ }
+}
diff --git a/tokio/tests/time_interval.rs b/tokio/tests/time_interval.rs
index c884ca8e..70709f4a 100644
--- a/tokio/tests/time_interval.rs
+++ b/tokio/tests/time_interval.rs
@@ -1,12 +1,14 @@
#![warn(rust_2018_idioms)]
-use tokio::time::{self, Duration, Instant, Interval};
+use tokio::time::{self, Duration, Instant};
use tokio_test::{assert_pending, assert_ready_eq, task};
+use std::task::Poll;
+
#[tokio::test]
#[should_panic]
async fn interval_zero_duration() {
- let _ = Interval::new(Instant::now(), ms(0));
+ let _ = time::interval_at(Instant::now(), ms(0));
}
#[tokio::test]
@@ -18,26 +20,44 @@ async fn usage() {
// TODO: Skip this
time::advance(ms(1)).await;
- let mut int = task::spawn(Interval::new(start, ms(300)));
+ let mut i = task::spawn(time::interval_at(start, ms(300)));
- assert_ready_eq!(int.poll_next(), Some(start));
- assert_pending!(int.poll_next());
+ assert_ready_eq!(poll_next(&mut i), start);
+ assert_pending!(poll_next(&mut i));
time::advance(ms(100)).await;
- assert_pending!(int.poll_next());
+ assert_pending!(poll_next(&mut i));
time::advance(ms(200)).await;
- assert_ready_eq!(int.poll_next(), Some(start + ms(300)));
- assert_pending!(int.poll_next());
+ assert_ready_eq!(poll_next(&mut i), start + ms(300));
+ assert_pending!(poll_next(&mut i));
time::advance(ms(400)).await;
- assert_ready_eq!(int.poll_next(), Some(start + ms(600)));
- assert_pending!(int.poll_next());
+ assert_ready_eq!(poll_next(&mut i), start + ms(600));
+ assert_pending!(poll_next(&mut i));
time::advance(ms(500)).await;
- assert_ready_eq!(int.poll_next(), Some(start + ms(900)));
- assert_ready_eq!(int.poll_next(), Some(start + ms(1200)));
- assert_pending!(int.poll_next());
+ assert_ready_eq!(poll_next(&mut i), start + ms(900));
+ assert_ready_eq!(poll_next(&mut i), start + ms(1200));
+ assert_pending!(poll_next(&mut i));
+}
+
+#[tokio::test]
+async fn usage_stream() {