summaryrefslogtreecommitdiffstats
path: root/examples/tinyhttp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'examples/tinyhttp.rs')
-rw-r--r--examples/tinyhttp.rs92
1 files changed, 38 insertions, 54 deletions
diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs
index 2f982484..eaa40e6f 100644
--- a/examples/tinyhttp.rs
+++ b/examples/tinyhttp.rs
@@ -11,12 +11,11 @@
//! respectively. By default this will run I/O on all the cores your system has
//! available, and it doesn't support HTTP request bodies.
+#![deny(warnings)]
+
extern crate bytes;
-extern crate futures;
-extern crate futures_cpupool;
extern crate http;
extern crate httparse;
-extern crate num_cpus;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
@@ -24,73 +23,58 @@ extern crate time;
extern crate tokio;
extern crate tokio_io;
-use std::env;
-use std::fmt;
-use std::io;
-use std::net::{self, SocketAddr};
-use std::thread;
+use std::{env, fmt, io};
+use std::net::SocketAddr;
+
+use tokio::net::{TcpStream, TcpListener};
+use tokio::prelude::*;
+
+use tokio_io::codec::{Encoder, Decoder};
use bytes::BytesMut;
-use futures::future::{self, Executor};
-use futures::sync::mpsc;
-use futures::{Stream, Future, Sink};
-use futures_cpupool::CpuPool;
use http::header::HeaderValue;
use http::{Request, Response, StatusCode};
-use tokio::net::TcpStream;
-use tokio::reactor::Handle;
-use tokio_io::codec::{Encoder, Decoder};
-use tokio_io::{AsyncRead};
fn main() {
// Parse the arguments, bind the TCP socket we'll be listening to, spin up
// our worker threads, and start shipping sockets to those worker threads.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
- let num_threads = env::args().nth(2).and_then(|s| s.parse().ok())
- .unwrap_or(num_cpus::get());
- let listener = net::TcpListener::bind(&addr).expect("failed to bind");
+ let listener = TcpListener::bind(&addr).expect("failed to bind");
println!("Listening on: {}", addr);
- let mut channels = Vec::new();
- for _ in 0..num_threads {
- let (tx, rx) = mpsc::unbounded();
- channels.push(tx);
- thread::spawn(|| worker(rx));
- }
- let mut next = 0;
- for socket in listener.incoming() {
- if let Ok(socket) = socket {
- channels[next].unbounded_send(socket).expect("worker thread died");
- next = (next + 1) % channels.len();
- }
- }
+ tokio::run({
+ listener.incoming()
+ .map_err(|e| println!("failed to accept socket; error = {:?}", e))
+ .for_each(|socket| {
+ process(socket);
+ Ok(())
+ })
+ });
}
-fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
- let handle = Handle::default();
-
- let pool = CpuPool::new(1);
-
- let done = rx.for_each(move |socket| {
- // Associate each socket we get with our local event loop, and then use
- // the codec support in the tokio-io crate to deal with discrete
- // request/response types instead of bytes. Here we'll just use our
- // framing defined below and then use the `send_all` helper to send the
- // responses back on the socket after we've processed them
- let socket = future::result(TcpStream::from_std(socket, &handle));
- let req = socket.and_then(|socket| {
- let (tx, rx) = socket.framed(Http).split();
- tx.send_all(rx.and_then(respond))
- });
- pool.execute(req.then(move |result| {
- drop(result);
+fn process(socket: TcpStream) {
+ let (tx, rx) = socket
+ // Frame the socket using the `Http` protocol. This maps the TCP socket
+ // to a Stream + Sink of HTTP frames.
+ .framed(Http)
+ // This splits a single `Stream + Sink` value into two separate handles
+ // that can be used independently (even on different tasks or threads).
+ .split();
+
+ // Map all requests into responses and send them back to the client.
+ let task = tx.send_all(rx.and_then(respond))
+ .then(|res| {
+ if let Err(e) = res {
+ println!("failed to process connection; error = {:?}", e);
+ }
+
Ok(())
- })).unwrap();
- Ok(())
- });
- done.wait().unwrap();
+ });
+
+ // Spawn the task that handles the connection.
+ tokio::spawn(task);
}
/// "Server logic" is implemented in this function.