diff options
author | Alex Crichton <alex@alexcrichton.com> | 2017-12-11 21:29:18 -0600 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2017-12-11 21:29:18 -0600 |
commit | a577bfc033b50c913c2241c432bcaeac3917145c (patch) | |
tree | 1151bc60d9f9373722d6bea9127b965a4db470bc /examples | |
parent | 32f2750c2d99e82d64033c5865d2f4e029cb31ac (diff) |
Remove the `Reactor::run` method (#58)
This commit removes the `Reactor::run` method which has previously been used to
execute futures and turn the reactor at the same time. The tests/examples made
heavy usage of this method but they have now all temporarily moved to `wait()`
until the futures dependency is upgraded. In the meantime this'll allow us to
further trim down the `Reactor` APIs to their final state.
Diffstat (limited to 'examples')
-rw-r--r-- | examples/chat.rs | 9 | ||||
-rw-r--r-- | examples/compress.rs | 7 | ||||
-rw-r--r-- | examples/connect.rs | 10 | ||||
-rw-r--r-- | examples/echo-threads.rs | 41 | ||||
-rw-r--r-- | examples/echo-udp.rs | 13 | ||||
-rw-r--r-- | examples/echo.rs | 27 | ||||
-rw-r--r-- | examples/hello.rs | 10 | ||||
-rw-r--r-- | examples/proxy.rs | 10 | ||||
-rw-r--r-- | examples/sink.rs | 7 | ||||
-rw-r--r-- | examples/tinydb.rs | 11 | ||||
-rw-r--r-- | examples/tinyhttp.rs | 11 | ||||
-rw-r--r-- | examples/udp-codec.rs | 7 |
12 files changed, 62 insertions, 101 deletions
diff --git a/examples/chat.rs b/examples/chat.rs index f22d6b64..d7d84669 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -33,7 +33,7 @@ use futures::future::Executor; use futures::stream::{self, Stream}; use futures_cpupool::CpuPool; use tokio::net::TcpListener; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; use tokio_io::io; use tokio_io::AsyncRead; @@ -41,9 +41,8 @@ fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse().unwrap(); - // Create the event loop and TCP listener we'll accept connections on. - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); + // Create the TCP listener we'll accept connections on. + let handle = Handle::default(); let socket = TcpListener::bind(&addr, &handle).unwrap(); println!("Listening on: {}", addr); @@ -135,5 +134,5 @@ fn main() { }); // execute server - core.run(srv).unwrap(); + srv.wait().unwrap(); } diff --git a/examples/compress.rs b/examples/compress.rs index d158060f..42cbab8e 100644 --- a/examples/compress.rs +++ b/examples/compress.rs @@ -32,7 +32,7 @@ use futures::{Future, Stream, Poll}; use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::{TcpListener, TcpStream}; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; use tokio_io::{AsyncRead, AsyncWrite}; use flate2::write::GzEncoder; @@ -41,8 +41,7 @@ fn main() { // reactor. let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::<SocketAddr>().unwrap(); - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); + let handle = Handle::default(); let socket = TcpListener::bind(&addr, &handle).unwrap(); println!("Listening on: {}", addr); @@ -64,7 +63,7 @@ fn main() { Ok(()) }); - core.run(server).unwrap(); + server.wait().unwrap(); } /// The main workhorse of this example. This'll compress all data read from diff --git a/examples/connect.rs b/examples/connect.rs index 235da1af..5cedadfb 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -28,7 +28,7 @@ use std::thread; use futures::sync::mpsc; use futures::{Sink, Future, Stream}; use futures_cpupool::CpuPool; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; fn main() { // Determine if we're going to run in TCP or UDP mode @@ -47,9 +47,7 @@ fn main() { }); let addr = addr.parse::<SocketAddr>().unwrap(); - // Create the event loop and initiate the connection to the remote server - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); + let handle = Handle::default(); let pool = CpuPool::new(1); @@ -76,9 +74,9 @@ fn main() { // loop. In this case, though, we know it's ok as the event loop isn't // otherwise running anything useful. let mut out = io::stdout(); - core.run(stdout.for_each(|chunk| { + stdout.for_each(|chunk| { out.write_all(&chunk) - })).unwrap(); + }).wait().unwrap(); } mod tcp { diff --git a/examples/echo-threads.rs b/examples/echo-threads.rs index ea3ca362..8d428e49 100644 --- a/examples/echo-threads.rs +++ b/examples/echo-threads.rs @@ -20,18 +20,17 @@ extern crate tokio; extern crate tokio_io; use std::env; -use std::net::{self, SocketAddr}; +use std::net::SocketAddr; use std::thread; -use futures::Future; +use futures::prelude::*; use futures::future::Executor; -use futures::stream::Stream; use futures::sync::mpsc; use futures_cpupool::CpuPool; use tokio_io::AsyncRead; use tokio_io::io::copy; -use tokio::net::TcpStream; -use tokio::reactor::Reactor; +use tokio::net::{TcpStream, TcpListener}; +use tokio::reactor::Handle; fn main() { // First argument, the address to bind @@ -42,9 +41,8 @@ fn main() { let num_threads = env::args().nth(2).and_then(|s| s.parse().ok()) .unwrap_or(num_cpus::get()); - // Use `std::net` to bind the requested port, we'll use this on the main - // thread below - let listener = net::TcpListener::bind(&addr).expect("failed to bind"); + let handle = Handle::default(); + let listener = TcpListener::bind(&addr, &handle).expect("failed to bind"); println!("Listening on: {}", addr); // Spin up our worker threads, creating a channel routing to each worker @@ -56,31 +54,22 @@ fn main() { thread::spawn(|| worker(rx)); } - // Infinitely accept sockets from our `std::net::TcpListener`, as this'll do - // blocking I/O. Each socket is then shipped round-robin to a particular - // thread which will associate the socket with the corresponding event loop - // and process the connection. + // Infinitely accept sockets from our `TcpListener`. Each socket is then + // shipped round-robin to a particular thread which will associate the + // socket with the corresponding event loop and process the connection. let mut next = 0; - for socket in listener.incoming() { - let socket = socket.expect("failed to accept"); + let srv = listener.incoming().for_each(|(socket, _)| { channels[next].unbounded_send(socket).expect("worker thread died"); next = (next + 1) % channels.len(); - } + Ok(()) + }); + srv.wait().unwrap(); } -fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) { - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); - +fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) { let pool = CpuPool::new(1); let done = rx.for_each(move |socket| { - // First up when we receive a socket we associate it with our event loop - // using the `TcpStream::from_stream` API. After that the socket is not - // a `tokio::net::TcpStream` meaning it's in nonblocking mode and - // ready to be used with Tokio - let socket = TcpStream::from_std(socket, &handle) - .expect("failed to associate TCP stream"); let addr = socket.peer_addr().expect("failed to get remote address"); // Like the single-threaded `echo` example we split the socket halves @@ -101,5 +90,5 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) { Ok(()) }); - core.run(done).unwrap(); + done.wait().unwrap(); } diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs index 0e163efe..0bcc3807 100644 --- a/examples/echo-udp.rs +++ b/examples/echo-udp.rs @@ -20,7 +20,7 @@ use std::net::SocketAddr; use futures::{Future, Poll}; use tokio::net::UdpSocket; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; struct Server { socket: UdpSocket, @@ -54,18 +54,15 @@ fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::<SocketAddr>().unwrap(); - // Create the event loop that will drive this server, and also bind the - // socket we'll be listening to. - let mut l = Reactor::new().unwrap(); - let handle = l.handle(); + let handle = Handle::default(); let socket = UdpSocket::bind(&addr, &handle).unwrap(); println!("Listening on: {}", socket.local_addr().unwrap()); // Next we'll create a future to spawn (the one we defined above) and then - // we'll run the event loop by running the future. - l.run(Server { + // we'll block our current thread waiting on the result of the future + Server { socket: socket, buf: vec![0; 1024], to_send: None, - }).unwrap(); + }.wait().unwrap(); } diff --git a/examples/echo.rs b/examples/echo.rs index 07c061c4..ca081f84 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -32,7 +32,7 @@ use futures_cpupool::CpuPool; use tokio_io::AsyncRead; use tokio_io::io::copy; use tokio::net::TcpListener; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; fn main() { // Allow passing an address to listen on as the first argument of this @@ -41,17 +41,7 @@ fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::<SocketAddr>().unwrap(); - // First up we'll create the event loop that's going to drive this server. - // This is done by creating an instance of the `Reactor` type, tokio-core's - // event loop. Most functions in tokio-core return an `io::Result`, and - // `Reactor::new` is no exception. For this example, though, we're mostly just - // ignoring errors, so we unwrap the return value. - // - // After the event loop is created we acquire a handle to it through the - // `handle` method. With this handle we'll then later be able to create I/O - // objects. - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); + let handle = Handle::default(); // Next up we create a TCP listener which will listen for incoming // connections. This TCP listener is bound to the address we determined @@ -125,13 +115,8 @@ fn main() { Ok(()) }); - // And finally now that we've define what our server is, we run it! We - // didn't actually do much I/O up to this point and this `Reactor::run` method - // is responsible for driving the entire server to completion. - // - // The `run` method will return the result of the future that it's running, - // but in our case the `done` future won't ever finish because a TCP - // listener is never done accepting clients. That basically just means that - // we're going to be running the server until it's killed (e.g. ctrl-c). - core.run(done).unwrap(); + // And finally now that we've define what our server is, we run it! Here we + // just need to execute the future we've created and wait for it to complete + // using the standard methods in the `futures` crate. + done.wait().unwrap(); } diff --git a/examples/hello.rs b/examples/hello.rs index 0bff27e9..5d1c226e 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -19,17 +19,17 @@ extern crate tokio_io; use std::env; use std::net::SocketAddr; -use futures::stream::Stream; -use tokio::reactor::Reactor; +use futures::prelude::*; use tokio::net::TcpListener; +use tokio::reactor::Handle; fn main() { env_logger::init().unwrap(); let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::<SocketAddr>().unwrap(); - let mut core = Reactor::new().unwrap(); - let listener = TcpListener::bind(&addr, &core.handle()).unwrap(); + let handle = Handle::default(); + let listener = TcpListener::bind(&addr, &handle).unwrap(); let addr = listener.local_addr().unwrap(); println!("Listening for connections on {}", addr); @@ -42,5 +42,5 @@ fn main() { Ok(()) }); - core.run(server).unwrap(); + server.wait().unwrap(); } diff --git a/examples/proxy.rs b/examples/proxy.rs index 9d77c54f..03a83204 100644 --- a/examples/proxy.rs +++ b/examples/proxy.rs @@ -31,7 +31,7 @@ use futures::{Future, Poll}; use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::{TcpListener, TcpStream}; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::io::{copy, shutdown}; @@ -42,14 +42,12 @@ fn main() { let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string()); let server_addr = server_addr.parse::<SocketAddr>().unwrap(); - // Create the event loop that will drive this server. - let mut l = Reactor::new().unwrap(); - let handle = l.handle(); + let handle = Handle::default(); let pool = CpuPool::new(1); // Create a TCP listener which will listen for incoming connections. - let socket = TcpListener::bind(&listen_addr, &l.handle()).unwrap(); + let socket = TcpListener::bind(&listen_addr, &handle).unwrap(); println!("Listening on: {}", listen_addr); println!("Proxying to: {}", server_addr); @@ -97,7 +95,7 @@ fn main() { Ok(()) }); - l.run(done).unwrap(); + done.wait().unwrap(); } // This is a custom type used to have a custom implementation of the diff --git a/examples/sink.rs b/examples/sink.rs index 980cb63e..48643e05 100644 --- a/examples/sink.rs +++ b/examples/sink.rs @@ -31,7 +31,7 @@ use futures::stream::{self, Stream}; use futures_cpupool::CpuPool; use tokio_io::IoFuture; use tokio::net::{TcpListener, TcpStream}; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; fn main() { env_logger::init().unwrap(); @@ -40,8 +40,7 @@ fn main() { let pool = CpuPool::new(1); - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); + let handle = Handle::default(); let socket = TcpListener::bind(&addr, &handle).unwrap(); println!("Listening on: {}", addr); let server = socket.incoming().for_each(|(socket, addr)| { @@ -49,7 +48,7 @@ fn main() { pool.execute(write(socket).or_else(|_| Ok(()))).unwrap(); Ok(()) }); - core.run(server).unwrap(); + server.wait().unwrap(); } fn write(socket: TcpStream) -> IoFuture<()> { diff --git a/examples/tinydb.rs b/examples/tinydb.rs index 9929e369..7b5c47d1 100644 --- a/examples/tinydb.rs +++ b/examples/tinydb.rs @@ -54,7 +54,7 @@ use futures::prelude::*; use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::TcpListener; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; use tokio_io::AsyncRead; use tokio_io::io::{lines, write_all}; @@ -80,12 +80,11 @@ enum Response { } fn main() { - // Parse the address we're going to run this server on, create a `Reactor`, and - // set up our TCP listener to accept connections. + // Parse the address we're going to run this server on + // and set up our TCP listener to accept connections. let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::<SocketAddr>().unwrap(); - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); + let handle = Handle::default(); let listener = TcpListener::bind(&addr, &handle).expect("failed to bind"); println!("Listening on: {}", addr); @@ -163,7 +162,7 @@ fn main() { Ok(()) }); - core.run(done).unwrap(); + done.wait().unwrap(); } impl Request { diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs index f5513992..00c16fec 100644 --- a/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs @@ -31,15 +31,15 @@ use std::net::{self, SocketAddr}; use std::thread; use bytes::BytesMut; -use futures::future; use futures::future::Executor; +use futures::future; use futures::sync::mpsc; use futures::{Stream, Future, Sink}; use futures_cpupool::CpuPool; -use http::{Request, Response, StatusCode}; use http::header::HeaderValue; +use http::{Request, Response, StatusCode}; use tokio::net::TcpStream; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; use tokio_io::codec::{Encoder, Decoder}; use tokio_io::{AsyncRead}; @@ -70,8 +70,7 @@ fn main() { } fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) { - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); + let handle = Handle::default(); let pool = CpuPool::new(1); @@ -92,7 +91,7 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) { })).unwrap(); Ok(()) }); - core.run(done).unwrap(); + done.wait().unwrap(); } /// "Server logic" is implemented in this function. diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index 91fde26d..f60fc108 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -18,7 +18,7 @@ use futures::{Future, Stream, Sink}; use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::{UdpSocket, UdpCodec}; -use tokio::reactor::Reactor; +use tokio::reactor::Handle; pub struct LineCodec; @@ -39,8 +39,7 @@ impl UdpCodec for LineCodec { fn main() { drop(env_logger::init()); - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); + let handle = Handle::default(); let pool = CpuPool::new(1); @@ -79,5 +78,5 @@ fn main() { // Spawn the sender of pongs and then wait for our pinger to finish. pool.execute(b.then(|_| Ok(()))).unwrap(); - drop(core.run(a)); + drop(a.wait()); } |