summaryrefslogtreecommitdiffstats
path: root/examples/connect.rs
diff options
context:
space:
mode:
Diffstat (limited to 'examples/connect.rs')
-rw-r--r--examples/connect.rs64
1 files changed, 33 insertions, 31 deletions
diff --git a/examples/connect.rs b/examples/connect.rs
index 26614b96..f3ea6970 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -14,10 +14,11 @@
//! this repository! Many of them recommend running this as a simple "hook up
//! stdin/stdout to a server" to get up and running.
-extern crate futures;
-extern crate futures_cpupool;
+#![deny(warnings)]
+
extern crate tokio;
extern crate tokio_io;
+extern crate futures;
extern crate bytes;
use std::env;
@@ -25,9 +26,8 @@ use std::io::{self, Read, Write};
use std::net::SocketAddr;
use std::thread;
+use tokio::prelude::*;
use futures::sync::mpsc;
-use futures::{Future, Sink, Stream};
-use futures_cpupool::CpuPool;
fn main() {
// Determine if we're going to run in TCP or UDP mode
@@ -46,8 +46,6 @@ fn main() {
});
let addr = addr.parse::<SocketAddr>().unwrap();
- let pool = CpuPool::new(1);
-
// Right now Tokio doesn't support a handle to stdin running on the event
// loop, so we farm out that work to a separate thread. This thread will
// read data (with blocking I/O) from stdin and then send it to the event
@@ -60,9 +58,9 @@ fn main() {
// our UDP connection to get a stream of bytes we're going to emit to
// stdout.
let stdout = if tcp {
- tcp::connect(&addr, &pool, Box::new(stdin_rx))
+ tcp::connect(&addr, Box::new(stdin_rx))
} else {
- udp::connect(&addr, &pool, Box::new(stdin_rx))
+ udp::connect(&addr, Box::new(stdin_rx))
};
// And now with our stream of bytes to write to stdout, we execute that in
@@ -71,15 +69,21 @@ fn main() {
// loop. In this case, though, we know it's ok as the event loop isn't
// otherwise running anything useful.
let mut out = io::stdout();
- stdout.for_each(|chunk| {
- out.write_all(&chunk)
- }).wait().unwrap();
+
+ tokio::run({
+ stdout
+ .for_each(move |chunk| {
+ out.write_all(&chunk)
+ })
+ .map_err(|e| println!("error reading stdout; error = {:?}", e))
+ });
}
mod codec {
use std::io;
use bytes::{BufMut, BytesMut};
use tokio_io::codec::{Encoder, Decoder};
+
/// A simple `Codec` implementation that just ships bytes around.
///
/// This type is used for "framing" a TCP/UDP stream of bytes but it's really
@@ -115,24 +119,21 @@ mod codec {
}
mod tcp {
- use std::io;
- use std::net::SocketAddr;
+ use tokio;
+ use tokio::net::TcpStream;
+ use tokio::prelude::*;
use bytes::BytesMut;
- use futures::{Future, Stream};
- use futures::future::Executor;
- use futures_cpupool::CpuPool;
- use tokio::net::TcpStream;
- use tokio_io::AsyncRead;
use codec::Bytes;
+ use std::io;
+ use std::net::SocketAddr;
+
pub fn connect(addr: &SocketAddr,
- pool: &CpuPool,
stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
- -> Box<Stream<Item = BytesMut, Error = io::Error>>
+ -> Box<Stream<Item = BytesMut, Error = io::Error> + Send>
{
let tcp = TcpStream::connect(addr);
- let pool = pool.clone();
// After the TCP connection has been established, we set up our client
// to start forwarding data.
@@ -151,12 +152,14 @@ mod tcp {
// with us reading data from the stream.
Box::new(tcp.map(move |stream| {
let (sink, stream) = stream.framed(Bytes).split();
- pool.execute(stdin.forward(sink).then(|result| {
+
+ tokio::spawn(stdin.forward(sink).then(|result| {
if let Err(e) = result {
panic!("failed to write to socket: {}", e)
}
Ok(())
- })).unwrap();
+ }));
+
stream
}).flatten_stream())
}
@@ -166,17 +169,16 @@ mod udp {
use std::io;
use std::net::SocketAddr;
- use bytes::BytesMut;
- use futures::{Future, Stream};
- use futures::future::Executor;
- use futures_cpupool::CpuPool;
+ use tokio;
use tokio::net::{UdpSocket, UdpFramed};
+ use tokio::prelude::*;
+ use bytes::BytesMut;
+
use codec::Bytes;
pub fn connect(&addr: &SocketAddr,
- pool: &CpuPool,
stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
- -> Box<Stream<Item = BytesMut, Error = io::Error>>
+ -> Box<Stream<Item = BytesMut, Error = io::Error> + Send>
{
// We'll bind our UDP socket to a local IP/port, but for now we
// basically let the OS pick both of those.
@@ -196,14 +198,14 @@ mod udp {
// All bytes from `stdin` will go to the `addr` specified in our
// argument list. Like with TCP this is spawned concurrently
- pool.execute(stdin.map(move |chunk| {
+ tokio::spawn(stdin.map(move |chunk| {
(chunk, addr)
}).forward(sink).then(|result| {
if let Err(e) = result {
panic!("failed to write to socket: {}", e)
}
Ok(())
- })).unwrap();
+ }));
// With UDP we could receive data from any source, so filter out
// anything coming from a different address