summaryrefslogtreecommitdiffstats
path: root/examples/tinydb.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-22 10:13:49 -0700
committerGitHub <noreply@github.com>2019-10-22 10:13:49 -0700
commitcfc15617a5247ea780c32c85b7134b88b6de5845 (patch)
treeef0a46c61c51505a60f386c9760acac9d1f9b7b1 /examples/tinydb.rs
parentb8cee1a60ad99ef28ec494ae4230e2ef4399fcf9 (diff)
codec: move into tokio-util (#1675)
Related to #1318, Tokio APIs that are "less stable" are moved into a new `tokio-util` crate. This crate will mirror `tokio` and provide additional APIs that may require a greater rate of breaking changes. As examples require `tokio-util`, they are moved into a separate crate (`examples`). This has the added advantage of being able to avoid example only dependencies in the `tokio` crate.
Diffstat (limited to 'examples/tinydb.rs')
-rw-r--r--examples/tinydb.rs224
1 files changed, 224 insertions, 0 deletions
diff --git a/examples/tinydb.rs b/examples/tinydb.rs
new file mode 100644
index 00000000..3fc88f6b
--- /dev/null
+++ b/examples/tinydb.rs
@@ -0,0 +1,224 @@
+//! A "tiny database" and accompanying protocol
+//!
+//! This example shows the usage of shared state amongst all connected clients,
+//! namely a database of key/value pairs. Each connected client can send a
+//! series of GET/SET commands to query the current value of a key or set the
+//! value of a key.
+//!
+//! This example has a simple protocol you can use to interact with the server.
+//! To run, first run this in one terminal window:
+//!
+//! cargo run --example tinydb
+//!
+//! and next in another windows run:
+//!
+//! cargo run --example connect 127.0.0.1:8080
+//!
+//! In the `connect` window you can type in commands where when you hit enter
+//! you'll get a response from the server for that command. An example session
+//! is:
+//!
+//!
+//! $ cargo run --example connect 127.0.0.1:8080
+//! GET foo
+//! foo = bar
+//! GET FOOBAR
+//! error: no key FOOBAR
+//! SET FOOBAR my awesome string
+//! set FOOBAR = `my awesome string`, previous: None
+//! SET foo tokio
+//! set foo = `tokio`, previous: Some("bar")
+//! GET foo
+//! foo = tokio
+//!
+//! Namely you can issue two forms of commands:
+//!
+//! * `GET $key` - this will fetch the value of `$key` from the database and
+//! return it. The server's database is initially populated with the key `foo`
+//! set to the value `bar`
+//! * `SET $key $value` - this will set the value of `$key` to `$value`,
+//! returning the previous value, if any.
+
+#![warn(rust_2018_idioms)]
+
+use tokio::net::TcpListener;
+use tokio_util::codec::{Framed, LinesCodec};
+
+use futures::{SinkExt, StreamExt};
+use std::collections::HashMap;
+use std::env;
+use std::error::Error;
+use std::sync::{Arc, Mutex};
+
+/// The in-memory database shared amongst all clients.
+///
+/// This database will be shared via `Arc`, so to mutate the internal map we're
+/// going to use a `Mutex` for interior mutability.
+struct Database {
+ map: Mutex<HashMap<String, String>>,
+}
+
+/// Possible requests our clients can send us
+enum Request {
+ Get { key: String },
+ Set { key: String, value: String },
+}
+
+/// Responses to the `Request` commands above
+enum Response {
+ Value {
+ key: String,
+ value: String,
+ },
+ Set {
+ key: String,
+ value: String,
+ previous: Option<String>,
+ },
+ Error {
+ msg: String,
+ },
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+ // Parse the address we're going to run this server on
+ // and set up our TCP listener to accept connections.
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+
+ let mut listener = TcpListener::bind(&addr).await?;
+ println!("Listening on: {}", addr);
+
+ // Create the shared state of this server that will be shared amongst all
+ // clients. We populate the initial database and then create the `Database`
+ // structure. Note the usage of `Arc` here which will be used to ensure that
+ // each independently spawned client will have a reference to the in-memory
+ // database.
+ let mut initial_db = HashMap::new();
+ initial_db.insert("foo".to_string(), "bar".to_string());
+ let db = Arc::new(Database {
+ map: Mutex::new(initial_db),
+ });
+
+ loop {
+ match listener.accept().await {
+ Ok((socket, _)) => {
+ // After getting a new connection first we see a clone of the database
+ // being created, which is creating a new reference for this connected
+ // client to use.
+ let db = db.clone();
+
+ // Like with other small servers, we'll `spawn` this client to ensure it
+ // runs concurrently with all other clients. The `move` keyword is used
+ // here to move ownership of our db handle into the async closure.
+ tokio::spawn(async move {
+ // Since our protocol is line-based we use `tokio_codecs`'s `LineCodec`
+ // to convert our stream of bytes, `socket`, into a `Stream` of lines
+ // as well as convert our line based responses into a stream of bytes.
+ let mut lines = Framed::new(socket, LinesCodec::new());
+
+ // Here for every line we get back from the `Framed` decoder,
+ // we parse the request, and if it's valid we generate a response
+ // based on the values in the database.
+ while let Some(result) = lines.next().await {
+ match result {
+ Ok(line) => {
+ let response = handle_request(&line, &db);
+
+ let response = response.serialize();
+
+ if let Err(e) = lines.send(response).await {
+ println!("error on sending response; error = {:?}", e);
+ }
+ }
+ Err(e) => {
+ println!("error on decoding from socket; error = {:?}", e);
+ }
+ }
+ }
+
+ // The connection will be closed at this point as `lines.next()` has returned `None`.
+ });
+ }
+ Err(e) => println!("error accepting socket; error = {:?}", e),
+ }
+ }
+}
+
+fn handle_request(line: &str, db: &Arc<Database>) -> Response {
+ let request = match Request::parse(&line) {
+ Ok(req) => req,
+ Err(e) => return Response::Error { msg: e },
+ };
+
+ let mut db = db.map.lock().unwrap();
+ match request {
+ Request::Get { key } => match db.get(&key) {
+ Some(value) => Response::Value {
+ key,
+ value: value.clone(),
+ },
+ None => Response::Error {
+ msg: format!("no key {}", key),
+ },
+ },
+ Request::Set { key, value } => {
+ let previous = db.insert(key.clone(), value.clone());
+ Response::Set {
+ key,
+ value,
+ previous,
+ }
+ }
+ }
+}
+
+impl Request {
+ fn parse(input: &str) -> Result<Request, String> {
+ let mut parts = input.splitn(3, " ");
+ match parts.next() {
+ Some("GET") => {
+ let key = match parts.next() {
+ Some(key) => key,
+ None => return Err(format!("GET must be followed by a key")),
+ };
+ if parts.next().is_some() {
+ return Err(format!("GET's key must not be followed by anything"));
+ }
+ Ok(Request::Get {
+ key: key.to_string(),
+ })
+ }
+ Some("SET") => {
+ let key = match parts.next() {
+ Some(key) => key,
+ None => return Err(format!("SET must be followed by a key")),
+ };
+ let value = match parts.next() {
+ Some(value) => value,
+ None => return Err(format!("SET needs a value")),
+ };
+ Ok(Request::Set {
+ key: key.to_string(),
+ value: value.to_string(),
+ })
+ }
+ Some(cmd) => Err(format!("unknown command: {}", cmd)),
+ None => Err(format!("empty input")),
+ }
+ }
+}
+
+impl Response {
+ fn serialize(&self) -> String {
+ match *self {
+ Response::Value { ref key, ref value } => format!("{} = {}", key, value),
+ Response::Set {
+ ref key,
+ ref value,
+ ref previous,
+ } => format!("set {} = `{}`, previous: {:?}", key, value, previous),
+ Response::Error { ref msg } => format!("error: {}", msg),
+ }
+ }
+}