diff options
Diffstat (limited to 'examples/chat-combinator.rs')
-rw-r--r-- | examples/chat-combinator.rs | 156 |
1 files changed, 0 insertions, 156 deletions
diff --git a/examples/chat-combinator.rs b/examples/chat-combinator.rs deleted file mode 100644 index b81e8f7c..00000000 --- a/examples/chat-combinator.rs +++ /dev/null @@ -1,156 +0,0 @@ -//! A chat server that broadcasts a message to all connections. -//! -//! This is a line-based server which accepts connections, reads lines from -//! those connections, and broadcasts the lines to all other connected clients. -//! -//! This example is similar to chat.rs, but uses combinators and a much more -//! functional style. -//! -//! You can test this out by running: -//! -//! cargo run --example chat -//! -//! And then in another window run: -//! -//! cargo run --example connect 127.0.0.1:8080 -//! -//! You can run the second command in multiple windows and then chat between the -//! two, seeing the messages from the other client as they're received. For all -//! connected clients they'll all join the same room and see everyone else's -//! messages. - -#![deny(warnings)] - -extern crate futures; -extern crate tokio; - -use tokio::io; -use tokio::net::TcpListener; -use tokio::prelude::*; - -use std::collections::HashMap; -use std::env; -use std::io::BufReader; -use std::iter; -use std::sync::{Arc, Mutex}; - -fn main() -> Result<(), Box<std::error::Error>> { - // Create the TCP listener we'll accept connections on. - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr.parse()?; - - let socket = TcpListener::bind(&addr)?; - println!("Listening on: {}", addr); - - // This is running on the Tokio runtime, so it will be multi-threaded. The - // `Arc<Mutex<...>>` allows state to be shared across the threads. - let connections = Arc::new(Mutex::new(HashMap::new())); - - // The server task asynchronously iterates over and processes each incoming - // connection. - let srv = socket - .incoming() - .map_err(|e| { - println!("failed to accept socket; error = {:?}", e); - e - }) - .for_each(move |stream| { - // The client's socket address - let addr = stream.peer_addr()?; - - println!("New Connection: {}", addr); - - // Split the TcpStream into two separate handles. One handle for reading - // and one handle for writing. This lets us use separate tasks for - // reading and writing. - let (reader, writer) = stream.split(); - - // Create a channel for our stream, which other sockets will use to - // send us messages. Then register our address with the stream to send - // data to us. - let (tx, rx) = futures::sync::mpsc::unbounded(); - connections.lock().unwrap().insert(addr, tx); - - // Define here what we do for the actual I/O. That is, read a bunch of - // lines from the socket and dispatch them while we also write any lines - // from other sockets. - let connections_inner = connections.clone(); - let reader = BufReader::new(reader); - - // Model the read portion of this socket by mapping an infinite - // iterator to each line off the socket. This "loop" is then - // terminated with an error once we hit EOF on the socket. - let iter = stream::iter_ok::<_, io::Error>(iter::repeat(())); - - let socket_reader = iter.fold(reader, move |reader, _| { - // Read a line off the socket, failing if we're at EOF - let line = io::read_until(reader, b'\n', Vec::new()); - let line = line.and_then(|(reader, vec)| { - if vec.len() == 0 { - Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")) - } else { - Ok((reader, vec)) - } - }); - - // Convert the bytes we read into a string, and then send that - // string to all other connected clients. - let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec))); - - // Move the connection state into the closure below. - let connections = connections_inner.clone(); - - line.map(move |(reader, message)| { - println!("{}: {:?}", addr, message); - let mut conns = connections.lock().unwrap(); - - if let Ok(msg) = message { - // For each open connection except the sender, send the - // string via the channel. - let iter = conns - .iter_mut() - .filter(|&(&k, _)| k != addr) - .map(|(_, v)| v); - for tx in iter { - tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap(); - } - } else { - let tx = conns.get_mut(&addr).unwrap(); - tx.unbounded_send("You didn't send valid UTF-8.".to_string()) - .unwrap(); - } - - reader - }) - }); - - // Whenever we receive a string on the Receiver, we write it to - // `WriteHalf<TcpStream>`. - let socket_writer = rx.fold(writer, |writer, msg| { - let amt = io::write_all(writer, msg.into_bytes()); - let amt = amt.map(|(writer, _)| writer); - amt.map_err(|_| ()) - }); - - // Now that we've got futures representing each half of the socket, we - // use the `select` combinator to wait for either half to be done to - // tear down the other. Then we spawn off the result. - let connections = connections.clone(); - let socket_reader = socket_reader.map_err(|_| ()); - let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ())); - - // Spawn a task to process the connection - tokio::spawn(connection.then(move |_| { - connections.lock().unwrap().remove(&addr); - println!("Connection {} closed.", addr); - Ok(()) - })); - - Ok(()) - }) - .map_err(|err| println!("error occurred: {:?}", err)); - - // execute server - tokio::run(srv); - Ok(()) -} |