summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--tokio/Cargo.toml5
-rw-r--r--tokio/src/io/async_buf_read_ext.rs29
-rw-r--r--tokio/src/io/lines.rs56
-rw-r--r--tokio/src/io/mod.rs1
-rw-r--r--tokio/tests/io_lines.rs53
5 files changed, 142 insertions, 2 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index d45b8d00..6a630083 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -40,7 +40,7 @@ default = [
codec = ["io", "tokio-codec"]
fs = ["tokio-fs"]
-io = ["bytes", "tokio-io", "memchr"]
+io = ["bytes", "tokio-io", "tokio-futures", "memchr"]
reactor = ["io", "tokio-reactor"]
rt-full = [
"num_cpus",
@@ -83,7 +83,7 @@ tracing-core = { version = "0.1", optional = true }
memchr = { version = "2.2", optional = true }
# Needed for async/await preview support
-#tokio-futures = { version = "0.2.0", optional = true, path = "../tokio-futures" }
+tokio-futures = { version = "0.2.0", optional = true, path = "../tokio-futures" }
[target.'cfg(unix)'.dependencies]
tokio-uds = { version = "0.3.0", optional = true, path = "../tokio-uds" }
@@ -102,3 +102,4 @@ num_cpus = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
time = "0.1"
+futures-util-preview = "0.3.0-alpha.17"
diff --git a/tokio/src/io/async_buf_read_ext.rs b/tokio/src/io/async_buf_read_ext.rs
index e8178074..dfe0864a 100644
--- a/tokio/src/io/async_buf_read_ext.rs
+++ b/tokio/src/io/async_buf_read_ext.rs
@@ -1,3 +1,4 @@
+use crate::io::lines::{lines, Lines};
use crate::io::read_line::{read_line, ReadLine};
use crate::io::read_until::{read_until, ReadUntil};
@@ -66,6 +67,34 @@ pub trait AsyncBufReadExt: AsyncBufRead {
{
read_line(self, buf)
}
+
+ /// Returns a stream over the lines of this reader.
+ /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
+ ///
+ /// The stream returned from this function will yield instances of
+ /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline
+ /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
+ ///
+ /// [`io::Result`]: std::io::Result
+ /// [`String`]: String
+ ///
+ /// # Errors
+ ///
+ /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
+ ///
+ /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// unimplemented!();
+ /// ```
+ fn lines(self) -> Lines<Self>
+ where
+ Self: Sized,
+ {
+ lines(self)
+ }
}
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
diff --git a/tokio/src/io/lines.rs b/tokio/src/io/lines.rs
new file mode 100644
index 00000000..b44e4d1b
--- /dev/null
+++ b/tokio/src/io/lines.rs
@@ -0,0 +1,56 @@
+use super::read_line::read_line_internal;
+use std::io;
+use std::mem;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio_futures::Stream;
+use tokio_io::AsyncBufRead;
+
+/// Stream for the [`lines`](crate::io::AsyncBufReadExt::lines) method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Lines<R> {
+ reader: R,
+ buf: String,
+ bytes: Vec<u8>,
+ read: usize,
+}
+
+impl<R: Unpin> Unpin for Lines<R> {}
+
+pub(crate) fn lines<R>(reader: R) -> Lines<R>
+where
+ R: AsyncBufRead,
+{
+ Lines {
+ reader,
+ buf: String::new(),
+ bytes: Vec::new(),
+ read: 0,
+ }
+}
+
+impl<R: AsyncBufRead> Stream for Lines<R> {
+ type Item = io::Result<String>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let Self {
+ reader,
+ buf,
+ bytes,
+ read,
+ } = unsafe { self.get_unchecked_mut() };
+ let reader = unsafe { Pin::new_unchecked(reader) };
+ let n = ready!(read_line_internal(reader, cx, buf, bytes, read))?;
+ if n == 0 && buf.is_empty() {
+ return Poll::Ready(None);
+ }
+ if buf.ends_with('\n') {
+ buf.pop();
+ if buf.ends_with('\r') {
+ buf.pop();
+ }
+ }
+ Poll::Ready(Some(Ok(mem::replace(buf, String::new()))))
+ }
+}
diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs
index bd1440ee..837ab5aa 100644
--- a/tokio/src/io/mod.rs
+++ b/tokio/src/io/mod.rs
@@ -40,6 +40,7 @@ mod async_buf_read_ext;
mod async_read_ext;
mod async_write_ext;
mod copy;
+mod lines;
mod read;
mod read_exact;
mod read_line;
diff --git a/tokio/tests/io_lines.rs b/tokio/tests/io_lines.rs
new file mode 100644
index 00000000..02ce6149
--- /dev/null
+++ b/tokio/tests/io_lines.rs
@@ -0,0 +1,53 @@
+#![deny(warnings, rust_2018_idioms)]
+#![feature(async_await)]
+
+use futures_util::StreamExt;
+use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncRead};
+use tokio_test::assert_ok;
+
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+#[tokio::test]
+async fn lines() {
+ struct Rd {
+ val: &'static [u8],
+ }
+
+ impl AsyncRead for Rd {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ _: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ unimplemented!()
+ }
+ }
+
+ impl AsyncBufRead for Rd {
+ fn poll_fill_buf<'a>(
+ self: Pin<&'a mut Self>,
+ _: &mut Context<'_>,
+ ) -> Poll<io::Result<&'a [u8]>> {
+ Poll::Ready(Ok(self.val))
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ self.val = &self.val[amt..];
+ }
+ }
+
+ let rd = Rd {
+ val: b"hello\r\nworld\n\n",
+ };
+ let mut st = rd.lines();
+
+ let b = assert_ok!(st.next().await.unwrap());
+ assert_eq!(b, "hello");
+ let b = assert_ok!(st.next().await.unwrap());
+ assert_eq!(b, "world");
+ let b = assert_ok!(st.next().await.unwrap());
+ assert_eq!(b, "");
+ assert!(st.next().await.is_none());
+}