summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-09-24 17:26:03 -0700
committerGitHub <noreply@github.com>2020-09-24 17:26:03 -0700
commit4186b0aa38abbec7670d53882d5cdfd4b12add5c (patch)
treeb067117fcb1a4c479cd274465bcac0431c2e59f7
parent760ae89401d9addb71ebf19674980577b5501edd (diff)
io: remove poll_{read,write}_buf from traits (#2882)
These functions have object safety issues. It also has been decided to avoid vectored operations on the I/O traits. A later PR will bring back vectored operations on specific types that support them. Refs: #2879, #2716
-rw-r--r--tokio-test/src/io.rs11
-rw-r--r--tokio-util/src/codec/framed_impl.rs4
-rw-r--r--tokio-util/src/io/reader_stream.rs4
-rw-r--r--tokio-util/src/io/stream_reader.rs25
-rw-r--r--tokio-util/src/lib.rs34
-rw-r--r--tokio/Cargo.toml3
-rw-r--r--tokio/docs/reactor-refactor.md276
-rw-r--r--tokio/src/io/async_read.rs31
-rw-r--r--tokio/src/io/async_write.rs22
-rw-r--r--tokio/src/io/split.rs19
-rw-r--r--tokio/src/io/util/async_read_ext.rs68
-rw-r--r--tokio/src/io/util/async_write_ext.rs76
-rw-r--r--tokio/src/io/util/buf_reader.rs9
-rw-r--r--tokio/src/io/util/mod.rs2
-rw-r--r--tokio/src/io/util/read_buf.rs38
-rw-r--r--tokio/src/io/util/write_buf.rs40
-rw-r--r--tokio/src/net/tcp/split.rs9
-rw-r--r--tokio/src/net/tcp/split_owned.rs9
-rw-r--r--tokio/src/net/tcp/stream.rs48
-rw-r--r--tokio/tests/io_async_read.rs105
20 files changed, 319 insertions, 514 deletions
diff --git a/tokio-test/src/io.rs b/tokio-test/src/io.rs
index ac70a06f..4f0b5897 100644
--- a/tokio-test/src/io.rs
+++ b/tokio-test/src/io.rs
@@ -22,7 +22,6 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::sync::mpsc;
use tokio::time::{self, Delay, Duration, Instant};
-use bytes::Buf;
use futures_core::ready;
use std::collections::VecDeque;
use std::future::Future;
@@ -439,16 +438,6 @@ impl AsyncWrite for Mock {
}
}
- fn poll_write_buf<B: Buf>(
- self: Pin<&mut Self>,
- cx: &mut task::Context<'_>,
- buf: &mut B,
- ) -> Poll<io::Result<usize>> {
- let n = ready!(self.poll_write(cx, buf.bytes()))?;
- buf.advance(n);
- Poll::Ready(Ok(n))
- }
-
fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs
index eb2e0d38..c161808f 100644
--- a/tokio-util/src/codec/framed_impl.rs
+++ b/tokio-util/src/codec/framed_impl.rs
@@ -118,6 +118,8 @@ where
type Item = Result<U::Item, U::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ use crate::util::poll_read_buf;
+
let mut pinned = self.project();
let state: &mut ReadFrame = pinned.state.borrow_mut();
loop {
@@ -148,7 +150,7 @@ where
// got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF
state.buffer.reserve(1);
- let bytect = match pinned.inner.as_mut().poll_read_buf(cx, &mut state.buffer)? {
+ let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? {
Poll::Ready(ct) => ct,
Poll::Pending => return Poll::Pending,
};
diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs
index bde7ccee..ab0c22fb 100644
--- a/tokio-util/src/io/reader_stream.rs
+++ b/tokio-util/src/io/reader_stream.rs
@@ -70,6 +70,8 @@ impl<R: AsyncRead> ReaderStream<R> {
impl<R: AsyncRead> Stream for ReaderStream<R> {
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ use crate::util::poll_read_buf;
+
let mut this = self.as_mut().project();
let reader = match this.reader.as_pin_mut() {
@@ -81,7 +83,7 @@ impl<R: AsyncRead> Stream for ReaderStream<R> {
this.buf.reserve(CAPACITY);
}
- match reader.poll_read_buf(cx, &mut this.buf) {
+ match poll_read_buf(cx, reader, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
self.project().reader.set(None);
diff --git a/tokio-util/src/io/stream_reader.rs b/tokio-util/src/io/stream_reader.rs
index 5c3ab019..def843b1 100644
--- a/tokio-util/src/io/stream_reader.rs
+++ b/tokio-util/src/io/stream_reader.rs
@@ -1,4 +1,4 @@
-use bytes::{Buf, BufMut};
+use bytes::Buf;
use futures_core::stream::Stream;
use pin_project_lite::pin_project;
use std::io;
@@ -119,29 +119,6 @@ where
self.consume(len);
Poll::Ready(Ok(()))
}
- fn poll_read_buf<BM: BufMut>(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut BM,
- ) -> Poll<io::Result<usize>>
- where
- Self: Sized,
- {
- if !buf.has_remaining_mut() {
- return Poll::Ready(Ok(0));
- }
-
- let inner_buf = match self.as_mut().poll_fill_buf(cx) {
- Poll::Ready(Ok(buf)) => buf,
- Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
- Poll::Pending => return Poll::Pending,
- };
- let len = std::cmp::min(inner_buf.len(), buf.remaining_mut());
- buf.put_slice(&inner_buf[..len]);
-
- self.consume(len);
- Poll::Ready(Ok(len))
- }
}
impl<S, B, E> AsyncBufRead for StreamReader<S, B>
diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs
index b96d9044..eb35345e 100644
--- a/tokio-util/src/lib.rs
+++ b/tokio-util/src/lib.rs
@@ -52,3 +52,37 @@ pub mod context;
pub mod sync;
pub mod either;
+
+#[cfg(any(feature = "io", feature = "codec"))]
+mod util {
+ use tokio::io::{AsyncRead, ReadBuf};
+
+ use bytes::BufMut;
+ use futures_core::ready;
+ use std::io;
+ use std::pin::Pin;
+ use std::task::{Context, Poll};
+
+ pub(crate) fn poll_read_buf<T: AsyncRead>(
+ cx: &mut Context<'_>,
+ io: Pin<&mut T>,
+ buf: &mut impl BufMut,
+ ) -> Poll<io::Result<usize>> {
+ if !buf.has_remaining_mut() {
+ return Poll::Ready(Ok(0));
+ }
+
+ let orig = buf.bytes_mut().as_ptr() as *const u8;
+ let mut b = ReadBuf::uninit(buf.bytes_mut());
+
+ ready!(io.poll_read(cx, &mut b))?;
+ let n = b.filled().len();
+
+ // Safety: we can assume `n` bytes were read, since they are in`filled`.
+ assert_eq!(orig, b.filled().as_ptr());
+ unsafe {
+ buf.advance_mut(n);
+ }
+ Poll::Ready(Ok(n))
+ }
+}
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index 9e7ebf12..0b201795 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -82,7 +82,7 @@ signal = [
stream = ["futures-core"]
sync = ["fnv"]
test-util = []
-tcp = ["iovec", "lazy_static", "mio"]
+tcp = ["lazy_static", "mio"]
time = ["slab"]
udp = ["lazy_static", "mio"]
uds = ["lazy_static", "libc", "mio", "mio-uds"]
@@ -99,7 +99,6 @@ futures-core = { version = "0.3.0", optional = true }
lazy_static = { version = "1.0.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.6.20", optional = true }
-iovec = { version = "0.1.4", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.11.0", optional = true } # Not in full
slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`
diff --git a/tokio/docs/reactor-refactor.md b/tokio/docs/reactor-refactor.md
new file mode 100644
index 00000000..a0b54474
--- /dev/null
+++ b/tokio/docs/reactor-refactor.md
@@ -0,0 +1,276 @@
+# Refactor I/O driver
+
+Describes changes to the I/O driver for the Tokio 0.3 release.
+
+## Goals
+
+* Support `async fn` on I/O types with `&self`.
+* Refine the `Registration` API.
+
+### Non-goals
+
+* Implement `AsyncRead` / `AsyncWrite` for `&TcpStream` or other reference type.
+
+## Overview
+
+Currently, I/O types require `&mut self` for `async` functions. The reason for
+this is the task's waker is stored in the I/O resource's internal state
+(`ScheduledIo`) instead of in the future returned by the `async` function.
+Because of this limitation, I/O types limit the number of wakers to one per
+direction (a direction is either read-related events or write-related events).
+
+Moving the waker from the internal I/O resource's state to the operation's
+future enables multiple wakers to be registered per operation. The "intrusive
+wake list" strategy used by `Notify` applies to this case, though there are some
+concerns unique to the I/O driver.
+
+## Reworking the `Registration` type
+
+While `Registration` is made private (per #2728), it remains in Tokio as an
+implementation detail backing I/O resources such as `TcpStream`. The API of
+`Registration` is updated to support waiting for an arbitrary interest set with
+`&self`. This supports concurrent waiters with a different readiness interest.
+
+```rust
+struct Registration { ... }
+
+// TODO: naming
+struct ReadyEvent {
+ tick: u32,
+ ready: mio::Ready,
+}
+
+impl Registration {
+ /// `interest` must be a super set of **all** interest sets specified in
+ /// the other methods. This is the interest set passed to `mio`.
+ pub fn new<T>(io: &T, interest: mio::Ready) -> io::Result<Registration>
+ where T: mio::Evented;
+
+ /// Awaits for any readiness event included in `interest`. Returns a
+ /// `ReadyEvent` representing the received readiness event.
+ async fn readiness(&self, interest: mio::Ready) -> io::Result<ReadyEvent>;
+
+ /// Clears resource level readiness represented by the specified `ReadyEvent`
+ async fn clear_readiness(&self, ready_event: ReadyEvent);
+```
+
+A new registration is created for a `T: mio::Evented` and a `interest`. This
+creates a `ScheduledIo` entry with the I/O driver and registers the resource
+with `mio`.
+
+Because Tokio uses **edge-triggered** notifications, the I/O driver only
+receives readiness from the OS once the ready state **changes**. The I/O driver
+must track each resource's known readiness state. This helps prevent syscalls
+when the process knows the syscall should return with `EWOULDBLOCK`.
+
+A call to `readiness()` checks if the currently known resource readiness
+overlaps with `interest`. If it does, then the `readiness()` immediately
+returns. If it does not, then the task waits until the I/O driver receives a
+readiness event.
+
+The pseudocode to perform a TCP read is as follows.
+
+```rust
+async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ loop {
+ // Await readiness
+ let event = self.readiness(interest).await?;
+
+ match self.mio_socket.read(buf) {
+ Ok(v) => return Ok(v),
+ Err(ref e) if e.kind() == WouldBlock => {
+ self.clear_readiness(event);
+ }
+ Err(e) => return Err(e),
+ }
+ }
+}
+```
+
+## Reworking the `ScheduledIo` type
+
+The `ScheduledIo` type is switched to use an intrusive waker linked list. Each
+entry in the linked list includes the `interest` set passed to `readiness()`.
+
+```rust
+#[derive(Debug)]
+pub(crate) struct ScheduledIo {
+ /// Resource's known state packed with other state that must be
+ /// atomically updated.
+ readiness: AtomicUsize,
+
+ /// Tracks tasks waiting on the resource
+ waiters: Mutex<Waiters>,
+}
+
+#[derive(Debug)]
+struct Waiters {
+ // List of intrusive waiters.
+ list: LinkedList<Waiter>,
+
+ /// Waiter used by `AsyncRead` implementations.
+ reader: Option<Waker>,
+
+ /// Waiter used by `AsyncWrite` implementations.
+ writer: Option<Waker>,
+}
+
+// This struct is contained by the **future** returned by `readiness()`.
+#[derive(Debug)]
+struct Waiter {
+ /// Intrusive linked-list pointers
+ pointers: linked_list::Pointers<Waiter>,
+
+ /// Waker for task waiting on I/O resource
+ waiter: Option<Waker>,
+
+ /// Readiness events being waited on. This is
+ /// the value passed to `readiness()`
+ interest: mio::Ready,
+
+ /// Should not be `Unpin`.
+ _p: PhantomPinned,
+}
+```
+
+When an I/O event is received from `mio`, the associated resources' readiness is
+updated and the waiter list is iterated. All waiters with `interest` that
+overlap the received readiness event are notified. Any waiter with an `interest`
+that does not overlap the readiness event remains in the list.
+
+## Cancel interest on drop
+
+The future returned by `readiness()` uses an intrusive linked list to store the
+waker with `ScheduledIo`. Because `readiness()` can be called concurrently, many
+wakers may be stored simultaneously in the list. If the `readiness()` future is
+dropped early, it is essential that the waker is removed from the list. This
+prevents leaking memory.
+
+## Race condition
+
+Consider how many tasks may concurrently attempt I/O operations. This, combined
+with how Tokio uses edge-triggered events, can result in a race condition. Let's
+revisit the TCP read function:
+
+```rust
+async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ loop {
+ // Await readiness
+ let event = self.readiness(interest).await?;
+
+ match self.mio_socket.read(buf) {
+ Ok(v) => return Ok(v),
+ Err(ref e) if e.kind() == WouldBlock => {
+ self.clear_readiness(event);
+ }
+ Err(e) => return Err(e),
+ }
+ }
+}
+```
+
+If care is not taken, if between `mio_socket.read(buf)` returning and
+`clear_readiness(event)` is called, a readiness event arrives, the `read()`
+function could deadlock. This happens because the readiness event is received,
+`clear_readiness()` unsets the readiness event, and on the next iteration,
+`readiness().await` will block forever as a new readiness event is not received.
+
+The current I/O driver handles this condition by always registering the task's
+waker before performing the operation. This is not ideal as it will result in
+unnecessary task notification.
+
+Instead, we will use a strategy to prevent clearing readiness if an "unseen"
+readiness event has been received. The I/O driver will maintain a "tick" value.
+Every time the `mio` `poll()` function is called, the tick is incremented. Each
+readiness event has an associated tick. When the I/O driver sets the resource's
+readiness, the driver's tick is packed into the atomic `usize`.
+
+The `ScheduledIo` readiness `AtomicUsize` is structured as:
+
+```
+| reserved | generation | driver tick | readinesss |
+|----------+------------+--------------+------------|
+| 1 bit | 7 bits + 8 bits + 16 bits |
+```
+
+The `reserved` and `generation` components exist today.
+
+The `readiness()` function returns a `ReadyEvent` value. This value includes the
+`tick` component read with the resource's readiness value. When
+`clear_readiness()` is called, the `ReadyEvent` is provided. Readiness is only
+cleared if the current `tick` matches the `tick` included in the `ReadyEvent`.
+If the tick values do not match, the call to `readiness()` on the next iteration
+will not block and the new `tick` is included in the new `ReadyToken.`
+
+TODO
+
+## Implementing `AsyncRead` / `AsyncWrite`
+
+The `AsyncRead` and `AsyncWrite` traits use a "poll" based API. This means that
+it is not possible to use an intrusive linked list to track the waker.
+Additionally, there is no future associated with the operation which means it is
+not possible to cancel interest in the readiness events.
+
+To implement `AsyncRead` and `AsyncWrite`, `ScheduledIo` includes dedicated
+waker values for the read direction and the write direction. These values are
+used to store the waker. Specific `interest` is not tracked for `AsyncRead` and
+`AsyncWrite` implementations. It is assumed that only events of interest are:
+
+* Read ready
+* Read closed
+* Write ready
+* Write closed
+
+Note that "read closed" and "write closed" are only available with Mio 0.7. With
+Mio 0.6, things were a bit messy.
+
+It is only possible to implement `AsyncRead` and `AsyncWrite` for resource types
+themselves and not for `&Resource`. Implementing the traits for `&Resource`
+would permit concurrent operations to the resource. Because only a single waker
+is stored per direction, any concurrent usage would result in deadlocks. An
+alterate implementation would call for a `Vec<Waker>` but this would result in
+memory leaks.
+
+## Enabling reads and writes for `&TcpStream`
+
+Instead of implementing `AsyncRead` and `AsyncWrite` for `&TcpStream`, a new
+function is added to `TcpStream`.
+
+```rust
+impl TcpStream {
+ /// Naming TBD
+ fn by_ref(&self) -> TcpStreamRef<'_>;
+}
+
+struct TcpStreamRef<'a> {
+ stream: &'a TcpStream,
+
+ // `Waiter` is the node in the intrusive waiter linked-list
+ read_waiter: Waiter,
+ write_waiter: Waiter,
+}
+```
+
+Now, `AsyncRead` and `AsyncWrite` can be implemented on `TcpStreamRef<'a>`. When
+the `TcpStreamRef` is dropped, all associated waker resources are cleaned up.
+
+### Removing all the `split()` functions
+
+With `TcpStream::by_ref()`, `TcpStream::split()` is no longer needed. Instead,
+it is possible to do something as follows.
+
+```rust
+let rd = my_stream.by_ref();
+let wr = my_stream.by_ref();
+
+select! {
+ // use `rd` and `wr` in separate branches.
+}
+```
+
+It is also possible to sotre a `TcpStream` in an `Arc`.
+
+```rust
+let arc_stream = Arc::new(my_tcp_stream);
+let n = arc_stream.by_ref().read(buf).await?;
+``` \ No newline at end of file
diff --git a/tokio/src/io/async_read.rs b/tokio/src/io/async_read.rs
index d341b63d..ba2303d1 100644
--- a/tokio/src/io/async_read.rs
+++ b/tokio/src/io/async_read.rs
@@ -1,5 +1,4 @@
use super::ReadBuf;
-use bytes::BufMut;
use std::io;
use std::ops::DerefMut;
use std::pin::Pin;
@@ -54,36 +53,6 @@ pub trait AsyncRead {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>>;
-
- /// Pulls some bytes from this source into the specified `BufMut`, returning
- /// how many bytes were read.
- ///
- /// The `buf` provided will have bytes read into it and the internal cursor
- /// will be advanced if any bytes were read. Note that this method typically
- /// will not reallocate the buffer provided.
- fn poll_read_buf<B: BufMut>(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut B,
- ) -> Poll<io::Result<usize>>
- where
- Self: Sized,
- {
- if !buf.has_remaining_mut() {
- return Poll::Ready(Ok(0));
- }
-
- let mut b = ReadBuf::uninit(buf.bytes_mut());
-
- ready!(self.poll_read(cx, &mut b))?;
- let n = b.filled().len();
-
- // Safety: we can assume `n` bytes were read, since they are in`filled`.
- unsafe {
- buf.advance_mut(n);
- }
- Poll::Ready(Ok(n))
- }
}
macro_rules! deref_async_read {
diff --git a/tokio/src/io/async_write.rs b/tokio/src/io/async_write.rs
index ecf7575b..66ba4bf3 100644
--- a/tokio/src/io/async_write.rs
+++ b/tokio/src/io/async_write.rs
@@ -1,4 +1,3 @@
-use bytes::Buf;
use std::io;
use std::ops::DerefMut;
use std::pin::Pin;
@@ -128,27 +127,6 @@ pub trait AsyncWrite {
/// This function will panic if not called within the context of a future's
/// task.
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
-
- /// Writes a `Buf` into this value, returning how many bytes were written.
- ///
- /// Note that this method will advance the `buf` provided automatically by
- /// the number of bytes written.
- fn poll_write_buf<B: Buf>(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut B,
- ) -> Poll<Result<usize, io::Error>>
- where
- Self: Sized,
- {
- if !buf.has_remaining() {
- return Poll::Ready(Ok(0));
- }
-
- let n = ready!(self.poll_write(cx, buf.bytes()))?;
- buf.advance(n);
- Poll::Ready(Ok(n))
- }
}
macro_rules! deref_async_write {
diff --git a/tokio/src/io/split.rs b/tokio/src/io/split.rs
index dcd3da20..fd3273ee 100644
--- a/tokio/src/io/split.rs
+++ b/tokio/src/io/split.rs
@@ -6,7 +6,6 @@
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
-use bytes::{Buf, BufMut};
use std::cell::UnsafeCell;
use std::fmt;
use std::io;
@@ -107,15 +106,6 @@ impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
let mut inner = ready!(self.inner.poll_lock(cx));
inner.stream_pin().poll_read(cx, buf)
}
-
- fn poll_read_buf<B: BufMut>(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut B,
- ) -> Poll<io::Result<usize>> {
- let mut inner = ready!(self.inner.poll_lock(cx));
- inner.stream_pin().poll_read_buf(cx, buf)
- }
}
impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
@@ -137,15 +127,6 @@ impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
let mut inner = ready!(self.inner.poll_lock(cx));
inner.stream_pin().poll_shutdown(cx)
}
-
- fn poll_write_buf<B: Buf>(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut B,
- ) -> Poll<Result<usize, io::Error>> {
- let mut inner = ready!(self.inner.poll_lock(cx));
- inner.stream_pin().poll_write_buf(cx, buf)
- }
}
impl<T> Inner<T> {
diff --git a/tokio/src/io/util/async_read_ext.rs b/tokio/src/io/util/async_read_ext.rs
index 0ab66c28..d631bd7e 100644
--- a/tokio/src/io/util/async_read_ext.rs
+++ b/tokio/src/io/util/async_read_ext.rs
@@ -1,6 +1,5 @@
use crate::io::util::chain::{chain, Chain};
use crate::io::util::read::{read, Read};
-use crate::io::util::read_buf::{read_buf, ReadBuf};
use crate::io::util::read_exact::{read_exact, ReadExact};
use crate::io::util::read_int::{
ReadI128, ReadI128Le, ReadI16, ReadI16Le, ReadI32, ReadI32Le, ReadI64, ReadI64Le, ReadI8,
@@ -13,8 +12,6 @@ use crate::io::util::read_to_string::{read_to_string, ReadToString};
use crate::io::util::take::{take, Take};
use crate::io::AsyncRead;
-use bytes::BufMut;
-
cfg_io_util! {
/// Defines numeric reader
macro_rules! read_impl {
@@ -166,71 +163,6 @@ cfg_io_util! {
read(self, buf)
}
- /// Pulls some bytes from this source into the specified buffer,
- /// advancing the buffer's internal cursor.
- ///
- /// Equivalent to:
- ///
- /// ```ignore
- /// async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> io::Result<usize>;
- /// ```
- ///
- /// Usually, only a single `read` syscall is issued, even if there is
- /// more space in the supplied buffer.
- ///
- /// This function does not provide any guarantees about whether it
- /// completes immediately or asynchronously
- ///
- /// # Return
- ///
- /// On a successful read, the number of read bytes is returned. If the
- /// supplied buffer is not empty and the function returns `Ok(0)` then
- /// the source as reached an "end-of-file" event.
- ///
- /// # Errors
- ///
- /// If this function encounters any form of I/O or other error, an error
- /// variant will be returned. If an error is returned then it must be
- /// guaranteed that no bytes were read.
- ///
- /// # Examples
- ///
- /// [`File`] implements `Read` and [`BytesMut`] implements [`BufMut`]:
- ///
- /// [`File`]: crate::fs::File
- /// [`BytesMut`]: bytes::BytesMut
- /// [`BufMut`]: bytes::BufMut
- ///
- /// ```no_run
- /// use tokio::fs::File;
- /// use tokio::io::{self, AsyncReadExt};
- ///
- /// use bytes::BytesMut;
- ///
- /// #[tokio::main]
- /// async fn main() -> io::Result<()> {
- /// let mut f = File::open("foo.txt").await?;
- /// let mut buffer = BytesMut::with_capacity(10);
- ///
- /// assert!(buffer.is_empty());
- ///
- /// // read up to 10 bytes, note that the return value is not needed
- /// // to access the data that was read as `buffer`'s internal
- /// // cursor is updated.
- /// f.read_buf(&mut buffer).await?;
- ///
- /// println!("The bytes: {:?}", &buffer[..]);
- /// Ok(())
- /// }
- /// ```
- fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B>
- where
- Self: Sized + Unpin,
- B: BufMut,
- {
- read_buf(self, buf)
- }
-
/// Reads the exact number of bytes required to fill `buf`.
///
/// Equivalent to:
diff --git a/tokio/src/io/util/async_write_ext.rs b/tokio/src/io/util/async_write_ext.rs
index 321301e2..5c6187b7 100644
--- a/tokio/src/io/util/async_write_ext.rs
+++ b/tokio/src/io/util/async_write_ext.rs
@@ -2,7 +2,6 @@ use crate::io::util::flush::{flush, Flush};
use crate::io::util::shutdown::{shutdown, Shutdown};
use crate::io::util::write::{write, Write};
use crate::io::util::write_all::{write_all, WriteAll};
-use crate::io::util::write_buf::{write_buf, WriteBuf};
use crate::io::util::write_int::{
WriteI128, WriteI128Le, WriteI16, WriteI16Le, WriteI32, WriteI32Le, WriteI64, WriteI64Le,
WriteI8,
@@ -13,8 +12,6 @@ use crate::io::util::write_int::{
};
use crate::io::AsyncWrite;
-use bytes::Buf;
-
cfg_io_util! {
/// Defines numeric writer
macro_rules! write_impl {
@@ -119,79 +116,6 @@ cfg_io_util! {
write(self, src)
}
- /// Writes a buffer into this writer, advancing the buffer's internal
- /// cursor.
- ///
- /// Equivalent to:
- ///
- /// ```ignore
- /// async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> io::Result<usize>;
- /// ```
- ///
- /// This function will attempt to write the entire contents of `buf`, but
- /// the entire write may not succeed, or the write may also generate an
- /// error. After the operation completes, the buffer's
- /// internal cursor is advanced by the number of bytes written. A
- /// subsequent call to `write_buf` using the **same** `buf` value will
- /// resume from the point that the first call to `write_buf` completed.
- /// A call to `write` represents *at most one* attempt to write to any
- /// wrapped object.
- ///
- /// # Return
- ///
- /// If the return value is `Ok(n)` then it must be guaranteed that `n <=
- /// buf.len()`. A return value of `0` typically means that the
- /// underlying object is no longer able to accept bytes and will likely
- /// not be able to in the future as well, or that the buffer provided is
- /// empty.
- ///
- /// # Errors
- ///
- /// Each call to `write` may generate an I/O error indicating that the
- /// operation could not be completed. If an error is returned then no bytes
- /// in the buffer were written to this writer.
- ///
- /// It is **not** considered an error if the entire buffer could not be
- /// written to this writer.
- ///
- /// # Examples
- ///
- /// [`File`] implements `Read` and [`Cursor<&[u8]>`] implements [`Buf`]:
- ///
- /// [`File`]: crate::fs::File
- /// [`Buf`]: bytes::Buf
- ///
- /// ```no_run
- /// use tokio::io::{self, AsyncWriteExt};
- /// use tokio::fs::File;
- ///
- /// use bytes::Buf;
- /// use std::io::Cursor;
- ///
- /// #[tokio::main]
- /// async fn main() -> io::Result<()> {
- /// let mut file = File::create("foo.txt").await?;
- /// let mut buffer = Cursor::new(b"data to write");
- ///
- /// // Loop until the entire contents of the buffer are written to
- /// // the file.
- /// while buffer.has_remaining() {
- /// // Writes some prefix of the byte string, not necessarily
- /// // all of it.
- /// file.write_buf(&mut buffer).await?;
- /// }
- ///
- /// Ok(())
- /// }
- /// ```
- fn write_buf<'a, B>(&'a mut self, src: &'a mut B) -> WriteBuf<'a, Self, B>
- where
- Self: Sized + Unpin,
- B: Buf,
- {
- write_buf(self, src)
- }
-
/// Attempts to write an entire buffer into this writer.
///
/// Equivalent to:
diff --git a/tokio/src/io/util/buf_reader.rs b/tokio/src/io/util/buf_reader.rs
index 3ab78f0e..9264ca59 100644
--- a/tokio/src/io/util/buf_reader.rs
+++ b/tokio/src/io/util/buf_reader.rs
@@ -1,7 +1,6 @@
use crate::io::util::DEFAULT_BUF_SIZE;
use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
-use bytes::Buf;
use pin_project_lite::pin_project;
use std::io;
use std::pin::Pin;
@@ -151,14 +150,6 @@ impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
self.get_pin_mut().poll_write(cx, buf)
}
- fn poll_write_buf<B: Buf>(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut B,
- ) -> Poll<io::Result<usize>> {
- self.get_pin_mut().poll_write_buf(cx, buf)
- }
-
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_pin_mut().poll_flush(cx)
}
diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs
index 1bd0a3f8..52dab990 100644
--- a/tokio/src/io/util/mod.rs
+++ b/tokio/src/io/util/mod.rs
@@ -39,7 +39,6 @@ cfg_io_util! {
pub use mem::{duplex, DuplexStream};
mod read;
- mod read_buf;
mod read_exact;
mod read_int;
mod read_line;
@@ -68,7 +67,6 @@ cfg_io_util! {
mod write;
mod write_all;
- mod write_buf;
mod write_int;
diff --git a/tokio/src/io/util/read_buf.rs b/tokio/src/io/util/read_buf.rs
deleted file mode 100644
index 6ee3d249..00000000
--- a/tokio/src/io/util/read_buf.rs