summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_agent/src/agent.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/tedge_agent/src/agent.rs')
-rw-r--r--crates/core/tedge_agent/src/agent.rs416
1 files changed, 416 insertions, 0 deletions
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs
new file mode 100644
index 00000000..16148ab1
--- /dev/null
+++ b/crates/core/tedge_agent/src/agent.rs
@@ -0,0 +1,416 @@
+use crate::{
+ error::AgentError,
+ state::{AgentStateRepository, State, StateRepository},
+};
+
+use flockfile::{check_another_instance_is_not_running, Flockfile};
+
+use json_sm::{
+ software_filter_topic, Jsonify, SoftwareError, SoftwareListRequest, SoftwareListResponse,
+ SoftwareOperationStatus, SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest,
+ SoftwareUpdateResponse,
+};
+use mqtt_client::{Client, Config, Message, MqttClient, Topic, TopicFilter};
+use plugin_sm::plugin_manager::{ExternalPlugins, Plugins};
+use std::{
+ path::PathBuf,
+ sync::{Arc, Mutex},
+};
+use tracing::{debug, error, info, instrument};
+
+use crate::operation_logs::{LogKind, OperationLogs};
+use tedge_config::{
+ ConfigRepository, ConfigSettingAccessor, ConfigSettingAccessorStringExt, MqttPortSetting,
+ SoftwarePluginDefaultSetting, TEdgeConfigLocation,
+};
+
+#[derive(Debug)]
+pub struct SmAgentConfig {
+ pub errors_topic: Topic,
+ pub mqtt_client_config: mqtt_client::Config,
+ pub request_topic_list: Topic,
+ pub request_topic_update: Topic,
+ pub request_topics: TopicFilter,
+ pub response_topic_list: Topic,
+ pub response_topic_update: Topic,
+ pub sm_home: PathBuf,
+ pub log_dir: PathBuf,
+ config_location: TEdgeConfigLocation,
+}
+
+impl Default for SmAgentConfig {
+ fn default() -> Self {
+ let errors_topic = Topic::new("tedge/errors").expect("Invalid topic");
+
+ let mqtt_client_config = mqtt_client::Config::default().with_packet_size(10 * 1024 * 1024);
+
+ let request_topics = TopicFilter::new(software_filter_topic()).expect("Invalid topic");
+
+ let request_topic_list =
+ Topic::new(SoftwareListRequest::topic_name()).expect("Invalid topic");
+
+ let request_topic_update =
+ Topic::new(SoftwareUpdateRequest::topic_name()).expect("Invalid topic");
+
+ let response_topic_list =
+ Topic::new(SoftwareListResponse::topic_name()).expect("Invalid topic");
+
+ let response_topic_update =
+ Topic::new(SoftwareUpdateResponse::topic_name()).expect("Invalid topic");
+
+ let sm_home = PathBuf::from("/etc/tedge");
+
+ let log_dir = PathBuf::from("/var/log/tedge/agent");
+
+ let config_location = TEdgeConfigLocation::from_default_system_location();
+
+ Self {
+ errors_topic,
+ mqtt_client_config,
+ request_topic_list,
+ request_topic_update,
+ request_topics,
+ response_topic_list,
+ response_topic_update,
+ sm_home,
+ log_dir,
+ config_location,
+ }
+ }
+}
+
+impl SmAgentConfig {
+ pub fn try_new(tedge_config_location: TEdgeConfigLocation) -> Result<Self, anyhow::Error> {
+ let config_repository =
+ tedge_config::TEdgeConfigRepository::new(tedge_config_location.clone());
+ let tedge_config = config_repository.load()?;
+
+ let mqtt_config =
+ mqtt_client::Config::default().with_port(tedge_config.query(MqttPortSetting)?.into());
+
+ let tedge_config_path = config_repository
+ .get_config_location()
+ .tedge_config_root_path()
+ .to_path_buf();
+
+ Ok(SmAgentConfig::default()
+ .with_sm_home(tedge_config_path)
+ .with_mqtt_client_config(mqtt_config)
+ .with_config_location(tedge_config_location))
+ }
+
+ pub fn with_sm_home(self, sm_home: PathBuf) -> Self {
+ Self { sm_home, ..self }
+ }
+
+ pub fn with_mqtt_client_config(self, mqtt_client_config: Config) -> Self {
+ Self {
+ mqtt_client_config,
+ ..self
+ }
+ }
+
+ pub fn with_config_location(self, config_location: TEdgeConfigLocation) -> Self {
+ Self {
+ config_location,
+ ..self
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct SmAgent {
+ config: SmAgentConfig,
+ name: String,
+ operation_logs: OperationLogs,
+ persistance_store: AgentStateRepository,
+ _flock: Flockfile,
+}
+
+impl SmAgent {
+ pub fn try_new(name: &str, config: SmAgentConfig) -> Result<Self, AgentError> {
+ let flock = check_another_instance_is_not_running(name)?;
+ info!("{} starting", &name);
+
+ let persistance_store = AgentStateRepository::new(config.sm_home.clone());
+ let operation_logs = OperationLogs::try_new(config.log_dir.clone())?;
+
+ Ok(Self {
+ config,
+ name: name.into(),
+ operation_logs,
+ persistance_store,
+ _flock: flock,
+ })
+ }
+
+ #[instrument(skip(self), name = "sm-agent")]
+ pub async fn start(&mut self) -> Result<(), AgentError> {
+ info!("Starting tedge agent");
+
+ let plugins = Arc::new(Mutex::new(ExternalPlugins::open(
+ self.config.sm_home.join("sm-plugins"),
+ get_default_plugin(&self.config.config_location)?,
+ Some("sudo".into()),
+ )?));
+
+ if plugins.lock().unwrap().empty() {
+ // `unwrap` should be safe here as we only access data.
+ error!("Couldn't load plugins from /etc/tedge/sm-plugins");
+ return Err(AgentError::NoPlugins);
+ }
+
+ let mqtt = Client::connect(self.name.as_str(), &self.config.mqtt_client_config).await?;
+ let mut errors = mqtt.subscribe_errors();
+ tokio::spawn(async move {
+ while let Some(error) = errors.next().await {
+ error!("{}", error);
+ }
+ });
+
+ let () = self.fail_pending_operation(&mqtt).await?;
+
+ // * Maybe it would be nice if mapper/registry responds
+ let () = publish_capabilities(&mqtt).await?;
+
+ let () = self.subscribe_and_process(&mqtt, &plugins).await?;
+
+ Ok(())
+ }
+
+ async fn subscribe_and_process(
+ &mut self,
+ mqtt: &Client,
+ plugins: &Arc<Mutex<ExternalPlugins>>,
+ ) -> Result<(), AgentError> {
+ let mut operations = mqtt.subscribe(self.config.request_topics.clone()).await?;
+ while let Some(message) = operations.next().await {
+ debug!("Request {:?}", message);
+
+ match &message.topic {
+ topic if topic == &self.config.request_topic_list => {
+ let _success = self
+ .handle_software_list_request(
+ mqtt,
+ plugins.clone(),
+ &self.config.response_topic_list,
+ &message,
+ )
+ .await
+ .map_err(|err| {
+ error!("{:?}", err); // log error and discard such that the agent doesn't exit.
+ });
+ }
+
+ topic if topic == &self.config.request_topic_update => {
+ let () = plugins.lock().unwrap().load()?; // `unwrap` should be safe here as we only access data for write.
+ let () = plugins
+ .lock()
+ .unwrap() // `unwrap` should be safe here as we only access data for write.
+ .update_default(&get_default_plugin(&self.config.config_location)?)?;
+
+ let _success = self
+ .handle_software_update_request(
+ mqtt,
+ plugins.clone(),
+ &self.config.response_topic_update,
+ &message,
+ )
+ .await
+ .map_err(|err| {
+ error!("{:?}", err); // log error and discard such that the agent doesn't exit.
+ });
+ }
+
+ _ => error!("Unknown operation. Discarded."),
+ }
+ }
+
+ Ok(())
+ }
+
+ async fn handle_software_list_request(
+ &self,
+ mqtt: &Client,
+ plugins: Arc<Mutex<ExternalPlugins>>,
+ response_topic: &Topic,
+ message: &Message,
+ ) -> Result<(), AgentError> {
+ let request = match SoftwareListRequest::from_slice(message.payload_trimmed()) {
+ Ok(request) => {
+ let () = self
+ .persistance_store
+ .store(&State {
+ operation_id: Some(request.id.clone()),
+ operation: Some("list".into()),
+ })
+ .await?;
+
+ request
+ }
+
+ Err(error) => {
+ debug!("Parsing error: {}", error);
+ let _ = mqtt
+ .publish(Message::new(
+ &self.config.errors_topic,
+ format!("{}", error),
+ ))
+ .await?;
+
+ return Err(SoftwareError::ParseError {
+ reason: "Parsing Error".into(),
+ }
+ .into());
+ }
+ };
+ let executing_response = SoftwareListResponse::new(&request);
+
+ let _ = mqtt
+ .publish(Message::new(
+ &self.config.response_topic_list,
+ executing_response.to_bytes()?,
+ ))
+ .await?;
+
+ let log_file = self
+ .operation_logs
+ .new_log_file(LogKind::SoftwareList)
+ .await?;
+ let response = plugins.lock().unwrap().list(&request, log_file).await; // `unwrap` should be safe here as we only access data.
+
+ let _ = mqtt
+ .publish(Message::new(response_topic, response.to_bytes()?))
+ .await?;
+
+ let _state = self.persistance_store.clear().await?;
+
+ Ok(())
+ }
+
+ async fn handle_software_update_request(
+ &self,
+ mqtt: &Client,
+ plugins: Arc<Mutex<ExternalPlugins>>,
+ response_topic: &Topic,
+ message: &Message,
+ ) -> Result<(), AgentError> {
+ let request = match SoftwareUpdateRequest::from_slice(message.payload_trimmed()) {
+ Ok(request) => {
+ let () = self
+ .persistance_store
+ .store(&State {
+ operation_id: Some(request.id.clone()),
+ operation: Some("update".into()),
+ })
+ .await?;
+
+ request
+ }
+
+ Err(error) => {
+ error!("Parsing error: {}", error);
+ let _ = mqtt
+ .publish(Message::new(
+ &self.config.errors_topic,
+ format!("{}", error),
+ ))
+ .await?;
+
+ return Err(SoftwareError::ParseError {
+ reason: "Parsing failed".into(),
+ }
+ .into());
+ }
+ };
+
+ let executing_response = SoftwareUpdateResponse::new(&request);
+ let _ = mqtt
+ .publish(Message::new(response_topic, executing_response.to_bytes()?))
+ .await?;
+
+ let log_file = self
+ .operation_logs
+ .new_log_file(LogKind::SoftwareUpdate)
+ .await?;
+
+ let response = plugins.lock().unwrap().process(&request, log_file).await; // `unwrap` should be safe here as we only access data.
+
+ let _ = mqtt
+ .publish(Message::new(response_topic, response.to_bytes()?))
+ .await?;
+
+ let _state = self.persistance_store.clear().await?;
+
+ Ok(())
+ }
+
+ async fn fail_pending_operation(&self, mqtt: &Client) -> Result<(), AgentError> {
+ if let State {
+ operation_id: Some(id),
+ operation: Some(operation_string),
+ } = match self.persistance_store.load().await {
+ Ok(state) => state,
+ Err(_) => State {
+ operation_id: None,
+ operation: None,
+ },
+ } {
+ let topic = match operation_string.into() {
+ SoftwareOperation::CurrentSoftwareList => &self.config.response_topic_list,
+
+ SoftwareOperation::SoftwareUpdates => &self.config.response_topic_update,
+
+ SoftwareOperation::UnknownOperation => {
+ error!("UnknownOperation in store.");
+ &self.config.errors_topic
+ }
+ };
+
+ let response = SoftwareRequestResponse::new(&id, SoftwareOperationStatus::Failed);
+
+ let () = mqtt
+ .publish(Message::new(topic, response.to_bytes()?))
+ .await?;
+ }
+
+ Ok(())
+ }
+}
+
+fn get_default_plugin(
+ config_location: &TEdgeConfigLocation,
+) -> Result<Option<SoftwareType>, AgentError> {
+ let config_repository = tedge_config::TEdgeConfigRepository::new(config_location.clone());
+ let tedge_config = config_repository.load()?;
+
+ Ok(tedge_config.query_string_optional(SoftwarePluginDefaultSetting)?)
+}
+
+async fn publish_capabilities(mqtt: &Client) -> Result<(), AgentError> {
+ mqtt.publish(Message::new(&Topic::new("tedge/capabilities/software/list")?, "").retain())
+ .await?;
+
+ mqtt.publish(Message::new(&Topic::new("tedge/capabilities/software/update")?, "").retain())
+ .await?;
+
+ Ok(())
+}
+
+/// Variants of supported software operations.
+#[derive(Debug, Clone, PartialEq)]
+pub enum SoftwareOperation {
+ CurrentSoftwareList,
+ SoftwareUpdates,
+ UnknownOperation,
+}
+
+impl From<String> for SoftwareOperation {
+ fn from(s: String) -> Self {
+ match s.as_str() {
+ r#"list"# => Self::CurrentSoftwareList,
+ r#"update"# => Self::SoftwareUpdates,
+ _ => Self::UnknownOperation,
+ }
+ }
+}