summaryrefslogtreecommitdiffstats
path: root/plugins/plugin_measurement_filter/src/plugin.rs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/plugin_measurement_filter/src/plugin.rs')
-rw-r--r--plugins/plugin_measurement_filter/src/plugin.rs66
1 files changed, 66 insertions, 0 deletions
diff --git a/plugins/plugin_measurement_filter/src/plugin.rs b/plugins/plugin_measurement_filter/src/plugin.rs
new file mode 100644
index 00000000..0e664d41
--- /dev/null
+++ b/plugins/plugin_measurement_filter/src/plugin.rs
@@ -0,0 +1,66 @@
+use async_trait::async_trait;
+
+use tedge_api::address::Address;
+use tedge_api::address::ReplySenderFor;
+use tedge_api::error::PluginError;
+use tedge_api::plugin::Handle;
+use tedge_api::plugin::Plugin;
+use tedge_lib::measurement::Measurement;
+use tracing::trace;
+
+use crate::builder::MeasurementReceiver;
+use crate::extractor::Extractable;
+use crate::filter::Filterable;
+
+#[derive(Debug)]
+pub struct MeasurementFilterPlugin {
+ target: Address<MeasurementReceiver>,
+ filtered_target: Option<Address<MeasurementReceiver>>,
+
+ extractor: crate::extractor::Extractor,
+ filter: crate::filter::Filter,
+}
+
+impl MeasurementFilterPlugin {
+ pub fn new(
+ target: Address<MeasurementReceiver>,
+ filtered_target: Option<Address<MeasurementReceiver>>,
+ extractor: crate::extractor::Extractor,
+ filter: crate::filter::Filter,
+ ) -> Self {
+ Self {
+ target,
+ filtered_target,
+ extractor,
+ filter,
+ }
+ }
+}
+
+impl tedge_api::plugin::PluginDeclaration for MeasurementFilterPlugin {
+ type HandledMessages = (Measurement,);
+}
+
+#[async_trait]
+impl Plugin for MeasurementFilterPlugin {}
+
+#[async_trait]
+impl Handle<Measurement> for MeasurementFilterPlugin {
+ #[tracing::instrument(name = "plugin.measurement_filter.handle_message", level = "trace")]
+ async fn handle_message(
+ &self,
+ message: Measurement,
+ _sender: ReplySenderFor<Measurement>,
+ ) -> Result<(), PluginError> {
+ trace!(plugin.extractor = ?self.extractor, ?message, "Extracting from message");
+ if let Some(value) = message.extract(&self.extractor.0) {
+ trace!(plugin.filter = ?self.filter, ?value, "Applying filter");
+ if value.apply_filter(&self.filter) {
+ let _ = self.target.send_and_wait(message).await;
+ } else if let Some(ftarget) = self.filtered_target.as_ref() {
+ let _ = ftarget.send_and_wait(message).await;
+ }
+ }
+ Ok(())
+ }
+}