From 06c473e62842d257ed275497ce906710ea3f8e19 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 24 Jun 2019 12:34:30 -0700 Subject: Update Tokio to use `std::future`. (#1120) A first pass at updating Tokio to use `std::future`. Implementations of `Future` from the futures crate are updated to implement `Future` from std. Implementations of `Stream` are moved to a feature flag. This commits disables a number of crates that have not yet been updated. --- .cirrus.yml | 13 +- Cargo.toml | 22 +- async-await/.cargo/config | 2 - async-await/Cargo.toml | 31 - async-await/README.md | 5 - async-await/src/chat.rs | 131 ---- async-await/src/echo_client.rs | 50 -- async-await/src/echo_server.rs | 42 -- async-await/src/hyper.rs | 29 - async-await/tests/macros.rs | 22 - azure-pipelines.yml | 184 ++--- ci/azure-install-rust.yml | 3 + ci/azure-rustfmt.yml | 1 + ci/azure-test-stable.yml | 25 +- tokio-current-thread/Cargo.toml | 4 +- tokio-current-thread/src/lib.rs | 111 ++- tokio-current-thread/src/scheduler.rs | 170 +++-- tokio-current-thread/tests/current_thread.rs | 365 ++++----- tokio-executor/Cargo.toml | 6 +- tokio-executor/src/enter.rs | 24 +- tokio-executor/src/executor.rs | 15 +- tokio-executor/src/global.rs | 33 +- tokio-executor/src/park.rs | 43 ++ tokio-executor/tests/enter.rs | 18 + tokio-executor/tests/executor.rs | 15 +- tokio-futures/Cargo.toml | 17 +- tokio-futures/README.md | 41 +- tokio-futures/src/async_wait.rs | 15 - tokio-futures/src/compat/backward.rs | 86 --- tokio-futures/src/compat/forward.rs | 69 -- tokio-futures/src/compat/mod.rs | 42 -- tokio-futures/src/future.rs | 3 + tokio-futures/src/io/flush.rs | 29 - tokio-futures/src/io/mod.rs | 192 ----- tokio-futures/src/io/read.rs | 32 - tokio-futures/src/io/read_exact.rs | 50 -- tokio-futures/src/io/write.rs | 32 - tokio-futures/src/io/write_all.rs | 51 -- tokio-futures/src/lib.rs | 32 +- tokio-futures/src/macros.rs | 12 + tokio-futures/src/sink.rs | 68 ++ tokio-futures/src/sink/mod.rs | 24 - tokio-futures/src/sink/send.rs | 51 -- tokio-futures/src/stream.rs | 3 + tokio-futures/src/stream/mod.rs | 38 - tokio-futures/src/stream/next.rs | 28 - tokio-io/Cargo.toml | 4 +- tokio-io/src/_tokio_codec/decoder.rs | 3 - tokio-io/src/_tokio_codec/encoder.rs | 3 - tokio-io/src/_tokio_codec/framed.rs | 281 ------- tokio-io/src/_tokio_codec/framed_read.rs | 215 ------ tokio-io/src/_tokio_codec/framed_write.rs | 245 ------- tokio-io/src/_tokio_codec/mod.rs | 35 - tokio-io/src/allow_std.rs | 93 --- tokio-io/src/async_read.rs | 122 ++- tokio-io/src/async_write.rs | 132 ++-- tokio-io/src/codec/bytes_codec.rs | 42 -- tokio-io/src/codec/decoder.rs | 115 --- tokio-io/src/codec/encoder.rs | 25 - tokio-io/src/codec/lines_codec.rs | 88 --- tokio-io/src/codec/mod.rs | 375 ---------- tokio-io/src/framed.rs | 246 ------- tokio-io/src/framed_read.rs | 219 ------ tokio-io/src/framed_write.rs | 249 ------- tokio-io/src/io/copy.rs | 98 --- tokio-io/src/io/flush.rs | 41 -- tokio-io/src/io/mod.rs | 32 - tokio-io/src/io/read.rs | 58 -- tokio-io/src/io/read_exact.rs | 83 --- tokio-io/src/io/read_to_end.rs | 64 -- tokio-io/src/io/read_until.rs | 74 -- tokio-io/src/io/shutdown.rs | 42 -- tokio-io/src/io/write_all.rs | 86 --- tokio-io/src/length_delimited.rs | 936 ------------------------ tokio-io/src/lib.rs | 44 +- tokio-io/src/lines.rs | 60 -- tokio-io/src/split.rs | 243 ------ tokio-io/src/window.rs | 117 --- tokio-io/tests/async_read.rs | 180 +++-- tokio-io/tests/length_delimited.rs | 548 -------------- tokio-macros/Cargo.toml | 3 - tokio-macros/src/lib.rs | 1 - tokio-reactor/Cargo.toml | 3 +- tokio-reactor/benches/basic.rs | 2 + tokio-reactor/src/background.rs | 214 ------ tokio-reactor/src/lib.rs | 206 +----- tokio-reactor/src/poll_evented.rs | 174 ++--- tokio-reactor/src/registration.rs | 100 +-- tokio-sync/Cargo.toml | 14 +- tokio-sync/src/lib.rs | 13 + tokio-sync/src/lock.rs | 14 +- tokio-sync/src/loom.rs | 3 +- tokio-sync/src/mpsc/bounded.rs | 65 +- tokio-sync/src/mpsc/chan.rs | 46 +- tokio-sync/src/mpsc/unbounded.rs | 44 +- tokio-sync/src/oneshot.rs | 90 ++- tokio-sync/src/semaphore.rs | 73 +- tokio-sync/src/task/atomic_task.rs | 336 --------- tokio-sync/src/task/atomic_waker.rs | 317 ++++++++ tokio-sync/src/task/mod.rs | 4 +- tokio-sync/src/watch.rs | 108 +-- tokio-sync/tests/atomic_task.rs | 52 -- tokio-sync/tests/atomic_waker.rs | 37 + tokio-sync/tests/errors.rs | 2 - tokio-sync/tests/fuzz_atomic_task.rs | 54 -- tokio-sync/tests/fuzz_atomic_waker.rs | 53 ++ tokio-sync/tests/fuzz_mpsc.rs | 7 +- tokio-sync/tests/fuzz_oneshot.rs | 65 +- tokio-sync/tests/fuzz_semaphore.rs | 63 +- tokio-sync/tests/lock.rs | 73 +- tokio-sync/tests/mpsc.rs | 351 +++++---- tokio-sync/tests/oneshot.rs | 160 ++-- tokio-sync/tests/semaphore.rs | 102 ++- tokio-sync/tests/watch.rs | 195 +++-- tokio-tcp/Cargo.toml | 13 +- tokio-tcp/src/incoming.rs | 14 +- tokio-tcp/src/lib.rs | 16 +- tokio-tcp/src/listener.rs | 42 +- tokio-tcp/src/stream.rs | 175 ++--- tokio-test/Cargo.toml | 5 +- tokio-test/src/lib.rs | 6 +- tokio-test/src/macros.rs | 76 +- tokio-test/src/task.rs | 145 ++-- tokio-timer/Cargo.toml | 20 +- tokio-timer/src/delay.rs | 26 +- tokio-timer/src/delay_queue.rs | 56 +- tokio-timer/src/interval.rs | 15 +- tokio-timer/src/lib.rs | 19 +- tokio-timer/src/throttle.rs | 137 +--- tokio-timer/src/timeout.rs | 178 ++--- tokio-timer/src/timer/entry.rs | 35 +- tokio-timer/src/timer/handle.rs | 12 +- tokio-timer/src/timer/registration.rs | 11 +- tokio/Cargo.toml | 49 +- tokio/src/async_await.rs | 17 - tokio/src/executor.rs | 107 +++ tokio/src/executor/current_thread/mod.rs | 166 ----- tokio/src/executor/mod.rs | 143 ---- tokio/src/io.rs | 21 +- tokio/src/lib.rs | 17 +- tokio/src/net.rs | 20 +- tokio/src/prelude.rs | 13 +- tokio/src/reactor.rs | 139 ++++ tokio/src/reactor/mod.rs | 143 ---- tokio/src/reactor/poll_evented.rs | 545 -------------- tokio/src/runtime/current_thread/async_await.rs | 17 - tokio/src/runtime/current_thread/builder.rs | 24 +- tokio/src/runtime/current_thread/mod.rs | 13 +- tokio/src/runtime/current_thread/runtime.rs | 69 +- tokio/src/runtime/mod.rs | 9 +- tokio/src/runtime/threadpool/async_await.rs | 18 - tokio/src/runtime/threadpool/mod.rs | 3 - tokio/src/sync.rs | 2 +- 153 files changed, 2963 insertions(+), 10094 deletions(-) delete mode 100644 async-await/.cargo/config delete mode 100644 async-await/Cargo.toml delete mode 100644 async-await/README.md delete mode 100644 async-await/src/chat.rs delete mode 100644 async-await/src/echo_client.rs delete mode 100644 async-await/src/echo_server.rs delete mode 100644 async-await/src/hyper.rs delete mode 100644 async-await/tests/macros.rs create mode 100644 tokio-executor/tests/enter.rs delete mode 100644 tokio-futures/src/async_wait.rs delete mode 100644 tokio-futures/src/compat/backward.rs delete mode 100644 tokio-futures/src/compat/forward.rs delete mode 100644 tokio-futures/src/compat/mod.rs create mode 100644 tokio-futures/src/future.rs delete mode 100644 tokio-futures/src/io/flush.rs delete mode 100644 tokio-futures/src/io/mod.rs delete mode 100644 tokio-futures/src/io/read.rs delete mode 100644 tokio-futures/src/io/read_exact.rs delete mode 100644 tokio-futures/src/io/write.rs delete mode 100644 tokio-futures/src/io/write_all.rs create mode 100644 tokio-futures/src/macros.rs create mode 100644 tokio-futures/src/sink.rs delete mode 100644 tokio-futures/src/sink/mod.rs delete mode 100644 tokio-futures/src/sink/send.rs create mode 100644 tokio-futures/src/stream.rs delete mode 100644 tokio-futures/src/stream/mod.rs delete mode 100644 tokio-futures/src/stream/next.rs delete mode 100644 tokio-io/src/_tokio_codec/decoder.rs delete mode 100644 tokio-io/src/_tokio_codec/encoder.rs delete mode 100644 tokio-io/src/_tokio_codec/framed.rs delete mode 100644 tokio-io/src/_tokio_codec/framed_read.rs delete mode 100644 tokio-io/src/_tokio_codec/framed_write.rs delete mode 100644 tokio-io/src/_tokio_codec/mod.rs delete mode 100644 tokio-io/src/allow_std.rs delete mode 100644 tokio-io/src/codec/bytes_codec.rs delete mode 100644 tokio-io/src/codec/decoder.rs delete mode 100644 tokio-io/src/codec/encoder.rs delete mode 100644 tokio-io/src/codec/lines_codec.rs delete mode 100644 tokio-io/src/codec/mod.rs delete mode 100644 tokio-io/src/framed.rs delete mode 100644 tokio-io/src/framed_read.rs delete mode 100644 tokio-io/src/framed_write.rs delete mode 100644 tokio-io/src/io/copy.rs delete mode 100644 tokio-io/src/io/flush.rs delete mode 100644 tokio-io/src/io/mod.rs delete mode 100644 tokio-io/src/io/read.rs delete mode 100644 tokio-io/src/io/read_exact.rs delete mode 100644 tokio-io/src/io/read_to_end.rs delete mode 100644 tokio-io/src/io/read_until.rs delete mode 100644 tokio-io/src/io/shutdown.rs delete mode 100644 tokio-io/src/io/write_all.rs delete mode 100644 tokio-io/src/length_delimited.rs delete mode 100644 tokio-io/src/lines.rs delete mode 100644 tokio-io/src/split.rs delete mode 100644 tokio-io/src/window.rs delete mode 100644 tokio-io/tests/length_delimited.rs delete mode 100644 tokio-reactor/src/background.rs delete mode 100644 tokio-sync/src/task/atomic_task.rs create mode 100644 tokio-sync/src/task/atomic_waker.rs delete mode 100644 tokio-sync/tests/atomic_task.rs create mode 100644 tokio-sync/tests/atomic_waker.rs delete mode 100644 tokio-sync/tests/fuzz_atomic_task.rs create mode 100644 tokio-sync/tests/fuzz_atomic_waker.rs delete mode 100644 tokio/src/async_await.rs create mode 100644 tokio/src/executor.rs delete mode 100644 tokio/src/executor/current_thread/mod.rs delete mode 100644 tokio/src/executor/mod.rs create mode 100644 tokio/src/reactor.rs delete mode 100644 tokio/src/reactor/mod.rs delete mode 100644 tokio/src/reactor/poll_evented.rs delete mode 100644 tokio/src/runtime/current_thread/async_await.rs delete mode 100644 tokio/src/runtime/threadpool/async_await.rs diff --git a/.cirrus.yml b/.cirrus.yml index 89161f5c..75eccc81 100644 --- a/.cirrus.yml +++ b/.cirrus.yml @@ -12,7 +12,7 @@ task: setup_script: - pkg install -y curl - curl https://sh.rustup.rs -sSf --output rustup.sh - - sh rustup.sh -y + - sh rustup.sh -y --default-toolchain nightly - . $HOME/.cargo/env - rustup target add i686-unknown-freebsd - | @@ -31,13 +31,14 @@ task: folder: $HOME/.cargo/registry test_script: - . $HOME/.cargo/env - - cargo test --all + - cargo test --all --lib && cargo test --all --tests - (cd tokio-trace/test-log-support && cargo test) - (cd tokio-trace/test_static_max_level_features && cargo test) - cargo doc --all - i686_test_script: - - . $HOME/.cargo/env - - | - cargo test --all --exclude tokio-tls --exclude tokio-macros --target i686-unknown-freebsd + # TODO: Re-enable + # i686_test_script: + # - . $HOME/.cargo/env + # - | + # cargo test --all --exclude tokio-tls --exclude tokio-macros --target i686-unknown-freebsd before_cache_script: - rm -rf $HOME/.cargo/registry/index diff --git a/Cargo.toml b/Cargo.toml index bf2fe1e1..e522bc7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,24 +2,24 @@ members = [ "tokio", - "tokio-buf", - "tokio-codec", + # "tokio-buf", + # "tokio-codec", "tokio-current-thread", "tokio-executor", - "tokio-fs", + # "tokio-fs", "tokio-futures", "tokio-io", - "tokio-macros", + # "tokio-macros", "tokio-reactor", - "tokio-signal", + # "tokio-signal", "tokio-sync", "tokio-test", - "tokio-threadpool", + # "tokio-threadpool", "tokio-timer", "tokio-tcp", - "tokio-tls", - "tokio-trace", - "tokio-trace/tokio-trace-core", - "tokio-udp", - "tokio-uds", + # "tokio-tls", + # "tokio-trace", + # "tokio-trace/tokio-trace-core", + # "tokio-udp", + # "tokio-uds", ] diff --git a/async-await/.cargo/config b/async-await/.cargo/config deleted file mode 100644 index ec6a5d0a..00000000 --- a/async-await/.cargo/config +++ /dev/null @@ -1,2 +0,0 @@ -[build] -target-dir = "../target" diff --git a/async-await/Cargo.toml b/async-await/Cargo.toml deleted file mode 100644 index 52933b28..00000000 --- a/async-await/Cargo.toml +++ /dev/null @@ -1,31 +0,0 @@ -[package] -name = "examples" -edition = "2018" -version = "0.1.0" -authors = ["Carl Lerche "] -license = "MIT" - -# Break out of the parent workspace -[workspace] - -[[bin]] -name = "chat" -path = "src/chat.rs" - -[[bin]] -name = "echo_client" -path = "src/echo_client.rs" - -[[bin]] -name = "echo_server" -path = "src/echo_server.rs" - -[[bin]] -name = "hyper" -path = "src/hyper.rs" - -[dependencies] -tokio = { version = "0.2.0", features = ["async-await-preview"], path = "../tokio" } -futures = "0.1.23" -bytes = "0.4.9" -hyper = "0.12.8" diff --git a/async-await/README.md b/async-await/README.md deleted file mode 100644 index bd360f0e..00000000 --- a/async-await/README.md +++ /dev/null @@ -1,5 +0,0 @@ -# Tokio async/await examples - -These are a separate crate in order to work around some cargo bugs. It also -allows `[patch]` to be used in `Cargo.toml` to ensure the correct lib versions -are being pulled in. diff --git a/async-await/src/chat.rs b/async-await/src/chat.rs deleted file mode 100644 index d3a0c992..00000000 --- a/async-await/src/chat.rs +++ /dev/null @@ -1,131 +0,0 @@ -#![feature(await_macro, async_await)] - -use tokio::async_wait; -use tokio::codec::{LinesCodec, Decoder}; -use tokio::net::{TcpListener, TcpStream}; -use tokio::prelude::*; - -use futures::sync::mpsc; - -use std::collections::HashMap; -use std::io; -use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; - -/// Shorthand for the transmit half of the message channel. -type Tx = mpsc::UnboundedSender; - -struct Shared { - peers: HashMap, -} - -impl Shared { - /// Create a new, empty, instance of `Shared`. - fn new() -> Self { - Shared { - peers: HashMap::new(), - } - } -} - -async fn process(stream: TcpStream, state: Arc>) -> io::Result<()> { - let addr = stream.peer_addr().unwrap(); - let mut lines = LinesCodec::new().framed(stream); - - // Extract the peer's name - let name = match async_wait!(lines.next()) { - Some(name) => name?, - None => { - // Disconnected early - return Ok(()); - } - }; - - println!("`{}` is joining the chat", name); - - let (tx, mut rx) = mpsc::unbounded(); - - // Register the socket - state.lock().unwrap() - .peers.insert(addr, tx); - - // Split the `lines` handle into send and recv handles. This allows spawning - // separate tasks. - let (mut lines_tx, mut lines_rx) = lines.split(); - - // Spawn a task that receives all lines broadcasted to us from other peers - // and writes it to the client. - tokio::spawn_async(async move { - while let Some(line) = async_wait!(rx.next()) { - let line = line.unwrap(); - async_wait!(lines_tx.send_async(line)).unwrap(); - } - }); - - // Use the current task to read lines from the socket and broadcast them to - // other peers. - while let Some(message) = async_wait!(lines_rx.next()) { - // TODO: Error handling - let message = message.unwrap(); - - let mut line = name.clone(); - line.push_str(": "); - line.push_str(&message); - line.push_str("\r\n"); - - let state = state.lock().unwrap(); - - for (peer_addr, tx) in &state.peers { - if *peer_addr != addr { - // TODO: Error handling - tx.unbounded_send(line.clone()).unwrap(); - } - } - } - - // Remove the client from the shared state. Doing so will also result in the - // tx task to terminate. - state.lock().unwrap() - .peers.remove(&addr) - .expect("bug"); - - Ok(()) -} - -#[tokio::main] -async fn main() { - // Create the shared state. This is how all the peers communicate. - // - // The server task will hold a handle to this. For every new client, the - // `state` handle is cloned and passed into the task that processes the - // client connection. - let state = Arc::new(Mutex::new(Shared::new())); - - let addr = "127.0.0.1:6142".parse().unwrap(); - - // Bind a TCP listener to the socket address. - // - // Note that this is the Tokio TcpListener, which is fully async. - let listener = TcpListener::bind(&addr).unwrap(); - - println!("server running on localhost:6142"); - - // Start the Tokio runtime. - let mut incoming = listener.incoming(); - - while let Some(stream) = async_wait!(incoming.next()) { - let stream = match stream { - Ok(stream) => stream, - Err(_) => continue, - }; - - let state = state.clone(); - - tokio::spawn_async(async move { - if let Err(_) = async_wait!(process(stream, state)) { - eprintln!("failed to process connection"); - } - }); - } -} - diff --git a/async-await/src/echo_client.rs b/async-await/src/echo_client.rs deleted file mode 100644 index 302b7ea2..00000000 --- a/async-await/src/echo_client.rs +++ /dev/null @@ -1,50 +0,0 @@ -#![feature(await_macro, async_await)] - -use tokio::async_wait; -use tokio::net::TcpStream; -use tokio::prelude::*; - -use std::io; -use std::net::SocketAddr; - -const MESSAGES: &[&str] = &[ - "hello", - "world", - "one two three", -]; - -async fn run_client(addr: &SocketAddr) -> io::Result<()> { - let mut stream = async_wait!(TcpStream::connect(addr))?; - - // Buffer to read into - let mut buf = [0; 128]; - - for msg in MESSAGES { - println!(" > write = {:?}", msg); - - // Write the message to the server - async_wait!(stream.write_all_async(msg.as_bytes()))?; - - // Read the message back from the server - async_wait!(stream.read_exact_async(&mut buf[..msg.len()]))?; - - assert_eq!(&buf[..msg.len()], msg.as_bytes()); - } - - Ok(()) -} - -#[tokio::main] -async fn main() { - use std::env; - - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr.parse::().unwrap(); - - // Connect to the echo serveer - - match async_wait!(run_client(&addr)) { - Ok(_) => println!("done."), - Err(e) => eprintln!("echo client failed; error = {:?}", e), - } -} diff --git a/async-await/src/echo_server.rs b/async-await/src/echo_server.rs deleted file mode 100644 index 63e10e31..00000000 --- a/async-await/src/echo_server.rs +++ /dev/null @@ -1,42 +0,0 @@ -#![feature(await_macro, async_await)] - -use tokio::async_wait; -use tokio::net::{TcpListener, TcpStream}; -use tokio::prelude::*; - -use std::net::SocketAddr; - -fn handle(mut stream: TcpStream) { - tokio::spawn_async(async move { - let mut buf = [0; 1024]; - - loop { - match async_wait!(stream.read_async(&mut buf)).unwrap() { - 0 => break, // Socket closed - n => { - // Send the data back - async_wait!(stream.write_all_async(&buf[0..n])).unwrap(); - } - } - } - }); -} - -#[tokio::main] -async fn main() { - use std::env; - - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr.parse::().unwrap(); - - // Bind the TCP listener - let listener = TcpListener::bind(&addr).unwrap(); - println!("Listening on: {}", addr); - - let mut incoming = listener.incoming(); - - while let Some(stream) = async_wait!(incoming.next()) { - let stream = stream.unwrap(); - handle(stream); - } -} diff --git a/async-await/src/hyper.rs b/async-await/src/hyper.rs deleted file mode 100644 index 37332ee4..00000000 --- a/async-await/src/hyper.rs +++ /dev/null @@ -1,29 +0,0 @@ -#![feature(await_macro, async_await)] - -use tokio::async_wait; -use tokio::prelude::*; -use hyper::Client; - -use std::time::Duration; -use std::str; - -#[tokio::main] -async fn main() { - let client = Client::new(); - - let uri = "http://httpbin.org/ip".parse().unwrap(); - - let response = async_wait!({ - client.get(uri) - .timeout(Duration::from_secs(10)) - }).unwrap(); - - println!("Response: {}", response.status()); - - let mut body = response.into_body(); - - while let Some(chunk) = async_wait!(body.next()) { - let chunk = chunk.unwrap(); - println!("chunk = {}", str::from_utf8(&chunk[..]).unwrap()); - } -} diff --git a/async-await/tests/macros.rs b/async-await/tests/macros.rs deleted file mode 100644 index 1fcbf77b..00000000 --- a/async-await/tests/macros.rs +++ /dev/null @@ -1,22 +0,0 @@ -#![feature(await_macro, async_await)] - -use tokio::async_wait; -use tokio::timer::Delay; -use std::time::{Duration, Instant}; - -#[tokio::test] -async fn success_no_async() { - assert!(true); -} - -#[tokio::test] -#[should_panic] -async fn fail_no_async() { - assert!(false); -} - -#[tokio::test] -async fn use_timer() { - let when = Instant::now() + Duration::from_millis(10); - async_wait!(Delay::new(when)); -} diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 1ad71db8..be5d1e9b 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -1,118 +1,120 @@ -trigger: ["master"] -pr: ["master"] +trigger: ["master", "std-future"] +pr: ["master", "std-future"] variables: - nightly: nightly-2019-05-09 + nightly: nightly-2019-06-10 jobs: -# Check formatting -- template: ci/azure-rustfmt.yml - parameters: - name: rustfmt +# # Check formatting +# - template: ci/azure-rustfmt.yml +# parameters: +# name: rustfmt # Test top level crate -- template: ci/azure-test-stable.yml - parameters: - name: test_tokio - displayName: Test tokio - cross: true - crates: - - tokio +# - template: ci/azure-test-stable.yml +# parameters: +# name: test_tokio +# displayName: Test tokio +# cross: true +# crates: +# - tokio # Test crates that are platform specific - template: ci/azure-test-stable.yml parameters: name: test_sub_cross - displayName: Test sub crates - + displayName: Test sub crates (cross) - cross: true + rust: $(nightly) crates: - - tokio-fs +# - tokio-fs - tokio-reactor - - tokio-signal - - tokio-tcp - - tokio-tls - - tokio-udp - - tokio-uds +# - tokio-signal +# - tokio-tcp +# - tokio-tls +# - tokio-udp +# - tokio-uds # Test crates that are NOT platform specific - template: ci/azure-test-stable.yml parameters: name: test_linux displayName: Test sub crates - + rust: $(nightly) crates: - - tokio-buf - - tokio-codec + # - tokio-buf + # - tokio-codec - tokio-current-thread - tokio-executor - tokio-io - tokio-sync - - tokio-threadpool - - tokio-timer - - tokio-test - - tokio-trace - - tokio-trace/tokio-trace-core - - tokio-trace/test-log-support - - tokio-trace/test_static_max_level_features - -- template: ci/azure-cargo-check.yml - parameters: - name: features - displayName: Check feature permtuations - rust: stable - crates: - tokio: - - codec - - fs - - io - - reactor - - rt-full - - tcp - - timer - - udp - - uds - - sync - tokio-buf: - - util - -# Run async-await tests -- template: ci/azure-test-nightly.yml - parameters: - name: test_nightly - displayName: Test Async / Await - rust: $(nightly) - -# Try cross compiling -- template: ci/azure-cross-compile.yml - parameters: - name: cross_32bit_linux - target: i686-unknown-linux-gnu + # - tokio-threadpool + # - tokio-timer + # - tokio-test + # - tokio-trace + # - tokio-trace/tokio-trace-core + # - tokio-trace/test-log-support + # - tokio-trace/test_static_max_level_features -# This represents the minimum Rust version supported by -# Tokio. Updating this should be done in a dedicated PR and -# cannot be greater than two 0.x releases prior to the -# current stable. +# - template: ci/azure-cargo-check.yml +# parameters: +# name: features +# displayName: Check feature permtuations +# rust: stable +# crates: +# tokio: +# - codec +# - fs +# - io +# - reactor +# - rt-full +# - tcp +# - timer +# - udp +# - uds +# - sync +# tokio-buf: +# - util # -# Tests are not run as tests may require newer versions of -# rust. -- template: ci/azure-check-minrust.yml - parameters: - name: minrust - rust_version: 1.34.0 - -- template: ci/azure-tsan.yml - parameters: - name: tsan - rust: $(nightly) - -- template: ci/azure-deploy-docs.yml - parameters: - dependsOn: - - rustfmt - - test_tokio - - test_sub_cross - - test_linux - - features - - test_nightly - - cross_32bit_linux - - minrust - - tsan +# # Run async-await tests +# - template: ci/azure-test-nightly.yml +# parameters: +# name: test_nightly +# displayName: Test Async / Await +# rust: $(nightly) +# +# # Try cross compiling +# - template: ci/azure-cross-compile.yml +# parameters: +# name: cross_32bit_linux +# target: i686-unknown-linux-gnu +# +# # This represents the minimum Rust version supported by +# # Tokio. Updating this should be done in a dedicated PR and +# # cannot be greater than two 0.x releases prior to the +# # current stable. +# # +# # Tests are not run as tests may require newer versions of +# # rust. +# - template: ci/azure-check-minrust.yml +# parameters: +# name: minrust +# rust_version: 1.34.0 +# +# - template: ci/azure-tsan.yml +# parameters: +# name: tsan +# rust: $(nightly) +# +# - template: ci/azure-deploy-docs.yml +# parameters: +# dependsOn: +# - rustfmt +# - test_tokio +# - test_sub_cross +# - test_linux +# - features +# - test_nightly +# - cross_32bit_linux +# - minrust +# - tsan diff --git a/ci/azure-install-rust.yml b/ci/azure-install-rust.yml index 43d806e7..2892ab89 100644 --- a/ci/azure-install-rust.yml +++ b/ci/azure-install-rust.yml @@ -27,6 +27,9 @@ steps: # All platforms. - script: | + rustup toolchain install nightly + rustup update + rustup toolchain list rustc -Vv cargo -V displayName: Query rust and cargo versions diff --git a/ci/azure-rustfmt.yml b/ci/azure-rustfmt.yml index 60bb51aa..0f50b6c2 100644 --- a/ci/azure-rustfmt.yml +++ b/ci/azure-rustfmt.yml @@ -10,6 +10,7 @@ jobs: rust_version: stable - script: | rustup component add rustfmt + cargo fmt --version displayName: Install rustfmt - script: | cargo fmt --all -- --check diff --git a/ci/azure-test-stable.yml b/ci/azure-test-stable.yml index f53ca0eb..f2fa7897 100644 --- a/ci/azure-test-stable.yml +++ b/ci/azure-test-stable.yml @@ -17,23 +17,24 @@ jobs: steps: - template: azure-install-rust.yml parameters: - rust_version: stable + # rust_version: stable + rust_version: ${{ parameters.rust }} - - template: azure-is-release.yml - - - ${{ each crate in parameters.crates }}: - - script: cargo test - env: - LOOM_MAX_DURATION: 10 - CI: 'True' - displayName: cargo test -p ${{ crate }} - workingDirectory: $(Build.SourcesDirectory)/${{ crate }} - condition: and(succeeded(), ne(variables['isRelease'], 'true')) +# - template: azure-is-release.yml +# +# - ${{ each crate in parameters.crates }}: +# - script: cargo test +# env: +# LOOM_MAX_DURATION: 10 +# CI: 'True' +# displayName: cargo test -p ${{ crate }} +# workingDirectory: $(Build.SourcesDirectory)/${{ crate }} +# condition: and(succeeded(), ne(variables['isRelease'], 'true')) - template: azure-patch-crates.yml - ${{ each crate in parameters.crates }}: - - script: cargo test + - script: cargo test --lib && cargo test --tests env: LOOM_MAX_DURATION: 10 CI: 'True' diff --git a/tokio-current-thread/Cargo.toml b/tokio-current-thread/Cargo.toml index af54d114..f25a59c8 100644 --- a/tokio-current-thread/Cargo.toml +++ b/tokio-current-thread/Cargo.toml @@ -24,4 +24,6 @@ publish = false [dependencies] tokio-executor = { version = "0.2.0", path = "../tokio-executor" } -futures = "0.1.19" + +[dev-dependencies] +tokio-sync = { version = "0.2.0", path = "../tokio-sync" } diff --git a/tokio-current-thread/src/lib.rs b/tokio-current-thread/src/lib.rs index a9800d8c..289f9af1 100644 --- a/tokio-current-thread/src/lib.rs +++ b/tokio-current-thread/src/lib.rs @@ -30,13 +30,14 @@ mod scheduler; use crate::scheduler::Scheduler; -use futures::future::{ExecuteError, ExecuteErrorKind, Executor}; -use futures::{executor, Async, Future}; use std::cell::Cell; use std::error::Error; use std::fmt; +use std::future::Future; +use std::pin::Pin; use std::rc::Rc; use std::sync::{atomic, mpsc, Arc}; +use std::task::{Context, Poll, Waker}; use std::thread; use std::time::{Duration, Instant}; use tokio_executor::park::{Park, ParkThread, Unpark}; @@ -60,7 +61,7 @@ pub struct CurrentThread { spawn_handle: Handle, /// Receiver for futures spawned from other threads - spawn_receiver: mpsc::Receiver + Send + 'static>>, + spawn_receiver: mpsc::Receiver + Send + 'static>>>, /// The thread-local ID assigned to this executor. id: u64, @@ -182,11 +183,7 @@ struct Borrow<'a, U> { } trait SpawnLocal { - fn spawn_local( - &mut self, - future: Box>, - already_counted: bool, - ); + fn spawn_local(&mut self, future: Pin>>, already_counted: bool); } struct CurrentRunner { @@ -225,7 +222,7 @@ thread_local! { /// /// [`CurrentThread`]: struct.CurrentThread.html /// [mod]: index.html -pub fn block_on_all(future: F) -> Result +pub fn block_on_all(future: F) -> F::Output where F: Future, { @@ -233,8 +230,7 @@ where let ret = current_thread.block_on(future); current_thread.run().unwrap(); - - ret.map_err(|e| e.into_inner().expect("unexpected execution error")) + ret } /// Executes a future on the current thread. @@ -252,10 +248,10 @@ where /// [`tokio::spawn`]: ../fn.spawn.html pub fn spawn(future: F) where - F: Future + 'static, + F: Future + 'static, { TaskExecutor::current() - .spawn_local(Box::new(future)) + .spawn_local(Box::pin(future)) .unwrap(); } @@ -283,7 +279,7 @@ impl CurrentThread

{ }); let scheduler = Scheduler::new(unpark); - let notify = scheduler.notify(); + let waker = scheduler.waker(); let num_futures = Arc::new(atomic::AtomicUsize::new(0)); @@ -294,10 +290,10 @@ impl CurrentThread

