summaryrefslogtreecommitdiffstats
path: root/examples/connect.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2018-03-06 09:59:04 -0800
committerGitHub <noreply@github.com>2018-03-06 09:59:04 -0800
commitf1cb12e14fb047f3f86c852c253962c60ce471e8 (patch)
tree59aab45a28961b00f7c71c5eb083c6242b51ff01 /examples/connect.rs
parent56c579787260abcb9786aa22cfca1ee4b7c3b5ba (diff)
Update examples to track latest Tokio changes (#180)
The exampes included in the repository have lagged behind the changes made. Specifically, they do not use the new runtime construct. This patch updates examples to use the latest features of Tokio.
Diffstat (limited to 'examples/connect.rs')
-rw-r--r--examples/connect.rs64
1 files changed, 33 insertions, 31 deletions
diff --git a/examples/connect.rs b/examples/connect.rs
index 26614b96..f3ea6970 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -14,10 +14,11 @@
//! this repository! Many of them recommend running this as a simple "hook up
//! stdin/stdout to a server" to get up and running.
-extern crate futures;
-extern crate futures_cpupool;
+#![deny(warnings)]
+
extern crate tokio;
extern crate tokio_io;
+extern crate futures;
extern crate bytes;
use std::env;
@@ -25,9 +26,8 @@ use std::io::{self, Read, Write};
use std::net::SocketAddr;
use std::thread;
+use tokio::prelude::*;
use futures::sync::mpsc;
-use futures::{Future, Sink, Stream};
-use futures_cpupool::CpuPool;
fn main() {
// Determine if we're going to run in TCP or UDP mode
@@ -46,8 +46,6 @@ fn main() {
});
let addr = addr.parse::<SocketAddr>().unwrap();
- let pool = CpuPool::new(1);
-
// Right now Tokio doesn't support a handle to stdin running on the event
// loop, so we farm out that work to a separate thread. This thread will
// read data (with blocking I/O) from stdin and then send it to the event
@@ -60,9 +58,9 @@ fn main() {
// our UDP connection to get a stream of bytes we're going to emit to
// stdout.
let stdout = if tcp {
- tcp::connect(&addr, &pool, Box::new(stdin_rx))
+ tcp::connect(&addr, Box::new(stdin_rx))
} else {
- udp::connect(&addr, &pool, Box::new(stdin_rx))
+ udp::connect(&addr, Box::new(stdin_rx))
};
// And now with our stream of bytes to write to stdout, we execute that in
@@ -71,15 +69,21 @@ fn main() {
// loop. In this case, though, we know it's ok as the event loop isn't
// otherwise running anything useful.
let mut out = io::stdout();
- stdout.for_each(|chunk| {
- out.write_all(&chunk)
- }).wait().unwrap();
+
+ tokio::run({
+ stdout
+ .for_each(move |chunk| {
+ out.write_all(&chunk)
+ })
+ .map_err(|e| println!("error reading stdout; error = {:?}", e))
+ });
}
mod codec {
use std::io;
use bytes::{BufMut, BytesMut};
use tokio_io::codec::{Encoder, Decoder};
+
/// A simple `Codec` implementation that just ships bytes around.
///
/// This type is used for "framing" a TCP/UDP stream of bytes but it's really
@@ -115,24 +119,21 @@ mod codec {
}
mod tcp {
- use std::io;
- use std::net::SocketAddr;
+ use tokio;
+ use tokio::net::TcpStream;
+ use tokio::prelude::*;
use bytes::BytesMut;
- use futures::{Future, Stream};
- use futures::future::Executor;
- use futures_cpupool::CpuPool;
- use tokio::net::TcpStream;
- use tokio_io::AsyncRead;
use codec::Bytes;
+ use std::io;
+ use std::net::SocketAddr;
+
pub fn connect(addr: &SocketAddr,
- pool: &CpuPool,
stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
- -> Box<Stream<Item = BytesMut, Error = io::Error>>
+ -> Box<Stream<Item = BytesMut, Error = io::Error> + Send>
{
let tcp = TcpStream::connect(addr);
- let pool = pool.clone();
// After the TCP connection has been established, we set up our client
// to start forwarding data.
@@ -151,12 +152,14 @@ mod tcp {
// with us reading data from the stream.
Box::new(tcp.map(move |stream| {
let (sink, stream) = stream.framed(Bytes).split();
- pool.execute(stdin.forward(sink).then(|result| {
+
+ tokio::spawn(stdin.forward(sink).then(|result| {
if let Err(e) = result {
panic!("failed to write to socket: {}", e)
}
Ok(())
- })).unwrap();
+ }));
+
stream
}).flatten_stream())
}
@@ -166,17 +169,16 @@ mod udp {
use std::io;
use std::net::SocketAddr;
- use bytes::BytesMut;
- use futures::{Future, Stream};
- use futures::future::Executor;
- use futures_cpupool::CpuPool;
+ use tokio;
use tokio::net::{UdpSocket, UdpFramed};
+ use tokio::prelude::*;
+ use bytes::BytesMut;
+
use codec::Bytes;
pub fn connect(&addr: &SocketAddr,
- pool: &CpuPool,
stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
- -> Box<Stream<Item = BytesMut, Error = io::Error>>
+ -> Box<Stream<Item = BytesMut, Error = io::Error> + Send>
{
// We'll bind our UDP socket to a local IP/port, but for now we
// basically let the OS pick both of those.
@@ -196,14 +198,14 @@ mod udp {
// All bytes from `stdin` will go to the `addr` specified in our
// argument list. Like with TCP this is spawned concurrently
- pool.execute(stdin.map(move |chunk| {
+ tokio::spawn(stdin.map(move |chunk| {
(chunk, addr)
}).forward(sink).then(|result| {
if let Err(e) = result {
panic!("failed to write to socket: {}", e)
}
Ok(())
- })).unwrap();
+ }));
// With UDP we could receive data from any source, so filter out
// anything coming from a different address