summaryrefslogtreecommitdiffstats
path: root/src/runtime/src/installer.rs
blob: fc5fd293ad4a37864e25cbae1069dc68b7b1eda2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use std::{
	cell::RefCell,
	collections::HashMap,
	fmt::{Debug, Formatter},
};

use crossbeam_channel::Sender;

use crate::{Notifier, Status, ThreadStatuses};

/// A thread installer that is passed to a `Threadable` when installing the threads into the `Runtime`.
///
/// Thread functions registered through `spawn` are collected by name and handed back as a whole by
/// `into_ops`.
pub struct Installer {
	/// Channel sender carrying `(String, Status)` messages; cloned into the `Notifier` of every spawned thread.
	sender: Sender<(String, Status)>,
	/// Status registry in which every spawned thread name is registered with `Status::New`.
	thread_statuses: ThreadStatuses,
	/// Thread functions keyed by thread name. Kept in a `RefCell` so that `spawn` can insert through `&self`.
	ops: RefCell<HashMap<String, Box<dyn FnOnce() + Send>>>,
}

impl Installer {
	/// Create an installer that registers threads in `thread_statuses` and builds each thread's
	/// `Notifier` from a clone of `sender`.
	pub(crate) fn new(thread_statuses: ThreadStatuses, sender: Sender<(String, Status)>) -> Self {
		Self {
			sender,
			thread_statuses,
			ops: RefCell::new(HashMap::new()),
		}
	}

	/// Consume the installer and return the thread functions collected by `spawn`, keyed by thread name.
	pub(crate) fn into_ops(self) -> HashMap<String, Box<dyn FnOnce() + Send>> {
		// The installer is owned here, so the map can be moved out directly without a runtime borrow check.
		self.ops.into_inner()
	}

	/// Spawn a new thread with a name. The `install` callback is called once with a `Notifier` for the
	/// thread and returns the thread function, which is stored under `name` until `into_ops` is called.
	///
	/// The thread is registered with `Status::New` before `install` runs. Spawning a second thread with a
	/// name that is already in use replaces the previously stored thread function.
	#[inline]
	pub fn spawn<InstallFn, ThreadFn>(&self, name: &str, install: InstallFn)
	where
		InstallFn: FnOnce(Notifier) -> ThreadFn,
		ThreadFn: FnOnce() + Send + 'static,
	{
		self.thread_statuses.register_thread(name, Status::New);
		let notifier = Notifier::new(name, self.sender.clone());
		// Run the callback before borrowing `ops`: if it were evaluated as an argument of `insert`, the
		// mutable borrow would still be held and a callback that spawns another thread on this installer
		// would panic with an "already borrowed" error.
		let thread_fn = install(notifier);
		let _previous = self
			.ops
			.borrow_mut()
			.insert(String::from(name), Box::new(thread_fn));
	}
}

impl Debug for Installer {
	/// Render the installer as a non-exhaustive struct showing `sender` and `thread_statuses`.
	///
	/// The `ops` map is left out, as the boxed thread functions have no `Debug` representation.
	#[inline]
	fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
		let Self {
			sender,
			thread_statuses,
			..
		} = self;

		let mut builder = f.debug_struct("Installer");
		builder.field("sender", sender);
		builder.field("thread_statuses", thread_statuses);
		builder.finish_non_exhaustive()
	}
}

#[cfg(test)]
mod tests {
	use std::sync::{
		atomic::{AtomicBool, Ordering},
		Arc,
	};

	use crossbeam_channel::unbounded;

	use super::*;
	use crate::Threadable;

	/// Minimal `Threadable` whose thread function records that it has been run.
	struct Thread {
		called: Arc<AtomicBool>,
	}

	impl Thread {
		fn new() -> Self {
			let called = Arc::new(AtomicBool::new(false));
			Self { called }
		}
	}

	impl Threadable for Thread {
		fn install(&self, installer: &Installer) {
			// The thread function owns its own handle to the shared flag.
			let flag = Arc::clone(&self.called);
			installer.spawn("name", |_notifier| move || flag.store(true, Ordering::Relaxed));
		}
	}

	/// A spawned thread function is stored under its name and sets the flag when invoked.
	#[test]
	fn test() {
		let (sender, _receiver) = unbounded();
		let statuses = ThreadStatuses::new();
		let installer = Installer::new(statuses, sender);

		let thread = Thread::new();
		thread.install(&installer);

		let mut registered = installer.into_ops();
		let thread_fn = registered.remove("name").unwrap();
		thread_fn();

		let was_called = thread.called.load(Ordering::Acquire);
		assert!(was_called);
	}

	/// The `Debug` output lists the sender and thread statuses and elides the remaining fields.
	#[test]
	fn debug() {
		let (sender, _receiver) = unbounded();
		let statuses = ThreadStatuses::new();
		let installer = Installer::new(statuses, sender);

		let rendered = format!("{installer:?}");
		assert_eq!(
			rendered,
			"Installer { sender: Sender { .. }, thread_statuses: ThreadStatuses { statuses: Mutex { data: {} } }, .. }"
		);
	}
}