{ id, spawn_handle: Handle { sender: spawn_sender, - num_futures: num_futures, - notify: notify, + num_futures, + waker, shut_down: Cell::new(false), - thread: thread, + thread, id, }, spawn_receiver: spawn_receiver, @@ -319,9 +315,9 @@ impl CurrentThread

{ /// This internally queues the future to be executed once `run` is called. pub fn spawn(&mut self, future: F) -> &mut Self where - F: Future + 'static, + F: Future + 'static, { - self.borrow().spawn_local(Box::new(future), false); + self.borrow().spawn_local(Box::pin(future), false); self } @@ -338,7 +334,7 @@ impl CurrentThread

{ /// /// The caller is responsible for ensuring that other spawned futures /// complete execution. - pub fn block_on(&mut self, future: F) -> Result> + pub fn block_on(&mut self, future: F) -> F::Output where F: Future, { @@ -424,7 +420,7 @@ impl Drop for CurrentThread

{ impl tokio_executor::Executor for CurrentThread { fn spawn( &mut self, - future: Box + Send>, + future: Pin + Send>>, ) -> Result<(), SpawnError> { self.borrow().spawn_local(future, false); Ok(()) @@ -433,10 +429,10 @@ impl tokio_executor::Executor for CurrentThread { impl tokio_executor::TypedExecutor for CurrentThread where - T: Future + 'static, + T: Future + 'static, { fn spawn(&mut self, future: T) -> Result<(), SpawnError> { - self.borrow().spawn_local(Box::new(future), false); + self.borrow().spawn_local(Box::pin(future), false); Ok(()) } } @@ -461,9 +457,9 @@ impl<'a, P: Park> Entered<'a, P> { /// This internally queues the future to be executed once `run` is called. pub fn spawn(&mut self, future: F) -> &mut Self where - F: Future + 'static, + F: Future + 'static, { - self.executor.borrow().spawn_local(Box::new(future), false); + self.executor.borrow().spawn_local(Box::pin(future), false); self } @@ -480,29 +476,35 @@ impl<'a, P: Park> Entered<'a, P> { /// /// The caller is responsible for ensuring that other spawned futures /// complete execution. - pub fn block_on(&mut self, future: F) -> Result> + /// + /// # Panics + /// + /// This function will panic if the `Park` call returns an error. + pub fn block_on(&mut self, mut future: F) -> F::Output where F: Future, { - let mut future = executor::spawn(future); - let notify = self.executor.scheduler.notify(); + // Safety: we shadow the original `future`, so it will never move + // again. + let mut future = unsafe { Pin::new_unchecked(&mut future) }; + let waker = self.executor.scheduler.waker(); + let mut cx = Context::from_waker(&waker); loop { let res = self .executor .borrow() - .enter(self.enter, || future.poll_future_notify(¬ify, 0)); + .enter(self.enter, || future.as_mut().poll(&mut cx)); match res { - Ok(Async::Ready(e)) => return Ok(e), - Err(e) => return Err(BlockError { inner: Some(e) }), - Ok(Async::NotReady) => {} + Poll::Ready(e) => return e, + Poll::Pending => {} } self.tick(); if let Err(_) = self.executor.park.park() { - return Err(BlockError { inner: None }); + panic!("block_on park failed"); } } } @@ -629,10 +631,11 @@ impl<'a, P: Park> fmt::Debug for Entered<'a, P> { /// Handle to spawn a future on the corresponding `CurrentThread` instance #[derive(Clone)] pub struct Handle { - sender: mpsc::Sender + Send + 'static>>, + sender: mpsc::Sender + Send + 'static>>>, num_futures: Arc, shut_down: Cell, - notify: executor::NotifyHandle, + /// Waker to the Scheduler + waker: Waker, thread: thread::ThreadId, /// The thread-local ID assigned to this Handle's executor. @@ -657,12 +660,12 @@ impl Handle { /// instance of the `Handle` does not exist anymore. pub fn spawn(&self, future: F) -> Result<(), SpawnError> where - F: Future + Send + 'static, + F: Future + Send + 'static, { if thread::current().id() == self.thread { let mut e = TaskExecutor::current(); if e.id() == Some(self.id) { - return e.spawn_local(Box::new(future)); + return e.spawn_local(Box::pin(future)); } } @@ -683,10 +686,9 @@ impl Handle { } self.sender - .send(Box::new(future)) + .send(Box::pin(future)) .expect("CurrentThread does not exist anymore"); - // use 0 for the id, CurrentThread does not make use of it - self.notify.notify(0); + self.waker.wake_by_ref(); Ok(()) } @@ -731,7 +733,7 @@ impl TaskExecutor { /// Spawn a future onto the current `CurrentThread` instance. pub fn spawn_local( &mut self, - future: Box>, + future: Pin>>, ) -> Result<(), SpawnError> { CURRENT.with(|current| match current.spawn.get() { Some(spawn) => { @@ -746,7 +748,7 @@ impl TaskExecutor { impl tokio_executor::Executor for TaskExecutor { fn spawn( &mut self, - future: Box + Send>, + future: Pin + Send>>, ) -> Result<(), SpawnError> { self.spawn_local(future) } @@ -754,25 +756,10 @@ impl tokio_executor::Executor for TaskExecutor { impl tokio_executor::TypedExecutor for TaskExecutor where - F: Future + 'static, + F: Future + 'static, { fn spawn(&mut self, future: F) -> Result<(), SpawnError> { - self.spawn_local(Box::new(future)) - } -} - -impl Executor for TaskExecutor -where - F: Future + 'static, -{ - fn execute(&self, future: F) -> Result<(), ExecuteError> { - CURRENT.with(|current| match current.spawn.get() { - Some(spawn) => { - unsafe { (*spawn).spawn_local(Box::new(future), false) }; - Ok(()) - } - None => Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future)), - }) + self.spawn_local(Box::pin(future)) } } @@ -791,11 +778,7 @@ impl<'a, U: Unpark> Borrow<'a, U> { } impl<'a, U: Unpark> SpawnLocal for Borrow<'a, U> { - fn spawn_local( - &mut self, - future: Box>, - already_counted: bool, - ) { + fn spawn_local(&mut self, future: Pin>>, already_counted: bool) { if !already_counted { // NOTE: we have a borrow of the Runtime, so we know that it isn't shut down. // NOTE: += 2 since LSB is the shutdown bit diff --git a/tokio-current-thread/src/scheduler.rs b/tokio-current-thread/src/scheduler.rs index decef395..709ac415 100644 --- a/tokio-current-thread/src/scheduler.rs +++ b/tokio-current-thread/src/scheduler.rs @@ -1,14 +1,14 @@ use crate::Borrow; -use futures::executor::{self, NotifyHandle, Spawn, UnsafeNotify}; -use futures::{Async, Future}; use std::cell::UnsafeCell; use std::fmt::{self, Debug}; -use std::marker::PhantomData; +use std::future::Future; use std::mem; +use std::pin::Pin; use std::ptr; use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst}; use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize}; use std::sync::{Arc, Weak}; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use std::thread; use std::usize; use tokio_executor::park::Unpark; @@ -22,8 +22,6 @@ pub struct Scheduler { nodes: List, } -pub struct Notify<'a, U>(&'a Arc>); - // A linked-list of nodes struct List { len: usize, @@ -78,12 +76,6 @@ struct Inner { unsafe impl Send for Inner {} unsafe impl Sync for Inner {} -impl executor::Notify for Inner { - fn notify(&self, _: usize) { - self.unpark.unpark(); - } -} - struct Node { // The item item: UnsafeCell>, @@ -123,12 +115,12 @@ enum Dequeue { } /// Wraps a spawned boxed future -struct Task(Spawn>>); +struct Task(Pin>>); /// A task that is scheduled. `turn` must be called pub struct Scheduled<'a, U> { task: &'a mut Task, - notify: &'a Notify<'a, U>, + node: &'a Arc>, done: &'a mut bool, } @@ -165,11 +157,11 @@ where } } - pub fn notify(&self) -> NotifyHandle { - self.inner.clone().into() + pub fn waker(&self) -> Waker { + waker_inner(self.inner.clone()) } - pub fn schedule(&mut self, item: Box>) { + pub fn schedule(&mut self, item: Pin>>) { // Get the current scheduler tick let tick_num = self.inner.tick_num.load(SeqCst); @@ -317,11 +309,10 @@ where // deallocating the node if need be. let borrow = &mut *bomb.borrow; let enter = &mut *bomb.enter; - let notify = Notify(bomb.node.as_ref().unwrap()); let mut scheduled = Scheduled { task: item, - notify: ¬ify, + node: bomb.node.as_ref().unwrap(), done: &mut done, }; @@ -345,10 +336,15 @@ where impl<'a, U: Unpark> Scheduled<'a, U> { /// Polls the task, returns `true` if the task has completed. pub fn tick(&mut self) -> bool { - // Tick the future - let ret = match self.task.0.poll_future_notify(self.notify, 0) { - Ok(Async::Ready(_)) | Err(_) => true, - Ok(Async::NotReady) => false, + let waker = unsafe { + // Safety: we don't hold this waker ref longer than + // this `tick` function + waker_ref(self.node) + }; + let mut cx = Context::from_waker(&waker); + let ret = match self.task.0.as_mut().poll(&mut cx) { + Poll::Ready(()) => true, + Poll::Pending => false, }; *self.done = ret; @@ -357,8 +353,8 @@ impl<'a, U: Unpark> Scheduled<'a, U> { } impl Task { - pub fn new(future: Box + 'static>) -> Self { - Task(executor::spawn(future)) + pub fn new(future: Pin + 'static>>) -> Self { + Task(future) } } @@ -630,63 +626,101 @@ impl List { } } -impl<'a, U> Clone for Notify<'a, U> { - fn clone(&self) -> Self { - Notify(self.0) - } +unsafe fn noop(_: *const ()) {} + +// ===== Raw Waker Inner ====== + +fn waker_inner(inner: Arc>) -> Waker { + let ptr = Arc::into_raw(inner) as *const (); + let vtable = &RawWakerVTable::new( + clone_inner::, + wake_inner::, + wake_by_ref_inner::, + drop_inner::, + ); + + unsafe { Waker::from_raw(RawWaker::new(ptr, vtable)) } } -impl<'a, U> fmt::Debug for Notify<'a, U> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Notify").finish() - } +unsafe fn clone_inner(data: *const ()) -> RawWaker { + let arc: Arc> = Arc::from_raw(data as *const Inner); + let clone = arc.clone(); + // forget both Arcs so the refcounts don't get decremented + mem::forget(arc); + mem::forget(clone); + + let vtable = &RawWakerVTable::new( + clone_inner::, + wake_inner::, + wake_by_ref_inner::, + drop_inner::, + ); + RawWaker::new(data, vtable) } -impl<'a, U: Unpark> From> for NotifyHandle { - fn from(handle: Notify<'a, U>) -> NotifyHandle { - unsafe { - let ptr = handle.0.clone(); - let ptr = mem::transmute::>, *mut ArcNode>(ptr); - NotifyHandle::new(hide_lt(ptr)) - } - } +unsafe fn wake_inner(data: *const ()) { + let arc: Arc> = Arc::from_raw(data as *const Inner); + arc.unpark.unpark(); } -struct ArcNode(PhantomData); +unsafe fn wake_by_ref_inner(data: *const ()) { + let arc: Arc> = Arc::from_raw(data as *const Inner); + arc.unpark.unpark(); + // by_ref means we don't own the Node, so forget the Arc + mem::forget(arc); +} -// We should never touch `Task` on any thread other than the one owning -// `Scheduler`, so this should be a safe operation. -unsafe impl Send for ArcNode {} -unsafe impl Sync for ArcNode {} +unsafe fn drop_inner(data: *const ()) { + drop(Arc::>::from_raw(data as *const Inner)); +} +// ===== Raw Waker Node ====== + +unsafe fn waker_ref(node: &Arc>) -> Waker { + let ptr = &*node as &Node as *const Node as *const (); + let vtable = &RawWakerVTable::new( + clone_node::, + wake_unreachable, + wake_by_ref_node::, + noop, + ); + + Waker::from_raw(RawWaker::new(ptr, vtable)) +} -impl executor::Notify for ArcNode { - fn notify(&self, _id: usize) { - unsafe { - let me: *const ArcNode = self; - let me: *const *const ArcNode = &me; - let me = me as *const Arc>; - Node::notify(&*me) - } - } +unsafe fn wake_unreachable(_data: *const ()) { + unreachable!("waker_ref::wake()"); } -unsafe impl UnsafeNotify for ArcNode { - unsafe fn clone_raw(&self) -> NotifyHandle { - let me: *const ArcNode = self; - let me: *const *const ArcNode = &me; - let me = &*(me as *const Arc>); - Notify(me).into() - } +unsafe fn clone_node(data: *const ()) -> RawWaker { + let arc: Arc> = Arc::from_raw(data as *const Node); + let clone = arc.clone(); + // forget both Arcs so the refcounts don't get decremented + mem::forget(arc); + mem::forget(clone); + + let vtable = &RawWakerVTable::new( + clone_node::, + wake_node::, + wake_by_ref_node::, + drop_node::, + ); + RawWaker::new(data, vtable) +} - unsafe fn drop_raw(&self) { - let mut me: *const ArcNode = self; - let me = &mut me as *mut *const ArcNode as *mut Arc>; - ptr::drop_in_place(me); - } +unsafe fn wake_node(data: *const ()) { + let arc: Arc> = Arc::from_raw(data as *const Node); + Node::::notify(&arc); +} + +unsafe fn wake_by_ref_node(data: *const ()) { + let arc: Arc> = Arc::from_raw(data as *const Node); + Node::::notify(&arc); + // by_ref means we don't own the Node, so forget the Arc + mem::forget(arc); } -unsafe fn hide_lt(p: *mut ArcNode) -> *mut dyn UnsafeNotify { - mem::transmute(p as *mut dyn UnsafeNotify) +unsafe fn drop_node(data: *const ()) { + drop(Arc::>::from_raw(data as *const Node)); } impl Node { diff --git a/tokio-current-thread/tests/current_thread.rs b/tokio-current-thread/tests/current_thread.rs index 7336baf6..794345cd 100644 --- a/tokio-current-thread/tests/current_thread.rs +++ b/tokio-current-thread/tests/current_thread.rs @@ -1,39 +1,34 @@ #![deny(warnings, rust_2018_idioms)] +#![feature(async_await)] -use futures::future::{self, lazy}; -// This is not actually unused --- we need this trait to be in scope for -// the tests that sue TaskExecutor::current().execute(). The compiler -// doesn't realise that. -#[allow(unused_imports)] -use futures::future::Executor; -use futures::prelude::*; -use futures::sync::oneshot; -use futures::task; use std::any::Any; use std::cell::{Cell, RefCell}; +use std::future::Future; +use std::pin::Pin; use std::rc::Rc; +use std::task::{Context, Poll}; use std::thread; use std::time::Duration; use tokio_current_thread::{block_on_all, CurrentThread}; +use tokio_executor::TypedExecutor; +use tokio_sync::oneshot; mod from_block_on_all { use super::*; - fn test>) + 'static>(spawn: F) { + fn test>>) + 'static>(spawn: F) { let cnt = Rc::new(Cell::new(0)); let c = cnt.clone(); - let msg = tokio_current_thread::block_on_all(lazy(move || { + let msg = tokio_current_thread::block_on_all(async move { c.set(1 + c.get()); // Spawn! - spawn(Box::new(lazy(move || { + spawn(Box::pin(async move { c.set(1 + c.get()); - Ok::<(), ()>(()) - }))); + })); - Ok::<_, ()>("hello") - })) - .unwrap(); + "hello" + }); assert_eq!(2, cnt.get()); assert_eq!(msg, "hello"); @@ -48,7 +43,7 @@ mod from_block_on_all { fn execute() { test(|f| { tokio_current_thread::TaskExecutor::current() - .execute(f) + .spawn(f) .unwrap(); }); } @@ -66,11 +61,10 @@ fn block_waits() { let cnt = Rc::new(Cell::new(0)); let cnt2 = cnt.clone(); - block_on_all(rx.then(move |_| { + block_on_all(async move { + rx.await.unwrap(); cnt.set(1 + cnt.get()); - Ok::<_, ()>(()) - })) - .unwrap(); + }); assert_eq!(1, cnt2.get()); } @@ -84,10 +78,9 @@ fn spawn_many() { for _ in 0..ITER { let cnt = cnt.clone(); - tokio_current_thread.spawn(lazy(move || { + tokio_current_thread.spawn(async move { cnt.set(1 + cnt.get()); - Ok::<(), ()>(()) - })); + }); } tokio_current_thread.run().unwrap(); @@ -98,48 +91,36 @@ fn spawn_many() { mod does_not_set_global_executor_by_default { use super::*; - fn test + Send>) -> Result<(), E> + 'static, E>( + fn test + Send>>) -> Result<(), E> + 'static, E>( spawn: F, ) { - block_on_all(lazy(|| { - spawn(Box::new(lazy(|| ok()))).unwrap_err(); - ok() - })) - .unwrap() + block_on_all(async { + spawn(Box::pin(async {})).unwrap_err(); + }); } #[test] fn spawn() { - use tokio_executor::Executor; test(|f| tokio_executor::DefaultExecutor::current().spawn(f)) } - - #[test] - fn execute() { - test(|f| tokio_executor::DefaultExecutor::current().execute(f)) - } } mod from_block_on_future { use super::*; - fn test>)>(spawn: F) { + fn test>>)>(spawn: F) { let cnt = Rc::new(Cell::new(0)); + let cnt2 = cnt.clone(); let mut tokio_current_thread = CurrentThread::new(); - tokio_current_thread - .block_on(lazy(|| { - let cnt = cnt.clone(); + tokio_current_thread.block_on(async move { + let cnt3 = cnt2.clone(); - spawn(Box::new(lazy(move || { - cnt.set(1 + cnt.get()); - Ok(()) - }))); - - Ok::<_, ()>(()) - })) - .unwrap(); + spawn(Box::pin(async move { + cnt3.set(1 + cnt3.get()); + })); + }); tokio_current_thread.run().unwrap(); @@ -155,35 +136,30 @@ mod from_block_on_future { fn execute() { test(|f| { tokio_current_thread::TaskExecutor::current() - .execute(f) + .spawn(f) .unwrap(); }); } } -struct Never(Rc<()>); - -impl Future for Never { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - Ok(Async::NotReady) - } -} - mod outstanding_tasks_are_dropped_when_executor_is_dropped { use super::*; + async fn never(_rc: Rc<()>) { + loop { + yield_once().await; + } + } + fn test(spawn: F, dotspawn: G) where - F: Fn(Box>) + 'static, - G: Fn(&mut CurrentThread, Box>), + F: Fn(Pin>>) + 'static, + G: Fn(&mut CurrentThread, Pin>>), { let mut rc = Rc::new(()); let mut tokio_current_thread = CurrentThread::new(); - dotspawn(&mut tokio_current_thread, Box::new(Never(rc.clone()))); + dotspawn(&mut tokio_current_thread, Box::pin(never(rc.clone()))); drop(tokio_current_thread); @@ -193,15 +169,13 @@ mod outstanding_tasks_are_dropped_when_executor_is_dropped { // Using the global spawn fn let mut rc = Rc::new(()); + let rc2 = rc.clone(); let mut tokio_current_thread = CurrentThread::new(); - tokio_current_thread - .block_on(lazy(|| { - spawn(Box::new(Never(rc.clone()))); - Ok::<_, ()>(()) - })) - .unwrap(); + tokio_current_thread.block_on(async move { + spawn(Box::pin(never(rc2))); + }); drop(tokio_current_thread); @@ -221,7 +195,7 @@ mod outstanding_tasks_are_dropped_when_executor_is_dropped { test( |f| { tokio_current_thread::TaskExecutor::current() - .execute(f) + .spawn(f) .unwrap(); }, // Note: `CurrentThread` doesn't currently implement @@ -238,12 +212,9 @@ mod outstanding_tasks_are_dropped_when_executor_is_dropped { #[test] #[should_panic] fn nesting_run() { - block_on_all(lazy(|| { - block_on_all(lazy(|| ok())).unwrap(); - - ok() - })) - .unwrap(); + block_on_all(async { + block_on_all(async {}); + }); } mod run_in_future { @@ -252,29 +223,23 @@ mod run_in_future { #[test] #[should_panic] fn spawn() { - block_on_all(lazy(|| { - tokio_current_thread::spawn(lazy(|| { - block_on_all(lazy(|| ok())).unwrap(); - ok() - })); - ok() - })) - .unwrap(); + block_on_all(async { + tokio_current_thread::spawn(async { + block_on_all(async {}); + }); + }); } #[test] #[should_panic] fn execute() { - block_on_all(lazy(|| { + block_on_all(async { tokio_current_thread::TaskExecutor::current() - .execute(lazy(|| { - block_on_all(lazy(|| ok())).unwrap(); - ok() - })) + .spawn(async { + block_on_all(async {}); + }) .unwrap(); - ok() - })) - .unwrap(); + }); } } @@ -282,23 +247,15 @@ mod run_in_future { fn tick_on_infini_future() { let num = Rc::new(Cell::new(0)); - struct Infini { - num: Rc>, - } - - impl Future for Infini { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - self.num.set(1 + self.num.get()); - task::current().notify(); - Ok(Async::NotReady) + async fn infini(num: Rc>) { + loop { + num.set(1 + num.get()); + yield_once().await } } CurrentThread::new() - .spawn(Infini { num: num.clone() }) + .spawn(infini(num.clone())) .turn(None) .unwrap(); @@ -307,56 +264,41 @@ fn tick_on_infini_future() { mod tasks_are_scheduled_fairly { use super::*; - struct Spin { - state: Rc>, - idx: usize, - } - impl Future for Spin { - type Item = (); - type Error = (); + async fn spin(state: Rc>, idx: usize) { + loop { + // borrow_mut scope + { + let mut state = state.borrow_mut(); - fn poll(&mut self) -> Poll<(), ()> { - let mut state = self.state.borrow_mut(); + if idx == 0 { + let diff = state[0] - state[1]; - if self.idx == 0 { - let diff = state[0] - state[1]; + assert!(diff.abs() <= 1); - assert!(diff.abs() <= 1); - - if state[0] >= 50 { - return Ok(().into()); + if state[0] >= 50 { + return; + } } - } - state[self.idx] += 1; + state[idx] += 1; - if state[self.idx] >= 100 { - return Ok(().into()); + if state[idx] >= 100 { + return; + } } - task::current().notify(); - Ok(Async::NotReady) + yield_once().await; } } - fn test(spawn: F) { + fn test>>)>(spawn: F) { let state = Rc::new(RefCell::new([0, 0])); - block_on_all(lazy(|| { - spawn(Spin { - state: state.clone(), - idx: 0, - }); - - spawn(Spin { - state: state, - idx: 1, - }); - - ok() - })) - .unwrap(); + block_on_all(async move { + spawn(Box::pin(spin(state.clone(), 0))); + spawn(Box::pin(spin(state, 1))); + }); } #[test] @@ -368,7 +310,7 @@ mod tasks_are_scheduled_fairly { fn execute() { test(|f| { tokio_current_thread::TaskExecutor::current() - .execute(f) + .spawn(f) .unwrap(); }) } @@ -379,8 +321,8 @@ mod and_turn { fn test(spawn: F, dotspawn: G) where - F: Fn(Box>) + 'static, - G: Fn(&mut CurrentThread, Box>), + F: Fn(Pin>>) + 'static, + G: Fn(&mut CurrentThread, Pin>>), { let cnt = Rc::new(Cell::new(0)); let c = cnt.clone(); @@ -388,24 +330,21 @@ mod and_turn { let mut tokio_current_thread = CurrentThread::new(); // Spawn a basic task to get the executor to turn - dotspawn(&mut tokio_current_thread, Box::new(lazy(move || Ok(())))); + dotspawn(&mut tokio_current_thread, Box::pin(async {})); // Turn once... tokio_current_thread.turn(None).unwrap(); dotspawn( &mut tokio_current_thread, - Box::new(lazy(move || { + Box::pin(async move { c.set(1 + c.get()); // Spawn! - spawn(Box::new(lazy(move || { + spawn(Box::pin(async move { c.set(1 + c.get()); - Ok::<(), ()>(()) - }))); - - Ok(()) - })), + })); + }), ); // This does not run the newly spawned thread @@ -429,7 +368,7 @@ mod and_turn { test( |f| { tokio_current_thread::TaskExecutor::current() - .execute(f) + .spawn(f) .unwrap(); }, // Note: `CurrentThread` doesn't currently implement @@ -454,23 +393,12 @@ mod in_drop { } } - struct MyFuture { - _data: Box, - } - - impl Future for MyFuture { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - Ok(().into()) - } - } + async fn noop(_data: Box) {} fn test(spawn: F, dotspawn: G) where - F: Fn(Box>) + 'static, - G: Fn(&mut CurrentThread, Box>), + F: Fn(Pin>>) + 'static, + G: Fn(&mut CurrentThread, Pin>>), { let mut tokio_current_thread = CurrentThread::new(); @@ -478,14 +406,11 @@ mod in_drop { dotspawn( &mut tokio_current_thread, - Box::new(MyFuture { - _data: Box::new(OnDrop(Some(move || { - spawn(Box::new(lazy(move || { - tx.send(()).unwrap(); - Ok(()) - }))); - }))), - }), + Box::pin(noop(Box::new(OnDrop(Some(move || { + spawn(Box::pin(async move { + tx.send(()).unwrap(); + })); + }))))), ); tokio_current_thread.block_on(rx).unwrap(); @@ -504,7 +429,7 @@ mod in_drop { test( |f| { tokio_current_thread::TaskExecutor::current() - .execute(f) + .spawn(f) .unwrap(); }, // Note: `CurrentThread` doesn't currently implemen