summaryrefslogtreecommitdiffstats
path: root/examples
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-02-21 11:56:15 -0800
committerGitHub <noreply@github.com>2019-02-21 11:56:15 -0800
commit80162306e71c8561873a9c9496d65f2c1387d119 (patch)
tree83327ca8d9d1326d54e3c679e1fb4eb16775d4be /examples
parentab595d08253dd7ee0422144f8dafffa382700976 (diff)
chore: apply rustfmt to all crates (#917)
Diffstat (limited to 'examples')
-rw-r--r--examples/chat-combinator-current-thread.rs45
-rw-r--r--examples/chat-combinator.rs28
-rw-r--r--examples/chat.rs58
-rw-r--r--examples/connect.rs94
-rw-r--r--examples/echo-udp.rs4
-rw-r--r--examples/echo.rs4
-rw-r--r--examples/hello_world.rs27
-rw-r--r--examples/print_each_packet.rs4
-rw-r--r--examples/proxy.rs33
-rw-r--r--examples/tinydb.rs68
-rw-r--r--examples/tinyhttp.rs78
-rw-r--r--examples/udp-client.rs3
-rw-r--r--examples/udp-codec.rs4
13 files changed, 248 insertions, 202 deletions
diff --git a/examples/chat-combinator-current-thread.rs b/examples/chat-combinator-current-thread.rs
index c528eeec..ee147025 100644
--- a/examples/chat-combinator-current-thread.rs
+++ b/examples/chat-combinator-current-thread.rs
@@ -26,21 +26,20 @@
#![deny(warnings)]
-extern crate tokio;
extern crate futures;
+extern crate tokio;
use tokio::io;
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio::runtime::current_thread::{Runtime, TaskExecutor};
+use std::cell::RefCell;
use std::collections::HashMap;
-use std::iter;
use std::env;
-use std::io::{BufReader};
+use std::io::BufReader;
+use std::iter;
use std::rc::Rc;
-use std::cell::RefCell;
-
fn main() -> Result<(), Box<std::error::Error>> {
let mut runtime = Runtime::new().unwrap();
@@ -58,8 +57,12 @@ fn main() -> Result<(), Box<std::error::Error>> {
// The server task asynchronously iterates over and processes each incoming
// connection.
- let srv = socket.incoming()
- .map_err(|e| {println!("failed to accept socket; error = {:?}", e); e})
+ let srv = socket
+ .incoming()
+ .map_err(|e| {
+ println!("failed to accept socket; error = {:?}", e);
+ e
+ })
.for_each(move |stream| {
// The client's socket address
let addr = stream.peer_addr()?;
@@ -102,9 +105,7 @@ fn main() -> Result<(), Box<std::error::Error>> {
// Convert the bytes we read into a string, and then send that
// string to all other connected clients.
- let line = line.map(|(reader, vec)| {
- (reader, String::from_utf8(vec))
- });
+ let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec)));
// Move the connection state into the closure below.
let connections = connections_inner.clone();
@@ -116,15 +117,17 @@ fn main() -> Result<(), Box<std::error::Error>> {
if let Ok(msg) = message {
// For each open connection except the sender, send the
// string via the channel.
- let iter = conns.iter_mut()
- .filter(|&(&k, _)| k != addr)
- .map(|(_, v)| v);
+ let iter = conns
+ .iter_mut()
+ .filter(|&(&k, _)| k != addr)
+ .map(|(_, v)| v);
for tx in iter {
tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap();
}
} else {
let tx = conns.get_mut(&addr).unwrap();
- tx.unbounded_send("You didn't send valid UTF-8.".to_string()).unwrap();
+ tx.unbounded_send("You didn't send valid UTF-8.".to_string())
+ .unwrap();
}
reader
@@ -147,12 +150,14 @@ fn main() -> Result<(), Box<std::error::Error>> {
let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
// Spawn locally a task to process the connection
- TaskExecutor::current().spawn_local(Box::new(connection.then(move |_| {
- let mut conns = connections.borrow_mut();
- conns.remove(&addr);
- println!("Connection {} closed.", addr);
- Ok(())
- }))).unwrap();
+ TaskExecutor::current()
+ .spawn_local(Box::new(connection.then(move |_| {
+ let mut conns = connections.borrow_mut();
+ conns.remove(&addr);
+ println!("Connection {} closed.", addr);
+ Ok(())
+ })))
+ .unwrap();
Ok(())
})
diff --git a/examples/chat-combinator.rs b/examples/chat-combinator.rs
index 0572afbd..b81e8f7c 100644
--- a/examples/chat-combinator.rs
+++ b/examples/chat-combinator.rs
@@ -21,17 +21,17 @@
#![deny(warnings)]
-extern crate tokio;
extern crate futures;
+extern crate tokio;
use tokio::io;
use tokio::net::TcpListener;
use tokio::prelude::*;
use std::collections::HashMap;
-use std::iter;
use std::env;
-use std::io::{BufReader};
+use std::io::BufReader;
+use std::iter;
use std::sync::{Arc, Mutex};
fn main() -> Result<(), Box<std::error::Error>> {
@@ -48,8 +48,12 @@ fn main() -> Result<(), Box<std::error::Error>> {
// The server task asynchronously iterates over and processes each incoming
// connection.
- let srv = socket.incoming()
- .map_err(|e| {println!("failed to accept socket; error = {:?}", e); e})
+ let srv = socket
+ .incoming()
+ .map_err(|e| {
+ println!("failed to accept socket; error = {:?}", e);
+ e
+ })
.for_each(move |stream| {
// The client's socket address
let addr = stream.peer_addr()?;
@@ -91,9 +95,7 @@ fn main() -> Result<(), Box<std::error::Error>> {
// Convert the bytes we read into a string, and then send that
// string to all other connected clients.
- let line = line.map(|(reader, vec)| {
- (reader, String::from_utf8(vec))
- });
+ let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec)));
// Move the connection state into the closure below.
let connections = connections_inner.clone();
@@ -105,15 +107,17 @@ fn main() -> Result<(), Box<std::error::Error>> {
if let Ok(msg) = message {
// For each open connection except the sender, send the
// string via the channel.
- let iter = conns.iter_mut()
- .filter(|&(&k, _)| k != addr)
- .map(|(_, v)| v);
+ let iter = conns
+ .iter_mut()
+ .filter(|&(&k, _)| k != addr)
+ .map(|(_, v)| v);
for tx in iter {
tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap();
}
} else {
let tx = conns.get_mut(&addr).unwrap();
- tx.unbounded_send("You didn't send valid UTF-8.".to_string()).unwrap();
+ tx.unbounded_send("You didn't send valid UTF-8.".to_string())
+ .unwrap();
}
reader
diff --git a/examples/chat.rs b/examples/chat.rs
index 182af7c8..b21432af 100644
--- a/examples/chat.rs
+++ b/examples/chat.rs
@@ -31,12 +31,12 @@ extern crate tokio;
extern crate futures;
extern crate bytes;
+use bytes::{BufMut, Bytes, BytesMut};
+use futures::future::{self, Either};
+use futures::sync::mpsc;
use tokio::io;
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
-use futures::sync::mpsc;
-use futures::future::{self, Either};
-use bytes::{BytesMut, Bytes, BufMut};
use std::collections::HashMap;
use std::net::SocketAddr;
@@ -131,10 +131,7 @@ impl Shared {
impl Peer {
/// Create a new instance of `Peer`.
- fn new(name: BytesMut,
- state: Arc<Mutex<Shared>>,
- lines: Lines) -> Peer
- {
+ fn new(name: BytesMut, state: Arc<Mutex<Shared>>, lines: Lines) -> Peer {
// Get the client socket address
let addr = lines.socket.peer_addr().unwrap();
@@ -142,8 +139,7 @@ impl Peer {
let (tx, rx) = mpsc::unbounded();
// Add an entry for this `Peer` in the shared state map.
- state.lock().unwrap()
- .peers.insert(addr, tx);
+ state.lock().unwrap().peers.insert(addr, tx);
Peer {
name,
@@ -198,7 +194,7 @@ impl Future for Peer {
// though there could still be lines to read. Because we did
// not reach `Async::NotReady`, we have to notify ourselves
// in order to tell the executor to schedule the task again.
- if i+1 == LINES_PER_TICK {
+ if i + 1 == LINES_PER_TICK {
task::current().notify();
}
}
@@ -256,8 +252,7 @@ impl Future for Peer {
impl Drop for Peer {
fn drop(&mut self) {
- self.state.lock().unwrap().peers
- .remove(&self.addr);
+ self.state.lock().unwrap().peers.remove(&self.addr);
}
}
@@ -333,7 +328,10 @@ impl Stream for Lines {
let sock_closed = self.fill_read_buf()?.is_ready();
// Now, try finding lines
- let pos = self.rd.windows(2).enumerate()
+ let pos = self
+ .rd
+ .windows(2)
+ .enumerate()
.find(|&(_, bytes)| bytes == b"\r\n")
.map(|(i, _)| i);
@@ -373,7 +371,8 @@ fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
// We use the `into_future` combinator to extract the first item from the
// lines stream. `into_future` takes a `Stream` and converts it to a future
// of `(first, rest)` where `rest` is the original stream instance.
- let connection = lines.into_future()
+ let connection = lines
+ .into_future()
// `into_future` doesn't have the right error type, so map the error to
// make it work.
.map_err(|(e, _)| e)
@@ -408,10 +407,7 @@ fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
//
// This is also a future that processes the connection, only
// completing when the socket closes.
- let peer = Peer::new(
- name,
- state,
- lines);
+ let peer = Peer::new(name, state, lines);
// Wrap `peer` with `Either::B` to make the return type fit.
Either::B(peer)
@@ -443,18 +439,20 @@ pub fn main() -> Result<(), Box<std::error::Error>> {
// The server task asynchronously iterates over and processes each
// incoming connection.
- let server = listener.incoming().for_each(move |socket| {
- // Spawn a task to process the connection
- process(socket, state.clone());
- Ok(())
- })
- .map_err(|err| {
- // All tasks must have an `Error` type of `()`. This forces error
- // handling and helps avoid silencing failures.
- //
- // In our example, we are only going to log the error to STDOUT.
- println!("accept error = {:?}", err);
- });
+ let server = listener
+ .incoming()
+ .for_each(move |socket| {
+ // Spawn a task to process the connection
+ process(socket, state.clone());
+ Ok(())
+ })
+ .map_err(|err| {
+ // All tasks must have an `Error` type of `()`. This forces error
+ // handling and helps avoid silencing failures.
+ //
+ // In our example, we are only going to log the error to STDOUT.
+ println!("accept error = {:?}", err);
+ });
println!("server running on localhost:6142");
diff --git a/examples/connect.rs b/examples/connect.rs
index 93f55533..4dc0ea31 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -16,18 +16,18 @@
#![deny(warnings)]
+extern crate bytes;
+extern crate futures;
extern crate tokio;
extern crate tokio_io;
-extern crate futures;
-extern crate bytes;
use std::env;
use std::io::{self, Read, Write};
use std::net::SocketAddr;
use std::thread;
-use tokio::prelude::*;
use futures::sync::mpsc;
+use tokio::prelude::*;
fn main() -> Result<(), Box<std::error::Error>> {
// Determine if we're going to run in TCP or UDP mode
@@ -73,18 +73,16 @@ fn main() -> Result<(), Box<std::error::Error>> {
tokio::run({
stdout
- .for_each(move |chunk| {
- out.write_all(&chunk)
- })
+ .for_each(move |chunk| out.write_all(&chunk))
.map_err(|e| println!("error reading stdout; error = {:?}", e))
});
Ok(())
}
mod codec {
- use std::io;
use bytes::{BufMut, BytesMut};
- use tokio::codec::{Encoder, Decoder};
+ use std::io;
+ use tokio::codec::{Decoder, Encoder};
/// A simple `Codec` implementation that just ships bytes around.
///
@@ -122,9 +120,9 @@ mod codec {
mod tcp {
use tokio;
+ use tokio::codec::Decoder;
use tokio::net::TcpStream;
use tokio::prelude::*;
- use tokio::codec::Decoder;
use bytes::BytesMut;
use codec::Bytes;
@@ -133,10 +131,10 @@ mod tcp {
use std::io;
use std::net::SocketAddr;
- pub fn connect(addr: &SocketAddr,
- stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
- -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>>
- {
+ pub fn connect(
+ addr: &SocketAddr,
+ stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>,
+ ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>> {
let tcp = TcpStream::connect(addr);
// After the TCP connection has been established, we set up our client
@@ -154,18 +152,21 @@ mod tcp {
// You'll also note that we *spawn* the work to read stdin and write it
// to the TCP stream. This is done to ensure that happens concurrently
// with us reading data from the stream.
- let stream = Box::new(tcp.map(move |stream| {
- let (sink, stream) = Bytes.framed(stream).split();
-
- tokio::spawn(stdin.forward(sink).then(|result| {
- if let Err(e) = result {
- println!("failed to write to socket: {}", e)
- }
- Ok(())
- }));
-
- stream
- }).flatten_stream());
+ let stream = Box::new(
+ tcp.map(move |stream| {
+ let (sink, stream) = Bytes.framed(stream).split();
+
+ tokio::spawn(stdin.forward(sink).then(|result| {
+ if let Err(e) = result {
+ println!("failed to write to socket: {}", e)
+ }
+ Ok(())
+ }));
+
+ stream
+ })
+ .flatten_stream(),
+ );
Ok(stream)
}
}
@@ -175,17 +176,17 @@ mod udp {
use std::io;
use std::net::SocketAddr;
+ use bytes::BytesMut;
use tokio;
- use tokio::net::{UdpSocket, UdpFramed};
+ use tokio::net::{UdpFramed, UdpSocket};
use tokio::prelude::*;
- use bytes::BytesMut;
use codec::Bytes;
- pub fn connect(&addr: &SocketAddr,
- stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
- -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>>
- {
+ pub fn connect(
+ &addr: &SocketAddr,
+ stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>,
+ ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>> {
// We'll bind our UDP socket to a local IP/port, but for now we
// basically let the OS pick both of those.
let addr_to_bind = if addr.ip().is_ipv4() {
@@ -206,14 +207,15 @@ mod udp {
// All bytes from `stdin` will go to the `addr` specified in our
// argument list. Like with TCP this is spawned concurrently
- let forward_stdin = stdin.map(move |chunk| {
- (chunk, addr)
- }).forward(sink).then(|result| {
- if let Err(e) = result {
- println!("failed to write to socket: {}", e)
- }
- Ok(())
- });
+ let forward_stdin = stdin
+ .map(move |chunk| (chunk, addr))
+ .forward(sink)
+ .then(|result| {
+ if let Err(e) = result {
+ println!("failed to write to socket: {}", e)
+ }
+ Ok(())
+ });
// With UDP we could receive data from any source, so filter out
// anything coming from a different address
@@ -225,10 +227,13 @@ mod udp {
}
});
- let stream = Box::new(future::lazy(|| {
- tokio::spawn(forward_stdin);
- future::ok(receive)
- }).flatten_stream());
+ let stream = Box::new(
+ future::lazy(|| {
+ tokio::spawn(forward_stdin);
+ future::ok(receive)
+ })
+ .flatten_stream(),
+ );
Ok(stream)
}
}
@@ -240,8 +245,7 @@ fn read_stdin(mut tx: mpsc::Sender<Vec<u8>>) {
loop {
let mut buf = vec![0; 1024];
let n = match stdin.read(&mut buf) {
- Err(_) |
- Ok(0) => break,
+ Err(_) | Ok(0) => break,
Ok(n) => n,
};
buf.truncate(n);
diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs
index 08a14563..93ebca79 100644
--- a/examples/echo-udp.rs
+++ b/examples/echo-udp.rs
@@ -16,11 +16,11 @@
extern crate futures;
extern crate tokio;
-use std::{env, io};
use std::net::SocketAddr;
+use std::{env, io};
-use tokio::prelude::*;
use tokio::net::UdpSocket;
+use tokio::prelude::*;
struct Server {
socket: UdpSocket,
diff --git a/examples/echo.rs b/examples/echo.rs
index f33247cb..45f808f8 100644
--- a/examples/echo.rs
+++ b/examples/echo.rs
@@ -54,7 +54,8 @@ fn main() -> Result<(), Box<std::error::Error>> {
// connections made to the server). The return value of the `for_each`
// method is itself a future representing processing the entire stream of
// connections, and ends up being our server.
- let done = socket.incoming()
+ let done = socket
+ .incoming()
.map_err(|e| println!("failed to accept socket; error = {:?}", e))
.for_each(move |socket| {
// Once we're inside this closure this represents an accepted client
@@ -89,7 +90,6 @@ fn main() -> Result<(), Box<std::error::Error>> {
Ok(())
});
-
// And this is where much of the magic of this server happens. We
// crucially want all clients to make progress concurrently, rather than
// blocking one on completion of another. To achieve this we use the
diff --git a/examples/hello_world.rs b/examples/hello_world.rs
index a05a8f22..c8276269 100644
--- a/examples/hello_world.rs
+++ b/examples/hello_world.rs
@@ -25,20 +25,21 @@ pub fn main() -> Result<(), Box<std::error::Error>> {
// Open a TCP stream to the socket address.
//
// Note that this is the Tokio TcpStream, which is fully async.
- let client = TcpStream::connect(&addr).and_then(|stream| {
- println!("created stream");
- io::write_all(stream, "hello world\n").then(|result| {
- println!("wrote to stream; success={:?}", result.is_ok());
- Ok(())
+ let client = TcpStream::connect(&addr)
+ .and_then(|stream| {
+ println!("created stream");
+ io::write_all(stream, "hello world\n").then(|result| {
+ println!("wrote to stream; success={:?}", result.is_ok());
+ Ok(())
+ })
})
- })
- .map_err(|err| {
- // All tasks must have an `Error` type of `()`. This forces error
- // handling and helps avoid silencing failures.
- //
- // In our example, we are only going to log the error to STDOUT.
- println!("connection error = {:?}", err);
- });
+ .map_err(|err| {
+ // All tasks must have an `Error` type of `()`. This forces error
+ // handling and helps avoid silencing failures.
+ //
+ // In our example, we are only going to log the error to STDOUT.
+ println!("connection error = {:?}", err);
+ });
// Start the Tokio runtime.
//
diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs
index 864d94bd..94a60648 100644
--- a/examples/print_each_packet.rs
+++ b/examples/print_each_packet.rs
@@ -57,10 +57,10 @@
extern crate tokio;
extern crate tokio_codec;
-use tokio_codec::BytesCodec;
+use tokio::codec::Decoder;
use tokio::net::TcpListener;
use tokio::prelude::*;
-use tokio::codec::Decoder;
+use tokio_codec::BytesCodec;
use std::env;
use std::net::SocketAddr;
diff --git a/examples/proxy.rs b/examples/proxy.rs
index 1df115fd..ae8bf3a4 100644
--- a/examples/proxy.rs
+++ b/examples/proxy.rs
@@ -24,10 +24,10 @@
extern crate tokio;
-use std::sync::{Arc, Mutex};
use std::env;
-use std::net::{Shutdown, SocketAddr};
use std::io::{self, Read, Write};
+use std::net::{Shutdown, SocketAddr};
+use std::sync::{Arc, Mutex};
use tokio::io::{copy, shutdown};
use tokio::net::{TcpListener, TcpStream};
@@ -45,7 +45,8 @@ fn main() -> Result<(), Box<std::error::Error>> {
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);
- let done = socket.incoming()
+ let done = socket
+ .incoming()
.map_err(|e| println!("error accepting socket; error = {:?}", e))
.for_each(move |client| {
let server = TcpStream::connect(&server_addr);
@@ -68,25 +69,25 @@ fn main() -> Result<(), Box<std::error::Error>> {
// After the copy is done we indicate to the remote side that we've
// finished by shutting down the connection.
let client_to_server = copy(client_reader, server_writer)
- .and_then(|(n, _, server_writer)| {
- shutdown(server_writer).map(move |_| n)
- });
+ .and_then(|(n, _, server_writer)| shutdown(server_writer).map(move |_| n));
let server_to_client = copy(server_reader, client_writer)
- .and_then(|(n, _, client_writer)| {
- shutdown(client_writer).map(move |_| n)
- });
+ .and_then(|(n, _, client_writer)| shutdown(client_writer).map(move |_| n));
client_to_server.join(server_to_client)
});
- let msg = amounts.map(move |(from_client, from_server)| {
- println!("client wrote {} bytes and received {} bytes",
- from_client, from_server);
- }).map_err(|e| {
- // Don't panic. Maybe the client just disconnected too soon.
- println!("error: {}", e);
- });
+ let msg = amounts
+ .map(move |(from_client, from_server)| {
+ println!(
+ "client wrote {} bytes and received {} bytes",
+ from_client, from_server
+ );
+ })
+ .map_err(|e| {
+ // Don't panic. Maybe the client just disconnected too soon.
+ println!("error: {}", e);
+ });
tokio::spawn(msg);
diff --git a/examples/tinydb.rs b/examples/tinydb.rs
index 702704d3..11298ed1 100644
--- a/examples/tinydb.rs
+++ b/examples/tinydb.rs
@@ -44,8 +44,8 @@
extern crate tokio;
use std::collections::HashMap;
-use std::io::BufReader;
use std::env;
+use std::io::BufReader;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
@@ -69,9 +69,18 @@ enum Request {
/// Responses to the `Request` commands above
enum Response {
- Value { key: String, value: String },
- Set { key: String, value: String, previous: Option<String> },
- Error { msg: String },
+ Value {
+ key: String,
+ value: String,
+ },
+ Set {
+ key: String,
+ value: String,
+ previous: Option<String>,
+ },
+ Error {
+ msg: String,
+ },
}
fn main() -> Result<(), Box<std::error::Error>> {
@@ -93,7 +102,8 @@ fn main() -> Result<(), Box<std::error::Error>> {
map: Mutex::new(initial_db),
});
- let done = listener.incoming()
+ let done = listener
+ .incoming()
.map_err(|e| println!("error accepting socket; error = {:?}", e))
.for_each(move |socket| {
// As with many other small examples, the first thing we'll do is
@@ -124,15 +134,22 @@ fn main() -> Result<(), Box<std::error::Error>> {
let mut db = db.map.lock().unwrap();
match request {
- Request::Get { key } => {
- match db.get(&key) {
- Some(value) => Response::Value { key, value: value.clone() },
- None => Response::Error { msg: format!("no key {}", key) },
- }
- }
+ Request::Get { key } => match db.get(&key) {
+ Some(value) => Response::Value {
+ key,
+ value: value.clone(),
+ },
+ None => Response::Error {
+ msg: format!("no key {}", key),
+ },
+ },
Request::Set { key, value } => {
let previous = db.insert(key.clone(), value.clone());
- Response::Set { key, value, previous }
+ Response::Set {
+ key,
+ value,
+ previous,
+ }
}
}
});
@@ -169,9 +186,11 @@ impl Request {
None => return Err(format!("GET must be followed by a key")),
};
if parts.next().is_some() {
- return Err(format!("GET's key must not be followed by anything"))
+ return Err(format!("GET's key must not be followed by anything"));
}
- Ok(Request::Get { key: key.to_string() })
+ Ok(Request::Get {
+ key: key.to_string(),
+ })
}
Some("SET") => {
let key = match parts.next() {
@@ -182,7 +201,10 @@ impl Request {
Some(value) => value,
None => return Err(format!("SET needs a value")),
};
- Ok(Request::Set { key: key.to_string(), value: value.to_string() })
+ Ok(Request::Set {
+ key: key.to_string(),
+ value: value.to_string(),
+ })
}
Some(cmd) => Err(format!("unknown command: {}", cmd)),
None => Err(format!("empty input")),
@@ -193,15 +215,13 @@ impl Request {
impl Response {
fn serialize(&self) -> String {
match *self {
- Response::Value { ref key, ref value } => {
- format!("{} = {}", key, value)
- }
- Response::Set { ref key, ref value, ref previous } => {
- format!("set {} = `{}`, previous: {:?}", key, value, previous)
- }
- Response::Error { ref msg } => {
- format!("error: {}", msg)
- }
+ Response::Value { ref key, ref value } => format!("{} = {}", key, value),
+ Response::Set {
+ ref key,
+ ref value,
+ ref previous,
+ } => format!("set {} = `{}`, previous: {:?}", key, value, previous),
+ Response::Error { ref msg } => format!("error: {}", msg),
}
}
}
diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs
index 4cbefcc9..cde1b79a 100644
--- a/examples/tinyhttp.rs
+++ b/examples/tinyhttp.rs
@@ -23,12 +23,12 @@ extern crate time;
extern crate tokio;
extern crate tokio_io;
-use std::{env, fmt, io};
use std::net::SocketAddr;
+use std::{env, fmt, io};
-use tokio::net::{TcpStream, TcpListener};
+use tokio::codec::{Decoder, Encoder};
+use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
-use tokio::codec::{Encoder, Decoder};
use bytes::BytesMut;
use http::header::HeaderValue;
@@ -44,7 +44,8 @@ fn main() -> Result<(), Box<std::error::Error>> {
println!("Listening on: {}", addr);
tokio::run({
- listener.incoming()
+ listener
+ .incoming()
.map_err(|e| println!("failed to accept socket; error = {:?}", e))
.for_each(|socket| {
process(socket);
@@ -64,14 +65,13 @@ fn process(socket: TcpStream) {
.split();
// Map all requests into responses and send them back to the client.
- let task = tx.send_all(rx.and_then(respond))
- .then(|res| {
- if let Err(e) = res {
- println!("failed to process connection; error = {:?}", e);
- }
+ let task = tx.send_all(rx.and_then(respond)).then(|res| {
+ if let Err(e) = res {
+ println!("failed to process connection; error = {:?}", e);
+ }
- Ok(())
- });
+ Ok(())
+ });
// Spawn the task that handles the connection.
tokio::spawn(task);
@@ -82,9 +82,7 @@ fn process(socket: TcpStream) {
/// This function is a map from and HTTP request to a future of a response and
/// represents the various handling a server might do. Currently the contents
/// here are pretty uninteresting.
-fn respond(req: Request<()>)
- -> Box<Future<Item = Response<String>, Error = io::Error> + Send>
-{
+fn respond(req: Request<()>) -> Box<Future<Item = Response<String>, Error = io::Error> + Send> {
let f = future::lazy(move || {
let mut response = Response::builder();
let body = match req.uri().path() {
@@ -99,14 +97,18 @@ fn respond(req: Request<()>)
struct Message {
message: &'static str,
}
- serde_json::to_string(&Message { message: "Hello, World!" })?
+ serde_json::to_string(&Message {
+ message: "Hello, World!",
+ })?
}
_ => {
response.status(StatusCode::NOT_FOUND);
String::new()
}
};
- let response = response.body(body).map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+ let response = response
+ .body(body)
+ .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
Ok(response)
});
@@ -124,12 +126,19 @@ impl Encoder for Http {
fn encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> io::Result<()> {
use std::fmt::Write;
- write!(BytesWrite(dst), "\
- HTTP/1.1 {}\r\n\
- Server: Example\r\n\
- Content-Length: {}\r\n\
- Date: {}\r\n\
- ", item.status(), item.body().len(), date::now()).unwrap();
+ write!(
+ BytesWrite(dst),
+ "\
+ HTTP/1.1 {}\r\n\
+ Server: Example\r\n\
+ Content-Length: {}\r\n\
+ Date: {}\r\n\
+ ",
+ item.status(),
+ item.body().len(),
+ date::now()
+ )
+ .unwrap();
for (k, v) in item.headers() {
dst.extend_from_slice(k.as_str().as_bytes());
@@ -198,13 +207,18 @@ impl Decoder for Http {
headers[i] = Some((k, v));
}
- (toslice(r.method.unwrap().as_bytes()),
- toslice(r.path.unwrap().as_bytes()),
- r.version.unwrap(),
- amt)
+ (
+ toslice(r.method.unwrap().as_bytes()),
+ toslice(r.path.unwrap().as_bytes()),
+ r.version.unwrap(),
+ amt,
+ )
};
if version != 1 {
- return Err(io::Error::new(io::ErrorKind::Other, "only HTTP/1.1 accepted"))
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "