diff options
Diffstat (limited to 'tokio/src/net/tcp/stream.rs')
-rw-r--r-- | tokio/src/net/tcp/stream.rs | 254 |
1 files changed, 127 insertions, 127 deletions
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index e0348724..f4f705b4 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -299,15 +299,16 @@ impl TcpStream { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; - match self.io.get_ref().peek(buf) { - Ok(ret) => Poll::Ready(Ok(ret)), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending + match self.io.get_ref().peek(buf) { + Ok(ret) => return Poll::Ready(Ok(ret)), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } @@ -703,26 +704,28 @@ impl TcpStream { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - // Safety: `TcpStream::read` will not peak at the maybe uinitialized bytes. - let b = - unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; - match self.io.get_ref().read(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - Ok(n) => { - // Safety: We trust `TcpStream::read` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; + + // Safety: `TcpStream::read` will not peek at the maybe uinitialized bytes. + let b = unsafe { + &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) + }; + match self.io.get_ref().read(b) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Ok(n) => { + // Safety: We trust `TcpStream::read` to have filled up `n` bytes + // in the buffer. + unsafe { + buf.assume_init(n); + } + buf.add_filled(n); + return Poll::Ready(Ok(())); } - buf.add_filled(n); - Poll::Ready(Ok(())) + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } @@ -731,14 +734,15 @@ impl TcpStream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; + loop { + let ev = ready!(self.io.poll_write_ready(cx))?; - match self.io.get_ref().write(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending + match self.io.get_ref().write(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + x => return Poll::Ready(x), } - x => Poll::Ready(x), } } @@ -749,99 +753,100 @@ impl TcpStream { ) -> Poll<io::Result<usize>> { use std::io::IoSlice; - ready!(self.io.poll_write_ready(cx))?; - - // The `IoVec` (v0.1.x) type can't have a zero-length size, so create - // a dummy version from a 1-length slice which we'll overwrite with - // the `bytes_vectored` method. - static S: &[u8] = &[0]; - const MAX_BUFS: usize = 64; - - // IoSlice isn't Copy, so we must expand this manually ;_; - let mut slices: [IoSlice<'_>; MAX_BUFS] = [ - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - ]; - let cnt = buf.bytes_vectored(&mut slices); - - let iovec = <&IoVec>::from(S); - let mut vecs = [iovec; MAX_BUFS]; - for i in 0..cnt { - vecs[i] = (*slices[i]).into(); - } - - match self.io.get_ref().write_bufs(&vecs[..cnt]) { - Ok(n) => { - buf.advance(n); - Poll::Ready(Ok(n)) + loop { + let ev = ready!(self.io.poll_write_ready(cx))?; + + // The `IoVec` (v0.1.x) type can't have a zero-length size, so create + // a dummy version from a 1-length slice which we'll overwrite with + // the `bytes_vectored` method. + static S: &[u8] = &[0]; + const MAX_BUFS: usize = 64; + + // IoSlice isn't Copy, so we must expand this manually ;_; + let mut slices: [IoSlice<'_>; MAX_BUFS] = [ + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + ]; + let cnt = buf.bytes_vectored(&mut slices); + + let iovec = <&IoVec>::from(S); + let mut vecs = [iovec; MAX_BUFS]; + for i in 0..cnt { + vecs[i] = (*slices[i]).into(); } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending + + match self.io.get_ref().write_bufs(&vecs[..cnt]) { + Ok(n) => { + buf.advance(n); + return Poll::Ready(Ok(n)); + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } } @@ -850,11 +855,6 @@ impl TryFrom<TcpStream> for mio::net::TcpStream { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: TcpStream) -> Result<Self, Self::Error> { value.io.into_inner() } |