summaryrefslogtreecommitdiffstats
path: root/examples/tinydb.rs
diff options
context:
space:
mode:
Diffstat (limited to 'examples/tinydb.rs')
-rw-r--r--examples/tinydb.rs131
1 files changed, 63 insertions, 68 deletions
diff --git a/examples/tinydb.rs b/examples/tinydb.rs
index 0a68a314..6901a120 100644
--- a/examples/tinydb.rs
+++ b/examples/tinydb.rs
@@ -39,10 +39,9 @@
//! * `SET $key $value` - this will set the value of `$key` to `$value`,
//! returning the previous value, if any.
-extern crate futures;
-extern crate futures_cpupool;
+#![deny(warnings)]
+
extern crate tokio;
-extern crate tokio_io;
use std::collections::HashMap;
use std::io::BufReader;
@@ -50,12 +49,9 @@ use std::env;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
-use futures::prelude::*;
-use futures::future::Executor;
-use futures_cpupool::CpuPool;
+use tokio::io::{lines, write_all};
use tokio::net::TcpListener;
-use tokio_io::AsyncRead;
-use tokio_io::io::{lines, write_all};
+use tokio::prelude::*;
/// The in-memory database shared amongst all clients.
///
@@ -86,9 +82,6 @@ fn main() {
let listener = TcpListener::bind(&addr).expect("failed to bind");
println!("Listening on: {}", addr);
- // Create a CpuPool to execute tasks
- let pool = CpuPool::new(1);
-
// Create the shared state of this server that will be shared amongst all
// clients. We populate the initial database and then create the `Database`
// structure. Note the usage of `Arc` here which will be used to ensure that
@@ -100,67 +93,69 @@ fn main() {
map: Mutex::new(initial_db),
});
- let done = listener.incoming().for_each(move |socket| {
- // As with many other small examples, the first thing we'll do is
- // *split* this TCP stream into two separately owned halves. This'll
- // allow us to work with the read and write halves independently.
- let (reader, writer) = socket.split();
-
- // Since our protocol is line-based we use `tokio_io`'s `lines` utility
- // to convert our stream of bytes, `reader`, into a `Stream` of lines.
- let lines = lines(BufReader::new(reader));
-
- // Here's where the meat of the processing in this server happens. First
- // we see a clone of the database being created, which is creating a
- // new reference for this connected client to use. Also note the `move`
- // keyword on the closure here which moves ownership of the reference
- // into the closure, which we'll need for spawning the client below.
- //
- // The `map` function here means that we'll run some code for all
- // requests (lines) we receive from the client. The actual handling here
- // is pretty simple, first we parse the request and if it's valid we
- // generate a response based on the values in the database.
- let db = db.clone();
- let responses = lines.map(move |line| {
- let request = match Request::parse(&line) {
- Ok(req) => req,
- Err(e) => return Response::Error { msg: e },
- };
-
- let mut db = db.map.lock().unwrap();
- match request {
- Request::Get { key } => {
- match db.get(&key) {
- Some(value) => Response::Value { key, value: value.clone() },
- None => Response::Error { msg: format!("no key {}", key) },
+ let done = listener.incoming()
+ .map_err(|e| println!("error accepting socket; error = {:?}", e))
+ .for_each(move |socket| {
+ // As with many other small examples, the first thing we'll do is
+ // *split* this TCP stream into two separately owned halves. This'll
+ // allow us to work with the read and write halves independently.
+ let (reader, writer) = socket.split();
+
+ // Since our protocol is line-based we use `tokio_io`'s `lines` utility
+ // to convert our stream of bytes, `reader`, into a `Stream` of lines.
+ let lines = lines(BufReader::new(reader));
+
+ // Here's where the meat of the processing in this server happens. First
+ // we see a clone of the database being created, which is creating a
+ // new reference for this connected client to use. Also note the `move`
+ // keyword on the closure here which moves ownership of the reference
+ // into the closure, which we'll need for spawning the client below.
+ //
+ // The `map` function here means that we'll run some code for all
+ // requests (lines) we receive from the client. The actual handling here
+ // is pretty simple, first we parse the request and if it's valid we
+ // generate a response based on the values in the database.
+ let db = db.clone();
+ let responses = lines.map(move |line| {
+ let request = match Request::parse(&line) {
+ Ok(req) => req,
+ Err(e) => return Response::Error { msg: e },
+ };
+
+ let mut db = db.map.lock().unwrap();
+ match request {
+ Request::Get { key } => {
+ match db.get(&key) {
+ Some(value) => Response::Value { key, value: value.clone() },
+ None => Response::Error { msg: format!("no key {}", key) },
+ }
+ }
+ Request::Set { key, value } => {
+ let previous = db.insert(key.clone(), value.clone());
+ Response::Set { key, value, previous }
}
}
- Request::Set { key, value } => {
- let previous = db.insert(key.clone(), value.clone());
- Response::Set { key, value, previous }
- }
- }
- });
-
- // At this point `responses` is a stream of `Response` types which we
- // now want to write back out to the client. To do that we use
- // `Stream::fold` to perform a loop here, serializing each response and
- // then writing it out to the client.
- let writes = responses.fold(writer, |writer, response| {
- let mut response = response.serialize();
- response.push('\n');
- write_all(writer, response.into_bytes()).map(|(w, _)| w)
+ });
+
+ // At this point `responses` is a stream of `Response` types which we
+ // now want to write back out to the client. To do that we use
+ // `Stream::fold` to perform a loop here, serializing each response and
+ // then writing it out to the client.
+ let writes = responses.fold(writer, |writer, response| {
+ let mut response = response.serialize();
+ response.push('\n');
+ write_all(writer, response.into_bytes()).map(|(w, _)| w)
+ });
+
+ // Like with other small servers, we'll `spawn` this client to ensure it
+ // runs concurrently with all other clients, for now ignoring any errors
+ // that we see.
+ let msg = writes.then(move |_| Ok(()));
+
+ tokio::spawn(msg)
});
- // Like with other small servers, we'll `spawn` this client to ensure it
- // runs concurrently with all other clients, for now ignoring any errors
- // that we see.
- let msg = writes.then(move |_| Ok(()));
- pool.execute(msg).unwrap();
- Ok(())
- });
-
- done.wait().unwrap();
+ tokio::run(done);
}
impl Request {