diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-20 14:27:49 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-20 14:27:49 -0800 |
commit | 5cd665afd7b70b184b559e6407fdf645983e1314 (patch) | |
tree | 2197d4502d01218ca6df8c076f55d83027a18230 | |
parent | 3e643c7b81736a4c2b11387a6f71aba99450270b (diff) |
chore: update `bytes` dependency to git master (#1796)
Tokio will track changes to bytes until 0.5 is released.
26 files changed, 101 insertions, 331 deletions
diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 782a16cb..ed273590 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" tokio = { version = "=0.2.0-alpha.6", path = "../tokio" } tokio-util = { version = "=0.2.0-alpha.6", path = "../tokio-util" } -bytes = "0.4.12" +bytes = { git = "https://github.com/tokio-rs/bytes" } futures = "0.3.0" [[example]] diff --git a/tokio-test/Cargo.toml b/tokio-test/Cargo.toml index 60cb3e50..b1ac4de1 100644 --- a/tokio-test/Cargo.toml +++ b/tokio-test/Cargo.toml @@ -22,7 +22,7 @@ categories = ["asynchronous", "testing"] [dependencies] tokio = { version = "=0.2.0-alpha.6", path = "../tokio", features = ["test-util"] } -bytes = "0.4" +bytes = { git = "https://github.com/tokio-rs/bytes" } futures-core = "0.3.0" [dev-dependencies] diff --git a/tokio-tls/src/lib.rs b/tokio-tls/src/lib.rs index de29b244..3a36ceaa 100644 --- a/tokio-tls/src/lib.rs +++ b/tokio-tls/src/lib.rs @@ -35,6 +35,7 @@ use std::fmt; use std::future::Future; use std::io::{self, Read, Write}; use std::marker::Unpin; +use std::mem::MaybeUninit; use std::pin::Pin; use std::ptr::null_mut; use std::task::{Context, Poll}; @@ -182,7 +183,7 @@ impl<S> AsyncRead for TlsStream<S> where S: AsyncRead + AsyncWrite + Unpin, { - unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool { // Note that this does not forward to `S` because the buffer is // unconditionally filled in by OpenSSL, not the actual object `S`. // We're decrypting bytes from `S` into the buffer above! diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index c857c839..8e5542be 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -22,7 +22,7 @@ categories = ["asynchronous"] [dependencies] tokio = { version = "=0.2.0-alpha.6", path = "../tokio" } -bytes = "0.4.7" +bytes = { git = "https://github.com/tokio-rs/bytes" } futures-core = "0.3.0" futures-sink = "0.3.0" log = "0.4" diff --git a/tokio-util/src/codec/framed.rs b/tokio-util/src/codec/framed.rs index 62403d56..0c5ef9f6 100644 --- a/tokio-util/src/codec/framed.rs +++ b/tokio-util/src/codec/framed.rs @@ -11,6 +11,7 @@ use futures_sink::Sink; use pin_project_lite::pin_project; use std::fmt; use std::io::{self, BufRead, Read, Write}; +use std::mem::MaybeUninit; use std::pin::Pin; use std::task::{Context, Poll}; @@ -261,7 +262,7 @@ impl<T: BufRead, U> BufRead for Fuse<T, U> { } impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool { self.io.prepare_uninitialized_buffer(buf) } diff --git a/tokio-util/src/codec/framed_write.rs b/tokio-util/src/codec/framed_write.rs index 4296e993..9aed7ea3 100644 --- a/tokio-util/src/codec/framed_write.rs +++ b/tokio-util/src/codec/framed_write.rs @@ -11,6 +11,7 @@ use log::trace; use pin_project_lite::pin_project; use std::fmt; use std::io::{self, BufRead, Read}; +use std::mem::MaybeUninit; use std::pin::Pin; use std::task::{Context, Poll}; @@ -284,7 +285,7 @@ impl<T: BufRead> BufRead for FramedWrite2<T> { } impl<T: AsyncRead> AsyncRead for FramedWrite2<T> { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool { self.inner.prepare_uninitialized_buffer(buf) } diff --git a/tokio-util/src/codec/length_delimited.rs b/tokio-util/src/codec/length_delimited.rs index b36b6aa7..01ba2aec 100644 --- a/tokio-util/src/codec/length_delimited.rs +++ b/tokio-util/src/codec/length_delimited.rs @@ -345,7 +345,7 @@ use crate::codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite}; use tokio::io::{AsyncRead, AsyncWrite}; -use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use std::error::Error as StdError; use std::io::{self, Cursor}; use std::{cmp, fmt}; @@ -457,7 +457,7 @@ impl LengthDelimitedCodec { // match endianess let n = if self.builder.length_field_is_big_endian { - src.get_uint_be(field_len) + src.get_uint(field_len) } else { src.get_uint_le(field_len) }; @@ -551,7 +551,7 @@ impl Encoder for LengthDelimitedCodec { type Error = io::Error; fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> { - let n = (&data).into_buf().remaining(); + let n = (&data).remaining(); if n > self.builder.max_frame_len { return Err(io::Error::new( @@ -579,7 +579,7 @@ impl Encoder for LengthDelimitedCodec { dst.reserve(self.builder.length_field_len + n); if self.builder.length_field_is_big_endian { - dst.put_uint_be(n as u64, self.builder.length_field_len); + dst.put_uint(n as u64, self.builder.length_field_len); } else { dst.put_uint_le(n as u64, self.builder.length_field_len); } diff --git a/tokio-util/src/codec/lines_codec.rs b/tokio-util/src/codec/lines_codec.rs index bca69723..8029956f 100644 --- a/tokio-util/src/codec/lines_codec.rs +++ b/tokio-util/src/codec/lines_codec.rs @@ -1,7 +1,7 @@ use crate::codec::decoder::Decoder; use crate::codec::encoder::Encoder; -use bytes::{BufMut, BytesMut}; +use bytes::{Buf, BufMut, BytesMut}; use std::{cmp, fmt, io, str, usize}; /// A simple `Codec` implementation that splits up data into lines. @@ -168,7 +168,7 @@ impl Decoder for LinesCodec { if buf.is_empty() || buf == &b"\r"[..] { None } else { - let line = buf.take(); + let line = buf.split_to(buf.len()); let line = without_carriage_return(&line); let line = utf8(line)?; self.next_index = 0; @@ -185,7 +185,7 @@ impl Encoder for LinesCodec { fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), LinesCodecError> { buf.reserve(line.len() + 1); - buf.put(line); + buf.put(line.as_bytes()); buf.put_u8(b'\n'); Ok(()) } diff --git a/tokio-util/src/udp/frame.rs b/tokio-util/src/udp/frame.rs index 4a8c3c0b..d37b20cd 100644 --- a/tokio-util/src/udp/frame.rs +++ b/tokio-util/src/udp/frame.rs @@ -47,7 +47,14 @@ impl<C: Decoder + Unpin> Stream for UdpFramed<C> { let (_n, addr) = unsafe { // Read into the buffer without having to initialize the memory. - let res = ready!(Pin::new(&mut pin.socket).poll_recv_from(cx, pin.rd.bytes_mut())); + // + // safety: we know tokio::net::UdpSocket never reads from the memory + // during a recv + let res = { + let bytes = &mut *(pin.rd.bytes_mut() as *mut _ as *mut [u8]); + ready!(Pin::new(&mut pin.socket).poll_recv_from(cx, bytes)) + }; + let (n, addr) = res?; pin.rd.advance_mut(n); (n, addr) diff --git a/tokio-util/tests/codecs.rs b/tokio-util/tests/codecs.rs index 4709854a..d1212866 100644 --- a/tokio-util/tests/codecs.rs +++ b/tokio-util/tests/codecs.rs @@ -45,14 +45,14 @@ fn lines_decoder() { let mut codec = LinesCodec::new(); let buf = &mut BytesMut::new(); buf.reserve(200); - buf.put("line 1\nline 2\r\nline 3\n\r\n\r"); + buf.put_slice(b"line 1\nline 2\r\nline 3\n\r\n\r"); assert_eq!("line 1", codec.decode(buf).unwrap().unwrap()); assert_eq!("line 2", codec.decode(buf).unwrap().unwrap()); assert_eq!("line 3", codec.decode(buf).unwrap().unwrap()); assert_eq!("", codec.decode(buf).unwrap().unwrap()); assert_eq!(None, codec.decode(buf).unwrap()); assert_eq!(None, codec.decode_eof(buf).unwrap()); - buf.put("k"); + buf.put_slice(b"k"); assert_eq!(None, codec.decode(buf).unwrap()); assert_eq!("\rk", codec.decode_eof(buf).unwrap().unwrap()); assert_eq!(None, codec.decode(buf).unwrap()); @@ -67,7 +67,7 @@ fn lines_decoder_max_length() { let buf = &mut BytesMut::new(); buf.reserve(200); - buf.put("line 1 is too long\nline 2\nline 3\r\nline 4\n\r\n\r"); + buf.put_slice(b"line 1 is too long\nline 2\nline 3\r\nline 4\n\r\n\r"); assert!(codec.decode(buf).is_err()); @@ -102,7 +102,7 @@ fn lines_decoder_max_length() { assert_eq!(None, codec.decode(buf).unwrap()); assert_eq!(None, codec.decode_eof(buf).unwrap()); - buf.put("k"); + buf.put_slice(b"k"); assert_eq!(None, codec.decode(buf).unwrap()); let line = codec.decode_eof(buf).unwrap().unwrap(); @@ -119,7 +119,7 @@ fn lines_decoder_max_length() { // Line that's one character too long. This could cause an out of bounds // error if we peek at the next characters using slice indexing. - buf.put("aaabbbc"); + buf.put_slice(b"aaabbbc"); assert!(codec.decode(buf).is_err()); } @@ -131,16 +131,16 @@ fn lines_decoder_max_length_underrun() { let buf = &mut BytesMut::new(); buf.reserve(200); - buf.put("line "); + buf.put_slice(b"line "); assert_eq!(None, codec.decode(buf).unwrap()); - buf.put("too l"); + buf.put_slice(b"too l"); assert!(codec.decode(buf).is_err()); - buf.put("ong\n"); + buf.put_slice(b"ong\n"); assert_eq!(None, codec.decode(buf).unwrap()); - buf.put("line 2"); + buf.put_slice(b"line 2"); assert_eq!(None, codec.decode(buf).unwrap()); - buf.put("\n"); + buf.put_slice(b"\n"); assert_eq!("line 2", codec.decode(buf).unwrap().unwrap()); } @@ -152,11 +152,11 @@ fn lines_decoder_max_length_bursts() { let buf = &mut BytesMut::new(); buf.reserve(200); - buf.put("line "); + buf.put_slice(b"line "); assert_eq!(None, codec.decode(buf).unwrap()); - buf.put("too l"); + buf.put_slice(b"too l"); assert_eq!(None, codec.decode(buf).unwrap()); - buf.put("ong\n"); + buf.put_slice(b"ong\n"); assert!(codec.decode(buf).is_err()); } @@ -168,9 +168,9 @@ fn lines_decoder_max_length_big_burst() { let buf = &mut BytesMut::new(); buf.reserve(200); - buf.put("line "); + buf.put_slice(b"line "); assert_eq!(None, codec.decode(buf).unwrap()); - buf.put("too long!\n"); + buf.put_slice(b"too long!\n"); assert!(codec.decode(buf).is_err()); } @@ -182,10 +182,10 @@ fn lines_decoder_max_length_newline_between_decodes() { let buf = &mut BytesMut::new(); buf.reserve(200); - buf.put("hello"); + buf.put_slice(b"hello"); assert_eq!(None, codec.decode(buf).unwrap()); - buf.put("\nworld"); + buf.put_slice(b"\nworld"); assert_eq!("hello", codec.decode(buf).unwrap().unwrap()); } @@ -198,9 +198,9 @@ fn lines_decoder_discard_repeat() { let buf = &mut BytesMut::new(); buf.reserve(200); - buf.put("aa"); + buf.put_slice(b"aa"); assert!(codec.decode(buf).is_err()); - buf.put("a"); + buf.put_slice(b"a"); assert!(codec.decode(buf).is_err()); } diff --git a/tokio-util/tests/framed.rs b/tokio-util/tests/framed.rs index 25ddf86a..b98df736 100644 --- a/tokio-util/tests/framed.rs +++ b/tokio-util/tests/framed.rs @@ -4,7 +4,7 @@ use tokio::prelude::*; use tokio_test::assert_ok; use tokio_util::codec::{Decoder, Encoder, Framed, FramedParts}; -use bytes::{Buf, BufMut, BytesMut, IntoBuf}; +use bytes::{Buf, BufMut, BytesMut}; use futures::StreamExt; use std::io::{self, Read}; use std::pin::Pin; @@ -24,7 +24,7 @@ impl Decoder for U32Codec { return Ok(None); } - let n = buf.split_to(4).into_buf().get_u32_be(); + let n = buf.split_to(4).get_u32(); Ok(Some(n)) } } @@ -36,7 +36,7 @@ impl Encoder for U32Codec { fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> { // Reserve space dst.reserve(4); - dst.put_u32_be(item); + dst.put_u32(item); Ok(()) } } @@ -66,7 +66,7 @@ impl AsyncRead for DontReadIntoThis { #[tokio::test] async fn can_read_from_existing_buf() { let mut parts = FramedParts::new(DontReadIntoThis, U32Codec); - parts.read_buf = vec![0, 0, 0, 42].into(); + parts.read_buf = BytesMut::from(&[0, 0, 0, 42][..]); let mut framed = Framed::from_parts(parts); let num = assert_ok!(framed.next().await.unwrap()); @@ -77,7 +77,7 @@ async fn can_read_from_existing_buf() { #[test] fn external_buf_grows_to_init() { let mut parts = FramedParts::new(DontReadIntoThis, U32Codec); - parts.read_buf = vec![0, 0, 0, 42].into(); + parts.read_buf = BytesMut::from(&[0, 0, 0, 42][..]); let framed = Framed::from_parts(parts); let FramedParts { read_buf, .. } = framed.into_parts(); @@ -88,7 +88,7 @@ fn external_buf_grows_to_init() { #[test] fn external_buf_does_not_shrink() { let mut parts = FramedParts::new(DontReadIntoThis, U32Codec); - parts.read_buf = vec![0; INITIAL_CAPACITY * 2].into(); + parts.read_buf = BytesMut::from(&vec![0; INITIAL_CAPACITY * 2][..]); let framed = Framed::from_parts(parts); let FramedParts { read_buf, .. } = framed.into_parts(); diff --git a/tokio-util/tests/framed_read.rs b/tokio-util/tests/framed_read.rs index 9aa644aa..06caa0a4 100644 --- a/tokio-util/tests/framed_read.rs +++ b/tokio-util/tests/framed_read.rs @@ -5,7 +5,7 @@ use tokio_test::assert_ready; use tokio_test::task; use tokio_util::codec::{Decoder, FramedRead}; -use bytes::{Buf, BytesMut, IntoBuf}; +use bytes::{Buf, BytesMut}; use futures::Stream; use std::collections::VecDeque; use std::io; @@ -45,7 +45,7 @@ impl Decoder for U32Decoder { return Ok(None); } - let n = buf.split_to(4).into_buf().get_u32_be(); + let n = buf.split_to(4).get_u32(); Ok(Some(n)) } } diff --git a/tokio-util/tests/framed_write.rs b/tokio-util/tests/framed_write.rs index 9d760766..706e6792 100644 --- a/tokio-util/tests/framed_write.rs +++ b/tokio-util/tests/framed_write.rs @@ -35,7 +35,7 @@ impl Encoder for U32Encoder { fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> { // Reserve space dst.reserve(4); - dst.put_u32_be(item); + dst.put_u32(item); Ok(()) } } @@ -78,7 +78,7 @@ fn write_hits_backpressure() { for i in 0..=ITER { let mut b = BytesMut::with_capacity(4); - b.put_u32_be(i as u32); + b.put_u32(i as u32); // Append to the end match mock.calls.back_mut().unwrap() { diff --git a/tokio-util/tests/udp.rs b/tokio-util/tests/udp.rs index be05dbc4..af8002bd 100644 --- a/tokio-util/tests/udp.rs +++ b/tokio-util/tests/udp.rs @@ -73,7 +73,7 @@ impl Encoder for ByteCodec { fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> Result<(), io::Error> { buf.reserve(data.len()); - buf.put(data); + buf.put_slice(&data); Ok(()) } } diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 9364ea82..0619a50f 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -91,8 +91,7 @@ uds = ["io-driver", "mio-uds", "libc"] [dependencies] tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" } -bytes = "0.4" -iovec = "0.1" +bytes = { git = "https://github.com/tokio-rs/bytes" } pin-project-lite = "0.1.1" # Everything else is optional... diff --git a/tokio/src/io/async_read.rs b/tokio/src/io/async_read.rs index 974cf346..d7e703d4 100644 --- a/tokio/src/io/async_read.rs +++ b/tokio/src/io/async_read.rs @@ -1,5 +1,6 @@ use bytes::BufMut; use std::io; +use std::mem::MaybeUninit; use std::ops::DerefMut; use std::pin::Pin; use std::task::{Context, Poll}; @@ -63,9 +64,9 @@ pub trait AsyncRead { /// /// [`io::Read`]: std::io::Read /// [`poll_read_buf`]: #method.poll_read_buf - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool { for x in buf { - *x = 0; + *x.as_mut_ptr() = 0; } true @@ -109,6 +110,9 @@ pub trait AsyncRead { self.prepare_uninitialized_buffer(b); + // Convert to `&mut [u8]` + let b = &mut *(b as *mut [MaybeUninit<u8>] as *mut [u8]); + ready!(self.poll_read(cx, b))? }; @@ -120,7 +124,7 @@ pub trait AsyncRead { macro_rules! deref_async_read { () => { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool { (**self).prepare_uninitialized_buffer(buf) } @@ -145,7 +149,7 @@ where P: DerefMut + Unpin, P::Target: AsyncRead, { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool { (**self).prepare_uninitialized_buffer(buf) } @@ -159,7 +163,7 @@ where } impl AsyncRead for &[u8] { - unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool { + unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [MaybeUninit<u8>]) -> bool { false } @@ -173,7 +177,7 @@ impl AsyncRead for &[u8] { } impl<T: AsRef<[u8]> + Unpin> AsyncRead for io::Cursor<T> { - unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool { + unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [MaybeUninit<u8>]) -> bool { false } diff --git a/tokio/src/io/util/buf_reader.rs b/tokio/src/io/util/buf_reader.rs index 46caa1fa..5cf2c179 100644 --- a/tokio/src/io/util/buf_reader.rs +++ b/tokio/src/io/util/buf_reader.rs @@ -3,6 +3,7 @@ use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite}; use pin_project_lite::pin_project; use std::io::{self, Read}; +use std::mem::MaybeUninit; use std::pin::Pin; use std::task::{Context, Poll}; use std::{cmp, fmt}; @@ -45,7 +46,12 @@ impl<R: AsyncRead> BufReader<R> { unsafe { let mut buffer = Vec::with_capacity(capacity); buffer.set_len(capacity); - inner.prepare_uninitialized_buffer(&mut buffer); + + { + // Convert to MaybeUninit + let b = &mut *(&mut buffer[..] as *mut [u8] as *mut [MaybeUninit<u8>]); + inner.prepare_uninitialized_buffer(b); + } Self { inner, buf: buffer.into_boxed_slice(), @@ -120,7 +126,7 @@ impl<R: AsyncRead> AsyncRead for BufReader<R> { } // we can't skip unconditionally because of the large buffer case in read. - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool { self.inner.prepare_uninitialized_buffer(buf) } } diff --git a/tokio/src/io/util/buf_stream.rs b/tokio/src/io/util/buf_stream.rs index 7ff1d3f4..a1676c29 100644 --- a/tokio/src/io/util/buf_stream.rs +++ b/tokio/src/io/util/buf_stream.rs @@ -2,11 +2,10 @@ use crate::io::util::{BufReader, BufWriter}; use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite}; use pin_project_lite::pin_project; -use std::io::{self}; -use std::{ - pin::Pin, - task::{Context, Poll}, -}; +use std::io; +use std::mem::MaybeUninit; +use std::pin::Pin; +use std::task::{Context, Poll}; pin_project! { /// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output. @@ -126,7 +125,7 @@ impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> { } // we can't skip unconditionally because of the large buffer case in read. - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool { self.inner.prepare_uninitialized_buffer(buf) } } diff --git a/tokio/src/io/util/buf_writer.rs b/tokio/src/io/util/buf_writer.rs index 5c98bd82..e7ae57a0 100644 --- a/tokio/src/io/util/buf_writer.rs +++ b/tokio/src/io/util/buf_writer.rs @@ -4,6 +4,7 @@ use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite}; use pin_project_lite::pin_project; use std::fmt; use std::io::{self, Write}; +use std::mem::MaybeUninit; use std::pin::Pin; use std::task::{Context, Poll}; @@ -152,7 +153,7 @@ impl<W: AsyncWrite + AsyncRead> AsyncRead for BufWriter<W> { } // we can't skip unconditionally because of the large buffer case in read. - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool { self.get_ref().prepare_uninitialized_buffer(buf) } } diff --git a/tokio/src/io/util/read_to_end.rs b/tokio/src/io/util/read_to_end.rs index 36eba5bb..0cd91f84 100644 --- a/tokio/src/io/util/read_to_end.rs +++ b/tokio/src/io/util/read_to_end.rs @@ -2,6 +2,7 @@ use crate::io::AsyncRead; use std::future::Future; use std::io; +use std::mem::MaybeUninit; use std::pin::Pin; use std::task::{Context, Poll}; @@ -64,7 +65,10 @@ pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>( g.buf.reserve(32); let capacity = g.buf.capacity(); g.buf.set_len(capacity); - rd.prepare_uninitialized_buffer(&mut g.buf[g.len..]); + + let b = &mut *(&mut g.buf[g.len..] as *mut [u8] as *mut [MaybeUninit<u8>]); + + rd.prepare_uninitialized_buffer(b); } } diff --git a/tokio/src/io/util/take.rs b/tokio/src/io/util/take.rs index 86403853..18332603 100644 --- a/tokio/src/io/util/take.rs +++ b/tokio/src/io/util/take.rs @@ -1,6 +1,7 @@ use crate::io::{AsyncBufRead, AsyncRead}; use pin_project_lite::pin_project; +use std::mem::MaybeUninit; use std::pin::Pin; use std::task::{Context, Poll}; use std::{cmp, io}; @@ -74,7 +75,7 @@ impl<R: AsyncRead> Take<R> { } impl<R: AsyncRead> AsyncRead for Take<R> { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool { self.inner.prepare_uninitialized_buffer(buf) } diff --git a/tokio/src/net/tcp/split.rs b/tokio/src/net/tcp/split.rs index 07c7c3ff..2b337c08 100644 --- a/tokio/src/net/tcp/split.rs +++ b/tokio/src/net/tcp/split.rs @@ -11,8 +11,8 @@ use crate::io::{AsyncRead, AsyncWrite}; use crate::net::TcpStream; -use bytes::{Buf, BufMut}; use std::io; +use std::mem::MaybeUninit; use std::net::Shutdown; use std::pin::Pin; use std::task::{Context, Poll}; @@ -33,7 +33,7 @@ pub(crate) fn split(stream: &mut TcpStream) -> (ReadHalf<'_>, WriteHalf<'_>) { } impl AsyncRead for ReadHalf<'_> { - unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool { false } @@ -44,14 +44,6 @@ impl AsyncRead for ReadHalf<'_> { ) -> Poll<io::Result<usize>> { self.0.poll_read_priv(cx, buf) } - - fn poll_read_buf<B: BufMut>( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut B, - ) -> Poll<io::Result<usize>> { - self.0.poll_read_buf_priv(cx, buf) - } } impl AsyncWrite for WriteHalf<'_> { @@ -73,14 +65,6 @@ impl AsyncWrite for WriteHalf<'_> { fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { self.0.shutdown(Shutdown::Write).into() } - - fn poll_write_buf<B: Buf>( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut B, - ) -> Poll<io::Result<usize>> { - self.0.poll_write_buf_priv(cx, buf) - } } impl AsRef<TcpStream> for ReadHalf<'_> { diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index b73114b4..34d3a493 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -3,11 +3,10 @@ use crate::io::{AsyncRead, AsyncWrite, PollEvented}; use crate::net::tcp::split::{split, ReadHalf, WriteHalf}; use crate::net::ToSocketAddrs; -use bytes::{Buf, BufMut}; -use iovec::IoVec; use std::convert::TryFrom; use std::fmt; use std::io::{self, Read, Write}; +use std::mem::MaybeUninit; use std::net::{self, Shutdown, SocketAddr}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -601,70 +600,6 @@ impl TcpStream { } } - pub(crate) fn poll_read_buf_priv<B: BufMut>( - &self, - cx: &mut Context<'_>, - buf: &mut B, - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - let r = unsafe { - // The `IoVec` type can't have a 0-length size, so we create a bunch - // of dummy versions on the stack with 1 length which we'll quickly - // overwrite. - let b1: &mut [u8] = &mut [0]; - let b2: &mut [u8] = &mut [0]; - let b3: &mut [u8] = &mut [0]; - let b4: &mut [u8] = &mut [0]; - let b5: &mut [u8] = &mut [0]; - let b6: &mut [u8] = &mut [0]; - let b7: &mut [u8] = &mut [0]; - let b8: &mut [u8] = &mut [0]; - let b9: &mut [u8] = &mut [0]; - let b10: &mut [u8] = &mut [0]; - let b11: &mut [u8] = &mut [0]; - let b12: &mut [u8] = &mut [0]; - let b13: &mut [u8] = &mut [0]; - let b14: &mut [u8] = &mut [0]; - let b15: &mut [u8] = &mut [0]; - let b16: &mut [u8] = &mut [0]; - let mut bufs: [&mut IoVec; 16] = [ - b1.into(), - b2.into(), - b3.into(), - b4.into(), - b5.into(), - b6.into(), - b7.into(), - b8.into(), - b9.into(), - b10.into(), - b11.into(), - b12.into(), - b13.into(), - b14.into(), - b15.into(), - b16.into(), - ]; - let n = buf.bytes_vec_mut(&mut bufs); - self.io.get_ref().read_bufs(&mut bufs[..n]) - }; - - match r { - Ok(n) => { - unsafe { - buf.advance_mut(n); - } - Poll::Ready(Ok(n)) - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - Err(e) => Poll::Ready(Err(e)), - } - } - pub(crate) fn poll_write_priv( &self, cx: &mut Context<'_>, @@ -680,36 +615,6 @@ impl TcpStream { x => Poll::Ready(x), } } - - pub(crate) fn poll_write_buf_priv<B: Buf>( - &self, - cx: &mut Context<'_>, - buf: &mut B, - ) -> Poll<io::Result<usize>> { - |