summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2018-02-21 07:42:22 -0800
committerGitHub <noreply@github.com>2018-02-21 07:42:22 -0800
commitfe14e7b127b17a1e8f267f467843d2a7355c94be (patch)
tree7f0dfa97c5b35a67d52b57d9ef10b0dbdd43208a /tests
parente0d95aa037c354240ea495542b7afe29ad30e337 (diff)
Introduce the Tokio runtime: Reactor + Threadpool (#141)
This patch is an intial implementation of the Tokio runtime. The Tokio runtime provides an out of the box configuration for running I/O heavy asynchronous applications. As of now, the Tokio runtime is a combination of a work-stealing thread pool as well as a background reactor to drive I/O resources. This patch also includes tokio-executor, a hopefully short lived crate that is based on the futures 0.2 executor RFC. * Implement `Park` for `Reactor` This enables the reactor to be used as the thread parker for executors. This also adds an `Error` component to `Park`. With this change, a `Reactor` and a `CurrentThread` can be combined to achieve the capabilities of tokio-core.
Diffstat (limited to 'tests')
-rw-r--r--tests/current_thread.rs262
-rw-r--r--tests/global.rs3
-rw-r--r--tests/runtime.rs52
3 files changed, 317 insertions, 0 deletions
diff --git a/tests/current_thread.rs b/tests/current_thread.rs
new file mode 100644
index 00000000..926eee63
--- /dev/null
+++ b/tests/current_thread.rs
@@ -0,0 +1,262 @@
+extern crate tokio;
+extern crate tokio_executor;
+extern crate futures;
+
+use tokio::executor::current_thread::{self, block_on_all, CurrentThread};
+
+use std::cell::{Cell, RefCell};
+use std::rc::Rc;
+use std::thread;
+use std::time::Duration;
+
+use futures::task;
+use futures::future::{self, lazy};
+use futures::prelude::*;
+use futures::sync::oneshot;
+
+#[test]
+fn spawn_from_block_on_all() {
+ let cnt = Rc::new(Cell::new(0));
+ let c = cnt.clone();
+
+ let msg = current_thread::block_on_all(lazy(move || {
+ c.set(1 + c.get());
+
+ // Spawn!
+ current_thread::spawn(lazy(move || {
+ c.set(1 + c.get());
+ Ok::<(), ()>(())
+ }));
+
+ Ok::<_, ()>("hello")
+ })).unwrap();
+
+ assert_eq!(2, cnt.get());
+ assert_eq!(msg, "hello");
+}
+
+#[test]
+fn block_waits() {
+ let cnt = Rc::new(Cell::new(0));
+ let cnt2 = cnt.clone();
+
+ let (tx, rx) = oneshot::channel();
+
+ thread::spawn(|| {
+ thread::sleep(Duration::from_millis(1000));
+ tx.send(()).unwrap();
+ });
+
+ block_on_all(rx.then(move |_| {
+ cnt.set(1 + cnt.get());
+ Ok::<_, ()>(())
+ })).unwrap();
+
+ assert_eq!(1, cnt2.get());
+}
+
+#[test]
+fn spawn_many() {
+ const ITER: usize = 200;
+
+ let cnt = Rc::new(Cell::new(0));
+ let mut current_thread = CurrentThread::new();
+
+ for _ in 0..ITER {
+ let cnt = cnt.clone();
+ current_thread.spawn(lazy(move || {
+ cnt.set(1 + cnt.get());
+ Ok::<(), ()>(())
+ }));
+ }
+
+ current_thread.run().unwrap();
+
+ assert_eq!(cnt.get(), ITER);
+}
+
+#[test]
+fn does_not_set_global_executor_by_default() {
+ use tokio_executor::Executor;
+
+ block_on_all(lazy(|| {
+ tokio_executor::DefaultExecutor::current()
+ .spawn(Box::new(lazy(|| ok())))
+ .unwrap_err();
+
+ ok()
+ })).unwrap();
+}
+
+#[test]
+fn spawn_from_block_on_future() {
+ let cnt = Rc::new(Cell::new(0));
+
+ let mut current_thread = CurrentThread::new();
+
+ current_thread.block_on(lazy(|| {
+ let cnt = cnt.clone();
+
+ current_thread::spawn(lazy(move || {
+ cnt.set(1 + cnt.get());
+ Ok(())
+ }));
+
+ Ok::<_, ()>(())
+ })).unwrap();
+
+ current_thread.run().unwrap();
+
+ assert_eq!(1, cnt.get());
+}
+
+struct Never(Rc<()>);
+
+impl Future for Never {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ Ok(Async::NotReady)
+ }
+}
+
+#[test]
+fn outstanding_tasks_are_dropped_when_executor_is_dropped() {
+ let mut rc = Rc::new(());
+
+ let mut current_thread = CurrentThread::new();
+ current_thread.spawn(Never(rc.clone()));
+
+ drop(current_thread);
+
+ // Ensure the daemon is dropped
+ assert!(Rc::get_mut(&mut rc).is_some());
+
+ // Using the global spawn fn
+
+ let mut rc = Rc::new(());
+
+ let mut current_thread = CurrentThread::new();
+
+ current_thread.block_on(lazy(|| {
+ current_thread::spawn(Never(rc.clone()));
+ Ok::<_, ()>(())
+ })).unwrap();
+
+ drop(current_thread);
+
+ // Ensure the daemon is dropped
+ assert!(Rc::get_mut(&mut rc).is_some());
+}
+
+#[test]
+#[should_panic]
+fn nesting_run() {
+ block_on_all(lazy(|| {
+ block_on_all(lazy(|| {
+ ok()
+ })).unwrap();
+
+ ok()
+ })).unwrap();
+}
+
+#[test]
+#[should_panic]
+fn run_in_future() {
+ block_on_all(lazy(|| {
+ current_thread::spawn(lazy(|| {
+ block_on_all(lazy(|| {
+ ok()
+ })).unwrap();
+ ok()
+ }));
+ ok()
+ })).unwrap();
+}
+
+#[test]
+fn tick_on_infini_future() {
+ let num = Rc::new(Cell::new(0));
+
+ struct Infini {
+ num: Rc<Cell<usize>>,
+ }
+
+ impl Future for Infini {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ self.num.set(1 + self.num.get());
+ task::current().notify();
+ Ok(Async::NotReady)
+ }
+ }
+
+ CurrentThread::new()
+ .spawn(Infini {
+ num: num.clone(),
+ })
+ .turn(None)
+ .unwrap();
+
+ assert_eq!(1, num.get());
+}
+
+#[test]
+fn tasks_are_scheduled_fairly() {
+ let state = Rc::new(RefCell::new([0, 0]));
+
+ struct Spin {
+ state: Rc<RefCell<[i32; 2]>>,
+ idx: usize,
+ }
+
+ impl Future for Spin {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ let mut state = self.state.borrow_mut();
+
+ if self.idx == 0 {
+ let diff = state[0] - state[1];
+
+ assert!(diff.abs() <= 1);
+
+ if state[0] >= 50 {
+ return Ok(().into());
+ }
+ }
+
+ state[self.idx] += 1;
+
+ if state[self.idx] >= 100 {
+ return Ok(().into());
+ }
+
+ task::current().notify();
+ Ok(Async::NotReady)
+ }
+ }
+
+ block_on_all(lazy(|| {
+ current_thread::spawn(Spin {
+ state: state.clone(),
+ idx: 0,
+ });
+
+ current_thread::spawn(Spin {
+ state: state,
+ idx: 1,
+ });
+
+ ok()
+ })).unwrap();
+}
+
+fn ok() -> future::FutureResult<(), ()> {
+ future::ok(())
+}
diff --git a/tests/global.rs b/tests/global.rs
index bf5682fa..a863176b 100644
--- a/tests/global.rs
+++ b/tests/global.rs
@@ -1,5 +1,6 @@
extern crate futures;
extern crate tokio;
+extern crate env_logger;
use std::thread;
@@ -15,6 +16,8 @@ macro_rules! t {
#[test]
fn hammer() {
+ let _ = env_logger::init();
+
let threads = (0..10).map(|_| {
thread::spawn(|| {
let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
diff --git a/tests/runtime.rs b/tests/runtime.rs
new file mode 100644
index 00000000..ddda4860
--- /dev/null
+++ b/tests/runtime.rs
@@ -0,0 +1,52 @@
+extern crate futures;
+extern crate tokio;
+extern crate tokio_io;
+extern crate env_logger;
+
+use futures::prelude::*;
+use tokio::net::{TcpStream, TcpListener};
+use tokio_io::io;
+
+macro_rules! t {
+ ($e:expr) => (match $e {
+ Ok(e) => e,
+ Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
+ })
+}
+
+#[test]
+fn basic_runtime_usage() {
+ let _ = env_logger::init();
+
+ // TODO: Don't require the lazy wrapper
+ tokio::run(::futures::future::lazy(|| {
+ let server = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
+ let addr = t!(server.local_addr());
+ let client = TcpStream::connect(&addr);
+
+ let server = server.incoming().take(1)
+ .map_err(|e| println!("accept err = {:?}", e))
+ .for_each(|socket| {
+ tokio::spawn({
+ io::write_all(socket, b"hello")
+ .map(|_| println!("write done"))
+ .map_err(|e| println!("write err = {:?}", e))
+ })
+ })
+ .map(|_| println!("accept done"));
+
+ let client = client
+ .map_err(|e| println!("connect err = {:?}", e))
+ .and_then(|client| {
+ // Read all
+ io::read_to_end(client, vec![])
+ .map(|_| println!("read done"))
+ .map_err(|e| println!("read err = {:?}", e))
+ });
+
+ tokio::spawn({
+ server.join(client)
+ .map(|_| println!("done"))
+ })
+ }));
+}