diff options
author | Tim Oram <dev@mitmaro.ca> | 2022-12-20 11:20:42 -0330 |
---|---|---|
committer | Tim Oram <dev@mitmaro.ca> | 2022-12-21 15:23:53 -0330 |
commit | f2c0e3a61802d582c1ba98037d7202346883adb3 (patch) | |
tree | 905b950382a1c8c0a769a9b113db3ce7201a97e5 | |
parent | fb9a1c025dec1650802fb8952844c31181f104eb (diff) |
Add search thread and Searchable trait
This is the start of allowing for searches to be run in a separate
thread, using a reusable interface.
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | src/core/Cargo.toml | 1 | ||||
-rw-r--r-- | src/core/src/lib.rs | 2 | ||||
-rw-r--r-- | src/core/src/search/action.rs | 52 | ||||
-rw-r--r-- | src/core/src/search/interrupter.rs | 42 | ||||
-rw-r--r-- | src/core/src/search/mod.rs | 18 | ||||
-rw-r--r-- | src/core/src/search/search_result.rs | 6 | ||||
-rw-r--r-- | src/core/src/search/search_state.rs | 6 | ||||
-rw-r--r-- | src/core/src/search/searchable.rs | 7 | ||||
-rw-r--r-- | src/core/src/search/state.rs | 116 | ||||
-rw-r--r-- | src/core/src/search/thread.rs | 404 |
11 files changed, 655 insertions, 0 deletions
@@ -312,6 +312,7 @@ dependencies = [ "captur", "chrono", "claim", + "crossbeam-channel", "girt-config", "girt-display", "girt-git", diff --git a/src/core/Cargo.toml b/src/core/Cargo.toml index 57a937e..6a897ee 100644 --- a/src/core/Cargo.toml +++ b/src/core/Cargo.toml @@ -18,6 +18,7 @@ name = "core" anyhow = "1.0.68" bitflags = "1.3.2" captur = "0.1.0" +crossbeam-channel = "0.5.6" if_chain = "1.0.2" lazy_static = "1.4.0" num-format = "0.4.4" diff --git a/src/core/src/lib.rs b/src/core/src/lib.rs index 700289c..88d03a4 100644 --- a/src/core/src/lib.rs +++ b/src/core/src/lib.rs @@ -127,6 +127,8 @@ mod license; mod module; mod modules; mod process; +#[allow(dead_code)] +mod search; #[cfg(test)] mod tests; #[cfg(test)] diff --git a/src/core/src/search/action.rs b/src/core/src/search/action.rs new file mode 100644 index 0000000..80a7468 --- /dev/null +++ b/src/core/src/search/action.rs @@ -0,0 +1,52 @@ +use std::fmt::{Debug, Formatter}; + +use crate::search::searchable::Searchable; + +#[allow(clippy::exhaustive_enums)] +pub(crate) enum Action { + Cancel, + Continue, + End, + SetSearchable(Box<dyn Searchable>), + Start(String), +} + +impl Debug for Action { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match *self { + Self::Cancel => write!(f, "Cancel"), + Self::Continue => write!(f, "Continue"), + Self::End => write!(f, "End"), + Self::SetSearchable(_) => write!(f, "SetSearchable(_)"), + Self::Start(ref term) => write!(f, "Start({term})"), + } + } +} + +#[cfg(test)] +mod tests { + use rstest::rstest; + + use super::*; + use crate::search::{Interrupter, SearchResult}; + + struct TestSearchable; + + impl Searchable for TestSearchable { + fn reset(&mut self) {} + + fn search(&mut self, _: Interrupter, _: &str) -> SearchResult { + SearchResult::None + } + } + + #[rstest] + #[case::cancel(Action::Cancel, "Cancel")] + #[case::cont(Action::Continue, "Continue")] + #[case::end(Action::End, "End")] + #[case::set_searchable(Action::SetSearchable(Box::new(TestSearchable {})), "SetSearchable(_)")] + #[case::start(Action::Start(String::from("foo")), "Start(foo)")] + fn debug(#[case] action: Action, #[case] expected: &str) { + assert_eq!(format!("{action:?}"), expected); + } +} diff --git a/src/core/src/search/interrupter.rs b/src/core/src/search/interrupter.rs new file mode 100644 index 0000000..7aa3549 --- /dev/null +++ b/src/core/src/search/interrupter.rs @@ -0,0 +1,42 @@ +use std::{ + ops::Add, + time::{Duration, Instant}, +}; + +pub(crate) struct Interrupter { + finish: Instant, +} + +impl Interrupter { + pub(crate) fn new(duration: Duration) -> Self { + Self { + finish: Instant::now().add(duration), + } + } + + pub(crate) fn should_continue(&self) -> bool { + Instant::now() < self.finish + } +} + +#[cfg(test)] +mod test { + use std::{ops::Sub, time::Duration}; + + use super::*; + use crate::search::Interrupter; + + #[test] + fn should_continue_before_finish() { + let interrupter = Interrupter::new(Duration::from_secs(60)); + assert!(interrupter.should_continue()); + } + + #[test] + fn should_continue_after_finish() { + let interrupter = Interrupter { + finish: Instant::now().sub(Duration::from_secs(60)), + }; + assert!(!interrupter.should_continue()); + } +} diff --git a/src/core/src/search/mod.rs b/src/core/src/search/mod.rs new file mode 100644 index 0000000..1fbf7f9 --- /dev/null +++ b/src/core/src/search/mod.rs @@ -0,0 +1,18 @@ +mod action; +mod interrupter; +mod search_result; +mod search_state; +mod searchable; +mod state; +mod thread; + +#[allow(unused_imports)] +pub(crate) use self::{ + action::Action, + interrupter::Interrupter, + search_result::SearchResult, + search_state::SearchState, + searchable::Searchable, + state::State, + thread::Thread, +}; diff --git a/src/core/src/search/search_result.rs b/src/core/src/search/search_result.rs new file mode 100644 index 0000000..11c9666 --- /dev/null +++ b/src/core/src/search/search_result.rs @@ -0,0 +1,6 @@ +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub(crate) enum SearchResult { + None, + Complete, + Updated, +} diff --git a/src/core/src/search/search_state.rs b/src/core/src/search/search_state.rs new file mode 100644 index 0000000..e667a66 --- /dev/null +++ b/src/core/src/search/search_state.rs @@ -0,0 +1,6 @@ +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub(crate) enum SearchState { + Inactive, + Active, + Complete, +} diff --git a/src/core/src/search/searchable.rs b/src/core/src/search/searchable.rs new file mode 100644 index 0000000..7097af1 --- /dev/null +++ b/src/core/src/search/searchable.rs @@ -0,0 +1,7 @@ +use crate::search::{Interrupter, SearchResult}; + +pub(crate) trait Searchable: Send { + fn reset(&mut self); + + fn search(&mut self, interrupter: Interrupter, term: &str) -> SearchResult; +} diff --git a/src/core/src/search/state.rs b/src/core/src/search/state.rs new file mode 100644 index 0000000..c3dbdc8 --- /dev/null +++ b/src/core/src/search/state.rs @@ -0,0 +1,116 @@ +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; + +use crossbeam_channel::RecvTimeoutError; + +use crate::search::action::Action; + +const RECEIVE_TIMEOUT: Duration = Duration::from_millis(500); + +#[derive(Clone, Debug)] +pub(crate) struct State { + ended: Arc<AtomicBool>, + paused: Arc<AtomicBool>, + update_receiver: crossbeam_channel::Receiver<Action>, + update_sender: crossbeam_channel::Sender<Action>, +} + +impl State { + pub(crate) fn new() -> Self { + let (update_sender, update_receiver) = crossbeam_channel::unbounded(); + Self { + ended: Arc::new(AtomicBool::from(false)), + paused: Arc::new(AtomicBool::from(false)), + update_receiver, + update_sender, + } + } + + pub(crate) fn receive_update(&self) -> Action { + self.update_receiver + .recv_timeout(RECEIVE_TIMEOUT) + .unwrap_or_else(|e: RecvTimeoutError| { + match e { + RecvTimeoutError::Timeout => Action::Continue, + RecvTimeoutError::Disconnected => Action::End, + } + }) + } + + pub(crate) fn send_update(&self, action: Action) { + let _result = self.update_sender.send(action); + } + + pub(crate) fn is_paused(&self) -> bool { + self.paused.load(Ordering::Acquire) + } + + pub(crate) fn is_ended(&self) -> bool { + self.ended.load(Ordering::Acquire) + } + + pub(crate) fn pause(&self) { + self.paused.store(true, Ordering::Release); + } + + pub(crate) fn resume(&self) { + self.paused.store(false, Ordering::Release); + } + + pub(crate) fn end(&self) { + self.ended.store(true, Ordering::Release); + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn send_recv_update() { + let state = State::new(); + state.send_update(Action::Start(String::from("test"))); + assert!(matches!(state.receive_update(), Action::Start(_))); + } + + #[test] + fn send_recv_update_timeout() { + let state = State::new(); + assert!(matches!(state.receive_update(), Action::Continue)); + } + + #[test] + fn send_recv_disconnect() { + let (update_sender, _update_receiver) = crossbeam_channel::unbounded(); + let mut state = State::new(); + state.update_sender = update_sender; // replace last reference to sender, to force a disconnect + assert!(matches!(state.receive_update(), Action::End)); + } + + #[test] + fn paused() { + let state = State::new(); + state.pause(); + assert!(state.is_paused()); + } + + #[test] + fn resumed() { + let state = State::new(); + state.resume(); + assert!(!state.is_paused()); + } + + #[test] + fn ended() { + let state = State::new(); + state.end(); + assert!(state.is_ended()); + } +} diff --git a/src/core/src/search/thread.rs b/src/core/src/search/thread.rs new file mode 100644 index 0000000..f968ed5 --- /dev/null +++ b/src/core/src/search/thread.rs @@ -0,0 +1,404 @@ +use std::{ + sync::Arc, + thread::sleep, + time::{Duration, Instant}, +}; + +use captur::capture; +use runtime::{Installer, Threadable}; + +use crate::search::{ + action::Action, + interrupter::Interrupter, + search_result::SearchResult, + searchable::Searchable, + State, +}; + +pub(crate) const THREAD_NAME: &str = "search"; +const MINIMUM_PAUSE_RATE: Duration = Duration::from_millis(50); +const SEARCH_INTERRUPT_TIME: Duration = Duration::from_millis(10); + +#[derive(Debug)] +pub(crate) struct Thread<UpdateHandler: Fn() + Sync + Send> { + state: State, + search_update_handler: Arc<UpdateHandler>, +} + +impl<UpdateHandler> Threadable for Thread<UpdateHandler> +where UpdateHandler: Fn() + Sync + Send + 'static +{ + #[inline] + fn install(&self, installer: &Installer) { + let state = self.state(); + let update_handler = Arc::clone(&self.search_update_handler); + + installer.spawn(THREAD_NAME, |notifier| { + move || { + capture!(notifier, state); + let mut active_searchable: Option<Box<dyn Searchable>> = None; + let mut search_term = String::new(); + let mut search_complete = false; + + notifier.wait(); + let mut time = Instant::now(); + + loop { + notifier.wait(); + if state.is_ended() { + break; + } + while state.is_paused() { + sleep(time.saturating_duration_since(Instant::now())); + time += MINIMUM_PAUSE_RATE; + } + + let msg = state.receive_update(); + notifier.busy(); + match msg { + Action::Cancel => { + if let Some(searchable) = active_searchable.as_mut() { + searchable.reset(); + }; + search_complete = true; + search_term.clear(); + }, + Action::SetSearchable(searchable) => { + search_complete = true; + active_searchable = Some(searchable); + }, + Action::Start(term) => { + // avoid calling update handler when there is no change in the search term + if term == search_term { + continue; + } + search_complete = false; + search_term = term; + }, + Action::Continue => {}, + Action::End => break, + } + if search_complete || search_term.is_empty() { + continue; + } + + let Some(searchable) = active_searchable.as_mut() else { + continue; + }; + + match searchable.search(Interrupter::new(SEARCH_INTERRUPT_TIME), search_term.as_str()) { + SearchResult::None | SearchResult::Updated => {}, + SearchResult::Complete => search_complete = true, + } + update_handler(); + } + + notifier.request_end(); + notifier.end(); + } + }); + } + + #[inline] + fn pause(&self) { + self.state.pause(); + } + + #[inline] + fn resume(&self) { + self.state.resume(); + } + + #[inline] + fn end(&self) { + self.state.end(); + } +} + +impl<UpdateHandler> Thread<UpdateHandler> +where UpdateHandler: Fn() + Sync + Send +{ + pub(crate) fn new(search_update_handler: UpdateHandler) -> Self { + Self { + state: State::new(), + search_update_handler: Arc::new(search_update_handler), + } + } + + pub(crate) fn state(&self) -> State { + self.state.clone() + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicUsize, Ordering}; + + use parking_lot::Mutex; + use runtime::{testutils::ThreadableTester, Status}; + + use super::*; + + #[derive(Clone)] + struct MockedSearchable { + calls: Arc<Mutex<Vec<String>>>, + search_result: Arc<Mutex<SearchResult>>, + } + + impl MockedSearchable { + fn new() -> Self { + Self { + calls: Arc::new(Mutex::new(vec![])), + search_result: Arc::new(Mutex::new(SearchResult::None)), + } + } + } + + impl Searchable for MockedSearchable { + fn reset(&mut self) { + self.calls.lock().push(String::from("Reset")); + } + + fn search(&mut self, _: Interrupter, term: &str) -> SearchResult { + self.calls.lock().push(format!("Search({term})")); + *self.search_result.lock() + } + } + + #[test] + fn set_pause_resume() { + let thread = Thread::new(|| {}); + let state = thread.state(); + let tester = ThreadableTester::new(); + tester.start_threadable(&thread, THREAD_NAME); + tester.wait_for_status(&Status::Waiting); + thread.pause(); + assert!(state.is_paused()); + // give thread time to pause + sleep(Duration::from_secs(1)); + state.send_update(Action::Continue); + thread.resume(); + assert!(!state.is_paused()); + state.end(); + tester.wait_for_status(&Status::Ended); + } + + #[test] + fn set_end() { + let thread = Thread::new(|| {}); + let state = thread.state(); + thread.end(); + assert!(state.is_ended()); + } + + #[test] + fn thread_end_from_state() { + let thread = Thread::new(|| {}); + let state = thread.state(); + + let tester = ThreadableTester::new(); + tester.start_threadable(&thread, THREAD_NAME); + tester.wait_for_status(&Status::Waiting); + state.end(); + tester.wait_for_status(&Status::Ended); + } + + #[test] + fn thread_end_from_action() { + let thread = Thread::new(|| {}); + let state = thread.state(); + + let tester = ThreadableTester::new(); + tester.start_threadable(&thread, THREAD_NAME); + tester.wait_for_status(&Status::Waiting); + state.send_update(Action::End); + tester.wait_for_status(&Status::Ended); + } + + #[test] + fn thread_start_search() { + let update_handler_calls = Arc::new(AtomicUsize::new(0)); + let update_handler_calls_thread = Arc::clone(&update_handler_calls); + let thread = Thread::new(move || { + let _ = update_handler_calls_thread.fetch_add(1, Ordering::Release); + }); + let state = thread.state(); + + let searchable = MockedSearchable::new(); + + let tester = ThreadableTester::new(); + tester.start_threadable(&thread, THREAD_NAME); + tester.wait_for_status(&Status::Waiting); + state.send_update(Action::SetSearchable(Box::new(searchable.clone()))); + state.send_update(Action::Start(String::from("foo"))); + state.send_update(Action::End); + tester.wait_for_status(&Status::Ended); + + assert_eq!(update_handler_calls.load(Ordering::Acquire), 1); + let calls = searchable.calls.lock(); + assert_eq!(*calls, vec![String::from("Search(foo)")]); + } + + #[test] + fn thread_start_cancel() { + let update_handler_calls = Arc::new(AtomicUsize::new(0)); + let update_handler_calls_thread = Arc::clone(&update_handler_calls); + let thread = Thread::new(move || { + let _ = update_handler_calls_thread.fetch_add(1, Ordering::Release); + }); + let state = thread.state(); + + let searchable = MockedSearchable::new(); + + let tester = ThreadableTester::new(); + tester.start_threadable(&thread, THREAD_NAME); + state.send_update(Action::SetSearchable(Box::new(searchable.clone()))); + state.send_update(Action::Start(String::from("foo"))); + state.send_update(Action::Cancel); + state.send_update(Action::End); + tester.wait_for_status(&Status::Ended); + + assert_eq!(update_handler_calls.load(Ordering::Relaxed), 1); + let calls = searchable.calls.lock(); + assert_eq!(*calls, vec![String::from("Search(foo)"), String::from("Reset")]); + } + + #[test] + fn thread_start_continue() { + let update_handler_calls = Arc::new(AtomicUsize::new(0)); + let update_handler_calls_thread = Arc::clone(&update_handler_calls); + let thread = Thread::new(move || { + let _ = update_handler_calls_thread.fetch_add(1, Ordering::Release); + }); + let state = thread.state(); + + let searchable = MockedSearchable::new(); + let tester = ThreadableTester::new(); + tester.start_threadable(&thread, THREAD_NAME); + + state.send_update(Action::SetSearchable(Box::new(searchable.clone()))); + state.send_update(Action::Start(String::from("foo"))); + state.send_update(Action::Continue); + state.send_update(Action::End); + tester.wait_for_status(&Status::Ended); + + assert_eq!(update_handler_calls.load(Ordering::Acquire), 2); + let calls = searchable.calls.lock(); + assert_eq!(*calls, vec![String::from("Search(foo)"), String::from("Search(foo)")]); + } + + #[test] + fn thread_no_updates_after_complete() { + let update_handler_calls = Arc::new(AtomicUsize::new(0)); + let update_handler_calls_thread = Arc::clone(&update_handler_calls); + let thread = Thread::new(move || { + let _ = update_handler_calls_thread.fetch_add(1, Ordering::Release); + }); + let state = thread.state(); + + let searchable = MockedSearchable::new(); + let tester = ThreadableTester::new(); + tester.start_threadable(&thread, THREAD_NAME); + + state.send_update(Action::SetSearchable(Box::new(searchable.clone()))); + state.send_update(Action::Start(String::from("foo"))); + *searchable.search_result.lock() = SearchResult::Complete; + state.send_update(Action::Continue); + state.send_update(Action::End); + tester.wait_for_status(&Status::Ended); + + assert_eq!(update_handler_calls.load(Ordering::Acquire), 1); + let calls = searchable.calls.lock(); + assert_eq!(*calls, vec![String::from("Search(foo)")]); + } + + #[test] + fn thread_no_updates_on_empty_term() { + let update_handler_calls = Arc::new(AtomicUsize::new(0)); + let update_handler_calls_thread = Arc::clone(&update_handler_calls); + let thread = Thread::new(move || { + let _ = update_handler_calls_thread.fetch_add(1, Ordering::Release); + }); + let state = thread.state(); + + let searchable = MockedSearchable::new(); + let tester = ThreadableTester::new(); + tester.start_threadable(&thread, THREAD_NAME); + + state.send_update(Action::SetSearchable(Box::new(searchable.clone()))); + state.send_update(Action::Start(String::new())); + state.send_update(Action::End); + tester.wait_for_status(&Status::Ended); + + assert_eq!(update_handler_calls.load(Ordering::Acquire), 0); + let calls = searchable.calls.lock(); + assert!(calls.is_empty()); + } + + #[test] + fn thread_no_additional_updates_on_start_with_same_term() { + let update_handler_calls = Arc::new(AtomicUsize::new(0)); + let update_handler_calls_thread = Arc::clone(&update_handler_calls); + let thread = Thread::new(move || { + let _ = update_handler_calls_thread.fetch_add(1, Ordering::Release); + }); + let state = thread.state(); + + let searchable = MockedSearchable::new(); + let tester = ThreadableTester::new(); + tester.start_threadable(&thread, THREAD_NAME); + + state.send_update(Action::SetSearchable(Box::new(searchable.clone()))); + state.send_update(Action::Start(String::from("foo"))); + state.send_update(Action::Start(String::from("foo"))); + state.send_update(Action::End); + tester.wait_for_status(&Status::Ended); + + assert_eq!(update_handler_calls.load(Ordering::Acquire), 1); + let calls = searchable.calls.lock(); + assert_eq!(*calls, vec![String::from("Search(foo)")]); + } + + #[test] + fn thread_no_updates_on_no_searchable() { + let update_handler_calls = Arc::new(AtomicUsize::new(0)); + let update_handler_calls_thread = Arc::clone(&update_handler_calls); + let thread = Thread::new(move || { + let _ = update_handler_calls_thread.fetch_add(1, Ordering::Release); + }); + let state = thread.state(); + + let tester = ThreadableTester::new(); + tester.start_threadable(&thread, THREAD_NAME); + + state.send_update(Action::Start(String::from("foo"))); + state.send_update(Action::End); + tester.wait_for_status(&Status::Ended); + + assert_eq!(update_handler_calls.load(Ordering::Acquire), 0); + } + + #[test] + fn thread_updates_after_timeout() { + let update_handler_calls = Arc::new(AtomicUsize::new(0)); + let update_handler_calls_thread = Arc::clone(&update_handler_calls); + let thread = Thread::new(move || { + let _ = update_handler_calls_thread.fetch_add(1, Ordering::Release); + }); + let state = thread.state(); + + let searchable = MockedSearchable::new(); + let tester = ThreadableTester::new(); + tester.start_threadable(&thread, THREAD_NAME); + + state.send_update(Action::SetSearchable(Box::new(searchable.clone()))); + state.send_update(Action::Start(String::from("foo"))); + sleep(Duration::from_millis(750)); // will timeout after 500ms + state.send_update(Action::End); + tester.wait_for_status(&Status::Ended); + + assert_eq!(update_handler_calls.load(Ordering::Acquire), 2); + let calls = searchable.calls.lock(); + assert_eq!(*calls, vec![String::from("Search(foo)"), String::from("Search(foo)")]); + } +} |