From 987ba7373cf95c570bf23768c6021f7a7508286e Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Sat, 26 Oct 2019 08:02:49 -0700 Subject: io: move into `tokio` crate (#1691) A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The `io` implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags. --- Cargo.toml | 1 - azure-pipelines.yml | 3 +- ci/patch.toml | 1 - tokio-io/CHANGELOG.md | 97 -------------- tokio-io/Cargo.toml | 39 ------ tokio-io/LICENSE | 25 ---- tokio-io/README.md | 13 -- tokio-io/src/async_buf_read.rs | 108 ---------------- tokio-io/src/async_read.rs | 185 --------------------------- tokio-io/src/async_write.rs | 211 ------------------------------- tokio-io/src/io/async_buf_read_ext.rs | 106 ---------------- tokio-io/src/io/async_read_ext.rs | 92 -------------- tokio-io/src/io/async_write_ext.rs | 42 ------ tokio-io/src/io/buf_reader.rs | 194 ---------------------------- tokio-io/src/io/buf_stream.rs | 143 --------------------- tokio-io/src/io/buf_writer.rs | 196 ---------------------------- tokio-io/src/io/chain.rs | 145 --------------------- tokio-io/src/io/copy.rs | 133 ------------------- tokio-io/src/io/empty.rs | 79 ------------ tokio-io/src/io/flush.rs | 44 ------- tokio-io/src/io/lines.rs | 70 ---------- tokio-io/src/io/mod.rs | 51 -------- tokio-io/src/io/read.rs | 52 -------- tokio-io/src/io/read_exact.rs | 74 ----------- tokio-io/src/io/read_line.rs | 80 ------------ tokio-io/src/io/read_to_end.rs | 108 ---------------- tokio-io/src/io/read_to_string.rs | 81 ------------ tokio-io/src/io/read_until.rs | 84 ------------ tokio-io/src/io/repeat.rs | 67 ---------- tokio-io/src/io/shutdown.rs | 44 ------- tokio-io/src/io/sink.rs | 77 ----------- tokio-io/src/io/split.rs | 67 ---------- tokio-io/src/io/take.rs | 130 ------------------- tokio-io/src/io/write.rs | 45 ------- tokio-io/src/io/write_all.rs | 55 -------- tokio-io/src/lib.rs | 47 ------- tokio-io/src/split.rs | 178 -------------------------- tokio-io/tests/async_read.rs | 166 ------------------------ tokio-io/tests/chain.rs | 15 --- tokio-io/tests/copy.rs | 36 ------ tokio-io/tests/lines.rs | 19 --- tokio-io/tests/read.rs | 37 ------ tokio-io/tests/read_exact.rs | 14 -- tokio-io/tests/read_line.rs | 27 ---- tokio-io/tests/read_to_end.rs | 14 -- tokio-io/tests/read_to_string.rs | 14 -- tokio-io/tests/read_until.rs | 22 ---- tokio-io/tests/split.rs | 66 ---------- tokio-io/tests/take.rs | 15 --- tokio-io/tests/write.rs | 46 ------- tokio-io/tests/write_all.rs | 50 -------- tokio-test/Cargo.toml | 2 +- tokio-test/src/io.rs | 5 +- tokio-tls/Cargo.toml | 2 +- tokio-tls/src/lib.rs | 3 +- tokio-util/Cargo.toml | 1 - tokio-util/src/codec/decoder.rs | 2 +- tokio-util/src/codec/framed.rs | 2 +- tokio-util/src/codec/framed_read.rs | 2 +- tokio-util/src/codec/framed_write.rs | 2 +- tokio-util/src/codec/length_delimited.rs | 42 +++--- tokio-util/tests/framed_write.rs | 2 +- tokio/Cargo.toml | 7 +- tokio/src/fs/blocking.rs | 3 +- tokio/src/fs/file.rs | 3 +- tokio/src/io/async_buf_read.rs | 109 ++++++++++++++++ tokio/src/io/async_read.rs | 183 +++++++++++++++++++++++++++ tokio/src/io/async_write.rs | 211 +++++++++++++++++++++++++++++++ tokio/src/io/io/async_buf_read_ext.rs | 106 ++++++++++++++++ tokio/src/io/io/async_read_ext.rs | 92 ++++++++++++++ tokio/src/io/io/async_write_ext.rs | 42 ++++++ tokio/src/io/io/buf_reader.rs | 195 ++++++++++++++++++++++++++++ tokio/src/io/io/buf_stream.rs | 144 +++++++++++++++++++++ tokio/src/io/io/buf_writer.rs | 197 +++++++++++++++++++++++++++++ tokio/src/io/io/chain.rs | 146 +++++++++++++++++++++ tokio/src/io/io/copy.rs | 134 ++++++++++++++++++++ tokio/src/io/io/empty.rs | 80 ++++++++++++ tokio/src/io/io/flush.rs | 45 +++++++ tokio/src/io/io/lines.rs | 70 ++++++++++ tokio/src/io/io/mod.rs | 51 ++++++++ tokio/src/io/io/read.rs | 53 ++++++++ tokio/src/io/io/read_exact.rs | 75 +++++++++++ tokio/src/io/io/read_line.rs | 81 ++++++++++++ tokio/src/io/io/read_to_end.rs | 109 ++++++++++++++++ tokio/src/io/io/read_to_string.rs | 82 ++++++++++++ tokio/src/io/io/read_until.rs | 85 +++++++++++++ tokio/src/io/io/repeat.rs | 67 ++++++++++ tokio/src/io/io/shutdown.rs | 45 +++++++ tokio/src/io/io/sink.rs | 78 ++++++++++++ tokio/src/io/io/split.rs | 67 ++++++++++ tokio/src/io/io/take.rs | 131 +++++++++++++++++++ tokio/src/io/io/write.rs | 46 +++++++ tokio/src/io/io/write_all.rs | 56 ++++++++ tokio/src/io/mod.rs | 30 ++++- tokio/src/io/split.rs | 178 ++++++++++++++++++++++++++ tokio/src/io/stderr.rs | 3 +- tokio/src/io/stdin.rs | 3 +- tokio/src/io/stdout.rs | 3 +- tokio/src/lib.rs | 4 + tokio/src/net/tcp/split.rs | 5 +- tokio/src/net/tcp/stream.rs | 3 +- tokio/src/net/unix/split.rs | 5 +- tokio/src/net/unix/stream.rs | 3 +- tokio/src/net/util/poll_evented.rs | 4 +- tokio/src/prelude.rs | 4 +- tokio/src/process/mod.rs | 30 ++--- tokio/src/signal/unix.rs | 4 +- tokio/tests/fs_file_mocked.rs | 1 + tokio/tests/io_async_read.rs | 166 ++++++++++++++++++++++++ tokio/tests/io_chain.rs | 15 +++ tokio/tests/io_copy.rs | 36 ++++++ tokio/tests/io_lines.rs | 20 +++ tokio/tests/io_read.rs | 37 ++++++ tokio/tests/io_read_exact.rs | 14 ++ tokio/tests/io_read_line.rs | 28 ++++ tokio/tests/io_read_to_end.rs | 14 ++ tokio/tests/io_read_to_string.rs | 14 ++ tokio/tests/io_read_until.rs | 22 ++++ tokio/tests/io_split.rs | 66 ++++++++++ tokio/tests/io_take.rs | 15 +++ tokio/tests/io_write.rs | 46 +++++++ tokio/tests/io_write_all.rs | 50 ++++++++ 122 files changed, 3600 insertions(+), 3789 deletions(-) delete mode 100644 tokio-io/CHANGELOG.md delete mode 100644 tokio-io/Cargo.toml delete mode 100644 tokio-io/LICENSE delete mode 100644 tokio-io/README.md delete mode 100644 tokio-io/src/async_buf_read.rs delete mode 100644 tokio-io/src/async_read.rs delete mode 100644 tokio-io/src/async_write.rs delete mode 100644 tokio-io/src/io/async_buf_read_ext.rs delete mode 100644 tokio-io/src/io/async_read_ext.rs delete mode 100644 tokio-io/src/io/async_write_ext.rs delete mode 100644 tokio-io/src/io/buf_reader.rs delete mode 100644 tokio-io/src/io/buf_stream.rs delete mode 100644 tokio-io/src/io/buf_writer.rs delete mode 100644 tokio-io/src/io/chain.rs delete mode 100644 tokio-io/src/io/copy.rs delete mode 100644 tokio-io/src/io/empty.rs delete mode 100644 tokio-io/src/io/flush.rs delete mode 100644 tokio-io/src/io/lines.rs delete mode 100644 tokio-io/src/io/mod.rs delete mode 100644 tokio-io/src/io/read.rs delete mode 100644 tokio-io/src/io/read_exact.rs delete mode 100644 tokio-io/src/io/read_line.rs delete mode 100644 tokio-io/src/io/read_to_end.rs delete mode 100644 tokio-io/src/io/read_to_string.rs delete mode 100644 tokio-io/src/io/read_until.rs delete mode 100644 tokio-io/src/io/repeat.rs delete mode 100644 tokio-io/src/io/shutdown.rs delete mode 100644 tokio-io/src/io/sink.rs delete mode 100644 tokio-io/src/io/split.rs delete mode 100644 tokio-io/src/io/take.rs delete mode 100644 tokio-io/src/io/write.rs delete mode 100644 tokio-io/src/io/write_all.rs delete mode 100644 tokio-io/src/lib.rs delete mode 100644 tokio-io/src/split.rs delete mode 100644 tokio-io/tests/async_read.rs delete mode 100644 tokio-io/tests/chain.rs delete mode 100644 tokio-io/tests/copy.rs delete mode 100644 tokio-io/tests/lines.rs delete mode 100644 tokio-io/tests/read.rs delete mode 100644 tokio-io/tests/read_exact.rs delete mode 100644 tokio-io/tests/read_line.rs delete mode 100644 tokio-io/tests/read_to_end.rs delete mode 100644 tokio-io/tests/read_to_string.rs delete mode 100644 tokio-io/tests/read_until.rs delete mode 100644 tokio-io/tests/split.rs delete mode 100644 tokio-io/tests/take.rs delete mode 100644 tokio-io/tests/write.rs delete mode 100644 tokio-io/tests/write_all.rs create mode 100644 tokio/src/io/async_buf_read.rs create mode 100644 tokio/src/io/async_read.rs create mode 100644 tokio/src/io/async_write.rs create mode 100644 tokio/src/io/io/async_buf_read_ext.rs create mode 100644 tokio/src/io/io/async_read_ext.rs create mode 100644 tokio/src/io/io/async_write_ext.rs create mode 100644 tokio/src/io/io/buf_reader.rs create mode 100644 tokio/src/io/io/buf_stream.rs create mode 100644 tokio/src/io/io/buf_writer.rs create mode 100644 tokio/src/io/io/chain.rs create mode 100644 tokio/src/io/io/copy.rs create mode 100644 tokio/src/io/io/empty.rs create mode 100644 tokio/src/io/io/flush.rs create mode 100644 tokio/src/io/io/lines.rs create mode 100644 tokio/src/io/io/mod.rs create mode 100644 tokio/src/io/io/read.rs create mode 100644 tokio/src/io/io/read_exact.rs create mode 100644 tokio/src/io/io/read_line.rs create mode 100644 tokio/src/io/io/read_to_end.rs create mode 100644 tokio/src/io/io/read_to_string.rs create mode 100644 tokio/src/io/io/read_until.rs create mode 100644 tokio/src/io/io/repeat.rs create mode 100644 tokio/src/io/io/shutdown.rs create mode 100644 tokio/src/io/io/sink.rs create mode 100644 tokio/src/io/io/split.rs create mode 100644 tokio/src/io/io/take.rs create mode 100644 tokio/src/io/io/write.rs create mode 100644 tokio/src/io/io/write_all.rs create mode 100644 tokio/src/io/split.rs create mode 100644 tokio/tests/io_async_read.rs create mode 100644 tokio/tests/io_chain.rs create mode 100644 tokio/tests/io_copy.rs create mode 100644 tokio/tests/io_lines.rs create mode 100644 tokio/tests/io_read.rs create mode 100644 tokio/tests/io_read_exact.rs create mode 100644 tokio/tests/io_read_line.rs create mode 100644 tokio/tests/io_read_to_end.rs create mode 100644 tokio/tests/io_read_to_string.rs create mode 100644 tokio/tests/io_read_until.rs create mode 100644 tokio/tests/io_split.rs create mode 100644 tokio/tests/io_take.rs create mode 100644 tokio/tests/io_write.rs create mode 100644 tokio/tests/io_write_all.rs diff --git a/Cargo.toml b/Cargo.toml index 38a26aeb..d6703e16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,6 @@ members = [ "tokio", "tokio-executor", - "tokio-io", "tokio-macros", "tokio-sync", "tokio-test", diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 1dad7b1d..f9b0d505 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -28,6 +28,7 @@ jobs: tokio: - fs - io + - io-util - net-driver - process - rt-full @@ -49,8 +50,6 @@ jobs: tokio-executor: - current-thread - thread-pool - tokio-io: - - util tokio-sync: - async-traits tokio-macros: [] diff --git a/ci/patch.toml b/ci/patch.toml index 02290de0..6718f231 100644 --- a/ci/patch.toml +++ b/ci/patch.toml @@ -3,7 +3,6 @@ [patch.crates-io] tokio = { path = "tokio" } tokio-executor = { path = "tokio-executor" } -tokio-io = { path = "tokio-io" } tokio-macros = { path = "tokio-macros" } tokio-sync = { path = "tokio-sync" } tokio-tls = { path = "tokio-tls" } diff --git a/tokio-io/CHANGELOG.md b/tokio-io/CHANGELOG.md deleted file mode 100644 index 78c6e7a4..00000000 --- a/tokio-io/CHANGELOG.md +++ /dev/null @@ -1,97 +0,0 @@ -# 0.2.0-alpha.6 (September 30, 2019) - -- Move to `futures-*-preview 0.3.0-alpha.19` -- Move to `pin-project 0.4` - -# 0.2.0-alpha.5 (September 19, 2019) - -### Added -- bring back generic `split` utility (#1521). -- enable buffering both reads and writes (#1558). - -# 0.2.0-alpha.4 (August 29, 2019) - -- Track tokio release. - -# 0.2.0-alpha.3 (August 28, 2019) - -### Added -- `AsyncReadExt::chain` and `AsyncReadExt::take` (#1484). - -# 0.2.0-alpha.2 (August 17, 2019) - -### Changed -- Update `futures` dependency to 0.3.0-alpha.18. - -### Added -- Implement `AsyncWrite` for `Vec` (#1409). -- Add `BufReader`, `BufWriter` (#1438). - -# 0.2.0-alpha.1 (August 8, 2019) - -### Changed -- Switch to `async`, `await`, and `std::future`. - -# 0.1.12 (March 1, 2019) - -### Added -- Add `unsplit` to join previously split `AsyncRead + AsyncWrite` (#807). - -# 0.1.11 (January 6, 2019) - -* Fix minor error in Decoder::decode API documentation (#797). - -# 0.1.10 (October 23, 2018) - -* Expose inner codec from `Framed` (#686). -* Implement AsyncRead::prepare_uninitialized_buffer for Take and Chain (#678). - -# 0.1.9 (September 27, 2018) - -* Fix bug in `AsyncRead::split()` (#655). -* Fix non-terminating loop in `length_delimited::FramedWrite` (#576). - -# 0.1.8 (August 23, 2018) - -* Documentation improvements - -# 0.1.7 (June 13, 2018) - -* Move `codec::{Encode, Decode, Framed*}` into `tokio-codec` (#353) - -# 0.1.6 (March 09, 2018) - -* Add native endian builder fn to length_delimited (#144) -* Add AsyncRead::poll_read, AsyncWrite::poll_write (#170) - -# 0.1.5 (February 07, 2018) - -* Fix bug in `BytesCodec` and `LinesCodec`. -* Performance improvement to `split`. - -# 0.1.4 (November 10, 2017) - -* Use `FrameTooBig` as length delimited error type (#70). -* Provide `Bytes` and `Lines` codecs (#78). -* Provide `AllowStdIo` wrapper (#76). - -# 0.1.3 (August 14, 2017) - -* Fix bug involving zero sized writes in copy helper (#57). -* Add get / set accessors for length delimited max frame length setting. (#65). -* Add `Framed::into_parts_and_codec` (#59). - -# 0.1.2 (May 23, 2017) - -* Add `from_parts` and `into_parts` to the framing combinators. -* Support passing an initialized buffer to the framing combinators. -* Add `length_adjustment` support to length delimited encoding (#48). - -# 0.1.1 (March 22, 2017) - -* Add some omitted `Self: Sized` bounds. -* Add missing "inner" fns. - -# 0.1.0 (March 15, 2017) - -* Initial release diff --git a/tokio-io/Cargo.toml b/tokio-io/Cargo.toml deleted file mode 100644 index bab90731..00000000 --- a/tokio-io/Cargo.toml +++ /dev/null @@ -1,39 +0,0 @@ -[package] -name = "tokio-io" -# When releasing to crates.io: -# - Remove path dependencies -# - Update html_root_url. -# - Update doc url -# - Cargo.toml -# - Update CHANGELOG.md. -# - Create "v0.1.x" git tag. -version = "0.2.0-alpha.6" -edition = "2018" -authors = ["Tokio Contributors "] -license = "MIT" -repository = "https://github.com/tokio-rs/tokio" -homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-io/0.2.0-alpha.6/tokio_io" -description = """ -Core I/O primitives for asynchronous I/O in Rust. -""" -categories = ["asynchronous"] - -[features] -util = ["memchr", "pin-project"] - -[dependencies] -bytes = "0.4.7" -log = "0.4" -futures-core-preview = "=0.3.0-alpha.19" -memchr = { version = "2.2", optional = true } -pin-project = { version = "0.4", optional = true } - -[dev-dependencies] -tokio = { version = "=0.2.0-alpha.6", path = "../tokio" } -tokio-test = { version = "=0.2.0-alpha.6", path = "../tokio-test" } - -futures-util-preview = "=0.3.0-alpha.19" - -[package.metadata.docs.rs] -all-features = true diff --git a/tokio-io/LICENSE b/tokio-io/LICENSE deleted file mode 100644 index cdb28b4b..00000000 --- a/tokio-io/LICENSE +++ /dev/null @@ -1,25 +0,0 @@ -Copyright (c) 2019 Tokio Contributors - -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: - -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. diff --git a/tokio-io/README.md b/tokio-io/README.md deleted file mode 100644 index 9aad7a32..00000000 --- a/tokio-io/README.md +++ /dev/null @@ -1,13 +0,0 @@ -# tokio-io - -Core I/O abstractions for the Tokio stack. - -## License - -This project is licensed under the [MIT license](LICENSE). - -### Contribution - -Unless you explicitly state otherwise, any contribution intentionally submitted -for inclusion in Tokio by you, shall be licensed as MIT, without any additional -terms or conditions. diff --git a/tokio-io/src/async_buf_read.rs b/tokio-io/src/async_buf_read.rs deleted file mode 100644 index dfac44f3..00000000 --- a/tokio-io/src/async_buf_read.rs +++ /dev/null @@ -1,108 +0,0 @@ -use crate::AsyncRead; -use std::io; -use std::ops::DerefMut; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// Read bytes asynchronously. -/// -/// This trait inherits from `std::io::BufRead` and indicates that an I/O object is -/// **non-blocking**. All non-blocking I/O objects must return an error when -/// bytes are unavailable instead of blocking the current thread. -pub trait AsyncBufRead: AsyncRead { - /// Attempt to return the contents of the internal buffer, filling it with more data - /// from the inner reader if it is empty. - /// - /// On success, returns `Poll::Ready(Ok(buf))`. - /// - /// If no data is available for reading, the method returns - /// `Poll::Pending` and arranges for the current task (via - /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes - /// readable or is closed. - /// - /// This function is a lower-level call. It needs to be paired with the - /// [`consume`] method to function properly. When calling this - /// method, none of the contents will be "read" in the sense that later - /// calling [`poll_read`] may return the same contents. As such, [`consume`] must - /// be called with the number of bytes that are consumed from this buffer to - /// ensure that the bytes are never returned twice. - /// - /// An empty buffer returned indicates that the stream has reached EOF. - /// - /// [`poll_read`]: AsyncRead::poll_read - /// [`consume`]: AsyncBufRead::consume - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; - - /// Tells this buffer that `amt` bytes have been consumed from the buffer, - /// so they should no longer be returned in calls to [`poll_read`]. - /// - /// This function is a lower-level call. It needs to be paired with the - /// [`poll_fill_buf`] method to function properly. This function does - /// not perform any I/O, it simply informs this object that some amount of - /// its buffer, returned from [`poll_fill_buf`], has been consumed and should - /// no longer be returned. As such, this function may do odd things if - /// [`poll_fill_buf`] isn't called before calling it. - /// - /// The `amt` must be `<=` the number of bytes in the buffer returned by - /// [`poll_fill_buf`]. - /// - /// [`poll_read`]: AsyncRead::poll_read - /// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf - fn consume(self: Pin<&mut Self>, amt: usize); -} - -macro_rules! deref_async_buf_read { - () => { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll> - { - Pin::new(&mut **self.get_mut()).poll_fill_buf(cx) - } - - fn consume(mut self: Pin<&mut Self>, amt: usize) { - Pin::new(&mut **self).consume(amt) - } - } -} - -impl AsyncBufRead for Box { - deref_async_buf_read!(); -} - -impl AsyncBufRead for &mut T { - deref_async_buf_read!(); -} - -impl

AsyncBufRead for Pin

-where - P: DerefMut + Unpin, - P::Target: AsyncBufRead, -{ - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_mut().as_mut().poll_fill_buf(cx) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - self.get_mut().as_mut().consume(amt) - } -} - -impl AsyncBufRead for &[u8] { - fn poll_fill_buf(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(*self)) - } - - fn consume(mut self: Pin<&mut Self>, amt: usize) { - *self = &self[amt..]; - } -} - -impl + Unpin> AsyncBufRead for io::Cursor { - fn poll_fill_buf(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(io::BufRead::fill_buf(self.get_mut())) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - io::BufRead::consume(self.get_mut(), amt) - } -} diff --git a/tokio-io/src/async_read.rs b/tokio-io/src/async_read.rs deleted file mode 100644 index 97ca2b93..00000000 --- a/tokio-io/src/async_read.rs +++ /dev/null @@ -1,185 +0,0 @@ -//use crate::split::{ReadHalf, WriteHalf}; -//use crate::{framed, split, AsyncWrite}; -use bytes::BufMut; -use futures_core::ready; -use std::io; -use std::ops::DerefMut; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// Read bytes asynchronously. -/// -/// This trait inherits from `std::io::Read` and indicates that an I/O object is -/// **non-blocking**. All non-blocking I/O objects must return an error when -/// bytes are unavailable instead of blocking the current thread. -/// -/// Specifically, this means that the `poll_read` function will return one of -/// the following: -/// -/// * `Poll::Ready(Ok(n))` means that `n` bytes of data was immediately read -/// and placed into the output buffer, where `n` == 0 implies that EOF has -/// been reached. -/// -/// * `Poll::Pending` means that no data was read into the buffer -/// provided. The I/O object is not currently readable but may become readable -/// in the future. Most importantly, **the current future's task is scheduled -/// to get unparked when the object is readable**. This means that like -/// `Future::poll` you'll receive a notification when the I/O object is -/// readable again. -/// -/// * `Poll::Ready(Err(e))` for other errors are standard I/O errors coming from the -/// underlying object. -/// -/// This trait importantly means that the `read` method only works in the -/// context of a future's task. The object may panic if used outside of a task. -pub trait AsyncRead { - /// Prepares an uninitialized buffer to be safe to pass to `read`. Returns - /// `true` if the supplied buffer was zeroed out. - /// - /// While it would be highly unusual, implementations of [`io::Read`] are - /// able to read data from the buffer passed as an argument. Because of - /// this, the buffer passed to [`io::Read`] must be initialized memory. In - /// situations where large numbers of buffers are used, constantly having to - /// zero out buffers can be expensive. - /// - /// This function does any necessary work to prepare an uninitialized buffer - /// to be safe to pass to `read`. If `read` guarantees to never attempt to - /// read data out of the supplied buffer, then `prepare_uninitialized_buffer` - /// doesn't need to do any work. - /// - /// If this function returns `true`, then the memory has been zeroed out. - /// This allows implementations of `AsyncRead` which are composed of - /// multiple subimplementations to efficiently implement - /// `prepare_uninitialized_buffer`. - /// - /// This function isn't actually `unsafe` to call but `unsafe` to implement. - /// The implementer must ensure that either the whole `buf` has been zeroed - /// or `poll_read_buf()` overwrites the buffer without reading it and returns - /// correct value. - /// - /// This function is called from [`poll_read_buf`]. - /// - /// [`io::Read`]: std::io::Read - /// [`poll_read_buf`]: #method.poll_read_buf - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - for x in buf { - *x = 0; - } - - true - } - - /// Attempt to read from the `AsyncRead` into `buf`. - /// - /// On success, returns `Poll::Ready(Ok(num_bytes_read))`. - /// - /// If no data is available for reading, the method returns - /// `Poll::Pending` and arranges for the current task (via - /// `cx.waker()`) to receive a notification when the object becomes - /// readable or is closed. - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll>; - - /// Pull some bytes from this source into the specified `BufMut`, returning - /// how many bytes were read. - /// - /// The `buf` provided will have bytes read into it and the internal cursor - /// will be advanced if any bytes were read. Note that this method typically - /// will not reallocate the buffer provided. - fn poll_read_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut B, - ) -> Poll> - where - Self: Sized, - { - if !buf.has_remaining_mut() { - return Poll::Ready(Ok(0)); - } - - unsafe { - let n = { - let b = buf.bytes_mut(); - - self.prepare_uninitialized_buffer(b); - - ready!(self.poll_read(cx, b))? - }; - - buf.advance_mut(n); - Poll::Ready(Ok(n)) - } - } -} - -macro_rules! deref_async_read { - () => { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - (**self).prepare_uninitialized_buffer(buf) - } - - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) - -> Poll> - { - Pin::new(&mut **self).poll_read(cx, buf) - } - } -} - -impl AsyncRead for Box { - deref_async_read!(); -} - -impl AsyncRead for &mut T { - deref_async_read!(); -} - -impl

