diff options
Diffstat (limited to 'tokio/src')
-rw-r--r-- | tokio/src/io/driver/scheduled_io.rs | 67 | ||||
-rw-r--r-- | tokio/src/io/registration.rs | 15 | ||||
-rw-r--r-- | tokio/src/lib.rs | 2 | ||||
-rw-r--r-- | tokio/src/macros/cfg.rs | 2 | ||||
-rw-r--r-- | tokio/src/net/tcp/incoming.rs | 42 | ||||
-rw-r--r-- | tokio/src/net/tcp/listener.rs | 75 | ||||
-rw-r--r-- | tokio/src/net/tcp/mod.rs | 4 | ||||
-rw-r--r-- | tokio/src/net/tcp/stream.rs | 4 | ||||
-rw-r--r-- | tokio/src/runtime/mod.rs | 4 | ||||
-rw-r--r-- | tokio/src/task/spawn.rs | 2 |
10 files changed, 95 insertions, 122 deletions
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index bdf21798..0c0448c3 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -32,7 +32,7 @@ cfg_io_readiness! { #[derive(Debug, Default)] struct Waiters { - #[cfg(any(feature = "udp", feature = "uds"))] + #[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))] /// List of all current waiters list: WaitList, @@ -186,33 +186,78 @@ impl ScheduledIo { } } + /// Notifies all pending waiters that have registered interest in `ready`. + /// + /// There may be many waiters to notify. Waking the pending task **must** be + /// done from outside of the lock otherwise there is a potential for a + /// deadlock. + /// + /// A stack array of wakers is created and filled with wakers to notify, the + /// lock is released, and the wakers are notified. Because there may be more + /// than 32 wakers to notify, if the stack array fills up, the lock is + /// released, the array is cleared, and the iteration continues. pub(super) fn wake(&self, ready: Ready) { + const NUM_WAKERS: usize = 32; + + let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default(); + let mut curr = 0; + let mut waiters = self.waiters.lock(); // check for AsyncRead slot if ready.is_readable() { if let Some(waker) = waiters.reader.take() { - waker.wake(); + wakers[curr] = Some(waker); + curr += 1; } } // check for AsyncWrite slot if ready.is_writable() { if let Some(waker) = waiters.writer.take() { - waker.wake(); + wakers[curr] = Some(waker); + curr += 1; } } - #[cfg(any(feature = "udp", feature = "uds"))] - { - // check list of waiters - for waiter in waiters.list.drain_filter(|w| ready.satisfies(w.interest)) { - let waiter = unsafe { &mut *waiter.as_ptr() }; - if let Some(waker) = waiter.waker.take() { - waiter.is_ready = true; - waker.wake(); + #[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))] + 'outer: loop { + let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest)); + + while curr < NUM_WAKERS { + match iter.next() { + Some(waiter) => { + let waiter = unsafe { &mut *waiter.as_ptr() }; + + if let Some(waker) = waiter.waker.take() { + waiter.is_ready = true; + wakers[curr] = Some(waker); + curr += 1; + } + } + None => { + break 'outer; + } } } + + drop(waiters); + + for waker in wakers.iter_mut().take(curr) { + waker.take().unwrap().wake(); + } + + curr = 0; + + // Acquire the lock again. + waiters = self.waiters.lock(); + } + + // Release the lock before notifying + drop(waiters); + + for waker in wakers.iter_mut().take(curr) { + waker.take().unwrap().wake(); } } diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index 03221b60..ce6cffda 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -132,8 +132,19 @@ impl Registration { cfg_io_readiness! { impl Registration { pub(super) async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyEvent> { - // TODO: does this need to return a `Result`? - Ok(self.shared.readiness(interest).await) + use std::future::Future; + use std::pin::Pin; + + let fut = self.shared.readiness(interest); + pin!(fut); + + crate::future::poll_fn(|cx| { + if self.handle.inner().is_none() { + return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone"))); + } + + Pin::new(&mut fut).poll(cx).map(Ok) + }).await } } } diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 1b0dad5d..948ac888 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -306,7 +306,7 @@ //! //! #[tokio::main] //! async fn main() -> Result<(), Box<dyn std::error::Error>> { -//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?; +//! let listener = TcpListener::bind("127.0.0.1:8080").await?; //! //! loop { //! let (mut socket, _) = listener.accept().await?; diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 328f3230..8f1536f8 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -176,7 +176,7 @@ macro_rules! cfg_not_io_driver { macro_rules! cfg_io_readiness { ($($item:item)*) => { $( - #[cfg(any(feature = "udp", feature = "uds"))] + #[cfg(any(feature = "udp", feature = "uds", feature = "tcp"))] $item )* } diff --git a/tokio/src/net/tcp/incoming.rs b/tokio/src/net/tcp/incoming.rs deleted file mode 100644 index 062be1e9..00000000 --- a/tokio/src/net/tcp/incoming.rs +++ /dev/null @@ -1,42 +0,0 @@ -use crate::net::tcp::{TcpListener, TcpStream}; - -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// Stream returned by the `TcpListener::incoming` function representing the -/// stream of sockets received from a listener. -#[must_use = "streams do nothing unless polled"] -#[derive(Debug)] -pub struct Incoming<'a> { - inner: &'a mut TcpListener, -} - -impl Incoming<'_> { - pub(crate) fn new(listener: &mut TcpListener) -> Incoming<'_> { - Incoming { inner: listener } - } - - /// Attempts to poll `TcpStream` by polling inner `TcpListener` to accept - /// connection. - /// - /// If `TcpListener` isn't ready yet, `Poll::Pending` is returned and - /// current task will be notified by a waker. - pub fn poll_accept( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<io::Result<TcpStream>> { - let (socket, _) = ready!(self.inner.poll_accept(cx))?; - Poll::Ready(Ok(socket)) - } -} - -#[cfg(feature = "stream")] -impl crate::stream::Stream for Incoming<'_> { - type Item = io::Result<TcpStream>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let (socket, _) = ready!(self.inner.poll_accept(cx))?; - Poll::Ready(Some(Ok(socket))) - } -} diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index 133852d2..98c8961e 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -1,6 +1,5 @@ -use crate::future::poll_fn; use crate::io::PollEvented; -use crate::net::tcp::{Incoming, TcpStream}; +use crate::net::tcp::TcpStream; use crate::net::{to_socket_addrs, ToSocketAddrs}; use std::convert::TryFrom; @@ -40,7 +39,7 @@ cfg_tcp! { /// /// #[tokio::main] /// async fn main() -> io::Result<()> { - /// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; + /// let listener = TcpListener::bind("127.0.0.1:8080").await?; /// /// loop { /// let (socket, _) = listener.accept().await?; @@ -171,7 +170,7 @@ impl TcpListener { /// /// #[tokio::main] /// async fn main() -> io::Result<()> { - /// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; + /// let listener = TcpListener::bind("127.0.0.1:8080").await?; /// /// match listener.accept().await { /// Ok((_socket, addr)) => println!("new client: {:?}", addr), @@ -181,18 +180,25 @@ impl TcpListener { /// Ok(()) /// } /// ``` - pub async fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> { - poll_fn(|cx| self.poll_accept(cx)).await + pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { + let (mio, addr) = self + .io + .async_io(mio::Interest::READABLE, |sock| sock.accept()) + .await?; + + let stream = TcpStream::new(mio)?; + Ok((stream, addr)) } /// Polls to accept a new incoming connection to this listener. /// - /// If there is no connection to accept, `Poll::Pending` is returned and - /// the current task will be notified by a waker. - pub fn poll_accept( - &mut self, - cx: &mut Context<'_>, - ) -> Poll<io::Result<(TcpStream, SocketAddr)>> { + /// If there is no connection to accept, `Poll::Pending` is returned and the + /// current task will be notified by a waker. + /// + /// When ready, the most recent task that called `poll_accept` is notified. + /// The caller is responsble to ensure that `poll_accept` is called from a + /// single task. Failing to do this could result in tasks hanging. + pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(TcpStream, SocketAddr)>> { loop { let ev = ready!(self.io.poll_read_ready(cx))?; @@ -293,46 +299,6 @@ impl TcpListener { self.io.get_ref().local_addr() } - /// Returns a stream over the connections being received on this listener. - /// - /// Note that `TcpListener` also directly implements `Stream`. - /// - /// The returned stream will never return `None` and will also not yield the - /// peer's `SocketAddr` structure. Iterating over it is equivalent to - /// calling accept in a loop. - /// - /// # Errors - /// - /// Note that accepting a connection can lead to various errors and not all - /// of them are necessarily fatal ‒ for example having too many open file - /// descriptors or the other side closing the connection while it waits in - /// an accept queue. These would terminate the stream if not handled in any - /// way. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::{net::TcpListener, stream::StreamExt}; - /// - /// #[tokio::main] - /// async fn main() { - /// let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); - /// let mut incoming = listener.incoming(); - /// - /// while let Some(stream) = incoming.next().await { - /// match stream { - /// Ok(stream) => { - /// println!("new client!"); - /// } - /// Err(e) => { /* connection failed */ } - /// } - /// } - /// } - /// ``` - pub fn incoming(&mut self) -> Incoming<'_> { - Incoming::new(self) - } - /// Gets the value of the `IP_TTL` option for this socket. /// /// For more information about this option, see [`set_ttl`]. @@ -390,10 +356,7 @@ impl TcpListener { impl crate::stream::Stream for TcpListener { type Item = io::Result<TcpStream>; - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let (socket, _) = ready!(self.poll_accept(cx))?; Poll::Ready(Some(Ok(socket))) } diff --git a/tokio/src/net/tcp/mod.rs b/tokio/src/net/tcp/mod.rs index c27038f9..7f0f6d91 100644 --- a/tokio/src/net/tcp/mod.rs +++ b/tokio/src/net/tcp/mod.rs @@ -1,10 +1,6 @@ //! TCP utility types pub(crate) mod listener; -pub(crate) use listener::TcpListener; - -mod incoming; -pub use incoming::Incoming; pub(crate) mod socket; diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 4349ea80..3f9d6670 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -22,8 +22,8 @@ cfg_tcp! { /// traits. Examples import these traits through [the prelude]. /// /// [`connect`]: method@TcpStream::connect - /// [accepting]: method@super::TcpListener::accept - /// [listener]: struct@super::TcpListener + /// [accepting]: method@crate::net::TcpListener::accept + /// [listener]: struct@crate::net::TcpListener /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt /// [the prelude]: crate::prelude diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index a6a739be..22109f7d 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -25,7 +25,7 @@ //! //! #[tokio::main] //! async fn main() -> Result<(), Box<dyn std::error::Error>> { -//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?; +//! let listener = TcpListener::bind("127.0.0.1:8080").await?; //! //! loop { //! let (mut socket, _) = listener.accept().await?; @@ -73,7 +73,7 @@ //! //! // Spawn the root task //! rt.block_on(async { -//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?; +//! let listener = TcpListener::bind("127.0.0.1:8080").await?; //! //! loop { //! let (mut socket, _) = listener.accept().await?; diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index 280e90ea..d7aca572 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -37,7 +37,7 @@ doc_rt_core! { /// /// #[tokio::main] /// async fn main() -> io::Result<()> { - /// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; + /// let listener = TcpListener::bind("127.0.0.1:8080").await?; /// /// loop { /// let (socket, _) = listener.accept().await?; |