summaryrefslogtreecommitdiffstats
path: root/plugins/plugin_inotify/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/plugin_inotify/src/lib.rs')
-rw-r--r--plugins/plugin_inotify/src/lib.rs228
1 files changed, 228 insertions, 0 deletions
diff --git a/plugins/plugin_inotify/src/lib.rs b/plugins/plugin_inotify/src/lib.rs
new file mode 100644
index 00000000..299279df
--- /dev/null
+++ b/plugins/plugin_inotify/src/lib.rs
@@ -0,0 +1,228 @@
+use std::collections::HashMap;
+use std::path::PathBuf;
+
+use async_trait::async_trait;
+use miette::IntoDiagnostic;
+use tokio_util::sync::CancellationToken;
+
+use tedge_api::plugin::BuiltPlugin;
+use tedge_api::plugin::HandleTypes;
+use tedge_api::plugin::PluginExt;
+use tedge_api::Address;
+use tedge_api::Plugin;
+use tedge_api::PluginBuilder;
+use tedge_api::PluginConfiguration;
+use tedge_api::PluginDirectory;
+use tedge_api::PluginError;
+use tedge_lib::mainloop::MainloopStopper;
+use tedge_lib::measurement::Measurement;
+use tedge_lib::measurement::MeasurementValue;
+use tracing::debug;
+use tracing::trace;
+use tracing::Instrument;
+
+mod config;
+use config::*;
+
+pub struct InotifyPluginBuilder;
+
+#[derive(Debug, miette::Diagnostic, thiserror::Error)]
+enum Error {
+ #[error("Failed to parse configuration")]
+ ConfigParseFailed(#[from] toml::de::Error),
+}
+
+tedge_api::make_receiver_bundle!(pub struct MeasurementReceiver(Measurement));
+
+#[async_trait]
+impl<PD: PluginDirectory> PluginBuilder<PD> for InotifyPluginBuilder {
+ fn kind_name() -> &'static str {
+ "inotify"
+ }
+
+ fn kind_configuration() -> Option<tedge_api::ConfigDescription> {
+ Some(<InotifyConfig as tedge_api::AsConfig>::as_config())
+ }
+
+ fn kind_message_types() -> HandleTypes
+ where
+ Self: Sized,
+ {
+ InotifyPlugin::get_handled_types()
+ }
+
+ async fn verify_configuration(
+ &self,
+ config: &PluginConfiguration,
+ ) -> Result<(), tedge_api::error::PluginError> {
+ config
+ .clone()
+ .try_into()
+ .map(|_: InotifyConfig| ())
+ .map_err(Error::from)
+ .map_err(PluginError::from)
+ }
+
+ async fn instantiate(
+ &self,
+ config: PluginConfiguration,
+ _cancellation_token: CancellationToken,
+ plugin_dir: &PD,
+ ) -> Result<BuiltPlugin, PluginError> {
+ let config = config
+ .try_into::<InotifyConfig>()
+ .map_err(Error::from)
+ .map_err(PluginError::from)?;
+
+ let addr = config.target.build(plugin_dir)?;
+ Ok(InotifyPlugin::new(addr, config).finish())
+ }
+}
+
+#[derive(Debug)]
+struct InotifyPlugin {
+ addr: Address<MeasurementReceiver>,
+ config: InotifyConfig,
+ stopper: Option<MainloopStopper>,
+}
+
+impl tedge_api::plugin::PluginDeclaration for InotifyPlugin {
+ type HandledMessages = ();
+}
+
+impl InotifyPlugin {
+ fn new(addr: Address<MeasurementReceiver>, config: InotifyConfig) -> Self {
+ Self {
+ addr,
+ config,
+ stopper: None,
+ }
+ }
+}
+
+#[derive(Debug)]
+struct State {
+ addr: Address<MeasurementReceiver>,
+ fail_on_err: bool,
+ inotify: inotify::Inotify,
+ watches: HashMap<inotify::WatchDescriptor, PathBuf>,
+}
+
+#[async_trait]
+impl Plugin for InotifyPlugin {
+ #[tracing::instrument(name = "plugin.inotify.start", skip(self))]
+ async fn start(&mut self) -> Result<(), PluginError> {
+ let mut inotify = inotify::Inotify::init().into_diagnostic()?;
+
+ let mut watches = HashMap::new();
+ for (path, modes) in self.config.pathes.iter() {
+ let mask = modes.iter().fold(inotify::WatchMask::empty(), |mask, el| {
+ mask | inotify::WatchMask::from(*el)
+ });
+
+ let descriptor = inotify.add_watch(path, mask).into_diagnostic()?;
+ watches.insert(descriptor, path.clone());
+ }
+
+ let state = State {
+ addr: self.addr.clone(),
+ fail_on_err: self.config.fail_on_err,
+ watches,
+ inotify,
+ };
+
+ let (stopper, mainloop) = tedge_lib::mainloop::Mainloop::detach(state);
+ self.stopper = Some(stopper);
+
+ let _ = tokio::spawn(
+ mainloop
+ .run(main_inotify)
+ .instrument(tracing::debug_span!("plugin.inotify.mainloop")),
+ );
+ trace!("Mainloop spawned");
+ Ok(())
+ }
+
+ #[tracing::instrument(name = "plugin.inotify.shutdown", skip(self))]
+ async fn shutdown(&mut self) -> Result<(), PluginError> {
+ trace!("Shutdown");
+ if let Some(stopper) = self.stopper.take() {
+ stopper.stop();
+ }
+ Ok(())
+ }
+}
+
+#[tracing::instrument(name = "plugin.inotify.main", skip_all)]
+async fn main_inotify(
+ mut state: State,
+ stopper: tedge_api::CancellationToken,
+) -> Result<(), PluginError> {
+ use futures::stream::StreamExt;
+
+ let mut stream = state
+ .inotify
+ .event_stream(Vec::from([0; 1024]))
+ .into_diagnostic()?;
+
+ loop {
+ tokio::select! {
+ next_event = stream.next() => {
+ match next_event {
+ Some(Ok(event)) => {
+ debug!(?event, "Received inotify event");
+ if let Some(path) = state.watches.get(&event.wd) {
+ let value = MeasurementValue::Text(path.display().to_string());
+ let measurement = Measurement::new(mask_to_string(event.mask).to_string(), value);
+
+ let _ = state.addr.send_and_wait(measurement).await;
+ } else {
+ // what happened? Got a descriptor for a file that we don't watch?
+ }
+ },
+
+ Some(Err(err)) => {
+ debug!(?err, "Received inotify event");
+ if state.fail_on_err {
+ return Err(miette::miette!(err))?;
+ }
+ },
+
+ None => break, // according to inotify doc, this will never happen
+ }
+ },
+
+ _ = stopper.cancelled() => {
+ debug!("Stopping main loop");
+ break;
+ },
+ }
+ }
+
+ Ok(())
+}
+
+/// Transform an EventMask to a String
+///
+/// MUST only be called with one event type
+fn mask_to_string(mask: inotify::EventMask) -> &'static str {
+ match mask {
+ inotify::EventMask::ACCESS => "ACCESS",
+ inotify::EventMask::ATTRIB => "ATTRIB",
+ inotify::EventMask::CLOSE_WRITE => "CLOSE_WRITE",
+ inotify::EventMask::CLOSE_NOWRITE => "CLOSE_NOWRITE",
+ inotify::EventMask::CREATE => "CREATE",
+ inotify::EventMask::DELETE => "DELETE",
+ inotify::EventMask::DELETE_SELF => "DELETE_SELF",
+ inotify::EventMask::MODIFY => "MODIFY",
+ inotify::EventMask::MOVE_SELF => "MOVE_SELF",
+ inotify::EventMask::MOVED_FROM => "MOVED_FROM",
+ inotify::EventMask::MOVED_TO => "MOVED_TO",
+ inotify::EventMask::OPEN => "OPEN",
+ inotify::EventMask::IGNORED => "IGNORED",
+ inotify::EventMask::ISDIR => "ISDIR",
+ inotify::EventMask::Q_OVERFLOW => "Q_OVERFLOW",
+ inotify::EventMask::UNMOUNT => "UNMOUNT",
+ _ => "unknown",
+ }
+}