summaryrefslogtreecommitdiffstats
path: root/examples/proxy.rs
diff options
context:
space:
mode:
Diffstat (limited to 'examples/proxy.rs')
-rw-r--r--examples/proxy.rs110
1 files changed, 55 insertions, 55 deletions
diff --git a/examples/proxy.rs b/examples/proxy.rs
index 131fa41b..42b740df 100644
--- a/examples/proxy.rs
+++ b/examples/proxy.rs
@@ -1,6 +1,10 @@
//! A proxy that forwards data to another server and forwards that server's
//! responses back to clients.
//!
+//! Because the Tokio runtime uses a thread poool, each TCP connection is
+//! processed concurrently with all other TCP connections across multiple
+//! threads.
+//!
//! You can showcase this by running this in one terminal:
//!
//! cargo run --example proxy
@@ -16,23 +20,18 @@
//! This final terminal will connect to our proxy, which will in turn connect to
//! the echo server, and you'll be able to see data flowing between them.
-extern crate futures;
-extern crate futures_cpupool;
+#![deny(warnings)]
+
extern crate tokio;
-extern crate tokio_io;
use std::sync::{Arc, Mutex};
use std::env;
use std::net::{Shutdown, SocketAddr};
use std::io::{self, Read, Write};
-use futures::stream::Stream;
-use futures::{Future, Poll};
-use futures::future::{Executor};
-use futures_cpupool::CpuPool;
+use tokio::io::{copy, shutdown};
use tokio::net::{TcpListener, TcpStream};
-use tokio_io::{AsyncRead, AsyncWrite};
-use tokio_io::io::{copy, shutdown};
+use tokio::prelude::*;
fn main() {
let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string());
@@ -41,59 +40,60 @@ fn main() {
let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string());
let server_addr = server_addr.parse::<SocketAddr>().unwrap();
- let pool = CpuPool::new(1);
-
// Create a TCP listener which will listen for incoming connections.
let socket = TcpListener::bind(&listen_addr).unwrap();
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);
- let done = socket.incoming().for_each(move |client| {
- let server = TcpStream::connect(&server_addr);
- let amounts = server.and_then(move |server| {
- // Create separate read/write handles for the TCP clients that we're
- // proxying data between. Note that typically you'd use
- // `AsyncRead::split` for this operation, but we want our writer
- // handles to have a custom implementation of `shutdown` which
- // actually calls `TcpStream::shutdown` to ensure that EOF is
- // transmitted properly across the proxied connection.
- //
- // As a result, we wrap up our client/server manually in arcs and
- // use the impls below on our custom `MyTcpStream` type.
- let client_reader = MyTcpStream(Arc::new(Mutex::new(client)));
- let client_writer = client_reader.clone();
- let server_reader = MyTcpStream(Arc::new(Mutex::new(server)));
- let server_writer = server_reader.clone();
-
- // Copy the data (in parallel) between the client and the server.
- // After the copy is done we indicate to the remote side that we've
- // finished by shutting down the connection.
- let client_to_server = copy(client_reader, server_writer)
- .and_then(|(n, _, server_writer)| {
- shutdown(server_writer).map(move |_| n)
- });
-
- let server_to_client = copy(server_reader, client_writer)
- .and_then(|(n, _, client_writer)| {
- shutdown(client_writer).map(move |_| n)
- });
-
- client_to_server.join(server_to_client)
+ let done = socket.incoming()
+ .map_err(|e| println!("error accepting socket; error = {:?}", e))
+ .for_each(move |client| {
+ let server = TcpStream::connect(&server_addr);
+ let amounts = server.and_then(move |server| {
+ // Create separate read/write handles for the TCP clients that we're
+ // proxying data between. Note that typically you'd use
+ // `AsyncRead::split` for this operation, but we want our writer
+ // handles to have a custom implementation of `shutdown` which
+ // actually calls `TcpStream::shutdown` to ensure that EOF is
+ // transmitted properly across the proxied connection.
+ //
+ // As a result, we wrap up our client/server manually in arcs and
+ // use the impls below on our custom `MyTcpStream` type.
+ let client_reader = MyTcpStream(Arc::new(Mutex::new(client)));
+ let client_writer = client_reader.clone();
+ let server_reader = MyTcpStream(Arc::new(Mutex::new(server)));
+ let server_writer = server_reader.clone();
+
+ // Copy the data (in parallel) between the client and the server.
+ // After the copy is done we indicate to the remote side that we've
+ // finished by shutting down the connection.
+ let client_to_server = copy(client_reader, server_writer)
+ .and_then(|(n, _, server_writer)| {
+ shutdown(server_writer).map(move |_| n)
+ });
+
+ let server_to_client = copy(server_reader, client_writer)
+ .and_then(|(n, _, client_writer)| {
+ shutdown(client_writer).map(move |_| n)
+ });
+
+ client_to_server.join(server_to_client)
+ });
+
+ let msg = amounts.map(move |(from_client, from_server)| {
+ println!("client wrote {} bytes and received {} bytes",
+ from_client, from_server);
+ }).map_err(|e| {
+ // Don't panic. Maybe the client just disconnected too soon.
+ println!("error: {}", e);
+ });
+
+ tokio::spawn(msg);
+
+ Ok(())
});
- let msg = amounts.map(move |(from_client, from_server)| {
- println!("client wrote {} bytes and received {} bytes",
- from_client, from_server);
- }).map_err(|e| {
- // Don't panic. Maybe the client just disconnected too soon.
- println!("error: {}", e);
- });
- pool.execute(msg).unwrap();
-
- Ok(())
- });
-
- done.wait().unwrap();
+ tokio::run(done);
}
// This is a custom type used to have a custom implementation of the