summaryrefslogtreecommitdiffstats
path: root/examples/tinydb.rs
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2017-09-23 12:22:34 -0700
committerAlex Crichton <alex@alexcrichton.com>2017-09-23 12:22:34 -0700
commit0868b97832277656c8f361b8aed43d4f373b804f (patch)
tree95c005131006f604846a39e248142b3443bed2cf /examples/tinydb.rs
parentfbd0a9e5f1a5b425bf04bbda2f623d67e848fb0b (diff)
Add a `tinydb` example sharing state
This example is intended to showcase sharing state between all connected clients on a server, for example a key/value store (in-memory database) Closes #257
Diffstat (limited to 'examples/tinydb.rs')
-rw-r--r--examples/tinydb.rs209
1 files changed, 209 insertions, 0 deletions
diff --git a/examples/tinydb.rs b/examples/tinydb.rs
new file mode 100644
index 00000000..fe7865c3
--- /dev/null
+++ b/examples/tinydb.rs
@@ -0,0 +1,209 @@
+//! A "tiny database" and accompanying protocol
+//!
+//! This example shows the usage of shared state amongst all connected clients,
+//! namely a database of key/value pairs. Each connected client can send a
+//! series of GET/SET commands to query the current value of a key or set the
+//! value of a key.
+//!
+//! This example has a simple protocol you can use to interact with the server.
+//! To run, first run this in one terminal window:
+//!
+//! cargo run --example tinydb
+//!
+//! and next in another windows run:
+//!
+//! cargo run --example connect 127.0.0.1:8080
+//!
+//! In the `connect` window you can type in commands where when you hit enter
+//! you'll get a response from the server for that command. An example session
+//! is:
+//!
+//!
+//! $ cargo run --example connect 127.0.0.1:8080
+//! GET foo
+//! foo = bar
+//! GET FOOBAR
+//! error: no key FOOBAR
+//! SET FOOBAR my awesome string
+//! set FOOBAR = `my awesome string`, previous: None
+//! SET foo tokio
+//! set foo = `tokio`, previous: Some("bar")
+//! GET foo
+//! foo = tokio
+//!
+//! Namely you can issue two forms of commands:
+//!
+//! * `GET $key` - this will fetch the value of `$key` from the database and
+//! return it. The server's database is initially populated with the key `foo`
+//! set to the value `bar`
+//! * `SET $key $value` - this will set the value of `$key` to `$value`,
+//! returning the previous value, if any.
+
+extern crate futures;
+extern crate tokio_core;
+extern crate tokio_io;
+
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::io::BufReader;
+use std::rc::Rc;
+use std::env;
+use std::net::SocketAddr;
+
+use futures::prelude::*;
+use tokio_core::net::TcpListener;
+use tokio_core::reactor::Core;
+use tokio_io::AsyncRead;
+use tokio_io::io::{lines, write_all};
+
+/// The in-memory database shared amongst all clients.
+///
+/// This database will be shared via `Rc`, so to mutate the internal map we're
+/// also going to use a `RefCell` for interior mutability.
+struct Database {
+ map: RefCell<HashMap<String, String>>,
+}
+
+/// Possible requests our clients can send us
+enum Request {
+ Get { key: String },
+ Set { key: String, value: String },
+}
+
+/// Responses to the `Request` commands above
+enum Response {
+ Value { key: String, value: String },
+ Set { key: String, value: String, previous: Option<String> },
+ Error { msg: String },
+}
+
+fn main() {
+ // Parse the address we're going to run this server on, create a `Core`, and
+ // set up our TCP listener to accept connections.
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+ let addr = addr.parse::<SocketAddr>().unwrap();
+ let mut core = Core::new().unwrap();
+ let handle = core.handle();
+ let listener = TcpListener::bind(&addr, &handle).expect("failed to bind");
+ println!("Listening on: {}", addr);
+
+ // Create the shared state of this server that will be shared amongst all
+ // clients. We populate the initial database and then create the `Database`
+ // structure. Note the usage of `Rc` here which will be used to ensure that
+ // each independently spawned client will have a reference to the in-memory
+ // database.
+ let mut initial_db = HashMap::new();
+ initial_db.insert("foo".to_string(), "bar".to_string());
+ let db = Rc::new(Database {
+ map: RefCell::new(initial_db),
+ });
+
+ let done = listener.incoming().for_each(move |(socket, _addr)| {
+ // As with many other small examples, the first thing we'll do is
+ // *split* this TCP stream into two separately owned halves. This'll
+ // allow us to work with the read and write halves independently.
+ let (reader, writer) = socket.split();
+
+ // Since our protocol is line-based we use `tokio_io`'s `lines` utility
+ // to convert our stream of bytes, `reader`, into a `Stream` of lines.
+ let lines = lines(BufReader::new(reader));
+
+ // Here's where the meat of the processing in this server happens. First
+ // we see a clone of the database being created, which is creating a
+ // new reference for this connected client to use. Also note the `move`
+ // keyword on the closure here which moves ownership of the reference
+ // into the closure, which we'll need for spawning the client below.
+ //
+ // The `map` function here means that we'll run some code for all
+ // requests (lines) we receive from the client. The actual handling here
+ // is pretty simple, first we parse the request and if it's valid we
+ // generate a response based on the values in the database.
+ let db = db.clone();
+ let responses = lines.map(move |line| {
+ let request = match Request::parse(&line) {
+ Ok(req) => req,
+ Err(e) => return Response::Error { msg: e },
+ };
+
+ let mut db = db.map.borrow_mut();
+ match request {
+ Request::Get { key } => {
+ match db.get(&key) {
+ Some(value) => Response::Value { key, value: value.clone() },
+ None => Response::Error { msg: format!("no key {}", key) },
+ }
+ }
+ Request::Set { key, value } => {
+ let previous = db.insert(key.clone(), value.clone());
+ Response::Set { key, value, previous }
+ }
+ }
+ });
+
+ // At this point `responses` is a stream of `Response` types which we
+ // now want to write back out to the client. To do that we use
+ // `Stream::fold` to perform a loop here, serializing each response and
+ // then writing it out to the client.
+ let writes = responses.fold(writer, |writer, response| {
+ let mut response = response.serialize();
+ response.push('\n');
+ write_all(writer, response.into_bytes()).map(|(w, _)| w)
+ });
+
+ // Like with other small servers, we'll `spawn` this client to ensure it
+ // runs concurrently with all other clients, for now ignoring any errors
+ // that we see.
+ let msg = writes.then(move |_| Ok(()));
+ handle.spawn(msg);
+ Ok(())
+ });
+
+ core.run(done).unwrap();
+}
+
+impl Request {
+ fn parse(input: &str) -> Result<Request, String> {
+ let mut parts = input.splitn(3, " ");
+ match parts.next() {
+ Some("GET") => {
+ let key = match parts.next() {
+ Some(key) => key,
+ None => return Err(format!("GET must be followed by a key")),
+ };
+ if parts.next().is_some() {
+ return Err(format!("GET's key must not be followed by anything"))
+ }
+ Ok(Request::Get { key: key.to_string() })
+ }
+ Some("SET") => {
+ let key = match parts.next() {
+ Some(key) => key,
+ None => return Err(format!("SET must be followed by a key")),
+ };
+ let value = match parts.next() {
+ Some(value) => value,
+ None => return Err(format!("SET needs a value")),
+ };
+ Ok(Request::Set { key: key.to_string(), value: value.to_string() })
+ }
+ Some(cmd) => Err(format!("unknown command: {}", cmd)),
+ None => Err(format!("empty input")),
+ }
+ }
+}
+
+impl Response {
+ fn serialize(&self) -> String {
+ match *self {
+ Response::Value { ref key, ref value } => {
+ format!("{} = {}", key, value)
+ }
+ Response::Set { ref key, ref value, ref previous } => {
+ format!("set {} = `{}`, previous: {:?}", key, value, previous)
+ }
+ Response::Error { ref msg } => {
+ format!("error: {}", msg)
+ }
+ }
+ }
+}