summaryrefslogtreecommitdiffstats
path: root/examples/chat.rs
diff options
context:
space:
mode:
authoroberien <jaro.fietz@gmx.de>2016-10-06 15:38:20 +0200
committeroberien <jaro.fietz@gmx.de>2016-10-06 15:38:20 +0200
commit6961efa8dde54a429cfebc157594b25e5d25283f (patch)
treed6d42d843fadb8a17605aefdd354b28e6132fa8a /examples/chat.rs
parent0205b855d0794d882959e9147890e53c4ca6d2fe (diff)
fix(chat): Implement alexcrichton's suggestions
* Remove unnecessary clone * Improve rightward drift * Remove unnecessary lazy future * Improve utf-8 handling * Refactor to make code more understandable
Diffstat (limited to 'examples/chat.rs')
-rw-r--r--examples/chat.rs91
1 files changed, 53 insertions, 38 deletions
diff --git a/examples/chat.rs b/examples/chat.rs
index 4d43c4f9..beb1d78e 100644
--- a/examples/chat.rs
+++ b/examples/chat.rs
@@ -8,7 +8,7 @@ use std::rc::Rc;
use std::cell::RefCell;
use std::iter;
use std::env;
-use std::io::BufReader;
+use std::io::{Error, ErrorKind, BufReader};
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
@@ -28,12 +28,11 @@ fn main() {
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
- let connections = connections.clone();
let future = socket.incoming().for_each(move |(stream, addr)| {
let connections = connections.clone();
let handle_inner = handle.clone();
// We create a new future in which we create all other futures.
- // This makes `stream` be bound on the `lazy` future's task, allowing
+ // This makes `stream` be bound on the outer future's task, allowing
// `ReadHalf` and `WriteHalf` to be shared between inner futures.
handle.spawn_fn(move || {
println!("New Connection: {}", addr);
@@ -43,50 +42,65 @@ fn main() {
// add sender to hashmap of all current connections
connections.borrow_mut().insert(addr, tx);
+ let reader = BufReader::new(reader);
let connections_inner = connections.clone();
// https://users.rust-lang.org/t/loop-futures-for-client-handling/6950/2
- // We have an endless loop reading from a client.
- // In order to fuse the reading and writing futures in the end, we need to have the same
- // output type. Therefore we use `(Option<BufReader<ReadHalf<TcpStream>>>,
- // Option<WriteHalf<TcpStream>>)`.
- let reader = BufReader::new(reader);
- let socket_reader = stream::iter::<_, _, std::io::Error>(iter::repeat(()).map(Ok)).fold((Some(reader),None), move |(reader, _), _| {
- let reader = reader.unwrap();
+ // First we need to get an infinite iterator
+ let iter = stream::iter::<_, _, std::io::Error>(iter::repeat(()).map(Ok));
+ // Then we fold it as infinite loop
+ let socket_reader = iter.fold(reader, move |reader, _| {
let connections = connections_inner.clone();
- // read and parse length prefix
- io::read_until(reader, '\n' as u8, vec![])
- .and_then(|(reader, vec)| futures::lazy(|| {
- // EOF was hit without reading a delimiter
- if vec.len() == 0 {
- futures::failed((std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Broken Pipe"))).boxed()
- } else {
- futures::finished((reader, vec)).boxed()
+ // read line
+ let amt = io::read_until(reader, '\n' as u8, vec![]);
+ // check if we hit EOF and need to close the connection
+ let amt = amt.and_then(|(reader, vec)| {
+ // EOF was hit without reading a delimiter
+ if vec.len() == 0 {
+ let err = Error::new(ErrorKind::BrokenPipe, "Broken Pipe");
+ futures::failed(err).boxed()
+ } else {
+ futures::finished((reader, vec)).boxed()
+ }
+ });
+ // convert bytes into string
+ let amt = amt.map(|(reader, vec)| (reader, String::from_utf8(vec)));
+ amt.and_then(move |(reader, message)| {
+ println!("{}: {:?}", addr, message);
+ let conns = connections.borrow_mut();
+ if let Ok(msg) = message {
+ // For each open connection except the sender, send the string
+ // via the channel
+ let iter = conns.iter().filter(|&(&k,_)| k != addr).map(|(_,v)| v);
+ for tx in iter {
+ tx.send(msg.clone()).unwrap();
}
- }))
- // convert bytes into string
- .map(|(reader, vec)| (reader, String::from_utf8(vec).unwrap()))
- .and_then(move |(reader, message)| {
- println!("{}: {:?}", addr, message);
- // For each open connection except the sender, send the string via the channel
- for tx in connections.borrow_mut().iter().filter(|&(&k,_)| k != addr).map(|(_,v)| v) {
- tx.send(message.clone()).unwrap();
- }
- futures::finished((Some(reader),None))
- })
+ } else {
+ let tx = conns.get(&addr).unwrap();
+ tx.send("You didn't send valid UTF-8.".to_string()).unwrap();
+ }
+ futures::finished(reader)
+ })
});
// Whenever we receive a string on the Receiver, we write it to `WriteHalf<TcpStream>`.
- let socket_writer = rx.fold((None, Some(writer)), move |(_, writer), msg| {
- let writer = writer.unwrap();
- io::write_all(writer, msg.into_bytes()).map(|(writer, _)| (None, Some(writer))).boxed()
+ let socket_writer = rx.fold(writer, |writer, msg| {
+ let amt = io::write_all(writer, msg.into_bytes());
+ let amt = amt.map(|(writer, _)| writer);
+ amt
});
- socket_reader.select(socket_writer)
- .then(move |_| {
- connections.borrow_mut().remove(&addr);
- println!("Connection {:?} closed.", addr);
- Ok(())
- })
+ // In order to fuse the reading and writing futures in the end, we need to have the
+ // same output type. Therefore we use `(Option<BufReader<ReadHalf<TcpStream>>>,
+ // Option<WriteHalf<TcpStream>>)`.
+ let socket_reader = socket_reader.map(|reader| (Some(reader), None));
+ let socket_writer = socket_writer.map(|writer| (None, Some(writer)));
+
+ let amt = socket_reader.select(socket_writer);
+ amt.then(move |_| {
+ connections.borrow_mut().remove(&addr);
+ println!("Connection {:?} closed.", addr);
+ Ok(())
+ })
});
Ok(())
});
@@ -94,3 +108,4 @@ fn main() {
// exectue server
core.run(future).unwrap();
}
+