summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAkshay Narayan <akshay.k.narayan@gmail.com>2020-02-26 13:38:41 -0500
committerGitHub <noreply@github.com>2020-02-26 13:38:41 -0500
commit0589acc9ffb9175647d6b7a794edd8812f79399f (patch)
tree58813a656a9c9c527b023118ea6e7cc48f11260b
parent1dadc701c04879f1df4ddaa0df362297a144d7b3 (diff)
Implement Stream for Listener types (#2275)
The Incoming types currently don't take ownership of the listener, but in most cases, users who want to use the Listener as a stream will only want to use the stream from that point on. So, implement Stream directly on the Listener types.
-rw-r--r--tokio/src/net/tcp/listener.rs45
-rw-r--r--tokio/src/net/unix/listener.rs46
2 files changed, 91 insertions, 0 deletions
diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs
index b1a81758..75253d3f 100644
--- a/tokio/src/net/tcp/listener.rs
+++ b/tokio/src/net/tcp/listener.rs
@@ -12,8 +12,22 @@ use std::task::{Context, Poll};
cfg_tcp! {
/// A TCP socket server, listening for connections.
///
+ /// Also implements a stream over the connections being received on this listener.
+ ///
+ /// The stream will never return `None` and will also not yield the peer's
+ /// `SocketAddr` structure. Iterating over it is equivalent to calling accept in a loop.
+ ///
+ /// # Errors
+ ///
+ /// Note that accepting a connection can lead to various errors and not all
+ /// of them are necessarily fatal ‒ for example having too many open file
+ /// descriptors or the other side closing the connection while it waits in
+ /// an accept queue. These would terminate the stream if not handled in any
+ /// way.
+ ///
/// # Examples
///
+ /// Using [`TcpListener::accept`]:
/// ```no_run
/// use tokio::net::TcpListener;
///
@@ -34,6 +48,24 @@ cfg_tcp! {
/// }
/// }
/// ```
+ ///
+ /// Using `impl Stream`:
+ /// ```no_run
+ /// use tokio::{net::TcpListener, stream::StreamExt};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
+ /// while let Some(stream) = listener.next().await {
+ /// match stream {
+ /// Ok(stream) => {
+ /// println!("new client!");
+ /// }
+ /// Err(e) => { /* connection failed */ }
+ /// }
+ /// }
+ /// }
+ /// ```
pub struct TcpListener {
io: PollEvented<mio::net::TcpListener>,
}
@@ -335,6 +367,19 @@ impl TcpListener {
}
}
+#[cfg(feature = "stream")]
+impl crate::stream::Stream for TcpListener {
+ type Item = io::Result<TcpStream>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let (socket, _) = ready!(self.poll_accept(cx))?;
+ Poll::Ready(Some(Ok(socket)))
+ }
+}
+
impl TryFrom<TcpListener> for mio::net::TcpListener {
type Error = io::Error;
diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs
index d61d0392..0e7affc2 100644
--- a/tokio/src/net/unix/listener.rs
+++ b/tokio/src/net/unix/listener.rs
@@ -14,6 +14,39 @@ use std::task::{Context, Poll};
cfg_uds! {
/// A Unix socket which can accept connections from other Unix sockets.
+ ///
+ /// Also implements a stream over the connections being received on this listener.
+ ///
+ /// The stream will never return `None` and will also not yield the peer's
+ /// `SocketAddr` structure. Iterating over it is equivalent to calling accept in a loop.
+ ///
+ /// # Errors
+ ///
+ /// Note that accepting a connection can lead to various errors and not all
+ /// of them are necessarily fatal ‒ for example having too many open file
+ /// descriptors or the other side closing the connection while it waits in
+ /// an accept queue. These would terminate the stream if not handled in any
+ /// way.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixListener;
+ /// use tokio::stream::StreamExt;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let mut listener = UnixListener::bind("/path/to/the/socket").unwrap();
+ /// while let Some(stream) = listener.next().await {
+ /// match stream {
+ /// Ok(stream) => {
+ /// println!("new client!");
+ /// }
+ /// Err(e) => { /* connection failed */ }
+ /// }
+ /// }
+ /// }
+ /// ```
pub struct UnixListener {
io: PollEvented<mio_uds::UnixListener>,
}
@@ -142,6 +175,19 @@ impl UnixListener {
}
}
+#[cfg(feature = "stream")]
+impl crate::stream::Stream for UnixListener {
+ type Item = io::Result<UnixStream>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let (socket, _) = ready!(self.poll_accept(cx))?;
+ Poll::Ready(Some(Ok(socket)))
+ }
+}
+
impl TryFrom<UnixListener> for mio_uds::UnixListener {
type Error = io::Error;