summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-10-09 09:16:42 -0700
committerGitHub <noreply@github.com>2020-10-09 09:16:42 -0700
commitee597347c5e612611142ece09c79e55f2d243590 (patch)
tree5ef874817b2a75a9eea7a807cac48ec6ae776733
parent41ac1ae2bc9b2e4b6d03205bde19a4bbc20b368d (diff)
net: switch socket methods to &self (#2934)
Switches various socket methods from &mut self to &self. This uses the intrusive waker infrastructure to handle multiple waiters. Refs: #2928
-rw-r--r--tokio/src/net/tcp/split.rs2
-rw-r--r--tokio/src/net/tcp/split_owned.rs2
-rw-r--r--tokio/src/net/tcp/stream.rs18
-rw-r--r--tokio/src/net/unix/datagram/socket.rs20
-rw-r--r--tokio/src/net/unix/incoming.rs42
-rw-r--r--tokio/src/net/unix/listener.rs71
-rw-r--r--tokio/src/net/unix/mod.rs4
-rw-r--r--tokio/tests/uds_datagram.rs2
-rw-r--r--tokio/tests/uds_stream.rs4
9 files changed, 38 insertions, 127 deletions
diff --git a/tokio/src/net/tcp/split.rs b/tokio/src/net/tcp/split.rs
index 6e927f05..9a257f8b 100644
--- a/tokio/src/net/tcp/split.rs
+++ b/tokio/src/net/tcp/split.rs
@@ -81,7 +81,7 @@ impl ReadHalf<'_> {
///
/// [`TcpStream::poll_peek`]: TcpStream::poll_peek
pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
- self.0.poll_peek2(cx, buf)
+ self.0.poll_peek(cx, buf)
}
/// Receives data on the socket from the remote address to which it is
diff --git a/tokio/src/net/tcp/split_owned.rs b/tokio/src/net/tcp/split_owned.rs
index 2f35f495..4b4e2636 100644
--- a/tokio/src/net/tcp/split_owned.rs
+++ b/tokio/src/net/tcp/split_owned.rs
@@ -136,7 +136,7 @@ impl OwnedReadHalf {
///
/// [`TcpStream::poll_peek`]: TcpStream::poll_peek
pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
- self.inner.poll_peek2(cx, buf)
+ self.inner.poll_peek(cx, buf)
}
/// Receives data on the socket from the remote address to which it is
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs
index 3f9d6670..ee24ee32 100644
--- a/tokio/src/net/tcp/stream.rs
+++ b/tokio/src/net/tcp/stream.rs
@@ -257,7 +257,7 @@ impl TcpStream {
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
- /// let mut stream = TcpStream::connect("127.0.0.1:8000").await?;
+ /// let stream = TcpStream::connect("127.0.0.1:8000").await?;
/// let mut buf = [0; 10];
///
/// poll_fn(|cx| {
@@ -267,15 +267,7 @@ impl TcpStream {
/// Ok(())
/// }
/// ```
- pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
- self.poll_peek2(cx, buf)
- }
-
- pub(super) fn poll_peek2(
- &self,
- cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<io::Result<usize>> {
+ pub fn poll_peek(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
loop {
let ev = ready!(self.io.poll_read_ready(cx))?;
@@ -326,8 +318,10 @@ impl TcpStream {
///
/// [`read`]: fn@crate::io::AsyncReadExt::read
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
- pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- poll_fn(|cx| self.poll_peek(cx, buf)).await
+ pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io
+ .async_io(mio::Interest::READABLE, |io| io.peek(buf))
+ .await
}
/// Shuts down the read, write, or both halves of this connection.
diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs
index 20c6c227..5dcda6c9 100644
--- a/tokio/src/net/unix/datagram/socket.rs
+++ b/tokio/src/net/unix/datagram/socket.rs
@@ -314,7 +314,7 @@ impl UnixDatagram {
/// let bytes = b"bytes";
/// // We use a socket pair so that they are assigned
/// // each other as a peer.
- /// let (mut first, mut second) = UnixDatagram::pair()?;
+ /// let (first, second) = UnixDatagram::pair()?;
///
/// let size = first.try_send(bytes)?;
/// assert_eq!(size, bytes.len());
@@ -327,7 +327,7 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
- pub fn try_send(&mut self, buf: &[u8]) -> io::Result<usize> {
+ pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
self.io.get_ref().send(buf)
}
@@ -346,10 +346,10 @@ impl UnixDatagram {
/// let tmp = tempdir().unwrap();
///
/// let server_path = tmp.path().join("server");
- /// let mut server = UnixDatagram::bind(&server_path)?;
+ /// let server = UnixDatagram::bind(&server_path)?;
///
/// let client_path = tmp.path().join("client");
- /// let mut client = UnixDatagram::bind(&client_path)?;
+ /// let client = UnixDatagram::bind(&client_path)?;
///
/// let size = client.try_send_to(bytes, &server_path)?;
/// assert_eq!(size, bytes.len());
@@ -363,7 +363,7 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
- pub fn try_send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
+ pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
where
P: AsRef<Path>,
{
@@ -413,7 +413,7 @@ impl UnixDatagram {
/// let bytes = b"bytes";
/// // We use a socket pair so that they are assigned
/// // each other as a peer.
- /// let (mut first, mut second) = UnixDatagram::pair()?;
+ /// let (first, second) = UnixDatagram::pair()?;
///
/// let size = first.try_send(bytes)?;
/// assert_eq!(size, bytes.len());
@@ -426,7 +426,7 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
- pub fn try_recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.io.get_ref().recv(buf)
}
@@ -531,10 +531,10 @@ impl UnixDatagram {
/// let tmp = tempdir().unwrap();
///
/// let server_path = tmp.path().join("server");
- /// let mut server = UnixDatagram::bind(&server_path)?;
+ /// let server = UnixDatagram::bind(&server_path)?;
///
/// let client_path = tmp.path().join("client");
- /// let mut client = UnixDatagram::bind(&client_path)?;
+ /// let client = UnixDatagram::bind(&client_path)?;
///
/// let size = client.try_send_to(bytes, &server_path)?;
/// assert_eq!(size, bytes.len());
@@ -548,7 +548,7 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
- pub fn try_recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
let (n, addr) = self.io.get_ref().recv_from(buf)?;
Ok((n, SocketAddr(addr)))
}
diff --git a/tokio/src/net/unix/incoming.rs b/tokio/src/net/unix/incoming.rs
deleted file mode 100644
index af493604..00000000
--- a/tokio/src/net/unix/incoming.rs
+++ /dev/null
@@ -1,42 +0,0 @@
-use crate::net::unix::{UnixListener, UnixStream};
-
-use std::io;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-/// Stream of listeners
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct Incoming<'a> {
- inner: &'a mut UnixListener,
-}
-
-impl Incoming<'_> {
- pub(crate) fn new(listener: &mut UnixListener) -> Incoming<'_> {
- Incoming { inner: listener }
- }
-
- /// Attempts to poll `UnixStream` by polling inner `UnixListener` to accept
- /// connection.
- ///
- /// If `UnixListener` isn't ready yet, `Poll::Pending` is returned and
- /// current task will be notified by a waker. Otherwise `Poll::Ready` with
- /// `Result` containing `UnixStream` will be returned.
- pub fn poll_accept(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<io::Result<UnixStream>> {
- let (socket, _) = ready!(self.inner.poll_accept(cx))?;
- Poll::Ready(Ok(socket))
- }
-}
-
-#[cfg(feature = "stream")]
-impl crate::stream::Stream for Incoming<'_> {
- type Item = io::Result<UnixStream>;
-
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let (socket, _) = ready!(self.inner.poll_accept(cx))?;
- Poll::Ready(Some(Ok(socket)))
- }
-}
diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs
index 5d586ec3..fb85da9f 100644
--- a/tokio/src/net/unix/listener.rs
+++ b/tokio/src/net/unix/listener.rs
@@ -1,6 +1,5 @@
-use crate::future::poll_fn;
use crate::io::PollEvented;
-use crate::net::unix::{Incoming, SocketAddr, UnixStream};
+use crate::net::unix::{SocketAddr, UnixStream};
use std::convert::TryFrom;
use std::fmt;
@@ -99,18 +98,26 @@ impl UnixListener {
}
/// Accepts a new incoming connection to this listener.
- pub async fn accept(&mut self) -> io::Result<(UnixStream, SocketAddr)> {
- poll_fn(|cx| self.poll_accept(cx)).await
+ pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
+ let (mio, addr) = self
+ .io
+ .async_io(mio::Interest::READABLE, |sock| sock.accept())
+ .await?;
+
+ let addr = SocketAddr(addr);
+ let stream = UnixStream::new(mio)?;
+ Ok((stream, addr))
}
/// Polls to accept a new incoming connection to this listener.
///
/// If there is no connection to accept, `Poll::Pending` is returned and
- /// the current task will be notified by a waker.
- pub fn poll_accept(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
+ /// the current task will be notified by a waker.
+ ///
+ /// When ready, the most recent task that called `poll_accept` is notified.
+ /// The caller is responsble to ensure that `poll_accept` is called from a
+ /// single task. Failing to do this could result in tasks hanging.
+ pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
loop {
let ev = ready!(self.io.poll_read_ready(cx))?;
@@ -127,57 +134,13 @@ impl UnixListener {
}
}
}
-
- /// Returns a stream over the connections being received on this listener.
- ///
- /// Note that `UnixListener` also directly implements `Stream`.
- ///
- /// The returned stream will never return `None` and will also not yield the
- /// peer's `SocketAddr` structure. Iterating over it is equivalent to
- /// calling accept in a loop.
- ///
- /// # Errors
- ///
- /// Note that accepting a connection can lead to various errors and not all
- /// of them are necessarily fatal ‒ for example having too many open file
- /// descriptors or the other side closing the connection while it waits in
- /// an accept queue. These would terminate the stream if not handled in any
- /// way.
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::net::UnixListener;
- /// use tokio::stream::StreamExt;
- ///
- /// #[tokio::main]
- /// async fn main() {
- /// let mut listener = UnixListener::bind("/path/to/the/socket").unwrap();
- /// let mut incoming = listener.incoming();
- ///
- /// while let Some(stream) = incoming.next().await {
- /// match stream {
- /// Ok(stream) => {
- /// println!("new client!");
- /// }
- /// Err(e) => { /* connection failed */ }
- /// }
- /// }
- /// }
- /// ```
- pub fn incoming(&mut self) -> Incoming<'_> {
- Incoming::new(self)
- }
}
#[cfg(feature = "stream")]
impl crate::stream::Stream for UnixListener {
type Item = io::Result<UnixStream>;
- fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
+ fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (socket, _) = ready!(self.poll_accept(cx))?;
Poll::Ready(Some(Ok(socket)))
}
diff --git a/tokio/src/net/unix/mod.rs b/tokio/src/net/unix/mod.rs
index 21aa4fe7..19ee34a5 100644
--- a/tokio/src/net/unix/mod.rs
+++ b/tokio/src/net/unix/mod.rs
@@ -2,11 +2,7 @@
pub mod datagram;
-mod incoming;
-pub use incoming::Incoming;
-
pub(crate) mod listener;
-pub(crate) use listener::UnixListener;
mod split;
pub use split::{ReadHalf, WriteHalf};
diff --git a/tokio/tests/uds_datagram.rs b/tokio/tests/uds_datagram.rs
index 18dfcca0..ec2f6f82 100644
--- a/tokio/tests/uds_datagram.rs
+++ b/tokio/tests/uds_datagram.rs
@@ -78,7 +78,7 @@ async fn try_send_recv_never_block() -> io::Result<()> {
let payload = b"PAYLOAD";
let mut count = 0;
- let (mut dgram1, mut dgram2) = UnixDatagram::pair()?;
+ let (dgram1, dgram2) = UnixDatagram::pair()?;
// Send until we hit the OS `net.unix.max_dgram_qlen`.
loop {
diff --git a/tokio/tests/uds_stream.rs b/tokio/tests/uds_stream.rs
index 29f118a2..cd557e54 100644
--- a/tokio/tests/uds_stream.rs
+++ b/tokio/tests/uds_stream.rs
@@ -15,7 +15,7 @@ async fn accept_read_write() -> std::io::Result<()> {
.unwrap();
let sock_path = dir.path().join("connect.sock");
- let mut listener = UnixListener::bind(&sock_path)?;
+ let listener = UnixListener::bind(&sock_path)?;
let accept = listener.accept();
let connect = UnixStream::connect(&sock_path);
@@ -42,7 +42,7 @@ async fn shutdown() -> std::io::Result<()> {
.unwrap();
let sock_path = dir.path().join("connect.sock");
- let mut listener = UnixListener::bind(&sock_path)?;
+ let listener = UnixListener::bind(&sock_path)?;
let accept = listener.accept();
let connect = UnixStream::connect(&sock_path);