summaryrefslogtreecommitdiffstats
path: root/examples/tinyhttp.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2018-03-06 09:59:04 -0800
committerGitHub <noreply@github.com>2018-03-06 09:59:04 -0800
commitf1cb12e14fb047f3f86c852c253962c60ce471e8 (patch)
tree59aab45a28961b00f7c71c5eb083c6242b51ff01 /examples/tinyhttp.rs
parent56c579787260abcb9786aa22cfca1ee4b7c3b5ba (diff)
Update examples to track latest Tokio changes (#180)
The exampes included in the repository have lagged behind the changes made. Specifically, they do not use the new runtime construct. This patch updates examples to use the latest features of Tokio.
Diffstat (limited to 'examples/tinyhttp.rs')
-rw-r--r--examples/tinyhttp.rs92
1 files changed, 38 insertions, 54 deletions
diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs
index 2f982484..eaa40e6f 100644
--- a/examples/tinyhttp.rs
+++ b/examples/tinyhttp.rs
@@ -11,12 +11,11 @@
//! respectively. By default this will run I/O on all the cores your system has
//! available, and it doesn't support HTTP request bodies.
+#![deny(warnings)]
+
extern crate bytes;
-extern crate futures;
-extern crate futures_cpupool;
extern crate http;
extern crate httparse;
-extern crate num_cpus;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
@@ -24,73 +23,58 @@ extern crate time;
extern crate tokio;
extern crate tokio_io;
-use std::env;
-use std::fmt;
-use std::io;
-use std::net::{self, SocketAddr};
-use std::thread;
+use std::{env, fmt, io};
+use std::net::SocketAddr;
+
+use tokio::net::{TcpStream, TcpListener};
+use tokio::prelude::*;
+
+use tokio_io::codec::{Encoder, Decoder};
use bytes::BytesMut;
-use futures::future::{self, Executor};
-use futures::sync::mpsc;
-use futures::{Stream, Future, Sink};
-use futures_cpupool::CpuPool;
use http::header::HeaderValue;
use http::{Request, Response, StatusCode};
-use tokio::net::TcpStream;
-use tokio::reactor::Handle;
-use tokio_io::codec::{Encoder, Decoder};
-use tokio_io::{AsyncRead};
fn main() {
// Parse the arguments, bind the TCP socket we'll be listening to, spin up
// our worker threads, and start shipping sockets to those worker threads.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
- let num_threads = env::args().nth(2).and_then(|s| s.parse().ok())
- .unwrap_or(num_cpus::get());
- let listener = net::TcpListener::bind(&addr).expect("failed to bind");
+ let listener = TcpListener::bind(&addr).expect("failed to bind");
println!("Listening on: {}", addr);
- let mut channels = Vec::new();
- for _ in 0..num_threads {
- let (tx, rx) = mpsc::unbounded();
- channels.push(tx);
- thread::spawn(|| worker(rx));
- }
- let mut next = 0;
- for socket in listener.incoming() {
- if let Ok(socket) = socket {
- channels[next].unbounded_send(socket).expect("worker thread died");
- next = (next + 1) % channels.len();
- }
- }
+ tokio::run({
+ listener.incoming()
+ .map_err(|e| println!("failed to accept socket; error = {:?}", e))
+ .for_each(|socket| {
+ process(socket);
+ Ok(())
+ })
+ });
}
-fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
- let handle = Handle::default();
-
- let pool = CpuPool::new(1);
-
- let done = rx.for_each(move |socket| {
- // Associate each socket we get with our local event loop, and then use
- // the codec support in the tokio-io crate to deal with discrete
- // request/response types instead of bytes. Here we'll just use our
- // framing defined below and then use the `send_all` helper to send the
- // responses back on the socket after we've processed them
- let socket = future::result(TcpStream::from_std(socket, &handle));
- let req = socket.and_then(|socket| {
- let (tx, rx) = socket.framed(Http).split();
- tx.send_all(rx.and_then(respond))
- });
- pool.execute(req.then(move |result| {
- drop(result);
+fn process(socket: TcpStream) {
+ let (tx, rx) = socket
+ // Frame the socket using the `Http` protocol. This maps the TCP socket
+ // to a Stream + Sink of HTTP frames.
+ .framed(Http)
+ // This splits a single `Stream + Sink` value into two separate handles
+ // that can be used independently (even on different tasks or threads).
+ .split();
+
+ // Map all requests into responses and send them back to the client.
+ let task = tx.send_all(rx.and_then(respond))
+ .then(|res| {
+ if let Err(e) = res {
+ println!("failed to process connection; error = {:?}", e);
+ }
+
Ok(())
- })).unwrap();
- Ok(())
- });
- done.wait().unwrap();
+ });
+
+ // Spawn the task that handles the connection.
+ tokio::spawn(task);
}
/// "Server logic" is implemented in this function.