diff options
author | Carl Lerche <me@carllerche.com> | 2019-10-25 12:50:15 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-25 12:50:15 -0700 |
commit | 227533d456fe32e48ffcd3796f1e6c8f9318b230 (patch) | |
tree | 498029aaf42dd64eeb8ef0e7d7f29802b45d4e95 /tokio | |
parent | 03a9378297c73c2e56a6d6b55db22b92427b850a (diff) |
net: move into tokio crate (#1683)
A step towards collapsing Tokio sub crates into a single `tokio`
crate (#1318).
The `net` implementation is now provided by the main `tokio` crate.
Functionality can be opted out of by using the various net related
feature flags.
Diffstat (limited to 'tokio')
72 files changed, 8837 insertions, 104 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index ba6e0af0..4b87917c 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -27,7 +27,7 @@ keywords = ["io", "async", "non-blocking", "futures"] default = [ "fs", "io", - "net", + "net-full", "process", "rt-full", "signal", @@ -35,31 +35,47 @@ default = [ "timer", ] -fs = [] -io = ["tokio-io"] +fs = ["tokio-executor/blocking"] +io = ["tokio-io", "bytes", "iovec"] macros = ["tokio-macros"] -net = ["tcp", "udp", "uds"] +net-full = ["tcp", "udp", "uds"] +net-driver = ["mio", "tokio-executor/blocking"] rt-current-thread = [ "timer", - "tokio-net", "tokio-executor/current-thread", ] rt-full = [ "macros", "num_cpus", - "net", + "net-full", "sync", "timer", "tokio-executor/current-thread", "tokio-executor/thread-pool", ] -signal = ["tokio-net/signal"] +signal = [ + "lazy_static", + "libc", + "mio-uds", + "net-driver", + "signal-hook-registry" +] sync = ["tokio-sync"] -tcp = ["io", "tokio-net/tcp"] +tcp = ["io", "net-driver"] timer = ["crossbeam-utils", "slab"] -udp = ["io", "tokio-net/udp"] -uds = ["io", "tokio-net/uds"] -process = ["io", "tokio-net/process"] +udp = ["io", "net-driver"] +uds = ["io", "net-driver", "mio-uds", "libc"] +process = [ + "crossbeam-queue", + "io", + "libc", + "mio-named-pipes", + "signal", + "winapi/consoleapi", + "winapi/minwindef", + "winapi/threadpoollegacyapiset", + "winapi/winerror", +] [dependencies] futures-core-preview = "=0.3.0-alpha.19" @@ -67,23 +83,38 @@ futures-sink-preview = "=0.3.0-alpha.19" futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink"] } # Everything else is optional... +bytes = { version = "0.4", optional = true } crossbeam-utils = { version = "0.6.0", optional = true } +iovec = { version = "0.1", optional = true } +lazy_static = { version = "1.0.2", optional = true } +mio = { version = "0.6.14", optional = true } num_cpus = { version = "1.8.0", optional = true } # Backs `DelayQueue` slab = { version = "0.4.1", optional = true } tokio-io = { version = "=0.2.0-alpha.6", optional = true, features = ["util"], path = "../tokio-io" } tokio-executor = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-executor" } tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" } -tokio-net = { version = "=0.2.0-alpha.6", optional = true, features = ["async-traits"], path = "../tokio-net" } tokio-sync = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-sync", features = ["async-traits"] } +[target.'cfg(unix)'.dependencies] +crossbeam-queue = { version = "0.1.2", optional = true } +mio-uds = { version = "0.6.5", optional = true } +libc = { version = "0.2.42", optional = true } +signal-hook-registry = { version = "1.1.1", optional = true } + +[target.'cfg(windows)'.dependencies] +mio-named-pipes = { version = "0.1.6", optional = true } + +[target.'cfg(windows)'.dependencies.winapi] +version = "0.3.8" +default-features = false +optional = true + [dev-dependencies] tokio-test = { version = "=0.2.0-alpha.6", path = "../tokio-test" } tokio-util = { version = "=0.2.0-alpha.6", path = "../tokio-util" } -futures-preview = "=0.3.0-alpha.19" -futures-util-preview = "=0.3.0-alpha.19" -pin-utils = "=0.1.0-alpha.4" +futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] } env_logger = { version = "0.6", default-features = false } flate2 = { version = "1", features = ["tokio"] } http = "0.1" diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index b8682c52..814f2a59 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -26,7 +26,7 @@ //! //! Guide level documentation is found on the [website]. //! -//! [driver]: tokio_net::driver +//! [driver]: tokio::net::driver //! [website]: https://tokio.rs/docs/ //! //! # Examples @@ -82,23 +82,34 @@ macro_rules! if_runtime { #[cfg(feature = "timer")] pub mod clock; + #[cfg(feature = "codec")] pub mod codec; + #[cfg(feature = "fs")] pub mod fs; + pub mod future; + #[cfg(feature = "io")] pub mod io; -#[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))] + +#[cfg(feature = "net-driver")] pub mod net; + pub mod prelude; + #[cfg(feature = "process")] pub mod process; + #[cfg(feature = "signal")] pub mod signal; + pub mod stream; + #[cfg(feature = "sync")] pub mod sync; + #[cfg(feature = "timer")] pub mod timer; @@ -113,6 +124,7 @@ if_runtime! { #[cfg(feature = "macros")] #[doc(inline)] pub use tokio_macros::main; + #[cfg(feature = "macros")] #[doc(inline)] pub use tokio_macros::test; diff --git a/tokio/src/net.rs b/tokio/src/net.rs deleted file mode 100644 index 6aa1e499..00000000 --- a/tokio/src/net.rs +++ /dev/null @@ -1,67 +0,0 @@ -//! TCP/UDP/Unix bindings for `tokio`. -//! -//! This module contains the TCP/UDP/Unix networking types, similar to the standard -//! library, which can be used to implement networking protocols. -//! -//! # Organization -//! -//! * [`TcpListener`] and [`TcpStream`] provide functionality for communication over TCP -//! * [`UdpSocket`] provides functionality for communication over UDP -//! * [`UnixListener`] and [`UnixStream`] provide functionality for communication over a -//! Unix Domain Stream Socket **(available on Unix only)** -//! * [`UnixDatagram`] and [`UnixDatagramFramed`] provide functionality for communication -//! over Unix Domain Datagram Socket **(available on Unix only)** - -//! -//! [`TcpListener`]: struct.TcpListener.html -//! [`TcpStream`]: struct.TcpStream.html -//! [`UdpSocket`]: struct.UdpSocket.html -//! [`UnixListener`]: struct.UnixListener.html -//! [`UnixStream`]: struct.UnixStream.html -//! [`UnixDatagram`]: struct.UnixDatagram.html -//! [`UnixDatagramFramed`]: struct.UnixDatagramFramed.html - -#[cfg(feature = "tcp")] -pub mod tcp { - //! TCP bindings for `tokio`. - //! - //! Connecting to an address, via TCP, can be done using [`TcpStream`]'s - //! [`connect`] method, which returns [`ConnectFuture`]. `ConnectFuture` - //! implements a future which returns a `TcpStream`. - //! - //! To listen on an address [`TcpListener`] can be used. `TcpListener`'s - //! [`incoming`][incoming_method] method can be used to accept new connections. - //! It return the [`Incoming`] struct, which implements a stream which returns - //! `TcpStream`s. - //! - //! [`TcpStream`]: struct.TcpStream.html - //! [`connect`]: struct.TcpStream.html#method.connect - //! [`ConnectFuture`]: struct.ConnectFuture.html - //! [`TcpListener`]: struct.TcpListener.html - //! [incoming_method]: struct.TcpListener.html#method.incoming - //! [`Incoming`]: struct.Incoming.html - pub use tokio_net::tcp::{split, Incoming, TcpListener, TcpStream}; -} -#[cfg(feature = "tcp")] -pub use self::tcp::{TcpListener, TcpStream}; - -#[cfg(feature = "udp")] -pub mod udp { - //! UDP bindings for `tokio`. - //! - //! The main struct for UDP is the [`UdpSocket`], which represents a UDP socket. - //! - //! [`UdpSocket`]: struct.UdpSocket.html - pub use tokio_net::udp::{split, UdpSocket}; -} -#[cfg(feature = "udp")] -pub use self::udp::UdpSocket; - -#[cfg(all(unix, feature = "uds"))] -pub mod unix { - //! Unix domain socket bindings for `tokio` (only available on unix systems). - - pub use tokio_net::uds::{split, UCred, UnixDatagram, UnixListener, UnixStream}; -} -#[cfg(all(unix, feature = "uds"))] -pub use self::unix::{UnixDatagram, UnixListener, UnixStream}; diff --git a/tokio/src/net/addr.rs b/tokio/src/net/addr.rs new file mode 100644 index 00000000..8b782cfa --- /dev/null +++ b/tokio/src/net/addr.rs @@ -0,0 +1,214 @@ +use tokio_executor::blocking; + +use futures_util::future; +use std::io; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + +/// Convert or resolve without blocking to one or more `SocketAddr` values. +/// +/// Currently, this trait is only used as an argument to Tokio functions that +/// need to reference a target socket address. +/// +/// This trait is sealed and is intended to be opaque. Users of Tokio should +/// only use `ToSocketAddrs` in trait bounds and __must not__ attempt to call +/// the functions directly or reference associated types. Changing these is not +/// considered a breaking change. +pub trait ToSocketAddrs: sealed::ToSocketAddrsPriv {} + +type ReadyFuture<T> = future::Ready<io::Result<T>>; + +// ===== impl SocketAddr ===== + +impl ToSocketAddrs for SocketAddr {} + +impl sealed::ToSocketAddrsPriv for SocketAddr { + type Iter = std::option::IntoIter<SocketAddr>; + type Future = ReadyFuture<Self::Iter>; + + fn to_socket_addrs(&self) -> Self::Future { + let iter = Some(*self).into_iter(); + future::ready(Ok(iter)) + } +} + +// ===== impl str ===== + +impl ToSocketAddrs for str {} + +impl sealed::ToSocketAddrsPriv for str { + type Iter = sealed::OneOrMore; + type Future = sealed::MaybeReady; + + fn to_socket_addrs(&self) -> Self::Future { + use sealed::MaybeReady; + + // First check if the input parses as a socket address + let res: Result<SocketAddr, _> = self.parse(); + + if let Ok(addr) = res { + return MaybeReady::Ready(Some(addr)); + } + + // Run DNS lookup on the blocking pool + let s = self.to_owned(); + + MaybeReady::Blocking(blocking::run(move || { + std::net::ToSocketAddrs::to_socket_addrs(&s) + })) + } +} + +// ===== impl (&str, u16) ===== + +impl ToSocketAddrs for (&'_ str, u16) {} + +impl sealed::ToSocketAddrsPriv for (&'_ str, u16) { + type Iter = sealed::OneOrMore; + type Future = sealed::MaybeReady; + + fn to_socket_addrs(&self) -> Self::Future { + use sealed::MaybeReady; + use std::net::{SocketAddrV4, SocketAddrV6}; + + let (host, port) = *self; + + // try to parse the host as a regular IP address first + if let Ok(addr) = host.parse::<Ipv4Addr>() { + let addr = SocketAddrV4::new(addr, port); + let addr = SocketAddr::V4(addr); + + return MaybeReady::Ready(Some(addr)); + } + + if let Ok(addr) = host.parse::<Ipv6Addr>() { + let addr = SocketAddrV6::new(addr, port, 0, 0); + let addr = SocketAddr::V6(addr); + + return MaybeReady::Ready(Some(addr)); + } + + let host = host.to_owned(); + + MaybeReady::Blocking(blocking::run(move || { + std::net::ToSocketAddrs::to_socket_addrs(&(&host[..], port)) + })) + } +} + +// ===== impl (IpAddr, u16) ===== + +impl ToSocketAddrs for (IpAddr, u16) {} + +impl sealed::ToSocketAddrsPriv for (IpAddr, u16) { + type Iter = std::option::IntoIter<SocketAddr>; + type Future = ReadyFuture<Self::Iter>; + + fn to_socket_addrs(&self) -> Self::Future { + let iter = Some(SocketAddr::from(*self)).into_iter(); + future::ready(Ok(iter)) + } +} + +// ===== impl String ===== + +impl ToSocketAddrs for String {} + +impl sealed::ToSocketAddrsPriv for String { + type Iter = <str as sealed::ToSocketAddrsPriv>::Iter; + type Future = <str as sealed::ToSocketAddrsPriv>::Future; + + fn to_socket_addrs(&self) -> Self::Future { + (&self[..]).to_socket_addrs() + } +} + +// ===== impl &'_ impl ToSocketAddrs ===== + +impl<T: ToSocketAddrs + ?Sized> ToSocketAddrs for &'_ T {} + +impl<T> sealed::ToSocketAddrsPriv for &'_ T +where + T: sealed::ToSocketAddrsPriv + ?Sized, +{ + type Iter = T::Iter; + type Future = T::Future; + + fn to_socket_addrs(&self) -> Self::Future { + (**self).to_socket_addrs() + } +} + +pub(crate) mod sealed { + //! The contents of this trait are intended to remain private and __not__ + //! part of the `ToSocketAddrs` public API. The details will change over + //! time. + + use tokio_executor::blocking::Blocking; + + use futures_core::ready; + use std::future::Future; + use std::io; + use std::net::SocketAddr; + use std::option; + use std::pin::Pin; + use std::task::{Context, Poll}; + use std::vec; + + #[doc(hidden)] + pub trait ToSocketAddrsPriv { + type Iter: Iterator<Item = SocketAddr> + Send + 'static; + type Future: Future<Output = io::Result<Self::Iter>> + Send + 'static; + + fn to_socket_addrs(&self) -> Self::Future; + } + + #[doc(hidden)] + #[derive(Debug)] + pub enum MaybeReady { + Ready(Option<SocketAddr>), + Blocking(Blocking<io::Result<vec::IntoIter<SocketAddr>>>), + } + + #[doc(hidden)] + #[derive(Debug)] + pub enum OneOrMore { + One(option::IntoIter<SocketAddr>), + More(vec::IntoIter<SocketAddr>), + } + + impl Future for MaybeReady { + type Output = io::Result<OneOrMore>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match *self { + MaybeReady::Ready(ref mut i) => { + let iter = OneOrMore::One(i.take().into_iter()); + Poll::Ready(Ok(iter)) + } + MaybeReady::Blocking(ref mut rx) => { + let res = ready!(Pin::new(rx).poll(cx)).map(OneOrMore::More); + + Poll::Ready(res) + } + } + } + } + + impl Iterator for OneOrMore { + type Item = SocketAddr; + + fn next(&mut self) -> Option<Self::Item> { + match self { + OneOrMore::One(i) => i.next(), + OneOrMore::More(i) => i.next(), + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + match self { + OneOrMore::One(i) => i.size_hint(), + OneOrMore::More(i) => i.size_hint(), + } + } + } +} diff --git a/tokio/src/net/driver/mod.rs b/tokio/src/net/driver/mod.rs new file mode 100644 index 00000000..9079ccc7 --- /dev/null +++ b/tokio/src/net/driver/mod.rs @@ -0,0 +1,133 @@ +//! Event loop that drives Tokio I/O resources. +//! +//! This module contains [`Reactor`], which is the event loop that drives all +//! Tokio I/O resources. It is the reactor's job to receive events from the +//! operating system ([epoll], [kqueue], [IOCP], etc...) and forward them to +//! waiting tasks. It is the bridge between operating system and the futures +//! model. +//! +//! # Overview +//! +//! When using Tokio, all operations are asynchronous and represented by +//! futures. These futures, representing the application logic, are scheduled by +//! an executor (see [runtime model] for more details). Executors wait for +//! notifications before scheduling the future for execution time, i.e., nothing +//! happens until an event is received indicating that the task can make +//! progress. +//! +//! The reactor receives events from the operating system and notifies the +//! executor. +//! +//! Let's start with a basic example, establishing a TCP connection. +//! +//! ``` +//! use tokio::net::TcpStream; +//! +//! # async fn process<T>(_t: T) {} +//! +//! # #[tokio::main] +//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> { +//! let stream = TcpStream::connect("93.184.216.34:9243").await?; +//! +//! println!("successfully connected"); +//! +//! process(stream).await; +//! # Ok(()) +//! # } +//! ``` +//! +//! Establishing a TCP connection usually cannot be completed immediately. +//! [`TcpStream::connect`] does not block the current thread. Instead, it +//! returns a [future][connect-future] that resolves once the TCP connection has +//! been established. The connect future itself has no way of knowing when the +//! TCP connection has been established. +//! +//! Before returning the future, [`TcpStream::connect`] registers the socket +//! with a reactor. This registration process, handled by [`Registration`], is +//! what links the [`TcpStream`] with the [`Reactor`] instance. At this point, +//! the reactor starts listening for connection events from the operating system +//! for that socket. +//! +//! Once the connect future is passed to [`tokio::run`], it is spawned onto a +//! thread pool. The thread pool waits until it is notified that the connection +//! has completed. +//! +//! When the TCP connection is established, the reactor receives an event from +//! the operating system. It then notifies the thread pool, telling it that the +//! connect future can complete. At this point, the thread pool will schedule +//! the task to run on one of its worker threads. This results in the `and_then` +//! closure to get executed. +//! +//! ## Eager registration +//! +//! Notice how the snippet does not explicitly reference a reactor. When +//! [`TcpStream::connect`] is called, it registers the socket with the current +//! reactor, but no reactor is specified. This works because a reactor +//! instance is automatically made available when using the Tokio [runtime], +//! which is done using [`tokio::main`]. The Tokio runtime's executor sets a +//! thread-local variable referencing the associated [`Reactor`] instance and +//! [`Handle::current`] (used by [`Registration`]) returns the reference. +//! +//! ## |