summaryrefslogtreecommitdiffstats
path: root/tokio/tests/support/mock_pool.rs
blob: e1fdb426417ae8045f663bd13814837753411c56 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
use tokio::sync::oneshot;

use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

/// A unit of queued work: a boxed, sendable closure that runs exactly once.
type Task = Box<dyn FnOnce() + Send>;

thread_local! {
    /// Per-thread FIFO of closures that have been submitted but not yet run.
    ///
    /// `run` appends to the back and `run_one` pops from the front, so tasks
    /// execute in submission order. Being thread-local, the queue is only
    /// visible to the thread that submitted the work.
    static QUEUE: RefCell<VecDeque<Task>> = RefCell::new(VecDeque::new());
}

/// Future handed back by `run` for a closure that has been queued on the
/// mock pool.
///
/// It resolves once the queued closure has been executed (for example via
/// `run_one`) and its return value has been sent over the oneshot channel.
#[derive(Debug)]
pub(crate) struct Blocking<T> {
    /// Receiving half of the oneshot channel whose sender is owned by the
    /// queued closure; it yields the closure's return value.
    rx: oneshot::Receiver<T>,
}

/// Queues `f` on the current thread's mock pool without executing it.
///
/// The closure is wrapped so that, when it is eventually run, its return
/// value is sent over a oneshot channel. The returned [`Blocking`] future
/// holds the receiving half of that channel and resolves with the value.
pub(crate) fn run<F, R>(f: F) -> Blocking<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    let (sender, receiver) = oneshot::channel();

    QUEUE.with(|queue| {
        queue.borrow_mut().push_back(Box::new(move || {
            // The send result is deliberately ignored: the future may already
            // have been dropped by the time the task runs.
            let _ = sender.send(f());
        }));
    });

    Blocking { rx: receiver }
}

impl<T> Future for Blocking<T> {
    /// The `Err` variant is never produced by `poll` itself; a receive
    /// failure panics instead (see below).
    type Output = Result<T, io::Error>;

    /// Polls the underlying oneshot receiver.
    ///
    /// Returns `Pending` until the queued closure has run, then
    /// `Ready(Ok(value))` with the closure's return value.
    ///
    /// # Panics
    ///
    /// Panics if the receiver resolves to an error, i.e. the channel was
    /// closed without a value ever being sent.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let receiver = Pin::new(&mut self.rx);

        // `Poll::map` leaves `Pending` untouched and only transforms the
        // ready value.
        receiver.poll(cx).map(|outcome| match outcome {
            Ok(value) => Ok(value),
            Err(e) => panic!("error = {:?}", e),
        })
    }
}

/// Queues the fallible closure `f` on the mock pool and awaits its outcome.
///
/// Awaiting the [`Blocking`] future yields a nested result: the outer layer
/// belongs to the future, the inner one is what `f` returned. The two layers
/// are collapsed into a single `io::Result`.
pub(crate) async fn asyncify<F, T>(f: F) -> io::Result<T>
where
    F: FnOnce() -> io::Result<T> + Send + 'static,
    T: Send + 'static,
{
    match run(f).await {
        // The task ran: hand back whatever `f` itself produced.
        Ok(inner) => inner,
        // Outer failure reported by the future: propagate it unchanged.
        Err(e) => Err(e),
    }
}

/// Returns how many tasks are currently queued on this thread and have not
/// yet been run.
pub(crate) fn len() -> usize {
    QUEUE.with(|queue| {
        let pending = queue.borrow();
        pending.len()
    })
}

/// Runs the oldest queued task on the current thread.
///
/// # Panics
///
/// Panics if the queue is empty.
pub(crate) fn run_one() {
    // Pop in its own statement so the `RefCell` borrow is released before the
    // task executes; the task is then free to touch the queue itself.
    let next = QUEUE.with(|queue| queue.borrow_mut().pop_front());

    match next {
        Some(task) => task(),
        None => panic!("expected task to run, but none ready"),
    }
}