summaryrefslogtreecommitdiffstats
path: root/async-await/src/chat.rs
diff options
context:
space:
mode:
Diffstat (limited to 'async-await/src/chat.rs')
-rw-r--r--async-await/src/chat.rs131
1 files changed, 0 insertions, 131 deletions
diff --git a/async-await/src/chat.rs b/async-await/src/chat.rs
deleted file mode 100644
index d3a0c992..00000000
--- a/async-await/src/chat.rs
+++ /dev/null
@@ -1,131 +0,0 @@
-#![feature(await_macro, async_await)]
-
-use tokio::async_wait;
-use tokio::codec::{LinesCodec, Decoder};
-use tokio::net::{TcpListener, TcpStream};
-use tokio::prelude::*;
-
-use futures::sync::mpsc;
-
-use std::collections::HashMap;
-use std::io;
-use std::net::SocketAddr;
-use std::sync::{Arc, Mutex};
-
-/// Shorthand for the transmit half of the message channel.
-type Tx = mpsc::UnboundedSender<String>;
-
-struct Shared {
- peers: HashMap<SocketAddr, Tx>,
-}
-
-impl Shared {
- /// Create a new, empty, instance of `Shared`.
- fn new() -> Self {
- Shared {
- peers: HashMap::new(),
- }
- }
-}
-
-async fn process(stream: TcpStream, state: Arc<Mutex<Shared>>) -> io::Result<()> {
- let addr = stream.peer_addr().unwrap();
- let mut lines = LinesCodec::new().framed(stream);
-
- // Extract the peer's name
- let name = match async_wait!(lines.next()) {
- Some(name) => name?,
- None => {
- // Disconnected early
- return Ok(());
- }
- };
-
- println!("`{}` is joining the chat", name);
-
- let (tx, mut rx) = mpsc::unbounded();
-
- // Register the socket
- state.lock().unwrap()
- .peers.insert(addr, tx);
-
- // Split the `lines` handle into send and recv handles. This allows spawning
- // separate tasks.
- let (mut lines_tx, mut lines_rx) = lines.split();
-
- // Spawn a task that receives all lines broadcasted to us from other peers
- // and writes it to the client.
- tokio::spawn_async(async move {
- while let Some(line) = async_wait!(rx.next()) {
- let line = line.unwrap();
- async_wait!(lines_tx.send_async(line)).unwrap();
- }
- });
-
- // Use the current task to read lines from the socket and broadcast them to
- // other peers.
- while let Some(message) = async_wait!(lines_rx.next()) {
- // TODO: Error handling
- let message = message.unwrap();
-
- let mut line = name.clone();
- line.push_str(": ");
- line.push_str(&message);
- line.push_str("\r\n");
-
- let state = state.lock().unwrap();
-
- for (peer_addr, tx) in &state.peers {
- if *peer_addr != addr {
- // TODO: Error handling
- tx.unbounded_send(line.clone()).unwrap();
- }
- }
- }
-
- // Remove the client from the shared state. Doing so will also result in the
- // tx task to terminate.
- state.lock().unwrap()
- .peers.remove(&addr)
- .expect("bug");
-
- Ok(())
-}
-
-#[tokio::main]
-async fn main() {
- // Create the shared state. This is how all the peers communicate.
- //
- // The server task will hold a handle to this. For every new client, the
- // `state` handle is cloned and passed into the task that processes the
- // client connection.
- let state = Arc::new(Mutex::new(Shared::new()));
-
- let addr = "127.0.0.1:6142".parse().unwrap();
-
- // Bind a TCP listener to the socket address.
- //
- // Note that this is the Tokio TcpListener, which is fully async.
- let listener = TcpListener::bind(&addr).unwrap();
-
- println!("server running on localhost:6142");
-
- // Start the Tokio runtime.
- let mut incoming = listener.incoming();
-
- while let Some(stream) = async_wait!(incoming.next()) {
- let stream = match stream {
- Ok(stream) => stream,
- Err(_) => continue,
- };
-
- let state = state.clone();
-
- tokio::spawn_async(async move {
- if let Err(_) = async_wait!(process(stream, state)) {
- eprintln!("failed to process connection");
- }
- });
- }
-}
-