summaryrefslogtreecommitdiffstats
path: root/examples/tinydb.rs
blob: 134d01b15a50d093b34681112fadef64e168c8fe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
//! A "tiny database" and accompanying protocol
//!
//! This example shows the usage of shared state amongst all connected clients,
//! namely a database of key/value pairs. Each connected client can send a
//! series of GET/SET commands to query the current value of a key or set the
//! value of a key.
//!
//! This example has a simple protocol you can use to interact with the server.
//! To run, first run this in one terminal window:
//!
//!     cargo run --example tinydb
//!
//! and next in another window run:
//!
//!     cargo run --example connect 127.0.0.1:8080
//!
//! In the `connect` window you can type in commands; each time you hit enter
//! you'll get a response from the server for that command. An example session
//! is:
//!
//!
//!     $ cargo run --example connect 127.0.0.1:8080
//!     GET foo
//!     foo = bar
//!     GET FOOBAR
//!     error: no key FOOBAR
//!     SET FOOBAR my awesome string
//!     set FOOBAR = `my awesome string`, previous: None
//!     SET foo tokio
//!     set foo = `tokio`, previous: Some("bar")
//!     GET foo
//!     foo = tokio
//!
//! Namely you can issue two forms of commands:
//!
//! * `GET $key` - this will fetch the value of `$key` from the database and
//!   return it. The server's database is initially populated with the key `foo`
//!   set to the value `bar`
//! * `SET $key $value` - this will set the value of `$key` to `$value`,
//!   returning the previous value, if any.

#![deny(warnings)]

extern crate tokio;

use std::collections::HashMap;
use std::io::BufReader;
use std::env;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};

use tokio::io::{lines, write_all};
use tokio::net::TcpListener;
use tokio::prelude::*;

/// Key/value store that every connected client reads from and writes to.
///
/// `main` places a single instance behind an `Arc` and hands a clone of that
/// handle to each client. Because the instance is only ever reached through
/// shared references, the map sits inside a `Mutex` so it can still be mutated.
struct Database {
    /// Every key/value pair currently stored; lock the mutex to read or write.
    map: Mutex<HashMap<String, String>>,
}

/// A command decoded from one line of client input.
enum Request {
    /// `GET $key`: look up the value stored under `key`.
    Get {
        key: String,
    },
    /// `SET $key $value`: store `value` under `key`.
    Set {
        key: String,
        value: String,
    },
}

/// The reply produced for each `Request` and sent back to the client.
enum Response {
    /// Result of a `GET` whose key exists: the key and a copy of its value.
    Value {
        key: String,
        value: String,
    },
    /// Result of a `SET`: the key, the value just stored, and whatever value
    /// the key held before (`None` if it was not present).
    Set {
        key: String,
        value: String,
        previous: Option<String>,
    },
    /// The request could not be parsed, or a `GET` named a key that does not
    /// exist; `msg` describes the problem.
    Error {
        msg: String,
    },
}

fn main() {
    // Parse the address we're going to run this server on
    // and set up our TCP listener to accept connections.
    let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
    let addr = addr.parse::<SocketAddr>().unwrap();
    let listener = TcpListener::bind(&addr).expect("failed to bind");
    println!("Listening on: {}", addr);

    // Create the shared state of this server that will be shared amongst all
    // clients. We populate the initial database and then create the `Database`
    // structure. Note the usage of `Arc` here which will be used to ensure that
    // each independently spawned client will have a reference to the in-memory
    // database.
    let mut initial_db = HashMap::new();
    initial_db.insert("foo".to_string(), "bar".to_string());
    let db = Arc::new(Database {
        map: Mutex::new(initial_db),
    });

    let done = listener.incoming()
        .map_err(|e| println!("error accepting socket; error = {:?}", e))
        .for_each(move |socket| {
            // As with many other small examples, the first thing we'll do is
            // *split* this TCP stream into two separately owned halves. This'll
            // allow us to work with the read and write halves independently.
            let (reader, writer) = socket.split();

            // Since our protocol is line-based we use `tokio_io`'s `lines` utility
            // to convert our stream of bytes, `reader`, into a `Stream` of lines.
            let lines = lines(BufReader::new(reader));

            // Here's where the meat of the processing in this server happens. First
            // we see a clone of the database being created, which is creating a
            // new reference for this connected client to use. Also note the `move`
            // keyword on the closure here which moves ownership of the reference
            // into the closure, which we'll need for spawning the client below.
            //
            // The `map` function here means that we'll run some code for all
            // requests (lines) we receive from the client. The actual handling here
            // is pretty simple, first we parse the request and if it's valid we
            // generate a response based on the values in the database.
            let db = db.clone();
            let responses = lines.map(move |line| {
                let request = match Request::parse(&line) {
                    Ok(req) => req,
                    Err(e) => return Response::Error { msg: e },
                };

                let mut db = db.map.lock().unwrap();
                match request {
                    Request::Get { key } => {
                        match db.get(&key) {
                            Some(value) => Response::Value { key, value: value.clone() },
                            None => Response::Error { msg: format!("no key {}", key) },
                        }
                    }
                    Request::Set { key, value } => {
                        let previous = db.insert(key.clone(), value.clone());
                        Response::Set { key, value, previous }
                    }
                }
            });

            // At this point `responses` is a stream of `Response` types which we
            // now want to write back out to the client. To do that we use
            // `Stream::fold` to perform a loop here, serializing each response and
            // then writing it out to the client.
            let writes = responses.fold(writer, |writer, response| {
                let mut response = response.serialize();
                response.push('\n');
                write_all(writer, response.into_bytes()).map(|(w, _)| w)
            });

            // Like with other small servers, we'll `spawn` this client to ensure it
            // runs concurrently with all other clients, for now ignoring any errors
            // that we see.
            let msg = writes.then(move |_| Ok(()));

            tokio::spawn(msg)
        });

    tokio::run(done);
}

impl Request {
    /// Decodes one line of client input into a `Request`.
    ///
    /// The line is split on single spaces into at most three fields: the
    /// command word, the key, and the rest of the line. Two commands are
    /// recognised:
    ///
    /// * `GET <key>` with nothing after the key;
    /// * `SET <key> <value>`, where the value is everything after the key and
    ///   may itself contain spaces.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when the command word is not `GET` or
    /// `SET`, when a required key or value is missing, or when `GET` is given
    /// anything after its key.
    fn parse(input: &str) -> Result<Request, String> {
        let mut fields = input.splitn(3, ' ');
        let command = fields.next().ok_or_else(|| "empty input".to_string())?;
        let key = fields.next();
        let rest = fields.next();

        match (command, key, rest) {
            ("GET", Some(key), None) => Ok(Request::Get { key: key.to_string() }),
            ("GET", Some(_), Some(_)) => {
                Err("GET's key must not be followed by anything".to_string())
            }
            ("GET", None, _) => Err("GET must be followed by a key".to_string()),

            ("SET", Some(key), Some(value)) => Ok(Request::Set {
                key: key.to_string(),
                value: value.to_string(),
            }),
            ("SET", Some(_), None) => Err("SET needs a value".to_string()),
            ("SET", None, _) => Err("SET must be followed by a key".to_string()),

            (other, _, _) => Err(format!("unknown command: {}", other)),
        }
    }
}

impl Response {
    fn serialize(&self) -> String {
        match *self {
            Response::Value { ref key, ref value } => {
                format!("{} = {}", key, value)
            }
            Response::Set { ref key, ref value, ref previous } => {
                format!("set {} = `{}`, previous: {:?}", key, value, previous)
            }
            Response::Error { ref msg } => {
                format!("error: {}", msg)
            }
        }
    }
}