summaryrefslogtreecommitdiffstats
path: root/tokio-sync/tests/mpsc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-sync/tests/mpsc.rs')
-rw-r--r--tokio-sync/tests/mpsc.rs351
1 files changed, 205 insertions, 146 deletions
diff --git a/tokio-sync/tests/mpsc.rs b/tokio-sync/tests/mpsc.rs
index af2d1dd7..17dd3e64 100644
--- a/tokio-sync/tests/mpsc.rs
+++ b/tokio-sync/tests/mpsc.rs
@@ -1,98 +1,111 @@
#![deny(warnings, rust_2018_idioms)]
-use futures;
-use futures::prelude::*;
-use std::sync::Arc;
-use std::thread;
-use tokio_mock_task::*;
use tokio_sync::mpsc;
+use tokio_test::task::MockTask;
+use tokio_test::{
+ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
+};
+
+use std::sync::Arc;
trait AssertSend: Send {}
impl AssertSend for mpsc::Sender<i32> {}
impl AssertSend for mpsc::Receiver<i32> {}
-macro_rules! assert_ready {
- ($e:expr) => {{
- match $e {
- Ok(futures::Async::Ready(v)) => v,
- Ok(_) => panic!("not ready"),
- Err(e) => panic!("error = {:?}", e),
- }
- }};
-}
-
-macro_rules! assert_not_ready {
- ($e:expr) => {{
- match $e {
- Ok(futures::Async::NotReady) => {}
- Ok(futures::Async::Ready(v)) => panic!("ready; value = {:?}", v),
- Err(e) => panic!("error = {:?}", e),
- }
- }};
-}
-
#[test]
fn send_recv_with_buffer() {
+ let mut t1 = MockTask::new();
+ let mut t2 = MockTask::new();
+
let (mut tx, mut rx) = mpsc::channel::<i32>(16);
// Using poll_ready / try_send
- assert_ready!(tx.poll_ready());
+ assert_ready_ok!(t1.enter(|cx| tx.poll_ready(cx)));
tx.try_send(1).unwrap();
// Without poll_ready
tx.try_send(2).unwrap();
- // Sink API
- assert!(tx.start_send(3).unwrap().is_ready());
- assert_ready!(tx.poll_complete());
- assert_ready!(tx.close());
-
drop(tx);
- let val = assert_ready!(rx.poll());
+ let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx)));
assert_eq!(val, Some(1));
- let val = assert_ready!(rx.poll());
+ let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx)));
assert_eq!(val, Some(2));
- let val = assert_ready!(rx.poll());
- assert_eq!(val, Some(3));
-
- let val = assert_ready!(rx.poll());
+ let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx)));
assert!(val.is_none());
}
#[test]
+#[cfg(feature = "async-traits")]
+fn send_sink_recv_with_buffer() {
+ use async_sink::Sink;
+ use futures_core::Stream;
+ use pin_utils::pin_mut;
+
+ let mut t1 = MockTask::new();
+
+ let (tx, rx) = mpsc::channel::<i32>(16);
+
+ t1.enter(|cx| {
+ pin_mut!(tx);
+
+ assert_ready_ok!(tx.as_mut().poll_ready(cx));
+ assert_ok!(tx.as_mut().start_send(1));
+
+ assert_ready_ok!(tx.as_mut().poll_ready(cx));
+ assert_ok!(tx.as_mut().start_send(2));
+
+ assert_ready_ok!(tx.as_mut().poll_flush(cx));
+ assert_ready_ok!(tx.as_mut().poll_close(cx));
+ });
+
+ t1.enter(|cx| {
+ pin_mut!(rx);
+
+ let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
+ assert_eq!(val, Some(1));
+
+ let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
+ assert_eq!(val, Some(2));
+
+ let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
+ assert!(val.is_none());
+ });
+}
+
+#[test]
fn start_send_past_cap() {
+ let mut t1 = MockTask::new();
+ let mut t2 = MockTask::new();
+ let mut t3 = MockTask::new();
+
let (mut tx1, mut rx) = mpsc::channel(1);
let mut tx2 = tx1.clone();
- let mut task1 = MockTask::new();
- let mut task2 = MockTask::new();
+ assert_ok!(tx1.try_send(()));
- let res = tx1.start_send(()).unwrap();
- assert!(res.is_ready());
-
- task1.enter(|| {
- let res = tx1.start_send(()).unwrap();
- assert!(!res.is_ready());
+ t1.enter(|cx| {
+ assert_pending!(tx1.poll_ready(cx));
});
- task2.enter(|| {
- assert_not_ready!(tx2.poll_ready());
+ t2.enter(|cx| {
+ assert_pending!(tx2.poll_ready(cx));
});
drop(tx1);
- let val = assert_ready!(rx.poll());
+ let val = t3.enter(|cx| assert_ready!(rx.poll_next(cx)));
assert!(val.is_some());
- assert!(task2.is_notified());
- assert!(!task1.is_notified());
+ assert!(t2.is_woken());
+ assert!(!t1.is_woken());
drop(tx2);
- let val = assert_ready!(rx.poll());
+ let val = t3.enter(|cx| assert_ready!(rx.poll_next(cx)));
assert!(val.is_none());
}
@@ -104,33 +117,69 @@ fn buffer_gteq_one() {
#[test]
fn send_recv_unbounded() {
+ let mut t1 = MockTask::new();
+
let (mut tx, mut rx) = mpsc::unbounded_channel::<i32>();
// Using `try_send`
- tx.try_send(1).unwrap();
+ assert_ok!(tx.try_send(1));
+ assert_ok!(tx.try_send(2));
- // Using `Sink` API
- assert!(tx.start_send(2).unwrap().is_ready());
- assert_ready!(tx.poll_complete());
-
- let val = assert_ready!(rx.poll());
+ let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
assert_eq!(val, Some(1));
- let val = assert_ready!(rx.poll());
+ let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
assert_eq!(val, Some(2));
- assert_ready!(tx.poll_complete());
- assert_ready!(tx.close());
-
drop(tx);
- let val = assert_ready!(rx.poll());
+ let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
assert!(val.is_none());
}
#[test]
+#[cfg(feature = "async-traits")]
+fn sink_send_recv_unbounded() {
+ use async_sink::Sink;
+ use futures_core::Stream;
+ use pin_utils::pin_mut;
+
+ let mut t1 = MockTask::new();
+
+ let (tx, rx) = mpsc::unbounded_channel::<i32>();
+
+ t1.enter(|cx| {
+ pin_mut!(tx);
+
+ assert_ready_ok!(tx.as_mut().poll_ready(cx));
+ assert_ok!(tx.as_mut().start_send(1));
+
+ assert_ready_ok!(tx.as_mut().poll_ready(cx));
+ assert_ok!(tx.as_mut().start_send(2));
+
+ assert_ready_ok!(tx.as_mut().poll_flush(cx));
+ assert_ready_ok!(tx.as_mut().poll_close(cx));
+ });
+
+ t1.enter(|cx| {
+ pin_mut!(rx);
+
+ let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
+ assert_eq!(val, Some(1));
+
+ let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
+ assert_eq!(val, Some(2));
+
+ let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx));
+ assert!(val.is_none());
+ });
+}
+
+#[test]
fn no_t_bounds_buffer() {
struct NoImpls;
+
+ let mut t1 = MockTask::new();
let (tx, mut rx) = mpsc::channel(100);
// sender should be Debug even though T isn't Debug
@@ -139,12 +188,16 @@ fn no_t_bounds_buffer() {
println!("{:?}", rx);
// and sender should be Clone even though T isn't Clone
assert!(tx.clone().try_send(NoImpls).is_ok());
- assert!(assert_ready!(rx.poll()).is_some());
+
+ let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
+ assert!(val.is_some());
}
#[test]
fn no_t_bounds_unbounded() {
struct NoImpls;
+
+ let mut t1 = MockTask::new();
let (tx, mut rx) = mpsc::unbounded_channel();
// sender should be Debug even though T isn't Debug
@@ -153,186 +206,188 @@ fn no_t_bounds_unbounded() {
println!("{:?}", rx);
// and sender should be Clone even though T isn't Clone
assert!(tx.clone().try_send(NoImpls).is_ok());
- assert!(assert_ready!(rx.poll()).is_some());
+
+ let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
+ assert!(val.is_some());
}
#[test]
fn send_recv_buffer_limited() {
+ let mut t1 = MockTask::new();
+ let mut t2 = MockTask::new();
+
let (mut tx, mut rx) = mpsc::channel::<i32>(1);
- let mut task = MockTask::new();
// Run on a task context
- task.enter(|| {
- assert!(tx.poll_complete().unwrap().is_ready());
- assert!(tx.poll_ready().unwrap().is_ready());
+ t1.enter(|cx| {
+ assert_ready_ok!(tx.poll_ready(cx));
// Send first message
- let res = tx.start_send(1).unwrap();
- assert!(is_ready(&res));
- assert!(tx.poll_ready().unwrap().is_not_ready());
+ assert_ok!(tx.try_send(1));
- // Send second message
- let res = tx.start_send(2).unwrap();
- assert!(!is_ready(&res));
-
- // Take the value
- assert_eq!(rx.poll().unwrap(), Async::Ready(Some(1)));
- assert!(tx.poll_ready().unwrap().is_ready());
+ // Not ready
+ assert_pending!(tx.poll_ready(cx));
- let res = tx.start_send(2).unwrap();
- assert!(is_ready(&res));
- assert!(tx.poll_ready().unwrap().is_not_ready());
+ // Send second message
+ assert_err!(tx.try_send(1337));
+ });
+ t2.enter(|cx| {
// Take the value
- assert_eq!(rx.poll().unwrap(), Async::Ready(Some(2)));
- assert!(tx.poll_ready().unwrap().is_ready());
+ let val = assert_ready!(rx.poll_next(cx));
+ assert_eq!(Some(1), val);
});
-}
-#[test]
-fn send_shared_recv() {
- let (tx1, rx) = mpsc::channel::<i32>(16);
- let tx2 = tx1.clone();
- let mut rx = rx.wait();
+ assert!(t1.is_woken());
- tx1.send(1).wait().unwrap();
- assert_eq!(rx.next().unwrap().unwrap(), 1);
+ t1.enter(|cx| {
+ assert_ready_ok!(tx.poll_ready(cx));
- tx2.send(2).wait().unwrap();
- assert_eq!(rx.next().unwrap().unwrap(), 2);
-}
+ assert_ok!(tx.try_send(2));
-#[test]
-fn send_recv_threads() {
- let (tx, rx) = mpsc::channel::<i32>(16);
- let mut rx = rx.wait();
+ // Not ready
+ assert_pending!(tx.poll_ready(cx));
+ });
- thread::spawn(move || {
- tx.send(1).wait().unwrap();
+ t2.enter(|cx| {
+ // Take the value
+ let val = assert_ready!(rx.poll_next(cx));
+ assert_eq!(Some(2), val);
});
- assert_eq!(rx.next().unwrap().unwrap(), 1);
+ t1.enter(|cx| {
+ assert_ready_ok!(tx.poll_ready(cx));
+ });
}
#[test]
fn recv_close_gets_none_idle() {
+ let mut t1 = MockTask::new();
+
let (mut tx, mut rx) = mpsc::channel::<i32>(10);
- let mut task = MockTask::new();
rx.close();
- task.enter(|| {
- let val = assert_ready!(rx.poll());
+ t1.enter(|cx| {
+ let val = assert_ready!(rx.poll_next(cx));
assert!(val.is_none());
- assert!(tx.poll_ready().is_err());
+ assert_ready_err!(tx.poll_ready(cx));
});
}
#[test]
fn recv_close_gets_none_reserved() {
+ let mut t1 = MockTask::new();
+ let mut t2 = MockTask::new();
+ let mut t3 = MockTask::new();
+
let (mut tx1, mut rx) = mpsc::channel::<i32>(1);
let mut tx2 = tx1.clone();
- assert_ready!(tx1.poll_ready());
-
- let mut task = MockTask::new();
+ assert_ready_ok!(t1.enter(|cx| tx1.poll_ready(cx)));
- task.enter(|| {
- assert_not_ready!(tx2.poll_ready());
+ t2.enter(|cx| {
+ assert_pending!(tx2.poll_ready(cx));
});
rx.close();
- assert!(task.is_notified());
+ assert!(t2.is_woken());
- task.enter(|| {
- assert!(tx2.poll_ready().is_err());
- assert_not_ready!(rx.poll());
+ t2.enter(|cx| {
+ assert_ready_err!(tx2.poll_ready(cx));
});
- assert!(!task.is_notified());
+ t3.enter(|cx| assert_pending!(rx.poll_next(cx)));
+
+ assert!(!t1.is_woken());
+ assert!(!t2.is_woken());
- assert!(tx1.try_send(123).is_ok());
+ assert_ok!(tx1.try_send(123));
- assert!(task.is_notified());
+ assert!(t3.is_woken());
- task.enter(|| {
- let v = assert_ready!(rx.poll());
+ t3.enter(|cx| {
+ let v = assert_ready!(rx.poll_next(cx));
assert_eq!(v, Some(123));
- let v = assert_ready!(rx.poll());
+ let v = assert_ready!(rx.poll_next(cx));
assert!(v.is_none());
});
}
#[test]
fn tx_close_gets_none() {
+ let mut t1 = MockTask::new();
+
let (_, mut rx) = mpsc::channel::<i32>(10);
- let mut task = MockTask::new();
// Run on a task context
- task.enter(|| {
- let v = assert_ready!(rx.poll());
+ t1.enter(|cx| {
+ let v = assert_ready!(rx.poll_next(cx));
assert!(v.is_none());
});
}
-fn is_ready<T>(res: &AsyncSink<T>) -> bool {
- match *res {
- AsyncSink::Ready => true,
- _ => false,
- }
-}
-
#[test]
fn try_send_fail() {
- let (mut tx, rx) = mpsc::channel(1);
- let mut rx = rx.wait();
+ let mut t1 = MockTask::new();
+
+ let (mut tx, mut rx) = mpsc::channel(1);
tx.try_send("hello").unwrap();
// This should fail
- assert!(tx.try_send("fail").unwrap_err().is_full());
+ let err = assert_err!(tx.try_send("fail"));
+ assert!(err.is_full());
- assert_eq!(rx.next().unwrap().unwrap(), "hello");
+ let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
+ assert_eq!(val, Some("hello"));
- tx.try_send("goodbye").unwrap();
+ assert_ok!(tx.try_send("goodbye"));
drop(tx);
- assert_eq!(rx.next().unwrap().unwrap(), "goodbye");
- assert!(rx.next().is_none());
+ let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
+ assert_eq!(val, Some("goodbye"));
+
+ let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx)));
+ assert!(val.is_none());
}
#[test]
fn drop_tx_with_permit_releases_permit() {
+ let mut t1 = MockTask::new();
+ let mut t2 = MockTask::new();
+
// poll_ready reserves capacity, ensure that the capacity is released if tx
// is dropped w/o sending a value.
let (mut tx1, _rx) = mpsc::channel::<i32>(1);
let mut tx2 = tx1.clone();
- let mut task = MockTask::new();
- assert_ready!(tx1.poll_ready());
+ assert_ready_ok!(t1.enter(|cx| tx1.poll_ready(cx)));
- task.enter(|| {
- assert_not_ready!(tx2.poll_ready());
+ t2.enter(|cx| {
+ assert_pending!(tx2.poll_ready(cx));
});
drop(tx1);
- assert!(task.is_notified());
+ assert!(t2.is_woken());
- assert_ready!(tx2.poll_ready());
+ assert_ready_ok!(t2.enter(|cx| tx2.poll_ready(cx)));
}
#[test]
fn dropping_rx_closes_channel() {
+ let mut t1 = MockTask::new();
+
let (mut tx, rx) = mpsc::channel(100);
let msg = Arc::new(());
- tx.try_send(msg.clone()).unwrap();
+ assert_ok!(tx.try_send(msg.clone()));
drop(rx);
- assert!(tx.poll_ready().is_err());
+ assert_ready_err!(t1.enter(|cx| tx.poll_ready(cx)));
assert_eq!(1, Arc::strong_count(&msg));
}
@@ -345,7 +400,11 @@ fn dropping_rx_closes_channel_for_try() {
tx.try_send(msg.clone()).unwrap();
drop(rx);
- assert!(tx.try_send(msg.clone()).unwrap_err().is_closed());
+
+ {
+ let err = assert_err!(tx.try_send(msg.clone()));
+ assert!(err.is_closed());
+ }
assert_eq!(1, Arc::strong_count(&msg));
}