// Source: tokio/src/executor/thread_pool/pool.rs
// Blob: 1da4239be8f5a3fc2718e6bb6224f98478026632

use crate::executor::blocking::PoolWaiter;
use crate::executor::thread_pool::{shutdown, Builder, JoinHandle, Spawner};
use crate::executor::Executor;

use std::fmt;
use std::future::Future;

/// Work-stealing based thread pool for executing futures.
///
/// Tasks are submitted through the contained [`Spawner`]. Shutdown is driven
/// by [`ThreadPool::shutdown_now`], which is also invoked when the pool is
/// dropped.
pub struct ThreadPool {
    /// Handle through which tasks are spawned; also gives access to the
    /// workers and the blocking pool used by `block_on` and `shutdown_now`.
    spawner: Spawner,

    /// Shutdown waiter; `shutdown_now` calls `wait()` on it after the
    /// workers have been closed.
    shutdown_rx: shutdown::Receiver,

    /// Shutdown valve for the blocking pool; `shutdown_now` calls
    /// `shutdown()` on it as its final step.
    blocking: PoolWaiter,
}

impl ThreadPool {
    /// Builds a `ThreadPool` from a `Builder` left at its default settings.
    pub fn new() -> ThreadPool {
        Builder::new().build()
    }

    /// Assembles a pool from its already-constructed components.
    ///
    /// Only reachable from the parent module.
    pub(super) fn from_parts(
        spawner: Spawner,
        shutdown_rx: shutdown::Receiver,
        blocking: PoolWaiter,
    ) -> ThreadPool {
        Self { spawner, shutdown_rx, blocking }
    }

    /// Borrows the pool's `Spawner`.
    ///
    /// The `Spawner` handle can be cloned, which makes it possible to spawn
    /// tasks onto this pool from other threads.
    pub fn spawner(&self) -> &Spawner {
        let ThreadPool { spawner, .. } = self;
        spawner
    }

    /// Spawns `future` onto the pool and returns the `JoinHandle` for its
    /// output.
    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.spawner().spawn(future)
    }

    /// Spawns `future` onto the pool without handing back a join handle.
    pub(crate) fn spawn_background<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.spawner().spawn_background(future);
    }

    /// Blocks the calling thread until `future` has completed and returns
    /// its output.
    ///
    /// The future itself is driven on the current thread, while any tasks it
    /// spawns are executed on the thread pool.
    ///
    /// # Panics
    ///
    /// Panics with "attempting to block while on a Tokio executor" when
    /// `crate::executor::enter()` fails.
    pub fn block_on<F>(&self, future: F) -> F::Output
    where
        F: Future,
    {
        crate::executor::global::with_threadpool(self, || {
            // The executor context is entered first, and only then is the
            // blocking pool looked up and installed around the future.
            let mut enter = crate::executor::enter()
                .expect("attempting to block while on a Tokio executor");
            let blocking_pool = self.spawner.blocking_pool();

            crate::executor::blocking::with_pool(blocking_pool, || enter.block_on(future))
        })
    }

    /// Shuts the thread pool down.
    ///
    /// The workers are closed first; when `close()` reports `true`, the call
    /// waits on the shutdown receiver. The blocking pool waiter is shut down
    /// afterwards in every case.
    pub fn shutdown_now(&mut self) {
        let should_wait = self.spawner.workers().close();
        if should_wait {
            self.shutdown_rx.wait();
        }

        self.blocking.shutdown();
    }
}

impl Default for ThreadPool {
    fn default() -> ThreadPool {
        ThreadPool::new()
    }
}

/// Lets a shared reference to the pool act as an `Executor`.
impl Executor for &ThreadPool {
    /// Hands the boxed future to `spawn_background`; always returns `Ok(())`.
    fn spawn(
        &mut self,
        future: std::pin::Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Result<(), crate::executor::SpawnError> {
        // `self` is `&mut &ThreadPool`; peel off the outer reference.
        let pool: &ThreadPool = *self;
        pool.spawn_background(future);
        Ok(())
    }
}

/// Debug output shows only the type name; no fields are printed.
impl fmt::Debug for ThreadPool {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("ThreadPool");
        repr.finish()
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        self.shutdown_now();
    }
}