summaryrefslogtreecommitdiffstats
path: root/examples
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2017-12-11 21:29:18 -0600
committerCarl Lerche <me@carllerche.com>2017-12-11 21:29:18 -0600
commita577bfc033b50c913c2241c432bcaeac3917145c (patch)
tree1151bc60d9f9373722d6bea9127b965a4db470bc /examples
parent32f2750c2d99e82d64033c5865d2f4e029cb31ac (diff)
Remove the `Reactor::run` method (#58)
This commit removes the `Reactor::run` method which has previously been used to execute futures and turn the reactor at the same time. The tests/examples made heavy usage of this method but they have now all temporarily moved to `wait()` until the futures dependency is upgraded. In the meantime this'll allow us to further trim down the `Reactor` APIs to their final state.
Diffstat (limited to 'examples')
-rw-r--r--examples/chat.rs9
-rw-r--r--examples/compress.rs7
-rw-r--r--examples/connect.rs10
-rw-r--r--examples/echo-threads.rs41
-rw-r--r--examples/echo-udp.rs13
-rw-r--r--examples/echo.rs27
-rw-r--r--examples/hello.rs10
-rw-r--r--examples/proxy.rs10
-rw-r--r--examples/sink.rs7
-rw-r--r--examples/tinydb.rs11
-rw-r--r--examples/tinyhttp.rs11
-rw-r--r--examples/udp-codec.rs7
12 files changed, 62 insertions, 101 deletions
diff --git a/examples/chat.rs b/examples/chat.rs
index f22d6b64..d7d84669 100644
--- a/examples/chat.rs
+++ b/examples/chat.rs
@@ -33,7 +33,7 @@ use futures::future::Executor;
use futures::stream::{self, Stream};
use futures_cpupool::CpuPool;
use tokio::net::TcpListener;
-use tokio::reactor::Reactor;
+use tokio::reactor::Handle;
use tokio_io::io;
use tokio_io::AsyncRead;
@@ -41,9 +41,8 @@ fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse().unwrap();
- // Create the event loop and TCP listener we'll accept connections on.
- let mut core = Reactor::new().unwrap();
- let handle = core.handle();
+ // Create the TCP listener we'll accept connections on.
+ let handle = Handle::default();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
@@ -135,5 +134,5 @@ fn main() {
});
// execute server
- core.run(srv).unwrap();
+ srv.wait().unwrap();
}
diff --git a/examples/compress.rs b/examples/compress.rs
index d158060f..42cbab8e 100644
--- a/examples/compress.rs
+++ b/examples/compress.rs
@@ -32,7 +32,7 @@ use futures::{Future, Stream, Poll};
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream};
-use tokio::reactor::Reactor;
+use tokio::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};
use flate2::write::GzEncoder;
@@ -41,8 +41,7 @@ fn main() {
// reactor.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
- let mut core = Reactor::new().unwrap();
- let handle = core.handle();
+ let handle = Handle::default();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
@@ -64,7 +63,7 @@ fn main() {
Ok(())
});
- core.run(server).unwrap();
+ server.wait().unwrap();
}
/// The main workhorse of this example. This'll compress all data read from
diff --git a/examples/connect.rs b/examples/connect.rs
index 235da1af..5cedadfb 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -28,7 +28,7 @@ use std::thread;
use futures::sync::mpsc;
use futures::{Sink, Future, Stream};
use futures_cpupool::CpuPool;
-use tokio::reactor::Reactor;
+use tokio::reactor::Handle;
fn main() {
// Determine if we're going to run in TCP or UDP mode
@@ -47,9 +47,7 @@ fn main() {
});
let addr = addr.parse::<SocketAddr>().unwrap();
- // Create the event loop and initiate the connection to the remote server
- let mut core = Reactor::new().unwrap();
- let handle = core.handle();
+ let handle = Handle::default();
let pool = CpuPool::new(1);
@@ -76,9 +74,9 @@ fn main() {
// loop. In this case, though, we know it's ok as the event loop isn't
// otherwise running anything useful.
let mut out = io::stdout();
- core.run(stdout.for_each(|chunk| {
+ stdout.for_each(|chunk| {
out.write_all(&chunk)
- })).unwrap();
+ }).wait().unwrap();
}
mod tcp {
diff --git a/examples/echo-threads.rs b/examples/echo-threads.rs
index ea3ca362..8d428e49 100644
--- a/examples/echo-threads.rs
+++ b/examples/echo-threads.rs
@@ -20,18 +20,17 @@ extern crate tokio;
extern crate tokio_io;
use std::env;
-use std::net::{self, SocketAddr};
+use std::net::SocketAddr;
use std::thread;
-use futures::Future;
+use futures::prelude::*;
use futures::future::Executor;
-use futures::stream::Stream;
use futures::sync::mpsc;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use tokio_io::io::copy;
-use tokio::net::TcpStream;
-use tokio::reactor::Reactor;
+use tokio::net::{TcpStream, TcpListener};
+use tokio::reactor::Handle;
fn main() {
// First argument, the address to bind
@@ -42,9 +41,8 @@ fn main() {
let num_threads = env::args().nth(2).and_then(|s| s.parse().ok())
.unwrap_or(num_cpus::get());
- // Use `std::net` to bind the requested port, we'll use this on the main
- // thread below
- let listener = net::TcpListener::bind(&addr).expect("failed to bind");
+ let handle = Handle::default();
+ let listener = TcpListener::bind(&addr, &handle).expect("failed to bind");
println!("Listening on: {}", addr);
// Spin up our worker threads, creating a channel routing to each worker
@@ -56,31 +54,22 @@ fn main() {
thread::spawn(|| worker(rx));
}
- // Infinitely accept sockets from our `std::net::TcpListener`, as this'll do
- // blocking I/O. Each socket is then shipped round-robin to a particular
- // thread which will associate the socket with the corresponding event loop
- // and process the connection.
+ // Infinitely accept sockets from our `TcpListener`. Each socket is then
+ // shipped round-robin to a particular thread which will associate the
+ // socket with the corresponding event loop and process the connection.
let mut next = 0;
- for socket in listener.incoming() {
- let socket = socket.expect("failed to accept");
+ let srv = listener.incoming().for_each(|(socket, _)| {
channels[next].unbounded_send(socket).expect("worker thread died");
next = (next + 1) % channels.len();
- }
+ Ok(())
+ });
+ srv.wait().unwrap();
}
-fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
- let mut core = Reactor::new().unwrap();
- let handle = core.handle();
-
+fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) {
let pool = CpuPool::new(1);
let done = rx.for_each(move |socket| {
- // First up when we receive a socket we associate it with our event loop
- // using the `TcpStream::from_stream` API. After that the socket is not
- // a `tokio::net::TcpStream` meaning it's in nonblocking mode and
- // ready to be used with Tokio
- let socket = TcpStream::from_std(socket, &handle)
- .expect("failed to associate TCP stream");
let addr = socket.peer_addr().expect("failed to get remote address");
// Like the single-threaded `echo` example we split the socket halves
@@ -101,5 +90,5 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
Ok(())
});
- core.run(done).unwrap();
+ done.wait().unwrap();
}
diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs
index 0e163efe..0bcc3807 100644
--- a/examples/echo-udp.rs
+++ b/examples/echo-udp.rs
@@ -20,7 +20,7 @@ use std::net::SocketAddr;
use futures::{Future, Poll};
use tokio::net::UdpSocket;
-use tokio::reactor::Reactor;
+use tokio::reactor::Handle;
struct Server {
socket: UdpSocket,
@@ -54,18 +54,15 @@ fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
- // Create the event loop that will drive this server, and also bind the
- // socket we'll be listening to.
- let mut l = Reactor::new().unwrap();
- let handle = l.handle();
+ let handle = Handle::default();
let socket = UdpSocket::bind(&addr, &handle).unwrap();
println!("Listening on: {}", socket.local_addr().unwrap());
// Next we'll create a future to spawn (the one we defined above) and then
- // we'll run the event loop by running the future.
- l.run(Server {
+ // we'll block our current thread waiting on the result of the future
+ Server {
socket: socket,
buf: vec![0; 1024],
to_send: None,
- }).unwrap();
+ }.wait().unwrap();
}
diff --git a/examples/echo.rs b/examples/echo.rs
index 07c061c4..ca081f84 100644
--- a/examples/echo.rs
+++ b/examples/echo.rs
@@ -32,7 +32,7 @@ use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use tokio_io::io::copy;
use tokio::net::TcpListener;
-use tokio::reactor::Reactor;
+use tokio::reactor::Handle;
fn main() {
// Allow passing an address to listen on as the first argument of this
@@ -41,17 +41,7 @@ fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
- // First up we'll create the event loop that's going to drive this server.
- // This is done by creating an instance of the `Reactor` type, tokio-core's
- // event loop. Most functions in tokio-core return an `io::Result`, and
- // `Reactor::new` is no exception. For this example, though, we're mostly just
- // ignoring errors, so we unwrap the return value.
- //
- // After the event loop is created we acquire a handle to it through the
- // `handle` method. With this handle we'll then later be able to create I/O
- // objects.
- let mut core = Reactor::new().unwrap();
- let handle = core.handle();
+ let handle = Handle::default();
// Next up we create a TCP listener which will listen for incoming
// connections. This TCP listener is bound to the address we determined
@@ -125,13 +115,8 @@ fn main() {
Ok(())
});
- // And finally now that we've define what our server is, we run it! We
- // didn't actually do much I/O up to this point and this `Reactor::run` method
- // is responsible for driving the entire server to completion.
- //
- // The `run` method will return the result of the future that it's running,
- // but in our case the `done` future won't ever finish because a TCP
- // listener is never done accepting clients. That basically just means that
- // we're going to be running the server until it's killed (e.g. ctrl-c).
- core.run(done).unwrap();
+ // And finally now that we've define what our server is, we run it! Here we
+ // just need to execute the future we've created and wait for it to complete
+ // using the standard methods in the `futures` crate.
+ done.wait().unwrap();
}
diff --git a/examples/hello.rs b/examples/hello.rs
index 0bff27e9..5d1c226e 100644
--- a/examples/hello.rs
+++ b/examples/hello.rs
@@ -19,17 +19,17 @@ extern crate tokio_io;
use std::env;
use std::net::SocketAddr;
-use futures::stream::Stream;
-use tokio::reactor::Reactor;
+use futures::prelude::*;
use tokio::net::TcpListener;
+use tokio::reactor::Handle;
fn main() {
env_logger::init().unwrap();
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
- let mut core = Reactor::new().unwrap();
- let listener = TcpListener::bind(&addr, &core.handle()).unwrap();
+ let handle = Handle::default();
+ let listener = TcpListener::bind(&addr, &handle).unwrap();
let addr = listener.local_addr().unwrap();
println!("Listening for connections on {}", addr);
@@ -42,5 +42,5 @@ fn main() {
Ok(())
});
- core.run(server).unwrap();
+ server.wait().unwrap();
}
diff --git a/examples/proxy.rs b/examples/proxy.rs
index 9d77c54f..03a83204 100644
--- a/examples/proxy.rs
+++ b/examples/proxy.rs
@@ -31,7 +31,7 @@ use futures::{Future, Poll};
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream};
-use tokio::reactor::Reactor;
+use tokio::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::{copy, shutdown};
@@ -42,14 +42,12 @@ fn main() {
let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string());
let server_addr = server_addr.parse::<SocketAddr>().unwrap();
- // Create the event loop that will drive this server.
- let mut l = Reactor::new().unwrap();
- let handle = l.handle();
+ let handle = Handle::default();
let pool = CpuPool::new(1);
// Create a TCP listener which will listen for incoming connections.
- let socket = TcpListener::bind(&listen_addr, &l.handle()).unwrap();
+ let socket = TcpListener::bind(&listen_addr, &handle).unwrap();
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);
@@ -97,7 +95,7 @@ fn main() {
Ok(())
});
- l.run(done).unwrap();
+ done.wait().unwrap();
}
// This is a custom type used to have a custom implementation of the
diff --git a/examples/sink.rs b/examples/sink.rs
index 980cb63e..48643e05 100644
--- a/examples/sink.rs
+++ b/examples/sink.rs
@@ -31,7 +31,7 @@ use futures::stream::{self, Stream};
use futures_cpupool::CpuPool;
use tokio_io::IoFuture;
use tokio::net::{TcpListener, TcpStream};
-use tokio::reactor::Reactor;
+use tokio::reactor::Handle;
fn main() {
env_logger::init().unwrap();
@@ -40,8 +40,7 @@ fn main() {
let pool = CpuPool::new(1);
- let mut core = Reactor::new().unwrap();
- let handle = core.handle();
+ let handle = Handle::default();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
let server = socket.incoming().for_each(|(socket, addr)| {
@@ -49,7 +48,7 @@ fn main() {
pool.execute(write(socket).or_else(|_| Ok(()))).unwrap();
Ok(())
});
- core.run(server).unwrap();
+ server.wait().unwrap();
}
fn write(socket: TcpStream) -> IoFuture<()> {
diff --git a/examples/tinydb.rs b/examples/tinydb.rs
index 9929e369..7b5c47d1 100644
--- a/examples/tinydb.rs
+++ b/examples/tinydb.rs
@@ -54,7 +54,7 @@ use futures::prelude::*;
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::TcpListener;
-use tokio::reactor::Reactor;
+use tokio::reactor::Handle;
use tokio_io::AsyncRead;
use tokio_io::io::{lines, write_all};
@@ -80,12 +80,11 @@ enum Response {
}
fn main() {
- // Parse the address we're going to run this server on, create a `Reactor`, and
- // set up our TCP listener to accept connections.
+ // Parse the address we're going to run this server on
+ // and set up our TCP listener to accept connections.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
- let mut core = Reactor::new().unwrap();
- let handle = core.handle();
+ let handle = Handle::default();
let listener = TcpListener::bind(&addr, &handle).expect("failed to bind");
println!("Listening on: {}", addr);
@@ -163,7 +162,7 @@ fn main() {
Ok(())
});
- core.run(done).unwrap();
+ done.wait().unwrap();
}
impl Request {
diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs
index f5513992..00c16fec 100644
--- a/examples/tinyhttp.rs
+++ b/examples/tinyhttp.rs
@@ -31,15 +31,15 @@ use std::net::{self, SocketAddr};
use std::thread;
use bytes::BytesMut;
-use futures::future;
use futures::future::Executor;
+use futures::future;
use futures::sync::mpsc;
use futures::{Stream, Future, Sink};
use futures_cpupool::CpuPool;
-use http::{Request, Response, StatusCode};
use http::header::HeaderValue;
+use http::{Request, Response, StatusCode};
use tokio::net::TcpStream;
-use tokio::reactor::Reactor;
+use tokio::reactor::Handle;
use tokio_io::codec::{Encoder, Decoder};
use tokio_io::{AsyncRead};
@@ -70,8 +70,7 @@ fn main() {
}
fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
- let mut core = Reactor::new().unwrap();
- let handle = core.handle();
+ let handle = Handle::default();
let pool = CpuPool::new(1);
@@ -92,7 +91,7 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
})).unwrap();
Ok(())
});
- core.run(done).unwrap();
+ done.wait().unwrap();
}
/// "Server logic" is implemented in this function.
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
index 91fde26d..f60fc108 100644
--- a/examples/udp-codec.rs
+++ b/examples/udp-codec.rs
@@ -18,7 +18,7 @@ use futures::{Future, Stream, Sink};
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::{UdpSocket, UdpCodec};
-use tokio::reactor::Reactor;
+use tokio::reactor::Handle;
pub struct LineCodec;
@@ -39,8 +39,7 @@ impl UdpCodec for LineCodec {
fn main() {
drop(env_logger::init());
- let mut core = Reactor::new().unwrap();
- let handle = core.handle();
+ let handle = Handle::default();
let pool = CpuPool::new(1);
@@ -79,5 +78,5 @@ fn main() {
// Spawn the sender of pongs and then wait for our pinger to finish.
pool.execute(b.then(|_| Ok(()))).unwrap();
- drop(core.run(a));
+ drop(a.wait());
}