summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@ifm.com>2022-05-02 15:15:43 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-07-01 22:02:13 +0200
commit3b270cbdabdcd9e0372443e80e0dd65f1bfeeb09 (patch)
treef5cca8ad2bec0863e656baee3dedf5e617d4ee51
parent1f0cdb12c8e83ea7cc5663c8baad5d07c239a171 (diff)
Migrate tedge_mapper to tedge_api based setupfeature/add_tedge_api/port-existing-binaries
This patch ports the tedge_mapper main() to be based on the tedge_api and tedge_core implementations. This patch is as minimal as possible, to show only the very first step towards moving the tedge_agent implementation to the new architecture and ultimatively to a single-binary, all-batteries-included, multi-component architecture based application infrastructure. Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r--Cargo.lock3
-rw-r--r--crates/core/tedge_mapper/Cargo.toml4
-rw-r--r--crates/core/tedge_mapper/src/component.rs2
-rw-r--r--crates/core/tedge_mapper/src/main.rs133
4 files changed, 121 insertions, 21 deletions
diff --git a/Cargo.lock b/Cargo.lock
index f9049cc6..7e076f41 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3665,6 +3665,7 @@ dependencies = [
"download",
"flockfile",
"futures",
+ "miette",
"mockall",
"mqtt_channel",
"mqtt_tests",
@@ -3673,7 +3674,9 @@ dependencies = [
"serde_json",
"serial_test",
"structopt",
+ "tedge_api",
"tedge_config",
+ "tedge_core",
"tedge_users",
"tedge_utils",
"tempfile",
diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml
index f6da873c..b44e2dfd 100644
--- a/crates/core/tedge_mapper/Cargo.toml
+++ b/crates/core/tedge_mapper/Cargo.toml
@@ -52,6 +52,10 @@ thiserror = "1.0"
tokio = { version = "1.8", features = ["rt", "sync", "time"] }
toml = "0.5"
tracing = { version = "0.1", features = ["attributes", "log"] }
+miette = "4.4"
+
+tedge_api = { path = "../../../crates/core/tedge_api" }
+tedge_core = { path = "../../../crates/core/tedge_core" }
[dev-dependencies]
assert_matches = "1.5"
diff --git a/crates/core/tedge_mapper/src/component.rs b/crates/core/tedge_mapper/src/component.rs
index 6e5fedc6..7be67226 100644
--- a/crates/core/tedge_mapper/src/component.rs
+++ b/crates/core/tedge_mapper/src/component.rs
@@ -2,6 +2,6 @@ use async_trait::async_trait;
use tedge_config::TEdgeConfig;
#[async_trait]
-pub trait TEdgeComponent {
+pub trait TEdgeComponent: Send {
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error>;
}
diff --git a/crates/core/tedge_mapper/src/main.rs b/crates/core/tedge_mapper/src/main.rs
index b28cb0ee..6beaec42 100644
--- a/crates/core/tedge_mapper/src/main.rs
+++ b/crates/core/tedge_mapper/src/main.rs
@@ -7,7 +7,10 @@ use crate::{
};
use flockfile::check_another_instance_is_not_running;
use structopt::*;
+use tedge_api::plugin::{BuiltPlugin, HandleTypes, PluginDeclaration, PluginExt};
+use tedge_api::{Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError};
use tedge_config::*;
+use tedge_core::TedgeApplication;
use tedge_utils::paths::home_dir;
mod az_converter;
@@ -36,7 +39,7 @@ fn lookup_component(component_name: &MapperName) -> Box<dyn TEdgeComponent> {
}
}
-#[derive(Debug, StructOpt)]
+#[derive(Clone, Debug, StructOpt)]
#[structopt(
name = clap::crate_name!(),
version = clap::crate_version!(),
@@ -64,7 +67,7 @@ pub struct MapperOpt {
pub clear: bool,
}
-#[derive(Debug, StructOpt)]
+#[derive(Clone, Debug, StructOpt)]
pub enum MapperName {
Az,
C8y,
@@ -83,27 +86,117 @@ impl fmt::Display for MapperName {
}
}
-#[tokio::main]
-async fn main() -> anyhow::Result<()> {
- let mapper = MapperOpt::from_args();
- tedge_utils::logging::initialise_tracing_subscriber(mapper.debug);
-
- let component = lookup_component(&mapper.name);
- let config = tedge_config()?;
- // Run only one instance of a mapper
- let _flock = check_another_instance_is_not_running(&mapper.name.to_string())?;
-
- if mapper.init {
- let mut mapper = CumulocitySoftwareManagementMapper::new();
- mapper.init_session().await
- } else if mapper.clear {
- let mut mapper = CumulocitySoftwareManagementMapper::new();
- mapper.clear_session().await
- } else {
- component.start(config).await
+struct TedgeMapperPluginBuilder {
+ opt: MapperOpt,
+}
+
+#[async_trait::async_trait]
+impl<PD: PluginDirectory> PluginBuilder<PD> for TedgeMapperPluginBuilder {
+ fn kind_name() -> &'static str {
+ "tedge_mapper"
+ }
+
+ fn kind_message_types() -> HandleTypes {
+ TedgeMapperPlugin::get_handled_types()
+ }
+
+ async fn verify_configuration(&self, _config: &PluginConfiguration) -> Result<(), PluginError> {
+ // hardcoded, nothing not verify
+ Ok(())
+ }
+
+ async fn instantiate(
+ &self,
+ _config: PluginConfiguration,
+ _cancellation_token: tedge_api::CancellationToken,
+ _core_comms: &PD,
+ ) -> Result<BuiltPlugin, PluginError> {
+ // Run only one instance of a mapper
+ let flock = check_another_instance_is_not_running(&self.opt.name.to_string())
+ .map_err(|e| miette::miette!(e))?;
+
+ Ok(TedgeMapperPlugin {
+ opt: self.opt.clone(),
+ flock,
+ }
+ .finish())
+ }
+}
+
+struct TedgeMapperPlugin {
+ opt: MapperOpt,
+
+ #[allow(dead_code)]
+ flock: flockfile::Flockfile, // must be alive over app runtime for now
+}
+
+impl PluginDeclaration for TedgeMapperPlugin {
+ type HandledMessages = ();
+}
+
+#[async_trait::async_trait]
+impl Plugin for TedgeMapperPlugin {
+ async fn main(&self) -> Result<(), PluginError> {
+ if self.opt.init {
+ let mut mapper = CumulocitySoftwareManagementMapper::new();
+ mapper
+ .init_session()
+ .await
+ .map_err(|e| miette::miette!(e))?;
+ } else if self.opt.clear {
+ let mut mapper = CumulocitySoftwareManagementMapper::new();
+ mapper
+ .clear_session()
+ .await
+ .map_err(|e| miette::miette!(e))?;
+ } else {
+ let config = tedge_config().map_err(|e| miette::miette!(e))?;
+ let component = lookup_component(&self.opt.name);
+ let f = component.start(config);
+ let r = f.await;
+ r.map_err(|e| miette::miette!(e))?;
+ }
+ Ok(())
}
}
+const CONFIG: &str = r#"
+# Because there is no communication between plugins in this case, we minimize the internal communication buffer size
+# it cannot be zero though
+communication_buffer_size = 1
+plugin_shutdown_timeout_ms = 1000 # 1s
+
+[plugins.mapper]
+kind = "tedge_mapper"
+[plugins.mapper.configuration]
+# empty
+"#;
+
+#[cfg(test)]
+#[test]
+fn test_config_deser() {
+ let cfg: Result<tedge_core::configuration::TedgeConfiguration, _> = toml::from_str(CONFIG);
+ assert!(
+ cfg.is_ok(),
+ "Failed to deserialize hardcoded config: {:?}",
+ cfg.unwrap_err()
+ );
+}
+
+#[tokio::main]
+async fn main() -> miette::Result<()> {
+ let opt = MapperOpt::from_args();
+ tedge_utils::logging::initialise_tracing_subscriber(opt.debug);
+
+ let (_, app) = TedgeApplication::builder()
+ .with_plugin_builder(TedgeMapperPluginBuilder { opt })?
+ .with_config(toml::from_str(CONFIG).unwrap())?; // unwrap tested in `test_config_deser`
+
+ app.run().await?;
+
+ Ok(())
+}
+
fn tedge_config() -> anyhow::Result<TEdgeConfig> {
let config_repository = config_repository()?;
Ok(config_repository.load()?)