diff options
Diffstat (limited to 'crates/core/plugin_sm/src/plugin.rs')
-rw-r--r-- | crates/core/plugin_sm/src/plugin.rs | 468 |
1 files changed, 468 insertions, 0 deletions
diff --git a/crates/core/plugin_sm/src/plugin.rs b/crates/core/plugin_sm/src/plugin.rs new file mode 100644 index 00000000..977ed88b --- /dev/null +++ b/crates/core/plugin_sm/src/plugin.rs @@ -0,0 +1,468 @@ +use crate::logged_command::LoggedCommand; +use async_trait::async_trait; +use csv::ReaderBuilder; +use download::Downloader; +use json_sm::*; +use std::{path::PathBuf, process::Output}; +use tokio::io::BufWriter; +use tokio::{fs::File, io::AsyncWriteExt}; +use tracing::error; + +#[async_trait] +pub trait Plugin { + async fn prepare(&self, logger: &mut BufWriter<File>) -> Result<(), SoftwareError>; + + async fn install( + &self, + module: &SoftwareModule, + logger: &mut BufWriter<File>, + ) -> Result<(), SoftwareError>; + + async fn remove( + &self, + module: &SoftwareModule, + logger: &mut BufWriter<File>, + ) -> Result<(), SoftwareError>; + + async fn update_list( + &self, + modules: &Vec<SoftwareModuleUpdate>, + logger: &mut BufWriter<File>, + ) -> Result<(), SoftwareError>; + + async fn finalize(&self, logger: &mut BufWriter<File>) -> Result<(), SoftwareError>; + + async fn list( + &self, + logger: &mut BufWriter<File>, + ) -> Result<Vec<SoftwareModule>, SoftwareError>; + + async fn version( + &self, + module: &SoftwareModule, + logger: &mut BufWriter<File>, + ) -> Result<Option<String>, SoftwareError>; + + async fn apply( + &self, + update: &SoftwareModuleUpdate, + logger: &mut BufWriter<File>, + ) -> Result<(), SoftwareError> { + match update.clone() { + SoftwareModuleUpdate::Install { mut module } => { + let module_url = module.url.clone(); + match module_url { + Some(url) => self.install_from_url(&mut module, &url, logger).await?, + None => self.install(&module, logger).await?, + } + + Ok(()) + } + SoftwareModuleUpdate::Remove { module } => self.remove(&module, logger).await, + } + } + + async fn apply_all( + &self, + mut updates: Vec<SoftwareModuleUpdate>, + logger: &mut BufWriter<File>, + ) -> Vec<SoftwareError> { + let mut failed_updates = Vec::new(); + + // Prepare the updates + if let Err(prepare_error) = self.prepare(logger).await { + failed_updates.push(prepare_error); + return failed_updates; + } + + // Download all modules for which a download URL is provided + let mut downloaders = Vec::new(); + for update in updates.iter_mut() { + let module = match update { + SoftwareModuleUpdate::Remove { module } => module, + SoftwareModuleUpdate::Install { module } => module, + }; + let module_url = module.url.clone(); + if let Some(url) = module_url { + match Self::download_from_url(module, &url, logger).await { + Err(prepare_error) => { + failed_updates.push(prepare_error); + break; + } + Ok(downloader) => downloaders.push(downloader), + } + } + } + + // Execute the updates + if failed_updates.is_empty() { + let outcome = self.update_list(&updates, logger).await; + if let Err(SoftwareError::UpdateListNotSupported(_)) = outcome { + for update in updates.iter() { + if let Err(error) = self.apply(update, logger).await { + failed_updates.push(error); + }; + } + } else if let Err(update_list_error) = outcome { + failed_updates.push(update_list_error); + } + } + + // Finalize the updates + if let Err(finalize_error) = self.finalize(logger).await { + failed_updates.push(finalize_error); + } + + // Cleanup all the downloaded modules + for downloader in downloaders { + if let Err(cleanup_error) = Self::cleanup_downloaded_artefacts(downloader, logger).await + { + failed_updates.push(cleanup_error); + } + } + + failed_updates + } + + async fn install_from_url( + &self, + module: &mut SoftwareModule, + url: &DownloadInfo, + logger: &mut BufWriter<File>, + ) -> Result<(), SoftwareError> { + let downloader = Self::download_from_url(module, url, logger).await?; + let result = self.install(module, logger).await; + Self::cleanup_downloaded_artefacts(downloader, logger).await?; + + result + } + + async fn download_from_url( + module: &mut SoftwareModule, + url: &DownloadInfo, + logger: &mut BufWriter<File>, + ) -> Result<Downloader, SoftwareError> { + let downloader = Downloader::new(&module.name, &module.version, "/tmp"); + + logger + .write_all( + format!( + "----- $ Downloading: {} to {} \n", + &url.url(), + &downloader.filename().to_string_lossy().to_string() + ) + .as_bytes(), + ) + .await?; + + if let Err(err) = + downloader + .download(url) + .await + .map_err(|err| SoftwareError::DownloadError { + reason: err.to_string(), + url: url.url().to_string(), + }) + { + error!("Download error: {}", &err); + logger + .write_all(format!("error: {}\n", &err).as_bytes()) + .await?; + return Err(err); + } + + module.file_path = Some(downloader.filename().to_owned()); + + Ok(downloader) + } + + async fn cleanup_downloaded_artefacts( + downloader: Downloader, + logger: &mut BufWriter<File>, + ) -> Result<(), SoftwareError> { + if let Err(err) = downloader + .cleanup() + .await + .map_err(|err| SoftwareError::IoError { + reason: err.to_string(), + }) + { + logger + .write_all(format!("warn: {}\n", &err).as_bytes()) + .await?; + } + Ok(()) + } +} + +#[derive(Debug)] +pub struct ExternalPluginCommand { + pub name: SoftwareType, + pub path: PathBuf, + pub sudo: Option<PathBuf>, +} + +impl ExternalPluginCommand { + pub fn new(name: impl Into<SoftwareType>, path: impl Into<PathBuf>) -> ExternalPluginCommand { + ExternalPluginCommand { + name: name.into(), + path: path.into(), + sudo: Some("sudo".into()), + } + } + + pub fn command( + &self, + action: &str, + maybe_module: Option<&SoftwareModule>, + ) -> Result<LoggedCommand, SoftwareError> { + let mut command = if let Some(sudo) = &self.sudo { + let mut command = LoggedCommand::new(sudo); + command.arg(&self.path); + command + } else { + LoggedCommand::new(&self.path) + }; + command.arg(action); + + if let Some(module) = maybe_module { + self.check_module_type(module)?; + command.arg(&module.name); + if let Some(ref version) = module.version { + command.arg("--module-version"); + command.arg(version); + } + + if let Some(ref path) = module.file_path { + command.arg("--file"); + command.arg(path); + } + } + + Ok(command) + } + + pub async fn execute( + &self, + command: LoggedCommand, + logger: &mut BufWriter<File>, + ) -> Result<Output, SoftwareError> { + let output = command + .execute(logger) + .await + .map_err(|err| self.plugin_error(err))?; + Ok(output) + } + + pub fn content(&self, bytes: Vec<u8>) -> Result<String, SoftwareError> { + String::from_utf8(bytes).map_err(|err| self.plugin_error(err)) + } + + pub fn plugin_error(&self, err: impl std::fmt::Display) -> SoftwareError { + SoftwareError::Plugin { + software_type: self.name.clone(), + reason: format!("{}", err), + } + } + + /// This test validates if an incoming module can be handled by it, by matching the module type with the plugin type + pub fn check_module_type(&self, module: &SoftwareModule) -> Result<(), SoftwareError> { + match &module.module_type { + Some(name) if name == &self.name.clone() => Ok(()), + Some(name) if name == DEFAULT => Ok(()), + Some(name) => Err(SoftwareError::WrongModuleType { + actual: self.name.clone(), + expected: name.clone(), + }), + None => Ok(()), // A software module without a type can be handled by any plugin that's configured as default plugin + } + } +} + +const PREPARE: &str = "prepare"; +const INSTALL: &str = "install"; +const REMOVE: &str = "remove"; +const UPDATE_LIST: &str = "update-list"; +const FINALIZE: &str = "finalize"; +pub const LIST: &str = "list"; +const VERSION: &str = "version"; + +#[async_trait] +impl Plugin for ExternalPluginCommand { + async fn prepare(&self, logger: &mut BufWriter<File>) -> Result<(), SoftwareError> { + let command = self.command(PREPARE, None)?; + let output = self.execute(command, logger).await?; + + if output.status.success() { + Ok(()) + } else { + Err(SoftwareError::Prepare { + software_type: self.name.clone(), + reason: self.content(output.stderr)?, + }) + } + } + + async fn install( + &self, + module: &SoftwareModule, + logger: &mut BufWriter<File>, + ) -> Result<(), SoftwareError> { + let command = self.command(INSTALL, Some(module))?; + let output = self.execute(command, logger).await?; + + if output.status.success() { + Ok(()) + } else { + Err(SoftwareError::Install { + module: module.clone(), + reason: self.content(output.stderr)?, + }) + } + } + + async fn remove( + &self, + module: &SoftwareModule, + logger: &mut BufWriter<File>, + ) -> Result<(), SoftwareError> { + let command = self.command(REMOVE, Some(module))?; + let output = self.execute(command, logger).await?; + + if output.status.success() { + Ok(()) + } else { + Err(SoftwareError::Remove { + module: module.clone(), + reason: self.content(output.stderr)?, + }) + } + } + + async fn update_list( + &self, + updates: &Vec<SoftwareModuleUpdate>, + logger: &mut BufWriter<File>, + ) -> Result<(), SoftwareError> { + let mut command = self.command(UPDATE_LIST, None)?; + + let mut child = command.spawn()?; + let child_stdin = + child + .inner_child + .stdin + .as_mut() + .ok_or_else(|| SoftwareError::IoError { + reason: "Plugin stdin unavailable".into(), + })?; + + for update in updates { + let action = match update { + SoftwareModuleUpdate::Install { module } => { + format!( + "install\t{}\t{}\t{}\n", + module.name, + module.version.clone().map_or("".into(), |v| v), + module.file_path.clone().map_or("".into(), |v| v + .to_str() + .map_or("".into(), |u| u.to_string())) + ) + } + + SoftwareModuleUpdate::Remove { module } => { + format!( + "remove\t{}\t{}\t\n", + module.name, + module.version.clone().map_or("".into(), |v| v), + ) + } + }; + + child_stdin.write_all(action.as_bytes()).await? + } + + let output = child.wait_with_output(logger).await?; + match output.status.code() { + Some(0) => Ok(()), + Some(1) => Err(SoftwareError::UpdateListNotSupported(self.name.clone())), + Some(_) => Err(SoftwareError::UpdateList { + software_type: self.name.clone(), + reason: self.content(output.stderr)?, + }), + None => Err(SoftwareError::UpdateList { + software_type: self.name.clone(), + reason: "Interrupted".into(), + }), + } + } + + async fn finalize(&self, logger: &mut BufWriter<File>) -> Result<(), SoftwareError> { + let command = self.command(FINALIZE, None)?; + let output = self.execute(command, logger).await?; + + if output.status.success() { + Ok(()) + } else { + Err(SoftwareError::Finalize { + software_type: self.name.clone(), + reason: self.content(output.stderr)?, + }) + } + } + + async fn list( + &self, + logger: &mut BufWriter<File>, + ) -> Result<Vec<SoftwareModule>, SoftwareError> { + let command = self.command(LIST, None)?; + let output = self.execute(command, logger).await?; + if output.status.success() { + let mut software_list = Vec::new(); + let mut rdr = ReaderBuilder::new() + .has_headers(false) + .delimiter(b'\t') + .from_reader(output.stdout.as_slice()); + + for module in rdr.deserialize() { + let (name, version): (String, Option<String>) = module?; + software_list.push(SoftwareModule { + name, + version, + module_type: Some(self.name.clone()), + file_path: None, + url: None, + }); + } + + Ok(software_list) + } else { + Err(SoftwareError::Plugin { + software_type: self.name.clone(), + reason: self.content(output.stderr)?, + }) + } + } + + async fn version( + &self, + module: &SoftwareModule, + logger: &mut BufWriter<File>, + ) -> Result<Option<String>, SoftwareError> { + let command = self.command(VERSION, Some(module))?; + let output = self.execute(command, logger).await?; + + if output.status.success() { + let version = String::from(self.content(output.stdout)?.trim()); + if version.is_empty() { + Ok(None) + } else { + Ok(Some(version)) + } + } else { + Err(SoftwareError::Plugin { + software_type: self.name.clone(), + reason: self.content(output.stderr)?, + }) + } + } +} |