diff options
Diffstat (limited to 'examples/tinydb.rs')
-rw-r--r-- | examples/tinydb.rs | 131 |
1 files changed, 63 insertions, 68 deletions
diff --git a/examples/tinydb.rs b/examples/tinydb.rs index 0a68a314..6901a120 100644 --- a/examples/tinydb.rs +++ b/examples/tinydb.rs @@ -39,10 +39,9 @@ //! * `SET $key $value` - this will set the value of `$key` to `$value`, //! returning the previous value, if any. -extern crate futures; -extern crate futures_cpupool; +#![deny(warnings)] + extern crate tokio; -extern crate tokio_io; use std::collections::HashMap; use std::io::BufReader; @@ -50,12 +49,9 @@ use std::env; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; -use futures::prelude::*; -use futures::future::Executor; -use futures_cpupool::CpuPool; +use tokio::io::{lines, write_all}; use tokio::net::TcpListener; -use tokio_io::AsyncRead; -use tokio_io::io::{lines, write_all}; +use tokio::prelude::*; /// The in-memory database shared amongst all clients. /// @@ -86,9 +82,6 @@ fn main() { let listener = TcpListener::bind(&addr).expect("failed to bind"); println!("Listening on: {}", addr); - // Create a CpuPool to execute tasks - let pool = CpuPool::new(1); - // Create the shared state of this server that will be shared amongst all // clients. We populate the initial database and then create the `Database` // structure. Note the usage of `Arc` here which will be used to ensure that @@ -100,67 +93,69 @@ fn main() { map: Mutex::new(initial_db), }); - let done = listener.incoming().for_each(move |socket| { - // As with many other small examples, the first thing we'll do is - // *split* this TCP stream into two separately owned halves. This'll - // allow us to work with the read and write halves independently. - let (reader, writer) = socket.split(); - - // Since our protocol is line-based we use `tokio_io`'s `lines` utility - // to convert our stream of bytes, `reader`, into a `Stream` of lines. - let lines = lines(BufReader::new(reader)); - - // Here's where the meat of the processing in this server happens. First - // we see a clone of the database being created, which is creating a - // new reference for this connected client to use. Also note the `move` - // keyword on the closure here which moves ownership of the reference - // into the closure, which we'll need for spawning the client below. - // - // The `map` function here means that we'll run some code for all - // requests (lines) we receive from the client. The actual handling here - // is pretty simple, first we parse the request and if it's valid we - // generate a response based on the values in the database. - let db = db.clone(); - let responses = lines.map(move |line| { - let request = match Request::parse(&line) { - Ok(req) => req, - Err(e) => return Response::Error { msg: e }, - }; - - let mut db = db.map.lock().unwrap(); - match request { - Request::Get { key } => { - match db.get(&key) { - Some(value) => Response::Value { key, value: value.clone() }, - None => Response::Error { msg: format!("no key {}", key) }, + let done = listener.incoming() + .map_err(|e| println!("error accepting socket; error = {:?}", e)) + .for_each(move |socket| { + // As with many other small examples, the first thing we'll do is + // *split* this TCP stream into two separately owned halves. This'll + // allow us to work with the read and write halves independently. + let (reader, writer) = socket.split(); + + // Since our protocol is line-based we use `tokio_io`'s `lines` utility + // to convert our stream of bytes, `reader`, into a `Stream` of lines. + let lines = lines(BufReader::new(reader)); + + // Here's where the meat of the processing in this server happens. First + // we see a clone of the database being created, which is creating a + // new reference for this connected client to use. Also note the `move` + // keyword on the closure here which moves ownership of the reference + // into the closure, which we'll need for spawning the client below. + // + // The `map` function here means that we'll run some code for all + // requests (lines) we receive from the client. The actual handling here + // is pretty simple, first we parse the request and if it's valid we + // generate a response based on the values in the database. + let db = db.clone(); + let responses = lines.map(move |line| { + let request = match Request::parse(&line) { + Ok(req) => req, + Err(e) => return Response::Error { msg: e }, + }; + + let mut db = db.map.lock().unwrap(); + match request { + Request::Get { key } => { + match db.get(&key) { + Some(value) => Response::Value { key, value: value.clone() }, + None => Response::Error { msg: format!("no key {}", key) }, + } + } + Request::Set { key, value } => { + let previous = db.insert(key.clone(), value.clone()); + Response::Set { key, value, previous } } } - Request::Set { key, value } => { - let previous = db.insert(key.clone(), value.clone()); - Response::Set { key, value, previous } - } - } - }); - - // At this point `responses` is a stream of `Response` types which we - // now want to write back out to the client. To do that we use - // `Stream::fold` to perform a loop here, serializing each response and - // then writing it out to the client. - let writes = responses.fold(writer, |writer, response| { - let mut response = response.serialize(); - response.push('\n'); - write_all(writer, response.into_bytes()).map(|(w, _)| w) + }); + + // At this point `responses` is a stream of `Response` types which we + // now want to write back out to the client. To do that we use + // `Stream::fold` to perform a loop here, serializing each response and + // then writing it out to the client. + let writes = responses.fold(writer, |writer, response| { + let mut response = response.serialize(); + response.push('\n'); + write_all(writer, response.into_bytes()).map(|(w, _)| w) + }); + + // Like with other small servers, we'll `spawn` this client to ensure it + // runs concurrently with all other clients, for now ignoring any errors + // that we see. + let msg = writes.then(move |_| Ok(())); + + tokio::spawn(msg) }); - // Like with other small servers, we'll `spawn` this client to ensure it - // runs concurrently with all other clients, for now ignoring any errors - // that we see. - let msg = writes.then(move |_| Ok(())); - pool.execute(msg).unwrap(); - Ok(()) - }); - - done.wait().unwrap(); + tokio::run(done); } impl Request { |