summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex Solomes <alex.solomes@softwareag.com>2021-09-21 16:26:13 +0100
committerGitHub <noreply@github.com>2021-09-21 16:26:13 +0100
commit32eaacb31ca3dbb773473ba889fb2af23bd637d4 (patch)
tree5f742575bfb3b81fb6062c1fb4c29f20473a1306
parentf55798a44456206d9beb31114d5b731ae74b50d1 (diff)
[CIT-550] flockfile and test for tedge agent (#437)
* [CIT-550] flockfile and test for tedge agent * fmt and clippy code formatting * mosquitto-available to fix workflow test [CIT-550] Signed-off-by: Alex Solomes <alex.solomes@softwareag.com> * moved flockfile utility, updated error msg, reverted changed [CIT-550] Signed-off-by: Alex Solomes <alex.solomes@softwareag.com> * fix workflow flockfile test [CIT-550] Signed-off-by: Alex Solomes <alex.solomes@softwareag.com> * removed create_sm_agent, moved to ..::new [CIT-550] Signed-off-by: Alex Solomes <alex.solomes@softwareag.com> * fixed panic SmAgent, comments & imports [CIT-550] Signed-off-by: Alex Solomes <alex.solomes@softwareag.com> * cargo-fmt [CIT-550] Signed-off-by: Alex Solomes <alex.solomes@softwareag.com> * PySys testing for stating two agents [CIT-55] Signed-off-by: Alex Solomes <alex.solomes@softwareag.com> * Revert "PySys testing for stating two agents [CIT-55]" This reverts commit b22e744158b3d65bcde62aa750772eca7f880daf. * fixed test with cfg flag [CIT-550] Signed-off-by: Alex Solomes <alex.solomes@softwareag.com> * re-write of test to avoid sleep [CIT-550] Signed-off-by: Alex Solomes <alex.solomes@softwareag.com> * panic cleanup [CIT-550] Signed-off-by: Alex Solomes <alex.solomes@softwareag.com>
-rw-r--r--Cargo.lock2
-rw-r--r--common/flockfile/src/unix.rs51
-rw-r--r--mapper/tedge_mapper/src/mapper.rs13
-rw-r--r--sm/tedge_agent/Cargo.toml2
-rw-r--r--sm/tedge_agent/src/agent.rs13
-rw-r--r--sm/tedge_agent/src/main.rs2
-rw-r--r--sm/tedge_agent/tests/main.rs60
7 files changed, 120 insertions, 23 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 812e6129..670f2fba 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2517,6 +2517,7 @@ name = "tedge_agent"
version = "0.3.1"
dependencies = [
"anyhow",
+ "assert_cmd",
"async-trait",
"chrono",
"clock",
@@ -2527,6 +2528,7 @@ dependencies = [
"mqtt_client",
"once_cell",
"plugin_sm",
+ "predicates 2.0.1",
"rumqttd",
"rumqttlog",
"serde",
diff --git a/common/flockfile/src/unix.rs b/common/flockfile/src/unix.rs
index 491e37ad..28646601 100644
--- a/common/flockfile/src/unix.rs
+++ b/common/flockfile/src/unix.rs
@@ -5,15 +5,23 @@ use std::{
os::unix::io::AsRawFd,
path::{Path, PathBuf},
};
-use tracing::{debug, warn};
+use tracing::{debug, error, warn};
#[derive(thiserror::Error, Debug)]
pub enum FlockfileError {
- #[error(transparent)]
- FromIo(#[from] std::io::Error),
+ #[error("Couldn't acquire file lock.")]
+ FromIo {
+ path: PathBuf,
+ #[source]
+ source: std::io::Error,
+ },
#[error("Couldn't acquire file lock.")]
- FromNix(#[from] nix::Error),
+ FromNix {
+ path: PathBuf,
+ #[source]
+ source: nix::Error,
+ },
}
/// flockfile creates a lockfile in the filesystem under `/run/lock` and then creates a filelock using system fcntl with flock.
@@ -35,13 +43,24 @@ impl Flockfile {
pub fn new_lock(lock_name: impl AsRef<Path>) -> Result<Flockfile, FlockfileError> {
let path = Path::new("/run/lock").join(lock_name);
- let file = OpenOptions::new()
+ let file = match OpenOptions::new()
.create(true)
.read(true)
.write(true)
- .open(&path)?;
+ .open(&path)
+ {
+ Ok(file) => file,
+ Err(err) => {
+ return Err(FlockfileError::FromIo { path, source: err });
+ }
+ };
- let () = flock(file.as_raw_fd(), FlockArg::LockExclusiveNonblock)?;
+ let () = match flock(file.as_raw_fd(), FlockArg::LockExclusiveNonblock) {
+ Ok(()) => (),
+ Err(err) => {
+ return Err(FlockfileError::FromNix { path, source: err });
+ }
+ };
debug!(r#"Lockfile created "{:?}""#, &path);
Ok(Flockfile {
@@ -85,6 +104,22 @@ impl AsRef<Path> for Flockfile {
}
}
+/// Check /run/lock/ for a lock file of a given `app_name`
+pub fn check_another_instance_is_not_running(app_name: &str) -> Result<Flockfile, FlockfileError> {
+ match Flockfile::new_lock(format!("{}.lock", app_name)) {
+ Ok(file) => Ok(file),
+ Err(err) => {
+ return match &err {
+ FlockfileError::FromIo { path, .. } | FlockfileError::FromNix { path, .. } => {
+ error!("Another instance of {} is running.", app_name);
+ error!("Lock file path: {}", path.as_path().to_str().unwrap());
+ Err(err)
+ }
+ }
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
@@ -130,7 +165,7 @@ mod tests {
assert_matches!(
Flockfile::new_lock(&path).unwrap_err(),
- FlockfileError::FromNix(_)
+ FlockfileError::FromNix { .. }
);
}
}
diff --git a/mapper/tedge_mapper/src/mapper.rs b/mapper/tedge_mapper/src/mapper.rs
index 89ea997f..f52ff695 100644
--- a/mapper/tedge_mapper/src/mapper.rs
+++ b/mapper/tedge_mapper/src/mapper.rs
@@ -1,7 +1,8 @@
use crate::converter::*;
use crate::error::*;
-use flockfile::{Flockfile, FlockfileError};
+use flockfile::{check_another_instance_is_not_running, Flockfile};
+
use mqtt_client::{Client, MqttClient, MqttClientError, Topic};
use tedge_config::{ConfigSettingAccessor, MqttPortSetting, TEdgeConfig};
use tokio::task::JoinHandle;
@@ -40,16 +41,6 @@ pub(crate) fn mqtt_config(
Ok(mqtt_client::Config::default().with_port(tedge_config.query(MqttPortSetting)?.into()))
}
-fn check_another_instance_is_not_running(app_name: &str) -> Result<Flockfile, FlockfileError> {
- match flockfile::Flockfile::new_lock(format!("{}.lock", app_name)) {
- Ok(file) => Ok(file),
- Err(err) => {
- error!("Another instance of {} is running.", app_name);
- Err(err)
- }
- }
-}
-
pub struct Mapper {
client: mqtt_client::Client,
config: MapperConfig,
diff --git a/sm/tedge_agent/Cargo.toml b/sm/tedge_agent/Cargo.toml
index 69f8b3c8..533332ae 100644
--- a/sm/tedge_agent/Cargo.toml
+++ b/sm/tedge_agent/Cargo.toml
@@ -44,3 +44,5 @@ rumqttd = "0.7"
rumqttlog = "0.7"
once_cell = "1.8"
tempfile = "3.2"
+assert_cmd = "2.0"
+predicates = "2.0"
diff --git a/sm/tedge_agent/src/agent.rs b/sm/tedge_agent/src/agent.rs
index 174a3117..fc268cc7 100644
--- a/sm/tedge_agent/src/agent.rs
+++ b/sm/tedge_agent/src/agent.rs
@@ -2,6 +2,9 @@ use crate::{
error::AgentError,
state::{AgentStateRepository, State, StateRepository},
};
+
+use flockfile::{check_another_instance_is_not_running, Flockfile, FlockfileError};
+
use json_sm::{
software_filter_topic, Jsonify, SoftwareError, SoftwareListRequest, SoftwareListResponse,
SoftwareOperationStatus, SoftwareRequestResponse, SoftwareUpdateRequest,
@@ -113,17 +116,21 @@ pub struct SmAgent {
config: SmAgentConfig,
name: String,
persistance_store: AgentStateRepository,
+ _flock: Flockfile,
}
impl SmAgent {
- pub fn new(name: &str, config: SmAgentConfig) -> Self {
+ pub fn new(name: &str, config: SmAgentConfig) -> Result<Self, FlockfileError> {
let persistance_store = AgentStateRepository::new(config.sm_home.clone());
+ let flock = check_another_instance_is_not_running(&name)?;
+ info!("{} starting", &name);
- Self {
+ Ok(Self {
config,
name: name.into(),
+ _flock: flock,
persistance_store,
- }
+ })
}
#[instrument(skip(self), name = "sm-agent")]
diff --git a/sm/tedge_agent/src/main.rs b/sm/tedge_agent/src/main.rs
index 2121fad3..0c603251 100644
--- a/sm/tedge_agent/src/main.rs
+++ b/sm/tedge_agent/src/main.rs
@@ -29,6 +29,6 @@ async fn main() -> Result<(), anyhow::Error> {
"tedge_agent",
SmAgentConfig::try_new(tedge_config_location)?,
);
- agent.start().await?;
+ agent?.start().await?;
Ok(())
}
diff --git a/sm/tedge_agent/tests/main.rs b/sm/tedge_agent/tests/main.rs
new file mode 100644
index 00000000..386635af
--- /dev/null
+++ b/sm/tedge_agent/tests/main.rs
@@ -0,0 +1,60 @@
+#[cfg(test)]
+#[cfg(feature = "mosquitto-available")]
+mod tests {
+ use std::process::{Command, Stdio};
+ use std::{thread, time};
+
+ use assert_cmd::prelude::*;
+ use predicates::prelude::*;
+
+ #[test]
+ /// Tests that only one instance of `tedge_agent` is running.
+ /// This is done by spawning/running two instances of `tedge_agent`
+ /// expecting the first one to work and the second to fail.
+ fn tedge_agent_check_no_multiple_instances_running() -> Result<(), Box<dyn std::error::Error>> {
+ let _ignore_errors = std::fs::remove_file("/run/lock/tedge_agent.lock");
+
+ // running first `tedge_agent` binary
+ let mut agent = Command::cargo_bin(env!("CARGO_PKG_NAME"))?
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped())
+ .spawn()?;
+
+ // running second `tedge_agent` binary
+ let mut agent_2 = Command::cargo_bin(env!("CARGO_PKG_NAME"))?
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped())
+ .spawn()?;
+
+ // trying up to 10 times before breaking out.
+ for _ in 0..10 {
+ if let Ok(Some(code)) = agent.try_wait() {
+ agent.wait_with_output().unwrap().assert().failure().stdout(
+ predicate::str::contains("Another instance of tedge_agent is running."),
+ );
+ agent_2.kill();
+ let _ignore_error = std::fs::remove_file("/run/lock/tedge_agent.lock");
+ return Ok(());
+ } else if let Ok(Some(code)) = agent_2.try_wait() {
+ agent_2
+ .wait_with_output()
+ .unwrap()
+ .assert()
+ .failure()
+ .stdout(predicate::str::contains(
+ "Another instance of tedge_agent is running.",
+ ));
+ agent.kill();
+ let _ignore_error = std::fs::remove_file("/run/lock/tedge_agent.lock");
+ return Ok(());
+ }
+ thread::sleep(time::Duration::from_millis(200));
+ }
+
+ // cleanup before panic
+ agent.kill();
+ agent_2.kill();
+ let _ignore_error = std::fs::remove_file("/run/lock/tedge_agent.lock");
+ panic!("Agent failed to stop.")
+ }
+}