diff options
author | Carl Lerche <me@carllerche.com> | 2019-02-21 11:56:15 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-02-21 11:56:15 -0800 |
commit | 80162306e71c8561873a9c9496d65f2c1387d119 (patch) | |
tree | 83327ca8d9d1326d54e3c679e1fb4eb16775d4be /examples | |
parent | ab595d08253dd7ee0422144f8dafffa382700976 (diff) |
chore: apply rustfmt to all crates (#917)
Diffstat (limited to 'examples')
-rw-r--r-- | examples/chat-combinator-current-thread.rs | 45 | ||||
-rw-r--r-- | examples/chat-combinator.rs | 28 | ||||
-rw-r--r-- | examples/chat.rs | 58 | ||||
-rw-r--r-- | examples/connect.rs | 94 | ||||
-rw-r--r-- | examples/echo-udp.rs | 4 | ||||
-rw-r--r-- | examples/echo.rs | 4 | ||||
-rw-r--r-- | examples/hello_world.rs | 27 | ||||
-rw-r--r-- | examples/print_each_packet.rs | 4 | ||||
-rw-r--r-- | examples/proxy.rs | 33 | ||||
-rw-r--r-- | examples/tinydb.rs | 68 | ||||
-rw-r--r-- | examples/tinyhttp.rs | 78 | ||||
-rw-r--r-- | examples/udp-client.rs | 3 | ||||
-rw-r--r-- | examples/udp-codec.rs | 4 |
13 files changed, 248 insertions, 202 deletions
diff --git a/examples/chat-combinator-current-thread.rs b/examples/chat-combinator-current-thread.rs index c528eeec..ee147025 100644 --- a/examples/chat-combinator-current-thread.rs +++ b/examples/chat-combinator-current-thread.rs @@ -26,21 +26,20 @@ #![deny(warnings)] -extern crate tokio; extern crate futures; +extern crate tokio; use tokio::io; use tokio::net::TcpListener; use tokio::prelude::*; use tokio::runtime::current_thread::{Runtime, TaskExecutor}; +use std::cell::RefCell; use std::collections::HashMap; -use std::iter; use std::env; -use std::io::{BufReader}; +use std::io::BufReader; +use std::iter; use std::rc::Rc; -use std::cell::RefCell; - fn main() -> Result<(), Box<std::error::Error>> { let mut runtime = Runtime::new().unwrap(); @@ -58,8 +57,12 @@ fn main() -> Result<(), Box<std::error::Error>> { // The server task asynchronously iterates over and processes each incoming // connection. - let srv = socket.incoming() - .map_err(|e| {println!("failed to accept socket; error = {:?}", e); e}) + let srv = socket + .incoming() + .map_err(|e| { + println!("failed to accept socket; error = {:?}", e); + e + }) .for_each(move |stream| { // The client's socket address let addr = stream.peer_addr()?; @@ -102,9 +105,7 @@ fn main() -> Result<(), Box<std::error::Error>> { // Convert the bytes we read into a string, and then send that // string to all other connected clients. - let line = line.map(|(reader, vec)| { - (reader, String::from_utf8(vec)) - }); + let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec))); // Move the connection state into the closure below. let connections = connections_inner.clone(); @@ -116,15 +117,17 @@ fn main() -> Result<(), Box<std::error::Error>> { if let Ok(msg) = message { // For each open connection except the sender, send the // string via the channel. - let iter = conns.iter_mut() - .filter(|&(&k, _)| k != addr) - .map(|(_, v)| v); + let iter = conns + .iter_mut() + .filter(|&(&k, _)| k != addr) + .map(|(_, v)| v); for tx in iter { tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap(); } } else { let tx = conns.get_mut(&addr).unwrap(); - tx.unbounded_send("You didn't send valid UTF-8.".to_string()).unwrap(); + tx.unbounded_send("You didn't send valid UTF-8.".to_string()) + .unwrap(); } reader @@ -147,12 +150,14 @@ fn main() -> Result<(), Box<std::error::Error>> { let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ())); // Spawn locally a task to process the connection - TaskExecutor::current().spawn_local(Box::new(connection.then(move |_| { - let mut conns = connections.borrow_mut(); - conns.remove(&addr); - println!("Connection {} closed.", addr); - Ok(()) - }))).unwrap(); + TaskExecutor::current() + .spawn_local(Box::new(connection.then(move |_| { + let mut conns = connections.borrow_mut(); + conns.remove(&addr); + println!("Connection {} closed.", addr); + Ok(()) + }))) + .unwrap(); Ok(()) }) diff --git a/examples/chat-combinator.rs b/examples/chat-combinator.rs index 0572afbd..b81e8f7c 100644 --- a/examples/chat-combinator.rs +++ b/examples/chat-combinator.rs @@ -21,17 +21,17 @@ #![deny(warnings)] -extern crate tokio; extern crate futures; +extern crate tokio; use tokio::io; use tokio::net::TcpListener; use tokio::prelude::*; use std::collections::HashMap; -use std::iter; use std::env; -use std::io::{BufReader}; +use std::io::BufReader; +use std::iter; use std::sync::{Arc, Mutex}; fn main() -> Result<(), Box<std::error::Error>> { @@ -48,8 +48,12 @@ fn main() -> Result<(), Box<std::error::Error>> { // The server task asynchronously iterates over and processes each incoming // connection. - let srv = socket.incoming() - .map_err(|e| {println!("failed to accept socket; error = {:?}", e); e}) + let srv = socket + .incoming() + .map_err(|e| { + println!("failed to accept socket; error = {:?}", e); + e + }) .for_each(move |stream| { // The client's socket address let addr = stream.peer_addr()?; @@ -91,9 +95,7 @@ fn main() -> Result<(), Box<std::error::Error>> { // Convert the bytes we read into a string, and then send that // string to all other connected clients. - let line = line.map(|(reader, vec)| { - (reader, String::from_utf8(vec)) - }); + let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec))); // Move the connection state into the closure below. let connections = connections_inner.clone(); @@ -105,15 +107,17 @@ fn main() -> Result<(), Box<std::error::Error>> { if let Ok(msg) = message { // For each open connection except the sender, send the // string via the channel. - let iter = conns.iter_mut() - .filter(|&(&k, _)| k != addr) - .map(|(_, v)| v); + let iter = conns + .iter_mut() + .filter(|&(&k, _)| k != addr) + .map(|(_, v)| v); for tx in iter { tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap(); } } else { let tx = conns.get_mut(&addr).unwrap(); - tx.unbounded_send("You didn't send valid UTF-8.".to_string()).unwrap(); + tx.unbounded_send("You didn't send valid UTF-8.".to_string()) + .unwrap(); } reader diff --git a/examples/chat.rs b/examples/chat.rs index 182af7c8..b21432af 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -31,12 +31,12 @@ extern crate tokio; extern crate futures; extern crate bytes; +use bytes::{BufMut, Bytes, BytesMut}; +use futures::future::{self, Either}; +use futures::sync::mpsc; use tokio::io; use tokio::net::{TcpListener, TcpStream}; use tokio::prelude::*; -use futures::sync::mpsc; -use futures::future::{self, Either}; -use bytes::{BytesMut, Bytes, BufMut}; use std::collections::HashMap; use std::net::SocketAddr; @@ -131,10 +131,7 @@ impl Shared { impl Peer { /// Create a new instance of `Peer`. - fn new(name: BytesMut, - state: Arc<Mutex<Shared>>, - lines: Lines) -> Peer - { + fn new(name: BytesMut, state: Arc<Mutex<Shared>>, lines: Lines) -> Peer { // Get the client socket address let addr = lines.socket.peer_addr().unwrap(); @@ -142,8 +139,7 @@ impl Peer { let (tx, rx) = mpsc::unbounded(); // Add an entry for this `Peer` in the shared state map. - state.lock().unwrap() - .peers.insert(addr, tx); + state.lock().unwrap().peers.insert(addr, tx); Peer { name, @@ -198,7 +194,7 @@ impl Future for Peer { // though there could still be lines to read. Because we did // not reach `Async::NotReady`, we have to notify ourselves // in order to tell the executor to schedule the task again. - if i+1 == LINES_PER_TICK { + if i + 1 == LINES_PER_TICK { task::current().notify(); } } @@ -256,8 +252,7 @@ impl Future for Peer { impl Drop for Peer { fn drop(&mut self) { - self.state.lock().unwrap().peers - .remove(&self.addr); + self.state.lock().unwrap().peers.remove(&self.addr); } } @@ -333,7 +328,10 @@ impl Stream for Lines { let sock_closed = self.fill_read_buf()?.is_ready(); // Now, try finding lines - let pos = self.rd.windows(2).enumerate() + let pos = self + .rd + .windows(2) + .enumerate() .find(|&(_, bytes)| bytes == b"\r\n") .map(|(i, _)| i); @@ -373,7 +371,8 @@ fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) { // We use the `into_future` combinator to extract the first item from the // lines stream. `into_future` takes a `Stream` and converts it to a future // of `(first, rest)` where `rest` is the original stream instance. - let connection = lines.into_future() + let connection = lines + .into_future() // `into_future` doesn't have the right error type, so map the error to // make it work. .map_err(|(e, _)| e) @@ -408,10 +407,7 @@ fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) { // // This is also a future that processes the connection, only // completing when the socket closes. - let peer = Peer::new( - name, - state, - lines); + let peer = Peer::new(name, state, lines); // Wrap `peer` with `Either::B` to make the return type fit. Either::B(peer) @@ -443,18 +439,20 @@ pub fn main() -> Result<(), Box<std::error::Error>> { // The server task asynchronously iterates over and processes each // incoming connection. - let server = listener.incoming().for_each(move |socket| { - // Spawn a task to process the connection - process(socket, state.clone()); - Ok(()) - }) - .map_err(|err| { - // All tasks must have an `Error` type of `()`. This forces error - // handling and helps avoid silencing failures. - // - // In our example, we are only going to log the error to STDOUT. - println!("accept error = {:?}", err); - }); + let server = listener + .incoming() + .for_each(move |socket| { + // Spawn a task to process the connection + process(socket, state.clone()); + Ok(()) + }) + .map_err(|err| { + // All tasks must have an `Error` type of `()`. This forces error + // handling and helps avoid silencing failures. + // + // In our example, we are only going to log the error to STDOUT. + println!("accept error = {:?}", err); + }); println!("server running on localhost:6142"); diff --git a/examples/connect.rs b/examples/connect.rs index 93f55533..4dc0ea31 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -16,18 +16,18 @@ #![deny(warnings)] +extern crate bytes; +extern crate futures; extern crate tokio; extern crate tokio_io; -extern crate futures; -extern crate bytes; use std::env; use std::io::{self, Read, Write}; use std::net::SocketAddr; use std::thread; -use tokio::prelude::*; use futures::sync::mpsc; +use tokio::prelude::*; fn main() -> Result<(), Box<std::error::Error>> { // Determine if we're going to run in TCP or UDP mode @@ -73,18 +73,16 @@ fn main() -> Result<(), Box<std::error::Error>> { tokio::run({ stdout - .for_each(move |chunk| { - out.write_all(&chunk) - }) + .for_each(move |chunk| out.write_all(&chunk)) .map_err(|e| println!("error reading stdout; error = {:?}", e)) }); Ok(()) } mod codec { - use std::io; use bytes::{BufMut, BytesMut}; - use tokio::codec::{Encoder, Decoder}; + use std::io; + use tokio::codec::{Decoder, Encoder}; /// A simple `Codec` implementation that just ships bytes around. /// @@ -122,9 +120,9 @@ mod codec { mod tcp { use tokio; + use tokio::codec::Decoder; use tokio::net::TcpStream; use tokio::prelude::*; - use tokio::codec::Decoder; use bytes::BytesMut; use codec::Bytes; @@ -133,10 +131,10 @@ mod tcp { use std::io; use std::net::SocketAddr; - pub fn connect(addr: &SocketAddr, - stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>) - -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>> - { + pub fn connect( + addr: &SocketAddr, + stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>, + ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>> { let tcp = TcpStream::connect(addr); // After the TCP connection has been established, we set up our client @@ -154,18 +152,21 @@ mod tcp { // You'll also note that we *spawn* the work to read stdin and write it // to the TCP stream. This is done to ensure that happens concurrently // with us reading data from the stream. - let stream = Box::new(tcp.map(move |stream| { - let (sink, stream) = Bytes.framed(stream).split(); - - tokio::spawn(stdin.forward(sink).then(|result| { - if let Err(e) = result { - println!("failed to write to socket: {}", e) - } - Ok(()) - })); - - stream - }).flatten_stream()); + let stream = Box::new( + tcp.map(move |stream| { + let (sink, stream) = Bytes.framed(stream).split(); + + tokio::spawn(stdin.forward(sink).then(|result| { + if let Err(e) = result { + println!("failed to write to socket: {}", e) + } + Ok(()) + })); + + stream + }) + .flatten_stream(), + ); Ok(stream) } } @@ -175,17 +176,17 @@ mod udp { use std::io; use std::net::SocketAddr; + use bytes::BytesMut; use tokio; - use tokio::net::{UdpSocket, UdpFramed}; + use tokio::net::{UdpFramed, UdpSocket}; use tokio::prelude::*; - use bytes::BytesMut; use codec::Bytes; - pub fn connect(&addr: &SocketAddr, - stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>) - -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>> - { + pub fn connect( + &addr: &SocketAddr, + stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>, + ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>> { // We'll bind our UDP socket to a local IP/port, but for now we // basically let the OS pick both of those. let addr_to_bind = if addr.ip().is_ipv4() { @@ -206,14 +207,15 @@ mod udp { // All bytes from `stdin` will go to the `addr` specified in our // argument list. Like with TCP this is spawned concurrently - let forward_stdin = stdin.map(move |chunk| { - (chunk, addr) - }).forward(sink).then(|result| { - if let Err(e) = result { - println!("failed to write to socket: {}", e) - } - Ok(()) - }); + let forward_stdin = stdin + .map(move |chunk| (chunk, addr)) + .forward(sink) + .then(|result| { + if let Err(e) = result { + println!("failed to write to socket: {}", e) + } + Ok(()) + }); // With UDP we could receive data from any source, so filter out // anything coming from a different address @@ -225,10 +227,13 @@ mod udp { } }); - let stream = Box::new(future::lazy(|| { - tokio::spawn(forward_stdin); - future::ok(receive) - }).flatten_stream()); + let stream = Box::new( + future::lazy(|| { + tokio::spawn(forward_stdin); + future::ok(receive) + }) + .flatten_stream(), + ); Ok(stream) } } @@ -240,8 +245,7 @@ fn read_stdin(mut tx: mpsc::Sender<Vec<u8>>) { loop { let mut buf = vec![0; 1024]; let n = match stdin.read(&mut buf) { - Err(_) | - Ok(0) => break, + Err(_) | Ok(0) => break, Ok(n) => n, }; buf.truncate(n); diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs index 08a14563..93ebca79 100644 --- a/examples/echo-udp.rs +++ b/examples/echo-udp.rs @@ -16,11 +16,11 @@ extern crate futures; extern crate tokio; -use std::{env, io}; use std::net::SocketAddr; +use std::{env, io}; -use tokio::prelude::*; use tokio::net::UdpSocket; +use tokio::prelude::*; struct Server { socket: UdpSocket, diff --git a/examples/echo.rs b/examples/echo.rs index f33247cb..45f808f8 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -54,7 +54,8 @@ fn main() -> Result<(), Box<std::error::Error>> { // connections made to the server). The return value of the `for_each` // method is itself a future representing processing the entire stream of // connections, and ends up being our server. - let done = socket.incoming() + let done = socket + .incoming() .map_err(|e| println!("failed to accept socket; error = {:?}", e)) .for_each(move |socket| { // Once we're inside this closure this represents an accepted client @@ -89,7 +90,6 @@ fn main() -> Result<(), Box<std::error::Error>> { Ok(()) }); - // And this is where much of the magic of this server happens. We // crucially want all clients to make progress concurrently, rather than // blocking one on completion of another. To achieve this we use the diff --git a/examples/hello_world.rs b/examples/hello_world.rs index a05a8f22..c8276269 100644 --- a/examples/hello_world.rs +++ b/examples/hello_world.rs @@ -25,20 +25,21 @@ pub fn main() -> Result<(), Box<std::error::Error>> { // Open a TCP stream to the socket address. // // Note that this is the Tokio TcpStream, which is fully async. - let client = TcpStream::connect(&addr).and_then(|stream| { - println!("created stream"); - io::write_all(stream, "hello world\n").then(|result| { - println!("wrote to stream; success={:?}", result.is_ok()); - Ok(()) + let client = TcpStream::connect(&addr) + .and_then(|stream| { + println!("created stream"); + io::write_all(stream, "hello world\n").then(|result| { + println!("wrote to stream; success={:?}", result.is_ok()); + Ok(()) + }) }) - }) - .map_err(|err| { - // All tasks must have an `Error` type of `()`. This forces error - // handling and helps avoid silencing failures. - // - // In our example, we are only going to log the error to STDOUT. - println!("connection error = {:?}", err); - }); + .map_err(|err| { + // All tasks must have an `Error` type of `()`. This forces error + // handling and helps avoid silencing failures. + // + // In our example, we are only going to log the error to STDOUT. + println!("connection error = {:?}", err); + }); // Start the Tokio runtime. // diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs index 864d94bd..94a60648 100644 --- a/examples/print_each_packet.rs +++ b/examples/print_each_packet.rs @@ -57,10 +57,10 @@ extern crate tokio; extern crate tokio_codec; -use tokio_codec::BytesCodec; +use tokio::codec::Decoder; use tokio::net::TcpListener; use tokio::prelude::*; -use tokio::codec::Decoder; +use tokio_codec::BytesCodec; use std::env; use std::net::SocketAddr; diff --git a/examples/proxy.rs b/examples/proxy.rs index 1df115fd..ae8bf3a4 100644 --- a/examples/proxy.rs +++ b/examples/proxy.rs @@ -24,10 +24,10 @@ extern crate tokio; -use std::sync::{Arc, Mutex}; use std::env; -use std::net::{Shutdown, SocketAddr}; use std::io::{self, Read, Write}; +use std::net::{Shutdown, SocketAddr}; +use std::sync::{Arc, Mutex}; use tokio::io::{copy, shutdown}; use tokio::net::{TcpListener, TcpStream}; @@ -45,7 +45,8 @@ fn main() -> Result<(), Box<std::error::Error>> { println!("Listening on: {}", listen_addr); println!("Proxying to: {}", server_addr); - let done = socket.incoming() + let done = socket + .incoming() .map_err(|e| println!("error accepting socket; error = {:?}", e)) .for_each(move |client| { let server = TcpStream::connect(&server_addr); @@ -68,25 +69,25 @@ fn main() -> Result<(), Box<std::error::Error>> { // After the copy is done we indicate to the remote side that we've // finished by shutting down the connection. let client_to_server = copy(client_reader, server_writer) - .and_then(|(n, _, server_writer)| { - shutdown(server_writer).map(move |_| n) - }); + .and_then(|(n, _, server_writer)| shutdown(server_writer).map(move |_| n)); let server_to_client = copy(server_reader, client_writer) - .and_then(|(n, _, client_writer)| { - shutdown(client_writer).map(move |_| n) - }); + .and_then(|(n, _, client_writer)| shutdown(client_writer).map(move |_| n)); client_to_server.join(server_to_client) }); - let msg = amounts.map(move |(from_client, from_server)| { - println!("client wrote {} bytes and received {} bytes", - from_client, from_server); - }).map_err(|e| { - // Don't panic. Maybe the client just disconnected too soon. - println!("error: {}", e); - }); + let msg = amounts + .map(move |(from_client, from_server)| { + println!( + "client wrote {} bytes and received {} bytes", + from_client, from_server + ); + }) + .map_err(|e| { + // Don't panic. Maybe the client just disconnected too soon. + println!("error: {}", e); + }); tokio::spawn(msg); diff --git a/examples/tinydb.rs b/examples/tinydb.rs index 702704d3..11298ed1 100644 --- a/examples/tinydb.rs +++ b/examples/tinydb.rs @@ -44,8 +44,8 @@ extern crate tokio; use std::collections::HashMap; -use std::io::BufReader; use std::env; +use std::io::BufReader; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; @@ -69,9 +69,18 @@ enum Request { /// Responses to the `Request` commands above enum Response { - Value { key: String, value: String }, - Set { key: String, value: String, previous: Option<String> }, - Error { msg: String }, + Value { + key: String, + value: String, + }, + Set { + key: String, + value: String, + previous: Option<String>, + }, + Error { + msg: String, + }, } fn main() -> Result<(), Box<std::error::Error>> { @@ -93,7 +102,8 @@ fn main() -> Result<(), Box<std::error::Error>> { map: Mutex::new(initial_db), }); - let done = listener.incoming() + let done = listener + .incoming() .map_err(|e| println!("error accepting socket; error = {:?}", e)) .for_each(move |socket| { // As with many other small examples, the first thing we'll do is @@ -124,15 +134,22 @@ fn main() -> Result<(), Box<std::error::Error>> { let mut db = db.map.lock().unwrap(); match request { - Request::Get { key } => { - match db.get(&key) { - Some(value) => Response::Value { key, value: value.clone() }, - None => Response::Error { msg: format!("no key {}", key) }, - } - } + Request::Get { key } => match db.get(&key) { + Some(value) => Response::Value { + key, + value: value.clone(), + }, + None => Response::Error { + msg: format!("no key {}", key), + }, + }, Request::Set { key, value } => { let previous = db.insert(key.clone(), value.clone()); - Response::Set { key, value, previous } + Response::Set { + key, + value, + previous, + } } } }); @@ -169,9 +186,11 @@ impl Request { None => return Err(format!("GET must be followed by a key")), }; if parts.next().is_some() { - return Err(format!("GET's key must not be followed by anything")) + return Err(format!("GET's key must not be followed by anything")); } - Ok(Request::Get { key: key.to_string() }) + Ok(Request::Get { + key: key.to_string(), + }) } Some("SET") => { let key = match parts.next() { @@ -182,7 +201,10 @@ impl Request { Some(value) => value, None => return Err(format!("SET needs a value")), }; - Ok(Request::Set { key: key.to_string(), value: value.to_string() }) + Ok(Request::Set { + key: key.to_string(), + value: value.to_string(), + }) } Some(cmd) => Err(format!("unknown command: {}", cmd)), None => Err(format!("empty input")), @@ -193,15 +215,13 @@ impl Request { impl Response { fn serialize(&self) -> String { match *self { - Response::Value { ref key, ref value } => { - format!("{} = {}", key, value) - } - Response::Set { ref key, ref value, ref previous } => { - format!("set {} = `{}`, previous: {:?}", key, value, previous) - } - Response::Error { ref msg } => { - format!("error: {}", msg) - } + Response::Value { ref key, ref value } => format!("{} = {}", key, value), + Response::Set { + ref key, + ref value, + ref previous, + } => format!("set {} = `{}`, previous: {:?}", key, value, previous), + Response::Error { ref msg } => format!("error: {}", msg), } } } diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs index 4cbefcc9..cde1b79a 100644 --- a/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs @@ -23,12 +23,12 @@ extern crate time; extern crate tokio; extern crate tokio_io; -use std::{env, fmt, io}; use std::net::SocketAddr; +use std::{env, fmt, io}; -use tokio::net::{TcpStream, TcpListener}; +use tokio::codec::{Decoder, Encoder}; +use tokio::net::{TcpListener, TcpStream}; use tokio::prelude::*; -use tokio::codec::{Encoder, Decoder}; use bytes::BytesMut; use http::header::HeaderValue; @@ -44,7 +44,8 @@ fn main() -> Result<(), Box<std::error::Error>> { println!("Listening on: {}", addr); tokio::run({ - listener.incoming() + listener + .incoming() .map_err(|e| println!("failed to accept socket; error = {:?}", e)) .for_each(|socket| { process(socket); @@ -64,14 +65,13 @@ fn process(socket: TcpStream) { .split(); // Map all requests into responses and send them back to the client. - let task = tx.send_all(rx.and_then(respond)) - .then(|res| { - if let Err(e) = res { - println!("failed to process connection; error = {:?}", e); - } + let task = tx.send_all(rx.and_then(respond)).then(|res| { + if let Err(e) = res { + println!("failed to process connection; error = {:?}", e); + } - Ok(()) - }); + Ok(()) + }); // Spawn the task that handles the connection. tokio::spawn(task); @@ -82,9 +82,7 @@ fn process(socket: TcpStream) { /// This function is a map from and HTTP request to a future of a response and /// represents the various handling a server might do. Currently the contents /// here are pretty uninteresting. -fn respond(req: Request<()>) - -> Box<Future<Item = Response<String>, Error = io::Error> + Send> -{ +fn respond(req: Request<()>) -> Box<Future<Item = Response<String>, Error = io::Error> + Send> { let f = future::lazy(move || { let mut response = Response::builder(); let body = match req.uri().path() { @@ -99,14 +97,18 @@ fn respond(req: Request<()>) struct Message { message: &'static str, } - serde_json::to_string(&Message { message: "Hello, World!" })? + serde_json::to_string(&Message { + message: "Hello, World!", + })? } _ => { response.status(StatusCode::NOT_FOUND); String::new() } }; - let response = response.body(body).map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + let response = response + .body(body) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; Ok(response) }); @@ -124,12 +126,19 @@ impl Encoder for Http { fn encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> io::Result<()> { use std::fmt::Write; - write!(BytesWrite(dst), "\ - HTTP/1.1 {}\r\n\ - Server: Example\r\n\ - Content-Length: {}\r\n\ - Date: {}\r\n\ - ", item.status(), item.body().len(), date::now()).unwrap(); + write!( + BytesWrite(dst), + "\ + HTTP/1.1 {}\r\n\ + Server: Example\r\n\ + Content-Length: {}\r\n\ + Date: {}\r\n\ + ", + item.status(), + item.body().len(), + date::now() + ) + .unwrap(); for (k, v) in item.headers() { dst.extend_from_slice(k.as_str().as_bytes()); @@ -198,13 +207,18 @@ impl Decoder for Http { headers[i] = Some((k, v)); } - (toslice(r.method.unwrap().as_bytes()), - toslice(r.path.unwrap().as_bytes()), - r.version.unwrap(), - amt) + ( + toslice(r.method.unwrap().as_bytes()), + toslice(r.path.unwrap().as_bytes()), + r.version.unwrap(), + amt, + ) }; if version != 1 { - return Err(io::Error::new(io::ErrorKind::Other, "only HTTP/1.1 accepted")) + return Err(io::Error::new( + io::ErrorKind::Other, + " |