AsyncRead for Pin

-where - P: DerefMut + Unpin, - P::Target: AsyncRead, -{ - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - (**self).prepare_uninitialized_buffer(buf) - } - - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - self.get_mut().as_mut().poll_read(cx, buf) - } -} - -impl AsyncRead for &[u8] { - unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool { - false - } - - fn poll_read( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - Poll::Ready(io::Read::read(self.get_mut(), buf)) - } -} - -impl + Unpin> AsyncRead for io::Cursor { - unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool { - false - } - - fn poll_read( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - Poll::Ready(io::Read::read(self.get_mut(), buf)) - } -} diff --git a/tokio-io/src/async_write.rs b/tokio-io/src/async_write.rs deleted file mode 100644 index dc8b852f..00000000 --- a/tokio-io/src/async_write.rs +++ /dev/null @@ -1,211 +0,0 @@ -use bytes::Buf; -use futures_core::ready; -use std::io; -use std::ops::DerefMut; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// Writes bytes asynchronously. -/// -/// The trait inherits from `std::io::Write` and indicates that an I/O object is -/// **nonblocking**. All non-blocking I/O objects must return an error when -/// bytes cannot be written instead of blocking the current thread. -/// -/// Specifically, this means that the `poll_write` function will return one of -/// the following: -/// -/// * `Poll::Ready(Ok(n))` means that `n` bytes of data was immediately -/// written. -/// -/// * `Poll::Pending` means that no data was written from the buffer -/// provided. The I/O object is not currently writable but may become writable -/// in the future. Most importantly, **the current future's task is scheduled -/// to get unparked when the object is writable**. This means that like -/// `Future::poll` you'll receive a notification when the I/O object is -/// writable again. -/// -/// * `Poll::Ready(Err(e))` for other errors are standard I/O errors coming from the -/// underlying object. -/// -/// This trait importantly means that the `write` method only works in the -/// context of a future's task. The object may panic if used outside of a task. -/// -/// Note that this trait also represents that the `Write::flush` method works -/// very similarly to the `write` method, notably that `Ok(())` means that the -/// writer has successfully been flushed, a "would block" error means that the -/// current task is ready to receive a notification when flushing can make more -/// progress, and otherwise normal errors can happen as well. -pub trait AsyncWrite { - /// Attempt to write bytes from `buf` into the object. - /// - /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. - /// - /// If the object is not ready for writing, the method returns - /// `Poll::Pending` and arranges for the current task (via - /// `cx.waker()`) to receive a notification when the object becomes - /// readable or is closed. - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll>; - - /// Attempt to flush the object, ensuring that any buffered data reach - /// their destination. - /// - /// On success, returns `Poll::Ready(Ok(()))`. - /// - /// If flushing cannot immediately complete, this method returns - /// `Poll::Pending` and arranges for the current task (via - /// `cx.waker()`) to receive a notification when the object can make - /// progress towards flushing. - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; - - /// Initiates or attempts to shut down this writer, returning success when - /// the I/O connection has completely shut down. - /// - /// This method is intended to be used for asynchronous shutdown of I/O - /// connections. For example this is suitable for implementing shutdown of a - /// TLS connection or calling `TcpStream::shutdown` on a proxied connection. - /// Protocols sometimes need to flush out final pieces of data or otherwise - /// perform a graceful shutdown handshake, reading/writing more data as - /// appropriate. This method is the hook for such protocols to implement the - /// graceful shutdown logic. - /// - /// This `shutdown` method is required by implementers of the - /// `AsyncWrite` trait. Wrappers typically just want to proxy this call - /// through to the wrapped type, and base types will typically implement - /// shutdown logic here or just return `Ok(().into())`. Note that if you're - /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that - /// transitively the entire stream has been shut down. After your wrapper's - /// shutdown logic has been executed you should shut down the underlying - /// stream. - /// - /// Invocation of a `shutdown` implies an invocation of `flush`. Once this - /// method returns `Ready` it implies that a flush successfully happened - /// before the shutdown happened. That is, callers don't need to call - /// `flush` before calling `shutdown`. They can rely that by calling - /// `shutdown` any pending buffered data will be written out. - /// - /// # Return value - /// - /// This function returns a `Poll>` classified as such: - /// - /// * `Poll::Ready(Ok(()))` - indicates that the connection was - /// successfully shut down and is now safe to deallocate/drop/close - /// resources associated with it. This method means that the current task - /// will no longer receive any notifications due to this method and the - /// I/O object itself is likely no longer usable. - /// - /// * `Poll::Pending` - indicates that shutdown is initiated but could - /// not complete just yet. This may mean that more I/O needs to happen to - /// continue this shutdown operation. The current task is scheduled to - /// receive a notification when it's otherwise ready to continue the - /// shutdown operation. When woken up this method should be called again. - /// - /// * `Poll::Ready(Err(e))` - indicates a fatal error has happened with shutdown, - /// indicating that the shutdown operation did not complete successfully. - /// This typically means that the I/O object is no longer usable. - /// - /// # Errors - /// - /// This function can return normal I/O errors through `Err`, described - /// above. Additionally this method may also render the underlying - /// `Write::write` method no longer usable (e.g. will return errors in the - /// future). It's recommended that once `shutdown` is called the - /// `write` method is no longer called. - /// - /// # Panics - /// - /// This function will panic if not called within the context of a future's - /// task. - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; - - /// Write a `Buf` into this value, returning how many bytes were written. - /// - /// Note that this method will advance the `buf` provided automatically by - /// the number of bytes written. - fn poll_write_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut B, - ) -> Poll> - where - Self: Sized, - { - if !buf.has_remaining() { - return Poll::Ready(Ok(0)); - } - - let n = ready!(self.poll_write(cx, buf.bytes()))?; - buf.advance(n); - Poll::Ready(Ok(n)) - } -} - -macro_rules! deref_async_write { - () => { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) - -> Poll> - { - Pin::new(&mut **self).poll_write(cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut **self).poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut **self).poll_shutdown(cx) - } - } -} - -impl AsyncWrite for Box { - deref_async_write!(); -} - -impl AsyncWrite for &mut T { - deref_async_write!(); -} - -impl

