diff options
author | PradeepKiruvale <PRADEEPKIRUVALE@gmail.com> | 2022-01-19 14:45:05 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-19 14:45:05 +0530 |
commit | 073526d5272ee6bc72b82c540be501be20e73889 (patch) | |
tree | ad609fdf45742ca5210fb8fd2cc98cdb08e63910 /crates/core/tedge | |
parent | 82ff6a80c4fb5320ef4862a362c8bf29a1300256 (diff) |
[710] Simplify tedge connect c8y (#751)
* mqtt direct connection
* break on error message receival
* handle errors
* refactor device connection
* check connection with jwt token
* add log
* fix typos
* remove unwrap
* address review comments
* fix the loop
* fix
* replace event loop without spwaning thread
* switch to mosquito user to read keyfile
* create function to read certificate
* remove unwrap
* address review comments
* user_manager
* remove rustls_native_certs
* refactor check if bridge exists
* fix typos
Co-authored-by: Pradeep Kumar K J <pradeepkumar.kj@sofwareag.com>
Diffstat (limited to 'crates/core/tedge')
-rw-r--r-- | crates/core/tedge/Cargo.toml | 3 | ||||
-rw-r--r-- | crates/core/tedge/src/cli/connect/c8y_direct_connection.rs | 148 | ||||
-rw-r--r-- | crates/core/tedge/src/cli/connect/cli.rs | 2 | ||||
-rw-r--r-- | crates/core/tedge/src/cli/connect/command.rs | 122 | ||||
-rw-r--r-- | crates/core/tedge/src/cli/connect/error.rs | 11 | ||||
-rw-r--r-- | crates/core/tedge/src/cli/connect/mod.rs | 1 |
6 files changed, 200 insertions, 87 deletions
diff --git a/crates/core/tedge/Cargo.toml b/crates/core/tedge/Cargo.toml index 30665115..e19f9ee5 100644 --- a/crates/core/tedge/Cargo.toml +++ b/crates/core/tedge/Cargo.toml @@ -21,7 +21,8 @@ hyper = { version = "0.14", default-features = false } reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls", "stream"] } rpassword = "5.0" rumqttc = "0.10" -rustls = "0.20" +rustls = "0.20.2" +rustls_0_19 = {package = "rustls", version = "0.19.0" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" structopt = "0.3" diff --git a/crates/core/tedge/src/cli/connect/c8y_direct_connection.rs b/crates/core/tedge/src/cli/connect/c8y_direct_connection.rs new file mode 100644 index 00000000..5efdf032 --- /dev/null +++ b/crates/core/tedge/src/cli/connect/c8y_direct_connection.rs @@ -0,0 +1,148 @@ +use super::{BridgeConfig, ConnectError}; + +use rumqttc::{ + self, certs, pkcs8_private_keys, Client, Event, Incoming, MqttOptions, Outgoing, Packet, QoS, + Transport, +}; +use rustls_0_19::ClientConfig; + +use std::fs; +use std::io::{Error, ErrorKind}; +use std::{fs::File, io::BufReader}; +use tedge_config::FilePath; + +use tedge_users::UserManager; + +// Connect directly to the c8y cloud over mqtt and publish device create message. +pub fn create_device_with_direct_connection( + user_manager: UserManager, + bridge_config: &BridgeConfig, +) -> Result<(), ConnectError> { + const DEVICE_ALREADY_EXISTS: &[u8] = b"41,100,Device already existing"; + const DEVICE_CREATE_ERROR_TOPIC: &str = "s/e"; + + let address = bridge_config.address.clone(); + let host: Vec<&str> = address.split(":").collect(); + + let mut mqtt_options = MqttOptions::new(bridge_config.remote_clientid.clone(), host[0], 8883); + mqtt_options.set_keep_alive(std::time::Duration::from_secs(5)); + + let mut client_config = ClientConfig::new(); + + let () = load_root_certs( + &mut client_config.root_store, + bridge_config.bridge_root_cert_path.clone(), + )?; + + let pvt_key = read_pvt_key(user_manager, bridge_config.bridge_keyfile.clone())?; + let cert_chain = read_cert_chain(bridge_config.bridge_certfile.clone())?; + + let _ = client_config.set_single_client_cert(cert_chain, pvt_key); + mqtt_options.set_transport(Transport::tls_with_config(client_config.into())); + + let (mut client, mut connection) = Client::new(mqtt_options, 10); + + client.subscribe(DEVICE_CREATE_ERROR_TOPIC, QoS::AtLeastOnce)?; + + let mut device_create_try: usize = 0; + for event in connection.iter() { + match event { + Ok(Event::Incoming(Packet::SubAck(_))) => { + publish_device_create_message(&mut client, &bridge_config.remote_clientid.clone())?; + } + Ok(Event::Incoming(Packet::Publish(response))) => { + // We got a response + if response.payload == DEVICE_ALREADY_EXISTS { + return Ok(()); + } + } + Ok(Event::Outgoing(Outgoing::PingReq)) => { + // If not received any response then resend the device create request again. + // else timeout. + if device_create_try < 1 { + publish_device_create_message( + &mut client, + &bridge_config.remote_clientid.clone(), + )?; + device_create_try += 1; + } else { + // No messages have been received for a while + break; + } + } + Ok(Event::Incoming(Incoming::Disconnect)) => { + eprintln!("ERROR: Disconnected"); + break; + } + Err(err) => { + eprintln!("ERROR: {:?}", err); + return Err(ConnectError::ConnectionCheckError); + } + _ => {} + } + } + + // The request has not even been sent + println!("No response from Cumulocity"); + Err(ConnectError::TimeoutElapsedError) +} + +fn publish_device_create_message(client: &mut Client, device_id: &str) -> Result<(), ConnectError> { + const DEVICE_CREATE_PUBLISH_TOPIC: &str = "s/us"; + const DEVICE_TYPE: &str = "thin-edge.io"; + client.publish( + DEVICE_CREATE_PUBLISH_TOPIC, + QoS::ExactlyOnce, + false, + format!("100,{},{}", device_id, DEVICE_TYPE).as_bytes(), + )?; + Ok(()) +} + +fn load_root_certs( + root_store: &mut rustls_0_19::RootCertStore, + cert_dir: FilePath, +) -> Result<(), ConnectError> { + for file_entry in fs::read_dir(cert_dir)? { + let file = file_entry?; + let f = File::open(file.path())?; + let mut rd = BufReader::new(f); + let _ = root_store + .add_pem_file(&mut rd) + .map(|_| ()) + .map_err(|()| Error::new(ErrorKind::InvalidData, format!("could not load PEM file"))); + } + Ok(()) +} + +fn read_pvt_key( + user_manager: UserManager, + key_file: tedge_config::FilePath, +) -> Result<rustls_0_19::PrivateKey, ConnectError> { + // Become BROKER_USER to read the private key + let _user_guard = user_manager.become_user(tedge_users::BROKER_USER)?; + let f = File::open(key_file)?; + let mut key_reader = BufReader::new(f); + let result = pkcs8_private_keys(&mut key_reader); + match result { + Ok(key) => Ok(key[0].clone()), + Err(_) => { + return Err(ConnectError::RumqttcPrivateKey); + } + } +} + +fn read_cert_chain( + cert_file: tedge_config::FilePath, +) -> Result<Vec<rustls_0_19::Certificate>, ConnectError> { + let f = File::open(cert_file)?; + let mut cert_reader = BufReader::new(f); + let result = certs(&mut cert_reader); + let cert_chain: Vec<rustls_0_19::Certificate> = match result { + Ok(cert) => cert, + Err(_) => { + return Err(ConnectError::RumqttcCertificate); + } + }; + Ok(cert_chain) +} diff --git a/crates/core/tedge/src/cli/connect/cli.rs b/crates/core/tedge/src/cli/connect/cli.rs index 1f3e2643..64a351f4 100644 --- a/crates/core/tedge/src/cli/connect/cli.rs +++ b/crates/core/tedge/src/cli/connect/cli.rs @@ -37,6 +37,7 @@ impl BuildCommand for TEdgeConnectOpt { context.user_manager.clone(), context.config_location.tedge_config_root_path, )?, + user_manager: context.user_manager, }, TEdgeConnectOpt::Az { is_test_connection } => ConnectCommand { config_location: context.config_location.clone(), @@ -48,6 +49,7 @@ impl BuildCommand for TEdgeConnectOpt { context.user_manager.clone(), context.config_location.tedge_config_root_path, )?, + user_manager: context.user_manager, }, } .into_boxed()) diff --git a/crates/core/tedge/src/cli/connect/command.rs b/crates/core/tedge/src/cli/connect/command.rs index 262e6163..ecbe7988 100644 --- a/crates/core/tedge/src/cli/connect/command.rs +++ b/crates/core/tedge/src/cli/connect/command.rs @@ -7,11 +7,12 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use tedge_config::*; +use tedge_users::UserManager; use tedge_utils::paths::{create_directories, ok_if_not_found, DraftFile}; use which::which; pub(crate) const DEFAULT_HOST: &str = "localhost"; -const WAIT_FOR_CHECK_SECONDS: u64 = 10; +const WAIT_FOR_CHECK_SECONDS: u64 = 2; const C8Y_CONFIG_FILENAME: &str = "c8y-bridge.conf"; const AZURE_CONFIG_FILENAME: &str = "az-bridge.conf"; pub(crate) const RESPONSE_TIMEOUT: Duration = Duration::from_secs(10); @@ -26,10 +27,10 @@ pub struct ConnectCommand { pub common_mosquitto_config: CommonMosquittoConfig, pub is_test_connection: bool, pub service_manager: Arc<dyn SystemServiceManager>, + pub user_manager: UserManager, } pub enum DeviceStatus { - MightBeNew, AlreadyExists, Unknown, } @@ -69,12 +70,15 @@ impl Command for ConnectCommand { fn execute(&self) -> anyhow::Result<()> { let mut config = self.config_repository.load()?; - if self.is_test_connection { let br_config = self.bridge_config(&config)?; - if self.check_if_bridge_exists(br_config) { + if self.check_if_bridge_exists(&br_config) { return match self.check_connection(&config) { - Ok(_) => Ok(()), + Ok(_) => { + let cloud = br_config.cloud_name.clone(); + println!("Connection check to {} cloud is successfull.\n", cloud); + Ok(()) + } Err(err) => Err(err.into()), }; } else { @@ -118,30 +122,15 @@ impl Command for ConnectCommand { &bridge_config, &updated_mosquitto_config, self.service_manager.as_ref(), + self.user_manager.clone(), &self.config_location, )?; match self.check_connection(&config) { - Ok(DeviceStatus::AlreadyExists) => {} - Ok(DeviceStatus::MightBeNew) | Ok(DeviceStatus::Unknown) => { - if let Cloud::C8y = self.cloud { - println!("Restarting mosquitto to resubscribe to bridged inbound cloud topics after device creation.\n"); - restart_mosquitto( - &bridge_config, - self.service_manager.as_ref(), - &self.config_location, - )?; - - println!( - "Awaiting mosquitto to start. This may take up to {} seconds.\n", - MOSQUITTO_RESTART_TIMEOUT_SECONDS - ); - std::thread::sleep(std::time::Duration::from_secs( - MOSQUITTO_RESTART_TIMEOUT_SECONDS, - )); - } + Ok(DeviceStatus::AlreadyExists) => { + println!("Connection check is successfull.\n"); } - Err(_) => { + _ => { println!( "Warning: Bridge has been configured, but {} connection check failed.\n", self.cloud.as_str() @@ -208,23 +197,23 @@ impl ConnectCommand { fn check_connection(&self, config: &TEdgeConfig) -> Result<DeviceStatus, ConnectError> { let port = config.query(MqttPortSetting)?.into(); - let device_id = config.query(DeviceIdSetting)?; + println!( "Sending packets to check connection. This may take up to {} seconds.\n", WAIT_FOR_CHECK_SECONDS ); match self.cloud { Cloud::Azure => check_device_status_azure(port), - Cloud::C8y => check_device_status_c8y(port, device_id.as_str()), + Cloud::C8y => check_device_status_c8y(config), } } - fn check_if_bridge_exists(&self, br_config: BridgeConfig) -> bool { + fn check_if_bridge_exists(&self, br_config: &BridgeConfig) -> bool { let bridge_conf_path = self .config_location .tedge_config_root_path .join(TEDGE_BRIDGE_CONF_DIR_PATH) - .join(br_config.config_file); + .join(br_config.config_file.clone()); Path::new(&bridge_conf_path).exists() } @@ -243,75 +232,30 @@ where Ok(()) } -// Check the connection by using the response of the SmartREST template 100. -// If getting the response '41,100,Device already existing', the connection is established. -// -// If the device is already registered, it can finish in the first try. -// If the device is new, the device is going to be registered here and -// the check can finish in the second try as there is no error response in the first try. -fn check_device_status_c8y(port: u16, device_id: &str) -> Result<DeviceStatus, ConnectError> { - for i in 0..2 { - println!("Try {} / 2: Sending a message to Cumulocity. ", i + 1,); - - match create_device(port, device_id) { - Ok(DeviceStatus::MightBeNew) => return Ok(DeviceStatus::MightBeNew), - Ok(DeviceStatus::AlreadyExists) => { - println!("Received expected response message, connection check is successful.\n",); - if i == 0 { - // If the DeviceAlreadyExists response comes on the first attempt itself, the device definitely was created earlier - return Ok(DeviceStatus::AlreadyExists); - } else { - // If the DeviceAlreadyExists response comes on the second attempt only, - // it may have been created on the first attempt or earlier. - // The absence of the response on the first attempt can't be considered as definite proof of new device creation - // due to the possibility of a DeviceAlreadyExists response from that first attempt getting lost in transit. - // So, we could only say that the device might be new, but not entirely sure. - return Ok(DeviceStatus::MightBeNew); - } - } - Ok(DeviceStatus::Unknown) => { - if i == 0 { - println!("... No response. If the device is new, it's normal to get no response in the first try."); - } else { - println!("... No response. "); - return Err(ConnectError::ConnectionCheckError); - } - } - Err(err) => { - return Err(err); - } - } - } - Ok(DeviceStatus::MightBeNew) -} - -fn create_device(port: u16, device_id: &str) -> Result<DeviceStatus, ConnectError> { - const C8Y_TOPIC_BUILTIN_MESSAGE_UPSTREAM: &str = "c8y/s/us"; - const C8Y_TOPIC_ERROR_MESSAGE_DOWNSTREAM: &str = "c8y/s/e"; +// Check the connection by using the jwt token retrival over the mqtt. +// If successfull in getting the jwt token '71,xxxxx', the connection is established. +fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, ConnectError> { + const C8Y_TOPIC_BUILTIN_JWT_TOKEN_DOWNSTREAM: &str = "c8y/s/dat"; + const C8Y_TOPIC_BUILTIN_JWT_TOKEN_UPSTREAM: &str = "c8y/s/uat"; const CLIENT_ID: &str = "check_connection_c8y"; - const REGISTRATION_ERROR: &[u8] = b"41,100,Device already existing"; - const DEVICE_TYPE: &str = "thin-edge.io"; - - let registration_payload = format!("100,{},{}", device_id, DEVICE_TYPE); - let mut options = MqttOptions::new(CLIENT_ID, DEFAULT_HOST, port); + let mut options = MqttOptions::new( + CLIENT_ID, + DEFAULT_HOST, + tedge_config.query(MqttPortSetting)?.into(), + ); options.set_keep_alive(RESPONSE_TIMEOUT); let (mut client, mut connection) = rumqttc::Client::new(options, 10); let mut acknowledged = false; - client.subscribe(C8Y_TOPIC_ERROR_MESSAGE_DOWNSTREAM, AtLeastOnce)?; + client.subscribe(C8Y_TOPIC_BUILTIN_JWT_TOKEN_DOWNSTREAM, AtLeastOnce)?; for event in connection.iter() { match event { Ok(Event::Incoming(Packet::SubAck(_))) => { // We are ready to get the response, hence send the request - client.publish( - C8Y_TOPIC_BUILTIN_MESSAGE_UPSTREAM, - AtLeastOnce, - false, - registration_payload.as_bytes(), - )?; + client.publish(C8Y_TOPIC_BUILTIN_JWT_TOKEN_UPSTREAM, AtLeastOnce, false, "")?; } Ok(Event::Incoming(Packet::PubAck(_))) => { // The request has been sent @@ -319,7 +263,8 @@ fn create_device(port: u16, device_id: &str) -> Result<DeviceStatus, ConnectErro } Ok(Event::Incoming(Packet::Publish(response))) => { // We got a response - if response.payload == REGISTRATION_ERROR { + let token = String::from_utf8(response.payload.to_vec()).unwrap(); + if token.contains("71") { return Ok(DeviceStatus::AlreadyExists); } } @@ -426,6 +371,7 @@ fn new_bridge( bridge_config: &BridgeConfig, common_mosquitto_config: &CommonMosquittoConfig, service_manager: &dyn SystemServiceManager, + user_manager: UserManager, config_location: &TEdgeConfigLocation, ) -> Result<(), ConnectError> { println!("Checking if {} is available.\n", service_manager.name()); @@ -446,6 +392,10 @@ fn new_bridge( println!("Validating the bridge certificates.\n"); let () = bridge_config.validate()?; + println!("Create the device.\n"); + let () = + c8y_direct_connection::create_device_with_direct_connection(user_manager, bridge_config)?; + println!("Saving configuration for requested bridge.\n"); if let Err(err) = write_bridge_config_to_file(config_location, bridge_config, common_mosquitto_config) diff --git a/crates/core/tedge/src/cli/connect/error.rs b/crates/core/tedge/src/cli/connect/error.rs index 9a85682c..a14c294d 100644 --- a/crates/core/tedge/src/cli/connect/error.rs +++ b/crates/core/tedge/src/cli/connect/error.rs @@ -1,3 +1,5 @@ +use tedge_users::UserSwitchError; + #[derive(thiserror::Error, Debug)] pub enum ConnectError { #[error("Couldn't load certificate, provide valid certificate path in configuration. Use 'tedge config --set'")] @@ -43,4 +45,13 @@ pub enum ConnectError { "The JWT token received from Cumulocity is invalid.\nToken: {token}\nReason: {reason}" )] InvalidJWTToken { token: String, reason: String }, + + #[error("Could not parse private key")] + RumqttcPrivateKey, + + #[error("Could not parse certificate")] + RumqttcCertificate, + + #[error(transparent)] + UserSwitchError(#[from] UserSwitchError), } diff --git a/crates/core/tedge/src/cli/connect/mod.rs b/crates/core/tedge/src/cli/connect/mod.rs index edd233ba..b27c80b7 100644 --- a/crates/core/tedge/src/cli/connect/mod.rs +++ b/crates/core/tedge/src/cli/connect/mod.rs @@ -6,6 +6,7 @@ pub use self::{ mod bridge_config; mod bridge_config_azure; mod bridge_config_c8y; +mod c8y_direct_connection; mod cli; mod command; mod common_mosquitto_config; |