diff options
author | Matthias Beyer <matthias.beyer@ifm.com> | 2022-05-02 15:15:43 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@ifm.com> | 2022-07-01 22:02:13 +0200 |
commit | 3b270cbdabdcd9e0372443e80e0dd65f1bfeeb09 (patch) | |
tree | f5cca8ad2bec0863e656baee3dedf5e617d4ee51 | |
parent | 1f0cdb12c8e83ea7cc5663c8baad5d07c239a171 (diff) |
Migrate tedge_mapper to tedge_api based setupfeature/add_tedge_api/port-existing-binaries
This patch ports the tedge_mapper main() to be based on the tedge_api and
tedge_core implementations.
This patch is as minimal as possible, to show only the very first step
towards moving the tedge_agent implementation to the new architecture
and ultimatively to a single-binary, all-batteries-included,
multi-component architecture based application infrastructure.
Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r-- | Cargo.lock | 3 | ||||
-rw-r--r-- | crates/core/tedge_mapper/Cargo.toml | 4 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/component.rs | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/main.rs | 133 |
4 files changed, 121 insertions, 21 deletions
@@ -3665,6 +3665,7 @@ dependencies = [ "download", "flockfile", "futures", + "miette", "mockall", "mqtt_channel", "mqtt_tests", @@ -3673,7 +3674,9 @@ dependencies = [ "serde_json", "serial_test", "structopt", + "tedge_api", "tedge_config", + "tedge_core", "tedge_users", "tedge_utils", "tempfile", diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml index f6da873c..b44e2dfd 100644 --- a/crates/core/tedge_mapper/Cargo.toml +++ b/crates/core/tedge_mapper/Cargo.toml @@ -52,6 +52,10 @@ thiserror = "1.0" tokio = { version = "1.8", features = ["rt", "sync", "time"] } toml = "0.5" tracing = { version = "0.1", features = ["attributes", "log"] } +miette = "4.4" + +tedge_api = { path = "../../../crates/core/tedge_api" } +tedge_core = { path = "../../../crates/core/tedge_core" } [dev-dependencies] assert_matches = "1.5" diff --git a/crates/core/tedge_mapper/src/component.rs b/crates/core/tedge_mapper/src/component.rs index 6e5fedc6..7be67226 100644 --- a/crates/core/tedge_mapper/src/component.rs +++ b/crates/core/tedge_mapper/src/component.rs @@ -2,6 +2,6 @@ use async_trait::async_trait; use tedge_config::TEdgeConfig; #[async_trait] -pub trait TEdgeComponent { +pub trait TEdgeComponent: Send { async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error>; } diff --git a/crates/core/tedge_mapper/src/main.rs b/crates/core/tedge_mapper/src/main.rs index b28cb0ee..6beaec42 100644 --- a/crates/core/tedge_mapper/src/main.rs +++ b/crates/core/tedge_mapper/src/main.rs @@ -7,7 +7,10 @@ use crate::{ }; use flockfile::check_another_instance_is_not_running; use structopt::*; +use tedge_api::plugin::{BuiltPlugin, HandleTypes, PluginDeclaration, PluginExt}; +use tedge_api::{Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError}; use tedge_config::*; +use tedge_core::TedgeApplication; use tedge_utils::paths::home_dir; mod az_converter; @@ -36,7 +39,7 @@ fn lookup_component(component_name: &MapperName) -> Box<dyn TEdgeComponent> { } } -#[derive(Debug, StructOpt)] +#[derive(Clone, Debug, StructOpt)] #[structopt( name = clap::crate_name!(), version = clap::crate_version!(), @@ -64,7 +67,7 @@ pub struct MapperOpt { pub clear: bool, } -#[derive(Debug, StructOpt)] +#[derive(Clone, Debug, StructOpt)] pub enum MapperName { Az, C8y, @@ -83,27 +86,117 @@ impl fmt::Display for MapperName { } } -#[tokio::main] -async fn main() -> anyhow::Result<()> { - let mapper = MapperOpt::from_args(); - tedge_utils::logging::initialise_tracing_subscriber(mapper.debug); - - let component = lookup_component(&mapper.name); - let config = tedge_config()?; - // Run only one instance of a mapper - let _flock = check_another_instance_is_not_running(&mapper.name.to_string())?; - - if mapper.init { - let mut mapper = CumulocitySoftwareManagementMapper::new(); - mapper.init_session().await - } else if mapper.clear { - let mut mapper = CumulocitySoftwareManagementMapper::new(); - mapper.clear_session().await - } else { - component.start(config).await +struct TedgeMapperPluginBuilder { + opt: MapperOpt, +} + +#[async_trait::async_trait] +impl<PD: PluginDirectory> PluginBuilder<PD> for TedgeMapperPluginBuilder { + fn kind_name() -> &'static str { + "tedge_mapper" + } + + fn kind_message_types() -> HandleTypes { + TedgeMapperPlugin::get_handled_types() + } + + async fn verify_configuration(&self, _config: &PluginConfiguration) -> Result<(), PluginError> { + // hardcoded, nothing not verify + Ok(()) + } + + async fn instantiate( + &self, + _config: PluginConfiguration, + _cancellation_token: tedge_api::CancellationToken, + _core_comms: &PD, + ) -> Result<BuiltPlugin, PluginError> { + // Run only one instance of a mapper + let flock = check_another_instance_is_not_running(&self.opt.name.to_string()) + .map_err(|e| miette::miette!(e))?; + + Ok(TedgeMapperPlugin { + opt: self.opt.clone(), + flock, + } + .finish()) + } +} + +struct TedgeMapperPlugin { + opt: MapperOpt, + + #[allow(dead_code)] + flock: flockfile::Flockfile, // must be alive over app runtime for now +} + +impl PluginDeclaration for TedgeMapperPlugin { + type HandledMessages = (); +} + +#[async_trait::async_trait] +impl Plugin for TedgeMapperPlugin { + async fn main(&self) -> Result<(), PluginError> { + if self.opt.init { + let mut mapper = CumulocitySoftwareManagementMapper::new(); + mapper + .init_session() + .await + .map_err(|e| miette::miette!(e))?; + } else if self.opt.clear { + let mut mapper = CumulocitySoftwareManagementMapper::new(); + mapper + .clear_session() + .await + .map_err(|e| miette::miette!(e))?; + } else { + let config = tedge_config().map_err(|e| miette::miette!(e))?; + let component = lookup_component(&self.opt.name); + let f = component.start(config); + let r = f.await; + r.map_err(|e| miette::miette!(e))?; + } + Ok(()) } } +const CONFIG: &str = r#" +# Because there is no communication between plugins in this case, we minimize the internal communication buffer size +# it cannot be zero though +communication_buffer_size = 1 +plugin_shutdown_timeout_ms = 1000 # 1s + +[plugins.mapper] +kind = "tedge_mapper" +[plugins.mapper.configuration] +# empty +"#; + +#[cfg(test)] +#[test] +fn test_config_deser() { + let cfg: Result<tedge_core::configuration::TedgeConfiguration, _> = toml::from_str(CONFIG); + assert!( + cfg.is_ok(), + "Failed to deserialize hardcoded config: {:?}", + cfg.unwrap_err() + ); +} + +#[tokio::main] +async fn main() -> miette::Result<()> { + let opt = MapperOpt::from_args(); + tedge_utils::logging::initialise_tracing_subscriber(opt.debug); + + let (_, app) = TedgeApplication::builder() + .with_plugin_builder(TedgeMapperPluginBuilder { opt })? + .with_config(toml::from_str(CONFIG).unwrap())?; // unwrap tested in `test_config_deser` + + app.run().await?; + + Ok(()) +} + fn tedge_config() -> anyhow::Result<TEdgeConfig> { let config_repository = config_repository()?; Ok(config_repository.load()?) |