diff options
Diffstat (limited to 'examples/connect.rs')
-rw-r--r-- | examples/connect.rs | 64 |
1 files changed, 33 insertions, 31 deletions
diff --git a/examples/connect.rs b/examples/connect.rs index 26614b96..f3ea6970 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -14,10 +14,11 @@ //! this repository! Many of them recommend running this as a simple "hook up //! stdin/stdout to a server" to get up and running. -extern crate futures; -extern crate futures_cpupool; +#![deny(warnings)] + extern crate tokio; extern crate tokio_io; +extern crate futures; extern crate bytes; use std::env; @@ -25,9 +26,8 @@ use std::io::{self, Read, Write}; use std::net::SocketAddr; use std::thread; +use tokio::prelude::*; use futures::sync::mpsc; -use futures::{Future, Sink, Stream}; -use futures_cpupool::CpuPool; fn main() { // Determine if we're going to run in TCP or UDP mode @@ -46,8 +46,6 @@ fn main() { }); let addr = addr.parse::<SocketAddr>().unwrap(); - let pool = CpuPool::new(1); - // Right now Tokio doesn't support a handle to stdin running on the event // loop, so we farm out that work to a separate thread. This thread will // read data (with blocking I/O) from stdin and then send it to the event @@ -60,9 +58,9 @@ fn main() { // our UDP connection to get a stream of bytes we're going to emit to // stdout. let stdout = if tcp { - tcp::connect(&addr, &pool, Box::new(stdin_rx)) + tcp::connect(&addr, Box::new(stdin_rx)) } else { - udp::connect(&addr, &pool, Box::new(stdin_rx)) + udp::connect(&addr, Box::new(stdin_rx)) }; // And now with our stream of bytes to write to stdout, we execute that in @@ -71,15 +69,21 @@ fn main() { // loop. In this case, though, we know it's ok as the event loop isn't // otherwise running anything useful. let mut out = io::stdout(); - stdout.for_each(|chunk| { - out.write_all(&chunk) - }).wait().unwrap(); + + tokio::run({ + stdout + .for_each(move |chunk| { + out.write_all(&chunk) + }) + .map_err(|e| println!("error reading stdout; error = {:?}", e)) + }); } mod codec { use std::io; use bytes::{BufMut, BytesMut}; use tokio_io::codec::{Encoder, Decoder}; + /// A simple `Codec` implementation that just ships bytes around. /// /// This type is used for "framing" a TCP/UDP stream of bytes but it's really @@ -115,24 +119,21 @@ mod codec { } mod tcp { - use std::io; - use std::net::SocketAddr; + use tokio; + use tokio::net::TcpStream; + use tokio::prelude::*; use bytes::BytesMut; - use futures::{Future, Stream}; - use futures::future::Executor; - use futures_cpupool::CpuPool; - use tokio::net::TcpStream; - use tokio_io::AsyncRead; use codec::Bytes; + use std::io; + use std::net::SocketAddr; + pub fn connect(addr: &SocketAddr, - pool: &CpuPool, stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>) - -> Box<Stream<Item = BytesMut, Error = io::Error>> + -> Box<Stream<Item = BytesMut, Error = io::Error> + Send> { let tcp = TcpStream::connect(addr); - let pool = pool.clone(); // After the TCP connection has been established, we set up our client // to start forwarding data. @@ -151,12 +152,14 @@ mod tcp { // with us reading data from the stream. Box::new(tcp.map(move |stream| { let (sink, stream) = stream.framed(Bytes).split(); - pool.execute(stdin.forward(sink).then(|result| { + + tokio::spawn(stdin.forward(sink).then(|result| { if let Err(e) = result { panic!("failed to write to socket: {}", e) } Ok(()) - })).unwrap(); + })); + stream }).flatten_stream()) } @@ -166,17 +169,16 @@ mod udp { use std::io; use std::net::SocketAddr; - use bytes::BytesMut; - use futures::{Future, Stream}; - use futures::future::Executor; - use futures_cpupool::CpuPool; + use tokio; use tokio::net::{UdpSocket, UdpFramed}; + use tokio::prelude::*; + use bytes::BytesMut; + use codec::Bytes; pub fn connect(&addr: &SocketAddr, - pool: &CpuPool, stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>) - -> Box<Stream<Item = BytesMut, Error = io::Error>> + -> Box<Stream<Item = BytesMut, Error = io::Error> + Send> { // We'll bind our UDP socket to a local IP/port, but for now we // basically let the OS pick both of those. @@ -196,14 +198,14 @@ mod udp { // All bytes from `stdin` will go to the `addr` specified in our // argument list. Like with TCP this is spawned concurrently - pool.execute(stdin.map(move |chunk| { + tokio::spawn(stdin.map(move |chunk| { (chunk, addr) }).forward(sink).then(|result| { if let Err(e) = result { panic!("failed to write to socket: {}", e) } Ok(()) - })).unwrap(); + })); // With UDP we could receive data from any source, so filter out // anything coming from a different address |