summaryrefslogtreecommitdiffstats
path: root/examples
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2017-09-10 08:54:35 -0700
committerAlex Crichton <alex@alexcrichton.com>2017-09-10 08:54:35 -0700
commit7b94cf307d6562a8f31c5d764122e39cda080842 (patch)
treed9070a82f3d25b6a7a402fcdaf09083ba1922d6d /examples
parent645ae7051d08221cbd44ad1211eac8af2f0b6c67 (diff)
Add a multithreaded echo server example
Diffstat (limited to 'examples')
-rw-r--r--examples/echo-threads.rs100
-rw-r--r--examples/echo.rs2
2 files changed, 101 insertions, 1 deletions
diff --git a/examples/echo-threads.rs b/examples/echo-threads.rs
new file mode 100644
index 00000000..1cb94e87
--- /dev/null
+++ b/examples/echo-threads.rs
@@ -0,0 +1,100 @@
+//! A multithreaded version of an echo server
+//!
+//! This server implements the same functionality as the `echo` example, except
+//! that this example will use all cores of the machine to do I/O instead of
+//! just one. This examples works by having the main thread using blocking I/O
+//! and shipping accepted sockets to worker threads in a round-robin fashion.
+//!
+//! To see this server in action, you can run this in one terminal:
+//!
+//! cargo run --example echoe-threads
+//!
+//! and in another terminal you can run:
+//!
+//! cargo run --example connect 127.0.0.1:8080
+
+extern crate futures;
+extern crate num_cpus;
+extern crate tokio_core;
+extern crate tokio_io;
+
+use std::env;
+use std::net::{self, SocketAddr};
+use std::thread;
+
+use futures::Future;
+use futures::stream::Stream;
+use futures::sync::mpsc;
+use tokio_io::AsyncRead;
+use tokio_io::io::copy;
+use tokio_core::net::TcpStream;
+use tokio_core::reactor::Core;
+
+fn main() {
+ // First argument, the address to bind
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+ let addr = addr.parse::<SocketAddr>().unwrap();
+
+ // Second argument, the number of threads we'll be using
+ let num_threads = env::args().nth(2).and_then(|s| s.parse().ok())
+ .unwrap_or(num_cpus::get());
+
+ // Use `std::net` to bind the requested port, we'll use this on the main
+ // thread below
+ let listener = net::TcpListener::bind(&addr).expect("failed to bind");
+ println!("Listening on: {}", addr);
+
+ // Spin up our worker threads, creating a channel routing to each worker
+ // thread that we'll use below.
+ let mut channels = Vec::new();
+ for _ in 0..num_threads {
+ let (tx, rx) = mpsc::unbounded();
+ channels.push(tx);
+ thread::spawn(|| worker(rx));
+ }
+
+ // Infinitely accept sockets from our `std::net::TcpListener`, as this'll do
+ // blocking I/O. Each socket is then shipped round-robin to a particular
+ // thread which will associate the socket with the corresponding event loop
+ // and process the connection.
+ let mut next = 0;
+ for socket in listener.incoming() {
+ let socket = socket.expect("failed to accept");
+ channels[next].unbounded_send(socket).expect("worker thread died");
+ next = (next + 1) % channels.len();
+ }
+}
+
+fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
+ let mut core = Core::new().unwrap();
+ let handle = core.handle();
+
+ let done = rx.for_each(move |socket| {
+ // First up when we receive a socket we associate it with our event loop
+ // using the `TcpStream::from_stream` API. After that the socket is not
+ // a `tokio_core::net::TcpStream` meaning it's in nonblocking mode and
+ // ready to be used with Tokio
+ let socket = TcpStream::from_stream(socket, &handle)
+ .expect("failed to associate TCP stream");
+ let addr = socket.peer_addr().expect("failed to get remote address");
+
+ // Like the single-threaded `echo` example we split the socket halves
+ // and use the `copy` helper to ship bytes back and forth. Afterwards we
+ // spawn the task to run concurrently on this thread, and then print out
+ // what happened afterwards
+ let (reader, writer) = socket.split();
+ let amt = copy(reader, writer);
+ let msg = amt.then(move |result| {
+ match result {
+ Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr),
+ Err(e) => println!("error on {}: {}", addr, e),
+ }
+
+ Ok(())
+ });
+ handle.spawn(msg);
+
+ Ok(())
+ });
+ core.run(done).unwrap();
+}
diff --git a/examples/echo.rs b/examples/echo.rs
index 80e73ea7..2bf8f391 100644
--- a/examples/echo.rs
+++ b/examples/echo.rs
@@ -1,4 +1,4 @@
-//! An "hello world" echo server with tokio-core
+//! A "hello world" echo server with tokio-core
//!
//! This server will create a TCP listener, accept connections in a loop, and
//! simply write back everything that's read off of each TCP connection. Each