summaryrefslogtreecommitdiffstats
path: root/examples
diff options
context:
space:
mode:
authorjq-rs <juhamatk@gmail.com>2018-12-29 17:16:30 +0200
committerToby Lawrence <tobz@users.noreply.github.com>2018-12-29 10:16:30 -0500
commit9e4ddaeaf3ffcc6794c6669fd6a2019d3a113cbd (patch)
treea2b7b04edc22e9c75571708a0aa3ba44003226f8 /examples
parent03e2e864f37c565d209f3118da771cd2c9f1d680 (diff)
examples: single-threaded chat combinator example (#794)
Diffstat (limited to 'examples')
-rw-r--r--examples/chat-combinator-current-thread.rs167
1 files changed, 167 insertions, 0 deletions
diff --git a/examples/chat-combinator-current-thread.rs b/examples/chat-combinator-current-thread.rs
new file mode 100644
index 00000000..c528eeec
--- /dev/null
+++ b/examples/chat-combinator-current-thread.rs
@@ -0,0 +1,167 @@
+//! A chat server that broadcasts a message to all connections.
+//!
+//! This is a line-based server which accepts connections, reads lines from
+//! those connections, and broadcasts the lines to all other connected clients.
+//!
+//! This example is similar to chat.rs, but uses combinators and a much more
+//! functional style.
+//!
+//! Because we are here running the reactor/executor on the same thread instead
+//! of a threadpool, we can avoid full synchronization with Arc + Mutex and use
+//! Rc + RefCell instead. The max performance is however limited to a CPU HW
+//! thread.
+//!
+//! You can test this out by running:
+//!
+//! cargo run --example chat-combinator-current-thread
+//!
+//! And then in another window run:
+//!
+//! cargo run --example connect 127.0.0.1:8080
+//!
+//! You can run the second command in multiple windows and then chat between the
+//! two, seeing the messages from the other client as they're received. For all
+//! connected clients they'll all join the same room and see everyone else's
+//! messages.
+
+#![deny(warnings)]
+
+extern crate tokio;
+extern crate futures;
+
+use tokio::io;
+use tokio::net::TcpListener;
+use tokio::prelude::*;
+use tokio::runtime::current_thread::{Runtime, TaskExecutor};
+
+use std::collections::HashMap;
+use std::iter;
+use std::env;
+use std::io::{BufReader};
+use std::rc::Rc;
+use std::cell::RefCell;
+
+
+fn main() -> Result<(), Box<std::error::Error>> {
+ let mut runtime = Runtime::new().unwrap();
+
+ // Create the TCP listener we'll accept connections on.
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+ let addr = addr.parse()?;
+
+ let socket = TcpListener::bind(&addr)?;
+ println!("Listening on: {}", addr);
+
+ // This is running on the Tokio current_thread runtime, so it will be single-
+ // threaded. The `Rc<RefCell<...>>` allows state to be shared across the tasks.
+ let connections = Rc::new(RefCell::new(HashMap::new()));
+
+ // The server task asynchronously iterates over and processes each incoming
+ // connection.
+ let srv = socket.incoming()
+ .map_err(|e| {println!("failed to accept socket; error = {:?}", e); e})
+ .for_each(move |stream| {
+ // The client's socket address
+ let addr = stream.peer_addr()?;
+
+ println!("New Connection: {}", addr);
+
+ // Split the TcpStream into two separate handles. One handle for reading
+ // and one handle for writing. This lets us use separate tasks for
+ // reading and writing.
+ let (reader, writer) = stream.split();
+
+ // Create a channel for our stream, which other sockets will use to
+ // send us messages. Then register our address with the stream to send
+ // data to us.
+ let (tx, rx) = futures::sync::mpsc::unbounded();
+ let mut conns = connections.borrow_mut();
+ conns.insert(addr, tx);
+
+ // Define here what we do for the actual I/O. That is, read a bunch of
+ // lines from the socket and dispatch them while we also write any lines
+ // from other sockets.
+ let connections_inner = connections.clone();
+ let reader = BufReader::new(reader);
+
+ // Model the read portion of this socket by mapping an infinite
+ // iterator to each line off the socket. This "loop" is then
+ // terminated with an error once we hit EOF on the socket.
+ let iter = stream::iter_ok::<_, io::Error>(iter::repeat(()));
+
+ let socket_reader = iter.fold(reader, move |reader, _| {
+ // Read a line off the socket, failing if we're at EOF
+ let line = io::read_until(reader, b'\n', Vec::new());
+ let line = line.and_then(|(reader, vec)| {
+ if vec.len() == 0 {
+ Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe"))
+ } else {
+ Ok((reader, vec))
+ }
+ });
+
+ // Convert the bytes we read into a string, and then send that
+ // string to all other connected clients.
+ let line = line.map(|(reader, vec)| {
+ (reader, String::from_utf8(vec))
+ });
+
+ // Move the connection state into the closure below.
+ let connections = connections_inner.clone();
+
+ line.map(move |(reader, message)| {
+ println!("{}: {:?}", addr, message);
+ let mut conns = connections.borrow_mut();
+
+ if let Ok(msg) = message {
+ // For each open connection except the sender, send the
+ // string via the channel.
+ let iter = conns.iter_mut()
+ .filter(|&(&k, _)| k != addr)
+ .map(|(_, v)| v);
+ for tx in iter {
+ tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap();
+ }
+ } else {
+ let tx = conns.get_mut(&addr).unwrap();
+ tx.unbounded_send("You didn't send valid UTF-8.".to_string()).unwrap();
+ }
+
+ reader
+ })
+ });
+
+ // Whenever we receive a string on the Receiver, we write it to
+ // `WriteHalf<TcpStream>`.
+ let socket_writer = rx.fold(writer, |writer, msg| {
+ let amt = io::write_all(writer, msg.into_bytes());
+ let amt = amt.map(|(writer, _)| writer);
+ amt.map_err(|_| ())
+ });
+
+ // Now that we've got futures representing each half of the socket, we
+ // use the `select` combinator to wait for either half to be done to
+ // tear down the other. Then we spawn off the result.
+ let connections = connections.clone();
+ let socket_reader = socket_reader.map_err(|_| ());
+ let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
+
+ // Spawn locally a task to process the connection
+ TaskExecutor::current().spawn_local(Box::new(connection.then(move |_| {
+ let mut conns = connections.borrow_mut();
+ conns.remove(&addr);
+ println!("Connection {} closed.", addr);
+ Ok(())
+ }))).unwrap();
+
+ Ok(())
+ })
+ .map_err(|err| println!("error occurred: {:?}", err));
+
+ // Spawn srv itself
+ runtime.spawn(srv);
+
+ // Execute server
+ runtime.run().unwrap();
+ Ok(())
+}