summaryrefslogtreecommitdiffstats
path: root/examples/chat-combinator.rs
diff options
context:
space:
mode:
Diffstat (limited to 'examples/chat-combinator.rs')
-rw-r--r--examples/chat-combinator.rs156
1 files changed, 0 insertions, 156 deletions
diff --git a/examples/chat-combinator.rs b/examples/chat-combinator.rs
deleted file mode 100644
index b81e8f7c..00000000
--- a/examples/chat-combinator.rs
+++ /dev/null
@@ -1,156 +0,0 @@
-//! A chat server that broadcasts a message to all connections.
-//!
-//! This is a line-based server which accepts connections, reads lines from
-//! those connections, and broadcasts the lines to all other connected clients.
-//!
-//! This example is similar to chat.rs, but uses combinators and a much more
-//! functional style.
-//!
-//! You can test this out by running:
-//!
-//! cargo run --example chat
-//!
-//! And then in another window run:
-//!
-//! cargo run --example connect 127.0.0.1:8080
-//!
-//! You can run the second command in multiple windows and then chat between the
-//! two, seeing the messages from the other client as they're received. For all
-//! connected clients they'll all join the same room and see everyone else's
-//! messages.
-
-#![deny(warnings)]
-
-extern crate futures;
-extern crate tokio;
-
-use tokio::io;
-use tokio::net::TcpListener;
-use tokio::prelude::*;
-
-use std::collections::HashMap;
-use std::env;
-use std::io::BufReader;
-use std::iter;
-use std::sync::{Arc, Mutex};
-
-fn main() -> Result<(), Box<std::error::Error>> {
- // Create the TCP listener we'll accept connections on.
- let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
- let addr = addr.parse()?;
-
- let socket = TcpListener::bind(&addr)?;
- println!("Listening on: {}", addr);
-
- // This is running on the Tokio runtime, so it will be multi-threaded. The
- // `Arc<Mutex<...>>` allows state to be shared across the threads.
- let connections = Arc::new(Mutex::new(HashMap::new()));
-
- // The server task asynchronously iterates over and processes each incoming
- // connection.
- let srv = socket
- .incoming()
- .map_err(|e| {
- println!("failed to accept socket; error = {:?}", e);
- e
- })
- .for_each(move |stream| {
- // The client's socket address
- let addr = stream.peer_addr()?;
-
- println!("New Connection: {}", addr);
-
- // Split the TcpStream into two separate handles. One handle for reading
- // and one handle for writing. This lets us use separate tasks for
- // reading and writing.
- let (reader, writer) = stream.split();
-
- // Create a channel for our stream, which other sockets will use to
- // send us messages. Then register our address with the stream to send
- // data to us.
- let (tx, rx) = futures::sync::mpsc::unbounded();
- connections.lock().unwrap().insert(addr, tx);
-
- // Define here what we do for the actual I/O. That is, read a bunch of
- // lines from the socket and dispatch them while we also write any lines
- // from other sockets.
- let connections_inner = connections.clone();
- let reader = BufReader::new(reader);
-
- // Model the read portion of this socket by mapping an infinite
- // iterator to each line off the socket. This "loop" is then
- // terminated with an error once we hit EOF on the socket.
- let iter = stream::iter_ok::<_, io::Error>(iter::repeat(()));
-
- let socket_reader = iter.fold(reader, move |reader, _| {
- // Read a line off the socket, failing if we're at EOF
- let line = io::read_until(reader, b'\n', Vec::new());
- let line = line.and_then(|(reader, vec)| {
- if vec.len() == 0 {
- Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe"))
- } else {
- Ok((reader, vec))
- }
- });
-
- // Convert the bytes we read into a string, and then send that
- // string to all other connected clients.
- let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec)));
-
- // Move the connection state into the closure below.
- let connections = connections_inner.clone();
-
- line.map(move |(reader, message)| {
- println!("{}: {:?}", addr, message);
- let mut conns = connections.lock().unwrap();
-
- if let Ok(msg) = message {
- // For each open connection except the sender, send the
- // string via the channel.
- let iter = conns
- .iter_mut()
- .filter(|&(&k, _)| k != addr)
- .map(|(_, v)| v);
- for tx in iter {
- tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap();
- }
- } else {
- let tx = conns.get_mut(&addr).unwrap();
- tx.unbounded_send("You didn't send valid UTF-8.".to_string())
- .unwrap();
- }
-
- reader
- })
- });
-
- // Whenever we receive a string on the Receiver, we write it to
- // `WriteHalf<TcpStream>`.
- let socket_writer = rx.fold(writer, |writer, msg| {
- let amt = io::write_all(writer, msg.into_bytes());
- let amt = amt.map(|(writer, _)| writer);
- amt.map_err(|_| ())
- });
-
- // Now that we've got futures representing each half of the socket, we
- // use the `select` combinator to wait for either half to be done to
- // tear down the other. Then we spawn off the result.
- let connections = connections.clone();
- let socket_reader = socket_reader.map_err(|_| ());
- let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
-
- // Spawn a task to process the connection
- tokio::spawn(connection.then(move |_| {
- connections.lock().unwrap().remove(&addr);
- println!("Connection {} closed.", addr);
- Ok(())
- }));
-
- Ok(())
- })
- .map_err(|err| println!("error occurred: {:?}", err));
-
- // execute server
- tokio::run(srv);
- Ok(())
-}