diff options
author | oberien <jaro.fietz@gmx.de> | 2016-10-06 15:38:20 +0200 |
---|---|---|
committer | oberien <jaro.fietz@gmx.de> | 2016-10-06 15:38:20 +0200 |
commit | 6961efa8dde54a429cfebc157594b25e5d25283f (patch) | |
tree | d6d42d843fadb8a17605aefdd354b28e6132fa8a /examples/chat.rs | |
parent | 0205b855d0794d882959e9147890e53c4ca6d2fe (diff) |
fix(chat): Implement alexcrichton's suggestions
* Remove unnecessary clone
* Improve rightward drift
* Remove unnecessary lazy future
* Improve utf-8 handling
* Refactor to make code more understandable
Diffstat (limited to 'examples/chat.rs')
-rw-r--r-- | examples/chat.rs | 91 |
1 files changed, 53 insertions, 38 deletions
diff --git a/examples/chat.rs b/examples/chat.rs index 4d43c4f9..beb1d78e 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -8,7 +8,7 @@ use std::rc::Rc; use std::cell::RefCell; use std::iter; use std::env; -use std::io::BufReader; +use std::io::{Error, ErrorKind, BufReader}; use tokio_core::net::TcpListener; use tokio_core::reactor::Core; @@ -28,12 +28,11 @@ fn main() { let socket = TcpListener::bind(&addr, &handle).unwrap(); println!("Listening on: {}", addr); - let connections = connections.clone(); let future = socket.incoming().for_each(move |(stream, addr)| { let connections = connections.clone(); let handle_inner = handle.clone(); // We create a new future in which we create all other futures. - // This makes `stream` be bound on the `lazy` future's task, allowing + // This makes `stream` be bound on the outer future's task, allowing // `ReadHalf` and `WriteHalf` to be shared between inner futures. handle.spawn_fn(move || { println!("New Connection: {}", addr); @@ -43,50 +42,65 @@ fn main() { // add sender to hashmap of all current connections connections.borrow_mut().insert(addr, tx); + let reader = BufReader::new(reader); let connections_inner = connections.clone(); // https://users.rust-lang.org/t/loop-futures-for-client-handling/6950/2 - // We have an endless loop reading from a client. - // In order to fuse the reading and writing futures in the end, we need to have the same - // output type. Therefore we use `(Option<BufReader<ReadHalf<TcpStream>>>, - // Option<WriteHalf<TcpStream>>)`. - let reader = BufReader::new(reader); - let socket_reader = stream::iter::<_, _, std::io::Error>(iter::repeat(()).map(Ok)).fold((Some(reader),None), move |(reader, _), _| { - let reader = reader.unwrap(); + // First we need to get an infinite iterator + let iter = stream::iter::<_, _, std::io::Error>(iter::repeat(()).map(Ok)); + // Then we fold it as infinite loop + let socket_reader = iter.fold(reader, move |reader, _| { let connections = connections_inner.clone(); - // read and parse length prefix - io::read_until(reader, '\n' as u8, vec![]) - .and_then(|(reader, vec)| futures::lazy(|| { - // EOF was hit without reading a delimiter - if vec.len() == 0 { - futures::failed((std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Broken Pipe"))).boxed() - } else { - futures::finished((reader, vec)).boxed() + // read line + let amt = io::read_until(reader, '\n' as u8, vec![]); + // check if we hit EOF and need to close the connection + let amt = amt.and_then(|(reader, vec)| { + // EOF was hit without reading a delimiter + if vec.len() == 0 { + let err = Error::new(ErrorKind::BrokenPipe, "Broken Pipe"); + futures::failed(err).boxed() + } else { + futures::finished((reader, vec)).boxed() + } + }); + // convert bytes into string + let amt = amt.map(|(reader, vec)| (reader, String::from_utf8(vec))); + amt.and_then(move |(reader, message)| { + println!("{}: {:?}", addr, message); + let conns = connections.borrow_mut(); + if let Ok(msg) = message { + // For each open connection except the sender, send the string + // via the channel + let iter = conns.iter().filter(|&(&k,_)| k != addr).map(|(_,v)| v); + for tx in iter { + tx.send(msg.clone()).unwrap(); } - })) - // convert bytes into string - .map(|(reader, vec)| (reader, String::from_utf8(vec).unwrap())) - .and_then(move |(reader, message)| { - println!("{}: {:?}", addr, message); - // For each open connection except the sender, send the string via the channel - for tx in connections.borrow_mut().iter().filter(|&(&k,_)| k != addr).map(|(_,v)| v) { - tx.send(message.clone()).unwrap(); - } - futures::finished((Some(reader),None)) - }) + } else { + let tx = conns.get(&addr).unwrap(); + tx.send("You didn't send valid UTF-8.".to_string()).unwrap(); + } + futures::finished(reader) + }) }); // Whenever we receive a string on the Receiver, we write it to `WriteHalf<TcpStream>`. - let socket_writer = rx.fold((None, Some(writer)), move |(_, writer), msg| { - let writer = writer.unwrap(); - io::write_all(writer, msg.into_bytes()).map(|(writer, _)| (None, Some(writer))).boxed() + let socket_writer = rx.fold(writer, |writer, msg| { + let amt = io::write_all(writer, msg.into_bytes()); + let amt = amt.map(|(writer, _)| writer); + amt }); - socket_reader.select(socket_writer) - .then(move |_| { - connections.borrow_mut().remove(&addr); - println!("Connection {:?} closed.", addr); - Ok(()) - }) + // In order to fuse the reading and writing futures in the end, we need to have the + // same output type. Therefore we use `(Option<BufReader<ReadHalf<TcpStream>>>, + // Option<WriteHalf<TcpStream>>)`. + let socket_reader = socket_reader.map(|reader| (Some(reader), None)); + let socket_writer = socket_writer.map(|writer| (None, Some(writer))); + + let amt = socket_reader.select(socket_writer); + amt.then(move |_| { + connections.borrow_mut().remove(&addr); + println!("Connection {:?} closed.", addr); + Ok(()) + }) }); Ok(()) }); @@ -94,3 +108,4 @@ fn main() { // exectue server core.run(future).unwrap(); } + |