diff options
49 files changed, 1091 insertions, 914 deletions
diff --git a/tokio-macros/Cargo.toml b/tokio-macros/Cargo.toml index 21fe9117..4bf87927 100644 --- a/tokio-macros/Cargo.toml +++ b/tokio-macros/Cargo.toml @@ -29,7 +29,7 @@ quote = "1" syn = { version = "1.0.3", features = ["full"] } [dev-dependencies] -tokio = { version = "=0.2.0-alpha.6", path = "../tokio", default-features = false, features = ["rt-full"] } +tokio = { version = "=0.2.0-alpha.6", path = "../tokio" } [package.metadata.docs.rs] all-features = true diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 1c2b24f1..9364ea82 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -24,14 +24,21 @@ categories = ["asynchronous", "network-programming"] keywords = ["io", "async", "non-blocking", "futures"] [features] -default = [ +default = ["full"] + +# enable everything +full = [ "blocking", + "dns", "fs", + "io-driver", "io-util", - "io", + "io-std", + "macros", "net", "process", - "rt-full", + "rt-core", + "rt-threaded", "signal", "stream", "sync", @@ -39,15 +46,16 @@ default = [ ] blocking = ["rt-core"] -dns = ["blocking"] -fs = ["blocking"] -io-driver = ["mio", "lazy_static", "sync"] # TODO: get rid of sync +dns = ["rt-core"] +fs = ["rt-core"] +io-driver = ["rt-core", "mio", "lazy_static"] io-util = ["memchr"] -io = ["io-util", "blocking"] +# stdin, stdout, stderr +io-std = ["rt-core"] macros = ["tokio-macros"] net = ["dns", "tcp", "udp", "uds"] process = [ - "io-util", # TODO: Get rid of + "io-driver", "libc", "mio-named-pipes", "signal", @@ -58,14 +66,9 @@ process = [ ] # Includes basic task execution capabilities rt-core = [] -# TODO: rename this -> `rt-threaded` -rt-full = [ - "macros", +rt-threaded = [ "num_cpus", - "net", "rt-core", - "sync", - "time", ] signal = [ "io-driver", @@ -80,7 +83,7 @@ stream = ["futures-core"] sync = ["fnv"] test-util = [] tcp = ["io-driver"] -time = ["rt-core", "sync", "slab"] +time = ["rt-core", "slab"] udp = ["io-driver"] uds = ["io-driver", "mio-uds", "libc"] diff --git a/tokio/src/blocking/mod.rs b/tokio/src/blocking/mod.rs index ae830b49..500055ae 100644 --- a/tokio/src/blocking/mod.rs +++ b/tokio/src/blocking/mod.rs @@ -1,7 +1,11 @@ +#![cfg_attr(not(feature = "blocking"), allow(dead_code, unused_imports))] + //! Perform blocking operations from an asynchronous context. -mod pool; -pub(crate) use self::pool::{spawn_blocking, BlockingPool, Spawner}; +cfg_blocking_impl! { + mod pool; + pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner}; -mod schedule; -mod task; + mod schedule; + mod task; +} diff --git a/tokio/src/blocking/pool.rs b/tokio/src/blocking/pool.rs index fccfc18e..f75af780 100644 --- a/tokio/src/blocking/pool.rs +++ b/tokio/src/blocking/pool.rs @@ -116,16 +116,20 @@ impl fmt::Debug for BlockingPool { // ===== impl Spawner ===== -impl Spawner { - #[cfg(feature = "rt-full")] - pub(crate) fn spawn_background<F>(&self, func: F) - where - F: FnOnce() + Send + 'static, - { - let task = task::background(BlockingTask::new(func)); - self.schedule(task); + +cfg_rt_threaded! { + impl Spawner { + pub(crate) fn spawn_background<F>(&self, func: F) + where + F: FnOnce() + Send + 'static, + { + let task = task::background(BlockingTask::new(func)); + self.schedule(task); + } } +} +impl Spawner { /// Set the blocking pool for the duration of the closure /// /// If a blocking pool is already set, it will be restored when the closure diff --git a/tokio/src/io/blocking.rs b/tokio/src/io/blocking.rs index 79e2e0b7..87b251b7 100644 --- a/tokio/src/io/blocking.rs +++ b/tokio/src/io/blocking.rs @@ -34,13 +34,14 @@ enum State<T> { Busy(sys::Blocking<(io::Result<usize>, Buf, T)>), } -impl<T> Blocking<T> { - #[cfg(feature = "io")] - pub(crate) fn new(inner: T) -> Blocking<T> { - Blocking { - inner: Some(inner), - state: State::Idle(Some(Buf::with_capacity(0))), - need_flush: false, +cfg_io_std! { + impl<T> Blocking<T> { + pub(crate) fn new(inner: T) -> Blocking<T> { + Blocking { + inner: Some(inner), + state: State::Idle(Some(Buf::with_capacity(0))), + need_flush: false, + } } } } @@ -264,12 +265,15 @@ impl Buf { self.buf.clear(); res } +} - #[cfg(feature = "fs")] - pub(crate) fn discard_read(&mut self) -> i64 { - let ret = -(self.bytes().len() as i64); - self.pos = 0; - self.buf.truncate(0); - ret +cfg_fs! { + impl Buf { + pub(crate) fn discard_read(&mut self) -> i64 { + let ret = -(self.bytes().len() as i64); + self.pos = 0; + self.buf.truncate(0); + ret + } } } diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index df84efd7..df1888ce 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -36,8 +36,9 @@ //! [`ErrorKind`]: enum.ErrorKind.html //! [`Result`]: type.Result.html -#[cfg(any(feature = "io", feature = "fs"))] -pub(crate) mod blocking; +cfg_io_blocking! { + pub(crate) mod blocking; +} mod async_buf_read; pub use self::async_buf_read::AsyncBufRead; @@ -48,43 +49,43 @@ pub use self::async_read::AsyncRead; mod async_write; pub use self::async_write::AsyncWrite; -#[cfg(feature = "io-util")] -pub mod split; -#[cfg(feature = "io-util")] -pub use self::split::split; +cfg_io_std! { + mod stderr; + pub use stderr::{stderr, Stderr}; -#[cfg(feature = "io-util")] -mod util; -#[cfg(feature = "io-util")] -pub use self::util::{ - copy, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufStream, - BufWriter, Copy, Empty, Lines, Repeat, Sink, Split, Take, -}; + mod stdin; + pub use stdin::{stdin, Stdin}; -#[cfg(feature = "io")] -mod stderr; -#[cfg(feature = "io")] -pub use self::stderr::{stderr, Stderr}; + mod stdout; + pub use stdout::{stdout, Stdout}; +} -#[cfg(feature = "io")] -mod stdin; -#[cfg(feature = "io")] -pub use self::stdin::{stdin, Stdin}; +cfg_io_util! { + pub mod split; + pub use split::split; -#[cfg(feature = "io")] -mod stdout; -#[cfg(feature = "io")] -pub use self::stdout::{stdout, Stdout}; + pub(crate) mod util; + pub use util::{ + copy, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufStream, + BufWriter, Copy, Empty, Lines, Repeat, Sink, Split, Take, + }; -// Re-export io::Error so that users don't have to deal -// with conflicts when `use`ing `tokio::io` and `std::io`. -#[cfg(feature = "io-util")] -pub use std::io::{Error, ErrorKind, Result}; + // Re-export io::Error so that users don't have to deal with conflicts when + // `use`ing `tokio::io` and `std::io`. + pub use std::io::{Error, ErrorKind, Result}; +} + +cfg_not_io_util! { + cfg_process! { + pub(crate) mod util; + } +} -/// Types in this module can be mocked out in tests. -#[cfg(any(feature = "io", feature = "fs"))] -mod sys { - // TODO: don't rename - pub(crate) use crate::blocking::spawn_blocking as run; - pub(crate) use crate::task::JoinHandle as Blocking; +cfg_io_blocking! { + /// Types in this module can be mocked out in tests. + mod sys { + // TODO: don't rename + pub(crate) use crate::blocking::spawn_blocking as run; + pub(crate) use crate::task::JoinHandle as Blocking; + } } diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index 8d4e7678..24451ef6 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -1,71 +1,75 @@ -mod async_buf_read_ext; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::async_buf_read_ext::AsyncBufReadExt; +#![allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -mod async_read_ext; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::async_read_ext::AsyncReadExt; +cfg_io_util! { + mod async_buf_read_ext; + pub use async_buf_read_ext::AsyncBufReadExt; -mod async_write_ext; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::async_write_ext::AsyncWriteExt; + mod async_read_ext; + pub use async_read_ext::AsyncReadExt; -mod buf_reader; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::buf_reader::BufReader; + mod async_write_ext; + pub use async_write_ext::AsyncWriteExt; -mod buf_stream; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::buf_stream::BufStream; + mod buf_reader; + pub use buf_reader::BufReader; -mod buf_writer; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::buf_writer::BufWriter; + mod buf_stream; + pub use buf_stream::BufStream; -mod chain; + mod buf_writer; + pub use buf_writer::BufWriter; -mod copy; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::copy::{copy, Copy}; + mod chain; -mod empty; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::empty::{empty, Empty}; + mod copy; + pub use copy::{copy, Copy}; -mod flush; + mod empty; + pub use empty::{empty, Empty}; -mod lines; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::lines::Lines; + mod flush; -mod read; -mod read_exact; -mod read_line; -mod read_to_end; -mod read_to_string; -mod read_until; + mod lines; + pub use lines::Lines; -mod repeat; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::repeat::{repeat, Repeat}; + mod read; + mod read_exact; + mod read_line; -mod shutdown; + mod read_to_end; + cfg_process! { + pub(crate) use read_to_end::read_to_end; + } -mod sink; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::sink::{sink, Sink}; + mod read_to_string; + mod read_until; -mod split; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::split::Split; + mod repeat; + pub use repeat::{repeat, Repeat}; -mod take; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::take::Take; + mod shutdown; -mod write; -mod write_all; + mod sink; + pub use sink::{sink, Sink}; -// used by `BufReader` and `BufWriter` -// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1 -const DEFAULT_BUF_SIZE: usize = 8 * 1024; + mod split; + pub use split::Split; + + mod take; + pub use take::Take; + + mod write; + mod write_all; + + // used by `BufReader` and `BufWriter` + // https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1 + const DEFAULT_BUF_SIZE: usize = 8 * 1024; +} + +cfg_not_io_util! { + cfg_process! { + mod read_to_end; + // Used by process + pub(crate) use read_to_end::read_to_end; + } +} diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 81d93c68..d82d58d8 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -70,75 +70,68 @@ //! } //! ``` -#[cfg(all(loom, test))] -macro_rules! thread_local { - ($($tts:tt)+) => { loom::thread_local!{ $($tts)+ } } -} - -macro_rules! ready { - ($e:expr $(,)?) => { - match $e { - std::task::Poll::Ready(t) => t, - std::task::Poll::Pending => return std::task::Poll::Pending, - } - }; -} - -// At the top due to macros -#[cfg(test)] +// macros used internally #[macro_use] -mod tests; +mod macros; -#[cfg(feature = "blocking")] +// Blocking task implementation pub(crate) mod blocking; -#[cfg(feature = "fs")] -pub mod fs; +cfg_fs! { + pub mod fs; +} mod future; pub mod io; -#[cfg(feature = "io-driver")] pub mod net; mod loom; pub mod prelude; -#[cfg(feature = "process")] -#[cfg(not(loom))] -pub mod process; +cfg_process! { + pub mod process; +} pub mod runtime; -#[cfg(feature = "signal")] -#[cfg(not(loom))] -pub mod signal; +cfg_signal! { + pub mod signal; +} -#[cfg(feature = "sync")] -pub mod sync; +cfg_sync! { + pub mod sync; +} +cfg_not_sync! { + mod sync; +} -#[cfg(feature = "rt-core")] -pub mod task; -#[cfg(feature = "rt-core")] -pub use crate::task::spawn; +cfg_rt_core! { + pub mod task; + pub use task::spawn; +} -#[cfg(feature = "time")] -pub mod time; +cfg_time! { + pub mod time; +} -#[cfg(feature = "rt-full")] -mod util; +cfg_rt_threaded! { + mod util; +} -#[cfg(not(test))] // Work around for rust-lang/rust#62127 -#[cfg(feature = "macros")] -#[doc(inline)] -pub use tokio_macros::main; +cfg_macros! { + #[cfg(not(test))] // Work around for rust-lang/rust#62127 + pub use tokio_macros::main; + pub use tokio_macros::test; +} -#[cfg(feature = "macros")] -#[doc(inline)] -pub use tokio_macros::test; +// Tests +#[cfg(test)] +mod tests; +// TODO: rm #[cfg(feature = "io-util")] #[cfg(test)] fn is_unpin<T: Unpin>() {} diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index c5bd6039..2c5b7eaa 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -1,4 +1,4 @@ -#![cfg_attr(not(feature = "rt-full"), allow(unused_imports, dead_code))] +#![cfg_attr(not(feature = "full"), allow(unused_imports, dead_code))] mod atomic_u32; mod atomic_u64; @@ -11,7 +11,7 @@ pub(crate) mod cell { pub(crate) use super::causal_cell::{CausalCell, CausalCheck}; } -#[cfg(feature = "sync")] +#[cfg(any(feature = "sync", feature = "io-driver"))] pub(crate) mod future { pub(crate) use crate::sync::AtomicWaker; } @@ -51,12 +51,12 @@ pub(crate) mod sync { } pub(crate) mod sys { - #[cfg(feature = "rt-full")] + #[cfg(feature = "rt-threaded")] pub(crate) fn num_cpus() -> usize { usize::max(1, num_cpus::get_physical()) } - #[cfg(not(feature = "rt-full"))] + #[cfg(not(feature = "rt-threaded"))] pub(crate) fn num_cpus() -> usize { 1 } diff --git a/tokio/src/macros/assert.rs b/tokio/src/macros/assert.rs new file mode 100644 index 00000000..fd6601b4 --- /dev/null +++ b/tokio/src/macros/assert.rs @@ -0,0 +1,19 @@ +/// Assert option is some +macro_rules! assert_some { + ($e:expr) => {{ + match $e { + Some(v) => v, + _ => panic!("expected some, was none"), + } + }}; +} + +/// Assert option is none +macro_rules! assert_none { + ($e:expr) => {{ + match $e { + Some(v) => panic!("expected none, was {:?}", v), + _ => {} + } + }}; +} diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs new file mode 100644 index 00000000..5e84a3ac --- /dev/null +++ b/tokio/src/macros/cfg.rs @@ -0,0 +1,217 @@ +#![allow(unused_macros)] + +macro_rules! cfg_atomic_waker { + ($($item:item)*) => { + $( #[cfg(any(feature = "io-driver", feature = "time"))] $item )* + } +} + +macro_rules! cfg_blocking { + ($($item:item)*) => { + $( #[cfg(feature = "blocking")] $item )* + } +} + +/// Enable blocking API internals +macro_rules! cfg_blocking_impl { + ($($item:item)*) => { + $( + #[cfg(any( + feature = "blocking", + feature = "fs", + feature = "dns", + feature = "io-std", + feature = "rt-threaded", + ))] + $item + )* + } +} + +/// Enable blocking API internals +macro_rules! cfg_not_blocking_impl { + ($($item:item)*) => { + $( + #[cfg(not(any( + feature = "blocking", + feature = "fs", + feature = "dns", + feature = "io-std", + feature = "rt-threaded", + )))] + $item + )* + } +} + +macro_rules! cfg_dns { + ($($item:item)*) => { + $( #[cfg(feature = "dns")] $item )* + } +} + +macro_rules! cfg_fs { + ($($item:item)*) => { $( #[cfg(feature = "fs")] $item )* } +} + +macro_rules! cfg_io_blocking { + ($($item:item)*) => { + $( #[cfg(any(feature = "io-std", feature = "fs"))] $item )* + } +} + +macro_rules! cfg_io_driver { + ($($item:item)*) => { + $( #[cfg(feature = "io-driver")] $item )* + } +} + +macro_rules! cfg_not_io_driver { + ($($item:item)*) => { + $( #[cfg(not(feature = "io-driver"))] $item )* + } +} + +macro_rules! cfg_io_std { + ($($item:item)*) => { + $( #[cfg(feature = "io-std")] $item )* + } +} + +macro_rules! cfg_io_util { + ($($item:item)*) => { + $( #[cfg(feature = "io-util")] $item )* + } +} + +macro_rules! cfg_not_io_util { + ($($item:item)*) => { + $( #[cfg(not(feature = "io-util"))] $item )* + } +} + +macro_rules! cfg_loom { + ($($item:item)*) => { + $( #[cfg(loom)] $item )* + } +} + +macro_rules! cfg_not_loom { + ($($item:item)*) => { + $( #[cfg(not(loom))] $item )* + } +} + +macro_rules! cfg_macros { + ($($item:item)*) => { + $( + #[cfg(feature = "macros")] + #[doc(inline)] + $item + )* + } +} + +macro_rules! cfg_process { + ($($item:item)*) => { + $( + #[cfg(feature = "process")] + #[cfg(not(loom))] + $item + )* + } +} + +macro_rules! cfg_signal { + ($($item:item)*) => { + $( + #[cfg(feature = "signal")] + #[cfg(not(loom))] + $item + )* + } +} + +macro_rules! cfg_stream { + ($($item:item)*) => { + $( #[cfg(feature = "stream")] $item )* + } +} + +macro_rules! cfg_sync { + ($($item:item)*) => { + $( #[cfg(feature = "sync")] $item )* + } +} + +macro_rules! cfg_not_sync { + ($($item:item)*) => { + $( #[cfg(not(feature = "sync"))] $item )* + } +} + +macro_rules! cfg_rt_core { + ($($item:item)*) => { + $( #[cfg(feature = "rt-core")] $item )* + } +} + +macro_rules! cfg_not_rt_core { + ($($item:item)*) => { + $( #[cfg(not(feature = "rt-core"))] $item )* + } +} + +macro_rules! cfg_rt_threaded { + ($($item:item)*) => { + $( #[cfg(feature = "rt-threaded")] $item )* + } +} + +macro_rules! cfg_not_rt_threaded { + ($($item:item)*) => { + $( #[cfg(not(feature = "rt-threaded"))] $item )* + } +} + +macro_rules! cfg_tcp { + ($($item:item)*) => { + $( #[cfg(feature = "tcp")] $item )* + } +} + +macro_rules! cfg_test_util { + ($($item:item)*) => { + $( #[cfg(feature = "test-util")] $item )* + } +} + +macro_rules! cfg_not_test_util { + ($($item:item)*) => { + $( #[cfg(not(feature = "test-util"))] $item )* + } +} + +macro_rules! cfg_time { + ($($item:item)*) => { + $( #[cfg(feature = "time")] $item )* + } +} + +macro_rules! cfg_not_time { + ($($item:item)*) => { + $( #[cfg(not(feature = "time"))] $item )* + } +} + +macro_rules! cfg_udp { + ($($item:item)*) => { + $( #[cfg(feature = "udp")] $item )* + } +} + +macro_rules! cfg_uds { + ($($item:item)*) => { + $( #[cfg(all(unix, feature = "uds"))] $item )* + } +} diff --git a/tokio/src/macros/loom.rs b/tokio/src/macros/loom.rs new file mode 100644 index 00000000..d57d9fb0 --- /dev/null +++ b/tokio/src/macros/loom.rs @@ -0,0 +1,12 @@ +macro_rules! if_loom { + ($($t:tt)*) => {{ + #[cfg(loom)] + const LOOM: bool = true; + #[cfg(not(loom))] + const LOOM: bool = false; + + if LOOM { + $($t)* + } + }} +} diff --git a/tokio/src/macros/mod.rs b/tokio/src/macros/mod.rs new file mode 100644 index 00000000..9136e594 --- /dev/null +++ b/tokio/src/macros/mod.rs @@ -0,0 +1,17 @@ +#![cfg_attr(not(feature = "full"), allow(unused_macros))] + +#[macro_use] +#[cfg(test)] +mod assert; + +#[macro_use] +mod cfg; + +#[macro_use] +mod loom; + +#[macro_use] +mod ready; + +#[macro_use] +mod thread_local; diff --git a/tokio/src/macros/ready.rs b/tokio/src/macros/ready.rs new file mode 100644 index 00000000..1f48623b --- /dev/null +++ b/tokio/src/macros/ready.rs @@ -0,0 +1,8 @@ +macro_rules! ready { + ($e:expr $(,)?) => { + match $e { + std::task::Poll::Ready(t) => |