diff options
Diffstat (limited to 'examples/tinyhttp.rs')
-rw-r--r-- | examples/tinyhttp.rs | 92 |
1 files changed, 38 insertions, 54 deletions
diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs index 2f982484..eaa40e6f 100644 --- a/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs @@ -11,12 +11,11 @@ //! respectively. By default this will run I/O on all the cores your system has //! available, and it doesn't support HTTP request bodies. +#![deny(warnings)] + extern crate bytes; -extern crate futures; -extern crate futures_cpupool; extern crate http; extern crate httparse; -extern crate num_cpus; #[macro_use] extern crate serde_derive; extern crate serde_json; @@ -24,73 +23,58 @@ extern crate time; extern crate tokio; extern crate tokio_io; -use std::env; -use std::fmt; -use std::io; -use std::net::{self, SocketAddr}; -use std::thread; +use std::{env, fmt, io}; +use std::net::SocketAddr; + +use tokio::net::{TcpStream, TcpListener}; +use tokio::prelude::*; + +use tokio_io::codec::{Encoder, Decoder}; use bytes::BytesMut; -use futures::future::{self, Executor}; -use futures::sync::mpsc; -use futures::{Stream, Future, Sink}; -use futures_cpupool::CpuPool; use http::header::HeaderValue; use http::{Request, Response, StatusCode}; -use tokio::net::TcpStream; -use tokio::reactor::Handle; -use tokio_io::codec::{Encoder, Decoder}; -use tokio_io::{AsyncRead}; fn main() { // Parse the arguments, bind the TCP socket we'll be listening to, spin up // our worker threads, and start shipping sockets to those worker threads. let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::<SocketAddr>().unwrap(); - let num_threads = env::args().nth(2).and_then(|s| s.parse().ok()) - .unwrap_or(num_cpus::get()); - let listener = net::TcpListener::bind(&addr).expect("failed to bind"); + let listener = TcpListener::bind(&addr).expect("failed to bind"); println!("Listening on: {}", addr); - let mut channels = Vec::new(); - for _ in 0..num_threads { - let (tx, rx) = mpsc::unbounded(); - channels.push(tx); - thread::spawn(|| worker(rx)); - } - let mut next = 0; - for socket in listener.incoming() { - if let Ok(socket) = socket { - channels[next].unbounded_send(socket).expect("worker thread died"); - next = (next + 1) % channels.len(); - } - } + tokio::run({ + listener.incoming() + .map_err(|e| println!("failed to accept socket; error = {:?}", e)) + .for_each(|socket| { + process(socket); + Ok(()) + }) + }); } -fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) { - let handle = Handle::default(); - - let pool = CpuPool::new(1); - - let done = rx.for_each(move |socket| { - // Associate each socket we get with our local event loop, and then use - // the codec support in the tokio-io crate to deal with discrete - // request/response types instead of bytes. Here we'll just use our - // framing defined below and then use the `send_all` helper to send the - // responses back on the socket after we've processed them - let socket = future::result(TcpStream::from_std(socket, &handle)); - let req = socket.and_then(|socket| { - let (tx, rx) = socket.framed(Http).split(); - tx.send_all(rx.and_then(respond)) - }); - pool.execute(req.then(move |result| { - drop(result); +fn process(socket: TcpStream) { + let (tx, rx) = socket + // Frame the socket using the `Http` protocol. This maps the TCP socket + // to a Stream + Sink of HTTP frames. + .framed(Http) + // This splits a single `Stream + Sink` value into two separate handles + // that can be used independently (even on different tasks or threads). + .split(); + + // Map all requests into responses and send them back to the client. + let task = tx.send_all(rx.and_then(respond)) + .then(|res| { + if let Err(e) = res { + println!("failed to process connection; error = {:?}", e); + } + Ok(()) - })).unwrap(); - Ok(()) - }); - done.wait().unwrap(); + }); + + // Spawn the task that handles the connection. + tokio::spawn(task); } /// "Server logic" is implemented in this function. |