diff options
author | Carl Lerche <me@carllerche.com> | 2017-10-25 10:54:54 -0700 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2017-11-01 07:28:49 -0700 |
commit | c6f1ff13d249a42a5d0ae716dffca6a22cd1d7ca (patch) | |
tree | 7d5845668553eea08013cb75fdfbc4cb4f629255 /examples/connect.rs | |
parent | 697851210c13e3df637a93af526cf6e41a217cfd (diff) |
Remove executor from reactor.
In accordance with tokio-rs/tokio-rfcs#3, the executor functionality of
Tokio is being removed and will be relocated into futures-rs as a
"current thread" executor.
This PR removes task execution from the code base. As a temporary
mesure, all examples and tests are switched to using CpuPool.
Depends on #19.
Diffstat (limited to 'examples/connect.rs')
-rw-r--r-- | examples/connect.rs | 28 |
1 files changed, 19 insertions, 9 deletions
diff --git a/examples/connect.rs b/examples/connect.rs index d5238a53..1c3fcb75 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -15,6 +15,7 @@ //! stdin/stdout to a server" to get up and running. extern crate futures; +extern crate futures_cpupool; extern crate tokio; extern crate tokio_io; extern crate bytes; @@ -26,6 +27,7 @@ use std::thread; use futures::sync::mpsc; use futures::{Sink, Future, Stream}; +use futures_cpupool::CpuPool; use tokio::reactor::Core; fn main() { @@ -49,6 +51,8 @@ fn main() { let mut core = Core::new().unwrap(); let handle = core.handle(); + let pool = CpuPool::new(1); + // Right now Tokio doesn't support a handle to stdin running on the event // loop, so we farm out that work to a separate thread. This thread will // read data (with blocking I/O) from stdin and then send it to the event @@ -61,9 +65,9 @@ fn main() { // our UDP connection to get a stream of bytes we're going to emit to // stdout. let stdout = if tcp { - tcp::connect(&addr, &handle, Box::new(stdin_rx)) + tcp::connect(&addr, &handle, &pool, Box::new(stdin_rx)) } else { - udp::connect(&addr, &handle, Box::new(stdin_rx)) + udp::connect(&addr, &handle, &pool, Box::new(stdin_rx)) }; // And now with our stream of bytes to write to stdout, we execute that in @@ -83,6 +87,8 @@ mod tcp { use bytes::{BufMut, BytesMut}; use futures::{Future, Stream}; + use futures::future::Executor; + use futures_cpupool::CpuPool; use tokio::net::TcpStream; use tokio::reactor::Handle; use tokio_io::AsyncRead; @@ -90,11 +96,12 @@ mod tcp { pub fn connect(addr: &SocketAddr, handle: &Handle, - stdin: Box<Stream<Item = Vec<u8>, Error = io::Error>>) + pool: &CpuPool, + stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>) -> Box<Stream<Item = BytesMut, Error = io::Error>> { let tcp = TcpStream::connect(addr, handle); - let handle = handle.clone(); + let pool = pool.clone(); // After the TCP connection has been established, we set up our client // to start forwarding data. @@ -113,12 +120,12 @@ mod tcp { // with us reading data from the stream. Box::new(tcp.map(move |stream| { let (sink, stream) = stream.framed(Bytes).split(); - handle.spawn(stdin.forward(sink).then(|result| { + pool.execute(stdin.forward(sink).then(|result| { if let Err(e) = result { panic!("failed to write to socket: {}", e) } Ok(()) - })); + })).unwrap(); stream }).flatten_stream()) } @@ -167,12 +174,15 @@ mod udp { use bytes::BytesMut; use futures::{Future, Stream}; + use futures::future::Executor; + use futures_cpupool::CpuPool; use tokio::net::{UdpCodec, UdpSocket}; use tokio::reactor::Handle; pub fn connect(&addr: &SocketAddr, handle: &Handle, - stdin: Box<Stream<Item = Vec<u8>, Error = io::Error>>) + pool: &CpuPool, + stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>) -> Box<Stream<Item = BytesMut, Error = io::Error>> { // We'll bind our UDP socket to a local IP/port, but for now we @@ -193,14 +203,14 @@ mod udp { // All bytes from `stdin` will go to the `addr` specified in our // argument list. Like with TCP this is spawned concurrently - handle.spawn(stdin.map(move |chunk| { + pool.execute(stdin.map(move |chunk| { (addr, chunk) }).forward(sink).then(|result| { if let Err(e) = result { panic!("failed to write to socket: {}", e) } Ok(()) - })); + })).unwrap(); // With UDP we could receive data from any source, so filter out // anything coming from a different address |