summary refs log tree commit diff stats
path: root/tokio-io
diff options
context:
space:
mode:
author Taiki Endo <te316e89@gmail.com> 2019-08-15 02:24:07 +0900
committer Carl Lerche <me@carllerche.com> 2019-08-14 10:24:07 -0700
commit 999a600494c7b491e65094518ca72a57e27c3ab0 (patch)
tree 4bb2729117d8c7ebed2adc09e1f39508cd8b1035 /tokio-io
parent fb9809c0684633cbaaee871dce084c9228598a78 (diff)
io: add async BufReader/BufWriter (#1438)
Diffstat (limited to 'tokio-io')
-rw-r--r-- tokio-io/Cargo.toml 3
-rw-r--r-- tokio-io/src/io/buf_reader.rs 168
-rw-r--r-- tokio-io/src/io/buf_writer.rs 159
-rw-r--r-- tokio-io/src/io/mod.rs 10
-rw-r--r-- tokio-io/src/lib.rs 2
5 files changed, 340 insertions, 2 deletions
diff --git a/tokio-io/Cargo.toml b/tokio-io/Cargo.toml
index 8a350a90..521ca2b7 100644
--- a/tokio-io/Cargo.toml
+++ b/tokio-io/Cargo.toml
@@ -20,13 +20,14 @@ Core I/O primitives for asynchronous I/O in Rust.
categories = ["asynchronous"]
[features]
-util = ["memchr"]
+util = ["memchr", "pin-utils"]
[dependencies]
bytes = "0.4.7"
log = "0.4"
futures-core-preview = "=0.3.0-alpha.18"
memchr = { version = "2.2", optional = true }
+pin-utils = { version = "=0.1.0-alpha.4", optional = true }
[dev-dependencies]
tokio = { version = "=0.2.0-alpha.1", path = "../tokio" }
diff --git a/tokio-io/src/io/buf_reader.rs b/tokio-io/src/io/buf_reader.rs
new file mode 100644
index 00000000..b8347d09
--- /dev/null
+++ b/tokio-io/src/io/buf_reader.rs
@@ -0,0 +1,168 @@
+use super::DEFAULT_BUF_SIZE;
+use crate::{AsyncBufRead, AsyncRead};
+use futures_core::ready;
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use std::io::{self, Read};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::{cmp, fmt};
+
+/// The `BufReader` struct adds buffering to any reader.
+///
+/// It can be excessively inefficient to work directly with an [`AsyncRead`]
+/// instance. A `BufReader` performs large, infrequent reads on the underlying
+/// [`AsyncRead`] and maintains an in-memory buffer of the results.
+///
+/// `BufReader` can improve the speed of programs that make *small* and
+/// *repeated* read calls to the same file or network socket. It does not
+/// help when reading very large amounts at once, or reading just one or a few
+/// times. It also provides no advantage when reading from a source that is
+/// already in memory, like a `Vec<u8>`.
+///
+/// When the `BufReader` is dropped, the contents of its buffer will be
+/// discarded. Creating multiple instances of a `BufReader` on the same
+/// stream can cause data loss.
+///
+/// [`AsyncRead`]: tokio_io::AsyncRead
+///
+// TODO: Examples
+pub struct BufReader<R> {
+ inner: R,
+ buf: Box<[u8]>,
+ pos: usize,
+ cap: usize,
+}
+
+impl<R: AsyncRead> BufReader<R> {
+ unsafe_pinned!(inner: R);
+ unsafe_unpinned!(pos: usize);
+ unsafe_unpinned!(cap: usize);
+
+ /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
+ /// but may change in the future.
+ pub fn new(inner: R) -> Self {
+ Self::with_capacity(DEFAULT_BUF_SIZE, inner)
+ }
+
+ /// Creates a new `BufReader` with the specified buffer capacity.
+ pub fn with_capacity(capacity: usize, inner: R) -> Self {
+ unsafe {
+ let mut buffer = Vec::with_capacity(capacity);
+ buffer.set_len(capacity);
+ inner.prepare_uninitialized_buffer(&mut buffer);
+ Self {
+ inner,
+ buf: buffer.into_boxed_slice(),
+ pos: 0,
+ cap: 0,
+ }
+ }
+ }
+
+ /// Gets a reference to the underlying reader.
+ ///
+ /// It is inadvisable to directly read from the underlying reader.
+ pub fn get_ref(&self) -> &R {
+ &self.inner
+ }
+
+ /// Gets a mutable reference to the underlying reader.
+ ///
+ /// It is inadvisable to directly read from the underlying reader.
+ pub fn get_mut(&mut self) -> &mut R {
+ &mut self.inner
+ }
+
+ /// Gets a pinned mutable reference to the underlying reader.
+ ///
+ /// It is inadvisable to directly read from the underlying reader.
+ pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
+ self.inner()
+ }
+
+ /// Consumes this `BufReader`, returning the underlying reader.
+ ///
+ /// Note that any leftover data in the internal buffer is lost.
+ pub fn into_inner(self) -> R {
+ self.inner
+ }
+
+ /// Returns a reference to the internally buffered data.
+ ///
+ /// Unlike `poll_fill_buf`, this will not attempt to fill the buffer if it is empty.
+ pub fn buffer(&self) -> &[u8] {
+ &self.buf[self.pos..self.cap]
+ }
+
+ /// Invalidates all data in the internal buffer.
+ #[inline]
+ fn discard_buffer(mut self: Pin<&mut Self>) {
+ *self.as_mut().pos() = 0;
+ *self.cap() = 0;
+ }
+}
+
+impl<R: AsyncRead> AsyncRead for BufReader<R> {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ // If we don't have any buffered data and we're doing a massive read
+ // (larger than our internal buffer), bypass our internal buffer
+ // entirely.
+ if self.pos == self.cap && buf.len() >= self.buf.len() {
+ let res = ready!(self.as_mut().inner().poll_read(cx, buf));
+ self.discard_buffer();
+ return Poll::Ready(res);
+ }
+ let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
+ let nread = rem.read(buf)?;
+ self.consume(nread);
+ Poll::Ready(Ok(nread))
+ }
+
+ // We can't skip initialization unconditionally: large reads in `poll_read` pass `buf` straight to the inner reader.
+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
+ self.inner.prepare_uninitialized_buffer(buf)
+ }
+}
+
+impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ let Self {
+ inner,
+ buf,
+ cap,
+ pos,
+ } = unsafe { self.get_unchecked_mut() };
+ let mut inner = unsafe { Pin::new_unchecked(inner) };
+
+ // If we've reached the end of our internal buffer then we need to fetch
+ // some more data from the underlying reader.
+ // Branch using `>=` instead of the more correct `==`
+ // to tell the compiler that the pos..cap slice is always valid.
+ if *pos >= *cap {
+ debug_assert!(*pos == *cap);
+ *cap = ready!(inner.as_mut().poll_read(cx, buf))?;
+ *pos = 0;
+ }
+ Poll::Ready(Ok(&buf[*pos..*cap]))
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ *self.as_mut().pos() = cmp::min(self.pos + amt, self.cap);
+ }
+}
+
+impl<R: AsyncRead + fmt::Debug> fmt::Debug for BufReader<R> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("BufReader")
+ .field("reader", &self.inner)
+ .field(
+ "buffer",
+ &format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
+ )
+ .finish()
+ }
+}
diff --git a/tokio-io/src/io/buf_writer.rs b/tokio-io/src/io/buf_writer.rs
new file mode 100644
index 00000000..c91cb6fd
--- /dev/null
+++ b/tokio-io/src/io/buf_writer.rs
@@ -0,0 +1,159 @@
+use super::DEFAULT_BUF_SIZE;
+use crate::AsyncWrite;
+use futures_core::ready;
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use std::fmt;
+use std::io::{self, Write};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Wraps a writer and buffers its output.
+///
+/// It can be excessively inefficient to work directly with something that
+/// implements [`AsyncWrite`]. A `BufWriter` keeps an in-memory buffer of data and
+/// writes it to an underlying writer in large, infrequent batches.
+///
+/// `BufWriter` can improve the speed of programs that make *small* and
+/// *repeated* write calls to the same file or network socket. It does not
+/// help when writing very large amounts at once, or writing just one or a few
+/// times. It also provides no advantage when writing to a destination that is
+/// in memory, like a `Vec<u8>`.
+///
+/// When the `BufWriter` is dropped, the contents of its buffer will be
+/// discarded. Creating multiple instances of a `BufWriter` on the same
+/// stream can cause data loss. If you need to write out the contents of its
+/// buffer, you must manually call [`flush`] before the writer is dropped.
+///
+/// [`AsyncWrite`]: tokio_io::AsyncWrite
+/// [`flush`]: super::AsyncWriteExt::flush
+///
+// TODO: Examples
+pub struct BufWriter<W> {
+ inner: W,
+ buf: Vec<u8>,
+ written: usize,
+}
+
+impl<W: AsyncWrite> BufWriter<W> {
+ unsafe_pinned!(inner: W);
+ unsafe_unpinned!(buf: Vec<u8>);
+
+ /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB,
+ /// but may change in the future.
+ pub fn new(inner: W) -> Self {
+ Self::with_capacity(DEFAULT_BUF_SIZE, inner)
+ }
+
+ /// Creates a new `BufWriter` with the specified buffer capacity.
+ pub fn with_capacity(cap: usize, inner: W) -> Self {
+ Self {
+ inner,
+ buf: Vec::with_capacity(cap),
+ written: 0,
+ }
+ }
+
+ fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let Self {
+ inner,
+ buf,
+ written,
+ } = unsafe { self.get_unchecked_mut() };
+ let mut inner = unsafe { Pin::new_unchecked(inner) };
+
+ let len = buf.len();
+ let mut ret = Ok(());
+ while *written < len {
+ match ready!(inner.as_mut().poll_write(cx, &buf[*written..])) {
+ Ok(0) => {
+ ret = Err(io::Error::new(
+ io::ErrorKind::WriteZero,
+ "failed to write the buffered data",
+ ));
+ break;
+ }
+ Ok(n) => *written += n,
+ Err(e) => {
+ ret = Err(e);
+ break;
+ }
+ }
+ }
+ if *written > 0 {
+ buf.drain(..*written);
+ }
+ *written = 0;
+ Poll::Ready(ret)
+ }
+
+ /// Gets a reference to the underlying writer.
+ pub fn get_ref(&self) -> &W {
+ &self.inner
+ }
+
+ /// Gets a mutable reference to the underlying writer.
+ ///
+ /// It is inadvisable to directly write to the underlying writer.
+ pub fn get_mut(&mut self) -> &mut W {
+ &mut self.inner
+ }
+
+ /// Gets a pinned mutable reference to the underlying writer.
+ ///
+ /// It is inadvisable to directly write to the underlying writer.
+ pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
+ self.inner()
+ }
+
+ /// Consumes this `BufWriter`, returning the underlying writer.
+ ///
+ /// Note that any leftover data in the internal buffer is lost.
+ pub fn into_inner(self) -> W {
+ self.inner
+ }
+
+ /// Returns a reference to the internally buffered data.
+ pub fn buffer(&self) -> &[u8] {
+ &self.buf
+ }
+}
+
+impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ if self.buf.len() + buf.len() > self.buf.capacity() {
+ ready!(self.as_mut().flush_buf(cx))?;
+ }
+ if buf.len() >= self.buf.capacity() {
+ self.inner().poll_write(cx, buf)
+ } else {
+ Poll::Ready(self.buf().write(buf))
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ ready!(self.as_mut().flush_buf(cx))?;
+ self.inner().poll_flush(cx)
+ }
+
+ fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ ready!(self.as_mut().flush_buf(cx))?;
+ self.inner().poll_shutdown(cx)
+ }
+}
+
+impl<W: AsyncWrite + fmt::Debug> fmt::Debug for BufWriter<W> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("BufWriter")
+ .field("writer", &self.inner)
+ .field(
+ "buffer",
+ &format_args!("{}/{}", self.buf.len(), self.buf.capacity()),
+ )
+ .field("written", &self.written)
+ .finish()
+ }
+}
diff --git a/tokio-io/src/io/mod.rs b/tokio-io/src/io/mod.rs
index 71904c1f..8e05bb08 100644
--- a/tokio-io/src/io/mod.rs
+++ b/tokio-io/src/io/mod.rs
@@ -39,6 +39,8 @@
mod async_buf_read_ext;
mod async_read_ext;
mod async_write_ext;
+mod buf_reader;
+mod buf_writer;
mod copy;
mod flush;
mod lines;
@@ -58,3 +60,11 @@ pub use self::async_buf_read_ext::AsyncBufReadExt;
pub use self::async_read_ext::AsyncReadExt;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::async_write_ext::AsyncWriteExt;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::buf_reader::BufReader;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::buf_writer::BufWriter;
+
+// used by `BufReader` and `BufWriter`
+// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
+const DEFAULT_BUF_SIZE: usize = 8 * 1024;
diff --git a/tokio-io/src/lib.rs b/tokio-io/src/lib.rs
index df7ffdef..0747905f 100644
--- a/tokio-io/src/lib.rs
+++ b/tokio-io/src/lib.rs
@@ -27,7 +27,7 @@ pub use self::async_read::AsyncRead;
pub use self::async_write::AsyncWrite;
#[cfg(feature = "util")]
-pub use self::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
+pub use self::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
// Re-export `Buf` and `BufMut` since they are part of the API
pub use bytes::{Buf, BufMut};