#![warn(rust_2018_idioms)]
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
};
use tokio_util::codec::*;
use bytes::{BufMut, Bytes, BytesMut};
use futures::{pin_mut, Sink, Stream};
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
use std::task::Poll::*;
use std::task::{Context, Poll};
/// Builds a `Mock` whose `calls` queue holds the given expressions in the
/// order written. Every expression must be followed by a comma; `mock!()`
/// produces an empty queue.
macro_rules! mock {
    ($($call:expr,)*) => {{
        let queued = vec![$($call),*];
        Mock {
            calls: VecDeque::from(queued),
        }
    }};
}
/// Polls the pinned stream `$io` once inside a test task and asserts that it
/// is ready with an `Ok` frame equal to `$expect.as_ref()`.
///
/// Panics with `"error = ..."` on an `Err` item and `"none"` when the stream
/// has ended.
macro_rules! assert_next_eq {
    ($io:ident, $expect:expr) => {{
        task::spawn(()).enter(|cx, _| {
            // `assert_ready!` unwraps the `Poll`, failing the test if it is not ready.
            match assert_ready!($io.as_mut().poll_next(cx)) {
                None => panic!("none"),
                Some(Err(err)) => panic!("error = {:?}", err),
                Some(Ok(frame)) => assert_eq!(frame, $expect.as_ref()),
            }
        });
    }};
}
/// Polls the pinned stream `$io` once inside a test task and asserts that the
/// result is `Pending`.
///
/// Any `Ready` outcome panics: `"value = ..."` for a frame, `"error = ..."`
/// for an error, and `"done"` for end of stream.
macro_rules! assert_next_pending {
    ($io:ident) => {{
        task::spawn(()).enter(|cx, _| {
            let polled = $io.as_mut().poll_next(cx);
            match polled {
                Pending => {}
                Ready(None) => panic!("done"),
                Ready(Some(Err(err))) => panic!("error = {:?}", err),
                Ready(Some(Ok(frame))) => panic!("value = {:?}", frame),
            }
        });
    }};
}
/// Polls the pinned stream `$io` once inside a test task and asserts that it
/// is ready with an `Err` item.
///
/// Panics with `"value = ..."` for a frame, `"done"` for end of stream, and
/// `"pending"` when the stream is not ready.
macro_rules! assert_next_err {
    ($io:ident) => {{
        task::spawn(()).enter(|cx, _| {
            let polled = $io.as_mut().poll_next(cx);
            match polled {
                Ready(Some(Err(_))) => {}
                Ready(Some(Ok(frame))) => panic!("value = {:?}", frame),
                Ready(None) => panic!("done"),
                Pending => panic!("pending"),
            }
        });
    }};
}
/// Polls the pinned stream `$io` once inside a test task and asserts that it
/// is ready with `None`, i.e. the stream has ended.
///
/// Panics with `"value = ..."` for a frame and `"error = ..."` for an error.
macro_rules! assert_done {
    ($io:ident) => {{
        task::spawn(()).enter(|cx, _| {
            // `assert_ready!` unwraps the `Poll`, failing the test if it is not ready.
            match assert_ready!($io.as_mut().poll_next(cx)) {
                None => {}
                Some(Err(err)) => panic!("error = {:?}", err),
                Some(Ok(frame)) => panic!("value = {:?}", frame),
            }
        });
    }};
}
/// A reader over a mock with an empty call queue must report end of stream
/// on the first poll, without producing a frame or an error.
#[test]
fn read_empty_io_yields_nothing() {
    // Pin on the stack only, like the other tests in this file; wrapping the
    // reader in `Box::pin` before `pin_mut!` added a needless heap allocation
    // and a second layer of pinning.
    let io = FramedRead::new(mock!(), LengthDelimitedCodec::new());
    pin_mut!(io);
    assert_done!(io);
}
/// One chunk carrying the 4-byte length header `00 00 00 09` followed by nine
/// payload bytes decodes to a single frame, after which the stream ends.
#[test]
fn read_single_frame_one_packet() {
    let upstream = mock! {
        data(b"\x00\x00\x00\x09abcdefghi"),
    };
    let codec = LengthDelimitedCodec::new();

    let io = FramedRead::new(upstream, codec);
    pin_mut!(io);

    assert_next_eq!(io, b"abcdefghi");
    assert_done!(io);
}
/// With the builder switched to little-endian, the header `09 00 00 00`
/// followed by nine payload bytes decodes to a single frame.
#[test]
fn read_single_frame_one_packet_little_endian() {
    let mut builder = length_delimited::Builder::new();
    builder.little_endian();

    let io = builder.new_read(mock! {
        data(b"\x09\x00\x00\x00abcdefghi"),
    });
    pin_mut!(io);

    assert_next_eq!(io, b"abcdefghi");
    assert_done!(io);
}
/// With the builder switched to native-endian, the header written in the
/// compile target's byte order decodes to a single nine-byte frame.
#[test]
fn read_single_frame_one_packet_native_endian() {
    // Pick the encoding of length 9 that matches the target's endianness.
    let wire = if cfg!(target_endian = "little") {
        b"\x09\x00\x00\x00abcdefghi"
    } else {
        b"\x00\x00\x00\x09abcdefghi"
    };

    let mut builder = length_delimited::Builder::new();
    builder.native_endian();

    let io = builder.new_read(mock! {
        data(wire),
    });
    pin_mut!(io);

    assert_next_eq!(io, b"abcdefghi");
    assert_done!(io);
}
/// Three length-prefixed frames delivered back to back in one chunk are
/// decoded one per poll, in order, and then the stream ends.
#[test]
fn read_single_multi_frame_one_packet() {
    // Concatenate the three encoded frames into one contiguous buffer.
    let encoded: [&[u8]; 3] = [
        b"\x00\x00\x00\x09abcdefghi",
        b"\x00\x00\x00\x03123",
        b"\x00\x00\x00\x0bhello world",
    ];
    let d: Vec<u8> = encoded.concat();

    let upstream = mock! {
        data(&d),
    };
    let io = FramedRead::new(upstream, LengthDelimitedCodec::new());
    pin_mut!(io);

    assert_next_eq!(io, b"abcdefghi");
    assert_next_eq!(io, b"123");
    assert_next_eq!(io, b"hello world");
    assert_done!(io);
}
/// A single frame whose header and payload are split across three chunks is
/// reassembled into one frame within a single poll.
#[test]
fn read_single_frame_multi_packet() {
    let upstream = mock! {
        data(b"\x00\x00"),
        data(b"\x00\x09abc"),
        data(b"defghi"),
    };
    let codec = LengthDelimitedCodec::new();

    let io = FramedRead::new(upstream, codec);
    pin_mut!(io);

    assert_next_eq!(io, b"abcdefghi");
    assert_done!(io);
}
/// Three frames spread over five chunks, with chunk boundaries falling inside
/// headers and payloads, are decoded in order and then the stream ends.
#[test]
fn read_multi_frame_multi_packet() {
    let upstream = mock! {
        data(b"\x00\x00"),
        data(b"\x00\x09abc"),
        data(b"defghi"),
        data(b"\x00\x00\x00\x0312"),
        data(b"3\x00\x00\x00\x0bhello world"),
    };
    let codec = LengthDelimitedCodec::new();

    let io = FramedRead::new(upstream, codec);
    pin_mut!(io);

    assert_next_eq!(io, b"abcdefghi");
    assert_next_eq!(io, b"123");
    assert_next_eq!(io, b"hello world");
    assert_done!(io);
}
#[test]
fn read_sing