1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
use std::{
cell::RefCell,
collections::HashMap,
fmt::{Debug, Formatter},
};
use crossbeam_channel::Sender;
use crate::{Notifier, Status, ThreadStatuses};
/// A thread installer that is passed to a `Threadable` when installing the threads into the `Runtime`.
///
/// Each call to `spawn` registers the thread name in `thread_statuses` and records the thread function in `ops`;
/// the collected functions are handed back by `into_ops`.
pub struct Installer {
/// Status channel sender; a clone is handed to the `Notifier` built for every registered thread.
sender: Sender<(String, Status)>,
/// Status registry; every thread passed to `spawn` is registered here with `Status::New`.
thread_statuses: ThreadStatuses,
/// Thread functions keyed by thread name. Wrapped in a `RefCell` because `spawn` inserts through `&self`.
ops: RefCell<HashMap<String, Box<dyn FnOnce() + Send>>>,
}
impl Installer {
    /// Creates an installer with no pending thread functions.
    ///
    /// `thread_statuses` receives a registration for every thread passed to `spawn`, and `sender` is cloned
    /// into the `Notifier` handed to each install callback.
    pub(crate) fn new(thread_statuses: ThreadStatuses, sender: Sender<(String, Status)>) -> Self {
        Self {
            sender,
            thread_statuses,
            ops: RefCell::new(HashMap::new()),
        }
    }

    /// Consumes the installer and returns the collected thread functions, keyed by thread name.
    pub(crate) fn into_ops(self) -> HashMap<String, Box<dyn FnOnce() + Send>> {
        // `self` is owned, so the cell can be unwrapped directly; no runtime borrow check is involved.
        self.ops.into_inner()
    }

    /// Register a new thread with a name. The `install` callback is called immediately with a `Notifier` for that
    /// name and returns the thread function, which is stored until `into_ops` is called.
    ///
    /// The thread is registered with `Status::New` before `install` runs. Registering the same name twice replaces
    /// the previously stored thread function.
    ///
    /// The callback may itself call `spawn` on this installer to register further threads.
    #[inline]
    pub fn spawn<InstallFn, ThreadFn>(&self, name: &str, install: InstallFn)
    where
        InstallFn: FnOnce(Notifier) -> ThreadFn,
        ThreadFn: FnOnce() + Send + 'static,
    {
        self.thread_statuses.register_thread(name, Status::New);
        let notifier = Notifier::new(name, self.sender.clone());

        // Run the user callback BEFORE borrowing `ops`. Calling it as an argument of `borrow_mut().insert(..)` would
        // keep the `RefMut` guard alive during the callback, so a nested `spawn` would panic with "already borrowed".
        let thread_fn = install(notifier);

        // A thread function previously stored under the same name is intentionally discarded.
        let _previous = self.ops.borrow_mut().insert(String::from(name), Box::new(thread_fn));
    }
}
impl Debug for Installer {
    /// Formats the installer as a non-exhaustive struct showing only `sender` and `thread_statuses`.
    ///
    /// The stored thread functions are boxed closures without a `Debug` representation, so they are elided
    /// and shown as `..`.
    #[inline]
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self {
            sender,
            thread_statuses,
            ..
        } = self;

        let mut builder = f.debug_struct("Installer");
        builder.field("sender", sender);
        builder.field("thread_statuses", thread_statuses);
        builder.finish_non_exhaustive()
    }
}
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    };

    use crossbeam_channel::unbounded;

    use super::*;
    use crate::Threadable;

    /// Minimal `Threadable` whose thread function sets a shared flag when it is run.
    struct Thread {
        called: Arc<AtomicBool>,
    }

    impl Thread {
        fn new() -> Self {
            let called = Arc::new(AtomicBool::new(false));
            Self { called }
        }
    }

    impl Threadable for Thread {
        fn install(&self, installer: &Installer) {
            let flag = Arc::clone(&self.called);
            let run = move || {
                flag.store(true, Ordering::Relaxed);
            };
            installer.spawn("name", |_| run);
        }
    }

    /// The function returned by the install callback is stored under the thread name and can be run later.
    #[test]
    fn test() {
        let (sender, _receiver) = unbounded();
        let installer = Installer::new(ThreadStatuses::new(), sender);
        let thread = Thread::new();
        thread.install(&installer);

        let thread_fn = installer.into_ops().remove("name").unwrap();
        thread_fn();

        assert!(thread.called.load(Ordering::Acquire));
    }

    /// The `Debug` output lists the sender and thread statuses and elides the stored functions.
    #[test]
    fn debug() {
        let (sender, _receiver) = unbounded();
        let installer = Installer::new(ThreadStatuses::new(), sender);

        let rendered = format!("{installer:?}");

        assert_eq!(
            rendered,
            "Installer { sender: Sender { .. }, thread_statuses: ThreadStatuses { statuses: Mutex { data: {} } }, .. }"
        );
    }
}
|