#![warn(rust_2018_idioms)]
#![feature(async_await)]
use std::any::Any;
use std::cell::{Cell, RefCell};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;
use tokio_current_thread::{block_on_all, CurrentThread};
use tokio_executor::TypedExecutor;
use tokio_sync::oneshot;
/// Checks that a task spawned from inside the future handed to
/// `block_on_all` has run by the time `block_on_all` returns.
mod from_block_on_all {
    use super::*;

    /// Runs a future under `block_on_all` that bumps a shared counter and then
    /// uses `spawn` to launch a second task bumping the same counter.
    ///
    /// Asserts that both increments happened and that the value produced by
    /// the outer future is handed back to the caller.
    fn test<F: Fn(Pin<Box<dyn Future<Output = ()>>>) + 'static>(spawn: F) {
        let counter = Rc::new(Cell::new(0));
        let shared = Rc::clone(&counter);

        let greeting = tokio_current_thread::block_on_all(async move {
            shared.set(shared.get() + 1);

            // The counter handle is moved into the spawned task, so this is
            // the last use of it in the outer future.
            let child: Pin<Box<dyn Future<Output = ()>>> = Box::pin(async move {
                shared.set(shared.get() + 1);
            });
            spawn(child);

            "hello"
        });

        assert_eq!(2, counter.get());
        assert_eq!(greeting, "hello");
    }

    /// Spawns through the free function `tokio_current_thread::spawn`.
    #[test]
    fn spawn() {
        test(tokio_current_thread::spawn)
    }

    /// Spawns through the executor handle returned by `TaskExecutor::current()`.
    #[test]
    fn execute() {
        test(|fut| {
            tokio_current_thread::TaskExecutor::current()
                .spawn(fut)
                .unwrap();
        });
    }
}
/// Checks that `block_on_all` does not return before the driven future has
/// finished: the future waits on a oneshot channel that is only completed by
/// another thread after a one-second sleep, and the counter it bumps must be
/// set once `block_on_all` returns.
#[test]
fn block_waits() {
    let (tx, rx) = oneshot::channel();

    // Fire the channel from a separate thread after a delay.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(1000));
        tx.send(()).unwrap();
    });

    let observed = Rc::new(Cell::new(0));
    let inside = Rc::clone(&observed);

    block_on_all(async move {
        rx.await.unwrap();
        inside.set(inside.get() + 1);
    });

    assert_eq!(1, observed.get());
}
/// Spawns `ITER` tasks on a fresh `CurrentThread`, each incrementing a shared
/// counter, and checks that after `run()` the counter equals `ITER`.
#[test]
fn spawn_many() {
    const ITER: usize = 200;

    let counter = Rc::new(Cell::new(0));
    let mut executor = CurrentThread::new();

    (0..ITER).for_each(|_| {
        // Each task owns its own handle to the shared counter.
        let task_counter = Rc::clone(&counter);
        executor.spawn(async move {
            task_counter.set(task_counter.get() + 1);
        });
    });

    executor.run().unwrap();

    assert_eq!(counter.get(), ITER);
}
/// Checks that spawning through the default executor from inside
/// `block_on_all` reports an error.
mod does_not_set_global_executor_by_default {
    use super::*;

    /// Calls `spawn` with an empty boxed future from inside `block_on_all`
    /// and requires the call to return `Err` (panics on `Ok`).
    fn test<F, E>(spawn: F)
    where
        F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>) -> Result<(), E> + 'static,
    {
        block_on_all(async {
            let outcome = spawn(Box::pin(async {}));
            outcome.unwrap_err();
        });
    }

    /// Spawns via `tokio_executor::DefaultExecutor::current()`.
    #[test]
    fn spawn() {
        test(|fut| tokio_executor::DefaultExecutor::current().spawn(fut))
    }
}
/// Checks that a task spawned from inside a future driven by
/// `CurrentThread::block_on` has run once `block_on` and then `run` have
/// both returned.
mod from_block_on_future {
    use super::*;

    /// Drives a future with `block_on` that uses `spawn` to launch a task
    /// incrementing a shared counter, then calls `run()` on the executor and
    /// asserts that the counter was incremented exactly once.
    fn test<F: Fn(Pin<Box<dyn Future<Output = ()>>>)>(spawn: F) {
        let counter = Rc::new(Cell::new(0));
        let outer_handle = Rc::clone(&counter);
        let mut executor = CurrentThread::new();

        executor.block_on(async move {
            let task_handle = Rc::clone(&outer_handle);
            let child: Pin<Box<dyn Future<Output = ()>>> = Box::pin(async move {
                task_handle.set(task_handle.get() + 1);
            });
            spawn(child);
        });

        executor.run().unwrap();

        assert_eq!(1, counter.get());
    }

    /// Spawns through the free function `tokio_current_thread::spawn`.
    #[test]
    fn spawn() {
        test(tokio_current_thread::spawn);
    }

    /// Spawns through the executor handle returned by `TaskExecutor::current()`.
    #[test]
    fn execute() {
        test(|fut| {
            tokio_current_thread::TaskExecutor::current()
                .spawn(fut)
                .unwrap();
        });
    }
}
mod outstanding_tasks_are_dropped_when_executor_is_dropped {
use super::*;
/// A future that never completes: it loops forever, awaiting `yield_once()`
/// on every iteration.
///
/// The `_rc` argument is owned by the future for as long as the future
/// exists, so callers can detect that the future was dropped by checking
/// whether their own handle has become unique again.
async fn never(_rc: Rc<()>) {
    loop {
        // NOTE(review): `yield_once` is defined outside this view; presumably
        // it yields to the executor once before resolving — confirm.
        let pause = yield_once();
        pause.await;
    }
}
/// Checks that a never-finishing task is dropped together with its executor,
/// for two ways of spawning it.
///
/// * `dotspawn` spawns the task directly on the given `CurrentThread`.
/// * `spawn` spawns the task from inside a future driven by `block_on`.
///
/// In both cases the task holds a clone of an `Rc<()>`; after the executor is
/// dropped, `Rc::get_mut` must succeed on the original handle, which it only
/// does when no other clone is alive.
fn test<F, G>(spawn: F, dotspawn: G)
where
    F: Fn(Pin<Box<dyn Future<Output = ()>>>) + 'static,
    G: Fn(&mut CurrentThread, Pin<Box<dyn Future<Output = ()>>>),
{
    // Spawning directly on the executor instance.
    let mut direct_token = Rc::new(());
    let mut direct_executor = CurrentThread::new();
    let direct_daemon: Pin<Box<dyn Future<Output = ()>>> =
        Box::pin(never(Rc::clone(&direct_token)));
    dotspawn(&mut direct_executor, direct_daemon);
    drop(direct_executor);

    // The daemon's clone must be gone, leaving our handle unique.
    assert!(Rc::get_mut(&mut direct_token).is_some());

    // Spawning through the caller-supplied spawn function from within a
    // future that the executor is driving.
    let mut global_token = Rc::new(());
    let daemon_token = Rc::clone(&global_token);
    let mut global_executor = CurrentThread::new();
    global_executor.block_on(async move {
        let global_daemon: Pin<Box<dyn Future<Output = ()>>> = Box::pin(never(daemon_token));
        spawn(global_daemon);
    });
    drop(global_executor);

    // Again, the daemon's clone must have been released with the executor.
    assert!(Rc::get_mut(&mut global_token).is_some());
}
#[test]
fn spawn() {
test(tokio_current_thread::spawn, |rt, f| {
rt.spawn(f);
})