#![warn(rust_2018_idioms)] use tokio::io::{AsyncRead, ReadBuf}; use tokio_test::assert_ready; use tokio_test::task; use tokio_util::codec::{Decoder, FramedRead}; use bytes::{Buf, BytesMut}; use futures::Stream; use std::collections::VecDeque; use std::io; use std::pin::Pin; use std::task::Poll::{Pending, Ready}; use std::task::{Context, Poll}; macro_rules! mock { ($($x:expr,)*) => {{ let mut v = VecDeque::new(); v.extend(vec![$($x),*]); Mock { calls: v } }}; } macro_rules! assert_read { ($e:expr, $n:expr) => {{ let val = assert_ready!($e); assert_eq!(val.unwrap().unwrap(), $n); }}; } macro_rules! pin { ($id:ident) => { Pin::new(&mut $id) }; } struct U32Decoder; impl Decoder for U32Decoder { type Item = u32; type Error = io::Error; fn decode(&mut self, buf: &mut BytesMut) -> io::Result> { if buf.len() < 4 { return Ok(None); } let n = buf.split_to(4).get_u32(); Ok(Some(n)) } } #[test] fn read_multi_frame_in_packet() { let mut task = task::spawn(()); let mock = mock! { Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()), }; let mut framed = FramedRead::new(mock, U32Decoder); task.enter(|cx, _| { assert_read!(pin!(framed).poll_next(cx), 0); assert_read!(pin!(framed).poll_next(cx), 1); assert_read!(pin!(framed).poll_next(cx), 2); assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); }); } #[test] fn read_multi_frame_across_packets() { let mut task = task::spawn(()); let mock = mock! { Ok(b"\x00\x00\x00\x00".to_vec()), Ok(b"\x00\x00\x00\x01".to_vec()), Ok(b"\x00\x00\x00\x02".to_vec()), }; let mut framed = FramedRead::new(mock, U32Decoder); task.enter(|cx, _| { assert_read!(pin!(framed).poll_next(cx), 0); assert_read!(pin!(framed).poll_next(cx), 1); assert_read!(pin!(framed).poll_next(cx), 2); assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); }); } #[test] fn read_not_ready() { let mut task = task::spawn(()); let mock = mock! { Err(io::Error::new(io::ErrorKind::WouldBlock, "")), Ok(b"\x00\x00\x00\x00".to_vec()), Ok(b"\x00\x00\x00\x01".to_vec()), }; let mut framed = FramedRead::new(mock, U32Decoder); task.enter(|cx, _| { assert!(pin!(framed).poll_next(cx).is_pending()); assert_read!(pin!(framed).poll_next(cx), 0); assert_read!(pin!(framed).poll_next(cx), 1); assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); }); } #[test] fn read_partial_then_not_ready() { let mut task = task::spawn(()); let mock = mock! { Ok(b"\x00\x00".to_vec()), Err(io::Error::new(io::ErrorKind::WouldBlock, "")), Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()), }; let mut framed = FramedRead::new(mock, U32Decoder); task.enter(|cx, _| { assert!(pin!(framed).poll_next(cx).is_pending()); assert_read!(pin!(framed).poll_next(cx), 0); assert_read!(pin!(framed).poll_next(cx), 1); assert_read!(pin!(framed).poll_next(cx), 2); assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); }); } #[test] fn read_err() { let mut task = task::spawn(()); let mock = mock! { Err(io::Error::new(io::ErrorKind::Other, "")), }; let mut framed = FramedRead::new(mock, U32Decoder); task.enter(|cx, _| { assert_eq!( io::ErrorKind::Other, assert_ready!(pin!(framed).poll_next(cx)) .unwrap() .unwrap_err() .kind() ) }); } #[test] fn read_partial_then_err() { let mut task = task::spawn(()); let mock = mock! { Ok(b"\x00\x00".to_vec()), Err(io::Error::new(io::ErrorKind::Other, "")), }; let mut framed = FramedRead::new(mock, U32Decoder); task.enter(|cx, _| { assert_eq!( io::ErrorKind::Other, assert_ready!(pin!(framed).poll_next(cx)) .unwrap() .unwrap_err() .kind() ) }); } #[test] fn read_partial_would_block_then_err() { let mut task = task::spawn(()); let mock = mock! { Ok(b"\x00\x00".to_vec()), Err(io::Error::new(io::ErrorKind::WouldBlock, "")), Err(io::Error::new(io::ErrorKind::Other, "")), }; let mut framed = FramedRead::new(mock, U32Decoder); task.enter(|cx, _| { assert!(pin!(framed).poll_next(cx).is_pending()); assert_eq!( io::ErrorKind::Other, assert_ready!(pin!(framed).poll_next(cx)) .unwrap() .unwrap_err() .kind() ) }); } #[test] fn huge_size() { let mut task = task::spawn(()); let data = &[0; 32 * 1024][..]; let mut framed = FramedRead::new(data, BigDecoder); task.enter(|cx, _| { assert_read!(pin!(framed).poll_next(cx), 0); assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); }); struct BigDecoder; impl Decoder for BigDecoder { type Item = u32; type Error = io::Error; fn decode(&mut self, buf: &mut BytesMut) -> io::Result> { if buf.len() < 32 * 1024 { return Ok(None); } buf.advance(32 * 1024); Ok(Some(0)) } } } #[test] fn data_remaining_is_error() { let mut task = task::spawn(()); let slice = &[0; 5][..]; let mut framed = FramedRead::new(slice, U32Decoder); task.enter(|cx, _| { assert_read!(pin!(framed).poll_next(cx), 0); assert!(assert_ready!(pin!(framed).poll_next(cx)).unwrap().is_err()); }); } #[test] fn multi_frames_on_eof() { let mut task = task::spawn(()); struct MyDecoder(Vec); impl Decoder for MyDecoder { type Item = u32; type Error = io::Error; fn decode(&mut self, _buf: &mut BytesMut) -> io::Result> { unreachable!(); } fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result> { if self.0.is_empty() { return Ok(None); } Ok(Some(self.0.remove(0))) } } let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3])); task.enter(|cx, _| { assert_read!(pin!(framed).poll_next(cx), 0); assert_read!(pin!(framed).poll_next(cx), 1); assert_read!(pin!(framed).poll_next(cx), 2); assert_read!(pin!(framed).poll_next(cx), 3); assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); }); } // ===== Mock ====== struct Mock { calls: VecDeque>>, } impl AsyncRead for Mock { fn poll_read( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { use io::ErrorKind::WouldBlock; match self.calls.pop_front() { Some(Ok(data)) => { debug_assert!(buf.remaining() >= data.len()); buf.put_slice(&data); Ready(Ok(())) } Some(Err(ref e)) if e.kind() == WouldBlock => Pending, Some(Err(e)) => Ready(Err(e)), None => Ready(Ok(())), } } }