#![allow(clippy::unit_arg)] use crate::signal::os::{OsExtraData, OsStorage}; use crate::sync::mpsc::Sender; use once_cell::sync::Lazy; use std::ops; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Mutex; pub(crate) type EventId = usize; /// State for a specific event, whether a notification is pending delivery, /// and what listeners are registered. #[derive(Default, Debug)] pub(crate) struct EventInfo { pending: AtomicBool, recipients: Mutex>>, } /// An interface for retrieving the `EventInfo` for a particular eventId. pub(crate) trait Storage { /// Gets the `EventInfo` for `id` if it exists. fn event_info(&self, id: EventId) -> Option<&EventInfo>; /// Invokes `f` once for each defined `EventInfo` in this storage. fn for_each<'a, F>(&'a self, f: F) where F: FnMut(&'a EventInfo); } impl Storage for Vec { fn event_info(&self, id: EventId) -> Option<&EventInfo> { self.get(id) } fn for_each<'a, F>(&'a self, f: F) where F: FnMut(&'a EventInfo), { self.iter().for_each(f) } } /// An interface for initializing a type. Useful for situations where we cannot /// inject a configured instance in the constructor of another type. pub(crate) trait Init { fn init() -> Self; } /// Manages and distributes event notifications to any registered listeners. /// /// Generic over the underlying storage to allow for domain specific /// optimizations (e.g. eventIds may or may not be contiguous). #[derive(Debug)] pub(crate) struct Registry { storage: S, } impl Registry { fn new(storage: S) -> Self { Self { storage } } } impl Registry { /// Registers a new listener for `event_id`. fn register_listener(&self, event_id: EventId, listener: Sender<()>) { self.storage .event_info(event_id) .unwrap_or_else(|| panic!("invalid event_id: {}", event_id)) .recipients .lock() .unwrap() .push(listener); } /// Marks `event_id` as having been delivered, without broadcasting it to /// any listeners. fn record_event(&self, event_id: EventId) { if let Some(event_info) = self.storage.event_info(event_id) { event_info.pending.store(true, Ordering::SeqCst) } } /// Broadcasts all previously recorded events to their respective listeners. /// /// Returns `true` if an event was delivered to at least one listener. fn broadcast(&self) -> bool { use crate::sync::mpsc::error::TrySendError; let mut did_notify = false; self.storage.for_each(|event_info| { // Any signal of this kind arrived since we checked last? if !event_info.pending.swap(false, Ordering::SeqCst) { return; } let mut recipients = event_info.recipients.lock().unwrap(); // Notify all waiters on this signal that the signal has been // received. If we can't push a message into the queue then we don't // worry about it as everything is coalesced anyway. If the channel // has gone away then we can remove that slot. for i in (0..recipients.len()).rev() { match recipients[i].try_send(()) { Ok(()) => did_notify = true, Err(TrySendError::Closed(..)) => { recipients.swap_remove(i); } // Channel is full, ignore the error since the // receiver has already been woken up Err(_) => {} } } }); did_notify } } pub(crate) struct Globals { extra: OsExtraData, registry: Registry, } impl ops::Deref for Globals { type Target = OsExtraData; fn deref(&self) -> &Self::Target { &self.extra } } impl Globals { /// Registers a new listener for `event_id`. pub(crate) fn register_listener(&self, event_id: EventId, listener: Sender<()>) { self.registry.register_listener(event_id, listener); } /// Marks `event_id` as having been delivered, without broadcasting it to /// any listeners. pub(crate) fn record_event(&self, event_id: EventId) { self.registry.record_event(event_id); } /// Broadcasts all previously recorded events to their respective listeners. /// /// Returns `true` if an event was delivered to at least one listener. pub(crate) fn broadcast(&self) -> bool { self.registry.broadcast() } #[cfg(unix)] pub(crate) fn storage(&self) -> &OsStorage { &self.registry.storage } } pub(crate) fn globals() -> Pin<&'static Globals> where OsExtraData: 'static + Send + Sync + Init, OsStorage: 'static + Send + Sync + Init, { static GLOBALS: Lazy>> = Lazy::new(|| { Box::pin(Globals { extra: OsExtraData::init(), registry: Registry::new(OsStorage::init()), }) }); GLOBALS.as_ref() } #[cfg(all(test, not(loom)))] mod tests { use super::*; use crate::runtime::{self, Runtime}; use crate::sync::{mpsc, oneshot}; use futures::future; #[test] fn smoke() { let rt = rt(); rt.block_on(async move { let registry = Registry::new(vec![ EventInfo::default(), EventInfo::default(), EventInfo::default(), ]); let (first_tx, first_rx) = mpsc::channel(3); let (second_tx, second_rx) = mpsc::channel(3); let (third_tx, third_rx) = mpsc::channel(3); registry.register_listener(0, first_tx); registry.register_listener(1, second_tx); registry.register_listener(2, third_tx); let (fire, wait) = oneshot::channel(); crate::spawn(async { wait.await.expect("wait failed"); // Record some events which should get coalesced registry.record_event(0); registry.record_event(0); registry.record_event(1); registry.record_event(1); registry.broadcast(); // Send subsequent signal registry.record_event(0); registry.broadcast(); drop(registry); }); let _ = fire.send(()); let all = future::join3(collect(first_rx), collect(second_rx), collect(third_rx)); let (first_results, second_results, third_results) = all.await; assert_eq!(2, first_results.len()); assert_eq!(1, second_results.len()); assert_eq!(0, third_results.len()); }); } #[test] #[should_panic = "invalid event_id: 1"] fn register_panics_on_invalid_input() { let registry = Registry::new(vec![EventInfo::default()]); let (tx, _) = mpsc::channel(1); registry.register_listener(1, tx); } #[test] fn record_invalid_event_does_nothing() { let registry = Registry::new(vec![EventInfo::default()]); registry.record_event(42); } #[test] fn broadcast_cleans_up_disconnected_listeners() { let rt = Runtime::new().unwrap(); rt.block_on(async { let registry = Registry::new(vec![EventInfo::default()]); let (first_tx, first_rx) = mpsc::channel(1); let (second_tx, second_rx) = mpsc::channel(1); let (third_tx, third_rx) = mpsc::channel(1); registry.register_listener(0, first_tx); registry.register_listener(0, second_tx); registry.register_listener(0, third_tx); drop(first_rx); drop(second_rx); let (fire, wait) = oneshot::channel(); crate::spawn(async { wait.await.expect("wait failed"); registry.record_event(0); registry.broadcast(); assert_eq!(1, registry.storage[0].recipients.lock().unwrap().len()); drop(registry); }); let _ = fire.send(()); let results = collect(third_rx).await; assert_eq!(1, results.len()); }); } #[test] fn broadcast_returns_if_at_least_one_event_fired() { let registry = Registry::new(vec![EventInfo::default()]); registry.record_event(0); assert_eq!(false, registry.broadcast()); let (first_tx, first_rx) = mpsc::channel(1); let (second_tx, second_rx) = mpsc::channel(1); registry.register_listener(0, first_tx); registry.register_listener(0, second_tx); registry.record_event(0); assert_eq!(true, registry.broadcast()); drop(first_rx); registry.record_event(0); assert_eq!(false, registry.broadcast()); drop(second_rx); } fn rt() -> Runtime { runtime::Builder::new_current_thread().build().unwrap() } async fn collect(mut rx: crate::sync::mpsc::Receiver<()>) -> Vec<()> { let mut ret = vec![]; while let Some(v) = rx.recv().await { ret.push(v); } ret } }