From 227533d456fe32e48ffcd3796f1e6c8f9318b230 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 25 Oct 2019 12:50:15 -0700 Subject: net: move into tokio crate (#1683) A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The `net` implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags. --- tokio/Cargo.toml | 61 +- tokio/src/lib.rs | 16 +- tokio/src/net.rs | 67 -- tokio/src/net/addr.rs | 214 ++++++ tokio/src/net/driver/mod.rs | 133 ++++ tokio/src/net/driver/platform.rs | 28 + tokio/src/net/driver/reactor.rs | 451 ++++++++++++ tokio/src/net/driver/registration.rs | 269 +++++++ tokio/src/net/driver/sharded_rwlock.rs | 217 ++++++ tokio/src/net/mod.rs | 47 ++ tokio/src/net/tcp/incoming.rs | 31 + tokio/src/net/tcp/listener.rs | 358 +++++++++ tokio/src/net/tcp/mod.rs | 29 + tokio/src/net/tcp/split.rs | 97 +++ tokio/src/net/tcp/stream.rs | 827 +++++++++++++++++++++ tokio/src/net/udp/mod.rs | 13 + tokio/src/net/udp/socket.rs | 419 +++++++++++ tokio/src/net/udp/split.rs | 148 ++++ tokio/src/net/unix/datagram.rs | 233 ++++++ tokio/src/net/unix/incoming.rs | 31 + tokio/src/net/unix/listener.rs | 135 ++++ tokio/src/net/unix/mod.rs | 21 + tokio/src/net/unix/split.rs | 91 +++ tokio/src/net/unix/stream.rs | 339 +++++++++ tokio/src/net/unix/ucred.rs | 154 ++++ tokio/src/net/util/mod.rs | 4 + tokio/src/net/util/poll_evented.rs | 415 +++++++++++ tokio/src/process.rs | 2 - tokio/src/process/kill.rs | 13 + tokio/src/process/mod.rs | 1060 +++++++++++++++++++++++++++ tokio/src/process/unix/mod.rs | 225 ++++++ tokio/src/process/unix/orphan.rs | 190 +++++ tokio/src/process/unix/reap.rs | 334 +++++++++ tokio/src/process/windows.rs | 193 +++++ tokio/src/runtime/current_thread/builder.rs | 2 +- tokio/src/runtime/current_thread/runtime.rs | 2 +- tokio/src/runtime/mod.rs | 2 +- tokio/src/runtime/threadpool/builder.rs | 4 +- tokio/src/runtime/threadpool/mod.rs | 4 +- tokio/src/signal.rs | 4 - tokio/src/signal/ctrl_c.rs | 46 ++ tokio/src/signal/mod.rs | 93 +++ tokio/src/signal/registry.rs | 310 ++++++++ tokio/src/signal/unix.rs | 426 +++++++++++ tokio/src/signal/windows.rs | 217 ++++++ tokio/tests/buffered.rs | 1 - tokio/tests/drop-core.rs | 48 -- tokio/tests/net_bind_resource.rs | 11 + tokio/tests/net_driver.rs | 83 +++ tokio/tests/net_driver_drop.rs | 47 ++ tokio/tests/process_issue_42.rs | 56 ++ tokio/tests/process_smoke.rs | 29 + tokio/tests/reactor.rs | 84 --- tokio/tests/signal_ctrl_c.rs | 28 + tokio/tests/signal_drop_recv.rs | 22 + tokio/tests/signal_drop_rt.rs | 37 + tokio/tests/signal_drop_signal.rs | 26 + tokio/tests/signal_multi_rt.rs | 47 ++ tokio/tests/signal_no_rt.rs | 10 + tokio/tests/signal_notify_both.rs | 23 + tokio/tests/signal_twice.rs | 25 + tokio/tests/signal_usr1.rs | 23 + tokio/tests/support/signal.rs | 7 + tokio/tests/tcp_accept.rs | 39 + tokio/tests/tcp_connect.rs | 228 ++++++ tokio/tests/tcp_echo.rs | 41 ++ tokio/tests/tcp_peek.rs | 28 + tokio/tests/tcp_shutdown.rs | 28 + tokio/tests/tcp_split.rs | 1 + tokio/tests/udp.rs | 72 ++ tokio/tests/uds_cred.rs | 29 + tokio/tests/uds_datagram.rs | 69 ++ tokio/tests/uds_split.rs | 42 ++ tokio/tests/uds_stream.rs | 34 + 74 files changed, 8963 insertions(+), 230 deletions(-) delete mode 100644 tokio/src/net.rs create mode 100644 tokio/src/net/addr.rs create mode 100644 tokio/src/net/driver/mod.rs create mode 100644 tokio/src/net/driver/platform.rs create mode 100644 tokio/src/net/driver/reactor.rs create mode 100644 tokio/src/net/driver/registration.rs create mode 100644 tokio/src/net/driver/sharded_rwlock.rs create mode 100644 tokio/src/net/mod.rs create mode 100644 tokio/src/net/tcp/incoming.rs create mode 100644 tokio/src/net/tcp/listener.rs create mode 100644 tokio/src/net/tcp/mod.rs create mode 100644 tokio/src/net/tcp/split.rs create mode 100644 tokio/src/net/tcp/stream.rs create mode 100644 tokio/src/net/udp/mod.rs create mode 100644 tokio/src/net/udp/socket.rs create mode 100644 tokio/src/net/udp/split.rs create mode 100644 tokio/src/net/unix/datagram.rs create mode 100644 tokio/src/net/unix/incoming.rs create mode 100644 tokio/src/net/unix/listener.rs create mode 100644 tokio/src/net/unix/mod.rs create mode 100644 tokio/src/net/unix/split.rs create mode 100644 tokio/src/net/unix/stream.rs create mode 100644 tokio/src/net/unix/ucred.rs create mode 100644 tokio/src/net/util/mod.rs create mode 100644 tokio/src/net/util/poll_evented.rs delete mode 100644 tokio/src/process.rs create mode 100644 tokio/src/process/kill.rs create mode 100644 tokio/src/process/mod.rs create mode 100644 tokio/src/process/unix/mod.rs create mode 100644 tokio/src/process/unix/orphan.rs create mode 100644 tokio/src/process/unix/reap.rs create mode 100644 tokio/src/process/windows.rs delete mode 100644 tokio/src/signal.rs create mode 100644 tokio/src/signal/ctrl_c.rs create mode 100644 tokio/src/signal/mod.rs create mode 100644 tokio/src/signal/registry.rs create mode 100644 tokio/src/signal/unix.rs create mode 100644 tokio/src/signal/windows.rs delete mode 100644 tokio/tests/drop-core.rs create mode 100644 tokio/tests/net_bind_resource.rs create mode 100644 tokio/tests/net_driver.rs create mode 100644 tokio/tests/net_driver_drop.rs create mode 100644 tokio/tests/process_issue_42.rs create mode 100644 tokio/tests/process_smoke.rs delete mode 100644 tokio/tests/reactor.rs create mode 100644 tokio/tests/signal_ctrl_c.rs create mode 100644 tokio/tests/signal_drop_recv.rs create mode 100644 tokio/tests/signal_drop_rt.rs create mode 100644 tokio/tests/signal_drop_signal.rs create mode 100644 tokio/tests/signal_multi_rt.rs create mode 100644 tokio/tests/signal_no_rt.rs create mode 100644 tokio/tests/signal_notify_both.rs create mode 100644 tokio/tests/signal_twice.rs create mode 100644 tokio/tests/signal_usr1.rs create mode 100644 tokio/tests/support/signal.rs create mode 100644 tokio/tests/tcp_accept.rs create mode 100644 tokio/tests/tcp_connect.rs create mode 100644 tokio/tests/tcp_echo.rs create mode 100644 tokio/tests/tcp_peek.rs create mode 100644 tokio/tests/tcp_shutdown.rs create mode 100644 tokio/tests/tcp_split.rs create mode 100644 tokio/tests/udp.rs create mode 100644 tokio/tests/uds_cred.rs create mode 100644 tokio/tests/uds_datagram.rs create mode 100644 tokio/tests/uds_split.rs create mode 100644 tokio/tests/uds_stream.rs (limited to 'tokio') diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index ba6e0af0..4b87917c 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -27,7 +27,7 @@ keywords = ["io", "async", "non-blocking", "futures"] default = [ "fs", "io", - "net", + "net-full", "process", "rt-full", "signal", @@ -35,31 +35,47 @@ default = [ "timer", ] -fs = [] -io = ["tokio-io"] +fs = ["tokio-executor/blocking"] +io = ["tokio-io", "bytes", "iovec"] macros = ["tokio-macros"] -net = ["tcp", "udp", "uds"] +net-full = ["tcp", "udp", "uds"] +net-driver = ["mio", "tokio-executor/blocking"] rt-current-thread = [ "timer", - "tokio-net", "tokio-executor/current-thread", ] rt-full = [ "macros", "num_cpus", - "net", + "net-full", "sync", "timer", "tokio-executor/current-thread", "tokio-executor/thread-pool", ] -signal = ["tokio-net/signal"] +signal = [ + "lazy_static", + "libc", + "mio-uds", + "net-driver", + "signal-hook-registry" +] sync = ["tokio-sync"] -tcp = ["io", "tokio-net/tcp"] +tcp = ["io", "net-driver"] timer = ["crossbeam-utils", "slab"] -udp = ["io", "tokio-net/udp"] -uds = ["io", "tokio-net/uds"] -process = ["io", "tokio-net/process"] +udp = ["io", "net-driver"] +uds = ["io", "net-driver", "mio-uds", "libc"] +process = [ + "crossbeam-queue", + "io", + "libc", + "mio-named-pipes", + "signal", + "winapi/consoleapi", + "winapi/minwindef", + "winapi/threadpoollegacyapiset", + "winapi/winerror", +] [dependencies] futures-core-preview = "=0.3.0-alpha.19" @@ -67,23 +83,38 @@ futures-sink-preview = "=0.3.0-alpha.19" futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink"] } # Everything else is optional... +bytes = { version = "0.4", optional = true } crossbeam-utils = { version = "0.6.0", optional = true } +iovec = { version = "0.1", optional = true } +lazy_static = { version = "1.0.2", optional = true } +mio = { version = "0.6.14", optional = true } num_cpus = { version = "1.8.0", optional = true } # Backs `DelayQueue` slab = { version = "0.4.1", optional = true } tokio-io = { version = "=0.2.0-alpha.6", optional = true, features = ["util"], path = "../tokio-io" } tokio-executor = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-executor" } tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" } -tokio-net = { version = "=0.2.0-alpha.6", optional = true, features = ["async-traits"], path = "../tokio-net" } tokio-sync = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-sync", features = ["async-traits"] } +[target.'cfg(unix)'.dependencies] +crossbeam-queue = { version = "0.1.2", optional = true } +mio-uds = { version = "0.6.5", optional = true } +libc = { version = "0.2.42", optional = true } +signal-hook-registry = { version = "1.1.1", optional = true } + +[target.'cfg(windows)'.dependencies] +mio-named-pipes = { version = "0.1.6", optional = true } + +[target.'cfg(windows)'.dependencies.winapi] +version = "0.3.8" +default-features = false +optional = true + [dev-dependencies] tokio-test = { version = "=0.2.0-alpha.6", path = "../tokio-test" } tokio-util = { version = "=0.2.0-alpha.6", path = "../tokio-util" } -futures-preview = "=0.3.0-alpha.19" -futures-util-preview = "=0.3.0-alpha.19" -pin-utils = "=0.1.0-alpha.4" +futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] } env_logger = { version = "0.6", default-features = false } flate2 = { version = "1", features = ["tokio"] } http = "0.1" diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index b8682c52..814f2a59 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -26,7 +26,7 @@ //! //! Guide level documentation is found on the [website]. //! -//! [driver]: tokio_net::driver +//! [driver]: tokio::net::driver //! [website]: https://tokio.rs/docs/ //! //! # Examples @@ -82,23 +82,34 @@ macro_rules! if_runtime { #[cfg(feature = "timer")] pub mod clock; + #[cfg(feature = "codec")] pub mod codec; + #[cfg(feature = "fs")] pub mod fs; + pub mod future; + #[cfg(feature = "io")] pub mod io; -#[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))] + +#[cfg(feature = "net-driver")] pub mod net; + pub mod prelude; + #[cfg(feature = "process")] pub mod process; + #[cfg(feature = "signal")] pub mod signal; + pub mod stream; + #[cfg(feature = "sync")] pub mod sync; + #[cfg(feature = "timer")] pub mod timer; @@ -113,6 +124,7 @@ if_runtime! { #[cfg(feature = "macros")] #[doc(inline)] pub use tokio_macros::main; + #[cfg(feature = "macros")] #[doc(inline)] pub use tokio_macros::test; diff --git a/tokio/src/net.rs b/tokio/src/net.rs deleted file mode 100644 index 6aa1e499..00000000 --- a/tokio/src/net.rs +++ /dev/null @@ -1,67 +0,0 @@ -//! TCP/UDP/Unix bindings for `tokio`. -//! -//! This module contains the TCP/UDP/Unix networking types, similar to the standard -//! library, which can be used to implement networking protocols. -//! -//! # Organization -//! -//! * [`TcpListener`] and [`TcpStream`] provide functionality for communication over TCP -//! * [`UdpSocket`] provides functionality for communication over UDP -//! * [`UnixListener`] and [`UnixStream`] provide functionality for communication over a -//! Unix Domain Stream Socket **(available on Unix only)** -//! * [`UnixDatagram`] and [`UnixDatagramFramed`] provide functionality for communication -//! over Unix Domain Datagram Socket **(available on Unix only)** - -//! -//! [`TcpListener`]: struct.TcpListener.html -//! [`TcpStream`]: struct.TcpStream.html -//! [`UdpSocket`]: struct.UdpSocket.html -//! [`UnixListener`]: struct.UnixListener.html -//! [`UnixStream`]: struct.UnixStream.html -//! [`UnixDatagram`]: struct.UnixDatagram.html -//! [`UnixDatagramFramed`]: struct.UnixDatagramFramed.html - -#[cfg(feature = "tcp")] -pub mod tcp { - //! TCP bindings for `tokio`. - //! - //! Connecting to an address, via TCP, can be done using [`TcpStream`]'s - //! [`connect`] method, which returns [`ConnectFuture`]. `ConnectFuture` - //! implements a future which returns a `TcpStream`. - //! - //! To listen on an address [`TcpListener`] can be used. `TcpListener`'s - //! [`incoming`][incoming_method] method can be used to accept new connections. - //! It return the [`Incoming`] struct, which implements a stream which returns - //! `TcpStream`s. - //! - //! [`TcpStream`]: struct.TcpStream.html - //! [`connect`]: struct.TcpStream.html#method.connect - //! [`ConnectFuture`]: struct.ConnectFuture.html - //! [`TcpListener`]: struct.TcpListener.html - //! [incoming_method]: struct.TcpListener.html#method.incoming - //! [`Incoming`]: struct.Incoming.html - pub use tokio_net::tcp::{split, Incoming, TcpListener, TcpStream}; -} -#[cfg(feature = "tcp")] -pub use self::tcp::{TcpListener, TcpStream}; - -#[cfg(feature = "udp")] -pub mod udp { - //! UDP bindings for `tokio`. - //! - //! The main struct for UDP is the [`UdpSocket`], which represents a UDP socket. - //! - //! [`UdpSocket`]: struct.UdpSocket.html - pub use tokio_net::udp::{split, UdpSocket}; -} -#[cfg(feature = "udp")] -pub use self::udp::UdpSocket; - -#[cfg(all(unix, feature = "uds"))] -pub mod unix { - //! Unix domain socket bindings for `tokio` (only available on unix systems). - - pub use tokio_net::uds::{split, UCred, UnixDatagram, UnixListener, UnixStream}; -} -#[cfg(all(unix, feature = "uds"))] -pub use self::unix::{UnixDatagram, UnixListener, UnixStream}; diff --git a/tokio/src/net/addr.rs b/tokio/src/net/addr.rs new file mode 100644 index 00000000..8b782cfa --- /dev/null +++ b/tokio/src/net/addr.rs @@ -0,0 +1,214 @@ +use tokio_executor::blocking; + +use futures_util::future; +use std::io; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + +/// Convert or resolve without blocking to one or more `SocketAddr` values. +/// +/// Currently, this trait is only used as an argument to Tokio functions that +/// need to reference a target socket address. +/// +/// This trait is sealed and is intended to be opaque. Users of Tokio should +/// only use `ToSocketAddrs` in trait bounds and __must not__ attempt to call +/// the functions directly or reference associated types. Changing these is not +/// considered a breaking change. +pub trait ToSocketAddrs: sealed::ToSocketAddrsPriv {} + +type ReadyFuture = future::Ready>; + +// ===== impl SocketAddr ===== + +impl ToSocketAddrs for SocketAddr {} + +impl sealed::ToSocketAddrsPriv for SocketAddr { + type Iter = std::option::IntoIter; + type Future = ReadyFuture; + + fn to_socket_addrs(&self) -> Self::Future { + let iter = Some(*self).into_iter(); + future::ready(Ok(iter)) + } +} + +// ===== impl str ===== + +impl ToSocketAddrs for str {} + +impl sealed::ToSocketAddrsPriv for str { + type Iter = sealed::OneOrMore; + type Future = sealed::MaybeReady; + + fn to_socket_addrs(&self) -> Self::Future { + use sealed::MaybeReady; + + // First check if the input parses as a socket address + let res: Result = self.parse(); + + if let Ok(addr) = res { + return MaybeReady::Ready(Some(addr)); + } + + // Run DNS lookup on the blocking pool + let s = self.to_owned(); + + MaybeReady::Blocking(blocking::run(move || { + std::net::ToSocketAddrs::to_socket_addrs(&s) + })) + } +} + +// ===== impl (&str, u16) ===== + +impl ToSocketAddrs for (&'_ str, u16) {} + +impl sealed::ToSocketAddrsPriv for (&'_ str, u16) { + type Iter = sealed::OneOrMore; + type Future = sealed::MaybeReady; + + fn to_socket_addrs(&self) -> Self::Future { + use sealed::MaybeReady; + use std::net::{SocketAddrV4, SocketAddrV6}; + + let (host, port) = *self; + + // try to parse the host as a regular IP address first + if let Ok(addr) = host.parse::() { + let addr = SocketAddrV4::new(addr, port); + let addr = SocketAddr::V4(addr); + + return MaybeReady::Ready(Some(addr)); + } + + if let Ok(addr) = host.parse::() { + let addr = SocketAddrV6::new(addr, port, 0, 0); + let addr = SocketAddr::V6(addr); + + return MaybeReady::Ready(Some(addr)); + } + + let host = host.to_owned(); + + MaybeReady::Blocking(blocking::run(move || { + std::net::ToSocketAddrs::to_socket_addrs(&(&host[..], port)) + })) + } +} + +// ===== impl (IpAddr, u16) ===== + +impl ToSocketAddrs for (IpAddr, u16) {} + +impl sealed::ToSocketAddrsPriv for (IpAddr, u16) { + type Iter = std::option::IntoIter; + type Future = ReadyFuture; + + fn to_socket_addrs(&self) -> Self::Future { + let iter = Some(SocketAddr::from(*self)).into_iter(); + future::ready(Ok(iter)) + } +} + +// ===== impl String ===== + +impl ToSocketAddrs for String {} + +impl sealed::ToSocketAddrsPriv for String { + type Iter = ::Iter; + type Future = ::Future; + + fn to_socket_addrs(&self) -> Self::Future { + (&self[..]).to_socket_addrs() + } +} + +// ===== impl &'_ impl ToSocketAddrs ===== + +impl ToSocketAddrs for &'_ T {} + +impl sealed::ToSocketAddrsPriv for &'_ T +where + T: sealed::ToSocketAddrsPriv + ?Sized, +{ + type Iter = T::Iter; + type Future = T::Future; + + fn to_socket_addrs(&self) -> Self::Future { + (**self).to_socket_addrs() + } +} + +pub(crate) mod sealed { + //! The contents of this trait are intended to remain private and __not__ + //! part of the `ToSocketAddrs` public API. The details will change over + //! time. + + use tokio_executor::blocking::Blocking; + + use futures_core::ready; + use std::future::Future; + use std::io; + use std::net::SocketAddr; + use std::option; + use std::pin::Pin; + use std::task::{Context, Poll}; + use std::vec; + + #[doc(hidden)] + pub trait ToSocketAddrsPriv { + type Iter: Iterator + Send + 'static; + type Future: Future> + Send + 'static; + + fn to_socket_addrs(&self) -> Self::Future; + } + + #[doc(hidden)] + #[derive(Debug)] + pub enum MaybeReady { + Ready(Option), + Blocking(Blocking>>), + } + + #[doc(hidden)] + #[derive(Debug)] + pub enum OneOrMore { + One(option::IntoIter), + More(vec::IntoIter), + } + + impl Future for MaybeReady { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match *self { + MaybeReady::Ready(ref mut i) => { + let iter = OneOrMore::One(i.take().into_iter()); + Poll::Ready(Ok(iter)) + } + MaybeReady::Blocking(ref mut rx) => { + let res = ready!(Pin::new(rx).poll(cx)).map(OneOrMore::More); + + Poll::Ready(res) + } + } + } + } + + impl Iterator for OneOrMore { + type Item = SocketAddr; + + fn next(&mut self) -> Option { + match self { + OneOrMore::One(i) => i.next(), + OneOrMore::More(i) => i.next(), + } + } + + fn size_hint(&self) -> (usize, Option) { + match self { + OneOrMore::One(i) => i.size_hint(), + OneOrMore::More(i) => i.size_hint(), + } + } + } +} diff --git a/tokio/src/net/driver/mod.rs b/tokio/src/net/driver/mod.rs new file mode 100644 index 00000000..9079ccc7 --- /dev/null +++ b/tokio/src/net/driver/mod.rs @@ -0,0 +1,133 @@ +//! Event loop that drives Tokio I/O resources. +//! +//! This module contains [`Reactor`], which is the event loop that drives all +//! Tokio I/O resources. It is the reactor's job to receive events from the +//! operating system ([epoll], [kqueue], [IOCP], etc...) and forward them to +//! waiting tasks. It is the bridge between operating system and the futures +//! model. +//! +//! # Overview +//! +//! When using Tokio, all operations are asynchronous and represented by +//! futures. These futures, representing the application logic, are scheduled by +//! an executor (see [runtime model] for more details). Executors wait for +//! notifications before scheduling the future for execution time, i.e., nothing +//! happens until an event is received indicating that the task can make +//! progress. +//! +//! The reactor receives events from the operating system and notifies the +//! executor. +//! +//! Let's start with a basic example, establishing a TCP connection. +//! +//! ``` +//! use tokio::net::TcpStream; +//! +//! # async fn process(_t: T) {} +//! +//! # #[tokio::main] +//! # async fn dox() -> Result<(), Box> { +//! let stream = TcpStream::connect("93.184.216.34:9243").await?; +//! +//! println!("successfully connected"); +//! +//! process(stream).await; +//! # Ok(()) +//! # } +//! ``` +//! +//! Establishing a TCP connection usually cannot be completed immediately. +//! [`TcpStream::connect`] does not block the current thread. Instead, it +//! returns a [future][connect-future] that resolves once the TCP connection has +//! been established. The connect future itself has no way of knowing when the +//! TCP connection has been established. +//! +//! Before returning the future, [`TcpStream::connect`] registers the socket +//! with a reactor. This registration process, handled by [`Registration`], is +//! what links the [`TcpStream`] with the [`Reactor`] instance. At this point, +//! the reactor starts listening for connection events from the operating system +//! for that socket. +//! +//! Once the connect future is passed to [`tokio::run`], it is spawned onto a +//! thread pool. The thread pool waits until it is notified that the connection +//! has completed. +//! +//! When the TCP connection is established, the reactor receives an event from +//! the operating system. It then notifies the thread pool, telling it that the +//! connect future can complete. At this point, the thread pool will schedule +//! the task to run on one of its worker threads. This results in the `and_then` +//! closure to get executed. +//! +//! ## Eager registration +//! +//! Notice how the snippet does not explicitly reference a reactor. When +//! [`TcpStream::connect`] is called, it registers the socket with the current +//! reactor, but no reactor is specified. This works because a reactor +//! instance is automatically made available when using the Tokio [runtime], +//! which is done using [`tokio::main`]. The Tokio runtime's executor sets a +//! thread-local variable referencing the associated [`Reactor`] instance and +//! [`Handle::current`] (used by [`Registration`]) returns the reference. +//! +//! ## Implementation +//! +//! The reactor implementation uses [`mio`] to interface with the operating +//! system's event queue. A call to [`Reactor::poll`] results in a single +//! call to [`Poll::poll`] which in turn results in a single call to the +//! operating system's selector. +//! +//! The reactor maintains state for each registered I/O resource. This tracks +//! the executor task to notify when events are provided by the operating +//! system's selector. This state is stored in a `Sync` data structure and +//! referenced by [`Registration`]. When the [`Registration`] instance is +//! dropped, this state is cleaned up. Because the state is stored in a `Sync` +//! data structure, the [`Registration`] instance is able to be moved to other +//! threads. +//! +//! By default, a runtime's default reactor runs on a background thread. This +//! ensures that application code cannot significantly impact the reactor's +//! responsiveness. +//! +//! ## Integrating with the reactor +//! +//! Tokio comes with a number of I/O resources, like TCP and UDP sockets, that +//! automatically integrate with the reactor. However, library authors or +//! applications may wish to implement their own resources that are also backed +//! by the reactor. +//! +//! There are a couple of ways to do this. +//! +//! If the custom I/O resource implements [`mio::Evented`] and implements +//! [`std::io::Read`] and / or [`std::io::Write`], then [`PollEvented`] is the +//! most suited. +//! +//! Otherwise, [`Registration`] can be used directly. This provides the lowest +//! level primitive needed for integrating with the reactor: a stream of +//! readiness events. +//! +//! [`Reactor`]: struct.Reactor.html +//! [`Registration`]: struct.Registration.html +//! [runtime model]: https://tokio.rs/docs/internals/runtime-model/ +//! [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html +//! [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2 +//! [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx +//! [`TcpStream::connect`]: ../net/struct.TcpStream.html#method.connect +//! [`connect`]: ../net/struct.TcpStream.html#method.connect +//! [connect-future]: ../net/struct.ConnectFuture.html +//! [`tokio::run`]: ../runtime/fn.run.html +//! [`TcpStream`]: ../net/struct.TcpStream.html +//! [runtime]: ../runtime +//! [`Handle::current`]: struct.Handle.html#method.current +//! [`mio`]: https://github.com/carllerche/mio +//! [`Reactor::poll`]: struct.Reactor.html#method.poll +//! [`Poll::poll`]: https://docs.rs/mio/0.6/mio/struct.Poll.html#method.poll +//! [`mio::Evented`]: https://docs.rs/mio/0.6/mio/trait.Evented.html +//! [`PollEvented`]: struct.PollEvented.html +//! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html +//! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html + +pub(crate) mod platform; +mod reactor; +mod registration; + +pub use self::reactor::{set_default, DefaultGuard, Handle, Reactor}; +pub use self::registration::Registration; diff --git a/tokio/src/net/driver/platform.rs b/tokio/src/net/driver/platform.rs new file mode 100644 index 00000000..4cfe7345 --- /dev/null +++ b/tokio/src/net/driver/platform.rs @@ -0,0 +1,28 @@ +pub(crate) use self::sys::*; + +#[cfg(unix)] +mod sys { + use mio::unix::UnixReady; + use mio::Ready; + + pub(crate) fn hup() -> Ready { + UnixReady::hup().into() + } + + pub(crate) fn is_hup(ready: Ready) -> bool { + UnixReady::from(ready).is_hup() + } +} + +#[cfg(windows)] +mod sys { + use mio::Ready; + + pub(crate) fn hup() -> Ready { + Ready::empty() + } + + pub(crate) fn is_hup(_: Ready) -> bool { + false + } +} diff --git a/tokio/src/net/driver/reactor.rs b/tokio/src/net/driver/reactor.rs new file mode 100644 index 00000000..384abe47 --- /dev/null +++ b/tokio/src/net/driver/reactor.rs @@ -0,0 +1,451 @@ +use super::platform; + +use tokio_executor::park::{Park, Unpark}; +use tokio_sync::AtomicWaker; + +use mio::event::Evented; +use slab::Slab; +use std::cell::RefCell; +use std::io; +use std::marker::PhantomData; +#[cfg(all(unix, not(target_os = "fuchsia")))] +use std::os::unix::io::{AsRawFd, RawFd}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{Relaxed, SeqCst}; +use std::sync::{Arc, RwLock, Weak}; +use std::task::Waker; +use std::time::Duration; +use std::{fmt, usize}; + +/// The core reactor, or event loop. +/// +/// The event loop is the main source of blocking in an application which drives +/// all other I/O events and notifications happening. Each event loop can have +/// multiple handles pointing to it, each of which can then be used to create +/// various I/O objects to interact with the event loop in interesting ways. +pub struct Reactor { + /// Reuse the `mio::Events` value across calls to poll. + events: mio::Events, + + /// State shared between the reactor and the handles. + inner: Arc, + + _wakeup_registration: mio::Registration, +} + +/// A reference to a reactor. +/// +/// A `Handle` is used for associating I/O objects with an event loop +/// explicitly. Typically though you won't end up using a `Handle` that often +/// and will instead use the default reactor for the execution context. +#[derive(Clone)] +pub struct Handle { + inner: Weak, +} + +/// Return value from the `turn` method on `Reactor`. +/// +/// Currently this value doesn't actually provide any functionality, but it may +/// in the future give insight into what happened during `turn`. +#[derive(Debug)] +pub struct Turn { + _priv: (), +} + +pub(super) struct Inner { + /// The underlying system event queue. + io: mio::Poll, + + /// ABA guard counter + next_aba_guard: AtomicUsize, + + /// Dispatch slabs for I/O and futures events + pub(super) io_dispatch: RwLock>, + + /// Used to wake up the reactor from a call to `turn` + wakeup: mio::SetReadiness, +} + +pub(super) struct ScheduledIo { + aba_guard: usize, + pub(super) readiness: AtomicUsize, + pub(super) reader: AtomicWaker, + pub(super) writer: AtomicWaker, +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub(super) enum Direction { + Read, + Write, +} + +thread_local! { + /// Tracks the reactor for the current execution context. + static CURRENT_REACTOR: RefCell> = RefCell::new(None) +} + +const TOKEN_SHIFT: usize = 22; + +// Kind of arbitrary, but this reserves some token space for later usage. +const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1; +const TOKEN_WAKEUP: mio::Token = mio::Token(MAX_SOURCES); + +fn _assert_kinds() { + fn _assert() {} + + _assert::(); +} + +// ===== impl Reactor ===== + +#[derive(Debug)] +/// Guard that resets current reactor on drop. +pub struct DefaultGuard<'a> { + _lifetime: PhantomData<&'a u8>, +} + +impl Drop for DefaultGuard<'_> { + fn drop(&mut self) { + CURRENT_REACTOR.with(|current| { + let mut current = current.borrow_mut(); + *current = None; + }); + } +} + +/// Sets handle for a default reactor, returning guard that unsets it on drop. +pub fn set_default(handle: &Handle) -> DefaultGuard<'_> { + CURRENT_REACTOR.with(|current| { + let mut current = current.borrow_mut(); + + assert!( + current.is_none(), + "default Tokio reactor already set \ + for execution context" + ); + + *current = Some(handle.clone()); + }); + + DefaultGuard { + _lifetime: PhantomData, + } +} + +impl Reactor { + /// Creates a new event loop, returning any error that happened during the + /// creation. + pub fn new() -> io::Result { + let io = mio::Poll::new()?; + let wakeup_pair = mio::Registration::new2(); + + io.register( + &wakeup_pair.0, + TOKEN_WAKEUP, + mio::Ready::readable(), + mio::PollOpt::level(), + )?; + + Ok(Reactor { + events: mio::Events::with_capacity(1024), + _wakeup_registration: wakeup_pair.0, + inner: Arc::new(Inner { + io, + next_aba_guard: AtomicUsize::new(0), + io_dispatch: RwLock::new(Slab::with_capacity(1)), + wakeup: wakeup_pair.1, + }), + }) + } + + /// Returns a handle to this event loop which can be sent across threads + /// and can be used as a proxy to the event loop itself. + /// + /// Handles are cloneable and clones always refer to the same event loop. + /// This handle is typically passed into functions that create I/O objects + /// to bind them to this event loop. + pub fn handle(&self) -> Handle { + Handle { + inner: Arc::downgrade(&self.inner), + } + } + + /// Performs one iteration of the event loop, blocking on waiting for events + /// for at most `max_wait` (forever if `None`). + /// + /// This method is the primary method of running this reactor and processing + /// I/O events that occur. This method executes one iteration of an event + /// loop, blocking at most once waiting for events to happen. + /// + /// If a `max_wait` is specified then the method should block no longer than + /// the duration specified, but this shouldn't be used as a super-precise + /// timer but rather a "ballpark approximation" + /// + /// # Return value + /// + /// This function returns an instance of `Turn` + /// + /// `Turn` as of today has no extra information with it and can be safely + /// discarded. In the future `Turn` may contain information about what + /// happened while this reactor blocked. + /// + /// # Errors + /// + /// This function may also return any I/O error which occurs when polling + /// for readiness of I/O objects with the OS. This is quite unlikely to + /// arise and typically mean that things have gone horribly wrong at that + /// point. Currently this is primarily only known to happen for internal + /// bugs to `tokio` itself. + pub fn turn(&mut self, max_wait: Option) -> io::Result { + self.poll(max_wait)?; + Ok(Turn { _priv: () }) + } + + /// Returns true if the reactor is currently idle. + /// + /// Idle is defined as all tasks that have been spawned have completed, + /// either successfully or with an error. + pub fn is_idle(&self) -> bool { + self.inner.io_dispatch.read().unwrap().is_empty() + } + + fn poll(&mut self, max_wait: Option) -> io::Result<()> { + // Block waiting for an event to happen, peeling out how many events + // happened. + match self.inner.io.poll(&mut self.events, max_wait) { + Ok(_) => {} + Err(e) => return Err(e), + } + + // Process all the events that came in, dispatching appropriately + + for event in self.events.iter() { + let token = event.token(); + + if token == TOKEN_WAKEUP { + self.inner + .wakeup + .set_readiness(mio::Ready::empty()) + .unwrap(); + } else { + self.dispatch(token, event.readiness()); + } + } + + Ok(()) + } + + fn dispatch(&self, token: mio::Token, ready: mio::Ready) { + let aba_guard = token.0 & !MAX_SOURCES; + let token = token.0 & MAX_SOURCES; + + let mut rd = None; + let mut wr = None; + + // Create a scope to ensure that notifying the tasks stays out of the + // lock's critical section. + { + let io_dispatch = self.inner.io_dispatch.read().unwrap(); + + let io = match io_dispatch.get(token) { + Some(io) => io, + None => return, + }; + + if aba_guard != io.aba_guard { + return; + } + + io.readiness.fetch_or(ready.as_usize(), Relaxed); + + if ready.is_writable() || platform::is_hup(ready) { + wr = io.writer.take_waker(); + } + + if !(ready & (!mio::Ready::writable())).is_empty() { + rd = io.reader.take_waker(); + } + } + + if let Some(w) = rd { + w.wake(); + } + + if let Some(w) = wr { + w.wake(); + } + } +} + +#[cfg(all(unix, not(target_os = "fuchsia")))] +impl AsRawFd for Reactor { + fn as_raw_fd(&self) -> RawFd { + self.inner.io.as_raw_fd() + } +} + +impl Park for Reactor { + type Unpark = Handle; + type Error = io::Error; + + fn unpark(&self) -> Self::Unpark { + self.handle() + } + + fn park(&mut self) -> io::Result<()> { + self.turn(None)?; + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> io::Result<()> { + self.turn(Some(duration))?; + Ok(()) + } +} + +impl fmt::Debug for Reactor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Reactor") + } +} + +// ===== impl Handle ===== + +impl Handle { + /// Returns a handle to the current reactor + /// + /// # Panics + /// + /// This function panics if there is no current reactor set. + pub(super) fn current() -> Self { + CURRENT_REACTOR.with(|current| match *current.borrow() { + Some(ref handle) => handle.clone(), + None => panic!("no current reactor"), + }) + } + + /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise + /// makes the next call to `turn` return immediately. + /// + /// This method is intended to be used in situations where a notification + /// needs to otherwise be sent to the main reactor. If the reactor is + /// currently blocked inside of `turn` then it will wake up and soon return + /// after this method has been called. If the reactor is not currently + /// blocked in `turn`, then the next call to `turn` will not block and + /// return immediately. + fn wakeup(&self) { + if let Some(inner) = self.inner() { + inner.wakeup.set_readiness(mio::Ready::readable()).unwrap(); + } + } + + pub(super) fn inner(&self) -> Option> { + self.inner.upgrade() + } +} + +impl Unpark for Handle { + fn unpark(&self) { + self.wakeup(); + } +} + +impl fmt::Debug for Handle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Handle") + } +} + +// ===== impl Inner ===== + +impl Inner { + /// Register an I/O resource with the reactor. + /// + /// The registration token is returned. + pub(super) fn add_source(&self, source: &dyn Evented) -> io::Result { + // Get an ABA guard value + let aba_guard = self.next_aba_guard.fetch_add(1 << TOKEN_SHIFT, Relaxed); + + let key = { + // Block to contain the write lock + let mut io_dispatch = self.io_dispatch.write().unwrap(); + + if io_dispatch.len() == MAX_SOURCES { + return Err(io::Error::new( + io::ErrorKind::Other, + "reactor at max \ + registered I/O resources", + )); + } + + io_dispatch.insert(ScheduledIo { + aba_guard, + readiness: AtomicUsize::new(0), + reader: AtomicWaker::new(), + writer: AtomicWaker::new(), + }) + }; + + let token = aba_guard | key; + + self.io.register( + source, + mio::Token(token), + mio::Ready::all(), + mio::PollOpt::edge(), + )?; + + Ok(key) + } + + /// Deregisters an I/O resource from the reactor. + pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> { + self.io.deregister(source) + } + + pub(super) fn drop_source(&self, token: usize) { + self.io_dispatch.write().unwrap().remove(token); + } + + /// Registers interest in the I/O resource associated with `token`. + pub(super) fn register(&self, token: usize, dir: Direction, w: Waker) { + let io_dispatch = self.io_dispatch.read().unwrap(); + let sched = io_dispatch.get(token).unwrap(); + + let (waker, ready) = match dir { + Direction::Read => (&sched.reader, !mio::Ready::writable()), + Direction::Write => (&sched.writer, mio::Ready::writable()), + }; + + waker.register(w); + + if sched.readiness.load(SeqCst) & ready.as_usize() != 0 { + waker.wake(); + } + } +} + +impl Drop for Inner { + fn drop(&mut self) { + // When a reactor is dropped it needs to wake up all blocked tasks as + // they'll never receive a notification, and all connected I/O objects + // will start returning errors pretty quickly. + let io = self.io_dispatch.read().unwrap(); + for (_, io) in io.iter() { + io.writer.wake(); + io.reader.wake(); + } + } +} + +impl Direction { + pub(super) fn mask(self) -> mio::Ready { + match self { + Direction::Read => { + // Everything except writable is signaled through read. + mio::Ready::all() - mio::Ready::writable() + } + Direction::Write => mio::Ready::writable() | platform::hup(), + } + } +} diff --git a/tokio/src/net/driver/registration.rs b/tokio/src/net/driver/registration.rs new file mode 100644 index 00000000..91cceef6 --- /dev/null +++ b/tokio/src/net/driver/registration.rs @@ -0,0 +1,269 @@ +use super::platform; +use super::reactor::{Direction, Handle}; + +use mio::{self, Evented}; +use std::sync::atomic::Ordering::SeqCst; +use std::task::{Context, Poll}; +use std::{io, usize}; + +/// Associates an I/O resource with the reactor instance that drives it. +/// +/// A registration represents an I/O resource registered with a Reactor such +/// that it will receive task notifications on readiness. This is the lowest +/// level API for integrating with a reactor. +/// +/// The association between an I/O resource is made by calling [`new`]. Once +/// the association is established, it remains established until the +/// registration instance is dropped. +/// +/// A registration instance represents two separate readiness streams. One for +/// the read readiness and one for write readiness. These streams are +/// independent and can be consumed from separate tasks. +/// +/// **Note**: while `Registration` is `Sync`, the caller must ensure that there +/// are at most two tasks that use a registration instance concurrently. One +/// task for [`poll_read_ready`] and one task for [`poll_write_ready`]. While +/// violating this requirement is "safe" from a Rust memory safety point of +/// view, it will result in unexpected behavior in the form of lost +/// notifications and tasks hanging. +/// +/// ## Platform-specific events +/// +/// `Registration` also allows receiving platform-specific `mio::Ready` events. +/// These events are included as part of the read readiness event stream. The +/// write readiness event stream is only for `Ready::writable()` events. +/// +/// [`new`]: #method.new +/// [`poll_read_ready`]: #method.poll_read_ready`] +/// [`poll_write_ready`]: #method.poll_write_ready`] +#[derive(Debug)] +pub struct Registration { + handle: Handle, + token: usize, +} + +// ===== impl Registration ===== + +impl Registration { + /// Register the I/O resource with the default reactor. + /// + /// # Return + /// + /// - `Ok` if the registration happened successfully + /// - `Err` if an error was encountered during registration + pub fn new(io: &T) -> io::Result + where + T: Evented, + { + let handle = Handle::current(); + let token = if let Some(inner) = handle.inner() { + inner.add_source(io)? + } else { + return Err(io::Error::new( + io::ErrorKind::Other, + "failed to find event loop", + )); + }; + Ok(Self { handle, token }) + } + + /// Deregister the I/O resource from the reactor it is associated with. + /// + /// This function must be called before the I/O resource associated with the + /// registration is dropped. + /// + /// Note that deregistering does not guarantee that the I/O resource can be + /// registered with a different reactor. Some I/O resource types can only be + /// associated with a single reactor instance for their lifetime. + /// + /// # Return + /// + /// If the deregistration was successful, `Ok` is returned. Any calls to + /// `Reactor::turn` that happen after a successful call to `deregister` will + /// no longer result in notifications getting sent for this registration. + /// + /// `Err` is returned if an error is encountered. + pub fn deregister(&mut self, io: &T) -> io::Result<()> + where + T: Evented, + { + let inner = match self.handle.inner() { + Some(inner) => inner, + None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), + }; + inner.deregister_source(io) + } + + /// Poll for events on the I/O resource's read readiness stream. + /// + /// If the I/O resource receives a new read readiness event since the last + /// call to `poll_read_ready`, it is returned. If it has not, the current + /// task is notified once a new event is received. + /// + /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned, + /// the function will always return `Ready(HUP)`. This should be treated as + /// the end of the readiness stream. + /// + /// Ensure that [`register`] has been called first. + /// + /// # Return value + /// + /// There are several possible return values: + /// + /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received + /// a new readiness event. The readiness value is included. + /// + /// * `Poll::Pending` means that no new readiness events have been received + /// since the last call to `poll_read_ready`. + /// + /// * `Poll::Ready(Err(err))` means that the registration has encountered an + /// error. This error either represents a permanent internal error **or** + /// the fact that [`register`] was not called first. + /// + /// [`register`]: #method.register + /// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered + /// + /// # Panics + /// + /// This function will panic if called from outside of a task context. + pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + let v = self.poll_ready(Direction::Read, Some(cx))?; + match v { + Some(v) => Poll::Ready(Ok(v)), + None => Poll::Pending, + } + } + + /// Consume any pending read readiness event. + /// + /// This function is identical to [`poll_read_ready`] **except** that it + /// will not notify the current task when a new event is received. As such, + /// it is safe to call this function from outside of a task context. + /// + /// [`poll_read_ready`]: #method.poll_read_ready + pub fn take_read_ready(&self) -> io::Result> { + self.poll_ready(Direction::Read, None) + } + + /// Poll for events on the I/O resource's write readiness stream. + /// + /// If the I/O resource receives a new write readiness event since the last + /// call to `poll_write_ready`, it is returned. If it has not, the current + /// task is notified once a new event is received. + /// + /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned, + /// the function will always return `Ready(HUP)`. This should be treated as + /// the end of the readiness stream. + /// + /// Ensure that [`register`] has been called first. + /// + /// # Return value + /// + /// There are several possible return values: + /// + /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received + /// a new readiness event. The readiness value is included. + /// + /// * `Poll::Pending` means that no new readiness events have been received + /// since the last call to `poll_write_ready`. + /// + /// * `Poll::Ready(Err(err))` means that the registration has encountered an + /// error. This error either represents a permanent internal error **or** + /// the fact that [`register`] was not called first. + /// + /// [`register`]: #method.register + /// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered + /// + /// # Panics + /// + /// This function will panic if called from outside of a task context. + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + let v = self.poll_ready(Direction::Write, Some(cx))?; + match v { + Some(v) => Poll::Ready(Ok(v)), + None => Poll::Pending, + } + } + + /// Consume any pending write readiness event. + /// + /// This function is identical to [`poll_write_ready`] **except** that it + /// will not notify the current task when a new event is received. As such, + /// it is safe to call this function from outside of a task context. + /// + /// [`poll_write_ready`]: #method.poll_write_ready + pub fn take_write_ready(&self) -> io::Result> { + self.poll_ready(Direction::Write, None) + } + + /// Poll for events on the I/O resource's `direction` readiness stream. + /// + /// If called with a task context, notify the task when a new event is + /// received. + fn poll_ready( + &self, + direction: Direction, + cx: Option<&mut Context<'_>>, + ) -> io::Result> { + let inner = match self.handle.inner() { + Some(inner) => inner, + None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), + }; + + // If the task should be notified about new events, ensure that it has + // been registered + if let Some(ref cx) = cx { + inner.register(self.token, direction, cx.waker().clone()) + } + + let mask = direction.mask(); + let mask_no_hup = (mask - platform::hup()).as_usize(); + + let io_dispatch = inner.io_dispatch.read().unwrap(); + let sched = &io_dispatch[self.token]; + + // This consumes the current readiness state **except** for HUP. HUP is + // excluded because a) it is a final state and never transitions out of + // HUP and b) both the read AND the write directions need to be able to + // observe this state. + // + // If HUP were to be cleared when `direction` is `Read`, then when + // `poll_ready` is called again with a _`direction` of `Write`, the HUP + // state would not be visible. + let mut ready = + mask & mio::Ready::from_usize(sched.readiness.fetch_and(!mask_no_hup, SeqCst)); + + if ready.is_empty() { + if let Some(cx) = cx { + // Update the task info + match direction { + Direction::Read => sched.reader.register_by_ref(cx.waker()), + Direction::Write => sched.writer.register_by_ref(cx.waker()), + } + + // Try again + ready = + mask & mio::Ready::from_usize(sched.readiness.fetch_and(!mask_no_hup, SeqCst)); + } + } + + if ready.is_empty() { + Ok(None) + } else { + Ok(Some(ready)) + } + } +} + +unsafe impl Send for Registration {} +unsafe impl Sync for Registration {} + +impl Drop for Registration { + fn drop(&mut self) { + let inner = match self.handle.inner() { + Some(inner) => inner, + None => return, + }; + inner.drop_source(self.token); + } +} diff --git a/tokio/src/net/driver/sharded_rwlock.rs b/tokio/src/net/driver/sharded_rwlock.rs new file mode 100644 index 00000000..67892481 --- /dev/null +++ b/tokio/src/net/driver/sharded_rwlock.rs @@ -0,0 +1,217 @@ +//! A scalable reader-writer lock. +//! +//! This implementation makes read operations faster and more scalable due to less contention, +//! while making write operations slower. It also incurs much higher memory overhead than +//! traditional reader-writer locks. + +use crossbeam_utils::CachePadded; +use lazy_static::lazy_static; +use num_cpus; +use parking_lot; +use std::cell::UnsafeCell; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::mem; +use std::ops::{Deref, DerefMut}; +use std::sync::Mutex; +use std::thread::{self, ThreadId}; + +/// A scalable read-writer lock. +/// +/// This type of lock allows a number of readers or at most one writer at any point in time. The +/// write portion of this lock typically allows modification of the underlying data (exclusive +/// access) and the read portion of this lock typically allows for read-only access (shared +/// access). +/// +/// This reader-writer lock differs from typical implementations in that it internally creates a +/// list of reader-writer locks called 'shards'. Shards are aligned and padded to the cache line +/// size. +/// +/// Read operations lock only one shard specific to the current thread, while write operations lock +/// every shard in succession. This strategy makes concurrent read operations faster due to less +/// contention, but write operations are slower due to increased amount of locking. +pub(crate) struct RwLock { + /// A list of locks protecting the internal data. + shards: Vec>>, + + /// The internal data. + value: UnsafeCell, +} + +unsafe impl Send for RwLock {} +unsafe impl Sync for RwLock {} + +impl RwLock { + /// Creates a new `RwLock` initialized with `value`. + pub(crate) fn new(value: T) -> RwLock { + // The number of shards is a power of two so that the modulo operation in `read` becomes a + // simple bitwise "and". + let num_shards = num_cpus::get().next_power_of_two(); + + RwLock { + shards: (0..num_shards) + .map(|_| CachePadded::new(parking_lot::RwLock::new(()))) + .collect(), + value: UnsafeCell::new(value), + } + } + + /// Locks this `RwLock` with shared read access, blocking the current thread until it can be + /// acquired. + /// + /// The calling thread will be blocked until there are no more writers which hold the lock. + /// There may be other readers currently inside the lock when this method returns. This method + /// does not provide any guarantees with respect to the ordering of whether contentious readers + /// or writers will acquire the lock first. + /// + /// Returns an RAII guard which will release this thread's shared access once it is dropped. + pub(crate) fn read(&self) -> RwLockReadGuard<'_, T> { + // Take the current thread index and map it to a shard index. Thread indices will tend to + // distribute shards among threads equally, thus reducing contention due to read-locking. + let shard_index = thread_index() & (self.shards.len() - 1); + + RwLockReadGuard { + parent: self, + _guard: self.shards[shard_index].read(), + _marker: PhantomData, + } + } + + /// Locks this rwlock with exclusive write access, blocking the current thread until it can be + /// acquired. + /// + /// This function will not return while other writers or other readers currently have access to + /// the lock. + /// + /// Returns an RAII guard which will drop the write access of this rwlock when dropped. + pub(crate) fn write(&self) -> RwLockWriteGuard<'_, T> { + // Write-lock each shard in succession. + for shard in &self.shards { + // The write guard is forgotten, but the lock will be manually unlocked in `drop`. + mem::forget(shard.write()); + } + + RwLockWriteGuard { + parent: self, + _marker: PhantomData, + } + } +} + +/// A guard used to release the shared read access of a `RwLock` when dropped. +pub(crate) struct RwLockReadGuard<'a, T> { + parent: &'a RwLock, + _guard: parking_lot::RwLockReadGuard<'a, ()>, + _marker: PhantomData>, +} + +unsafe impl<'a, T: Sync> Sync for RwLockReadGuard<'a, T> {} + +impl<'a, T> Deref for RwLockReadGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.parent.value.get() } + } +} + +/// A guard used to release the exclusive write access of a `RwLock` when dropped. +pub(crate) struct RwLockWriteGuard<'a, T> { + parent: &'a RwLock, + _marker: PhantomData>, +} + +unsafe impl<'a, T: Sync> Sync for RwLockWriteGuard<'a, T> {} + +impl<'a, T> Drop for RwLockWriteGuard<'a, T> { + fn drop(&mut self) { + // Unlock the shards in reverse order of locking. + for shard in self.parent.shards.iter().rev() { + unsafe { + shard.force_unlock_write(); + } + } + } +} + +impl<'a, T> Deref for RwLockWriteGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.parent.value.get() } + } +} + +impl<'a, T> DerefMut for RwLockWriteGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.parent.value.get() } + } +} + +/// Returns a `usize` that identifies the current thread. +/// +/// Each thread is associated with an 'index'. Indices usually tend to be consecutive numbers +/// between 0 and the number of running threads, but there are no guarantees. During TLS teardown +/// the associated index might change. +#[inline] +pub(crate) fn thread_index() -> usize { + REGISTRATION.try_with(|reg| reg.index).unwrap_or(0) +} + +/// The global registry keeping track of registered threads and indices. +struct ThreadIndices { + /// Mapping from `ThreadId` to thread index. + mapping: HashMap, + + /// A list of free indices. + free_list: Vec, + + /// The next index to allocate if the free list is empty. + next_index: usize, +} + +lazy_static! { + static ref THREAD_INDICES: Mutex = Mutex::new(ThreadIndices { + mapping: HashMap::new(), + free_list: Vec::new(), + next_index: 0, + }); +} + +/// A registration of a thread with an index. +/// +/// When dropped, unregisters the thread and frees the reserved index. +struct Registration { + index: usize, + thread_id: ThreadId, +} + +impl Drop for Registration { + fn drop(&mut self) { + let mut indices = THREAD_INDICES.lock().unwrap(); + indices.mapping.remove(&self.thread_id); + indices.free_list.push(self.index); + } +} + +thread_local! { + static REGISTRATION: Registration = { + let thread_id = thread::current().id(); + let mut indices = THREAD_INDICES.lock().unwrap(); + + let index = match indices.free_list.pop() { + Some(i) => i, + None => { + let i = indices.next_index; + indices.next_index += 1; + i + } + }; + indices.mapping.insert(thread_id, index); + + Registration { + index, + thread_id, + } + }; +} diff --git a/tokio/src/net/mod.rs b/tokio/src/net/mod.rs new file mode 100644 index 00000000..2ebf773f --- /dev/null +++ b/tokio/src/net/mod.rs @@ -0,0 +1,47 @@ +//! TCP/UDP/Unix bindings for `tokio`. +//! +//! This module contains the TCP/UDP/Unix networking types, similar to the standard +//! library, which can be used to implement networking protocols. +//! +//! # Organization +//! +//! * [`TcpListener`] and [`TcpStream`] provide functionality for communication over TCP +//! * [`UdpSocket`] provides functionality for communication over UDP +//! * [`UnixListener`] and [`UnixStream`] provide functionality for communication over a +//! Unix Domain Stream Socket **(available on Unix only)** +//! * [`UnixDatagram`] and [`UnixDatagramFramed`] provide functionality for communication +//! over Unix Domain Datagram Socket **(available on Unix only)** + +//! +//! [`TcpListener`]: struct.TcpListener.html +//! [`TcpStream`]: struct.TcpStream.html +//! [`UdpSocket`]: struct.UdpSocket.html +//! [`UnixListener`]: struct.UnixListener.html +//! [`UnixStream`]: struct.UnixStream.html +//! [`UnixDatagram`]: struct.UnixDatagram.html +//! [`UnixDatagramFramed`]: struct.UnixDatagramFramed.html + +mod addr; +pub use addr::ToSocketAddrs; + +pub mod driver; + +pub mod util; + +#[cfg(feature = "tcp")] +pub mod tcp; + +#[cfg(feature = "tcp")] +pub use self::tcp::{TcpListener, TcpStream}; + +#[cfg(feature = "udp")] +pub mod udp; + +#[cfg(feature = "udp")] +pub use self::udp::UdpSocket; + +#[cfg(all(unix, feature = "uds"))] +pub mod unix; + +#[cfg(all(unix, feature = "uds"))] +pub use self::unix::{UnixDatagram, UnixListener, UnixStream}; diff --git a/tokio/src/net/tcp/incoming.rs b/tokio/src/net/tcp/incoming.rs new file mode 100644 index 00000000..0339615a --- /dev/null +++ b/tokio/src/net/tcp/incoming.rs @@ -0,0 +1,31 @@ +use crate::net::tcp::TcpListener; +use crate::net::tcp::TcpStream; + +use futures_core::ready; +use futures_core::stream::Stream; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Stream returned by the `TcpListener::incoming` function representing the +/// stream of sockets received from a listener. +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct Incoming { + inner: TcpListener, +} + +impl Incoming { + pub(crate) fn new(listener: TcpListener) -> Incoming { + Incoming { inner: listener } + } +} + +impl Stream for Incoming { + type Item = io::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let (socket, _) = ready!(self.inner.poll_accept(cx))?; + Poll::Ready(Some(Ok(socket))) + } +} diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs new file mode 100644 index 00000000..b4bf4fcb --- /dev/null +++ b/tokio/src/net/tcp/listener.rs @@ -0,0 +1,358 @@ +use crate::net::tcp::{Incoming, TcpStream}; +use crate::net::util::PollEvented; +use crate::net::ToSocketAddrs; + +use futures_core::ready; +use futures_util::future::poll_fn; +use std::convert::TryFrom; +use std::fmt; +use std::io; +use std::net::{self, SocketAddr}; +use std::task::{Context, Poll}; + +/// An I/O object representing a TCP socket listening for incoming connections. +/// +/// This object can be converted into a stream of incoming connections for +/// various forms of processing. +/// +/// # Examples +/// +/// ```no_run +/// use tokio::net::TcpListener; +/// +/// use std::io; +/// # async fn process_socket(_socket: T) {} +/// +/// #[tokio::main] +/// async fn main() -> io::Result<()> { +/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; +/// +/// loop { +/// let (socket, _) = listener.accept().await?; +/// process_socket(socket).await; +/// } +/// } +/// ``` +pub struct TcpListener { + io: PollEvented, +} + +impl TcpListener { + /// Creates a new TcpListener which will be bound to the specified address. + /// + /// The returned listener is ready for accepting connections. + /// + /// Binding with a po