summaryrefslogtreecommitdiffstats
path: root/examples/echo-threads.rs
blob: 6ce8b1563a99fc6bcbacbd33a85752e333544eeb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
//! A multithreaded version of an echo server
//!
//! This server implements the same functionality as the `echo` example, except
//! that this example will use all cores of the machine to do I/O instead of
//! just one. This example works by having the main thread use blocking I/O
//! and shipping accepted sockets to worker threads in a round-robin fashion.
//!
//! To see this server in action, you can run this in one terminal:
//!
//!     cargo run --example echo-threads
//!
//! and in another terminal you can run:
//!
//!     cargo run --example connect 127.0.0.1:8080

extern crate futures;
extern crate futures_cpupool;
extern crate num_cpus;
extern crate tokio;
extern crate tokio_io;

use std::env;
use std::net::SocketAddr;
use std::thread;

use futures::prelude::*;
use futures::future::Executor;
use futures::sync::mpsc;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use tokio_io::io::copy;
use tokio::net::{TcpStream, TcpListener};

/// Entry point: binds the listener, starts the worker threads, and hands each
/// accepted connection to a worker in round-robin order.
///
/// Command-line arguments (both optional):
///
/// 1. the address to bind, defaulting to `127.0.0.1:8080`
/// 2. the number of worker threads, defaulting to `num_cpus::get()`; a value
///    of zero is treated as one
///
/// # Panics
///
/// Panics if the address cannot be parsed or bound, if sending a socket to a
/// worker fails, or if accepting connections yields an error.
fn main() {
    // First argument, the address to bind. The default string is only
    // allocated when no argument was supplied.
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:8080".to_string());
    let addr = addr
        .parse::<SocketAddr>()
        .expect("failed to parse the address to bind");

    // Second argument, the number of threads we'll be using. At least one
    // worker is required: with zero workers there would be no channel to
    // index below, and the first accepted connection would panic.
    let num_threads = env::args()
        .nth(2)
        .and_then(|s| s.parse::<usize>().ok())
        .unwrap_or_else(num_cpus::get)
        .max(1);

    let listener = TcpListener::bind(&addr).expect("failed to bind");
    println!("Listening on: {}", addr);

    // Spin up our worker threads, creating a channel routing to each worker
    // thread that we'll use below.
    let mut channels = Vec::with_capacity(num_threads);
    for _ in 0..num_threads {
        let (tx, rx) = mpsc::unbounded();
        channels.push(tx);
        thread::spawn(move || worker(rx));
    }

    // Infinitely accept sockets from our `TcpListener`. Each socket is then
    // shipped round-robin to a particular worker thread, which processes the
    // connection.
    let mut next = 0;
    let srv = listener.incoming().for_each(|socket| {
        channels[next].unbounded_send(socket).expect("worker thread died");
        // `channels` is never empty here because `num_threads >= 1`.
        next = (next + 1) % channels.len();
        Ok(())
    });
    srv.wait().unwrap();
}

/// Runs one worker: receives accepted sockets from `rx` and echoes each
/// connection's bytes back to its peer.
///
/// Blocks the calling thread until the stream of sockets from `rx` ends.
/// A socket whose peer address cannot be determined is reported and skipped
/// rather than taking the whole worker down.
///
/// # Panics
///
/// Panics if an echo task cannot be handed to the pool.
fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) {
    // Each worker drives its connections on a pool of size one.
    let pool = CpuPool::new(1);

    let done = rx.for_each(move |socket| {
        // The lookup can fail, e.g. when the peer is already gone. Panicking
        // here would kill this worker, and the accepting thread would then
        // panic on its next send to it, so skip the connection instead.
        let addr = match socket.peer_addr() {
            Ok(addr) => addr,
            Err(e) => {
                println!("failed to get remote address: {}", e);
                return Ok(());
            }
        };

        // Like the single-threaded `echo` example we split the socket halves
        // and use the `copy` helper to ship bytes back and forth. The
        // resulting future reports the outcome once the copy finishes, and is
        // handed to the pool so it runs concurrently with other connections.
        let (reader, writer) = socket.split();
        let amt = copy(reader, writer);
        let msg = amt.then(move |result| {
            match result {
                Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr),
                Err(e) => println!("error on {}: {}", addr, e),
            }

            Ok(())
        });
        pool.execute(msg).unwrap();

        Ok(())
    });
    done.wait().unwrap();
}