diff options
Diffstat (limited to 'tokio-sync/tests/mpsc.rs')
-rw-r--r-- | tokio-sync/tests/mpsc.rs | 351 |
1 files changed, 205 insertions, 146 deletions
diff --git a/tokio-sync/tests/mpsc.rs b/tokio-sync/tests/mpsc.rs index af2d1dd7..17dd3e64 100644 --- a/tokio-sync/tests/mpsc.rs +++ b/tokio-sync/tests/mpsc.rs @@ -1,98 +1,111 @@ #![deny(warnings, rust_2018_idioms)] -use futures; -use futures::prelude::*; -use std::sync::Arc; -use std::thread; -use tokio_mock_task::*; use tokio_sync::mpsc; +use tokio_test::task::MockTask; +use tokio_test::{ + assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, +}; + +use std::sync::Arc; trait AssertSend: Send {} impl AssertSend for mpsc::Sender<i32> {} impl AssertSend for mpsc::Receiver<i32> {} -macro_rules! assert_ready { - ($e:expr) => {{ - match $e { - Ok(futures::Async::Ready(v)) => v, - Ok(_) => panic!("not ready"), - Err(e) => panic!("error = {:?}", e), - } - }}; -} - -macro_rules! assert_not_ready { - ($e:expr) => {{ - match $e { - Ok(futures::Async::NotReady) => {} - Ok(futures::Async::Ready(v)) => panic!("ready; value = {:?}", v), - Err(e) => panic!("error = {:?}", e), - } - }}; -} - #[test] fn send_recv_with_buffer() { + let mut t1 = MockTask::new(); + let mut t2 = MockTask::new(); + let (mut tx, mut rx) = mpsc::channel::<i32>(16); // Using poll_ready / try_send - assert_ready!(tx.poll_ready()); + assert_ready_ok!(t1.enter(|cx| tx.poll_ready(cx))); tx.try_send(1).unwrap(); // Without poll_ready tx.try_send(2).unwrap(); - // Sink API - assert!(tx.start_send(3).unwrap().is_ready()); - assert_ready!(tx.poll_complete()); - assert_ready!(tx.close()); - drop(tx); - let val = assert_ready!(rx.poll()); + let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx))); assert_eq!(val, Some(1)); - let val = assert_ready!(rx.poll()); + let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx))); assert_eq!(val, Some(2)); - let val = assert_ready!(rx.poll()); - assert_eq!(val, Some(3)); - - let val = assert_ready!(rx.poll()); + let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx))); assert!(val.is_none()); } #[test] +#[cfg(feature = "async-traits")] +fn send_sink_recv_with_buffer() { + use async_sink::Sink; + use futures_core::Stream; + use pin_utils::pin_mut; + + let mut t1 = MockTask::new(); + + let (tx, rx) = mpsc::channel::<i32>(16); + + t1.enter(|cx| { + pin_mut!(tx); + + assert_ready_ok!(tx.as_mut().poll_ready(cx)); + assert_ok!(tx.as_mut().start_send(1)); + + assert_ready_ok!(tx.as_mut().poll_ready(cx)); + assert_ok!(tx.as_mut().start_send(2)); + + assert_ready_ok!(tx.as_mut().poll_flush(cx)); + assert_ready_ok!(tx.as_mut().poll_close(cx)); + }); + + t1.enter(|cx| { + pin_mut!(rx); + + let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + assert_eq!(val, Some(1)); + + let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + assert_eq!(val, Some(2)); + + let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + assert!(val.is_none()); + }); +} + +#[test] fn start_send_past_cap() { + let mut t1 = MockTask::new(); + let mut t2 = MockTask::new(); + let mut t3 = MockTask::new(); + let (mut tx1, mut rx) = mpsc::channel(1); let mut tx2 = tx1.clone(); - let mut task1 = MockTask::new(); - let mut task2 = MockTask::new(); + assert_ok!(tx1.try_send(())); - let res = tx1.start_send(()).unwrap(); - assert!(res.is_ready()); - - task1.enter(|| { - let res = tx1.start_send(()).unwrap(); - assert!(!res.is_ready()); + t1.enter(|cx| { + assert_pending!(tx1.poll_ready(cx)); }); - task2.enter(|| { - assert_not_ready!(tx2.poll_ready()); + t2.enter(|cx| { + assert_pending!(tx2.poll_ready(cx)); }); drop(tx1); - let val = assert_ready!(rx.poll()); + let val = t3.enter(|cx| assert_ready!(rx.poll_next(cx))); assert!(val.is_some()); - assert!(task2.is_notified()); - assert!(!task1.is_notified()); + assert!(t2.is_woken()); + assert!(!t1.is_woken()); drop(tx2); - let val = assert_ready!(rx.poll()); + let val = t3.enter(|cx| assert_ready!(rx.poll_next(cx))); assert!(val.is_none()); } @@ -104,33 +117,69 @@ fn buffer_gteq_one() { #[test] fn send_recv_unbounded() { + let mut t1 = MockTask::new(); + let (mut tx, mut rx) = mpsc::unbounded_channel::<i32>(); // Using `try_send` - tx.try_send(1).unwrap(); + assert_ok!(tx.try_send(1)); + assert_ok!(tx.try_send(2)); - // Using `Sink` API - assert!(tx.start_send(2).unwrap().is_ready()); - assert_ready!(tx.poll_complete()); - - let val = assert_ready!(rx.poll()); + let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); assert_eq!(val, Some(1)); - let val = assert_ready!(rx.poll()); + let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); assert_eq!(val, Some(2)); - assert_ready!(tx.poll_complete()); - assert_ready!(tx.close()); - drop(tx); - let val = assert_ready!(rx.poll()); + let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); assert!(val.is_none()); } #[test] +#[cfg(feature = "async-traits")] +fn sink_send_recv_unbounded() { + use async_sink::Sink; + use futures_core::Stream; + use pin_utils::pin_mut; + + let mut t1 = MockTask::new(); + + let (tx, rx) = mpsc::unbounded_channel::<i32>(); + + t1.enter(|cx| { + pin_mut!(tx); + + assert_ready_ok!(tx.as_mut().poll_ready(cx)); + assert_ok!(tx.as_mut().start_send(1)); + + assert_ready_ok!(tx.as_mut().poll_ready(cx)); + assert_ok!(tx.as_mut().start_send(2)); + + assert_ready_ok!(tx.as_mut().poll_flush(cx)); + assert_ready_ok!(tx.as_mut().poll_close(cx)); + }); + + t1.enter(|cx| { + pin_mut!(rx); + + let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + assert_eq!(val, Some(1)); + + let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + assert_eq!(val, Some(2)); + + let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + assert!(val.is_none()); + }); +} + +#[test] fn no_t_bounds_buffer() { struct NoImpls; + + let mut t1 = MockTask::new(); let (tx, mut rx) = mpsc::channel(100); // sender should be Debug even though T isn't Debug @@ -139,12 +188,16 @@ fn no_t_bounds_buffer() { println!("{:?}", rx); // and sender should be Clone even though T isn't Clone assert!(tx.clone().try_send(NoImpls).is_ok()); - assert!(assert_ready!(rx.poll()).is_some()); + + let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + assert!(val.is_some()); } #[test] fn no_t_bounds_unbounded() { struct NoImpls; + + let mut t1 = MockTask::new(); let (tx, mut rx) = mpsc::unbounded_channel(); // sender should be Debug even though T isn't Debug @@ -153,186 +206,188 @@ fn no_t_bounds_unbounded() { println!("{:?}", rx); // and sender should be Clone even though T isn't Clone assert!(tx.clone().try_send(NoImpls).is_ok()); - assert!(assert_ready!(rx.poll()).is_some()); + + let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + assert!(val.is_some()); } #[test] fn send_recv_buffer_limited() { + let mut t1 = MockTask::new(); + let mut t2 = MockTask::new(); + let (mut tx, mut rx) = mpsc::channel::<i32>(1); - let mut task = MockTask::new(); // Run on a task context - task.enter(|| { - assert!(tx.poll_complete().unwrap().is_ready()); - assert!(tx.poll_ready().unwrap().is_ready()); + t1.enter(|cx| { + assert_ready_ok!(tx.poll_ready(cx)); // Send first message - let res = tx.start_send(1).unwrap(); - assert!(is_ready(&res)); - assert!(tx.poll_ready().unwrap().is_not_ready()); + assert_ok!(tx.try_send(1)); - // Send second message - let res = tx.start_send(2).unwrap(); - assert!(!is_ready(&res)); - - // Take the value - assert_eq!(rx.poll().unwrap(), Async::Ready(Some(1))); - assert!(tx.poll_ready().unwrap().is_ready()); + // Not ready + assert_pending!(tx.poll_ready(cx)); - let res = tx.start_send(2).unwrap(); - assert!(is_ready(&res)); - assert!(tx.poll_ready().unwrap().is_not_ready()); + // Send second message + assert_err!(tx.try_send(1337)); + }); + t2.enter(|cx| { // Take the value - assert_eq!(rx.poll().unwrap(), Async::Ready(Some(2))); - assert!(tx.poll_ready().unwrap().is_ready()); + let val = assert_ready!(rx.poll_next(cx)); + assert_eq!(Some(1), val); }); -} -#[test] -fn send_shared_recv() { - let (tx1, rx) = mpsc::channel::<i32>(16); - let tx2 = tx1.clone(); - let mut rx = rx.wait(); + assert!(t1.is_woken()); - tx1.send(1).wait().unwrap(); - assert_eq!(rx.next().unwrap().unwrap(), 1); + t1.enter(|cx| { + assert_ready_ok!(tx.poll_ready(cx)); - tx2.send(2).wait().unwrap(); - assert_eq!(rx.next().unwrap().unwrap(), 2); -} + assert_ok!(tx.try_send(2)); -#[test] -fn send_recv_threads() { - let (tx, rx) = mpsc::channel::<i32>(16); - let mut rx = rx.wait(); + // Not ready + assert_pending!(tx.poll_ready(cx)); + }); - thread::spawn(move || { - tx.send(1).wait().unwrap(); + t2.enter(|cx| { + // Take the value + let val = assert_ready!(rx.poll_next(cx)); + assert_eq!(Some(2), val); }); - assert_eq!(rx.next().unwrap().unwrap(), 1); + t1.enter(|cx| { + assert_ready_ok!(tx.poll_ready(cx)); + }); } #[test] fn recv_close_gets_none_idle() { + let mut t1 = MockTask::new(); + let (mut tx, mut rx) = mpsc::channel::<i32>(10); - let mut task = MockTask::new(); rx.close(); - task.enter(|| { - let val = assert_ready!(rx.poll()); + t1.enter(|cx| { + let val = assert_ready!(rx.poll_next(cx)); assert!(val.is_none()); - assert!(tx.poll_ready().is_err()); + assert_ready_err!(tx.poll_ready(cx)); }); } #[test] fn recv_close_gets_none_reserved() { + let mut t1 = MockTask::new(); + let mut t2 = MockTask::new(); + let mut t3 = MockTask::new(); + let (mut tx1, mut rx) = mpsc::channel::<i32>(1); let mut tx2 = tx1.clone(); - assert_ready!(tx1.poll_ready()); - - let mut task = MockTask::new(); + assert_ready_ok!(t1.enter(|cx| tx1.poll_ready(cx))); - task.enter(|| { - assert_not_ready!(tx2.poll_ready()); + t2.enter(|cx| { + assert_pending!(tx2.poll_ready(cx)); }); rx.close(); - assert!(task.is_notified()); + assert!(t2.is_woken()); - task.enter(|| { - assert!(tx2.poll_ready().is_err()); - assert_not_ready!(rx.poll()); + t2.enter(|cx| { + assert_ready_err!(tx2.poll_ready(cx)); }); - assert!(!task.is_notified()); + t3.enter(|cx| assert_pending!(rx.poll_next(cx))); + + assert!(!t1.is_woken()); + assert!(!t2.is_woken()); - assert!(tx1.try_send(123).is_ok()); + assert_ok!(tx1.try_send(123)); - assert!(task.is_notified()); + assert!(t3.is_woken()); - task.enter(|| { - let v = assert_ready!(rx.poll()); + t3.enter(|cx| { + let v = assert_ready!(rx.poll_next(cx)); assert_eq!(v, Some(123)); - let v = assert_ready!(rx.poll()); + let v = assert_ready!(rx.poll_next(cx)); assert!(v.is_none()); }); } #[test] fn tx_close_gets_none() { + let mut t1 = MockTask::new(); + let (_, mut rx) = mpsc::channel::<i32>(10); - let mut task = MockTask::new(); // Run on a task context - task.enter(|| { - let v = assert_ready!(rx.poll()); + t1.enter(|cx| { + let v = assert_ready!(rx.poll_next(cx)); assert!(v.is_none()); }); } -fn is_ready<T>(res: &AsyncSink<T>) -> bool { - match *res { - AsyncSink::Ready => true, - _ => false, - } -} - #[test] fn try_send_fail() { - let (mut tx, rx) = mpsc::channel(1); - let mut rx = rx.wait(); + let mut t1 = MockTask::new(); + + let (mut tx, mut rx) = mpsc::channel(1); tx.try_send("hello").unwrap(); // This should fail - assert!(tx.try_send("fail").unwrap_err().is_full()); + let err = assert_err!(tx.try_send("fail")); + assert!(err.is_full()); - assert_eq!(rx.next().unwrap().unwrap(), "hello"); + let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + assert_eq!(val, Some("hello")); - tx.try_send("goodbye").unwrap(); + assert_ok!(tx.try_send("goodbye")); drop(tx); - assert_eq!(rx.next().unwrap().unwrap(), "goodbye"); - assert!(rx.next().is_none()); + let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + assert_eq!(val, Some("goodbye")); + + let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + assert!(val.is_none()); } #[test] fn drop_tx_with_permit_releases_permit() { + let mut t1 = MockTask::new(); + let mut t2 = MockTask::new(); + // poll_ready reserves capacity, ensure that the capacity is released if tx // is dropped w/o sending a value. let (mut tx1, _rx) = mpsc::channel::<i32>(1); let mut tx2 = tx1.clone(); - let mut task = MockTask::new(); - assert_ready!(tx1.poll_ready()); + assert_ready_ok!(t1.enter(|cx| tx1.poll_ready(cx))); - task.enter(|| { - assert_not_ready!(tx2.poll_ready()); + t2.enter(|cx| { + assert_pending!(tx2.poll_ready(cx)); }); drop(tx1); - assert!(task.is_notified()); + assert!(t2.is_woken()); - assert_ready!(tx2.poll_ready()); + assert_ready_ok!(t2.enter(|cx| tx2.poll_ready(cx))); } #[test] fn dropping_rx_closes_channel() { + let mut t1 = MockTask::new(); + let (mut tx, rx) = mpsc::channel(100); let msg = Arc::new(()); - tx.try_send(msg.clone()).unwrap(); + assert_ok!(tx.try_send(msg.clone())); drop(rx); - assert!(tx.poll_ready().is_err()); + assert_ready_err!(t1.enter(|cx| tx.poll_ready(cx))); assert_eq!(1, Arc::strong_count(&msg)); } @@ -345,7 +400,11 @@ fn dropping_rx_closes_channel_for_try() { tx.try_send(msg.clone()).unwrap(); drop(rx); - assert!(tx.try_send(msg.clone()).unwrap_err().is_closed()); + + { + let err = assert_err!(tx.try_send(msg.clone())); + assert!(err.is_closed()); + } assert_eq!(1, Arc::strong_count(&msg)); } |