summaryrefslogtreecommitdiffstats
path: root/examples/chat.rs
diff options
context:
space:
mode:
authoroberien <jaro.fietz@gmx.de>2016-09-23 02:14:03 +0200
committeroberien <jaro.fietz@gmx.de>2016-10-04 20:02:52 +0200
commit0205b855d0794d882959e9147890e53c4ca6d2fe (patch)
treea5bf515baeea3afae84c2d853d3fd89364473bca /examples/chat.rs
parente32115b1b42b7feaccf46da248946af0ec79af35 (diff)
Add Chat example
Diffstat (limited to 'examples/chat.rs')
-rw-r--r--examples/chat.rs96
1 files changed, 96 insertions, 0 deletions
diff --git a/examples/chat.rs b/examples/chat.rs
new file mode 100644
index 00000000..4d43c4f9
--- /dev/null
+++ b/examples/chat.rs
@@ -0,0 +1,96 @@
+//! A chat server that broadcasts a message to all connections.
+
+extern crate tokio_core;
+extern crate futures;
+
+use std::collections::HashMap;
+use std::rc::Rc;
+use std::cell::RefCell;
+use std::iter;
+use std::env;
+use std::io::BufReader;
+
+use tokio_core::net::TcpListener;
+use tokio_core::reactor::Core;
+use tokio_core::io::{self, Io};
+
+use futures::stream::{self, Stream};
+use futures::Future;
+
+fn main() {
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+ let addr = addr.parse().unwrap();
+ // We are single-threaded, so we can just use Rc and RefCell.
+ let connections = Rc::new(RefCell::new(HashMap::new()));
+
+ let mut core = Core::new().unwrap();
+ let handle = core.handle();
+ let socket = TcpListener::bind(&addr, &handle).unwrap();
+ println!("Listening on: {}", addr);
+
+ let connections = connections.clone();
+ let future = socket.incoming().for_each(move |(stream, addr)| {
+ let connections = connections.clone();
+ let handle_inner = handle.clone();
+ // We create a new future in which we create all other futures.
+ // This makes `stream` be bound on the `lazy` future's task, allowing
+ // `ReadHalf` and `WriteHalf` to be shared between inner futures.
+ handle.spawn_fn(move || {
+ println!("New Connection: {}", addr);
+ let (reader, writer) = stream.split();
+ // channel to send messages to this connection from other futures
+ let (tx, rx) = tokio_core::channel::channel(&handle_inner).unwrap();
+ // add sender to hashmap of all current connections
+ connections.borrow_mut().insert(addr, tx);
+
+ let connections_inner = connections.clone();
+ // https://users.rust-lang.org/t/loop-futures-for-client-handling/6950/2
+ // We have an endless loop reading from a client.
+ // In order to fuse the reading and writing futures in the end, we need to have the same
+ // output type. Therefore we use `(Option<BufReader<ReadHalf<TcpStream>>>,
+ // Option<WriteHalf<TcpStream>>)`.
+ let reader = BufReader::new(reader);
+ let socket_reader = stream::iter::<_, _, std::io::Error>(iter::repeat(()).map(Ok)).fold((Some(reader),None), move |(reader, _), _| {
+ let reader = reader.unwrap();
+ let connections = connections_inner.clone();
+ // read and parse length prefix
+ io::read_until(reader, '\n' as u8, vec![])
+ .and_then(|(reader, vec)| futures::lazy(|| {
+ // EOF was hit without reading a delimiter
+ if vec.len() == 0 {
+ futures::failed((std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Broken Pipe"))).boxed()
+ } else {
+ futures::finished((reader, vec)).boxed()
+ }
+ }))
+ // convert bytes into string
+ .map(|(reader, vec)| (reader, String::from_utf8(vec).unwrap()))
+ .and_then(move |(reader, message)| {
+ println!("{}: {:?}", addr, message);
+ // For each open connection except the sender, send the string via the channel
+ for tx in connections.borrow_mut().iter().filter(|&(&k,_)| k != addr).map(|(_,v)| v) {
+ tx.send(message.clone()).unwrap();
+ }
+ futures::finished((Some(reader),None))
+ })
+ });
+
+ // Whenever we receive a string on the Receiver, we write it to `WriteHalf<TcpStream>`.
+ let socket_writer = rx.fold((None, Some(writer)), move |(_, writer), msg| {
+ let writer = writer.unwrap();
+ io::write_all(writer, msg.into_bytes()).map(|(writer, _)| (None, Some(writer))).boxed()
+ });
+
+ socket_reader.select(socket_writer)
+ .then(move |_| {
+ connections.borrow_mut().remove(&addr);
+ println!("Connection {:?} closed.", addr);
+ Ok(())
+ })
+ });
+ Ok(())
+ });
+
+ // exectue server
+ core.run(future).unwrap();
+}