summaryrefslogtreecommitdiffstats
path: root/examples/chat.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-02-21 11:56:15 -0800
committerGitHub <noreply@github.com>2019-02-21 11:56:15 -0800
commit80162306e71c8561873a9c9496d65f2c1387d119 (patch)
tree83327ca8d9d1326d54e3c679e1fb4eb16775d4be /examples/chat.rs
parentab595d08253dd7ee0422144f8dafffa382700976 (diff)
chore: apply rustfmt to all crates (#917)
Diffstat (limited to 'examples/chat.rs')
-rw-r--r--examples/chat.rs58
1 files changed, 28 insertions, 30 deletions
diff --git a/examples/chat.rs b/examples/chat.rs
index 182af7c8..b21432af 100644
--- a/examples/chat.rs
+++ b/examples/chat.rs
@@ -31,12 +31,12 @@ extern crate tokio;
extern crate futures;
extern crate bytes;
+use bytes::{BufMut, Bytes, BytesMut};
+use futures::future::{self, Either};
+use futures::sync::mpsc;
use tokio::io;
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
-use futures::sync::mpsc;
-use futures::future::{self, Either};
-use bytes::{BytesMut, Bytes, BufMut};
use std::collections::HashMap;
use std::net::SocketAddr;
@@ -131,10 +131,7 @@ impl Shared {
impl Peer {
/// Create a new instance of `Peer`.
- fn new(name: BytesMut,
- state: Arc<Mutex<Shared>>,
- lines: Lines) -> Peer
- {
+ fn new(name: BytesMut, state: Arc<Mutex<Shared>>, lines: Lines) -> Peer {
// Get the client socket address
let addr = lines.socket.peer_addr().unwrap();
@@ -142,8 +139,7 @@ impl Peer {
let (tx, rx) = mpsc::unbounded();
// Add an entry for this `Peer` in the shared state map.
- state.lock().unwrap()
- .peers.insert(addr, tx);
+ state.lock().unwrap().peers.insert(addr, tx);
Peer {
name,
@@ -198,7 +194,7 @@ impl Future for Peer {
// though there could still be lines to read. Because we did
// not reach `Async::NotReady`, we have to notify ourselves
// in order to tell the executor to schedule the task again.
- if i+1 == LINES_PER_TICK {
+ if i + 1 == LINES_PER_TICK {
task::current().notify();
}
}
@@ -256,8 +252,7 @@ impl Future for Peer {
impl Drop for Peer {
fn drop(&mut self) {
- self.state.lock().unwrap().peers
- .remove(&self.addr);
+ self.state.lock().unwrap().peers.remove(&self.addr);
}
}
@@ -333,7 +328,10 @@ impl Stream for Lines {
let sock_closed = self.fill_read_buf()?.is_ready();
// Now, try finding lines
- let pos = self.rd.windows(2).enumerate()
+ let pos = self
+ .rd
+ .windows(2)
+ .enumerate()
.find(|&(_, bytes)| bytes == b"\r\n")
.map(|(i, _)| i);
@@ -373,7 +371,8 @@ fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
// We use the `into_future` combinator to extract the first item from the
// lines stream. `into_future` takes a `Stream` and converts it to a future
// of `(first, rest)` where `rest` is the original stream instance.
- let connection = lines.into_future()
+ let connection = lines
+ .into_future()
// `into_future` doesn't have the right error type, so map the error to
// make it work.
.map_err(|(e, _)| e)
@@ -408,10 +407,7 @@ fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
//
// This is also a future that processes the connection, only
// completing when the socket closes.
- let peer = Peer::new(
- name,
- state,
- lines);
+ let peer = Peer::new(name, state, lines);
// Wrap `peer` with `Either::B` to make the return type fit.
Either::B(peer)
@@ -443,18 +439,20 @@ pub fn main() -> Result<(), Box<std::error::Error>> {
// The server task asynchronously iterates over and processes each
// incoming connection.
- let server = listener.incoming().for_each(move |socket| {
- // Spawn a task to process the connection
- process(socket, state.clone());
- Ok(())
- })
- .map_err(|err| {
- // All tasks must have an `Error` type of `()`. This forces error
- // handling and helps avoid silencing failures.
- //
- // In our example, we are only going to log the error to STDOUT.
- println!("accept error = {:?}", err);
- });
+ let server = listener
+ .incoming()
+ .for_each(move |socket| {
+ // Spawn a task to process the connection
+ process(socket, state.clone());
+ Ok(())
+ })
+ .map_err(|err| {
+ // All tasks must have an `Error` type of `()`. This forces error
+ // handling and helps avoid silencing failures.
+ //
+ // In our example, we are only going to log the error to STDOUT.
+ println!("accept error = {:?}", err);
+ });
println!("server running on localhost:6142");