summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTim Oram <dev@mitmaro.ca>2022-03-26 12:57:45 -0230
committerTim Oram <dev@mitmaro.ca>2022-03-26 13:10:59 -0230
commit3b075de656a70269f63761a5bb95981715a69af8 (patch)
treee1af79432b34f3340721ef1b42388e9d909aeb7c
parente12c2eb2ff7ddde82c65931e1135384bd525f6b1 (diff)
Fix event/input polling performance issue
The input/event reading is returning almost immediately causing the "run loop" to run constantly. This means that the application is wasting CPU cycle when idling. This change updates the input handling to block for a time during read_event when there is not an event available.
-rw-r--r--src/input/src/event_action.rs2
-rw-r--r--src/input/src/sender.rs174
-rw-r--r--src/input/src/testutil.rs3
-rw-r--r--src/input/src/thread.rs7
4 files changed, 178 insertions, 8 deletions
diff --git a/src/input/src/event_action.rs b/src/input/src/event_action.rs
index 9fdf984..9ed65a1 100644
--- a/src/input/src/event_action.rs
+++ b/src/input/src/event_action.rs
@@ -1,6 +1,6 @@
use crate::Event;
-#[derive(Debug, Copy, Clone)]
+#[derive(Debug, Copy, Clone, PartialEq)]
#[allow(clippy::exhaustive_enums)]
pub enum EventAction {
End,
diff --git a/src/input/src/sender.rs b/src/input/src/sender.rs
index 15fe229..dc48be2 100644
--- a/src/input/src/sender.rs
+++ b/src/input/src/sender.rs
@@ -5,6 +5,7 @@ use std::{
atomic::{AtomicBool, Ordering},
Arc,
},
+ time::Duration,
};
use anyhow::{anyhow, Error, Result};
@@ -16,23 +17,27 @@ fn map_send_err(_: crossbeam_channel::SendError<EventAction>) -> Error {
anyhow!("Unable to send data")
}
+const EVENT_POLL_TIMEOUT: Duration = Duration::from_secs(1);
+
/// Represents a message sender and receiver for passing actions between threads.
#[derive(Clone, Debug)]
pub struct Sender {
+ event_queue: Arc<Mutex<VecDeque<Event>>>,
poisoned: Arc<AtomicBool>,
+ receiver: crossbeam_channel::Receiver<()>,
sender: crossbeam_channel::Sender<EventAction>,
- event_queue: Arc<Mutex<VecDeque<Event>>>,
}
impl Sender {
/// Create a new instance.
#[inline]
#[must_use]
- pub fn new(sender: crossbeam_channel::Sender<EventAction>) -> Self {
+ pub fn new(sender: crossbeam_channel::Sender<EventAction>, receiver: crossbeam_channel::Receiver<()>) -> Self {
Self {
+ event_queue: Arc::new(Mutex::new(VecDeque::new())),
poisoned: Arc::new(AtomicBool::new(false)),
+ receiver,
sender,
- event_queue: Arc::new(Mutex::new(VecDeque::new())),
}
}
@@ -67,7 +72,23 @@ impl Sender {
/// Read an event from the queue
#[inline]
pub fn read_event(&mut self) -> Event {
- self.event_queue.lock().borrow_mut().pop_front().unwrap_or(Event::None)
+ // clear existing message since last read
+ while self.receiver.try_recv().is_ok() {}
+ loop {
+ if let Some(event) = self.event_queue.lock().borrow_mut().pop_front() {
+ return event;
+ }
+
+ // if there is no event available on the queue, instead of returning early, we can wait
+ // for the new event message and try again.
+ if self.receiver.recv_timeout(EVENT_POLL_TIMEOUT).is_ok() {
+ continue;
+ }
+
+ // We always return if the above recv call times out, to ensure this does not block
+ // forever
+ return Event::None;
+ }
}
/// Add an event after existing events.
@@ -88,3 +109,148 @@ impl Sender {
self.sender.send(EventAction::PushEvent(event)).map_err(map_send_err)
}
}
+
+#[cfg(test)]
+mod tests {
+ use std::thread::{sleep, spawn};
+
+ use crossbeam_channel::bounded;
+
+ use super::*;
+
+ #[test]
+ fn end() {
+ let (sender, receiver) = bounded(1);
+ let (_, new_event_receiver) = bounded(1);
+
+ let sender = Sender::new(sender, new_event_receiver);
+ sender.end().unwrap();
+ let event = receiver.try_recv().expect("Unable to recv event");
+ assert_eq!(event, EventAction::End);
+ }
+
+ #[test]
+ fn end_error() {
+ let (sender, _) = bounded(0);
+ let (_, new_event_receiver) = bounded(0);
+
+ let sender = Sender::new(sender, new_event_receiver);
+ assert_eq!(sender.end().unwrap_err().to_string(), "Unable to send data");
+ }
+
+ #[test]
+ fn read_event_empty() {
+ let (sender, _) = bounded(1);
+ let (_, new_event_receiver) = bounded(1);
+
+ let mut sender = Sender::new(sender, new_event_receiver);
+ assert_eq!(sender.read_event(), Event::None);
+ }
+
+ #[test]
+ fn read_event_ready() {
+ let (sender, _) = bounded(1);
+ let (_, new_event_receiver) = bounded(1);
+
+ let mut sender = Sender::new(sender, new_event_receiver);
+ let event_queue = sender.clone_event_queue();
+ event_queue.lock().push_back(Event::from('a'));
+ assert_eq!(sender.read_event(), Event::from('a'));
+ }
+
+ #[test]
+ fn read_event_clear_existing_events() {
+ let (sender, _) = bounded(1);
+ let (new_event_sender, new_event_receiver) = bounded(1);
+ new_event_sender.send(()).unwrap();
+
+ let mut sender = Sender::new(sender, new_event_receiver);
+ let event_queue = sender.clone_event_queue();
+ event_queue.lock().push_back(Event::from('a'));
+ let _event = sender.read_event();
+ assert_eq!(new_event_sender.len(), 0);
+ }
+
+ #[test]
+ fn read_event_wait_for_event() {
+ let (sender, _) = bounded(1);
+ let (new_event_sender, new_event_receiver) = bounded(1);
+
+ let mut sender = Sender::new(sender, new_event_receiver);
+ let event_queue = sender.clone_event_queue();
+
+ let ready = Arc::new(AtomicBool::new(false));
+ let event = Arc::new(Mutex::new(Event::None));
+ let thread_ready = ready.clone();
+ let thread_event = event.clone();
+ let handle = spawn(move || {
+ for _ in 0..100 {
+ thread_ready.store(true, Ordering::Release);
+ let event = sender.read_event();
+
+ if event == Event::None {
+ sleep(Duration::from_millis(10));
+ continue;
+ }
+
+ *thread_event.lock() = event;
+ break;
+ }
+ });
+
+ while !ready.load(Ordering::Acquire) {}
+ sleep(Duration::from_millis(10)); // this is probably fragile
+
+ event_queue.lock().push_back(Event::from('a'));
+ new_event_sender.send(()).unwrap();
+ handle.join().unwrap();
+
+ assert_eq!(*event.lock(), Event::from('a'));
+ }
+
+ #[test]
+ fn enqueue_event() {
+ let (sender, receiver) = bounded(1);
+ let (_, new_event_receiver) = bounded(1);
+
+ let sender = Sender::new(sender, new_event_receiver);
+ sender.enqueue_event(Event::from('a')).unwrap();
+ let event = receiver.try_recv().expect("Unable to recv event");
+ assert_eq!(event, EventAction::EnqueueEvent(Event::from('a')));
+ }
+
+ #[test]
+ fn enqueue_event_error() {
+ let (sender, _) = bounded(0);
+ let (_, new_event_receiver) = bounded(0);
+
+ let sender = Sender::new(sender, new_event_receiver);
+ assert_eq!(
+ sender.enqueue_event(Event::from('a')).unwrap_err().to_string(),
+ "Unable to send data"
+ );
+ }
+
+ #[test]
+ fn push_event() {
+ let (sender, receiver) = bounded(1);
+ let (_, new_event_receiver) = bounded(1);
+
+ let sender = Sender::new(sender, new_event_receiver);
+ sender.push_event(Event::from('a')).unwrap();
+ let event = receiver.try_recv().expect("Unable to recv event");
+ assert_eq!(event, EventAction::PushEvent(Event::from('a')));
+ }
+
+ #[test]
+ fn push_event_error() {
+ let (sender, _) = bounded(0);
+ let (_, new_event_receiver) = bounded(0);
+
+ let sender = Sender::new(sender, new_event_receiver);
+ assert_eq!(
+ sender.push_event(Event::from('a')).unwrap_err().to_string(),
+ "Unable to send data"
+ );
+ }
+}
diff --git a/src/input/src/testutil.rs b/src/input/src/testutil.rs
index 1d1fd41..84b6fdc 100644
--- a/src/input/src/testutil.rs
+++ b/src/input/src/testutil.rs
@@ -74,7 +74,8 @@ pub fn with_event_handler<C>(events: &[Event], callback: C)
where C: FnOnce(TestContext) {
let event_handler = EventHandler::new(create_test_keybindings());
let (sender, receiver) = crossbeam_channel::bounded(10);
- let event_sender = Sender::new(sender);
+ let (_, new_event_receiver) = crossbeam_channel::unbounded();
+ let event_sender = Sender::new(sender, new_event_receiver);
let event_queue = event_sender.clone_event_queue();
for event in events {
diff --git a/src/input/src/thread.rs b/src/input/src/thread.rs
index c11f16a..f4a8e69 100644
--- a/src/input/src/thread.rs
+++ b/src/input/src/thread.rs
@@ -4,7 +4,7 @@ use std::{
};
use anyhow::Result;
-use crossbeam_channel::bounded;
+use crossbeam_channel::{bounded, unbounded};
use crate::{event::Event, event_action::EventAction, sender::Sender};
@@ -19,7 +19,8 @@ const MAXIMUM_EVENTS: usize = 100;
pub fn spawn_event_thread<F: Send + 'static>(event_provider: F) -> (Sender, JoinHandle<()>)
where F: Fn() -> Result<Option<crossterm::event::Event>> {
let (sender, receiver) = bounded(0);
- let event_sender = Sender::new(sender);
+ let (new_event_sender, new_event_receiver) = unbounded();
+ let event_sender = Sender::new(sender, new_event_receiver);
let event_queue = event_sender.clone_event_queue();
let push_thread_event_sender = event_sender.clone();
let poisoned = event_sender.clone_poisoned();
@@ -35,6 +36,7 @@ where F: Fn() -> Result<Option<crossterm::event::Event>> {
if events.len() < MAXIMUM_EVENTS {
events.push_back(event);
}
+ let _send_result = new_event_sender.send(());
},
EventAction::PushEvent(event) => {
let mut events = event_queue.lock();
@@ -42,6 +44,7 @@ where F: Fn() -> Result<Option<crossterm::event::Event>> {
let _ = events.pop_back();
}
events.push_front(event);
+ let _send_result = new_event_sender.send(());
},
}
}