From 0589acc9ffb9175647d6b7a794edd8812f79399f Mon Sep 17 00:00:00 2001 From: Akshay Narayan Date: Wed, 26 Feb 2020 13:38:41 -0500 Subject: Implement Stream for Listener types (#2275) The Incoming types currently don't take ownership of the listener, but in most cases, users who want to use the Listener as a stream will only want to use the stream from that point on. So, implement Stream directly on the Listener types. --- tokio/src/net/tcp/listener.rs | 45 +++++++++++++++++++++++++++++++++++++++++ tokio/src/net/unix/listener.rs | 46 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index b1a81758..75253d3f 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -12,8 +12,22 @@ use std::task::{Context, Poll}; cfg_tcp! { /// A TCP socket server, listening for connections. /// + /// Also implements a stream over the connections being received on this listener. + /// + /// The stream will never return `None` and will also not yield the peer's + /// `SocketAddr` structure. Iterating over it is equivalent to calling accept in a loop. + /// + /// # Errors + /// + /// Note that accepting a connection can lead to various errors and not all + /// of them are necessarily fatal ‒ for example having too many open file + /// descriptors or the other side closing the connection while it waits in + /// an accept queue. These would terminate the stream if not handled in any + /// way. + /// /// # Examples /// + /// Using [`TcpListener::accept`]: /// ```no_run /// use tokio::net::TcpListener; /// @@ -34,6 +48,24 @@ cfg_tcp! { /// } /// } /// ``` + /// + /// Using `impl Stream`: + /// ```no_run + /// use tokio::{net::TcpListener, stream::StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); + /// while let Some(stream) = listener.next().await { + /// match stream { + /// Ok(stream) => { + /// println!("new client!"); + /// } + /// Err(e) => { /* connection failed */ } + /// } + /// } + /// } + /// ``` pub struct TcpListener { io: PollEvented, } @@ -335,6 +367,19 @@ impl TcpListener { } } +#[cfg(feature = "stream")] +impl crate::stream::Stream for TcpListener { + type Item = io::Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let (socket, _) = ready!(self.poll_accept(cx))?; + Poll::Ready(Some(Ok(socket))) + } +} + impl TryFrom for mio::net::TcpListener { type Error = io::Error; diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index d61d0392..0e7affc2 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -14,6 +14,39 @@ use std::task::{Context, Poll}; cfg_uds! { /// A Unix socket which can accept connections from other Unix sockets. + /// + /// Also implements a stream over the connections being received on this listener. + /// + /// The stream will never return `None` and will also not yield the peer's + /// `SocketAddr` structure. Iterating over it is equivalent to calling accept in a loop. + /// + /// # Errors + /// + /// Note that accepting a connection can lead to various errors and not all + /// of them are necessarily fatal ‒ for example having too many open file + /// descriptors or the other side closing the connection while it waits in + /// an accept queue. These would terminate the stream if not handled in any + /// way. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixListener; + /// use tokio::stream::StreamExt; + /// + /// #[tokio::main] + /// async fn main() { + /// let mut listener = UnixListener::bind("/path/to/the/socket").unwrap(); + /// while let Some(stream) = listener.next().await { + /// match stream { + /// Ok(stream) => { + /// println!("new client!"); + /// } + /// Err(e) => { /* connection failed */ } + /// } + /// } + /// } + /// ``` pub struct UnixListener { io: PollEvented, } @@ -142,6 +175,19 @@ impl UnixListener { } } +#[cfg(feature = "stream")] +impl crate::stream::Stream for UnixListener { + type Item = io::Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let (socket, _) = ready!(self.poll_accept(cx))?; + Poll::Ready(Some(Ok(socket))) + } +} + impl TryFrom for mio_uds::UnixListener { type Error = io::Error; -- cgit v1.2.3