diff options
author | Carl Lerche <me@carllerche.com> | 2018-02-21 07:42:22 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-02-21 07:42:22 -0800 |
commit | fe14e7b127b17a1e8f267f467843d2a7355c94be (patch) | |
tree | 7f0dfa97c5b35a67d52b57d9ef10b0dbdd43208a /tests | |
parent | e0d95aa037c354240ea495542b7afe29ad30e337 (diff) |
Introduce the Tokio runtime: Reactor + Threadpool (#141)
This patch is an intial implementation of the Tokio runtime. The Tokio
runtime provides an out of the box configuration for running I/O heavy
asynchronous applications.
As of now, the Tokio runtime is a combination of a work-stealing thread
pool as well as a background reactor to drive I/O resources.
This patch also includes tokio-executor, a hopefully short lived crate
that is based on the futures 0.2 executor RFC.
* Implement `Park` for `Reactor`
This enables the reactor to be used as the thread parker for executors.
This also adds an `Error` component to `Park`. With this change, a
`Reactor` and a `CurrentThread` can be combined to achieve the
capabilities of tokio-core.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/current_thread.rs | 262 | ||||
-rw-r--r-- | tests/global.rs | 3 | ||||
-rw-r--r-- | tests/runtime.rs | 52 |
3 files changed, 317 insertions, 0 deletions
diff --git a/tests/current_thread.rs b/tests/current_thread.rs new file mode 100644 index 00000000..926eee63 --- /dev/null +++ b/tests/current_thread.rs @@ -0,0 +1,262 @@ +extern crate tokio; +extern crate tokio_executor; +extern crate futures; + +use tokio::executor::current_thread::{self, block_on_all, CurrentThread}; + +use std::cell::{Cell, RefCell}; +use std::rc::Rc; +use std::thread; +use std::time::Duration; + +use futures::task; +use futures::future::{self, lazy}; +use futures::prelude::*; +use futures::sync::oneshot; + +#[test] +fn spawn_from_block_on_all() { + let cnt = Rc::new(Cell::new(0)); + let c = cnt.clone(); + + let msg = current_thread::block_on_all(lazy(move || { + c.set(1 + c.get()); + + // Spawn! + current_thread::spawn(lazy(move || { + c.set(1 + c.get()); + Ok::<(), ()>(()) + })); + + Ok::<_, ()>("hello") + })).unwrap(); + + assert_eq!(2, cnt.get()); + assert_eq!(msg, "hello"); +} + +#[test] +fn block_waits() { + let cnt = Rc::new(Cell::new(0)); + let cnt2 = cnt.clone(); + + let (tx, rx) = oneshot::channel(); + + thread::spawn(|| { + thread::sleep(Duration::from_millis(1000)); + tx.send(()).unwrap(); + }); + + block_on_all(rx.then(move |_| { + cnt.set(1 + cnt.get()); + Ok::<_, ()>(()) + })).unwrap(); + + assert_eq!(1, cnt2.get()); +} + +#[test] +fn spawn_many() { + const ITER: usize = 200; + + let cnt = Rc::new(Cell::new(0)); + let mut current_thread = CurrentThread::new(); + + for _ in 0..ITER { + let cnt = cnt.clone(); + current_thread.spawn(lazy(move || { + cnt.set(1 + cnt.get()); + Ok::<(), ()>(()) + })); + } + + current_thread.run().unwrap(); + + assert_eq!(cnt.get(), ITER); +} + +#[test] +fn does_not_set_global_executor_by_default() { + use tokio_executor::Executor; + + block_on_all(lazy(|| { + tokio_executor::DefaultExecutor::current() + .spawn(Box::new(lazy(|| ok()))) + .unwrap_err(); + + ok() + })).unwrap(); +} + +#[test] +fn spawn_from_block_on_future() { + let cnt = Rc::new(Cell::new(0)); + + let mut current_thread = CurrentThread::new(); + + current_thread.block_on(lazy(|| { + let cnt = cnt.clone(); + + current_thread::spawn(lazy(move || { + cnt.set(1 + cnt.get()); + Ok(()) + })); + + Ok::<_, ()>(()) + })).unwrap(); + + current_thread.run().unwrap(); + + assert_eq!(1, cnt.get()); +} + +struct Never(Rc<()>); + +impl Future for Never { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + Ok(Async::NotReady) + } +} + +#[test] +fn outstanding_tasks_are_dropped_when_executor_is_dropped() { + let mut rc = Rc::new(()); + + let mut current_thread = CurrentThread::new(); + current_thread.spawn(Never(rc.clone())); + + drop(current_thread); + + // Ensure the daemon is dropped + assert!(Rc::get_mut(&mut rc).is_some()); + + // Using the global spawn fn + + let mut rc = Rc::new(()); + + let mut current_thread = CurrentThread::new(); + + current_thread.block_on(lazy(|| { + current_thread::spawn(Never(rc.clone())); + Ok::<_, ()>(()) + })).unwrap(); + + drop(current_thread); + + // Ensure the daemon is dropped + assert!(Rc::get_mut(&mut rc).is_some()); +} + +#[test] +#[should_panic] +fn nesting_run() { + block_on_all(lazy(|| { + block_on_all(lazy(|| { + ok() + })).unwrap(); + + ok() + })).unwrap(); +} + +#[test] +#[should_panic] +fn run_in_future() { + block_on_all(lazy(|| { + current_thread::spawn(lazy(|| { + block_on_all(lazy(|| { + ok() + })).unwrap(); + ok() + })); + ok() + })).unwrap(); +} + +#[test] +fn tick_on_infini_future() { + let num = Rc::new(Cell::new(0)); + + struct Infini { + num: Rc<Cell<usize>>, + } + + impl Future for Infini { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + self.num.set(1 + self.num.get()); + task::current().notify(); + Ok(Async::NotReady) + } + } + + CurrentThread::new() + .spawn(Infini { + num: num.clone(), + }) + .turn(None) + .unwrap(); + + assert_eq!(1, num.get()); +} + +#[test] +fn tasks_are_scheduled_fairly() { + let state = Rc::new(RefCell::new([0, 0])); + + struct Spin { + state: Rc<RefCell<[i32; 2]>>, + idx: usize, + } + + impl Future for Spin { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + let mut state = self.state.borrow_mut(); + + if self.idx == 0 { + let diff = state[0] - state[1]; + + assert!(diff.abs() <= 1); + + if state[0] >= 50 { + return Ok(().into()); + } + } + + state[self.idx] += 1; + + if state[self.idx] >= 100 { + return Ok(().into()); + } + + task::current().notify(); + Ok(Async::NotReady) + } + } + + block_on_all(lazy(|| { + current_thread::spawn(Spin { + state: state.clone(), + idx: 0, + }); + + current_thread::spawn(Spin { + state: state, + idx: 1, + }); + + ok() + })).unwrap(); +} + +fn ok() -> future::FutureResult<(), ()> { + future::ok(()) +} diff --git a/tests/global.rs b/tests/global.rs index bf5682fa..a863176b 100644 --- a/tests/global.rs +++ b/tests/global.rs @@ -1,5 +1,6 @@ extern crate futures; extern crate tokio; +extern crate env_logger; use std::thread; @@ -15,6 +16,8 @@ macro_rules! t { #[test] fn hammer() { + let _ = env_logger::init(); + let threads = (0..10).map(|_| { thread::spawn(|| { let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); diff --git a/tests/runtime.rs b/tests/runtime.rs new file mode 100644 index 00000000..ddda4860 --- /dev/null +++ b/tests/runtime.rs @@ -0,0 +1,52 @@ +extern crate futures; +extern crate tokio; +extern crate tokio_io; +extern crate env_logger; + +use futures::prelude::*; +use tokio::net::{TcpStream, TcpListener}; +use tokio_io::io; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn basic_runtime_usage() { + let _ = env_logger::init(); + + // TODO: Don't require the lazy wrapper + tokio::run(::futures::future::lazy(|| { + let server = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(server.local_addr()); + let client = TcpStream::connect(&addr); + + let server = server.incoming().take(1) + .map_err(|e| println!("accept err = {:?}", e)) + .for_each(|socket| { + tokio::spawn({ + io::write_all(socket, b"hello") + .map(|_| println!("write done")) + .map_err(|e| println!("write err = {:?}", e)) + }) + }) + .map(|_| println!("accept done")); + + let client = client + .map_err(|e| println!("connect err = {:?}", e)) + .and_then(|client| { + // Read all + io::read_to_end(client, vec![]) + .map(|_| println!("read done")) + .map_err(|e| println!("read err = {:?}", e)) + }); + + tokio::spawn({ + server.join(client) + .map(|_| println!("done")) + }) + })); +} |