summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_core/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/tedge_core/src/lib.rs')
-rw-r--r--crates/core/tedge_core/src/lib.rs266
1 files changed, 266 insertions, 0 deletions
diff --git a/crates/core/tedge_core/src/lib.rs b/crates/core/tedge_core/src/lib.rs
new file mode 100644
index 00000000..ede90e39
--- /dev/null
+++ b/crates/core/tedge_core/src/lib.rs
@@ -0,0 +1,266 @@
+#![doc = include_str!("../README.md")]
+
+use std::collections::HashMap;
+use std::path::Path;
+use std::path::PathBuf;
+
+use itertools::Itertools;
+use tedge_api::plugin::HandleTypes;
+use tedge_api::PluginBuilder;
+use tokio_util::sync::CancellationToken;
+use tracing::debug;
+
+use tracing::event;
+
+use tracing::Level;
+
+mod communication;
+pub mod configuration;
+mod core_task;
+pub mod errors;
+mod message_handler;
+mod plugin_task;
+mod reactor;
+mod utils;
+
+pub use crate::communication::PluginDirectory;
+use crate::configuration::PluginInstanceConfiguration;
+use crate::configuration::TedgeConfiguration;
+
+use crate::errors::PluginConfigurationError;
+use crate::errors::PluginKindUnknownError;
+use crate::errors::TedgeApplicationBuilderError;
+use crate::errors::TedgeApplicationError;
+
+/// A TedgeApplication
+///
+/// This is the main entry point for building a thin-edge application. It provides functions for
+/// setting up the application and then run it.
+///
+/// # Details
+///
+/// This type implements only the setup functionality, how to construct an application object. The
+/// implementation of the orchestration and lifecycle of the application is implemented in
+/// [`crate::reactor::Reactor`]. Note that this is solely for code seperation.
+pub struct TedgeApplication {
+ config_path: PathBuf,
+ config: TedgeConfiguration,
+ cancellation_token: CancellationToken,
+ plugin_builders: HashMap<String, (HandleTypes, Box<dyn PluginBuilder<PluginDirectory>>)>,
+}
+
+impl std::fmt::Debug for TedgeApplication {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ f.debug_struct("TedgeApplication").finish()
+ }
+}
+
+impl TedgeApplication {
+ /// Retrieve a [`TedgeApplicationBuilder`] object that can be used to construct a
+ /// [`TedgeApplication`] object easily.
+ pub fn builder() -> TedgeApplicationBuilder {
+ TedgeApplicationBuilder {
+ cancellation_token: CancellationToken::new(),
+ plugin_builders: HashMap::new(),
+ errors: vec![],
+ }
+ }
+
+ pub(crate) fn config_path(&self) -> &Path {
+ &self.config_path
+ }
+
+ pub(crate) fn config(&self) -> &TedgeConfiguration {
+ &self.config
+ }
+
+ pub(crate) fn plugin_builders(
+ &self,
+ ) -> &HashMap<String, (HandleTypes, Box<dyn PluginBuilder<PluginDirectory>>)> {
+ &self.plugin_builders
+ }
+
+ pub(crate) fn cancellation_token(&self) -> &CancellationToken {
+ &self.cancellation_token
+ }
+
+ /// Run the TedgeApplication that has been setup for running
+ ///
+ /// This function runs as long as there is no shutdown signalled to the application.
+ ///
+ /// # Note
+ ///
+ /// This function makes sure that the configuration is verified before the plugins are started.
+ /// So there is no need to call [TedgeApplication::verify_configuration] before this.
+ pub async fn run(self) -> Result<(), TedgeApplicationError> {
+ crate::reactor::Reactor(self).run().await
+ }
+
+ /// Check whether all configured plugin kinds exist (are available in registered plugins)
+ /// and that the configurations for the individual plugins are actually valid.
+ #[tracing::instrument(skip(self))]
+ pub async fn verify_configurations(&self) -> Result<(), TedgeApplicationError> {
+ use futures::stream::StreamExt;
+
+ debug!("Verifying configurations");
+ let results = self
+ .config()
+ .plugins()
+ .iter()
+ .map(
+ |(plugin_name, plugin_cfg): (&String, &PluginInstanceConfiguration)| {
+ let plugin_name = plugin_name.to_string();
+ async move {
+ if let Some((_, builder)) =
+ self.plugin_builders().get(plugin_cfg.kind().as_ref())
+ {
+ debug!("Verifying {}", plugin_cfg.kind().as_ref());
+ let res = plugin_cfg
+ .configuration()
+ .verify_with_builder(&plugin_name, &**builder, self.config_path())
+ .await;
+
+ Ok(res?)
+ } else {
+ Err(PluginConfigurationError::UnknownKind(
+ PluginKindUnknownError {
+ name: plugin_cfg.kind().as_ref().to_string(),
+ alternatives: None,
+ },
+ ))
+ }
+ }
+ },
+ )
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Vec<Result<_, _>>>()
+ .await;
+
+ let (_oks, errors): (Vec<_>, Vec<_>) = results.into_iter().partition_result();
+
+ if !errors.is_empty() {
+ return Err(TedgeApplicationError::PluginConfigVerificationsError { errors });
+ }
+
+ Ok(())
+ }
+}
+
+/// Helper type for constructing a [`TedgeApplication`]
+pub struct TedgeApplicationBuilder {
+ cancellation_token: CancellationToken,
+ plugin_builders: HashMap<String, (HandleTypes, Box<dyn PluginBuilder<PluginDirectory>>)>,
+ errors: Vec<TedgeApplicationBuilderError>,
+}
+
+impl TedgeApplicationBuilder {
+ /// Register a [`tedge_api::PluginBuilder`]
+ ///
+ /// This function can be used to register a [`tedge_api::PluginBuilder`] within the
+ /// [`TedgeApplication`] which is about to be built.
+ ///
+ /// Registering a [`PluginBuilder`] does not mean that a plugin from this builder will be
+ /// running once the application starts up, but merely that the application _knows_ about this
+ /// plugin builder and is able to construct a plugin with this builder, if necessary (e.g. if
+ /// configured in a configuration file).
+ pub fn with_plugin_builder<PB: PluginBuilder<PluginDirectory>>(mut self, builder: PB) -> Self {
+ let handle_types = PB::kind_message_types();
+ let kind_name = PB::kind_name();
+ event!(
+ Level::INFO,
+ plugin.kind = kind_name,
+ plugin.handled_types = ?handle_types,
+ "Registered plugin builder"
+ );
+
+ if self.plugin_builders.contains_key(kind_name) {
+ self.errors
+ .push(TedgeApplicationBuilderError::DuplicateKind {
+ name: kind_name.to_string(),
+ builder_name: std::any::type_name::<PB>(),
+ });
+ return self;
+ }
+
+ self.plugin_builders
+ .insert(kind_name.to_string(), (handle_types, Box::new(builder)));
+ self
+ }
+
+ /// Finalize the [`TedgeApplication`] by instantiating it with a `TedgeConfiguration`]
+ ///
+ /// This instantiates the application object, but does not run it.
+ pub async fn with_config_from_path(
+ self,
+ config_path: impl AsRef<Path>,
+ ) -> Result<(TedgeApplicationCancelSender, TedgeApplication), TedgeApplicationError> {
+ if !self.errors.is_empty() {
+ return Err(TedgeApplicationError::ApplicationBuilderErrors {
+ errors: self.errors,
+ });
+ }
+ let config_path = config_path.as_ref();
+ debug!(?config_path, "Loading config from path");
+
+ let config_str = tokio::fs::read_to_string(&config_path).await.map_err(|e| {
+ TedgeApplicationError::ApplicationBuilderErrors {
+ errors: vec![TedgeApplicationBuilderError::PathNotReadable {
+ path: config_path.to_path_buf(),
+ error: e,
+ }],
+ }
+ })?;
+ let config = toml::de::from_str(&config_str).map_err(|e| {
+ TedgeApplicationError::ApplicationBuilderErrors {
+ errors: vec![TedgeApplicationBuilderError::ConfigNotParseable {
+ path: config_path.to_path_buf(),
+ error: e,
+ }],
+ }
+ })?;
+ let cancellation = TedgeApplicationCancelSender(self.cancellation_token.clone());
+ let app = TedgeApplication {
+ config_path: config_path.to_path_buf(),
+ config,
+ cancellation_token: self.cancellation_token,
+ plugin_builders: self.plugin_builders,
+ };
+
+ Ok((cancellation, app))
+ }
+
+ /// Fetch the currently registered plugin kind names from the TedgeApplicationBuilder instance
+ pub fn plugin_kind_names(&self) -> impl Iterator<Item = &str> {
+ self.plugin_builders.keys().map(String::as_ref)
+ }
+
+ #[cfg(test)]
+ pub fn plugin_builders(
+ &self,
+ ) -> &HashMap<String, (HandleTypes, Box<dyn PluginBuilder<PluginDirectory>>)> {
+ &self.plugin_builders
+ }
+}
+
+#[derive(Clone, Debug)]
+pub struct TedgeApplicationCancelSender(CancellationToken);
+
+impl TedgeApplicationCancelSender {
+ pub fn cancel_app(&self) {
+ debug!("Cancelling application");
+ self.0.cancel()
+ }
+
+ pub fn is_cancelled(&self) -> bool {
+ self.0.is_cancelled()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ #[test]
+ fn test_deser_empty_plugin_config() {
+ let s = "";
+ let _: tedge_api::PluginConfiguration = toml::de::from_str(s).unwrap();
+ }
+}