diff options
-rw-r--r-- | tokio/Cargo.toml | 5 | ||||
-rw-r--r-- | tokio/src/io/async_buf_read_ext.rs | 29 | ||||
-rw-r--r-- | tokio/src/io/lines.rs | 56 | ||||
-rw-r--r-- | tokio/src/io/mod.rs | 1 | ||||
-rw-r--r-- | tokio/tests/io_lines.rs | 53 |
5 files changed, 142 insertions, 2 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index d45b8d00..6a630083 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -40,7 +40,7 @@ default = [ codec = ["io", "tokio-codec"] fs = ["tokio-fs"] -io = ["bytes", "tokio-io", "memchr"] +io = ["bytes", "tokio-io", "tokio-futures", "memchr"] reactor = ["io", "tokio-reactor"] rt-full = [ "num_cpus", @@ -83,7 +83,7 @@ tracing-core = { version = "0.1", optional = true } memchr = { version = "2.2", optional = true } # Needed for async/await preview support -#tokio-futures = { version = "0.2.0", optional = true, path = "../tokio-futures" } +tokio-futures = { version = "0.2.0", optional = true, path = "../tokio-futures" } [target.'cfg(unix)'.dependencies] tokio-uds = { version = "0.3.0", optional = true, path = "../tokio-uds" } @@ -102,3 +102,4 @@ num_cpus = "1.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" time = "0.1" +futures-util-preview = "0.3.0-alpha.17" diff --git a/tokio/src/io/async_buf_read_ext.rs b/tokio/src/io/async_buf_read_ext.rs index e8178074..dfe0864a 100644 --- a/tokio/src/io/async_buf_read_ext.rs +++ b/tokio/src/io/async_buf_read_ext.rs @@ -1,3 +1,4 @@ +use crate::io::lines::{lines, Lines}; use crate::io::read_line::{read_line, ReadLine}; use crate::io::read_until::{read_until, ReadUntil}; @@ -66,6 +67,34 @@ pub trait AsyncBufReadExt: AsyncBufRead { { read_line(self, buf) } + + /// Returns a stream over the lines of this reader. + /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines). + /// + /// The stream returned from this function will yield instances of + /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline + /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end. + /// + /// [`io::Result`]: std::io::Result + /// [`String`]: String + /// + /// # Errors + /// + /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`]. + /// + /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line + /// + /// # Examples + /// + /// ``` + /// unimplemented!(); + /// ``` + fn lines(self) -> Lines<Self> + where + Self: Sized, + { + lines(self) + } } impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {} diff --git a/tokio/src/io/lines.rs b/tokio/src/io/lines.rs new file mode 100644 index 00000000..b44e4d1b --- /dev/null +++ b/tokio/src/io/lines.rs @@ -0,0 +1,56 @@ +use super::read_line::read_line_internal; +use std::io; +use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio_futures::Stream; +use tokio_io::AsyncBufRead; + +/// Stream for the [`lines`](crate::io::AsyncBufReadExt::lines) method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Lines<R> { + reader: R, + buf: String, + bytes: Vec<u8>, + read: usize, +} + +impl<R: Unpin> Unpin for Lines<R> {} + +pub(crate) fn lines<R>(reader: R) -> Lines<R> +where + R: AsyncBufRead, +{ + Lines { + reader, + buf: String::new(), + bytes: Vec::new(), + read: 0, + } +} + +impl<R: AsyncBufRead> Stream for Lines<R> { + type Item = io::Result<String>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let Self { + reader, + buf, + bytes, + read, + } = unsafe { self.get_unchecked_mut() }; + let reader = unsafe { Pin::new_unchecked(reader) }; + let n = ready!(read_line_internal(reader, cx, buf, bytes, read))?; + if n == 0 && buf.is_empty() { + return Poll::Ready(None); + } + if buf.ends_with('\n') { + buf.pop(); + if buf.ends_with('\r') { + buf.pop(); + } + } + Poll::Ready(Some(Ok(mem::replace(buf, String::new())))) + } +} diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index bd1440ee..837ab5aa 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -40,6 +40,7 @@ mod async_buf_read_ext; mod async_read_ext; mod async_write_ext; mod copy; +mod lines; mod read; mod read_exact; mod read_line; diff --git a/tokio/tests/io_lines.rs b/tokio/tests/io_lines.rs new file mode 100644 index 00000000..02ce6149 --- /dev/null +++ b/tokio/tests/io_lines.rs @@ -0,0 +1,53 @@ +#![deny(warnings, rust_2018_idioms)] +#![feature(async_await)] + +use futures_util::StreamExt; +use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncRead}; +use tokio_test::assert_ok; + +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[tokio::test] +async fn lines() { + struct Rd { + val: &'static [u8], + } + + impl AsyncRead for Rd { + fn poll_read( + self: Pin<&mut Self>, + _: &mut Context<'_>, + _: &mut [u8], + ) -> Poll<io::Result<usize>> { + unimplemented!() + } + } + + impl AsyncBufRead for Rd { + fn poll_fill_buf<'a>( + self: Pin<&'a mut Self>, + _: &mut Context<'_>, + ) -> Poll<io::Result<&'a [u8]>> { + Poll::Ready(Ok(self.val)) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + self.val = &self.val[amt..]; + } + } + + let rd = Rd { + val: b"hello\r\nworld\n\n", + }; + let mut st = rd.lines(); + + let b = assert_ok!(st.next().await.unwrap()); + assert_eq!(b, "hello"); + let b = assert_ok!(st.next().await.unwrap()); + assert_eq!(b, "world"); + let b = assert_ok!(st.next().await.unwrap()); + assert_eq!(b, ""); + assert!(st.next().await.is_none()); +} |