diff options
Diffstat (limited to 'crates/core/tedge_lib/tests/test_pubsub.rs')
-rw-r--r-- | crates/core/tedge_lib/tests/test_pubsub.rs | 300 |
1 files changed, 300 insertions, 0 deletions
diff --git a/crates/core/tedge_lib/tests/test_pubsub.rs b/crates/core/tedge_lib/tests/test_pubsub.rs new file mode 100644 index 00000000..f303e90f --- /dev/null +++ b/crates/core/tedge_lib/tests/test_pubsub.rs @@ -0,0 +1,300 @@ +use futures::FutureExt; + +use tedge_core::TedgeApplication; + +#[derive(Clone, Debug, bevy_reflect::TypeUuid)] +#[uuid = "38c02e81-0a0b-4eca-891b-9bb2e844ef40"] +pub struct Msg; + +impl tedge_api::Message for Msg {} + +mod publisher { + extern crate tedge_lib; + + use super::Msg; + + use async_trait::async_trait; + use tedge_api::plugin::PluginExt; + use tedge_api::Plugin; + use tedge_api::PluginBuilder; + use tedge_api::PluginConfiguration; + use tedge_api::PluginDirectory; + use tedge_api::PluginError; + use tracing::trace; + + pub struct PubBuilder; + + #[async_trait::async_trait] + impl<PD: PluginDirectory> PluginBuilder<PD> for PubBuilder { + fn kind_name() -> &'static str { + "pub" + } + + async fn verify_configuration( + &self, + _config: &PluginConfiguration, + ) -> Result<(), tedge_api::error::PluginError> { + Ok(()) + } + + async fn instantiate( + &self, + _config: PluginConfiguration, + _cancellation_token: tedge_api::CancellationToken, + _plugin_dir: &PD, + ) -> Result<tedge_api::plugin::BuiltPlugin, PluginError> { + Ok(Pub { + sender: tokio::sync::broadcast::channel(10).0, + } + .finish()) + } + + fn kind_message_types() -> tedge_api::plugin::HandleTypes + where + Self: Sized, + { + Pub::get_handled_types() + } + } + + #[derive(Debug)] + pub struct Pub { + sender: tokio::sync::broadcast::Sender<Msg>, + } + + tedge_api::make_receiver_bundle!(struct SubReq(tedge_lib::pubsub::SubscribeRequest<Msg>)); + + impl tedge_api::plugin::PluginDeclaration for Pub { + type HandledMessages = (tedge_lib::pubsub::SubscribeRequest<Msg>,); + } + + #[async_trait] + impl Plugin for Pub { + async fn main(&self) -> Result<(), PluginError> { + loop { + match self.sender.send(Msg) { + Ok(_) => { + trace!("Send message"); + break; + } + Err(e) => { + trace!("Error while trying to send message: {:?}", e); + } + } + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + Ok(()) + } + } + + #[async_trait] + impl tedge_api::plugin::Handle<tedge_lib::pubsub::SubscribeRequest<Msg>> for Pub { + async fn handle_message( + &self, + _message: tedge_lib::pubsub::SubscribeRequest<Msg>, + sender: tedge_api::address::ReplySenderFor<tedge_lib::pubsub::SubscribeRequest<Msg>>, + ) -> Result<(), PluginError> { + let _ = sender + .reply(tedge_lib::pubsub::SubscribeReply::new_from_sender( + &self.sender, + )) + .expect("Failed to reply"); + Ok(()) + } + } +} + +mod subscriber { + use super::Msg; + + use async_trait::async_trait; + use tedge_api::plugin::PluginExt; + use tedge_api::Plugin; + use tedge_api::PluginBuilder; + use tedge_api::PluginConfiguration; + use tedge_api::PluginDirectory; + use tedge_api::PluginError; + use tracing::trace; + + pub struct SubPluginBuilder; + + #[derive(Debug, serde::Deserialize)] + pub struct SubConfig { + target: tedge_lib::config::Address, + } + + #[derive(Debug, miette::Diagnostic, thiserror::Error)] + pub enum Error { + #[error("Failed to parse configuration")] + ConfigParseFailed(#[from] toml::de::Error), + } + + tedge_api::make_receiver_bundle!(pub struct SubscribeRequestBundle(tedge_lib::pubsub::SubscribeRequest<Msg>)); + + #[async_trait::async_trait] + impl<PD: PluginDirectory> PluginBuilder<PD> for SubPluginBuilder { + fn kind_name() -> &'static str { + "sub" + } + + async fn verify_configuration( + &self, + config: &PluginConfiguration, + ) -> Result<(), tedge_api::error::PluginError> { + config + .clone() + .try_into() + .map(|_: SubConfig| ()) + .map_err(Error::from) + .map_err(PluginError::from) + } + + async fn instantiate( + &self, + config: PluginConfiguration, + _cancellation_token: tedge_api::CancellationToken, + plugin_dir: &PD, + ) -> Result<tedge_api::plugin::BuiltPlugin, PluginError> { + let config: SubConfig = config + .clone() + .try_into() + .map_err(Error::from) + .map_err(PluginError::from)?; + let publisher_addr = + plugin_dir.get_address_for::<SubscribeRequestBundle>(config.target.as_ref())?; + Ok(Sub { publisher_addr }.finish()) + } + + fn kind_message_types() -> tedge_api::plugin::HandleTypes + where + Self: Sized, + { + Sub::get_handled_types() + } + } + + struct Sub { + publisher_addr: tedge_api::Address<SubscribeRequestBundle>, + } + + impl tedge_api::plugin::PluginDeclaration for Sub { + type HandledMessages = (); + } + + #[async_trait] + impl Plugin for Sub { + async fn main(&self) -> Result<(), PluginError> { + let mut receiver = self + .publisher_addr + .send_and_wait(tedge_lib::pubsub::SubscribeRequest::new()) + .await + .expect("Sending subscribe request failed") + .wait_for_reply(std::time::Duration::from_millis(100)) + .await + .expect("No reply for my subscriberequest received") + .into_inner() + .expect("Subscribing failed"); + + loop { + match receiver.recv().await { + Ok(Msg) => { + trace!("Received message"); + } + + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + trace!("Channel closed"); + break; + } + + Err(tokio::sync::broadcast::error::RecvError::Lagged(u)) => { + trace!("Lagged {}", u); + } + } + } + Ok(()) + } + } +} + +#[test] +fn test_pubsub() -> Result<(), Box<(dyn std::error::Error + 'static)>> { + let _ = tracing_subscriber::fmt::try_init(); + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let res = rt.block_on(async { + let config_file_path = { + let dir = std::env::current_exe() + .unwrap() + .parent() + .unwrap() + .join("../../../"); + let mut name = std::path::PathBuf::from(std::file!()); + name.set_extension("toml"); + let filepath = dir.join(name); + assert!( + filepath.exists(), + "Config file does not exist: {}", + filepath.display() + ); + filepath + }; + let (cancel_sender, application) = TedgeApplication::builder() + .with_plugin_builder(crate::publisher::PubBuilder {}) + .with_plugin_builder(crate::subscriber::SubPluginBuilder {}) + .with_config_from_path(config_file_path) + .await?; + + let mut run_fut = tokio::spawn(application.run()); + + // send a cancel request to the app after 1 sec + let mut cancel_fut = Box::pin({ + tokio::time::sleep(std::time::Duration::from_secs(1)).then(|_| async { + tracing::info!("Cancelling app now"); + cancel_sender.cancel_app() + }) + }); + + // Abort the test after 5 secs, because it seems not to stop the application + let mut test_abort = Box::pin({ + tokio::time::sleep(std::time::Duration::from_secs(5)).then(|_| async { + tracing::info!("Aborting test"); + }) + }); + + loop { + tracing::info!("Looping"); + tokio::select! { + _test_abort = &mut test_abort => { + tracing::error!("Test aborted"); + run_fut.abort(); + miette::bail!("Timeout reached, shutdown did not happen") + }, + + res = &mut run_fut => { + tracing::info!("application.run() returned"); + if res.is_err() { + panic!("result = {:?}", res); + } + break; + }, + + _ = &mut cancel_fut => { + panic!("Cancellation happened..."); + } + } + } + + Ok(()) + }); + + rt.shutdown_background(); + if let Err(e) = res { + panic!("{e:?}"); + } + Ok(()) +} |