diff options
author | Eliza Weisman <eliza@buoyant.io> | 2020-12-03 11:19:16 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-03 11:19:16 -0800 |
commit | 647299866a2262c8a1183adad73673e5803293ed (patch) | |
tree | c92df9ae491f0a444e694879858d032c3f6a5373 /tokio-util/src/lib.rs | |
parent | a6051a61ec5c96113f4b543de3ec55431695347a (diff) |
util: add writev-aware `poll_write_buf` (#3156)
## Motivation
In Tokio 0.2, `AsyncRead` and `AsyncWrite` had `poll_write_buf` and
`poll_read_buf` methods for reading and writing to implementers of
`bytes` `Buf` and `BufMut` traits. In 0.3, these were removed, but
`poll_read_buf` was added as a free function in `tokio-util`. However,
there is currently no `poll_write_buf`.
Now that `AsyncWrite` has regained support for vectored writes in #3149,
there's a lot of potential benefit in having a `poll_write_buf` that
uses vectored writes when supported and non-vectored writes when not
supported, so that users don't have to reimplement this.
## Solution
This PR adds a `poll_write_buf` function to `tokio_util::io`, analogous
to the existing `poll_read_buf` function.
This function writes from a `Buf` to an `AsyncWrite`, advancing the
`Buf`'s internal cursor. In addition, when the `AsyncWrite` supports
vectored writes (i.e. its `is_write_vectored` method returns `true`),
it will use vectored IO.
I copied the documentation for this functions from the docs from Tokio
0.2's `AsyncWrite::poll_write_buf` , with some minor modifications as
appropriate.
Finally, I fixed a minor issue in the existing docs for `poll_read_buf`
and `read_buf`, and updated `tokio_util::codec` to use `poll_write_buf`.
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Diffstat (limited to 'tokio-util/src/lib.rs')
-rw-r--r-- | tokio-util/src/lib.rs | 75 |
1 files changed, 70 insertions, 5 deletions
diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index c4d80440..15bfc1a2 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -55,18 +55,18 @@ pub mod time; #[cfg(any(feature = "io", feature = "codec"))] mod util { - use tokio::io::{AsyncRead, ReadBuf}; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; - use bytes::BufMut; + use bytes::{Buf, BufMut}; use futures_core::ready; - use std::io; + use std::io::{self, IoSlice}; use std::mem::MaybeUninit; use std::pin::Pin; use std::task::{Context, Poll}; - /// Try to read data from an `AsyncRead` into an implementer of the [`Buf`] trait. + /// Try to read data from an `AsyncRead` into an implementer of the [`BufMut`] trait. /// - /// [`Buf`]: bytes::Buf + /// [`BufMut`]: bytes::Buf /// /// # Example /// @@ -132,4 +132,69 @@ mod util { Poll::Ready(Ok(n)) } + + /// Try to write data from an implementer of the [`Buf`] trait to an + /// [`AsyncWrite`], advancing the buffer's internal cursor. + /// + /// This function will use [vectored writes] when the [`AsyncWrite`] supports + /// vectored writes. + /// + /// # Examples + /// + /// [`File`] implements [`AsyncWrite`] and [`Cursor<&[u8]>`] implements + /// [`Buf`]: + /// + /// ```no_run + /// use tokio_util::io::poll_write_buf; + /// use tokio::io; + /// use tokio::fs::File; + /// + /// use bytes::Buf; + /// use std::io::Cursor; + /// use std::pin::Pin; + /// use futures::future::poll_fn; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// let mut buf = Cursor::new(b"data to write"); + /// + /// // Loop until the entire contents of the buffer are written to + /// // the file. + /// while buf.has_remaining() { + /// poll_fn(|cx| poll_write_buf(Pin::new(&mut file), cx, &mut buf)).await?; + /// } + /// + /// Ok(()) + /// } + /// ``` + /// + /// [`Buf`]: bytes::Buf + /// [`AsyncWrite`]: tokio::io::AsyncWrite + /// [`File`]: tokio::fs::File + /// [vectored writes]: tokio::io::AsyncWrite::poll_write_vectored + #[cfg_attr(not(feature = "io"), allow(unreachable_pub))] + pub fn poll_write_buf<T: AsyncWrite, B: Buf>( + io: Pin<&mut T>, + cx: &mut Context<'_>, + buf: &mut B, + ) -> Poll<io::Result<usize>> { + const MAX_BUFS: usize = 64; + + if !buf.has_remaining() { + return Poll::Ready(Ok(0)); + } + + let n = if io.is_write_vectored() { + let mut slices = [IoSlice::new(&[]); MAX_BUFS]; + let cnt = buf.bytes_vectored(&mut slices); + ready!(io.poll_write_vectored(cx, &slices[..cnt]))? + } else { + ready!(io.poll_write(cx, buf.bytes()))? + }; + + buf.advance(n); + + Poll::Ready(Ok(n)) + } } |