summaryrefslogtreecommitdiffstats
path: root/tokio-util/tests/io_reader_stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-util/tests/io_reader_stream.rs')
-rw-r--r--tokio-util/tests/io_reader_stream.rs65
1 files changed, 65 insertions, 0 deletions
diff --git a/tokio-util/tests/io_reader_stream.rs b/tokio-util/tests/io_reader_stream.rs
new file mode 100644
index 00000000..b906de09
--- /dev/null
+++ b/tokio-util/tests/io_reader_stream.rs
@@ -0,0 +1,65 @@
+#![warn(rust_2018_idioms)]
+
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::{AsyncRead, ReadBuf};
+use tokio::stream::StreamExt;
+
+/// produces at most `remaining` zeros, that returns error.
+/// each time it reads at most 31 byte.
+struct Reader {
+ remaining: usize,
+}
+
+impl AsyncRead for Reader {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<std::io::Result<()>> {
+ let this = Pin::into_inner(self);
+ assert_ne!(buf.remaining(), 0);
+ if this.remaining > 0 {
+ let n = std::cmp::min(this.remaining, buf.remaining());
+ let n = std::cmp::min(n, 31);
+ for x in &mut buf.initialize_unfilled_to(n)[..n] {
+ *x = 0;
+ }
+ buf.add_filled(n);
+ this.remaining -= n;
+ Poll::Ready(Ok(()))
+ } else {
+ Poll::Ready(Err(std::io::Error::from_raw_os_error(22)))
+ }
+ }
+}
+
+#[tokio::test]
+async fn correct_behavior_on_errors() {
+ let reader = Reader { remaining: 8000 };
+ let mut stream = tokio_util::io::ReaderStream::new(reader);
+ let mut zeros_received = 0;
+ let mut had_error = false;
+ loop {
+ let item = stream.next().await.unwrap();
+ println!("{:?}", item);
+ match item {
+ Ok(bytes) => {
+ let bytes = &*bytes;
+ for byte in bytes {
+ assert_eq!(*byte, 0);
+ zeros_received += 1;
+ }
+ }
+ Err(_) => {
+ assert!(!had_error);
+ had_error = true;
+ break;
+ }
+ }
+ }
+
+ assert!(had_error);
+ assert_eq!(zeros_received, 8000);
+ assert!(stream.next().await.is_none());
+}