summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-08-06 14:03:49 -0700
committerGitHub <noreply@github.com>2019-08-06 14:03:49 -0700
commit47e2ff48d9f1daac7dba9f136b24eed64c87cf40 (patch)
tree2c3ec343ec2ac5559fcd8ade8a914b7d5f61606d
parent2f43b0a023f155a3efed4b048f5e0822072840f8 (diff)
tokio: fix API doc examples (#1396)
-rw-r--r--README.md69
-rw-r--r--tokio/Cargo.toml3
-rw-r--r--tokio/src/codec/length_delimited.rs14
-rw-r--r--tokio/src/executor.rs32
-rw-r--r--tokio/src/future.rs21
-rw-r--r--tokio/src/lib.rs63
-rw-r--r--tokio/src/prelude.rs2
-rw-r--r--tokio/src/reactor.rs23
-rw-r--r--tokio/src/runtime/current_thread/builder.rs2
-rw-r--r--tokio/src/runtime/current_thread/mod.rs14
-rw-r--r--tokio/src/runtime/current_thread/runtime.rs13
-rw-r--r--tokio/src/runtime/mod.rs129
-rw-r--r--tokio/src/runtime/threadpool/mod.rs62
-rw-r--r--tokio/src/runtime/threadpool/task_executor.rs11
-rw-r--r--tokio/src/stream.rs21
-rw-r--r--tokio/src/timer.rs41
-rw-r--r--tokio/tests/runtime_threaded.rs15
-rw-r--r--tokio/tests/timer.rs3
18 files changed, 279 insertions, 259 deletions
diff --git a/README.md b/README.md
index 23311e13..7445f51f 100644
--- a/README.md
+++ b/README.md
@@ -33,10 +33,6 @@ the Rust programming language. It is:
[API Docs](https://docs.rs/tokio/0.1.22/tokio) |
[Chat](https://gitter.im/tokio-rs/tokio)
-The API docs for the master branch are currently out of date as master is
-undergoing significant churn as it is updated to use `std::future`. The docs
-will be udpated once the branch stabilizes.
-
## Overview
Tokio is an event-driven, non-blocking I/O platform for writing
@@ -60,41 +56,42 @@ an asynchronous application.
A basic TCP echo server with Tokio:
```rust
-use tokio::prelude::*;
-use tokio::io::copy;
+#![feature(async_await)]
+
use tokio::net::TcpListener;
+use tokio::prelude::*;
-fn main() {
- // Bind the server's socket.
- let addr = "127.0.0.1:12345".parse().unwrap();
- let listener = TcpListener::bind(&addr)
- .expect("unable to bind TCP listener");
-
- // Pull out a stream of sockets for incoming connections
- let server = listener.incoming()
- .map_err(|e| eprintln!("accept failed = {:?}", e))
- .for_each(|sock| {
- // Split up the reading and writing parts of the
- // socket.
- let (reader, writer) = sock.split();
-
- // A future that echos the data and returns how
- // many bytes were copied...
- let bytes_copied = copy(reader, writer);
-
- // ... after which we'll print what happened.
- let handle_conn = bytes_copied.map(|amt| {
- println!("wrote {:?} bytes", amt)
- }).map_err(|err| {
- eprintln!("IO error {:?}", err)
- });
-
- // Spawn the future as a concurrent task.
- tokio::spawn(handle_conn)
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+ let addr = "127.0.0.1:8080".parse()?;
+ let mut listener = TcpListener::bind(&addr).unwrap();
+
+ loop {
+ let (mut socket, _) = listener.accept().await?;
+
+ tokio::spawn(async move {
+ let mut buf = [0; 1024];
+
+ // In a loop, read data from the socket and write the data back.
+ loop {
+ let n = match socket.read(&mut buf).await {
+ // socket closed
+ Ok(n) if n == 0 => return,
+ Ok(n) => n,
+ Err(e) => {
+ println!("failed to read from socket; err = {:?}", e);
+ return;
+ }
+ };
+
+ // Write the data back
+ if let Err(e) = socket.write_all(&buf[0..n]).await {
+ println!("failed to write to socket; err = {:?}", e);
+ return;
+ }
+ }
});
-
- // Start the Tokio runtime
- tokio::run(server);
+ }
}
```
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index 34de0806..6882772b 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -63,7 +63,8 @@ uds = ["tokio-uds"]
# Only non-optional dependency...
#futures = "0.1.20"
futures-core-preview = "= 0.3.0-alpha.17"
-futures-util-preview = "= 0.3.0-alpha.17"
+futures-sink-preview = "= 0.3.0-alpha.17"
+futures-util-preview = { version = "= 0.3.0-alpha.17", features = ["sink"] }
# Everything else is optional...
bytes = { version = "0.4", optional = true }
diff --git a/tokio/src/codec/length_delimited.rs b/tokio/src/codec/length_delimited.rs
index b4822802..565c7607 100644
--- a/tokio/src/codec/length_delimited.rs
+++ b/tokio/src/codec/length_delimited.rs
@@ -39,20 +39,24 @@
//! Specifically, given the following:
//!
//! ```
+//! #![feature(async_await)]
+//!
//! use tokio::io::{AsyncRead, AsyncWrite};
//! use tokio::codec::*;
+//! use tokio::prelude::*;
+//!
//! use bytes::Bytes;
-//! use futures::{Sink, Future};
//!
-//! fn write_frame<T: AsyncRead + AsyncWrite>(io: T) -> Result<(), Box<dyn std::error::Error>> {
+//! async fn write_frame<T>(io: T) -> Result<(), Box<dyn std::error::Error>>
+//! where
+//! T: AsyncRead + AsyncWrite + Unpin,
+//! {
//! let mut transport = Framed::new(io, LengthDelimitedCodec::new());
//! let frame = Bytes::from("hello world");
//!
-//! transport.send(frame).wait()?;
+//! transport.send(frame).await?;
//! Ok(())
//! }
-//! #
-//! # pub fn main() {}
//! ```
//!
//! The encoded frame will look like this:
diff --git a/tokio/src/executor.rs b/tokio/src/executor.rs
index 34753131..830ecf45 100644
--- a/tokio/src/executor.rs
+++ b/tokio/src/executor.rs
@@ -68,26 +68,26 @@ pub struct Spawn(());
/// In this example, a server is started and `spawn` is used to start a new task
/// that processes each received connection.
///
-/// ```rust,ignore
-/// # use futures::{Future, Stream};
+/// ```
+/// #![feature(async_await)]
+///
/// use tokio::net::TcpListener;
///
-/// # fn process<T>(_: T) -> Box<dyn Future<Item = (), Error = ()> + Send> {
-/// # unimplemented!();
-/// # }
-/// # fn dox() {
-/// # let addr = "127.0.0.1:8080".parse().unwrap();
-/// let listener = TcpListener::bind(&addr).unwrap();
-///
-/// let server = listener.incoming()
-/// .map_err(|e| println!("error = {:?}", e))
-/// .for_each(|socket| {
-/// tokio::spawn(process(socket))
-/// });
+/// # async fn process<T>(t: T) {}
+/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+/// let addr = "127.0.0.1:8080".parse()?;
+/// let mut listener = TcpListener::bind(&addr).unwrap();
+///
+/// loop {
+/// let (socket, _) = listener.accept().await?;
///
-/// tokio::run(server);
+/// tokio::spawn(async move {
+/// // Process each socket concurrently.
+/// process(socket).await
+/// });
+/// }
+/// # Ok(())
/// # }
-/// # pub fn main() {}
/// ```
///
/// [default executor]: struct.DefaultExecutor.html
diff --git a/tokio/src/future.rs b/tokio/src/future.rs
index 3bc673d0..a2e4d220 100644
--- a/tokio/src/future.rs
+++ b/tokio/src/future.rs
@@ -40,20 +40,23 @@ pub trait FutureExt: Future {
/// # Examples
///
/// ```
+ /// #![feature(async_await)]
+ ///
/// use tokio::prelude::*;
/// use std::time::Duration;
- /// # use futures::future::{self, FutureResult};
///
- /// # fn long_future() -> FutureResult<(), ()> {
- /// # future::ok(())
- /// # }
- /// #
- /// # fn main() {
- /// let future = long_future()
+ /// async fn long_future() {
+ /// // do work here
+ /// }
+ ///
+ /// # async fn dox() {
+ /// let res = long_future()
/// .timeout(Duration::from_secs(1))
- /// .map_err(|e| println!("error = {:?}", e));
+ /// .await;
///
- /// tokio::run(future);
+ /// if res.is_err() {
+ /// println!("operation timed out");
+ /// }
/// # }
/// ```
#[cfg(feature = "timer")]
diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs
index c78372e7..06cbe960 100644
--- a/tokio/src/lib.rs
+++ b/tokio/src/lib.rs
@@ -17,54 +17,51 @@
//! * Asynchronous [filesystem][fs] operations.
//! * [Timer][timer] API for scheduling work in the future.
//!
-//! Tokio is built using [futures] as the abstraction for managing the
-//! complexity of asynchronous programming.
-//!
//! Guide level documentation is found on the [website].
//!
//! [website]: https://tokio.rs/docs/
-//! [futures]: http://docs.rs/futures/0.1
//!
//! # Examples
//!
//! A simple TCP echo server:
//!
-//! ```no_run,ignore
-//! use tokio::prelude::*;
-//! use tokio::io::copy;
+//! ```no_run
+//! #![feature(async_await)]
+//!
//! use tokio::net::TcpListener;
+//! use tokio::prelude::*;
//!
-//! fn main() {
-//! // Bind the server's socket.
-//! let addr = "127.0.0.1:12345".parse().unwrap();
-//! let listener = TcpListener::bind(&addr)
-//! .expect("unable to bind TCP listener");
+//! #[tokio::main]
+//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let addr = "127.0.0.1:8080".parse()?;
+//! let mut listener = TcpListener::bind(&addr).unwrap();
//!
-//! // Pull out a stream of sockets for incoming connections
-//! let server = listener.incoming()
-//! .map_err(|e| eprintln!("accept failed = {:?}", e))
-//! .for_each(|sock| {
-//! // Split up the reading and writing parts of the
-//! // socket.
-//! let (reader, writer) = sock.split();
+//! loop {
+//! let (mut socket, _) = listener.accept().await?;
//!
-//! // A future that echos the data and returns how
-//! // many bytes were copied...
-//! let bytes_copied = copy(reader, writer);
+//! tokio::spawn(async move {
+//! let mut buf = [0; 1024];
//!
-//! // ... after which we'll print what happened.
-//! let handle_conn = bytes_copied.map(|amt| {
-//! println!("wrote {:?} bytes", amt)
-//! }).map_err(|err| {
-//! eprintln!("IO error {:?}", err)
-//! });
+//! // In a loop, read data from the socket and write the data back.
+//! loop {
+//! let n = match socket.read(&mut buf).await {
+//! // socket closed
+//! Ok(n) if n == 0 => return,
+//! Ok(n) => n,
+//! Err(e) => {
+//! println!("failed to read from socket; err = {:?}", e);
+//! return;
+//! }
+//! };
//!
-//! // Spawn the future as a concurrent task.
-//! tokio::spawn(handle_conn)
+//! // Write the data back
+//! if let Err(e) = socket.write_all(&buf[0..n]).await {
+//! println!("failed to write to socket; err = {:?}", e);
+//! return;
+//! }
+//! }
//! });
-//!
-//! // Start the Tokio runtime
-//! tokio::run(server);
+//! }
//! }
//! ```
diff --git a/tokio/src/prelude.rs b/tokio/src/prelude.rs
index 1593294d..840e7d3b 100644
--- a/tokio/src/prelude.rs
+++ b/tokio/src/prelude.rs
@@ -15,6 +15,8 @@ pub use futures_util::future::FutureExt as _;
pub use std::future::Future;
pub use crate::stream::{Stream, StreamExt as _};
+pub use futures_sink::Sink;
+pub use futures_util::sink::SinkExt as _;
pub use futures_util::stream::StreamExt as _;
#[cfg(feature = "io")]
diff --git a/tokio/src/reactor.rs b/tokio/src/reactor.rs
index 2edec1fd..40c055ee 100644
--- a/tokio/src/reactor.rs
+++ b/tokio/src/reactor.rs
@@ -20,25 +20,22 @@
//!
//! Let's start with a basic example, establishing a TCP connection.
//!
-//! ```rust,ignore
-//! # fn dox() {
-//! use tokio::prelude::*;
+//! ```
+//! #![feature(async_await)]
+//!
//! use tokio::net::TcpStream;
//!
-//! let addr = "93.184.216.34:9243".parse().unwrap();
+//! # async fn process<T>(t: T) {}
+//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+//! let addr = "93.184.216.34:9243".parse()?;
//!
-//! let connect_future = TcpStream::connect(&addr);
+//! let stream = TcpStream::connect(&addr).await?;
//!
-//! let task = connect_future
-//! .and_then(|socket| {
-//! println!("successfully connected");
-//! Ok(())
-//! })
-//! .map_err(|e| println!("failed to connect; err={:?}", e));
+//! println!("successfully connected");
//!
-//! tokio::run(task);
+//! process(stream).await;
+//! # Ok(())
//! # }
-//! # fn main() {}
//! ```
//!
//! Establishing a TCP connection usually cannot be completed immediately.
diff --git a/tokio/src/runtime/current_thread/builder.rs b/tokio/src/runtime/current_thread/builder.rs
index 31a2ec3c..c3d413b7 100644
--- a/tokio/src/runtime/current_thread/builder.rs
+++ b/tokio/src/runtime/current_thread/builder.rs
@@ -20,7 +20,7 @@ use std::io;
///
/// # Examples
///
-/// ```ignore
+/// ```
/// use tokio::runtime::current_thread::Builder;
/// use tokio_timer::clock::Clock;
///
diff --git a/tokio/src/runtime/current_thread/mod.rs b/tokio/src/runtime/current_thread/mod.rs
index d81837ee..5477934f 100644
--- a/tokio/src/runtime/current_thread/mod.rs
+++ b/tokio/src/runtime/current_thread/mod.rs
@@ -23,23 +23,21 @@
//!
//! For example:
//!
-//! ```ignore
+//! ```
+//! #![feature(async_await)]
+//!
//! use tokio::runtime::current_thread::Runtime;
//! use tokio::prelude::*;
//! use std::thread;
//!
-//! # fn main() {
//! let mut runtime = Runtime::new().unwrap();
//! let handle = runtime.handle();
//!
//! thread::spawn(move || {
-//! handle.spawn(future::ok(()));
+//! handle.spawn(async {
+//! println!("hello world");
+//! });
//! }).join().unwrap();
-//!
-//! # /*
-//! runtime.run().unwrap();
-//! # */
-//! # }
//! ```
//!
//! # Examples
diff --git a/tokio/src/runtime/current_thread/runtime.rs b/tokio/src/runtime/current_thread/runtime.rs
index 2e29140e..bb58891f 100644
--- a/tokio/src/runtime/current_thread/runtime.rs
+++ b/tokio/src/runtime/current_thread/runtime.rs
@@ -121,8 +121,9 @@ impl Runtime {
///
/// # Examples
///
- /// ```rust,ignore
- /// # use futures::{future, Future, Stream};
+ /// ```
+ /// #![feature(async_await)]
+ ///
/// use tokio::runtime::current_thread::Runtime;
///
/// # fn dox() {
@@ -130,12 +131,10 @@ impl Runtime {
/// let mut rt = Runtime::new().unwrap();
///
/// // Spawn a future onto the runtime
- /// rt.spawn(future::lazy(|| {
- /// println!("running on the runtime");
- /// Ok(())
- /// }));
+ /// rt.spawn(async {
+ /// println!("now running on a worker thread");
+ /// });
/// # }
- /// # pub fn main() {}
/// ```
///
/// # Panics
diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs
index 84302a0f..a6ce7991 100644
--- a/tokio/src/runtime/mod.rs
+++ b/tokio/src/runtime/mod.rs
@@ -31,30 +31,46 @@
//!
//! # Usage
//!
-//! Most applications will use the [`run`] function. This takes a future to
-//! "seed" the application, blocking the thread until the runtime becomes
-//! [idle].
+//! Most applications will use the [`tokio::main`] attribute macro.
//!
-//! ```rust,ignore
-//! # use futures::{Future, Stream};
-//! use tokio::net::TcpListener;
+//! ```no_run
+//! #![feature(async_await)]
//!
-//! # fn process<T>(_: T) -> Box<dyn Future<Item = (), Error = ()> + Send> {
-//! # unimplemented!();
-//! # }
-//! # fn dox() {
-//! # let addr = "127.0.0.1:8080".parse().unwrap();
-//! let listener = TcpListener::bind(&addr).unwrap();
-//!
-//! let server = listener.incoming()
-//! .map_err(|e| println!("error = {:?}", e))
-//! .for_each(|socket| {
-//! tokio::spawn(process(socket))
-//! });
-//!
-//! tokio::run(server);
-//! # }
-//! # pub fn main() {}
+//! use tokio::net::TcpListener;
+//! use tokio::prelude::*;
+//!
+//! #[tokio::main]
+//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let addr = "127.0.0.1:8080".parse()?;
+//! let mut listener = TcpListener::bind(&addr).unwrap();
+//!
+//! loop {
+//! let (mut socket, _) = listener.accept().await?;
+//!
+//! tokio::spawn(async move {
+//! let mut buf = [0; 1024];
+//!
+//! // In a loop, read data from the socket and write the data back.
+//! loop {
+//! let n = match socket.read(&mut buf).await {
+//! // socket closed
+//! Ok(n) if n == 0 => return,
+//! Ok(n) => n,
+//! Err(e) => {
+//! println!("failed to read from socket; err = {:?}", e);
+//! return;
+//! }
+//! };
+//!
+//! // Write the data back
+//! if let Err(e) = socket.write_all(&buf[0..n]).await {
+//! println!("failed to write to socket; err = {:?}", e);
+//! return;
+//! }
+//! }
+//! });
+//! }
+//! }
//! ```
//!
//! In this function, the `run` function blocks until the runtime becomes idle.
@@ -66,35 +82,50 @@
//!
//! A [`Runtime`] instance can also be used directly.
//!
-//! ```rust,ignore
-//! # use futures::{Future, Stream};
-//! use tokio::runtime::Runtime;
+//! ```no_run
+//! #![feature(async_await)]
+//!
//! use tokio::net::TcpListener;
+//! use tokio::prelude::*;
+//! use tokio::runtime::Runtime;
//!
-//! # fn process<T>(_: T) -> Box<dyn Future<Item = (), Error = ()> + Send> {
-//! # unimplemented!();
-//! # }
-//! # fn dox() {
-//! # let addr = "127.0.0.1:8080".parse().unwrap();
-//! let listener = TcpListener::bind(&addr).unwrap();
-//!
-//! let server = listener.incoming()
-//! .map_err(|e| println!("error = {:?}", e))
-//! .for_each(|socket| {
-//! tokio::spawn(process(socket))
-//! });
-//!
-//! // Create the runtime
-//! let mut rt = Runtime::new().unwrap();
-//!
-//! // Spawn the server task
-//! rt.spawn(server);
-//!
-//! // Wait until the runtime becomes idle and shut it down.
-//! rt.shutdown_on_idle()
-//! .wait().unwrap();
-//! # }
-//! # pub fn main() {}
+//! fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Create the runtime
+//! let mut rt = Runtime::new().unwrap();
+//!
+//! // Spawn the root task
+//! rt.block_on(async {
+//! let addr = "127.0.0.1:8080".parse()?;
+//! let mut listener = TcpListener::bind(&addr).unwrap();
+//!
+//! loop {
+//! let (mut socket, _) = listener.accept().await?;
+//!
+//! tokio::spawn(async move {
+//! let mut buf = [0; 1024];
+//!
+//! // In a loop, read data from the socket and write the data back.
+//! loop {
+//! let n = match socket.read(&mut buf).await {
+//! // socket closed
+//! Ok(n) if n == 0 => return,
+//! Ok(n) => n,
+//! Err(e) => {
+//! println!("failed to read from socket; err = {:?}", e);
+//! return;
+//! }
+//! };
+//!
+//! // Write the data back
+//! if let Err(e) = socket.write_all(&buf[0..n]).await {
+//! println!("failed to write to socket; err = {:?}", e);
+//! return;
+//! }
+//! }
+//! });
+//! }
+//! })
+//! }
//! ```
//!
//! [reactor]: ../reactor/struct.Reactor.html
diff --git a/tokio/src/runtime/threadpool/mod.rs b/tokio/src/runtime/threadpool/mod.rs
index 5955210a..b699bbf4 100644
--- a/tokio/src/runtime/threadpool/mod.rs
+++ b/tokio/src/runtime/threadpool/mod.rs
@@ -79,8 +79,7 @@ impl Runtime {
/// // Use the runtime...
///
/// // Shutdown the runtime
- /// rt.shutdown_now()
- /// .wait().unwrap();
+ /// rt.shutdown_now();
/// ```
///
/// [mod]: index.html
@@ -121,21 +120,22 @@ impl Runtime {
///
/// # Examples
///
- /// ```rust
- /// # use futures::{future, Future, Stream};
+ /// ```
+ /// #![feature(async_await)]
+ ///
/// use tokio::runtime::Runtime;
///
- /// # fn dox() {
- /// // Create the runtime
- /// let rt = Runtime::new().unwrap();
- ///
- /// // Spawn a future onto the runtime
- /// rt.spawn(future::lazy(|| {
- /// println!("now running on a worker thread");
- /// Ok(())
- /// }));
- /// # }
- /// # pub fn main() {}
+ /// fn main() {
+ /// // Create the runtime
+ /// let rt = Runtime::new().unwrap();
+ ///
+ /// // Spawn a future onto the runtime
+ /// rt.spawn(async {
+ /// println!("now running on a worker thread");
+ /// });
+ ///
+ /// rt.shutdown_on_idle();
+ /// }
/// ```
///
/// # Panics
@@ -183,9 +183,7 @@ impl Runtime {
/// Signals the runtime to shutdown once it becomes idle.
///
- /// Returns a future that completes once the shutdown operation has
- /// completed.
- ///
+ /// Blocks the current thread until the shutdown operation has completed.
/// This function can be used to perform a graceful shutdown of the runtime.
///
/// The runtime enters an idle state once **all** of the following occur.
@@ -208,25 +206,24 @@ impl Runtime {
/// // Use the runtime...
///
/// // Shutdown the runtime
- /// rt.shutdown_on_idle()
- /// .wait().unwrap();
+ /// rt.shutdown_on_idle();
/// ```
///
/// [mod]: index.html
- pub async fn shutdown_on_idle(mut self) {
- let inner = self.inner.take().unwrap();
- let inner = inner.pool.shutdown_on_idle();
+ pub fn shutdown_on_idle(mut self) {
+ let mut e = tokio_executor::enter().unwrap();
- inner.await;
+ let inner = self.inner.take().unwrap();
+ e.block_on(inner.pool.shutdown_on_idle());
}
/// Signals the runtime to shutdown immediately.
///
- /// Returns a future that completes once the shutdown operation has
- /// completed.
- ///
+ /// Blocks the current thread until the shutdown operation has completed.
/// This function will forcibly shutdown the runtime, causing any
- /// in-progress work to become canceled. The shutdown steps are:
+ /// in-progress work to become canceled.
+ ///
+ /// The shutdown steps are:
///
/// * Drain any scheduled work queues.
/// * Drop any futures that have not yet completed.
@@ -250,14 +247,15 @@ impl Runtime {
/// // Use the runtime...
///
/// // Shutdown the runtime
- /// rt.shutdown_now()
- /// .wait().unwrap();
+ /// rt.shutdown_now();
/// ```
///
/// [mod]: index.html
- pub async fn shutdown_now(mut self) {
+ pub fn shutdown_now(mut self) {
+ let mut e = tokio_executor::enter().unwrap();
let inner = self.inner.take().unwrap();
- inner.pool.shutdown_now().await;
+
+ e.block_on(inner.pool.shutdown_now());
}
fn inner(&self) -> &Inner {
diff --git a/tokio/src/runtime/threadpool/task_executor.rs b/tokio/src/runtime/threadpool/task_executor.rs
index f4dfce6b..9dee3fcb 100644
--- a/tokio/src/runtime/threadpool/task_executor.rs
+++ b/tokio/src/runtime/threadpool/task_executor.rs
@@ -28,8 +28,9 @@ impl TaskExecutor {
///
/// # Examples
///
- /// ```rust
- /// # use futures::{future, Future, Stream};
+ /// ```
+ /// #![feature(async_await)]
+ ///
/// use tokio::runtime::Runtime;
///
/// # fn dox() {
@@ -38,12 +39,10 @@ impl TaskExecutor {
/// let executor = rt.executor();
///
/// // Spawn a future onto the runtime
- /// executor.spawn(future::lazy(|| {
+ /// executor.spawn(async {
/// println!("now running on a worker thread");
- /// Ok(())
- /// }));
+ /// });
/// # }
- /// # pub fn main() {}
/// ```
///
/// # Panics
diff --git a/tokio/src/stream.rs b/tokio/src/stream.rs
index 5f0d0ee0..1fd19022 100644
--- a/tokio/src/stream.rs
+++ b/tokio/src/stream.rs
@@ -48,22 +48,23 @@ pub trait StreamExt: Stream {
/// # Examples
///
/// ```
+ /// #![feature(async_await)]
+ ///
/// use tokio::prelude::*;
+ ///
/// use std::time::Duration;
- /// # use futures::future::{self, FutureResult};
///
- /// # fn long_future() -> FutureResult<(), ()> {
- /// # future::ok(())
+ /// # fn slow_stream() -> impl Stream<Item = ()> {
+ /// # tokio::stream::empty()
/// # }
/// #
- /// # fn main() {
- /// let stream = long_future()
- /// .into_stream()
- /// .timeout(Duration::from_secs(1))
- /// .for_each(|i| future::ok(println!("item = {:?}", i)))
- /// .map_err(|e| println!("error = {:?}", e));
+ /// # async fn dox() {
+ /// let mut stream = slow_stream()
+ /// .timeout(Duration::from_secs(1));
///
- /// tokio::run(stream);
+ /// while let Some(value) = stream.next().await {
+ /// println!("value = {:?}", value);
+ /// }
/// # }
/// ```
#[cfg(feature = "timer")]
diff --git a/tokio/src/timer.rs b/tokio/src/timer.rs
index 716524c8..fd88af0f 100644
--- a/tokio/src/timer.rs
+++ b/tokio/src/timer.rs
@@ -30,21 +30,20 @@
//! Wait 100ms and print "Hello World!"
//!
//! ```
+//! #![feature(async_await)]
+//!
//! use tokio::prelude::*;
//! use tokio::timer::Delay;
//!
//! use std::time::{Duration, Instant};
//!
-//! let when = Instant::now() + Duration::from_millis(100);
//!
-//! tokio::run({
-//! Delay::new(when)
-//! .map_err(|e| panic!("timer failed; err={:?}", e))
-//! .and_then(|_| {
-//! println!("Hello world!");
-//! Ok(())
-//! })
-//! })
+//! #[tokio::main]
+//! async fn main() {
+//! let when = tokio::clock::now() + Duration::from_millis(100);
+//! Delay::new(when).await;
+//! println!("100 ms have elapsed");
+//! }
//! ```
//!
//! Require that an operation takes no more than 300ms. Note that this uses the
@@ -52,23 +51,23 @@
//! included in the prelude.
//!
//! ```
-//! use tokio::prelude::*;
+//! #![feature(async_await)]
//!
+//! use tokio::prelude::*;
//! use std::time::Duration;
//!
-//! fn long_op() -> Box<dyn Future<Item = (), Error = ()> + Send> {
-//! // ...
-//! # Box::new(futures::future::ok(()))
+//! async fn long_future() {
+//! // do work here
//! }
//!
-//! # fn main() {
-//! tokio::run({
-//! long_op()
-//! .timeout(Duration::from_millis(300))
-//! .map_err(|e| {
-//! println!("operation timed out");
-//! })
-//! })
+//! # async fn dox() {
+//! let res = long_future()
+//! .timeout(Duration::from_secs(1))
+//! .await;
+//!
+//! if res.is_err() {
+//! println!("operation timed out");
+//! }
//! # }
//! ```
//!
diff --git a/tokio/tests/runtime_threaded.rs b/tokio/tests/runtime_threaded.rs
index cdec008b..6d04db76 100644
--- a/tokio/tests/runtime_threaded.rs
+++ b/tokio/tests/runtime_threaded.rs
@@ -52,8 +52,7 @@ fn spawn_shutdown() {
let f = Box::pin(client_server(tx));
tokio_executor::Executor::spawn(&mut rt.executor(), f).unwrap();
- let mut e = tokio_executor::enter().unwrap();
- e.block_on(rt.shutdown_on_idle());
+ rt.shutdown_on_idle();
assert_ok!(rx.try_recv());
assert_ok!(rx.try_recv());
@@ -72,8 +71,7 @@ fn block_on_timer() {
assert_eq!(v, 42);
- let mut e = tokio_executor::enter().unwrap();
- e.block_on(rt.shutdown_on_idle());
+ rt.shutdown_on_idle();
}
#[test]
@@ -118,8 +116,7 @@ fn block_waits() {
assert_ok!(b_rx.try_recv());
- let mut e = tokio_executor::enter().unwrap();
- e.block_on(rt.shutdown_on_idle());
+ rt.shutdown_on_idle();
}
#[test]
@@ -140,8 +137,7 @@ fn spawn_many() {
}
});
- let mut e = tokio_executor::enter().unwrap();
- e.block_on(rt.shutdown_on_idle());
+ rt.shutdown_on_idle();
assert_eq!(ITER, *cnt.lock().unwrap());
}
@@ -206,8 +202,7 @@ fn after_start_and_before_stop_is_called() {
rt.block_on(client_server(tx));
- let mut e = tokio_executor::enter().unwrap();
- e.block_on(rt.shutdown_on_idle());
+ rt.shutdown_on_idle();
assert_ok!(rx.try_recv());
diff --git a/tokio/tests/timer.rs b/tokio/tests/timer.rs
index 297a312e..63994d18 100644
--- a/tokio/tests/timer.rs
+++ b/tokio/tests/timer.rs
@@ -25,8 +25,7 @@ fn timer_with_threaded_runtime() {
tx.send(()).unwrap();
});
- let mut e = tokio_executor::enter().unwrap();
- e.block_on(rt.shutdown_on_idle());
+ rt.shutdown_on_idle();
rx.recv().unwrap();
}