AsyncWrite for Pin

-where - P: DerefMut + Unpin, - P::Target: AsyncWrite, -{ - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.get_mut().as_mut().poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_mut().as_mut().poll_flush(cx) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_mut().as_mut().poll_shutdown(cx) - } -} - -impl AsyncWrite for Vec { - fn poll_write( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.get_mut().extend_from_slice(buf); - Poll::Ready(Ok(buf.len())) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } -} diff --git a/tokio-io/src/io/async_buf_read_ext.rs b/tokio-io/src/io/async_buf_read_ext.rs deleted file mode 100644 index 98d5d9b8..00000000 --- a/tokio-io/src/io/async_buf_read_ext.rs +++ /dev/null @@ -1,106 +0,0 @@ -use crate::io::lines::{lines, Lines}; -use crate::io::read_line::{read_line, ReadLine}; -use crate::io::read_until::{read_until, ReadUntil}; -use crate::io::split::{split, Split}; -use crate::AsyncBufRead; - -/// An extension trait which adds utility methods to `AsyncBufRead` types. -pub trait AsyncBufReadExt: AsyncBufRead { - /// Creates a future which will read all the bytes associated with this I/O - /// object into `buf` until the delimiter `byte` or EOF is reached. - /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until). - /// - /// This function will read bytes from the underlying stream until the - /// delimiter or EOF is found. Once found, all bytes up to, and including, - /// the delimiter (if found) will be appended to `buf`. - /// - /// The returned future will resolve to the number of bytes read once the read - /// operation is completed. - /// - /// In the case of an error the buffer and the object will be discarded, with - /// the error yielded. - fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec) -> ReadUntil<'a, Self> - where - Self: Unpin, - { - read_until(self, byte, buf) - } - - /// Creates a future which will read all the bytes associated with this I/O - /// object into `buf` until a newline (the 0xA byte) or EOF is reached, - /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line). - /// - /// This function will read bytes from the underlying stream until the - /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes - /// up to, and including, the delimiter (if found) will be appended to - /// `buf`. - /// - /// The returned future will resolve to the number of bytes read once the read - /// operation is completed. - /// - /// In the case of an error the buffer and the object will be discarded, with - /// the error yielded. - /// - /// # Errors - /// - /// This function has the same error semantics as [`read_until`] and will - /// also return an error if the read bytes are not valid UTF-8. If an I/O - /// error is encountered then `buf` may contain some bytes already read in - /// the event that all data read so far was valid UTF-8. - /// - /// [`read_until`]: AsyncBufReadExt::read_until - fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self> - where - Self: Unpin, - { - read_line(self, buf) - } - - /// Returns a stream of the contents of this reader split on the byte - /// `byte`. - /// - /// This method is the async equivalent to - /// [`BufRead::split`](std::io::BufRead::split). - /// - /// The stream returned from this function will yield instances of - /// [`io::Result`]`<`[`Vec`]`>`. Each vector returned will *not* have - /// the delimiter byte at the end. - /// - /// [`io::Result`]: std::io::Result - /// [`Vec`]: std::vec::Vec - /// - /// # Errors - /// - /// Each item of the stream has the same error semantics as - /// [`AsyncBufReadExt::read_until`](AsyncBufReadExt::read_until). - fn split(self, byte: u8) -> Split - where - Self: Sized, - { - split(self, byte) - } - - /// Returns a stream over the lines of this reader. - /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines). - /// - /// The stream returned from this function will yield instances of - /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline - /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end. - /// - /// [`io::Result`]: std::io::Result - /// [`String`]: String - /// - /// # Errors - /// - /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`]. - /// - /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line - fn lines(self) -> Lines - where - Self: Sized, - { - lines(self) - } -} - -impl AsyncBufReadExt for R {} diff --git a/tokio-io/src/io/async_read_ext.rs b/tokio-io/src/io/async_read_ext.rs deleted file mode 100644 index 69265df2..00000000 --- a/tokio-io/src/io/async_read_ext.rs +++ /dev/null @@ -1,92 +0,0 @@ -use crate::io::chain::{chain, Chain}; -use crate::io::copy::{copy, Copy}; -use crate::io::read::{read, Read}; -use crate::io::read_exact::{read_exact, ReadExact}; -use crate::io::read_to_end::{read_to_end, ReadToEnd}; -use crate::io::read_to_string::{read_to_string, ReadToString}; -use crate::io::take::{take, Take}; -use crate::{AsyncRead, AsyncWrite}; - -/// An extension trait which adds utility methods to `AsyncRead` types. -pub trait AsyncReadExt: AsyncRead { - /// Creates an adaptor which will chain this stream with another. - /// - /// The returned `AsyncRead` instance will first read all bytes from this object - /// until EOF is encountered. Afterwards the output is equivalent to the - /// output of `next`. - fn chain(self, next: R) -> Chain - where - Self: Sized, - R: AsyncRead, - { - chain(self, next) - } - - /// Copy all data from `self` into the provided `AsyncWrite`. - /// - /// The returned future will copy all the bytes read from `reader` into the - /// `writer` specified. This future will only complete once the `reader` - /// has hit EOF and all bytes have been written to and flushed from the - /// `writer` provided. - /// - /// On success the number of bytes is returned and the `reader` and `writer` - /// are consumed. On error the error is returned and the I/O objects are - /// consumed as well. - fn copy<'a, W>(&'a mut self, dst: &'a mut W) -> Copy<'a, Self, W> - where - Self: Unpin, - W: AsyncWrite + Unpin + ?Sized, - { - copy(self, dst) - } - - /// Read data into the provided buffer. - /// - /// The returned future will resolve to the number of bytes read once the - /// read operation is completed. - fn read<'a>(&'a mut self, dst: &'a mut [u8]) -> Read<'a, Self> - where - Self: Unpin, - { - read(self, dst) - } - - /// Read exactly the amount of data needed to fill the provided buffer. - fn read_exact<'a>(&'a mut self, dst: &'a mut [u8]) -> ReadExact<'a, Self> - where - Self: Unpin, - { - read_exact(self, dst) - } - - /// Read all bytes until EOF in this source, placing them into `dst`. - /// - /// On success the total number of bytes read is returned. - fn read_to_end<'a>(&'a mut self, dst: &'a mut Vec) -> ReadToEnd<'a, Self> - where - Self: Unpin, - { - read_to_end(self, dst) - } - - /// Read all bytes until EOF in this source, placing them into `dst`. - /// - /// On success the total number of bytes read is returned. - fn read_to_string<'a>(&'a mut self, dst: &'a mut String) -> ReadToString<'a, Self> - where - Self: Unpin, - { - read_to_string(self, dst) - } - - /// Creates an AsyncRead adapter which will read at most `limit` bytes - /// from the underlying reader. - fn take(self, limit: u64) -> Take - where - Self: Sized, - { - take(self, limit) - } -} - -impl AsyncReadExt for R {} diff --git a/tokio-io/src/io/async_write_ext.rs b/tokio-io/src/io/async_write_ext.rs deleted file mode 100644 index 84671a03..00000000 --- a/tokio-io/src/io/async_write_ext.rs +++ /dev/null @@ -1,42 +0,0 @@ -use crate::io::flush::{flush, Flush}; -use crate::io::shutdown::{shutdown, Shutdown}; -use crate::io::write::{write, Write}; -use crate::io::write_all::{write_all, WriteAll}; -use crate::AsyncWrite; - -/// An extension trait which adds utility methods to `AsyncWrite` types. -pub trait AsyncWriteExt: AsyncWrite { - /// Write a buffer into this writter, returning how many bytes were written. - fn write<'a>(&'a mut self, src: &'a [u8]) -> Write<'a, Self> - where - Self: Unpin, - { - write(self, src) - } - - /// Attempt to write an entire buffer into this writter. - fn write_all<'a>(&'a mut self, src: &'a [u8]) -> WriteAll<'a, Self> - where - Self: Unpin, - { - write_all(self, src) - } - - /// Flush the contents of this writer. - fn flush(&mut self) -> Flush<'_, Self> - where - Self: Unpin, - { - flush(self) - } - - /// Shutdown this writer. - fn shutdown(&mut self) -> Shutdown<'_, Self> - where - Self: Unpin, - { - shutdown(self) - } -} - -impl AsyncWriteExt for W {} diff --git a/tokio-io/src/io/buf_reader.rs b/tokio-io/src/io/buf_reader.rs deleted file mode 100644 index 971332bc..00000000 --- a/tokio-io/src/io/buf_reader.rs +++ /dev/null @@ -1,194 +0,0 @@ -use super::DEFAULT_BUF_SIZE; -use crate::{AsyncBufRead, AsyncRead, AsyncWrite}; -use futures_core::ready; -use pin_project::{pin_project, project}; -use std::io::{self, Read}; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::{cmp, fmt}; - -/// The `BufReader` struct adds buffering to any reader. -/// -/// It can be excessively inefficient to work directly with a [`AsyncRead`] -/// instance. A `BufReader` performs large, infrequent reads on the underlying -/// [`AsyncRead`] and maintains an in-memory buffer of the results. -/// -/// `BufReader` can improve the speed of programs that make *small* and -/// *repeated* read calls to the same file or network socket. It does not -/// help when reading very large amounts at once, or reading just one or a few -/// times. It also provides no advantage when reading from a source that is -/// already in memory, like a `Vec`. -/// -/// When the `BufReader` is dropped, the contents of its buffer will be -/// discarded. Creating multiple instances of a `BufReader` on the same -/// stream can cause data loss. -// TODO: Examples -#[pin_project] -pub struct BufReader { - #[pin] - pub(super) inner: R, - pub(super) buf: Box<[u8]>, - pub(super) pos: usize, - pub(super) cap: usize, -} - -impl BufReader { - /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB, - /// but may change in the future. - pub fn new(inner: R) -> Self { - Self::with_capacity(DEFAULT_BUF_SIZE, inner) - } - - /// Creates a new `BufReader` with the specified buffer capacity. - pub fn with_capacity(capacity: usize, inner: R) -> Self { - unsafe { - let mut buffer = Vec::with_capacity(capacity); - buffer.set_len(capacity); - inner.prepare_uninitialized_buffer(&mut buffer); - Self { - inner, - buf: buffer.into_boxed_slice(), - pos: 0, - cap: 0, - } - } - } - - /// Gets a reference to the underlying reader. - /// - /// It is inadvisable to directly read from the underlying reader. - pub fn get_ref(&self) -> &R { - &self.inner - } - - /// Gets a mutable reference to the underlying reader. - /// - /// It is inadvisable to directly read from the underlying reader. - pub fn get_mut(&mut self) -> &mut R { - &mut self.inner - } - - /// Gets a pinned mutable reference to the underlying reader. - /// - /// It is inadvisable to directly read from the underlying reader. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { - self.project().inner - } - - /// Consumes this `BufWriter`, returning the underlying reader. - /// - /// Note that any leftover data in the internal buffer is lost. - pub fn into_inner(self) -> R { - self.inner - } - - /// Returns a reference to the internally buffered data. - /// - /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty. - pub fn buffer(&self) -> &[u8] { - &self.buf[self.pos..self.cap] - } - - /// Invalidates all data in the internal buffer. - #[inline] - fn discard_buffer(self: Pin<&mut Self>) { - let me = self.project(); - *me.pos = 0; - *me.cap = 0; - } -} - -impl AsyncRead for BufReader { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - // If we don't have any buffered data and we're doing a massive read - // (larger than our internal buffer), bypass our internal buffer - // entirely. - if self.pos == self.cap && buf.len() >= self.buf.len() { - let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf)); - self.discard_buffer(); - return Poll::Ready(res); - } - let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; - let nread = rem.read(buf)?; - self.consume(nread); - Poll::Ready(Ok(nread)) - } - - // we can't skip unconditionally because of the large buffer case in read. - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.inner.prepare_uninitialized_buffer(buf) - } -} - -impl AsyncBufRead for BufReader { - #[project] - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - #[project] - let BufReader { - inner, - buf, - cap, - pos, - } = self.project(); - - // If we've reached the end of our internal buffer then we need to fetch - // some more data from the underlying reader. - // Branch using `>=` instead of the more correct `==` - // to tell the compiler that the pos..cap slice is always valid. - if *pos >= *cap { - debug_assert!(*pos == *cap); - *cap = ready!(inner.poll_read(cx, buf))?; - *pos = 0; - } - Poll::Ready(Ok(&buf[*pos..*cap])) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - let me = self.project(); - *me.pos = cmp::min(*me.pos + amt, *me.cap); - } -} - -impl AsyncWrite for BufReader { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.get_pin_mut().poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_pin_mut().poll_flush(cx) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_pin_mut().poll_shutdown(cx) - } -} - -impl fmt::Debug for BufReader { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BufReader") - .field("reader", &self.inner) - .field( - "buffer", - &format_args!("{}/{}", self.cap - self.pos, self.buf.len()), - ) - .finish() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - crate::is_unpin::>(); - } -} diff --git a/tokio-io/src/io/buf_stream.rs b/tokio-io/src/io/buf_stream.rs deleted file mode 100644 index 4daa4088..00000000 --- a/tokio-io/src/io/buf_stream.rs +++ /dev/null @@ -1,143 +0,0 @@ -use crate::io::{BufReader, BufWriter}; -use crate::{AsyncBufRead, AsyncRead, AsyncWrite}; -use pin_project::pin_project; -use std::io::{self}; -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - -/// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output. -/// -/// It can be excessively inefficient to work directly with something that implements [`AsyncWrite`] -/// and [`AsyncRead`]. For example, every `write`, however small, has to traverse the syscall -/// interface, and similarly, every read has to do the same. The [`BufWriter`] and [`BufReader`] -/// types aid with these problems respectively, but do so in only one direction. `BufStream` wraps -/// one in the other so that both directions are buffered. See their documentation for details. -#[pin_project] -#[derive(Debug)] -pub struct BufStream(#[pin] BufReader>); - -impl BufStream { - /// Wrap a type in both [`BufWriter`] and [`BufReader`]. - /// - /// See the documentation for those types and [`BufStream`] for details. - pub fn new(stream: RW) -> BufStream { - BufStream(BufReader::new(BufWriter::new(stream))) - } - - /// Gets a reference to the underlying I/O object. - /// - /// It is inadvisable to directly read from the underlying I/O object. - pub fn get_ref(&self) -> &RW { - self.0.get_ref().get_ref() - } - - /// Gets a mutable reference to the underlying I/O object. - /// - /// It is inadvisable to directly read from the underlying I/O object. - pub fn get_mut(&mut self) -> &mut RW { - self.0.get_mut().get_mut() - } - - /// Gets a pinned mutable reference to the underlying I/O object. - /// - /// It is inadvisable to directly read from the underlying I/O object. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW> { - self.project().0.get_pin_mut().get_pin_mut() - } - - /// Consumes this `BufStream`, returning the underlying I/O object. - /// - /// Note that any leftover data in the internal buffer is lost. - pub fn into_inner(self) -> RW { - self.0.into_inner().into_inner() - } -} - -impl From>> for BufStream { - fn from(b: BufReader>) -> Self { - BufStream(b) - } -} - -impl From>> for BufStream { - fn from(b: BufWriter>) -> Self { - // we need to "invert" the reader and writer - let BufWriter { - inner: - BufReader { - inner, - buf: rbuf, - pos, - cap, - }, - buf: wbuf, - written, - } = b; - - BufStream(BufReader { - inner: BufWriter { - inner, - buf: wbuf, - written, - }, - buf: rbuf, - pos, - cap, - }) - } -} - -impl AsyncWrite for BufStream { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.project().0.poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().0.poll_flush(cx) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().0.poll_shutdown(cx) - } -} - -impl AsyncRead for BufStream { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - self.project().0.poll_read(cx, buf) - } - - // we can't skip unconditionally because of the large buffer case in read. - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.0.prepare_uninitialized_buffer(buf) - } -} - -impl AsyncBufRead for BufStream { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().0.poll_fill_buf(cx) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - self.project().0.consume(amt) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - crate::is_unpin::>(); - } -} diff --git a/tokio-io/src/io/buf_writer.rs b/tokio-io/src/io/buf_writer.rs deleted file mode 100644 index 67c825d2..00000000 --- a/tokio-io/src/io/buf_writer.rs +++ /dev/null @@ -1,196 +0,0 @@ -use super::DEFAULT_BUF_SIZE; -use crate::{AsyncBufRead, AsyncRead, AsyncWrite}; -use futures_core::ready; -use pin_project::{pin_project, project}; -use std::fmt; -use std::io::{self, Write}; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// Wraps a writer and buffers its output. -/// -/// It can be excessively inefficient to work directly with something that -/// implements [`AsyncWrite`]. A `BufWriter` keeps an in-memory buffer of data and -/// writes it to an underlying writer in large, infrequent batches. -/// -/// `BufWriter` can improve the speed of programs that make *small* and -/// *repeated* write calls to the same file or network socket. It does not -/// help when writing very large amounts at once, or writing just one or a few -/// times. It also provides no advantage when writing to a destination that is -/// in memory, like a `Vec`. -/// -/// When the `BufWriter` is dropped, the contents of its buffer will be -/// discarded. Creating multiple instances of a `BufWriter` on the same -/// stream can cause data loss. If you need to write out the contents of its -/// buffer, you must manually call flush before the writer is dropped. -/// -/// [`AsyncWrite`]: AsyncWrite -/// [`flush`]: super::AsyncWriteExt::flush -/// -// TODO: Examples -#[pin_project] -pub struct BufWriter { - #[pin] - pub(super) inner: W, - pub(super) buf: Vec, - pub(super) written: usize, -} - -impl BufWriter { - /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, - /// but may change in the future. - pub fn new(inner: W) -> Self { - Self::with_capacity(DEFAULT_BUF_SIZE, inner) - } - - /// Creates a new `BufWriter` with the specified buffer capacity. - pub fn with_capacity(cap: usize, inner: W) -> Self { - Self { - inner, - buf: Vec::with_capacity(cap), - written: 0, - } - } - - #[project] - fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - #[project] - let BufWriter { - mut inner, - buf, - written, - } = self.project(); - - let len = buf.len(); - let mut ret = Ok(()); - while *written < len { - match ready!(inner.as_mut().poll_write(cx, &buf[*written..])) { - Ok(0) => { - ret = Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write the buffered data", - )); - break; - } - Ok(n) => *written += n, - Err(e) => { - ret = Err(e); - break; - } - } - } - if *written > 0 { - buf.drain(..*written); - } - *written = 0; - Poll::Ready(ret) - } - - /// Gets a reference to the underlying writer. - pub fn get_ref(&self) -> &W { - &self.inner - } - - /// Gets a mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer. - pub fn get_mut(&mut self) -> &mut W { - &mut self.inner - } - - /// Gets a pinned mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.project().inner - } - - /// Consumes this `BufWriter`, returning the underlying writer. - /// - /// Note that any leftover data in the internal buffer is lost. - pub fn into_inner(self) -> W { - self.inner - } - - /// Returns a reference to the internally buffered data. - pub fn buffer(&self) -> &[u8] { - &self.buf - } -} - -impl AsyncWrite for BufWriter { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - if self.buf.len() + buf.len() > self.buf.capacity() { - ready!(self.as_mut().flush_buf(cx))?; - } - - let me = self.project(); - if buf.len() >= me.buf.capacity() { - me.inner.poll_write(cx, buf) - } else { - Poll::Ready(me.buf.write(buf)) - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().flush_buf(cx))?; - self.get_pin_mut().poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().flush_buf(cx))?; - self.get_pin_mut().poll_shutdown(cx) - } -} - -impl AsyncRead for BufWriter { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - self.get_pin_mut().poll_read(cx, buf) - } - - // we can't skip unconditionally because of the large buffer case in read. - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.get_ref().prepare_uninitialized_buffer(buf) - } -} - -impl AsyncBufRead for BufWriter { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_pin_mut().poll_fill_buf(cx) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - self.get_pin_mut().consume(amt) - } -} - -impl fmt::Debug for BufWriter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BufWriter") - .field("writer", &self.inner) - .field( - "buffer", - &format_args!("{}/{}", self.buf.len(), self.buf.capacity()), - ) - .field("written", &self.written) - .finish() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - crate::is_unpin::>(); - } -} diff --git a/tokio-io/src/io/chain.rs b/tokio-io/src/io/chain.rs deleted file mode 100644 index 99cce4c6..00000000 --- a/tokio-io/src/io/chain.rs +++ /dev/null @@ -1,145 +0,0 @@ -use crate::{AsyncBufRead, AsyncRead}; -use futures_core::ready; -use pin_project::{pin_project, project}; -use std::fmt; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// Stream for the [`chain`](super::AsyncReadExt::chain) method. -#[pin_project] -#[must_use = "streams do nothin