diff options
author | Carl Lerche <me@carllerche.com> | 2019-10-22 10:13:49 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-22 10:13:49 -0700 |
commit | cfc15617a5247ea780c32c85b7134b88b6de5845 (patch) | |
tree | ef0a46c61c51505a60f386c9760acac9d1f9b7b1 /examples/tinydb.rs | |
parent | b8cee1a60ad99ef28ec494ae4230e2ef4399fcf9 (diff) |
codec: move into tokio-util (#1675)
Related to #1318, Tokio APIs that are "less stable" are moved into a new
`tokio-util` crate. This crate will mirror `tokio` and provide
additional APIs that may require a greater rate of breaking changes.
As examples require `tokio-util`, they are moved into a separate
crate (`examples`). This has the added advantage of being able to avoid
example only dependencies in the `tokio` crate.
Diffstat (limited to 'examples/tinydb.rs')
-rw-r--r-- | examples/tinydb.rs | 224 |
1 files changed, 224 insertions, 0 deletions
diff --git a/examples/tinydb.rs b/examples/tinydb.rs new file mode 100644 index 00000000..3fc88f6b --- /dev/null +++ b/examples/tinydb.rs @@ -0,0 +1,224 @@ +//! A "tiny database" and accompanying protocol +//! +//! This example shows the usage of shared state amongst all connected clients, +//! namely a database of key/value pairs. Each connected client can send a +//! series of GET/SET commands to query the current value of a key or set the +//! value of a key. +//! +//! This example has a simple protocol you can use to interact with the server. +//! To run, first run this in one terminal window: +//! +//! cargo run --example tinydb +//! +//! and next in another windows run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! In the `connect` window you can type in commands where when you hit enter +//! you'll get a response from the server for that command. An example session +//! is: +//! +//! +//! $ cargo run --example connect 127.0.0.1:8080 +//! GET foo +//! foo = bar +//! GET FOOBAR +//! error: no key FOOBAR +//! SET FOOBAR my awesome string +//! set FOOBAR = `my awesome string`, previous: None +//! SET foo tokio +//! set foo = `tokio`, previous: Some("bar") +//! GET foo +//! foo = tokio +//! +//! Namely you can issue two forms of commands: +//! +//! * `GET $key` - this will fetch the value of `$key` from the database and +//! return it. The server's database is initially populated with the key `foo` +//! set to the value `bar` +//! * `SET $key $value` - this will set the value of `$key` to `$value`, +//! returning the previous value, if any. + +#![warn(rust_2018_idioms)] + +use tokio::net::TcpListener; +use tokio_util::codec::{Framed, LinesCodec}; + +use futures::{SinkExt, StreamExt}; +use std::collections::HashMap; +use std::env; +use std::error::Error; +use std::sync::{Arc, Mutex}; + +/// The in-memory database shared amongst all clients. +/// +/// This database will be shared via `Arc`, so to mutate the internal map we're +/// going to use a `Mutex` for interior mutability. +struct Database { + map: Mutex<HashMap<String, String>>, +} + +/// Possible requests our clients can send us +enum Request { + Get { key: String }, + Set { key: String, value: String }, +} + +/// Responses to the `Request` commands above +enum Response { + Value { + key: String, + value: String, + }, + Set { + key: String, + value: String, + previous: Option<String>, + }, + Error { + msg: String, + }, +} + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + // Parse the address we're going to run this server on + // and set up our TCP listener to accept connections. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + + let mut listener = TcpListener::bind(&addr).await?; + println!("Listening on: {}", addr); + + // Create the shared state of this server that will be shared amongst all + // clients. We populate the initial database and then create the `Database` + // structure. Note the usage of `Arc` here which will be used to ensure that + // each independently spawned client will have a reference to the in-memory + // database. + let mut initial_db = HashMap::new(); + initial_db.insert("foo".to_string(), "bar".to_string()); + let db = Arc::new(Database { + map: Mutex::new(initial_db), + }); + + loop { + match listener.accept().await { + Ok((socket, _)) => { + // After getting a new connection first we see a clone of the database + // being created, which is creating a new reference for this connected + // client to use. + let db = db.clone(); + + // Like with other small servers, we'll `spawn` this client to ensure it + // runs concurrently with all other clients. The `move` keyword is used + // here to move ownership of our db handle into the async closure. + tokio::spawn(async move { + // Since our protocol is line-based we use `tokio_codecs`'s `LineCodec` + // to convert our stream of bytes, `socket`, into a `Stream` of lines + // as well as convert our line based responses into a stream of bytes. + let mut lines = Framed::new(socket, LinesCodec::new()); + + // Here for every line we get back from the `Framed` decoder, + // we parse the request, and if it's valid we generate a response + // based on the values in the database. + while let Some(result) = lines.next().await { + match result { + Ok(line) => { + let response = handle_request(&line, &db); + + let response = response.serialize(); + + if let Err(e) = lines.send(response).await { + println!("error on sending response; error = {:?}", e); + } + } + Err(e) => { + println!("error on decoding from socket; error = {:?}", e); + } + } + } + + // The connection will be closed at this point as `lines.next()` has returned `None`. + }); + } + Err(e) => println!("error accepting socket; error = {:?}", e), + } + } +} + +fn handle_request(line: &str, db: &Arc<Database>) -> Response { + let request = match Request::parse(&line) { + Ok(req) => req, + Err(e) => return Response::Error { msg: e }, + }; + + let mut db = db.map.lock().unwrap(); + match request { + Request::Get { key } => match db.get(&key) { + Some(value) => Response::Value { + key, + value: value.clone(), + }, + None => Response::Error { + msg: format!("no key {}", key), + }, + }, + Request::Set { key, value } => { + let previous = db.insert(key.clone(), value.clone()); + Response::Set { + key, + value, + previous, + } + } + } +} + +impl Request { + fn parse(input: &str) -> Result<Request, String> { + let mut parts = input.splitn(3, " "); + match parts.next() { + Some("GET") => { + let key = match parts.next() { + Some(key) => key, + None => return Err(format!("GET must be followed by a key")), + }; + if parts.next().is_some() { + return Err(format!("GET's key must not be followed by anything")); + } + Ok(Request::Get { + key: key.to_string(), + }) + } + Some("SET") => { + let key = match parts.next() { + Some(key) => key, + None => return Err(format!("SET must be followed by a key")), + }; + let value = match parts.next() { + Some(value) => value, + None => return Err(format!("SET needs a value")), + }; + Ok(Request::Set { + key: key.to_string(), + value: value.to_string(), + }) + } + Some(cmd) => Err(format!("unknown command: {}", cmd)), + None => Err(format!("empty input")), + } + } +} + +impl Response { + fn serialize(&self) -> String { + match *self { + Response::Value { ref key, ref value } => format!("{} = {}", key, value), + Response::Set { + ref key, + ref value, + ref previous, + } => format!("set {} = `{}`, previous: {:?}", key, value, previous), + Response::Error { ref msg } => format!("error: {}", msg), + } + } +} |