summaryrefslogtreecommitdiffstats
path: root/tokio/src
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src')
-rw-r--r--tokio/src/io/driver/scheduled_io.rs67
-rw-r--r--tokio/src/io/registration.rs15
-rw-r--r--tokio/src/lib.rs2
-rw-r--r--tokio/src/macros/cfg.rs2
-rw-r--r--tokio/src/net/tcp/incoming.rs42
-rw-r--r--tokio/src/net/tcp/listener.rs75
-rw-r--r--tokio/src/net/tcp/mod.rs4
-rw-r--r--tokio/src/net/tcp/stream.rs4
-rw-r--r--tokio/src/runtime/mod.rs4
-rw-r--r--tokio/src/task/spawn.rs2
10 files changed, 95 insertions, 122 deletions
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
index bdf21798..0c0448c3 100644
--- a/tokio/src/io/driver/scheduled_io.rs
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -32,7 +32,7 @@ cfg_io_readiness! {
#[derive(Debug, Default)]
struct Waiters {
- #[cfg(any(feature = "udp", feature = "uds"))]
+ #[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))]
/// List of all current waiters
list: WaitList,
@@ -186,33 +186,78 @@ impl ScheduledIo {
}
}
+ /// Notifies all pending waiters that have registered interest in `ready`.
+ ///
+ /// There may be many waiters to notify. Waking the pending task **must** be
+ /// done from outside of the lock otherwise there is a potential for a
+ /// deadlock.
+ ///
+ /// A stack array of wakers is created and filled with wakers to notify, the
+ /// lock is released, and the wakers are notified. Because there may be more
+ /// than 32 wakers to notify, if the stack array fills up, the lock is
+ /// released, the array is cleared, and the iteration continues.
pub(super) fn wake(&self, ready: Ready) {
+ const NUM_WAKERS: usize = 32;
+
+ let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
+ let mut curr = 0;
+
let mut waiters = self.waiters.lock();
// check for AsyncRead slot
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
- waker.wake();
+ wakers[curr] = Some(waker);
+ curr += 1;
}
}
// check for AsyncWrite slot
if ready.is_writable() {
if let Some(waker) = waiters.writer.take() {
- waker.wake();
+ wakers[curr] = Some(waker);
+ curr += 1;
}
}
- #[cfg(any(feature = "udp", feature = "uds"))]
- {
- // check list of waiters
- for waiter in waiters.list.drain_filter(|w| ready.satisfies(w.interest)) {
- let waiter = unsafe { &mut *waiter.as_ptr() };
- if let Some(waker) = waiter.waker.take() {
- waiter.is_ready = true;
- waker.wake();
+ #[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))]
+ 'outer: loop {
+ let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));
+
+ while curr < NUM_WAKERS {
+ match iter.next() {
+ Some(waiter) => {
+ let waiter = unsafe { &mut *waiter.as_ptr() };
+
+ if let Some(waker) = waiter.waker.take() {
+ waiter.is_ready = true;
+ wakers[curr] = Some(waker);
+ curr += 1;
+ }
+ }
+ None => {
+ break 'outer;
+ }
}
}
+
+ drop(waiters);
+
+ for waker in wakers.iter_mut().take(curr) {
+ waker.take().unwrap().wake();
+ }
+
+ curr = 0;
+
+ // Acquire the lock again.
+ waiters = self.waiters.lock();
+ }
+
+ // Release the lock before notifying
+ drop(waiters);
+
+ for waker in wakers.iter_mut().take(curr) {
+ waker.take().unwrap().wake();
}
}
diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs
index 03221b60..ce6cffda 100644
--- a/tokio/src/io/registration.rs
+++ b/tokio/src/io/registration.rs
@@ -132,8 +132,19 @@ impl Registration {
cfg_io_readiness! {
impl Registration {
pub(super) async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyEvent> {
- // TODO: does this need to return a `Result`?
- Ok(self.shared.readiness(interest).await)
+ use std::future::Future;
+ use std::pin::Pin;
+
+ let fut = self.shared.readiness(interest);
+ pin!(fut);
+
+ crate::future::poll_fn(|cx| {
+ if self.handle.inner().is_none() {
+ return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone")));
+ }
+
+ Pin::new(&mut fut).poll(cx).map(Ok)
+ }).await
}
}
}
diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs
index 1b0dad5d..948ac888 100644
--- a/tokio/src/lib.rs
+++ b/tokio/src/lib.rs
@@ -306,7 +306,7 @@
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
-//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
+//! let listener = TcpListener::bind("127.0.0.1:8080").await?;
//!
//! loop {
//! let (mut socket, _) = listener.accept().await?;
diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs
index 328f3230..8f1536f8 100644
--- a/tokio/src/macros/cfg.rs
+++ b/tokio/src/macros/cfg.rs
@@ -176,7 +176,7 @@ macro_rules! cfg_not_io_driver {
macro_rules! cfg_io_readiness {
($($item:item)*) => {
$(
- #[cfg(any(feature = "udp", feature = "uds"))]
+ #[cfg(any(feature = "udp", feature = "uds", feature = "tcp"))]
$item
)*
}
diff --git a/tokio/src/net/tcp/incoming.rs b/tokio/src/net/tcp/incoming.rs
deleted file mode 100644
index 062be1e9..00000000
--- a/tokio/src/net/tcp/incoming.rs
+++ /dev/null
@@ -1,42 +0,0 @@
-use crate::net::tcp::{TcpListener, TcpStream};
-
-use std::io;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-/// Stream returned by the `TcpListener::incoming` function representing the
-/// stream of sockets received from a listener.
-#[must_use = "streams do nothing unless polled"]
-#[derive(Debug)]
-pub struct Incoming<'a> {
- inner: &'a mut TcpListener,
-}
-
-impl Incoming<'_> {
- pub(crate) fn new(listener: &mut TcpListener) -> Incoming<'_> {
- Incoming { inner: listener }
- }
-
- /// Attempts to poll `TcpStream` by polling inner `TcpListener` to accept
- /// connection.
- ///
- /// If `TcpListener` isn't ready yet, `Poll::Pending` is returned and
- /// current task will be notified by a waker.
- pub fn poll_accept(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<io::Result<TcpStream>> {
- let (socket, _) = ready!(self.inner.poll_accept(cx))?;
- Poll::Ready(Ok(socket))
- }
-}
-
-#[cfg(feature = "stream")]
-impl crate::stream::Stream for Incoming<'_> {
- type Item = io::Result<TcpStream>;
-
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let (socket, _) = ready!(self.inner.poll_accept(cx))?;
- Poll::Ready(Some(Ok(socket)))
- }
-}
diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs
index 133852d2..98c8961e 100644
--- a/tokio/src/net/tcp/listener.rs
+++ b/tokio/src/net/tcp/listener.rs
@@ -1,6 +1,5 @@
-use crate::future::poll_fn;
use crate::io::PollEvented;
-use crate::net::tcp::{Incoming, TcpStream};
+use crate::net::tcp::TcpStream;
use crate::net::{to_socket_addrs, ToSocketAddrs};
use std::convert::TryFrom;
@@ -40,7 +39,7 @@ cfg_tcp! {
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
- /// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
+ /// let listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// loop {
/// let (socket, _) = listener.accept().await?;
@@ -171,7 +170,7 @@ impl TcpListener {
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
- /// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
+ /// let listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// match listener.accept().await {
/// Ok((_socket, addr)) => println!("new client: {:?}", addr),
@@ -181,18 +180,25 @@ impl TcpListener {
/// Ok(())
/// }
/// ```
- pub async fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> {
- poll_fn(|cx| self.poll_accept(cx)).await
+ pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
+ let (mio, addr) = self
+ .io
+ .async_io(mio::Interest::READABLE, |sock| sock.accept())
+ .await?;
+
+ let stream = TcpStream::new(mio)?;
+ Ok((stream, addr))
}
/// Polls to accept a new incoming connection to this listener.
///
- /// If there is no connection to accept, `Poll::Pending` is returned and
- /// the current task will be notified by a waker.
- pub fn poll_accept(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
+ /// If there is no connection to accept, `Poll::Pending` is returned and the
+ /// current task will be notified by a waker.
+ ///
+ /// When ready, the most recent task that called `poll_accept` is notified.
+ /// The caller is responsble to ensure that `poll_accept` is called from a
+ /// single task. Failing to do this could result in tasks hanging.
+ pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
loop {
let ev = ready!(self.io.poll_read_ready(cx))?;
@@ -293,46 +299,6 @@ impl TcpListener {
self.io.get_ref().local_addr()
}
- /// Returns a stream over the connections being received on this listener.
- ///
- /// Note that `TcpListener` also directly implements `Stream`.
- ///
- /// The returned stream will never return `None` and will also not yield the
- /// peer's `SocketAddr` structure. Iterating over it is equivalent to
- /// calling accept in a loop.
- ///
- /// # Errors
- ///
- /// Note that accepting a connection can lead to various errors and not all
- /// of them are necessarily fatal ‒ for example having too many open file
- /// descriptors or the other side closing the connection while it waits in
- /// an accept queue. These would terminate the stream if not handled in any
- /// way.
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::{net::TcpListener, stream::StreamExt};
- ///
- /// #[tokio::main]
- /// async fn main() {
- /// let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
- /// let mut incoming = listener.incoming();
- ///
- /// while let Some(stream) = incoming.next().await {
- /// match stream {
- /// Ok(stream) => {
- /// println!("new client!");
- /// }
- /// Err(e) => { /* connection failed */ }
- /// }
- /// }
- /// }
- /// ```
- pub fn incoming(&mut self) -> Incoming<'_> {
- Incoming::new(self)
- }
-
/// Gets the value of the `IP_TTL` option for this socket.
///
/// For more information about this option, see [`set_ttl`].
@@ -390,10 +356,7 @@ impl TcpListener {
impl crate::stream::Stream for TcpListener {
type Item = io::Result<TcpStream>;
- fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
+ fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (socket, _) = ready!(self.poll_accept(cx))?;
Poll::Ready(Some(Ok(socket)))
}
diff --git a/tokio/src/net/tcp/mod.rs b/tokio/src/net/tcp/mod.rs
index c27038f9..7f0f6d91 100644
--- a/tokio/src/net/tcp/mod.rs
+++ b/tokio/src/net/tcp/mod.rs
@@ -1,10 +1,6 @@
//! TCP utility types
pub(crate) mod listener;
-pub(crate) use listener::TcpListener;
-
-mod incoming;
-pub use incoming::Incoming;
pub(crate) mod socket;
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs
index 4349ea80..3f9d6670 100644
--- a/tokio/src/net/tcp/stream.rs
+++ b/tokio/src/net/tcp/stream.rs
@@ -22,8 +22,8 @@ cfg_tcp! {
/// traits. Examples import these traits through [the prelude].
///
/// [`connect`]: method@TcpStream::connect
- /// [accepting]: method@super::TcpListener::accept
- /// [listener]: struct@super::TcpListener
+ /// [accepting]: method@crate::net::TcpListener::accept
+ /// [listener]: struct@crate::net::TcpListener
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
/// [the prelude]: crate::prelude
diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs
index a6a739be..22109f7d 100644
--- a/tokio/src/runtime/mod.rs
+++ b/tokio/src/runtime/mod.rs
@@ -25,7 +25,7 @@
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
-//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
+//! let listener = TcpListener::bind("127.0.0.1:8080").await?;
//!
//! loop {
//! let (mut socket, _) = listener.accept().await?;
@@ -73,7 +73,7 @@
//!
//! // Spawn the root task
//! rt.block_on(async {
-//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
+//! let listener = TcpListener::bind("127.0.0.1:8080").await?;
//!
//! loop {
//! let (mut socket, _) = listener.accept().await?;
diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs
index 280e90ea..d7aca572 100644
--- a/tokio/src/task/spawn.rs
+++ b/tokio/src/task/spawn.rs
@@ -37,7 +37,7 @@ doc_rt_core! {
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
- /// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
+ /// let listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// loop {
/// let (socket, _) = listener.accept().await?;