path: root/src/input/thread/
diff options
Diffstat (limited to 'src/input/thread/')
1 files changed, 304 insertions, 0 deletions
diff --git a/src/input/thread/ b/src/input/thread/
new file mode 100644
index 0000000..1cdd257
--- /dev/null
+++ b/src/input/thread/
@@ -0,0 +1,304 @@
+use std::{
+ collections::VecDeque,
+ mem,
+ sync::{
+ atomic::{AtomicBool, Ordering},
+ Arc,
+ },
+ time::Duration,
+use parking_lot::Mutex;
+use crate::input::Event;
+const MAXIMUM_EVENTS: usize = 100;
+const EVENT_POLL_TIMEOUT: Duration = Duration::from_secs(1);
+/// Input thread state.
+#[derive(Clone, Debug)]
+pub(crate) struct State<CustomEvent: crate::input::CustomEvent> {
+ ended: Arc<AtomicBool>,
+ event_queue: Arc<Mutex<VecDeque<Event<CustomEvent>>>>,
+ paused: Arc<AtomicBool>,
+ update_receiver: crossbeam_channel::Receiver<()>,
+ update_sender: crossbeam_channel::Sender<()>,
+impl<CustomEvent: crate::input::CustomEvent> State<CustomEvent> {
+ pub(crate) fn new() -> Self {
+ let (update_sender, update_receiver) = crossbeam_channel::unbounded();
+ Self {
+ ended: Arc::new(AtomicBool::from(false)),
+ event_queue: Arc::new(Mutex::new(VecDeque::new())),
+ paused: Arc::new(AtomicBool::from(false)),
+ update_receiver,
+ update_sender,
+ }
+ }
+ fn send_update(&self) {
+ let _result = self.update_sender.send(());
+ }
+ pub(crate) fn is_paused(&self) -> bool {
+ self.paused.load(Ordering::Acquire)
+ }
+ pub(crate) fn is_ended(&self) -> bool {
+ self.ended.load(Ordering::Acquire)
+ }
+ /// Pause the event read thread.
+ #[inline]
+ pub(crate) fn pause(&self) {
+, Ordering::Release);
+ }
+ /// Resume the event read thread.
+ #[inline]
+ pub(crate) fn resume(&self) {
+, Ordering::Release);
+ }
+ /// Permanently End the event read thread.
+ #[inline]
+ pub(crate) fn end(&self) {
+, Ordering::Release);
+ }
+ /// Add an event after existing events.
+ #[inline]
+ pub(crate) fn enqueue_event(&self, event: Event<CustomEvent>) {
+ let mut events = self.event_queue.lock();
+ let last_resize_event_maybe = matches!(event, Event::Resize(..))
+ .then(|| events.back_mut().filter(|e| matches!(*e, &mut Event::Resize(..))))
+ .flatten();
+ if let Some(last_resize_event) = last_resize_event_maybe {
+ let _old = mem::replace(last_resize_event, event);
+ }
+ else if events.len() < MAXIMUM_EVENTS {
+ events.push_back(event);
+ }
+ self.send_update();
+ }
+ /// Add an event before existing events.
+ #[inline]
+ pub(crate) fn push_event(&self, event: Event<CustomEvent>) {
+ let mut events = self.event_queue.lock();
+ if events.len() >= MAXIMUM_EVENTS {
+ _ = events.pop_back();
+ }
+ events.push_front(event);
+ self.send_update();
+ }
+ /// Read an event from the queue. This function will block for a while until an event is
+ /// available. And if no event is available, it will return `Event::None`.
+ #[inline]
+ #[must_use]
+ pub(crate) fn read_event(&self) -> Event<CustomEvent> {
+ // clear existing message since last read
+ while self.update_receiver.try_recv().is_ok() {}
+ loop {
+ if let Some(event) = self.event_queue.lock().pop_front() {
+ return event;
+ }
+ // if there is no event available on the queue, instead of returning early, we can wait
+ // for the new event message and try again.
+ if self.update_receiver.recv_timeout(EVENT_POLL_TIMEOUT).is_ok() {
+ continue;
+ }
+ // We always return if the above recv call times out, to ensure this does not block
+ // forever
+ return Event::None;
+ }
+ }
+mod tests {
+ use std::{
+ sync::atomic::AtomicUsize,
+ thread::{sleep, spawn},
+ };
+ use super::*;
+ use crate::input::testutil::local::{Event, TestEvent};
+ fn create_state() -> State<TestEvent> {
+ State::new()
+ }
+ #[test]
+ fn paused() {
+ let state = create_state();
+ state.pause();
+ assert!(state.is_paused());
+ }
+ #[test]
+ fn resumed() {
+ let state = create_state();
+ state.resume();
+ assert!(!state.is_paused());
+ }
+ #[test]
+ fn ended() {
+ let state = create_state();
+ state.end();
+ assert!(state.is_ended());
+ }
+ #[test]
+ fn enqueue_event() {
+ let state = create_state();
+ state.enqueue_event(Event::from('a'));
+ state.enqueue_event(Event::from('b'));
+ assert_eq!(state.read_event(), Event::from('a'));
+ assert_eq!(state.read_event(), Event::from('b'));
+ }
+ #[test]
+ fn enqueue_event_resize_last_follow_by_non_resize() {
+ let state = create_state();
+ state.enqueue_event(Event::Resize(1, 1));
+ state.enqueue_event(Event::from('a'));
+ assert_eq!(state.read_event(), Event::Resize(1, 1));
+ assert_eq!(state.read_event(), Event::from('a'));
+ }
+ #[test]
+ fn enqueue_event_resize_last_follow_by_new_resize() {
+ let state = create_state();
+ state.enqueue_event(Event::Resize(1, 1));
+ state.enqueue_event(Event::Resize(2, 2));
+ assert_eq!(state.read_event(), Event::Resize(2, 2));
+ assert_eq!(state.read_event(), Event::None);
+ }
+ #[test]
+ fn enqueue_event_overflow() {
+ let state = create_state();
+ // fill queue
+ for _ in 0..MAXIMUM_EVENTS {
+ state.enqueue_event(Event::from('a'));
+ }
+ state.enqueue_event(Event::from('b'));
+ let mut events_received = vec![];
+ loop {
+ let event = state.read_event();
+ if event == Event::None {
+ break;
+ }
+ events_received.push(event);
+ }
+ assert_eq!(state.read_event(), Event::None);
+ assert_eq!(events_received.len(), MAXIMUM_EVENTS);
+ assert_eq!(events_received.first().unwrap(), &Event::from('a'));
+ assert_eq!(events_received.last().unwrap(), &Event::from('a'));
+ }
+ #[test]
+ fn push_event() {
+ let state = create_state();
+ state.push_event(Event::from('a'));
+ state.push_event(Event::from('b'));
+ assert_eq!(state.read_event(), Event::from('b'));
+ assert_eq!(state.read_event(), Event::from('a'));
+ }
+ #[test]
+ fn push_event_overflow() {
+ let state = create_state();
+ // fill queue
+ for _ in 0..MAXIMUM_EVENTS {
+ state.push_event(Event::from('a'));
+ }
+ state.push_event(Event::from('b'));
+ let mut events_received = vec![];
+ loop {
+ let event = state.read_event();
+ if event == Event::None {
+ break;
+ }
+ events_received.push(event);
+ }
+ assert_eq!(state.read_event(), Event::None);
+ assert_eq!(events_received.len(), MAXIMUM_EVENTS);
+ assert_eq!(events_received.first().unwrap(), &Event::from('b'));
+ assert_eq!(events_received.last().unwrap(), &Event::from('a'));
+ }
+ #[test]
+ fn read_event() {
+ // STEPS:
+ // 0 -> thread: initial event read with timeout, returns None
+ // test: waits for initial event read with timeout to occur (moves to step 1)
+ // 1 -> thread: waits for step 1 to complete
+ // test: enqueues new event (moves to step 2)
+ // 2 -> tread: reads enqueued event (moves to step 3)
+ // test: waits for step 2 to complete
+ // 3 -> thead: ended, no action
+ // test: assert events read match
+ let state = create_state();
+ let step: Arc<Mutex<AtomicUsize>> = Arc::new(Mutex::new(AtomicUsize::new(0)));
+ let events_read: Arc<Mutex<Vec<Event>>> = Arc::new(Mutex::new(vec![]));
+ let thread_state = state.clone();
+ let thread_step = Arc::clone(&step);
+ let thread_events_read = Arc::clone(&events_read);
+ _ = spawn(move || {
+ loop {
+ let mut thread_events_read_lock = thread_events_read.lock();
+ let thread_step_lock = thread_step.lock();
+ match thread_step_lock.load(Ordering::Acquire) {
+ 0 => {
+ let event = thread_state.read_event();
+ thread_events_read_lock.push(event);
+, Ordering::Release);
+ },
+ 1 => {
+ sleep(Duration::from_millis(10));
+ },
+ 2 => {
+ let event = thread_state.read_event();
+ thread_events_read_lock.push(event);
+, Ordering::Release);
+ break;
+ },
+ _ => unreachable!(),
+ }
+ }
+ });
+ while step.lock().load(Ordering::Acquire) != 1 {
+ sleep(Duration::from_millis(10));
+ }
+ state.enqueue_event(Event::from('a'));
+ step.lock().store(2, Ordering::Release);
+ while step.lock().load(Ordering::Acquire) == 2 {
+ sleep(Duration::from_millis(10));
+ }
+ let mut events_read_lock = events_read.lock();
+ assert_eq!(events_read_lock.pop().unwrap(), Event::from('a'));
+ assert_eq!(events_read_lock.pop().unwrap(), Event::None);
+ }