summaryrefslogtreecommitdiffstats
path: root/tokio/src/net/tcp/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/net/tcp/stream.rs')
-rw-r--r--tokio/src/net/tcp/stream.rs254
1 files changed, 127 insertions, 127 deletions
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs
index e0348724..f4f705b4 100644
--- a/tokio/src/net/tcp/stream.rs
+++ b/tokio/src/net/tcp/stream.rs
@@ -299,15 +299,16 @@ impl TcpStream {
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
+ loop {
+ let ev = ready!(self.io.poll_read_ready(cx))?;
- match self.io.get_ref().peek(buf) {
- Ok(ret) => Poll::Ready(Ok(ret)),
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
+ match self.io.get_ref().peek(buf) {
+ Ok(ret) => return Poll::Ready(Ok(ret)),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
}
- Err(e) => Poll::Ready(Err(e)),
}
}
@@ -703,26 +704,28 @@ impl TcpStream {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
-
- // Safety: `TcpStream::read` will not peak at the maybe uinitialized bytes.
- let b =
- unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
- match self.io.get_ref().read(b) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
- }
- Ok(n) => {
- // Safety: We trust `TcpStream::read` to have filled up `n` bytes
- // in the buffer.
- unsafe {
- buf.assume_init(n);
+ loop {
+ let ev = ready!(self.io.poll_read_ready(cx))?;
+
+ // Safety: `TcpStream::read` will not peek at the maybe uinitialized bytes.
+ let b = unsafe {
+ &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
+ };
+ match self.io.get_ref().read(b) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Ok(n) => {
+ // Safety: We trust `TcpStream::read` to have filled up `n` bytes
+ // in the buffer.
+ unsafe {
+ buf.assume_init(n);
+ }
+ buf.add_filled(n);
+ return Poll::Ready(Ok(()));
}
- buf.add_filled(n);
- Poll::Ready(Ok(()))
+ Err(e) => return Poll::Ready(Err(e)),
}
- Err(e) => Poll::Ready(Err(e)),
}
}
@@ -731,14 +734,15 @@ impl TcpStream {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_write_ready(cx))?;
+ loop {
+ let ev = ready!(self.io.poll_write_ready(cx))?;
- match self.io.get_ref().write(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
+ match self.io.get_ref().write(buf) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ x => return Poll::Ready(x),
}
- x => Poll::Ready(x),
}
}
@@ -749,99 +753,100 @@ impl TcpStream {
) -> Poll<io::Result<usize>> {
use std::io::IoSlice;
- ready!(self.io.poll_write_ready(cx))?;
-
- // The `IoVec` (v0.1.x) type can't have a zero-length size, so create
- // a dummy version from a 1-length slice which we'll overwrite with
- // the `bytes_vectored` method.
- static S: &[u8] = &[0];
- const MAX_BUFS: usize = 64;
-
- // IoSlice isn't Copy, so we must expand this manually ;_;
- let mut slices: [IoSlice<'_>; MAX_BUFS] = [
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- ];
- let cnt = buf.bytes_vectored(&mut slices);
-
- let iovec = <&IoVec>::from(S);
- let mut vecs = [iovec; MAX_BUFS];
- for i in 0..cnt {
- vecs[i] = (*slices[i]).into();
- }
-
- match self.io.get_ref().write_bufs(&vecs[..cnt]) {
- Ok(n) => {
- buf.advance(n);
- Poll::Ready(Ok(n))
+ loop {
+ let ev = ready!(self.io.poll_write_ready(cx))?;
+
+ // The `IoVec` (v0.1.x) type can't have a zero-length size, so create
+ // a dummy version from a 1-length slice which we'll overwrite with
+ // the `bytes_vectored` method.
+ static S: &[u8] = &[0];
+ const MAX_BUFS: usize = 64;
+
+ // IoSlice isn't Copy, so we must expand this manually ;_;
+ let mut slices: [IoSlice<'_>; MAX_BUFS] = [
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ ];
+ let cnt = buf.bytes_vectored(&mut slices);
+
+ let iovec = <&IoVec>::from(S);
+ let mut vecs = [iovec; MAX_BUFS];
+ for i in 0..cnt {
+ vecs[i] = (*slices[i]).into();
}
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
+
+ match self.io.get_ref().write_bufs(&vecs[..cnt]) {
+ Ok(n) => {
+ buf.advance(n);
+ return Poll::Ready(Ok(n));
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
}
- Err(e) => Poll::Ready(Err(e)),
}
}
}
@@ -850,11 +855,6 @@ impl TryFrom<TcpStream> for mio::net::TcpStream {
type Error = io::Error;
/// Consumes value, returning the mio I/O object.
- ///
- /// See [`PollEvented::into_inner`] for more details about
- /// resource deregistration that happens during the call.
- ///
- /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: TcpStream) -> Result<Self, Self::Error> {
value.io.into_inner()
}