summaryrefslogtreecommitdiffstats
path: root/examples
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2017-10-25 10:54:54 -0700
committerCarl Lerche <me@carllerche.com>2017-11-01 07:28:49 -0700
commitc6f1ff13d249a42a5d0ae716dffca6a22cd1d7ca (patch)
tree7d5845668553eea08013cb75fdfbc4cb4f629255 /examples
parent697851210c13e3df637a93af526cf6e41a217cfd (diff)
Remove executor from reactor.
In accordance with tokio-rs/tokio-rfcs#3, the executor functionality of Tokio is being removed and will be relocated into futures-rs as a "current thread" executor. This PR removes task execution from the code base. As a temporary mesure, all examples and tests are switched to using CpuPool. Depends on #19.
Diffstat (limited to 'examples')
-rw-r--r--examples/chat.rs25
-rw-r--r--examples/compress.rs7
-rw-r--r--examples/connect.rs28
-rw-r--r--examples/echo-threads.rs7
-rw-r--r--examples/echo.rs18
-rw-r--r--examples/proxy.rs7
-rw-r--r--examples/sink.rs7
-rw-r--r--examples/tinydb.rs23
-rw-r--r--examples/tinyhttp.rs11
-rw-r--r--examples/udp-codec.rs7
10 files changed, 96 insertions, 44 deletions
diff --git a/examples/chat.rs b/examples/chat.rs
index ff364476..a0023730 100644
--- a/examples/chat.rs
+++ b/examples/chat.rs
@@ -18,18 +18,20 @@
//! messages.
extern crate futures;
+extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
use std::collections::HashMap;
-use std::rc::Rc;
-use std::cell::RefCell;
use std::iter;
use std::env;
use std::io::{Error, ErrorKind, BufReader};
+use std::sync::{Arc, Mutex};
use futures::Future;
+use futures::future::Executor;
use futures::stream::{self, Stream};
+use futures_cpupool::CpuPool;
use tokio::net::TcpListener;
use tokio::reactor::Core;
use tokio_io::io;
@@ -45,9 +47,10 @@ fn main() {
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
- // This is a single-threaded server, so we can just use Rc and RefCell to
- // store the map of all connections we know about.
- let connections = Rc::new(RefCell::new(HashMap::new()));
+ // This is currently a multi threaded server.
+ //
+ // Once the same thread executor lands, transition to single threaded.
+ let connections = Arc::new(Mutex::new(HashMap::new()));
let srv = socket.incoming().for_each(move |(stream, addr)| {
println!("New Connection: {}", addr);
@@ -57,7 +60,7 @@ fn main() {
// send us messages. Then register our address with the stream to send
// data to us.
let (tx, rx) = futures::sync::mpsc::unbounded();
- connections.borrow_mut().insert(addr, tx);
+ connections.lock().unwrap().insert(addr, tx);
// Define here what we do for the actual I/O. That is, read a bunch of
// lines from the socket and dispatch them while we also write any lines
@@ -88,7 +91,7 @@ fn main() {
let connections = connections_inner.clone();
line.map(move |(reader, message)| {
println!("{}: {:?}", addr, message);
- let mut conns = connections.borrow_mut();
+ let mut conns = connections.lock().unwrap();
if let Ok(msg) = message {
// For each open connection except the sender, send the
// string via the channel.
@@ -114,17 +117,19 @@ fn main() {
amt.map_err(|_| ())
});
+ let pool = CpuPool::new(1);
+
// Now that we've got futures representing each half of the socket, we
// use the `select` combinator to wait for either half to be done to
// tear down the other. Then we spawn off the result.
let connections = connections.clone();
let socket_reader = socket_reader.map_err(|_| ());
let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
- handle.spawn(connection.then(move |_| {
- connections.borrow_mut().remove(&addr);
+ pool.execute(connection.then(move |_| {
+ connections.lock().unwrap().remove(&addr);
println!("Connection {} closed.", addr);
Ok(())
- }));
+ })).unwrap();
Ok(())
});
diff --git a/examples/compress.rs b/examples/compress.rs
index 6b0f1730..8fedf25e 100644
--- a/examples/compress.rs
+++ b/examples/compress.rs
@@ -29,6 +29,7 @@ use std::env;
use std::net::SocketAddr;
use futures::{Future, Stream, Poll};
+use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream};
use tokio::reactor::Core;
@@ -53,13 +54,13 @@ fn main() {
// The compress logic will happen in the function below, but everything's
// still a future! Each client is spawned to concurrently get processed.
let server = socket.incoming().for_each(move |(socket, addr)| {
- handle.spawn(compress(socket, &pool).then(move |result| {
+ pool.execute(compress(socket, &pool).then(move |result| {
match result {
Ok((r, w)) => println!("{}: compressed {} bytes to {}", addr, r, w),
Err(e) => println!("{}: failed when compressing: {}", addr, e),
}
Ok(())
- }));
+ })).unwrap();
Ok(())
});
@@ -70,7 +71,7 @@ fn main() {
/// `socket` on the `pool` provided, writing it back out to `socket` as it's
/// available.
fn compress(socket: TcpStream, pool: &CpuPool)
- -> Box<Future<Item = (u64, u64), Error = io::Error>>
+ -> Box<Future<Item = (u64, u64), Error = io::Error> + Send>
{
use tokio_io::io;
diff --git a/examples/connect.rs b/examples/connect.rs
index d5238a53..1c3fcb75 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -15,6 +15,7 @@
//! stdin/stdout to a server" to get up and running.
extern crate futures;
+extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
extern crate bytes;
@@ -26,6 +27,7 @@ use std::thread;
use futures::sync::mpsc;
use futures::{Sink, Future, Stream};
+use futures_cpupool::CpuPool;
use tokio::reactor::Core;
fn main() {
@@ -49,6 +51,8 @@ fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
+ let pool = CpuPool::new(1);
+
// Right now Tokio doesn't support a handle to stdin running on the event
// loop, so we farm out that work to a separate thread. This thread will
// read data (with blocking I/O) from stdin and then send it to the event
@@ -61,9 +65,9 @@ fn main() {
// our UDP connection to get a stream of bytes we're going to emit to
// stdout.
let stdout = if tcp {
- tcp::connect(&addr, &handle, Box::new(stdin_rx))
+ tcp::connect(&addr, &handle, &pool, Box::new(stdin_rx))
} else {
- udp::connect(&addr, &handle, Box::new(stdin_rx))
+ udp::connect(&addr, &handle, &pool, Box::new(stdin_rx))
};
// And now with our stream of bytes to write to stdout, we execute that in
@@ -83,6 +87,8 @@ mod tcp {
use bytes::{BufMut, BytesMut};
use futures::{Future, Stream};
+ use futures::future::Executor;
+ use futures_cpupool::CpuPool;
use tokio::net::TcpStream;
use tokio::reactor::Handle;
use tokio_io::AsyncRead;
@@ -90,11 +96,12 @@ mod tcp {
pub fn connect(addr: &SocketAddr,
handle: &Handle,
- stdin: Box<Stream<Item = Vec<u8>, Error = io::Error>>)
+ pool: &CpuPool,
+ stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
-> Box<Stream<Item = BytesMut, Error = io::Error>>
{
let tcp = TcpStream::connect(addr, handle);
- let handle = handle.clone();
+ let pool = pool.clone();
// After the TCP connection has been established, we set up our client
// to start forwarding data.
@@ -113,12 +120,12 @@ mod tcp {
// with us reading data from the stream.
Box::new(tcp.map(move |stream| {
let (sink, stream) = stream.framed(Bytes).split();
- handle.spawn(stdin.forward(sink).then(|result| {
+ pool.execute(stdin.forward(sink).then(|result| {
if let Err(e) = result {
panic!("failed to write to socket: {}", e)
}
Ok(())
- }));
+ })).unwrap();
stream
}).flatten_stream())
}
@@ -167,12 +174,15 @@ mod udp {
use bytes::BytesMut;
use futures::{Future, Stream};
+ use futures::future::Executor;
+ use futures_cpupool::CpuPool;
use tokio::net::{UdpCodec, UdpSocket};
use tokio::reactor::Handle;
pub fn connect(&addr: &SocketAddr,
handle: &Handle,
- stdin: Box<Stream<Item = Vec<u8>, Error = io::Error>>)
+ pool: &CpuPool,
+ stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
-> Box<Stream<Item = BytesMut, Error = io::Error>>
{
// We'll bind our UDP socket to a local IP/port, but for now we
@@ -193,14 +203,14 @@ mod udp {
// All bytes from `stdin` will go to the `addr` specified in our
// argument list. Like with TCP this is spawned concurrently
- handle.spawn(stdin.map(move |chunk| {
+ pool.execute(stdin.map(move |chunk| {
(addr, chunk)
}).forward(sink).then(|result| {
if let Err(e) = result {
panic!("failed to write to socket: {}", e)
}
Ok(())
- }));
+ })).unwrap();
// With UDP we could receive data from any source, so filter out
// anything coming from a different address
diff --git a/examples/echo-threads.rs b/examples/echo-threads.rs
index 63ed47e4..4869d337 100644
--- a/examples/echo-threads.rs
+++ b/examples/echo-threads.rs
@@ -14,6 +14,7 @@
//! cargo run --example connect 127.0.0.1:8080
extern crate futures;
+extern crate futures_cpupool;
extern crate num_cpus;
extern crate tokio;
extern crate tokio_io;
@@ -23,8 +24,10 @@ use std::net::{self, SocketAddr};
use std::thread;
use futures::Future;
+use futures::future::Executor;
use futures::stream::Stream;
use futures::sync::mpsc;
+use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use tokio_io::io::copy;
use tokio::net::TcpStream;
@@ -69,6 +72,8 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
let mut core = Core::new().unwrap();
let handle = core.handle();
+ let pool = CpuPool::new(1);
+
let done = rx.for_each(move |socket| {
// First up when we receive a socket we associate it with our event loop
// using the `TcpStream::from_stream` API. After that the socket is not
@@ -92,7 +97,7 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
Ok(())
});
- handle.spawn(msg);
+ pool.execute(msg).unwrap();
Ok(())
});
diff --git a/examples/echo.rs b/examples/echo.rs
index 9abd9c38..fdf0e4cf 100644
--- a/examples/echo.rs
+++ b/examples/echo.rs
@@ -18,6 +18,7 @@
//! should be able to see them all make progress simultaneously.
extern crate futures;
+extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
@@ -25,7 +26,9 @@ use std::env;
use std::net::SocketAddr;
use futures::Future;
+use futures::future::Executor;
use futures::stream::Stream;
+use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use tokio_io::io::copy;
use tokio::net::TcpListener;
@@ -46,7 +49,7 @@ fn main() {
//
// After the event loop is created we acquire a handle to it through the
// `handle` method. With this handle we'll then later be able to create I/O
- // objects and spawn futures.
+ // objects.
let mut core = Core::new().unwrap();
let handle = core.handle();
@@ -58,6 +61,9 @@ fn main() {
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
+ // A CpuPool allows futures to be executed concurrently.
+ let pool = CpuPool::new(1);
+
// Here we convert the `TcpListener` to a stream of incoming connections
// with the `incoming` method. We then define how to process each element in
// the stream with the `for_each` method.
@@ -105,16 +111,16 @@ fn main() {
// And this is where much of the magic of this server happens. We
// crucially want all clients to make progress concurrently, rather than
// blocking one on completion of another. To achieve this we use the
- // `spawn` function on `Handle` to essentially execute some work in the
- // background.
+ // `execute` function on the `Executor` trait to essentially execute
+ // some work in the background.
//
// This function will transfer ownership of the future (`msg` in this
// case) to the event loop that `handle` points to. The event loop will
// then drive the future to completion.
//
- // Essentially here we're spawning a new task to run concurrently, which
- // will allow all of our clients to be processed concurrently.
- handle.spawn(msg);
+ // Essentially here we're executing a new task to run concurrently,
+ // which will allow all of our clients to be processed concurrently.
+ pool.execute(msg).unwrap();
Ok(())
});
diff --git a/examples/proxy.rs b/examples/proxy.rs
index 05e4c0c3..14a63a7f 100644
--- a/examples/proxy.rs
+++ b/examples/proxy.rs
@@ -17,6 +17,7 @@
//! the echo server, and you'll be able to see data flowing between them.
extern crate futures;
+extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
@@ -27,6 +28,8 @@ use std::io::{self, Read, Write};
use futures::stream::Stream;
use futures::{Future, Poll};
+use futures::future::Executor;
+use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream};
use tokio::reactor::Core;
use tokio_io::{AsyncRead, AsyncWrite};
@@ -43,6 +46,8 @@ fn main() {
let mut l = Core::new().unwrap();
let handle = l.handle();
+ let pool = CpuPool::new(1);
+
// Create a TCP listener which will listen for incoming connections.
let socket = TcpListener::bind(&listen_addr, &l.handle()).unwrap();
println!("Listening on: {}", listen_addr);
@@ -88,7 +93,7 @@ fn main() {
// Don't panic. Maybe the client just disconnected too soon.
println!("error: {}", e);
});
- handle.spawn(msg);
+ pool.execute(msg).unwrap();
Ok(())
});
diff --git a/examples/sink.rs b/examples/sink.rs
index 5f42443b..fd1cd82b 100644
--- a/examples/sink.rs
+++ b/examples/sink.rs
@@ -17,6 +17,7 @@
extern crate env_logger;
extern crate futures;
+extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
@@ -25,7 +26,9 @@ use std::iter;
use std::net::SocketAddr;
use futures::Future;
+use futures::future::Executor;
use futures::stream::{self, Stream};
+use futures_cpupool::CpuPool;
use tokio_io::IoFuture;
use tokio::net::{TcpListener, TcpStream};
use tokio::reactor::Core;
@@ -35,13 +38,15 @@ fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
+ let pool = CpuPool::new(1);
+
let mut core = Core::new().unwrap();
let handle = core.handle();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
let server = socket.incoming().for_each(|(socket, addr)| {
println!("got a socket: {}", addr);
- handle.spawn(write(socket).or_else(|_| Ok(())));
+ pool.execute(write(socket).or_else(|_| Ok(()))).unwrap();
Ok(())
});
core.run(server).unwrap();
diff --git a/examples/tinydb.rs b/examples/tinydb.rs
index 61d0fd62..bfb0d123 100644
--- a/examples/tinydb.rs
+++ b/examples/tinydb.rs
@@ -40,17 +40,19 @@
//! returning the previous value, if any.
extern crate futures;
+extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
-use std::cell::RefCell;
use std::collections::HashMap;
use std::io::BufReader;
-use std::rc::Rc;
use std::env;
use std::net::SocketAddr;
+use std::sync::{Arc, Mutex};
use futures::prelude::*;
+use futures::future::Executor;
+use futures_cpupool::CpuPool;
use tokio::net::TcpListener;
use tokio::reactor::Core;
use tokio_io::AsyncRead;
@@ -58,10 +60,10 @@ use tokio_io::io::{lines, write_all};
/// The in-memory database shared amongst all clients.
///
-/// This database will be shared via `Rc`, so to mutate the internal map we're
+/// This database will be shared via `Arc`, so to mutate the internal map we're
/// also going to use a `RefCell` for interior mutability.
struct Database {
- map: RefCell<HashMap<String, String>>,
+ map: Mutex<HashMap<String, String>>,
}
/// Possible requests our clients can send us
@@ -87,15 +89,18 @@ fn main() {
let listener = TcpListener::bind(&addr, &handle).expect("failed to bind");
println!("Listening on: {}", addr);
+ // Create a CpuPool to execute tasks
+ let pool = CpuPool::new(1);
+
// Create the shared state of this server that will be shared amongst all
// clients. We populate the initial database and then create the `Database`
- // structure. Note the usage of `Rc` here which will be used to ensure that
+ // structure. Note the usage of `Arc` here which will be used to ensure that
// each independently spawned client will have a reference to the in-memory
// database.
let mut initial_db = HashMap::new();
initial_db.insert("foo".to_string(), "bar".to_string());
- let db = Rc::new(Database {
- map: RefCell::new(initial_db),
+ let db = Arc::new(Database {
+ map: Mutex::new(initial_db),
});
let done = listener.incoming().for_each(move |(socket, _addr)| {
@@ -125,7 +130,7 @@ fn main() {
Err(e) => return Response::Error { msg: e },
};
- let mut db = db.map.borrow_mut();
+ let mut db = db.map.lock().unwrap();
match request {
Request::Get { key } => {
match db.get(&key) {
@@ -154,7 +159,7 @@ fn main() {
// runs concurrently with all other clients, for now ignoring any errors
// that we see.
let msg = writes.then(move |_| Ok(()));
- handle.spawn(msg);
+ pool.execute(msg).unwrap();
Ok(())
});
diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs
index 043e56d2..2dddaf08 100644
--- a/examples/tinyhttp.rs
+++ b/examples/tinyhttp.rs
@@ -13,6 +13,7 @@
extern crate bytes;
extern crate futures;
+extern crate futures_cpupool;
extern crate http;
extern crate httparse;
extern crate num_cpus;
@@ -31,8 +32,10 @@ use std::thread;
use bytes::BytesMut;
use futures::future;
+use futures::future::Executor;
use futures::sync::mpsc;
use futures::{Stream, Future, Sink};
+use futures_cpupool::CpuPool;
use http::{Request, Response, StatusCode};
use http::header::HeaderValue;
use tokio::net::TcpStream;
@@ -69,6 +72,8 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
let mut core = Core::new().unwrap();
let handle = core.handle();
+ let pool = CpuPool::new(1);
+
let done = rx.for_each(move |socket| {
// Associate each socket we get with our local event loop, and then use
// the codec support in the tokio-io crate to deal with discrete
@@ -80,10 +85,10 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
let (tx, rx) = socket.framed(Http).split();
tx.send_all(rx.and_then(respond))
});
- handle.spawn(req.then(move |result| {
+ pool.execute(req.then(move |result| {
drop(result);
Ok(())
- }));
+ })).unwrap();
Ok(())
});
core.run(done).unwrap();
@@ -95,7 +100,7 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
/// represents the various handling a server might do. Currently the contents
/// here are pretty uninteresting.
fn respond(req: Request<()>)
- -> Box<Future<Item = Response<String>, Error = io::Error>>
+ -> Box<Future<Item = Response<String>, Error = io::Error> + Send>
{
let mut ret = Response::builder();
let body = match req.uri().path() {
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
index 4066060e..65a522ca 100644
--- a/examples/udp-codec.rs
+++ b/examples/udp-codec.rs
@@ -9,11 +9,14 @@
extern crate tokio;
extern crate env_logger;
extern crate futures;
+extern crate futures_cpupool;
use std::io;
use std::net::SocketAddr;
use futures::{Future, Stream, Sink};
+use futures::future::Executor;
+use futures_cpupool::CpuPool;
use tokio::net::{UdpSocket, UdpCodec};
use tokio::reactor::Core;
@@ -39,6 +42,8 @@ fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
+ let pool = CpuPool::new(1);
+
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
// Bind both our sockets and then figure out what ports we got.
@@ -73,6 +78,6 @@ fn main() {
let b = b_sink.send_all(b_stream);
// Spawn the sender of pongs and then wait for our pinger to finish.
- handle.spawn(b.then(|_| Ok(())));
+ pool.execute(b.then(|_| Ok(()))).unwrap();
drop(core.run(a));
}