diff options
author | Carl Lerche <me@carllerche.com> | 2019-08-06 14:03:49 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-08-06 14:03:49 -0700 |
commit | 47e2ff48d9f1daac7dba9f136b24eed64c87cf40 (patch) | |
tree | 2c3ec343ec2ac5559fcd8ade8a914b7d5f61606d | |
parent | 2f43b0a023f155a3efed4b048f5e0822072840f8 (diff) |
tokio: fix API doc examples (#1396)
-rw-r--r-- | README.md | 69 | ||||
-rw-r--r-- | tokio/Cargo.toml | 3 | ||||
-rw-r--r-- | tokio/src/codec/length_delimited.rs | 14 | ||||
-rw-r--r-- | tokio/src/executor.rs | 32 | ||||
-rw-r--r-- | tokio/src/future.rs | 21 | ||||
-rw-r--r-- | tokio/src/lib.rs | 63 | ||||
-rw-r--r-- | tokio/src/prelude.rs | 2 | ||||
-rw-r--r-- | tokio/src/reactor.rs | 23 | ||||
-rw-r--r-- | tokio/src/runtime/current_thread/builder.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/current_thread/mod.rs | 14 | ||||
-rw-r--r-- | tokio/src/runtime/current_thread/runtime.rs | 13 | ||||
-rw-r--r-- | tokio/src/runtime/mod.rs | 129 | ||||
-rw-r--r-- | tokio/src/runtime/threadpool/mod.rs | 62 | ||||
-rw-r--r-- | tokio/src/runtime/threadpool/task_executor.rs | 11 | ||||
-rw-r--r-- | tokio/src/stream.rs | 21 | ||||
-rw-r--r-- | tokio/src/timer.rs | 41 | ||||
-rw-r--r-- | tokio/tests/runtime_threaded.rs | 15 | ||||
-rw-r--r-- | tokio/tests/timer.rs | 3 |
18 files changed, 279 insertions, 259 deletions
@@ -33,10 +33,6 @@ the Rust programming language. It is: [API Docs](https://docs.rs/tokio/0.1.22/tokio) | [Chat](https://gitter.im/tokio-rs/tokio) -The API docs for the master branch are currently out of date as master is -undergoing significant churn as it is updated to use `std::future`. The docs -will be udpated once the branch stabilizes. - ## Overview Tokio is an event-driven, non-blocking I/O platform for writing @@ -60,41 +56,42 @@ an asynchronous application. A basic TCP echo server with Tokio: ```rust -use tokio::prelude::*; -use tokio::io::copy; +#![feature(async_await)] + use tokio::net::TcpListener; +use tokio::prelude::*; -fn main() { - // Bind the server's socket. - let addr = "127.0.0.1:12345".parse().unwrap(); - let listener = TcpListener::bind(&addr) - .expect("unable to bind TCP listener"); - - // Pull out a stream of sockets for incoming connections - let server = listener.incoming() - .map_err(|e| eprintln!("accept failed = {:?}", e)) - .for_each(|sock| { - // Split up the reading and writing parts of the - // socket. - let (reader, writer) = sock.split(); - - // A future that echos the data and returns how - // many bytes were copied... - let bytes_copied = copy(reader, writer); - - // ... after which we'll print what happened. - let handle_conn = bytes_copied.map(|amt| { - println!("wrote {:?} bytes", amt) - }).map_err(|err| { - eprintln!("IO error {:?}", err) - }); - - // Spawn the future as a concurrent task. - tokio::spawn(handle_conn) +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + let addr = "127.0.0.1:8080".parse()?; + let mut listener = TcpListener::bind(&addr).unwrap(); + + loop { + let (mut socket, _) = listener.accept().await?; + + tokio::spawn(async move { + let mut buf = [0; 1024]; + + // In a loop, read data from the socket and write the data back. + loop { + let n = match socket.read(&mut buf).await { + // socket closed + Ok(n) if n == 0 => return, + Ok(n) => n, + Err(e) => { + println!("failed to read from socket; err = {:?}", e); + return; + } + }; + + // Write the data back + if let Err(e) = socket.write_all(&buf[0..n]).await { + println!("failed to write to socket; err = {:?}", e); + return; + } + } }); - - // Start the Tokio runtime - tokio::run(server); + } } ``` diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 34de0806..6882772b 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -63,7 +63,8 @@ uds = ["tokio-uds"] # Only non-optional dependency... #futures = "0.1.20" futures-core-preview = "= 0.3.0-alpha.17" -futures-util-preview = "= 0.3.0-alpha.17" +futures-sink-preview = "= 0.3.0-alpha.17" +futures-util-preview = { version = "= 0.3.0-alpha.17", features = ["sink"] } # Everything else is optional... bytes = { version = "0.4", optional = true } diff --git a/tokio/src/codec/length_delimited.rs b/tokio/src/codec/length_delimited.rs index b4822802..565c7607 100644 --- a/tokio/src/codec/length_delimited.rs +++ b/tokio/src/codec/length_delimited.rs @@ -39,20 +39,24 @@ //! Specifically, given the following: //! //! ``` +//! #![feature(async_await)] +//! //! use tokio::io::{AsyncRead, AsyncWrite}; //! use tokio::codec::*; +//! use tokio::prelude::*; +//! //! use bytes::Bytes; -//! use futures::{Sink, Future}; //! -//! fn write_frame<T: AsyncRead + AsyncWrite>(io: T) -> Result<(), Box<dyn std::error::Error>> { +//! async fn write_frame<T>(io: T) -> Result<(), Box<dyn std::error::Error>> +//! where +//! T: AsyncRead + AsyncWrite + Unpin, +//! { //! let mut transport = Framed::new(io, LengthDelimitedCodec::new()); //! let frame = Bytes::from("hello world"); //! -//! transport.send(frame).wait()?; +//! transport.send(frame).await?; //! Ok(()) //! } -//! # -//! # pub fn main() {} //! ``` //! //! The encoded frame will look like this: diff --git a/tokio/src/executor.rs b/tokio/src/executor.rs index 34753131..830ecf45 100644 --- a/tokio/src/executor.rs +++ b/tokio/src/executor.rs @@ -68,26 +68,26 @@ pub struct Spawn(()); /// In this example, a server is started and `spawn` is used to start a new task /// that processes each received connection. /// -/// ```rust,ignore -/// # use futures::{Future, Stream}; +/// ``` +/// #![feature(async_await)] +/// /// use tokio::net::TcpListener; /// -/// # fn process<T>(_: T) -> Box<dyn Future<Item = (), Error = ()> + Send> { -/// # unimplemented!(); -/// # } -/// # fn dox() { -/// # let addr = "127.0.0.1:8080".parse().unwrap(); -/// let listener = TcpListener::bind(&addr).unwrap(); -/// -/// let server = listener.incoming() -/// .map_err(|e| println!("error = {:?}", e)) -/// .for_each(|socket| { -/// tokio::spawn(process(socket)) -/// }); +/// # async fn process<T>(t: T) {} +/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { +/// let addr = "127.0.0.1:8080".parse()?; +/// let mut listener = TcpListener::bind(&addr).unwrap(); +/// +/// loop { +/// let (socket, _) = listener.accept().await?; /// -/// tokio::run(server); +/// tokio::spawn(async move { +/// // Process each socket concurrently. +/// process(socket).await +/// }); +/// } +/// # Ok(()) /// # } -/// # pub fn main() {} /// ``` /// /// [default executor]: struct.DefaultExecutor.html diff --git a/tokio/src/future.rs b/tokio/src/future.rs index 3bc673d0..a2e4d220 100644 --- a/tokio/src/future.rs +++ b/tokio/src/future.rs @@ -40,20 +40,23 @@ pub trait FutureExt: Future { /// # Examples /// /// ``` + /// #![feature(async_await)] + /// /// use tokio::prelude::*; /// use std::time::Duration; - /// # use futures::future::{self, FutureResult}; /// - /// # fn long_future() -> FutureResult<(), ()> { - /// # future::ok(()) - /// # } - /// # - /// # fn main() { - /// let future = long_future() + /// async fn long_future() { + /// // do work here + /// } + /// + /// # async fn dox() { + /// let res = long_future() /// .timeout(Duration::from_secs(1)) - /// .map_err(|e| println!("error = {:?}", e)); + /// .await; /// - /// tokio::run(future); + /// if res.is_err() { + /// println!("operation timed out"); + /// } /// # } /// ``` #[cfg(feature = "timer")] diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index c78372e7..06cbe960 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -17,54 +17,51 @@ //! * Asynchronous [filesystem][fs] operations. //! * [Timer][timer] API for scheduling work in the future. //! -//! Tokio is built using [futures] as the abstraction for managing the -//! complexity of asynchronous programming. -//! //! Guide level documentation is found on the [website]. //! //! [website]: https://tokio.rs/docs/ -//! [futures]: http://docs.rs/futures/0.1 //! //! # Examples //! //! A simple TCP echo server: //! -//! ```no_run,ignore -//! use tokio::prelude::*; -//! use tokio::io::copy; +//! ```no_run +//! #![feature(async_await)] +//! //! use tokio::net::TcpListener; +//! use tokio::prelude::*; //! -//! fn main() { -//! // Bind the server's socket. -//! let addr = "127.0.0.1:12345".parse().unwrap(); -//! let listener = TcpListener::bind(&addr) -//! .expect("unable to bind TCP listener"); +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let addr = "127.0.0.1:8080".parse()?; +//! let mut listener = TcpListener::bind(&addr).unwrap(); //! -//! // Pull out a stream of sockets for incoming connections -//! let server = listener.incoming() -//! .map_err(|e| eprintln!("accept failed = {:?}", e)) -//! .for_each(|sock| { -//! // Split up the reading and writing parts of the -//! // socket. -//! let (reader, writer) = sock.split(); +//! loop { +//! let (mut socket, _) = listener.accept().await?; //! -//! // A future that echos the data and returns how -//! // many bytes were copied... -//! let bytes_copied = copy(reader, writer); +//! tokio::spawn(async move { +//! let mut buf = [0; 1024]; //! -//! // ... after which we'll print what happened. -//! let handle_conn = bytes_copied.map(|amt| { -//! println!("wrote {:?} bytes", amt) -//! }).map_err(|err| { -//! eprintln!("IO error {:?}", err) -//! }); +//! // In a loop, read data from the socket and write the data back. +//! loop { +//! let n = match socket.read(&mut buf).await { +//! // socket closed +//! Ok(n) if n == 0 => return, +//! Ok(n) => n, +//! Err(e) => { +//! println!("failed to read from socket; err = {:?}", e); +//! return; +//! } +//! }; //! -//! // Spawn the future as a concurrent task. -//! tokio::spawn(handle_conn) +//! // Write the data back +//! if let Err(e) = socket.write_all(&buf[0..n]).await { +//! println!("failed to write to socket; err = {:?}", e); +//! return; +//! } +//! } //! }); -//! -//! // Start the Tokio runtime -//! tokio::run(server); +//! } //! } //! ``` diff --git a/tokio/src/prelude.rs b/tokio/src/prelude.rs index 1593294d..840e7d3b 100644 --- a/tokio/src/prelude.rs +++ b/tokio/src/prelude.rs @@ -15,6 +15,8 @@ pub use futures_util::future::FutureExt as _; pub use std::future::Future; pub use crate::stream::{Stream, StreamExt as _}; +pub use futures_sink::Sink; +pub use futures_util::sink::SinkExt as _; pub use futures_util::stream::StreamExt as _; #[cfg(feature = "io")] diff --git a/tokio/src/reactor.rs b/tokio/src/reactor.rs index 2edec1fd..40c055ee 100644 --- a/tokio/src/reactor.rs +++ b/tokio/src/reactor.rs @@ -20,25 +20,22 @@ //! //! Let's start with a basic example, establishing a TCP connection. //! -//! ```rust,ignore -//! # fn dox() { -//! use tokio::prelude::*; +//! ``` +//! #![feature(async_await)] +//! //! use tokio::net::TcpStream; //! -//! let addr = "93.184.216.34:9243".parse().unwrap(); +//! # async fn process<T>(t: T) {} +//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> { +//! let addr = "93.184.216.34:9243".parse()?; //! -//! let connect_future = TcpStream::connect(&addr); +//! let stream = TcpStream::connect(&addr).await?; //! -//! let task = connect_future -//! .and_then(|socket| { -//! println!("successfully connected"); -//! Ok(()) -//! }) -//! .map_err(|e| println!("failed to connect; err={:?}", e)); +//! println!("successfully connected"); //! -//! tokio::run(task); +//! process(stream).await; +//! # Ok(()) //! # } -//! # fn main() {} //! ``` //! //! Establishing a TCP connection usually cannot be completed immediately. diff --git a/tokio/src/runtime/current_thread/builder.rs b/tokio/src/runtime/current_thread/builder.rs index 31a2ec3c..c3d413b7 100644 --- a/tokio/src/runtime/current_thread/builder.rs +++ b/tokio/src/runtime/current_thread/builder.rs @@ -20,7 +20,7 @@ use std::io; /// /// # Examples /// -/// ```ignore +/// ``` /// use tokio::runtime::current_thread::Builder; /// use tokio_timer::clock::Clock; /// diff --git a/tokio/src/runtime/current_thread/mod.rs b/tokio/src/runtime/current_thread/mod.rs index d81837ee..5477934f 100644 --- a/tokio/src/runtime/current_thread/mod.rs +++ b/tokio/src/runtime/current_thread/mod.rs @@ -23,23 +23,21 @@ //! //! For example: //! -//! ```ignore +//! ``` +//! #![feature(async_await)] +//! //! use tokio::runtime::current_thread::Runtime; //! use tokio::prelude::*; //! use std::thread; //! -//! # fn main() { //! let mut runtime = Runtime::new().unwrap(); //! let handle = runtime.handle(); //! //! thread::spawn(move || { -//! handle.spawn(future::ok(())); +//! handle.spawn(async { +//! println!("hello world"); +//! }); //! }).join().unwrap(); -//! -//! # /* -//! runtime.run().unwrap(); -//! # */ -//! # } //! ``` //! //! # Examples diff --git a/tokio/src/runtime/current_thread/runtime.rs b/tokio/src/runtime/current_thread/runtime.rs index 2e29140e..bb58891f 100644 --- a/tokio/src/runtime/current_thread/runtime.rs +++ b/tokio/src/runtime/current_thread/runtime.rs @@ -121,8 +121,9 @@ impl Runtime { /// /// # Examples /// - /// ```rust,ignore - /// # use futures::{future, Future, Stream}; + /// ``` + /// #![feature(async_await)] + /// /// use tokio::runtime::current_thread::Runtime; /// /// # fn dox() { @@ -130,12 +131,10 @@ impl Runtime { /// let mut rt = Runtime::new().unwrap(); /// /// // Spawn a future onto the runtime - /// rt.spawn(future::lazy(|| { - /// println!("running on the runtime"); - /// Ok(()) - /// })); + /// rt.spawn(async { + /// println!("now running on a worker thread"); + /// }); /// # } - /// # pub fn main() {} /// ``` /// /// # Panics diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 84302a0f..a6ce7991 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -31,30 +31,46 @@ //! //! # Usage //! -//! Most applications will use the [`run`] function. This takes a future to -//! "seed" the application, blocking the thread until the runtime becomes -//! [idle]. +//! Most applications will use the [`tokio::main`] attribute macro. //! -//! ```rust,ignore -//! # use futures::{Future, Stream}; -//! use tokio::net::TcpListener; +//! ```no_run +//! #![feature(async_await)] //! -//! # fn process<T>(_: T) -> Box<dyn Future<Item = (), Error = ()> + Send> { -//! # unimplemented!(); -//! # } -//! # fn dox() { -//! # let addr = "127.0.0.1:8080".parse().unwrap(); -//! let listener = TcpListener::bind(&addr).unwrap(); -//! -//! let server = listener.incoming() -//! .map_err(|e| println!("error = {:?}", e)) -//! .for_each(|socket| { -//! tokio::spawn(process(socket)) -//! }); -//! -//! tokio::run(server); -//! # } -//! # pub fn main() {} +//! use tokio::net::TcpListener; +//! use tokio::prelude::*; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let addr = "127.0.0.1:8080".parse()?; +//! let mut listener = TcpListener::bind(&addr).unwrap(); +//! +//! loop { +//! let (mut socket, _) = listener.accept().await?; +//! +//! tokio::spawn(async move { +//! let mut buf = [0; 1024]; +//! +//! // In a loop, read data from the socket and write the data back. +//! loop { +//! let n = match socket.read(&mut buf).await { +//! // socket closed +//! Ok(n) if n == 0 => return, +//! Ok(n) => n, +//! Err(e) => { +//! println!("failed to read from socket; err = {:?}", e); +//! return; +//! } +//! }; +//! +//! // Write the data back +//! if let Err(e) = socket.write_all(&buf[0..n]).await { +//! println!("failed to write to socket; err = {:?}", e); +//! return; +//! } +//! } +//! }); +//! } +//! } //! ``` //! //! In this function, the `run` function blocks until the runtime becomes idle. @@ -66,35 +82,50 @@ //! //! A [`Runtime`] instance can also be used directly. //! -//! ```rust,ignore -//! # use futures::{Future, Stream}; -//! use tokio::runtime::Runtime; +//! ```no_run +//! #![feature(async_await)] +//! //! use tokio::net::TcpListener; +//! use tokio::prelude::*; +//! use tokio::runtime::Runtime; //! -//! # fn process<T>(_: T) -> Box<dyn Future<Item = (), Error = ()> + Send> { -//! # unimplemented!(); -//! # } -//! # fn dox() { -//! # let addr = "127.0.0.1:8080".parse().unwrap(); -//! let listener = TcpListener::bind(&addr).unwrap(); -//! -//! let server = listener.incoming() -//! .map_err(|e| println!("error = {:?}", e)) -//! .for_each(|socket| { -//! tokio::spawn(process(socket)) -//! }); -//! -//! // Create the runtime -//! let mut rt = Runtime::new().unwrap(); -//! -//! // Spawn the server task -//! rt.spawn(server); -//! -//! // Wait until the runtime becomes idle and shut it down. -//! rt.shutdown_on_idle() -//! .wait().unwrap(); -//! # } -//! # pub fn main() {} +//! fn main() -> Result<(), Box<dyn std::error::Error>> { +//! // Create the runtime +//! let mut rt = Runtime::new().unwrap(); +//! +//! // Spawn the root task +//! rt.block_on(async { +//! let addr = "127.0.0.1:8080".parse()?; +//! let mut listener = TcpListener::bind(&addr).unwrap(); +//! +//! loop { +//! let (mut socket, _) = listener.accept().await?; +//! +//! tokio::spawn(async move { +//! let mut buf = [0; 1024]; +//! +//! // In a loop, read data from the socket and write the data back. +//! loop { +//! let n = match socket.read(&mut buf).await { +//! // socket closed +//! Ok(n) if n == 0 => return, +//! Ok(n) => n, +//! Err(e) => { +//! println!("failed to read from socket; err = {:?}", e); +//! return; +//! } +//! }; +//! +//! // Write the data back +//! if let Err(e) = socket.write_all(&buf[0..n]).await { +//! println!("failed to write to socket; err = {:?}", e); +//! return; +//! } +//! } +//! }); +//! } +//! }) +//! } //! ``` //! //! [reactor]: ../reactor/struct.Reactor.html diff --git a/tokio/src/runtime/threadpool/mod.rs b/tokio/src/runtime/threadpool/mod.rs index 5955210a..b699bbf4 100644 --- a/tokio/src/runtime/threadpool/mod.rs +++ b/tokio/src/runtime/threadpool/mod.rs @@ -79,8 +79,7 @@ impl Runtime { /// // Use the runtime... /// /// // Shutdown the runtime - /// rt.shutdown_now() - /// .wait().unwrap(); + /// rt.shutdown_now(); /// ``` /// /// [mod]: index.html @@ -121,21 +120,22 @@ impl Runtime { /// /// # Examples /// - /// ```rust - /// # use futures::{future, Future, Stream}; + /// ``` + /// #![feature(async_await)] + /// /// use tokio::runtime::Runtime; /// - /// # fn dox() { - /// // Create the runtime - /// let rt = Runtime::new().unwrap(); - /// - /// // Spawn a future onto the runtime - /// rt.spawn(future::lazy(|| { - /// println!("now running on a worker thread"); - /// Ok(()) - /// })); - /// # } - /// # pub fn main() {} + /// fn main() { + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(async { + /// println!("now running on a worker thread"); + /// }); + /// + /// rt.shutdown_on_idle(); + /// } /// ``` /// /// # Panics @@ -183,9 +183,7 @@ impl Runtime { /// Signals the runtime to shutdown once it becomes idle. /// - /// Returns a future that completes once the shutdown operation has - /// completed. - /// + /// Blocks the current thread until the shutdown operation has completed. /// This function can be used to perform a graceful shutdown of the runtime. /// /// The runtime enters an idle state once **all** of the following occur. @@ -208,25 +206,24 @@ impl Runtime { /// // Use the runtime... /// /// // Shutdown the runtime - /// rt.shutdown_on_idle() - /// .wait().unwrap(); + /// rt.shutdown_on_idle(); /// ``` /// /// [mod]: index.html - pub async fn shutdown_on_idle(mut self) { - let inner = self.inner.take().unwrap(); - let inner = inner.pool.shutdown_on_idle(); + pub fn shutdown_on_idle(mut self) { + let mut e = tokio_executor::enter().unwrap(); - inner.await; + let inner = self.inner.take().unwrap(); + e.block_on(inner.pool.shutdown_on_idle()); } /// Signals the runtime to shutdown immediately. /// - /// Returns a future that completes once the shutdown operation has - /// completed. - /// + /// Blocks the current thread until the shutdown operation has completed. /// This function will forcibly shutdown the runtime, causing any - /// in-progress work to become canceled. The shutdown steps are: + /// in-progress work to become canceled. + /// + /// The shutdown steps are: /// /// * Drain any scheduled work queues. /// * Drop any futures that have not yet completed. @@ -250,14 +247,15 @@ impl Runtime { /// // Use the runtime... /// /// // Shutdown the runtime - /// rt.shutdown_now() - /// .wait().unwrap(); + /// rt.shutdown_now(); /// ``` /// /// [mod]: index.html - pub async fn shutdown_now(mut self) { + pub fn shutdown_now(mut self) { + let mut e = tokio_executor::enter().unwrap(); let inner = self.inner.take().unwrap(); - inner.pool.shutdown_now().await; + + e.block_on(inner.pool.shutdown_now()); } fn inner(&self) -> &Inner { diff --git a/tokio/src/runtime/threadpool/task_executor.rs b/tokio/src/runtime/threadpool/task_executor.rs index f4dfce6b..9dee3fcb 100644 --- a/tokio/src/runtime/threadpool/task_executor.rs +++ b/tokio/src/runtime/threadpool/task_executor.rs @@ -28,8 +28,9 @@ impl TaskExecutor { /// /// # Examples /// - /// ```rust - /// # use futures::{future, Future, Stream}; + /// ``` + /// #![feature(async_await)] + /// /// use tokio::runtime::Runtime; /// /// # fn dox() { @@ -38,12 +39,10 @@ impl TaskExecutor { /// let executor = rt.executor(); /// /// // Spawn a future onto the runtime - /// executor.spawn(future::lazy(|| { + /// executor.spawn(async { /// println!("now running on a worker thread"); - /// Ok(()) - /// })); + /// }); /// # } - /// # pub fn main() {} /// ``` /// /// # Panics diff --git a/tokio/src/stream.rs b/tokio/src/stream.rs index 5f0d0ee0..1fd19022 100644 --- a/tokio/src/stream.rs +++ b/tokio/src/stream.rs @@ -48,22 +48,23 @@ pub trait StreamExt: Stream { /// # Examples /// /// ``` + /// #![feature(async_await)] + /// /// use tokio::prelude::*; + /// /// use std::time::Duration; - /// # use futures::future::{self, FutureResult}; /// - /// # fn long_future() -> FutureResult<(), ()> { - /// # future::ok(()) + /// # fn slow_stream() -> impl Stream<Item = ()> { + /// # tokio::stream::empty() /// # } /// # - /// # fn main() { - /// let stream = long_future() - /// .into_stream() - /// .timeout(Duration::from_secs(1)) - /// .for_each(|i| future::ok(println!("item = {:?}", i))) - /// .map_err(|e| println!("error = {:?}", e)); + /// # async fn dox() { + /// let mut stream = slow_stream() + /// .timeout(Duration::from_secs(1)); /// - /// tokio::run(stream); + /// while let Some(value) = stream.next().await { + /// println!("value = {:?}", value); + /// } /// # } /// ``` #[cfg(feature = "timer")] diff --git a/tokio/src/timer.rs b/tokio/src/timer.rs index 716524c8..fd88af0f 100644 --- a/tokio/src/timer.rs +++ b/tokio/src/timer.rs @@ -30,21 +30,20 @@ //! Wait 100ms and print "Hello World!" //! //! ``` +//! #![feature(async_await)] +//! //! use tokio::prelude::*; //! use tokio::timer::Delay; //! //! use std::time::{Duration, Instant}; //! -//! let when = Instant::now() + Duration::from_millis(100); //! -//! tokio::run({ -//! Delay::new(when) -//! .map_err(|e| panic!("timer failed; err={:?}", e)) -//! .and_then(|_| { -//! println!("Hello world!"); -//! Ok(()) -//! }) -//! }) +//! #[tokio::main] +//! async fn main() { +//! let when = tokio::clock::now() + Duration::from_millis(100); +//! Delay::new(when).await; +//! println!("100 ms have elapsed"); +//! } //! ``` //! //! Require that an operation takes no more than 300ms. Note that this uses the @@ -52,23 +51,23 @@ //! included in the prelude. //! //! ``` -//! use tokio::prelude::*; +//! #![feature(async_await)] //! +//! use tokio::prelude::*; //! use std::time::Duration; //! -//! fn long_op() -> Box<dyn Future<Item = (), Error = ()> + Send> { -//! // ... -//! # Box::new(futures::future::ok(())) +//! async fn long_future() { +//! // do work here //! } //! -//! # fn main() { -//! tokio::run({ -//! long_op() -//! .timeout(Duration::from_millis(300)) -//! .map_err(|e| { -//! println!("operation timed out"); -//! }) -//! }) +//! # async fn dox() { +//! let res = long_future() +//! .timeout(Duration::from_secs(1)) +//! .await; +//! +//! if res.is_err() { +//! println!("operation timed out"); +//! } //! # } //! ``` //! diff --git a/tokio/tests/runtime_threaded.rs b/tokio/tests/runtime_threaded.rs index cdec008b..6d04db76 100644 --- a/tokio/tests/runtime_threaded.rs +++ b/tokio/tests/runtime_threaded.rs @@ -52,8 +52,7 @@ fn spawn_shutdown() { let f = Box::pin(client_server(tx)); tokio_executor::Executor::spawn(&mut rt.executor(), f).unwrap(); - let mut e = tokio_executor::enter().unwrap(); - e.block_on(rt.shutdown_on_idle()); + rt.shutdown_on_idle(); assert_ok!(rx.try_recv()); assert_ok!(rx.try_recv()); @@ -72,8 +71,7 @@ fn block_on_timer() { assert_eq!(v, 42); - let mut e = tokio_executor::enter().unwrap(); - e.block_on(rt.shutdown_on_idle()); + rt.shutdown_on_idle(); } #[test] @@ -118,8 +116,7 @@ fn block_waits() { assert_ok!(b_rx.try_recv()); - let mut e = tokio_executor::enter().unwrap(); - e.block_on(rt.shutdown_on_idle()); + rt.shutdown_on_idle(); } #[test] @@ -140,8 +137,7 @@ fn spawn_many() { } }); - let mut e = tokio_executor::enter().unwrap(); - e.block_on(rt.shutdown_on_idle()); + rt.shutdown_on_idle(); assert_eq!(ITER, *cnt.lock().unwrap()); } @@ -206,8 +202,7 @@ fn after_start_and_before_stop_is_called() { rt.block_on(client_server(tx)); - let mut e = tokio_executor::enter().unwrap(); - e.block_on(rt.shutdown_on_idle()); + rt.shutdown_on_idle(); assert_ok!(rx.try_recv()); diff --git a/tokio/tests/timer.rs b/tokio/tests/timer.rs index 297a312e..63994d18 100644 --- a/tokio/tests/timer.rs +++ b/tokio/tests/timer.rs @@ -25,8 +25,7 @@ fn timer_with_threaded_runtime() { tx.send(()).unwrap(); }); - let mut e = tokio_executor::enter().unwrap(); - e.block_on(rt.shutdown_on_idle()); + rt.shutdown_on_idle(); rx.recv().unwrap(); } |