summaryrefslogtreecommitdiffstats
path: root/examples/connect.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2017-10-25 10:54:54 -0700
committerCarl Lerche <me@carllerche.com>2017-11-01 07:28:49 -0700
commitc6f1ff13d249a42a5d0ae716dffca6a22cd1d7ca (patch)
tree7d5845668553eea08013cb75fdfbc4cb4f629255 /examples/connect.rs
parent697851210c13e3df637a93af526cf6e41a217cfd (diff)
Remove executor from reactor.
In accordance with tokio-rs/tokio-rfcs#3, the executor functionality of Tokio is being removed and will be relocated into futures-rs as a "current thread" executor. This PR removes task execution from the code base. As a temporary mesure, all examples and tests are switched to using CpuPool. Depends on #19.
Diffstat (limited to 'examples/connect.rs')
-rw-r--r--examples/connect.rs28
1 files changed, 19 insertions, 9 deletions
diff --git a/examples/connect.rs b/examples/connect.rs
index d5238a53..1c3fcb75 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -15,6 +15,7 @@
//! stdin/stdout to a server" to get up and running.
extern crate futures;
+extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
extern crate bytes;
@@ -26,6 +27,7 @@ use std::thread;
use futures::sync::mpsc;
use futures::{Sink, Future, Stream};
+use futures_cpupool::CpuPool;
use tokio::reactor::Core;
fn main() {
@@ -49,6 +51,8 @@ fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
+ let pool = CpuPool::new(1);
+
// Right now Tokio doesn't support a handle to stdin running on the event
// loop, so we farm out that work to a separate thread. This thread will
// read data (with blocking I/O) from stdin and then send it to the event
@@ -61,9 +65,9 @@ fn main() {
// our UDP connection to get a stream of bytes we're going to emit to
// stdout.
let stdout = if tcp {
- tcp::connect(&addr, &handle, Box::new(stdin_rx))
+ tcp::connect(&addr, &handle, &pool, Box::new(stdin_rx))
} else {
- udp::connect(&addr, &handle, Box::new(stdin_rx))
+ udp::connect(&addr, &handle, &pool, Box::new(stdin_rx))
};
// And now with our stream of bytes to write to stdout, we execute that in
@@ -83,6 +87,8 @@ mod tcp {
use bytes::{BufMut, BytesMut};
use futures::{Future, Stream};
+ use futures::future::Executor;
+ use futures_cpupool::CpuPool;
use tokio::net::TcpStream;
use tokio::reactor::Handle;
use tokio_io::AsyncRead;
@@ -90,11 +96,12 @@ mod tcp {
pub fn connect(addr: &SocketAddr,
handle: &Handle,
- stdin: Box<Stream<Item = Vec<u8>, Error = io::Error>>)
+ pool: &CpuPool,
+ stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
-> Box<Stream<Item = BytesMut, Error = io::Error>>
{
let tcp = TcpStream::connect(addr, handle);
- let handle = handle.clone();
+ let pool = pool.clone();
// After the TCP connection has been established, we set up our client
// to start forwarding data.
@@ -113,12 +120,12 @@ mod tcp {
// with us reading data from the stream.
Box::new(tcp.map(move |stream| {
let (sink, stream) = stream.framed(Bytes).split();
- handle.spawn(stdin.forward(sink).then(|result| {
+ pool.execute(stdin.forward(sink).then(|result| {
if let Err(e) = result {
panic!("failed to write to socket: {}", e)
}
Ok(())
- }));
+ })).unwrap();
stream
}).flatten_stream())
}
@@ -167,12 +174,15 @@ mod udp {
use bytes::BytesMut;
use futures::{Future, Stream};
+ use futures::future::Executor;
+ use futures_cpupool::CpuPool;
use tokio::net::{UdpCodec, UdpSocket};
use tokio::reactor::Handle;
pub fn connect(&addr: &SocketAddr,
handle: &Handle,
- stdin: Box<Stream<Item = Vec<u8>, Error = io::Error>>)
+ pool: &CpuPool,
+ stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
-> Box<Stream<Item = BytesMut, Error = io::Error>>
{
// We'll bind our UDP socket to a local IP/port, but for now we
@@ -193,14 +203,14 @@ mod udp {
// All bytes from `stdin` will go to the `addr` specified in our
// argument list. Like with TCP this is spawned concurrently
- handle.spawn(stdin.map(move |chunk| {
+ pool.execute(stdin.map(move |chunk| {
(addr, chunk)
}).forward(sink).then(|result| {
if let Err(e) = result {
panic!("failed to write to socket: {}", e)
}
Ok(())
- }));
+ })).unwrap();
// With UDP we could receive data from any source, so filter out
// anything coming from a different address