summaryrefslogtreecommitdiffstats
path: root/examples/chat.rs
blob: ca8a5958c8beb73582a5edf7e44b6ab81700e465 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//! A chat server that broadcasts a message to all connections.
//!
//! This is a simple line-based server which accepts connections, reads lines
//! from those connections, and broadcasts the lines to all other connected
//! clients. In a sense this is a bit of a "poor man's chat server".

extern crate tokio_core;
extern crate futures;

use std::collections::HashMap;
use std::rc::Rc;
use std::cell::RefCell;
use std::iter;
use std::env;
use std::io::{Error, ErrorKind, BufReader};

use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use tokio_core::io::{self, Io};

use futures::stream::{self, Stream};
use futures::Future;

fn main() {
    let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
    let addr = addr.parse().unwrap();

    // Create the event loop and TCP listener we'll accept connections on.
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let socket = TcpListener::bind(&addr, &handle).unwrap();
    println!("Listening on: {}", addr);

    // This is a single-threaded server, so we can just use Rc and RefCell to
    // store the map of all connections we know about.
    let connections = Rc::new(RefCell::new(HashMap::new()));

    let srv = socket.incoming().for_each(move |(stream, addr)| {
        println!("New Connection: {}", addr);

        // Create a channel for our stream, which other sockets will use to
        // send us messages. Then register our address with the stream to send
        // data to us.
        let (tx, rx) = tokio_core::channel::channel(&handle).unwrap();
        connections.borrow_mut().insert(addr, tx);

        // Note that below we're calling `spawn` to spawn a new future for this
        // connection. As a result we use `futures::lazy` here to ensure that
        // the call to `.split()` happens on the right task.
        //
        // This `split` will give us a read/write half to work with each portion
        // of the socket separately.
        let pair = futures::lazy(|| Ok(stream.split()));

        // Define here what we do for the actual I/O. That is, read a bunch of
        // lines from the socket and dispatch them while we also write any lines
        // from other sockets.
        let connections_inner = connections.clone();
        let pair = pair.map(move |(reader, writer)| {
            let reader = BufReader::new(reader);

            // Model the read portion of this socket by mapping an infinite
            // iterator to each line off the socket. This "loop" is then
            // terminated with an error once we hit EOF on the socket.
            let iter = stream::iter(iter::repeat(()).map(Ok));
            let socket_reader = iter.fold(reader, move |reader, _| {
                // Read a line off the socket, failing if we're at EOF
                let line = io::read_until(reader, b'\n', Vec::new());
                let line = line.and_then(|(reader, vec)| {
                    if vec.len() == 0 {
                        Err(Error::new(ErrorKind::BrokenPipe, "broken pipe"))
                    } else {
                        Ok((reader, vec))
                    }
                });

                // Convert the bytes we read into a string, and then send that
                // string to all other connected clients.
                let line = line.map(|(reader, vec)| {
                    (reader, String::from_utf8(vec))
                });
                let connections = connections_inner.clone();
                line.map(move |(reader, message)| {
                    println!("{}: {:?}", addr, message);
                    let conns = connections.borrow_mut();
                    if let Ok(msg) = message {
                        // For each open connection except the sender, send the
                        // string via the channel.
                        let iter = conns.iter()
                                        .filter(|&(&k, _)| k != addr)
                                        .map(|(_, v)| v);
                        for tx in iter {
                            tx.send(format!("{}: {}", addr, msg)).unwrap();
                        }
                    } else {
                        let tx = conns.get(&addr).unwrap();
                        tx.send("You didn't send valid UTF-8.".to_string()).unwrap();
                    }
                    reader
                })
            });

            // Whenever we receive a string on the Receiver, we write it to
            // `WriteHalf<TcpStream>`.
            let socket_writer = rx.fold(writer, |writer, msg| {
                let amt = io::write_all(writer, msg.into_bytes());
                let amt = amt.map(|(writer, _)| writer);
                amt
            });

            (socket_reader, socket_writer)
        });

        // Now that we've got futures representing each half of the socket, we
        // use the `select` combinator to wait for either half to be done to
        // tear down the other. Then we spawn off the result.
        let connections = connections.clone();
        let addr = addr;
        handle.spawn(pair.and_then(|(reader, writer)| {
            let reader = reader.map(|_| ());
            let writer = writer.map(|_| ());

            reader.select(writer)
        }).then(move |_| {
            connections.borrow_mut().remove(&addr);
            println!("Connection {} closed.", addr);
            Ok(())
        }));

        Ok(())
    });

    // execute server
    core.run(srv).unwrap();
}