summaryrefslogtreecommitdiffstats
path: root/examples/proxy.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2018-03-06 09:59:04 -0800
committerGitHub <noreply@github.com>2018-03-06 09:59:04 -0800
commitf1cb12e14fb047f3f86c852c253962c60ce471e8 (patch)
tree59aab45a28961b00f7c71c5eb083c6242b51ff01 /examples/proxy.rs
parent56c579787260abcb9786aa22cfca1ee4b7c3b5ba (diff)
Update examples to track latest Tokio changes (#180)
The examples included in the repository have lagged behind the changes made. Specifically, they do not use the new runtime construct. This patch updates examples to use the latest features of Tokio.
Diffstat (limited to 'examples/proxy.rs')
-rw-r--r--examples/proxy.rs110
1 files changed, 55 insertions, 55 deletions
diff --git a/examples/proxy.rs b/examples/proxy.rs
index 131fa41b..42b740df 100644
--- a/examples/proxy.rs
+++ b/examples/proxy.rs
@@ -1,6 +1,10 @@
//! A proxy that forwards data to another server and forwards that server's
//! responses back to clients.
//!
+//! Because the Tokio runtime uses a thread pool, each TCP connection is
+//! processed concurrently with all other TCP connections across multiple
+//! threads.
+//!
//! You can showcase this by running this in one terminal:
//!
//! cargo run --example proxy
@@ -16,23 +20,18 @@
//! This final terminal will connect to our proxy, which will in turn connect to
//! the echo server, and you'll be able to see data flowing between them.
-extern crate futures;
-extern crate futures_cpupool;
+#![deny(warnings)]
+
extern crate tokio;
-extern crate tokio_io;
use std::sync::{Arc, Mutex};
use std::env;
use std::net::{Shutdown, SocketAddr};
use std::io::{self, Read, Write};
-use futures::stream::Stream;
-use futures::{Future, Poll};
-use futures::future::{Executor};
-use futures_cpupool::CpuPool;
+use tokio::io::{copy, shutdown};
use tokio::net::{TcpListener, TcpStream};
-use tokio_io::{AsyncRead, AsyncWrite};
-use tokio_io::io::{copy, shutdown};
+use tokio::prelude::*;
fn main() {
let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string());
@@ -41,59 +40,60 @@ fn main() {
let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string());
let server_addr = server_addr.parse::<SocketAddr>().unwrap();
- let pool = CpuPool::new(1);
-
// Create a TCP listener which will listen for incoming connections.
let socket = TcpListener::bind(&listen_addr).unwrap();
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);
- let done = socket.incoming().for_each(move |client| {
- let server = TcpStream::connect(&server_addr);
- let amounts = server.and_then(move |server| {
- // Create separate read/write handles for the TCP clients that we're
- // proxying data between. Note that typically you'd use
- // `AsyncRead::split` for this operation, but we want our writer
- // handles to have a custom implementation of `shutdown` which
- // actually calls `TcpStream::shutdown` to ensure that EOF is
- // transmitted properly across the proxied connection.
- //
- // As a result, we wrap up our client/server manually in arcs and
- // use the impls below on our custom `MyTcpStream` type.
- let client_reader = MyTcpStream(Arc::new(Mutex::new(client)));
- let client_writer = client_reader.clone();
- let server_reader = MyTcpStream(Arc::new(Mutex::new(server)));
- let server_writer = server_reader.clone();
-
- // Copy the data (in parallel) between the client and the server.
- // After the copy is done we indicate to the remote side that we've
- // finished by shutting down the connection.
- let client_to_server = copy(client_reader, server_writer)
- .and_then(|(n, _, server_writer)| {
- shutdown(server_writer).map(move |_| n)
- });
-
- let server_to_client = copy(server_reader, client_writer)
- .and_then(|(n, _, client_writer)| {
- shutdown(client_writer).map(move |_| n)
- });
-
- client_to_server.join(server_to_client)
+ let done = socket.incoming()
+ .map_err(|e| println!("error accepting socket; error = {:?}", e))
+ .for_each(move |client| {
+ let server = TcpStream::connect(&server_addr);
+ let amounts = server.and_then(move |server| {
+ // Create separate read/write handles for the TCP clients that we're
+ // proxying data between. Note that typically you'd use
+ // `AsyncRead::split` for this operation, but we want our writer
+ // handles to have a custom implementation of `shutdown` which
+ // actually calls `TcpStream::shutdown` to ensure that EOF is
+ // transmitted properly across the proxied connection.
+ //
+ // As a result, we wrap up our client/server manually in arcs and
+ // use the impls below on our custom `MyTcpStream` type.
+ let client_reader = MyTcpStream(Arc::new(Mutex::new(client)));
+ let client_writer = client_reader.clone();
+ let server_reader = MyTcpStream(Arc::new(Mutex::new(server)));
+ let server_writer = server_reader.clone();
+
+ // Copy the data (in parallel) between the client and the server.
+ // After the copy is done we indicate to the remote side that we've
+ // finished by shutting down the connection.
+ let client_to_server = copy(client_reader, server_writer)
+ .and_then(|(n, _, server_writer)| {
+ shutdown(server_writer).map(move |_| n)
+ });
+
+ let server_to_client = copy(server_reader, client_writer)
+ .and_then(|(n, _, client_writer)| {
+ shutdown(client_writer).map(move |_| n)
+ });
+
+ client_to_server.join(server_to_client)
+ });
+
+ let msg = amounts.map(move |(from_client, from_server)| {
+ println!("client wrote {} bytes and received {} bytes",
+ from_client, from_server);
+ }).map_err(|e| {
+ // Don't panic. Maybe the client just disconnected too soon.
+ println!("error: {}", e);
+ });
+
+ tokio::spawn(msg);
+
+ Ok(())
});
- let msg = amounts.map(move |(from_client, from_server)| {
- println!("client wrote {} bytes and received {} bytes",
- from_client, from_server);
- }).map_err(|e| {
- // Don't panic. Maybe the client just disconnected too soon.
- println!("error: {}", e);
- });
- pool.execute(msg).unwrap();
-
- Ok(())
- });
-
- done.wait().unwrap();
+ tokio::run(done);
}
// This is a custom type used to have a custom implementation of the