summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge
diff options
context:
space:
mode:
authorPradeepKiruvale <PRADEEPKIRUVALE@gmail.com>2022-01-19 14:45:05 +0530
committerGitHub <noreply@github.com>2022-01-19 14:45:05 +0530
commit073526d5272ee6bc72b82c540be501be20e73889 (patch)
treead609fdf45742ca5210fb8fd2cc98cdb08e63910 /crates/core/tedge
parent82ff6a80c4fb5320ef4862a362c8bf29a1300256 (diff)
[710] Simplify tedge connect c8y (#751)
* mqtt direct connection * break on error message receival * handle errors * refactor device connection * check connection with jwt token * add log * fix typos * remove unwrap * address review comments * fix the loop * fix * replace event loop without spwaning thread * switch to mosquito user to read keyfile * create function to read certificate * remove unwrap * address review comments * user_manager * remove rustls_native_certs * refactor check if bridge exists * fix typos Co-authored-by: Pradeep Kumar K J <pradeepkumar.kj@sofwareag.com>
Diffstat (limited to 'crates/core/tedge')
-rw-r--r--crates/core/tedge/Cargo.toml3
-rw-r--r--crates/core/tedge/src/cli/connect/c8y_direct_connection.rs148
-rw-r--r--crates/core/tedge/src/cli/connect/cli.rs2
-rw-r--r--crates/core/tedge/src/cli/connect/command.rs122
-rw-r--r--crates/core/tedge/src/cli/connect/error.rs11
-rw-r--r--crates/core/tedge/src/cli/connect/mod.rs1
6 files changed, 200 insertions, 87 deletions
diff --git a/crates/core/tedge/Cargo.toml b/crates/core/tedge/Cargo.toml
index 30665115..e19f9ee5 100644
--- a/crates/core/tedge/Cargo.toml
+++ b/crates/core/tedge/Cargo.toml
@@ -21,7 +21,8 @@ hyper = { version = "0.14", default-features = false }
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls", "stream"] }
rpassword = "5.0"
rumqttc = "0.10"
-rustls = "0.20"
+rustls = "0.20.2"
+rustls_0_19 = {package = "rustls", version = "0.19.0" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
structopt = "0.3"
diff --git a/crates/core/tedge/src/cli/connect/c8y_direct_connection.rs b/crates/core/tedge/src/cli/connect/c8y_direct_connection.rs
new file mode 100644
index 00000000..5efdf032
--- /dev/null
+++ b/crates/core/tedge/src/cli/connect/c8y_direct_connection.rs
@@ -0,0 +1,148 @@
+use super::{BridgeConfig, ConnectError};
+
+use rumqttc::{
+ self, certs, pkcs8_private_keys, Client, Event, Incoming, MqttOptions, Outgoing, Packet, QoS,
+ Transport,
+};
+use rustls_0_19::ClientConfig;
+
+use std::fs;
+use std::io::{Error, ErrorKind};
+use std::{fs::File, io::BufReader};
+use tedge_config::FilePath;
+
+use tedge_users::UserManager;
+
+// Connect directly to the c8y cloud over mqtt and publish device create message.
+pub fn create_device_with_direct_connection(
+ user_manager: UserManager,
+ bridge_config: &BridgeConfig,
+) -> Result<(), ConnectError> {
+ const DEVICE_ALREADY_EXISTS: &[u8] = b"41,100,Device already existing";
+ const DEVICE_CREATE_ERROR_TOPIC: &str = "s/e";
+
+ let address = bridge_config.address.clone();
+ let host: Vec<&str> = address.split(":").collect();
+
+ let mut mqtt_options = MqttOptions::new(bridge_config.remote_clientid.clone(), host[0], 8883);
+ mqtt_options.set_keep_alive(std::time::Duration::from_secs(5));
+
+ let mut client_config = ClientConfig::new();
+
+ let () = load_root_certs(
+ &mut client_config.root_store,
+ bridge_config.bridge_root_cert_path.clone(),
+ )?;
+
+ let pvt_key = read_pvt_key(user_manager, bridge_config.bridge_keyfile.clone())?;
+ let cert_chain = read_cert_chain(bridge_config.bridge_certfile.clone())?;
+
+ let _ = client_config.set_single_client_cert(cert_chain, pvt_key);
+ mqtt_options.set_transport(Transport::tls_with_config(client_config.into()));
+
+ let (mut client, mut connection) = Client::new(mqtt_options, 10);
+
+ client.subscribe(DEVICE_CREATE_ERROR_TOPIC, QoS::AtLeastOnce)?;
+
+ let mut device_create_try: usize = 0;
+ for event in connection.iter() {
+ match event {
+ Ok(Event::Incoming(Packet::SubAck(_))) => {
+ publish_device_create_message(&mut client, &bridge_config.remote_clientid.clone())?;
+ }
+ Ok(Event::Incoming(Packet::Publish(response))) => {
+ // We got a response
+ if response.payload == DEVICE_ALREADY_EXISTS {
+ return Ok(());
+ }
+ }
+ Ok(Event::Outgoing(Outgoing::PingReq)) => {
+ // If not received any response then resend the device create request again.
+ // else timeout.
+ if device_create_try < 1 {
+ publish_device_create_message(
+ &mut client,
+ &bridge_config.remote_clientid.clone(),
+ )?;
+ device_create_try += 1;
+ } else {
+ // No messages have been received for a while
+ break;
+ }
+ }
+ Ok(Event::Incoming(Incoming::Disconnect)) => {
+ eprintln!("ERROR: Disconnected");
+ break;
+ }
+ Err(err) => {
+ eprintln!("ERROR: {:?}", err);
+ return Err(ConnectError::ConnectionCheckError);
+ }
+ _ => {}
+ }
+ }
+
+ // The request has not even been sent
+ println!("No response from Cumulocity");
+ Err(ConnectError::TimeoutElapsedError)
+}
+
+fn publish_device_create_message(client: &mut Client, device_id: &str) -> Result<(), ConnectError> {
+ const DEVICE_CREATE_PUBLISH_TOPIC: &str = "s/us";
+ const DEVICE_TYPE: &str = "thin-edge.io";
+ client.publish(
+ DEVICE_CREATE_PUBLISH_TOPIC,
+ QoS::ExactlyOnce,
+ false,
+ format!("100,{},{}", device_id, DEVICE_TYPE).as_bytes(),
+ )?;
+ Ok(())
+}
+
+fn load_root_certs(
+ root_store: &mut rustls_0_19::RootCertStore,
+ cert_dir: FilePath,
+) -> Result<(), ConnectError> {
+ for file_entry in fs::read_dir(cert_dir)? {
+ let file = file_entry?;
+ let f = File::open(file.path())?;
+ let mut rd = BufReader::new(f);
+ let _ = root_store
+ .add_pem_file(&mut rd)
+ .map(|_| ())
+ .map_err(|()| Error::new(ErrorKind::InvalidData, format!("could not load PEM file")));
+ }
+ Ok(())
+}
+
+fn read_pvt_key(
+ user_manager: UserManager,
+ key_file: tedge_config::FilePath,
+) -> Result<rustls_0_19::PrivateKey, ConnectError> {
+ // Become BROKER_USER to read the private key
+ let _user_guard = user_manager.become_user(tedge_users::BROKER_USER)?;
+ let f = File::open(key_file)?;
+ let mut key_reader = BufReader::new(f);
+ let result = pkcs8_private_keys(&mut key_reader);
+ match result {
+ Ok(key) => Ok(key[0].clone()),
+ Err(_) => {
+ return Err(ConnectError::RumqttcPrivateKey);
+ }
+ }
+}
+
+fn read_cert_chain(
+ cert_file: tedge_config::FilePath,
+) -> Result<Vec<rustls_0_19::Certificate>, ConnectError> {
+ let f = File::open(cert_file)?;
+ let mut cert_reader = BufReader::new(f);
+ let result = certs(&mut cert_reader);
+ let cert_chain: Vec<rustls_0_19::Certificate> = match result {
+ Ok(cert) => cert,
+ Err(_) => {
+ return Err(ConnectError::RumqttcCertificate);
+ }
+ };
+ Ok(cert_chain)
+}
diff --git a/crates/core/tedge/src/cli/connect/cli.rs b/crates/core/tedge/src/cli/connect/cli.rs
index 1f3e2643..64a351f4 100644
--- a/crates/core/tedge/src/cli/connect/cli.rs
+++ b/crates/core/tedge/src/cli/connect/cli.rs
@@ -37,6 +37,7 @@ impl BuildCommand for TEdgeConnectOpt {
context.user_manager.clone(),
context.config_location.tedge_config_root_path,
)?,
+ user_manager: context.user_manager,
},
TEdgeConnectOpt::Az { is_test_connection } => ConnectCommand {
config_location: context.config_location.clone(),
@@ -48,6 +49,7 @@ impl BuildCommand for TEdgeConnectOpt {
context.user_manager.clone(),
context.config_location.tedge_config_root_path,
)?,
+ user_manager: context.user_manager,
},
}
.into_boxed())
diff --git a/crates/core/tedge/src/cli/connect/command.rs b/crates/core/tedge/src/cli/connect/command.rs
index 262e6163..ecbe7988 100644
--- a/crates/core/tedge/src/cli/connect/command.rs
+++ b/crates/core/tedge/src/cli/connect/command.rs
@@ -7,11 +7,12 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tedge_config::*;
+use tedge_users::UserManager;
use tedge_utils::paths::{create_directories, ok_if_not_found, DraftFile};
use which::which;
pub(crate) const DEFAULT_HOST: &str = "localhost";
-const WAIT_FOR_CHECK_SECONDS: u64 = 10;
+const WAIT_FOR_CHECK_SECONDS: u64 = 2;
const C8Y_CONFIG_FILENAME: &str = "c8y-bridge.conf";
const AZURE_CONFIG_FILENAME: &str = "az-bridge.conf";
pub(crate) const RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
@@ -26,10 +27,10 @@ pub struct ConnectCommand {
pub common_mosquitto_config: CommonMosquittoConfig,
pub is_test_connection: bool,
pub service_manager: Arc<dyn SystemServiceManager>,
+ pub user_manager: UserManager,
}
pub enum DeviceStatus {
- MightBeNew,
AlreadyExists,
Unknown,
}
@@ -69,12 +70,15 @@ impl Command for ConnectCommand {
fn execute(&self) -> anyhow::Result<()> {
let mut config = self.config_repository.load()?;
-
if self.is_test_connection {
let br_config = self.bridge_config(&config)?;
- if self.check_if_bridge_exists(br_config) {
+ if self.check_if_bridge_exists(&br_config) {
return match self.check_connection(&config) {
- Ok(_) => Ok(()),
+ Ok(_) => {
+ let cloud = br_config.cloud_name.clone();
+ println!("Connection check to {} cloud is successfull.\n", cloud);
+ Ok(())
+ }
Err(err) => Err(err.into()),
};
} else {
@@ -118,30 +122,15 @@ impl Command for ConnectCommand {
&bridge_config,
&updated_mosquitto_config,
self.service_manager.as_ref(),
+ self.user_manager.clone(),
&self.config_location,
)?;
match self.check_connection(&config) {
- Ok(DeviceStatus::AlreadyExists) => {}
- Ok(DeviceStatus::MightBeNew) | Ok(DeviceStatus::Unknown) => {
- if let Cloud::C8y = self.cloud {
- println!("Restarting mosquitto to resubscribe to bridged inbound cloud topics after device creation.\n");
- restart_mosquitto(
- &bridge_config,
- self.service_manager.as_ref(),
- &self.config_location,
- )?;
-
- println!(
- "Awaiting mosquitto to start. This may take up to {} seconds.\n",
- MOSQUITTO_RESTART_TIMEOUT_SECONDS
- );
- std::thread::sleep(std::time::Duration::from_secs(
- MOSQUITTO_RESTART_TIMEOUT_SECONDS,
- ));
- }
+ Ok(DeviceStatus::AlreadyExists) => {
+ println!("Connection check is successfull.\n");
}
- Err(_) => {
+ _ => {
println!(
"Warning: Bridge has been configured, but {} connection check failed.\n",
self.cloud.as_str()
@@ -208,23 +197,23 @@ impl ConnectCommand {
fn check_connection(&self, config: &TEdgeConfig) -> Result<DeviceStatus, ConnectError> {
let port = config.query(MqttPortSetting)?.into();
- let device_id = config.query(DeviceIdSetting)?;
+
println!(
"Sending packets to check connection. This may take up to {} seconds.\n",
WAIT_FOR_CHECK_SECONDS
);
match self.cloud {
Cloud::Azure => check_device_status_azure(port),
- Cloud::C8y => check_device_status_c8y(port, device_id.as_str()),
+ Cloud::C8y => check_device_status_c8y(config),
}
}
- fn check_if_bridge_exists(&self, br_config: BridgeConfig) -> bool {
+ fn check_if_bridge_exists(&self, br_config: &BridgeConfig) -> bool {
let bridge_conf_path = self
.config_location
.tedge_config_root_path
.join(TEDGE_BRIDGE_CONF_DIR_PATH)
- .join(br_config.config_file);
+ .join(br_config.config_file.clone());
Path::new(&bridge_conf_path).exists()
}
@@ -243,75 +232,30 @@ where
Ok(())
}
-// Check the connection by using the response of the SmartREST template 100.
-// If getting the response '41,100,Device already existing', the connection is established.
-//
-// If the device is already registered, it can finish in the first try.
-// If the device is new, the device is going to be registered here and
-// the check can finish in the second try as there is no error response in the first try.
-fn check_device_status_c8y(port: u16, device_id: &str) -> Result<DeviceStatus, ConnectError> {
- for i in 0..2 {
- println!("Try {} / 2: Sending a message to Cumulocity. ", i + 1,);
-
- match create_device(port, device_id) {
- Ok(DeviceStatus::MightBeNew) => return Ok(DeviceStatus::MightBeNew),
- Ok(DeviceStatus::AlreadyExists) => {
- println!("Received expected response message, connection check is successful.\n",);
- if i == 0 {
- // If the DeviceAlreadyExists response comes on the first attempt itself, the device definitely was created earlier
- return Ok(DeviceStatus::AlreadyExists);
- } else {
- // If the DeviceAlreadyExists response comes on the second attempt only,
- // it may have been created on the first attempt or earlier.
- // The absence of the response on the first attempt can't be considered as definite proof of new device creation
- // due to the possibility of a DeviceAlreadyExists response from that first attempt getting lost in transit.
- // So, we could only say that the device might be new, but not entirely sure.
- return Ok(DeviceStatus::MightBeNew);
- }
- }
- Ok(DeviceStatus::Unknown) => {
- if i == 0 {
- println!("... No response. If the device is new, it's normal to get no response in the first try.");
- } else {
- println!("... No response. ");
- return Err(ConnectError::ConnectionCheckError);
- }
- }
- Err(err) => {
- return Err(err);
- }
- }
- }
- Ok(DeviceStatus::MightBeNew)
-}
-
-fn create_device(port: u16, device_id: &str) -> Result<DeviceStatus, ConnectError> {
- const C8Y_TOPIC_BUILTIN_MESSAGE_UPSTREAM: &str = "c8y/s/us";
- const C8Y_TOPIC_ERROR_MESSAGE_DOWNSTREAM: &str = "c8y/s/e";
+// Check the connection by using the jwt token retrival over the mqtt.
+// If successfull in getting the jwt token '71,xxxxx', the connection is established.
+fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, ConnectError> {
+ const C8Y_TOPIC_BUILTIN_JWT_TOKEN_DOWNSTREAM: &str = "c8y/s/dat";
+ const C8Y_TOPIC_BUILTIN_JWT_TOKEN_UPSTREAM: &str = "c8y/s/uat";
const CLIENT_ID: &str = "check_connection_c8y";
- const REGISTRATION_ERROR: &[u8] = b"41,100,Device already existing";
- const DEVICE_TYPE: &str = "thin-edge.io";
-
- let registration_payload = format!("100,{},{}", device_id, DEVICE_TYPE);
- let mut options = MqttOptions::new(CLIENT_ID, DEFAULT_HOST, port);
+ let mut options = MqttOptions::new(
+ CLIENT_ID,
+ DEFAULT_HOST,
+ tedge_config.query(MqttPortSetting)?.into(),
+ );
options.set_keep_alive(RESPONSE_TIMEOUT);
let (mut client, mut connection) = rumqttc::Client::new(options, 10);
let mut acknowledged = false;
- client.subscribe(C8Y_TOPIC_ERROR_MESSAGE_DOWNSTREAM, AtLeastOnce)?;
+ client.subscribe(C8Y_TOPIC_BUILTIN_JWT_TOKEN_DOWNSTREAM, AtLeastOnce)?;
for event in connection.iter() {
match event {
Ok(Event::Incoming(Packet::SubAck(_))) => {
// We are ready to get the response, hence send the request
- client.publish(
- C8Y_TOPIC_BUILTIN_MESSAGE_UPSTREAM,
- AtLeastOnce,
- false,
- registration_payload.as_bytes(),
- )?;
+ client.publish(C8Y_TOPIC_BUILTIN_JWT_TOKEN_UPSTREAM, AtLeastOnce, false, "")?;
}
Ok(Event::Incoming(Packet::PubAck(_))) => {
// The request has been sent
@@ -319,7 +263,8 @@ fn create_device(port: u16, device_id: &str) -> Result<DeviceStatus, ConnectErro
}
Ok(Event::Incoming(Packet::Publish(response))) => {
// We got a response
- if response.payload == REGISTRATION_ERROR {
+ let token = String::from_utf8(response.payload.to_vec()).unwrap();
+ if token.contains("71") {
return Ok(DeviceStatus::AlreadyExists);
}
}
@@ -426,6 +371,7 @@ fn new_bridge(
bridge_config: &BridgeConfig,
common_mosquitto_config: &CommonMosquittoConfig,
service_manager: &dyn SystemServiceManager,
+ user_manager: UserManager,
config_location: &TEdgeConfigLocation,
) -> Result<(), ConnectError> {
println!("Checking if {} is available.\n", service_manager.name());
@@ -446,6 +392,10 @@ fn new_bridge(
println!("Validating the bridge certificates.\n");
let () = bridge_config.validate()?;
+ println!("Create the device.\n");
+ let () =
+ c8y_direct_connection::create_device_with_direct_connection(user_manager, bridge_config)?;
+
println!("Saving configuration for requested bridge.\n");
if let Err(err) =
write_bridge_config_to_file(config_location, bridge_config, common_mosquitto_config)
diff --git a/crates/core/tedge/src/cli/connect/error.rs b/crates/core/tedge/src/cli/connect/error.rs
index 9a85682c..a14c294d 100644
--- a/crates/core/tedge/src/cli/connect/error.rs
+++ b/crates/core/tedge/src/cli/connect/error.rs
@@ -1,3 +1,5 @@
+use tedge_users::UserSwitchError;
+
#[derive(thiserror::Error, Debug)]
pub enum ConnectError {
#[error("Couldn't load certificate, provide valid certificate path in configuration. Use 'tedge config --set'")]
@@ -43,4 +45,13 @@ pub enum ConnectError {
"The JWT token received from Cumulocity is invalid.\nToken: {token}\nReason: {reason}"
)]
InvalidJWTToken { token: String, reason: String },
+
+ #[error("Could not parse private key")]
+ RumqttcPrivateKey,
+
+ #[error("Could not parse certificate")]
+ RumqttcCertificate,
+
+ #[error(transparent)]
+ UserSwitchError(#[from] UserSwitchError),
}
diff --git a/crates/core/tedge/src/cli/connect/mod.rs b/crates/core/tedge/src/cli/connect/mod.rs
index edd233ba..b27c80b7 100644
--- a/crates/core/tedge/src/cli/connect/mod.rs
+++ b/crates/core/tedge/src/cli/connect/mod.rs
@@ -6,6 +6,7 @@ pub use self::{
mod bridge_config;
mod bridge_config_azure;
mod bridge_config_c8y;
+mod c8y_direct_connection;
mod cli;
mod command;
mod common_mosquitto_config;