diff options
author | Alex Solomes <alex.solomes@softwareag.com> | 2021-09-21 16:26:13 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-09-21 16:26:13 +0100 |
commit | 32eaacb31ca3dbb773473ba889fb2af23bd637d4 (patch) | |
tree | 5f742575bfb3b81fb6062c1fb4c29f20473a1306 | |
parent | f55798a44456206d9beb31114d5b731ae74b50d1 (diff) |
[CIT-550] flockfile and test for tedge agent (#437)
* [CIT-550] flockfile and test for tedge agent
* fmt and clippy code formatting
* mosquitto-available to fix workflow test [CIT-550]
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
* moved flockfile utility, updated error msg, reverted changes [CIT-550]
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
* fix workflow flockfile test [CIT-550]
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
* removed create_sm_agent, moved to ..::new [CIT-550]
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
* fixed panic SmAgent, comments & imports [CIT-550]
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
* cargo-fmt [CIT-550]
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
* PySys testing for starting two agents [CIT-550]
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
* Revert "PySys testing for starting two agents [CIT-550]"
This reverts commit b22e744158b3d65bcde62aa750772eca7f880daf.
* fixed test with cfg flag [CIT-550]
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
* re-write of test to avoid sleep [CIT-550]
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
* panic cleanup [CIT-550]
Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
-rw-r--r-- | Cargo.lock | 2 | ||||
-rw-r--r-- | common/flockfile/src/unix.rs | 51 | ||||
-rw-r--r-- | mapper/tedge_mapper/src/mapper.rs | 13 | ||||
-rw-r--r-- | sm/tedge_agent/Cargo.toml | 2 | ||||
-rw-r--r-- | sm/tedge_agent/src/agent.rs | 13 | ||||
-rw-r--r-- | sm/tedge_agent/src/main.rs | 2 | ||||
-rw-r--r-- | sm/tedge_agent/tests/main.rs | 60 |
7 files changed, 120 insertions, 23 deletions
@@ -2517,6 +2517,7 @@ name = "tedge_agent" version = "0.3.1" dependencies = [ "anyhow", + "assert_cmd", "async-trait", "chrono", "clock", @@ -2527,6 +2528,7 @@ dependencies = [ "mqtt_client", "once_cell", "plugin_sm", + "predicates 2.0.1", "rumqttd", "rumqttlog", "serde", diff --git a/common/flockfile/src/unix.rs b/common/flockfile/src/unix.rs index 491e37ad..28646601 100644 --- a/common/flockfile/src/unix.rs +++ b/common/flockfile/src/unix.rs @@ -5,15 +5,23 @@ use std::{ os::unix::io::AsRawFd, path::{Path, PathBuf}, }; -use tracing::{debug, warn}; +use tracing::{debug, error, warn}; #[derive(thiserror::Error, Debug)] pub enum FlockfileError { - #[error(transparent)] - FromIo(#[from] std::io::Error), + #[error("Couldn't acquire file lock.")] + FromIo { + path: PathBuf, + #[source] + source: std::io::Error, + }, #[error("Couldn't acquire file lock.")] - FromNix(#[from] nix::Error), + FromNix { + path: PathBuf, + #[source] + source: nix::Error, + }, } /// flockfile creates a lockfile in the filesystem under `/run/lock` and then creates a filelock using system fcntl with flock. 
@@ -35,13 +43,24 @@ impl Flockfile { pub fn new_lock(lock_name: impl AsRef<Path>) -> Result<Flockfile, FlockfileError> { let path = Path::new("/run/lock").join(lock_name); - let file = OpenOptions::new() + let file = match OpenOptions::new() .create(true) .read(true) .write(true) - .open(&path)?; + .open(&path) + { + Ok(file) => file, + Err(err) => { + return Err(FlockfileError::FromIo { path, source: err }); + } + }; - let () = flock(file.as_raw_fd(), FlockArg::LockExclusiveNonblock)?; + let () = match flock(file.as_raw_fd(), FlockArg::LockExclusiveNonblock) { + Ok(()) => (), + Err(err) => { + return Err(FlockfileError::FromNix { path, source: err }); + } + }; debug!(r#"Lockfile created "{:?}""#, &path); Ok(Flockfile { @@ -85,6 +104,22 @@ impl AsRef<Path> for Flockfile { } } +/// Check /run/lock/ for a lock file of a given `app_name` +pub fn check_another_instance_is_not_running(app_name: &str) -> Result<Flockfile, FlockfileError> { + match Flockfile::new_lock(format!("{}.lock", app_name)) { + Ok(file) => Ok(file), + Err(err) => { + return match &err { + FlockfileError::FromIo { path, .. } | FlockfileError::FromNix { path, .. } => { + error!("Another instance of {} is running.", app_name); + error!("Lock file path: {}", path.as_path().to_str().unwrap()); + Err(err) + } + } + } + } +} + #[cfg(test)] mod tests { @@ -130,7 +165,7 @@ mod tests { assert_matches!( Flockfile::new_lock(&path).unwrap_err(), - FlockfileError::FromNix(_) + FlockfileError::FromNix { .. 
} ); } } diff --git a/mapper/tedge_mapper/src/mapper.rs b/mapper/tedge_mapper/src/mapper.rs index 89ea997f..f52ff695 100644 --- a/mapper/tedge_mapper/src/mapper.rs +++ b/mapper/tedge_mapper/src/mapper.rs @@ -1,7 +1,8 @@ use crate::converter::*; use crate::error::*; -use flockfile::{Flockfile, FlockfileError}; +use flockfile::{check_another_instance_is_not_running, Flockfile}; + use mqtt_client::{Client, MqttClient, MqttClientError, Topic}; use tedge_config::{ConfigSettingAccessor, MqttPortSetting, TEdgeConfig}; use tokio::task::JoinHandle; @@ -40,16 +41,6 @@ pub(crate) fn mqtt_config( Ok(mqtt_client::Config::default().with_port(tedge_config.query(MqttPortSetting)?.into())) } -fn check_another_instance_is_not_running(app_name: &str) -> Result<Flockfile, FlockfileError> { - match flockfile::Flockfile::new_lock(format!("{}.lock", app_name)) { - Ok(file) => Ok(file), - Err(err) => { - error!("Another instance of {} is running.", app_name); - Err(err) - } - } -} - pub struct Mapper { client: mqtt_client::Client, config: MapperConfig, diff --git a/sm/tedge_agent/Cargo.toml b/sm/tedge_agent/Cargo.toml index 69f8b3c8..533332ae 100644 --- a/sm/tedge_agent/Cargo.toml +++ b/sm/tedge_agent/Cargo.toml @@ -44,3 +44,5 @@ rumqttd = "0.7" rumqttlog = "0.7" once_cell = "1.8" tempfile = "3.2" +assert_cmd = "2.0" +predicates = "2.0" diff --git a/sm/tedge_agent/src/agent.rs b/sm/tedge_agent/src/agent.rs index 174a3117..fc268cc7 100644 --- a/sm/tedge_agent/src/agent.rs +++ b/sm/tedge_agent/src/agent.rs @@ -2,6 +2,9 @@ use crate::{ error::AgentError, state::{AgentStateRepository, State, StateRepository}, }; + +use flockfile::{check_another_instance_is_not_running, Flockfile, FlockfileError}; + use json_sm::{ software_filter_topic, Jsonify, SoftwareError, SoftwareListRequest, SoftwareListResponse, SoftwareOperationStatus, SoftwareRequestResponse, SoftwareUpdateRequest, @@ -113,17 +116,21 @@ pub struct SmAgent { config: SmAgentConfig, name: String, persistance_store: AgentStateRepository, 
+ _flock: Flockfile, } impl SmAgent { - pub fn new(name: &str, config: SmAgentConfig) -> Self { + pub fn new(name: &str, config: SmAgentConfig) -> Result<Self, FlockfileError> { let persistance_store = AgentStateRepository::new(config.sm_home.clone()); + let flock = check_another_instance_is_not_running(&name)?; + info!("{} starting", &name); - Self { + Ok(Self { config, name: name.into(), + _flock: flock, persistance_store, - } + }) } #[instrument(skip(self), name = "sm-agent")] diff --git a/sm/tedge_agent/src/main.rs b/sm/tedge_agent/src/main.rs index 2121fad3..0c603251 100644 --- a/sm/tedge_agent/src/main.rs +++ b/sm/tedge_agent/src/main.rs @@ -29,6 +29,6 @@ async fn main() -> Result<(), anyhow::Error> { "tedge_agent", SmAgentConfig::try_new(tedge_config_location)?, ); - agent.start().await?; + agent?.start().await?; Ok(()) } diff --git a/sm/tedge_agent/tests/main.rs b/sm/tedge_agent/tests/main.rs new file mode 100644 index 00000000..386635af --- /dev/null +++ b/sm/tedge_agent/tests/main.rs @@ -0,0 +1,60 @@ +#[cfg(test)] +#[cfg(feature = "mosquitto-available")] +mod tests { + use std::process::{Command, Stdio}; + use std::{thread, time}; + + use assert_cmd::prelude::*; + use predicates::prelude::*; + + #[test] + /// Tests that only one instance of `tedge_agent` is running. + /// This is done by spawning/running two instances of `tedge_agent` + /// expecting the first one to work and the second to fail. + fn tedge_agent_check_no_multiple_instances_running() -> Result<(), Box<dyn std::error::Error>> { + let _ignore_errors = std::fs::remove_file("/run/lock/tedge_agent.lock"); + + // running first `tedge_agent` binary + let mut agent = Command::cargo_bin(env!("CARGO_PKG_NAME"))? + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + // running second `tedge_agent` binary + let mut agent_2 = Command::cargo_bin(env!("CARGO_PKG_NAME"))? + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + // trying up to 10 times before breaking out. 
+ for _ in 0..10 { + if let Ok(Some(code)) = agent.try_wait() { + agent.wait_with_output().unwrap().assert().failure().stdout( + predicate::str::contains("Another instance of tedge_agent is running."), + ); + agent_2.kill(); + let _ignore_error = std::fs::remove_file("/run/lock/tedge_agent.lock"); + return Ok(()); + } else if let Ok(Some(code)) = agent_2.try_wait() { + agent_2 + .wait_with_output() + .unwrap() + .assert() + .failure() + .stdout(predicate::str::contains( + "Another instance of tedge_agent is running.", + )); + agent.kill(); + let _ignore_error = std::fs::remove_file("/run/lock/tedge_agent.lock"); + return Ok(()); + } + thread::sleep(time::Duration::from_millis(200)); + } + + // cleanup before panic + agent.kill(); + agent_2.kill(); + let _ignore_error = std::fs::remove_file("/run/lock/tedge_agent.lock"); + panic!("Agent failed to stop.") + } +} |