From c6f1ff13d249a42a5d0ae716dffca6a22cd1d7ca Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 25 Oct 2017 10:54:54 -0700 Subject: Remove executor from reactor. In accordance with tokio-rs/tokio-rfcs#3, the executor functionality of Tokio is being removed and will be relocated into futures-rs as a "current thread" executor. This PR removes task execution from the code base. As a temporary mesure, all examples and tests are switched to using CpuPool. Depends on #19. --- examples/chat.rs | 25 +++++++++++++++---------- examples/compress.rs | 7 ++++--- examples/connect.rs | 28 +++++++++++++++++++--------- examples/echo-threads.rs | 7 ++++++- examples/echo.rs | 18 ++++++++++++------ examples/proxy.rs | 7 ++++++- examples/sink.rs | 7 ++++++- examples/tinydb.rs | 23 ++++++++++++++--------- examples/tinyhttp.rs | 11 ++++++++--- examples/udp-codec.rs | 7 ++++++- 10 files changed, 96 insertions(+), 44 deletions(-) (limited to 'examples') diff --git a/examples/chat.rs b/examples/chat.rs index ff364476..a0023730 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -18,18 +18,20 @@ //! messages. extern crate futures; +extern crate futures_cpupool; extern crate tokio; extern crate tokio_io; use std::collections::HashMap; -use std::rc::Rc; -use std::cell::RefCell; use std::iter; use std::env; use std::io::{Error, ErrorKind, BufReader}; +use std::sync::{Arc, Mutex}; use futures::Future; +use futures::future::Executor; use futures::stream::{self, Stream}; +use futures_cpupool::CpuPool; use tokio::net::TcpListener; use tokio::reactor::Core; use tokio_io::io; @@ -45,9 +47,10 @@ fn main() { let socket = TcpListener::bind(&addr, &handle).unwrap(); println!("Listening on: {}", addr); - // This is a single-threaded server, so we can just use Rc and RefCell to - // store the map of all connections we know about. - let connections = Rc::new(RefCell::new(HashMap::new())); + // This is currently a multi threaded server. + // + // Once the same thread executor lands, transition to single threaded. + let connections = Arc::new(Mutex::new(HashMap::new())); let srv = socket.incoming().for_each(move |(stream, addr)| { println!("New Connection: {}", addr); @@ -57,7 +60,7 @@ fn main() { // send us messages. Then register our address with the stream to send // data to us. let (tx, rx) = futures::sync::mpsc::unbounded(); - connections.borrow_mut().insert(addr, tx); + connections.lock().unwrap().insert(addr, tx); // Define here what we do for the actual I/O. That is, read a bunch of // lines from the socket and dispatch them while we also write any lines @@ -88,7 +91,7 @@ fn main() { let connections = connections_inner.clone(); line.map(move |(reader, message)| { println!("{}: {:?}", addr, message); - let mut conns = connections.borrow_mut(); + let mut conns = connections.lock().unwrap(); if let Ok(msg) = message { // For each open connection except the sender, send the // string via the channel. @@ -114,17 +117,19 @@ fn main() { amt.map_err(|_| ()) }); + let pool = CpuPool::new(1); + // Now that we've got futures representing each half of the socket, we // use the `select` combinator to wait for either half to be done to // tear down the other. Then we spawn off the result. let connections = connections.clone(); let socket_reader = socket_reader.map_err(|_| ()); let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ())); - handle.spawn(connection.then(move |_| { - connections.borrow_mut().remove(&addr); + pool.execute(connection.then(move |_| { + connections.lock().unwrap().remove(&addr); println!("Connection {} closed.", addr); Ok(()) - })); + })).unwrap(); Ok(()) }); diff --git a/examples/compress.rs b/examples/compress.rs index 6b0f1730..8fedf25e 100644 --- a/examples/compress.rs +++ b/examples/compress.rs @@ -29,6 +29,7 @@ use std::env; use std::net::SocketAddr; use futures::{Future, Stream, Poll}; +use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::{TcpListener, TcpStream}; use tokio::reactor::Core; @@ -53,13 +54,13 @@ fn main() { // The compress logic will happen in the function below, but everything's // still a future! Each client is spawned to concurrently get processed. let server = socket.incoming().for_each(move |(socket, addr)| { - handle.spawn(compress(socket, &pool).then(move |result| { + pool.execute(compress(socket, &pool).then(move |result| { match result { Ok((r, w)) => println!("{}: compressed {} bytes to {}", addr, r, w), Err(e) => println!("{}: failed when compressing: {}", addr, e), } Ok(()) - })); + })).unwrap(); Ok(()) }); @@ -70,7 +71,7 @@ fn main() { /// `socket` on the `pool` provided, writing it back out to `socket` as it's /// available. fn compress(socket: TcpStream, pool: &CpuPool) - -> Box> + -> Box + Send> { use tokio_io::io; diff --git a/examples/connect.rs b/examples/connect.rs index d5238a53..1c3fcb75 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -15,6 +15,7 @@ //! stdin/stdout to a server" to get up and running. extern crate futures; +extern crate futures_cpupool; extern crate tokio; extern crate tokio_io; extern crate bytes; @@ -26,6 +27,7 @@ use std::thread; use futures::sync::mpsc; use futures::{Sink, Future, Stream}; +use futures_cpupool::CpuPool; use tokio::reactor::Core; fn main() { @@ -49,6 +51,8 @@ fn main() { let mut core = Core::new().unwrap(); let handle = core.handle(); + let pool = CpuPool::new(1); + // Right now Tokio doesn't support a handle to stdin running on the event // loop, so we farm out that work to a separate thread. This thread will // read data (with blocking I/O) from stdin and then send it to the event @@ -61,9 +65,9 @@ fn main() { // our UDP connection to get a stream of bytes we're going to emit to // stdout. let stdout = if tcp { - tcp::connect(&addr, &handle, Box::new(stdin_rx)) + tcp::connect(&addr, &handle, &pool, Box::new(stdin_rx)) } else { - udp::connect(&addr, &handle, Box::new(stdin_rx)) + udp::connect(&addr, &handle, &pool, Box::new(stdin_rx)) }; // And now with our stream of bytes to write to stdout, we execute that in @@ -83,6 +87,8 @@ mod tcp { use bytes::{BufMut, BytesMut}; use futures::{Future, Stream}; + use futures::future::Executor; + use futures_cpupool::CpuPool; use tokio::net::TcpStream; use tokio::reactor::Handle; use tokio_io::AsyncRead; @@ -90,11 +96,12 @@ mod tcp { pub fn connect(addr: &SocketAddr, handle: &Handle, - stdin: Box, Error = io::Error>>) + pool: &CpuPool, + stdin: Box, Error = io::Error> + Send>) -> Box> { let tcp = TcpStream::connect(addr, handle); - let handle = handle.clone(); + let pool = pool.clone(); // After the TCP connection has been established, we set up our client // to start forwarding data. @@ -113,12 +120,12 @@ mod tcp { // with us reading data from the stream. Box::new(tcp.map(move |stream| { let (sink, stream) = stream.framed(Bytes).split(); - handle.spawn(stdin.forward(sink).then(|result| { + pool.execute(stdin.forward(sink).then(|result| { if let Err(e) = result { panic!("failed to write to socket: {}", e) } Ok(()) - })); + })).unwrap(); stream }).flatten_stream()) } @@ -167,12 +174,15 @@ mod udp { use bytes::BytesMut; use futures::{Future, Stream}; + use futures::future::Executor; + use futures_cpupool::CpuPool; use tokio::net::{UdpCodec, UdpSocket}; use tokio::reactor::Handle; pub fn connect(&addr: &SocketAddr, handle: &Handle, - stdin: Box, Error = io::Error>>) + pool: &CpuPool, + stdin: Box, Error = io::Error> + Send>) -> Box> { // We'll bind our UDP socket to a local IP/port, but for now we @@ -193,14 +203,14 @@ mod udp { // All bytes from `stdin` will go to the `addr` specified in our // argument list. Like with TCP this is spawned concurrently - handle.spawn(stdin.map(move |chunk| { + pool.execute(stdin.map(move |chunk| { (addr, chunk) }).forward(sink).then(|result| { if let Err(e) = result { panic!("failed to write to socket: {}", e) } Ok(()) - })); + })).unwrap(); // With UDP we could receive data from any source, so filter out // anything coming from a different address diff --git a/examples/echo-threads.rs b/examples/echo-threads.rs index 63ed47e4..4869d337 100644 --- a/examples/echo-threads.rs +++ b/examples/echo-threads.rs @@ -14,6 +14,7 @@ //! cargo run --example connect 127.0.0.1:8080 extern crate futures; +extern crate futures_cpupool; extern crate num_cpus; extern crate tokio; extern crate tokio_io; @@ -23,8 +24,10 @@ use std::net::{self, SocketAddr}; use std::thread; use futures::Future; +use futures::future::Executor; use futures::stream::Stream; use futures::sync::mpsc; +use futures_cpupool::CpuPool; use tokio_io::AsyncRead; use tokio_io::io::copy; use tokio::net::TcpStream; @@ -69,6 +72,8 @@ fn worker(rx: mpsc::UnboundedReceiver) { let mut core = Core::new().unwrap(); let handle = core.handle(); + let pool = CpuPool::new(1); + let done = rx.for_each(move |socket| { // First up when we receive a socket we associate it with our event loop // using the `TcpStream::from_stream` API. After that the socket is not @@ -92,7 +97,7 @@ fn worker(rx: mpsc::UnboundedReceiver) { Ok(()) }); - handle.spawn(msg); + pool.execute(msg).unwrap(); Ok(()) }); diff --git a/examples/echo.rs b/examples/echo.rs index 9abd9c38..fdf0e4cf 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -18,6 +18,7 @@ //! should be able to see them all make progress simultaneously. extern crate futures; +extern crate futures_cpupool; extern crate tokio; extern crate tokio_io; @@ -25,7 +26,9 @@ use std::env; use std::net::SocketAddr; use futures::Future; +use futures::future::Executor; use futures::stream::Stream; +use futures_cpupool::CpuPool; use tokio_io::AsyncRead; use tokio_io::io::copy; use tokio::net::TcpListener; @@ -46,7 +49,7 @@ fn main() { // // After the event loop is created we acquire a handle to it through the // `handle` method. With this handle we'll then later be able to create I/O - // objects and spawn futures. + // objects. let mut core = Core::new().unwrap(); let handle = core.handle(); @@ -58,6 +61,9 @@ fn main() { let socket = TcpListener::bind(&addr, &handle).unwrap(); println!("Listening on: {}", addr); + // A CpuPool allows futures to be executed concurrently. + let pool = CpuPool::new(1); + // Here we convert the `TcpListener` to a stream of incoming connections // with the `incoming` method. We then define how to process each element in // the stream with the `for_each` method. @@ -105,16 +111,16 @@ fn main() { // And this is where much of the magic of this server happens. We // crucially want all clients to make progress concurrently, rather than // blocking one on completion of another. To achieve this we use the - // `spawn` function on `Handle` to essentially execute some work in the - // background. + // `execute` function on the `Executor` trait to essentially execute + // some work in the background. // // This function will transfer ownership of the future (`msg` in this // case) to the event loop that `handle` points to. The event loop will // then drive the future to completion. // - // Essentially here we're spawning a new task to run concurrently, which - // will allow all of our clients to be processed concurrently. - handle.spawn(msg); + // Essentially here we're executing a new task to run concurrently, + // which will allow all of our clients to be processed concurrently. + pool.execute(msg).unwrap(); Ok(()) }); diff --git a/examples/proxy.rs b/examples/proxy.rs index 05e4c0c3..14a63a7f 100644 --- a/examples/proxy.rs +++ b/examples/proxy.rs @@ -17,6 +17,7 @@ //! the echo server, and you'll be able to see data flowing between them. extern crate futures; +extern crate futures_cpupool; extern crate tokio; extern crate tokio_io; @@ -27,6 +28,8 @@ use std::io::{self, Read, Write}; use futures::stream::Stream; use futures::{Future, Poll}; +use futures::future::Executor; +use futures_cpupool::CpuPool; use tokio::net::{TcpListener, TcpStream}; use tokio::reactor::Core; use tokio_io::{AsyncRead, AsyncWrite}; @@ -43,6 +46,8 @@ fn main() { let mut l = Core::new().unwrap(); let handle = l.handle(); + let pool = CpuPool::new(1); + // Create a TCP listener which will listen for incoming connections. let socket = TcpListener::bind(&listen_addr, &l.handle()).unwrap(); println!("Listening on: {}", listen_addr); @@ -88,7 +93,7 @@ fn main() { // Don't panic. Maybe the client just disconnected too soon. println!("error: {}", e); }); - handle.spawn(msg); + pool.execute(msg).unwrap(); Ok(()) }); diff --git a/examples/sink.rs b/examples/sink.rs index 5f42443b..fd1cd82b 100644 --- a/examples/sink.rs +++ b/examples/sink.rs @@ -17,6 +17,7 @@ extern crate env_logger; extern crate futures; +extern crate futures_cpupool; extern crate tokio; extern crate tokio_io; @@ -25,7 +26,9 @@ use std::iter; use std::net::SocketAddr; use futures::Future; +use futures::future::Executor; use futures::stream::{self, Stream}; +use futures_cpupool::CpuPool; use tokio_io::IoFuture; use tokio::net::{TcpListener, TcpStream}; use tokio::reactor::Core; @@ -35,13 +38,15 @@ fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::().unwrap(); + let pool = CpuPool::new(1); + let mut core = Core::new().unwrap(); let handle = core.handle(); let socket = TcpListener::bind(&addr, &handle).unwrap(); println!("Listening on: {}", addr); let server = socket.incoming().for_each(|(socket, addr)| { println!("got a socket: {}", addr); - handle.spawn(write(socket).or_else(|_| Ok(()))); + pool.execute(write(socket).or_else(|_| Ok(()))).unwrap(); Ok(()) }); core.run(server).unwrap(); diff --git a/examples/tinydb.rs b/examples/tinydb.rs index 61d0fd62..bfb0d123 100644 --- a/examples/tinydb.rs +++ b/examples/tinydb.rs @@ -40,17 +40,19 @@ //! returning the previous value, if any. extern crate futures; +extern crate futures_cpupool; extern crate tokio; extern crate tokio_io; -use std::cell::RefCell; use std::collections::HashMap; use std::io::BufReader; -use std::rc::Rc; use std::env; use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; use futures::prelude::*; +use futures::future::Executor; +use futures_cpupool::CpuPool; use tokio::net::TcpListener; use tokio::reactor::Core; use tokio_io::AsyncRead; @@ -58,10 +60,10 @@ use tokio_io::io::{lines, write_all}; /// The in-memory database shared amongst all clients. /// -/// This database will be shared via `Rc`, so to mutate the internal map we're +/// This database will be shared via `Arc`, so to mutate the internal map we're /// also going to use a `RefCell` for interior mutability. struct Database { - map: RefCell>, + map: Mutex>, } /// Possible requests our clients can send us @@ -87,15 +89,18 @@ fn main() { let listener = TcpListener::bind(&addr, &handle).expect("failed to bind"); println!("Listening on: {}", addr); + // Create a CpuPool to execute tasks + let pool = CpuPool::new(1); + // Create the shared state of this server that will be shared amongst all // clients. We populate the initial database and then create the `Database` - // structure. Note the usage of `Rc` here which will be used to ensure that + // structure. Note the usage of `Arc` here which will be used to ensure that // each independently spawned client will have a reference to the in-memory // database. let mut initial_db = HashMap::new(); initial_db.insert("foo".to_string(), "bar".to_string()); - let db = Rc::new(Database { - map: RefCell::new(initial_db), + let db = Arc::new(Database { + map: Mutex::new(initial_db), }); let done = listener.incoming().for_each(move |(socket, _addr)| { @@ -125,7 +130,7 @@ fn main() { Err(e) => return Response::Error { msg: e }, }; - let mut db = db.map.borrow_mut(); + let mut db = db.map.lock().unwrap(); match request { Request::Get { key } => { match db.get(&key) { @@ -154,7 +159,7 @@ fn main() { // runs concurrently with all other clients, for now ignoring any errors // that we see. let msg = writes.then(move |_| Ok(())); - handle.spawn(msg); + pool.execute(msg).unwrap(); Ok(()) }); diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs index 043e56d2..2dddaf08 100644 --- a/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs @@ -13,6 +13,7 @@ extern crate bytes; extern crate futures; +extern crate futures_cpupool; extern crate http; extern crate httparse; extern crate num_cpus; @@ -31,8 +32,10 @@ use std::thread; use bytes::BytesMut; use futures::future; +use futures::future::Executor; use futures::sync::mpsc; use futures::{Stream, Future, Sink}; +use futures_cpupool::CpuPool; use http::{Request, Response, StatusCode}; use http::header::HeaderValue; use tokio::net::TcpStream; @@ -69,6 +72,8 @@ fn worker(rx: mpsc::UnboundedReceiver) { let mut core = Core::new().unwrap(); let handle = core.handle(); + let pool = CpuPool::new(1); + let done = rx.for_each(move |socket| { // Associate each socket we get with our local event loop, and then use // the codec support in the tokio-io crate to deal with discrete @@ -80,10 +85,10 @@ fn worker(rx: mpsc::UnboundedReceiver) { let (tx, rx) = socket.framed(Http).split(); tx.send_all(rx.and_then(respond)) }); - handle.spawn(req.then(move |result| { + pool.execute(req.then(move |result| { drop(result); Ok(()) - })); + })).unwrap(); Ok(()) }); core.run(done).unwrap(); @@ -95,7 +100,7 @@ fn worker(rx: mpsc::UnboundedReceiver) { /// represents the various handling a server might do. Currently the contents /// here are pretty uninteresting. fn respond(req: Request<()>) - -> Box, Error = io::Error>> + -> Box, Error = io::Error> + Send> { let mut ret = Response::builder(); let body = match req.uri().path() { diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index 4066060e..65a522ca 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -9,11 +9,14 @@ extern crate tokio; extern crate env_logger; extern crate futures; +extern crate futures_cpupool; use std::io; use std::net::SocketAddr; use futures::{Future, Stream, Sink}; +use futures::future::Executor; +use futures_cpupool::CpuPool; use tokio::net::{UdpSocket, UdpCodec}; use tokio::reactor::Core; @@ -39,6 +42,8 @@ fn main() { let mut core = Core::new().unwrap(); let handle = core.handle(); + let pool = CpuPool::new(1); + let addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); // Bind both our sockets and then figure out what ports we got. @@ -73,6 +78,6 @@ fn main() { let b = b_sink.send_all(b_stream); // Spawn the sender of pongs and then wait for our pinger to finish. - handle.spawn(b.then(|_| Ok(()))); + pool.execute(b.then(|_| Ok(()))).unwrap(); drop(core.run(a)); } -- cgit v1.2.3