summaryrefslogtreecommitdiffstats
path: root/examples/tinydb.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2018-03-06 09:59:04 -0800
committerGitHub <noreply@github.com>2018-03-06 09:59:04 -0800
commitf1cb12e14fb047f3f86c852c253962c60ce471e8 (patch)
tree59aab45a28961b00f7c71c5eb083c6242b51ff01 /examples/tinydb.rs
parent56c579787260abcb9786aa22cfca1ee4b7c3b5ba (diff)
Update examples to track latest Tokio changes (#180)
The exampes included in the repository have lagged behind the changes made. Specifically, they do not use the new runtime construct. This patch updates examples to use the latest features of Tokio.
Diffstat (limited to 'examples/tinydb.rs')
-rw-r--r--examples/tinydb.rs131
1 files changed, 63 insertions, 68 deletions
diff --git a/examples/tinydb.rs b/examples/tinydb.rs
index 0a68a314..6901a120 100644
--- a/examples/tinydb.rs
+++ b/examples/tinydb.rs
@@ -39,10 +39,9 @@
//! * `SET $key $value` - this will set the value of `$key` to `$value`,
//! returning the previous value, if any.
-extern crate futures;
-extern crate futures_cpupool;
+#![deny(warnings)]
+
extern crate tokio;
-extern crate tokio_io;
use std::collections::HashMap;
use std::io::BufReader;
@@ -50,12 +49,9 @@ use std::env;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
-use futures::prelude::*;
-use futures::future::Executor;
-use futures_cpupool::CpuPool;
+use tokio::io::{lines, write_all};
use tokio::net::TcpListener;
-use tokio_io::AsyncRead;
-use tokio_io::io::{lines, write_all};
+use tokio::prelude::*;
/// The in-memory database shared amongst all clients.
///
@@ -86,9 +82,6 @@ fn main() {
let listener = TcpListener::bind(&addr).expect("failed to bind");
println!("Listening on: {}", addr);
- // Create a CpuPool to execute tasks
- let pool = CpuPool::new(1);
-
// Create the shared state of this server that will be shared amongst all
// clients. We populate the initial database and then create the `Database`
// structure. Note the usage of `Arc` here which will be used to ensure that
@@ -100,67 +93,69 @@ fn main() {
map: Mutex::new(initial_db),
});
- let done = listener.incoming().for_each(move |socket| {
- // As with many other small examples, the first thing we'll do is
- // *split* this TCP stream into two separately owned halves. This'll
- // allow us to work with the read and write halves independently.
- let (reader, writer) = socket.split();
-
- // Since our protocol is line-based we use `tokio_io`'s `lines` utility
- // to convert our stream of bytes, `reader`, into a `Stream` of lines.
- let lines = lines(BufReader::new(reader));
-
- // Here's where the meat of the processing in this server happens. First
- // we see a clone of the database being created, which is creating a
- // new reference for this connected client to use. Also note the `move`
- // keyword on the closure here which moves ownership of the reference
- // into the closure, which we'll need for spawning the client below.
- //
- // The `map` function here means that we'll run some code for all
- // requests (lines) we receive from the client. The actual handling here
- // is pretty simple, first we parse the request and if it's valid we
- // generate a response based on the values in the database.
- let db = db.clone();
- let responses = lines.map(move |line| {
- let request = match Request::parse(&line) {
- Ok(req) => req,
- Err(e) => return Response::Error { msg: e },
- };
-
- let mut db = db.map.lock().unwrap();
- match request {
- Request::Get { key } => {
- match db.get(&key) {
- Some(value) => Response::Value { key, value: value.clone() },
- None => Response::Error { msg: format!("no key {}", key) },
+ let done = listener.incoming()
+ .map_err(|e| println!("error accepting socket; error = {:?}", e))
+ .for_each(move |socket| {
+ // As with many other small examples, the first thing we'll do is
+ // *split* this TCP stream into two separately owned halves. This'll
+ // allow us to work with the read and write halves independently.
+ let (reader, writer) = socket.split();
+
+ // Since our protocol is line-based we use `tokio_io`'s `lines` utility
+ // to convert our stream of bytes, `reader`, into a `Stream` of lines.
+ let lines = lines(BufReader::new(reader));
+
+ // Here's where the meat of the processing in this server happens. First
+ // we see a clone of the database being created, which is creating a
+ // new reference for this connected client to use. Also note the `move`
+ // keyword on the closure here which moves ownership of the reference
+ // into the closure, which we'll need for spawning the client below.
+ //
+ // The `map` function here means that we'll run some code for all
+ // requests (lines) we receive from the client. The actual handling here
+ // is pretty simple, first we parse the request and if it's valid we
+ // generate a response based on the values in the database.
+ let db = db.clone();
+ let responses = lines.map(move |line| {
+ let request = match Request::parse(&line) {
+ Ok(req) => req,
+ Err(e) => return Response::Error { msg: e },
+ };
+
+ let mut db = db.map.lock().unwrap();
+ match request {
+ Request::Get { key } => {
+ match db.get(&key) {
+ Some(value) => Response::Value { key, value: value.clone() },
+ None => Response::Error { msg: format!("no key {}", key) },
+ }
+ }
+ Request::Set { key, value } => {
+ let previous = db.insert(key.clone(), value.clone());
+ Response::Set { key, value, previous }
}
}
- Request::Set { key, value } => {
- let previous = db.insert(key.clone(), value.clone());
- Response::Set { key, value, previous }
- }
- }
- });
-
- // At this point `responses` is a stream of `Response` types which we
- // now want to write back out to the client. To do that we use
- // `Stream::fold` to perform a loop here, serializing each response and
- // then writing it out to the client.
- let writes = responses.fold(writer, |writer, response| {
- let mut response = response.serialize();
- response.push('\n');
- write_all(writer, response.into_bytes()).map(|(w, _)| w)
+ });
+
+ // At this point `responses` is a stream of `Response` types which we
+ // now want to write back out to the client. To do that we use
+ // `Stream::fold` to perform a loop here, serializing each response and
+ // then writing it out to the client.
+ let writes = responses.fold(writer, |writer, response| {
+ let mut response = response.serialize();
+ response.push('\n');
+ write_all(writer, response.into_bytes()).map(|(w, _)| w)
+ });
+
+ // Like with other small servers, we'll `spawn` this client to ensure it
+ // runs concurrently with all other clients, for now ignoring any errors
+ // that we see.
+ let msg = writes.then(move |_| Ok(()));
+
+ tokio::spawn(msg)
});
- // Like with other small servers, we'll `spawn` this client to ensure it
- // runs concurrently with all other clients, for now ignoring any errors
- // that we see.
- let msg = writes.then(move |_| Ok(()));
- pool.execute(msg).unwrap();
- Ok(())
- });
-
- done.wait().unwrap();
+ tokio::run(done);
}
impl Request {