diff options
Diffstat (limited to 'src/core/src/runtime/runtime.rs')
-rw-r--r-- | src/core/src/runtime/runtime.rs | 415 |
1 files changed, 415 insertions, 0 deletions
diff --git a/src/core/src/runtime/runtime.rs b/src/core/src/runtime/runtime.rs new file mode 100644 index 0000000..3e252c9 --- /dev/null +++ b/src/core/src/runtime/runtime.rs @@ -0,0 +1,415 @@ +use std::{clone::Clone, sync::Arc, thread}; + +use crossbeam_channel::{unbounded, Receiver, Sender}; +use parking_lot::Mutex; + +use crate::runtime::{Installer, RuntimeError, Status, ThreadStatuses, Threadable}; + +const RUNTIME_THREAD_NAME: &str = "runtime"; + +/// A system the manages the lifetime of threads. This includes ensuring errors are handled, threads are paused and +/// resumed on request and that once the main application is completed, all threads complete and end. +#[allow(missing_debug_implementations)] +pub(crate) struct Runtime<'runtime> { + receiver: Receiver<(String, Status)>, + sender: Sender<(String, Status)>, + thread_statuses: ThreadStatuses, + threadables: Arc<Mutex<Vec<&'runtime mut dyn Threadable>>>, +} + +impl<'runtime> Runtime<'runtime> { + /// Create a new instances of the `Runtime`. + #[inline] + #[must_use] + pub(crate) fn new(thread_statuses: ThreadStatuses) -> Self { + let (sender, receiver) = unbounded(); + + thread_statuses.register_thread(RUNTIME_THREAD_NAME, Status::Waiting); + + Self { + receiver, + sender, + thread_statuses, + threadables: Arc::new(Mutex::new(vec![])), + } + } + + /// Get a cloned copy of the `ThreadStatuses`. + #[inline] + #[must_use] + pub(crate) fn statuses(&self) -> ThreadStatuses { + self.thread_statuses.clone() + } + + /// Register a new `Threadable`. + #[inline] + pub(crate) fn register(&self, threadable: &'runtime mut (dyn Threadable)) { + self.threadables.lock().push(threadable); + } + + /// Join the runtime thread, waiting for all threads to finish. + /// + /// # Errors + /// Returns and error if any of the threads registered to the runtime produce an error. + #[inline] + #[allow(clippy::iter_over_hash_type)] + pub(crate) fn join(&self) -> Result<(), RuntimeError> { + let installer = Installer::new(self.thread_statuses.clone(), self.sender.clone()); + { + let threadables = self.threadables.lock(); + for threadable in threadables.iter() { + threadable.install(&installer); + } + } + let mut handles = vec![]; + + for (name, op) in installer.into_ops().drain() { + handles.push( + thread::Builder::new() + .name(String::from(name.as_str())) + .spawn(op) + .map_err(|_err| RuntimeError::ThreadSpawnError(name))?, + ); + } + + let mut result = Ok(()); + + for (name, status) in &self.receiver { + match status { + Status::Error(err) => { + // since we entered an error state, we attempt to shutdown the other threads, but + // they could fail due to the error state, but keeping the shutdown error is less + // important than the original error. + let _result = self.shutdown(); + result = Err(err); + break; + }, + Status::RequestPause => { + for threadable in self.threadables.lock().iter() { + threadable.pause(); + } + }, + Status::RequestResume => { + for threadable in self.threadables.lock().iter() { + threadable.resume(); + } + }, + Status::RequestEnd => { + self.thread_statuses.update_thread(RUNTIME_THREAD_NAME, Status::Ended); + for threadable in self.threadables.lock().iter() { + threadable.end(); + } + }, + Status::New | Status::Busy | Status::Waiting | Status::Ended => {}, + } + + self.thread_statuses.update_thread(name.as_str(), status); + + if self.thread_statuses.all_ended() { + result = self.shutdown(); + break; + } + } + + while let Some(handle) = handles.pop() { + let _result = handle.join(); + } + + result + } + + #[inline] + fn shutdown(&self) -> Result<(), RuntimeError> { + if self.thread_statuses.all_ended() { + return Ok(()); + } + + for threadable in self.threadables.lock().iter() { + threadable.end(); + } + self.sender + .send((String::from(RUNTIME_THREAD_NAME), Status::Ended)) + .map_err(|_err| RuntimeError::SendError) + } +} + +#[cfg(test)] +mod tests { + use std::{ + sync::atomic::{AtomicBool, Ordering}, + thread::sleep, + time::Duration, + }; + + use claims::assert_err; + + use super::*; + + #[test] + fn run_thread_finish() { + struct Thread; + + impl Thread { + const fn new() -> Self { + Self {} + } + } + + impl Threadable for Thread { + fn install(&self, installer: &Installer) { + installer.spawn("name", |notifier| { + move || { + notifier.end(); + notifier.request_end(); + } + }); + } + } + + let runtime = Runtime::new(ThreadStatuses::new()); + let mut thread = Thread::new(); + runtime.register(&mut thread); + runtime.join().unwrap(); + assert!(runtime.statuses().all_ended()); + } + + #[test] + fn run_thread_error() { + struct Thread1; + + impl Thread1 { + const fn new() -> Self { + Self {} + } + } + + impl Threadable for Thread1 { + fn install(&self, installer: &Installer) { + installer.spawn("name0", |notifier| { + move || { + notifier.error(RuntimeError::ThreadError(String::from("error"))); + } + }); + } + } + + struct Thread2 { + ended: Arc<AtomicBool>, + } + + impl Thread2 { + fn new() -> Self { + Self { + ended: Arc::new(AtomicBool::new(false)), + } + } + } + + impl Threadable for Thread2 { + fn install(&self, installer: &Installer) { + let ended = Arc::clone(&self.ended); + installer.spawn("name1", |notifier| { + move || { + while !ended.load(Ordering::Acquire) { + sleep(Duration::from_millis(10)); + } + notifier.end(); + } + }); + } + + fn end(&self) { + self.ended.store(true, Ordering::Release); + } + } + + let runtime = Runtime::new(ThreadStatuses::new()); + let mut thread1 = Thread1::new(); + let mut thread2 = Thread2::new(); + runtime.register(&mut thread1); + runtime.register(&mut thread2); + assert_err!(runtime.join()); + } + + #[test] + fn run_thread_request_pause() { + struct Thread1; + + impl Thread1 { + const fn new() -> Self { + Self {} + } + } + + impl Threadable for Thread1 { + fn install(&self, installer: &Installer) { + installer.spawn("name0", |notifier| { + move || { + notifier.request_pause(); + notifier.end(); + } + }); + } + } + + struct Thread2 { + paused: Arc<AtomicBool>, + } + + impl Thread2 { + fn new() -> Self { + Self { + paused: Arc::new(AtomicBool::new(false)), + } + } + } + + impl Threadable for Thread2 { + fn install(&self, installer: &Installer) { + let paused = Arc::clone(&self.paused); + installer.spawn("name1", |notifier| { + move || { + while !paused.load(Ordering::Acquire) { + sleep(Duration::from_millis(10)); + } + notifier.end(); + notifier.request_end(); + } + }); + } + + fn pause(&self) { + self.paused.store(true, Ordering::Release); + } + } + + let runtime = Runtime::new(ThreadStatuses::new()); + let mut thread1 = Thread1::new(); + let mut thread2 = Thread2::new(); + runtime.register(&mut thread1); + runtime.register(&mut thread2); + runtime.join().unwrap(); + assert!(thread2.paused.load(Ordering::Acquire)); + } + + #[test] + fn run_thread_request_resume() { + struct Thread1; + + impl Thread1 { + const fn new() -> Self { + Self {} + } + } + + impl Threadable for Thread1 { + fn install(&self, installer: &Installer) { + installer.spawn("name0", |notifier| { + move || { + notifier.request_resume(); + notifier.end(); + } + }); + } + } + + struct Thread2 { + resumed: Arc<AtomicBool>, + } + + impl Thread2 { + fn new() -> Self { + Self { + resumed: Arc::new(AtomicBool::new(false)), + } + } + } + + impl Threadable for Thread2 { + fn install(&self, installer: &Installer) { + let resumed = Arc::clone(&self.resumed); + installer.spawn("name1", |notifier| { + move || { + while !resumed.load(Ordering::Acquire) { + sleep(Duration::from_millis(10)); + } + notifier.end(); + notifier.request_end(); + } + }); + } + + fn resume(&self) { + self.resumed.store(true, Ordering::Release); + } + } + + let runtime = Runtime::new(ThreadStatuses::new()); + let mut thread1 = Thread1::new(); + let mut thread2 = Thread2::new(); + runtime.register(&mut thread1); + runtime.register(&mut thread2); + runtime.join().unwrap(); + assert!(thread2.resumed.load(Ordering::Acquire)); + } + + #[test] + fn run_thread_request_end() { + struct Thread1; + + impl Thread1 { + const fn new() -> Self { + Self {} + } + } + + impl Threadable for Thread1 { + fn install(&self, installer: &Installer) { + installer.spawn("name0", |notifier| { + move || { + notifier.request_end(); + notifier.end(); + } + }); + } + } + + struct Thread2 { + ended: Arc<AtomicBool>, + } + + impl Thread2 { + fn new() -> Self { + Self { + ended: Arc::new(AtomicBool::new(false)), + } + } + } + + impl Threadable for Thread2 { + fn install(&self, installer: &Installer) { + let ended = Arc::clone(&self.ended); + installer.spawn("name1", |notifier| { + move || { + while !ended.load(Ordering::Acquire) { + sleep(Duration::from_millis(10)); + } + notifier.end(); + } + }); + } + + fn end(&self) { + self.ended.store(true, Ordering::Release); + } + } + + let runtime = Runtime::new(ThreadStatuses::new()); + let mut thread1 = Thread1::new(); + let mut thread2 = Thread2::new(); + runtime.register(&mut thread1); + runtime.register(&mut thread2); + runtime.join().unwrap(); + assert!(thread2.ended.load(Ordering::Acquire)); + } +} |