summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorMichael Abel <75477722+abelikt@users.noreply.github.com>2022-02-08 12:41:53 +0100
committerGitHub <noreply@github.com>2022-02-08 12:41:53 +0100
commit0eb940bf48840b3eb2b57aa18a2468288e1f6144 (patch)
tree9a6cbf2634ac7eb713f5bd0f41c42b5b18aeefe6 /tests
parentc15cfc4bee97473411ed173903f808c1132b8d8c (diff)
[#738] Frequent mosquitto restarts cause test failures as systemd interferes (#802)
* Add a first working version within a separate base class * Move Cumumlocity class to own file * Add connection management methods to the TedgeEnvironment * Use tedge management methods in c8y environment * Rename file as preparation of having a separate package * Use new environment and replace all tedge connect calls etc * Make all the standard tests pass * Add psutil to requirements * Fix merge issue * Run black for moved code * Remove rolldice in case it is installed * Avoid an additional misplaced check * Avoid misleading comment * Make comment clearer * Use "frequently" instead of "often" * Add explanation about why to wait 10s instead of 5s * Detect path of ps ... ... due to differing locations on different platforms
Diffstat (limited to 'tests')
-rw-r--r--tests/PySys/cloud_c8y/c8y_jwt_token_retrieval/run.py32
-rw-r--r--tests/PySys/environments/cumulocity.py225
-rw-r--r--tests/PySys/environments/environment_c8y.py218
-rw-r--r--tests/PySys/environments/environment_tedge.py81
-rw-r--r--tests/PySys/misc_features/invalid_device_id/run.py2
-rw-r--r--tests/PySys/misc_features/mqtt_port_change_connection_works/run.py41
-rw-r--r--tests/PySys/misc_features/mqtt_port_set/run.py19
-rw-r--r--tests/PySys/misc_features/valid_device_id/run.py33
-rw-r--r--tests/PySys/software_management_end_to_end/environment_sm_management.py7
-rw-r--r--tests/PySys/software_management_end_to_end/sm_apt_install_download_path/run.py15
-rw-r--r--tests/PySys/software_management_end_to_end/sm_apt_install_remove/run.py5
-rw-r--r--tests/PySys/software_management_end_to_end/sm_mapper_fail_and_reconnect/run.py24
-rw-r--r--tests/PySys/tedge_connect/c8y_restart_bridge/run.py4
-rw-r--r--tests/PySys/tedge_connect/tedge_connect_test_negative/run.py15
-rw-r--r--tests/PySys/tedge_connect/tedge_connect_test_positive/run.py10
-rw-r--r--tests/PySys/tedge_connect/tedge_connect_test_sm_services/run.py19
-rw-r--r--tests/PySys/tedge_connect/tedge_disconnect_test_sm_services/run.py14
-rw-r--r--tests/requirements.txt2
18 files changed, 409 insertions, 357 deletions
diff --git a/tests/PySys/cloud_c8y/c8y_jwt_token_retrieval/run.py b/tests/PySys/cloud_c8y/c8y_jwt_token_retrieval/run.py
index 3ef6b4eb..923dad18 100644
--- a/tests/PySys/cloud_c8y/c8y_jwt_token_retrieval/run.py
+++ b/tests/PySys/cloud_c8y/c8y_jwt_token_retrieval/run.py
@@ -2,6 +2,8 @@ from pysys.basetest import BaseTest
import time
+from environment_tedge import TedgeEnvironment
+
"""
Validate retrieved JWT Token from cumulocity cloud
@@ -14,28 +16,20 @@ Cleanup the Certificate and Key path and delete the temporary directory
"""
-class ValidateJWTTokenRetrieval(BaseTest):
+class ValidateJWTTokenRetrieval(TedgeEnvironment):
def setup(self):
self.tedge = "/usr/bin/tedge"
self.sudo = "/usr/bin/sudo"
# disconnect the device from cloud
- c8y_disconnect = self.startProcess(
- command=self.sudo,
- arguments=[self.tedge, "disconnect", "c8y"],
- stdouterr="c8y_disconnect",
- )
-
+ self.tedge_disconnect_c8y()
+
self.addCleanupFunction(self.jwt_token_cleanup)
def execute(self):
# connect the device to cloud
- c8y_disconnect = self.startProcess(
- command=self.sudo,
- arguments=[self.tedge, "connect", "c8y"],
- stdouterr="c8y_connect",
- )
-
+ self.tedge_connect_c8y()
+
# Subscribe for the jwt token response topic
resp_jwt_token = self.startProcess(
command=self.sudo,
@@ -64,12 +58,8 @@ class ValidateJWTTokenRetrieval(BaseTest):
def validate(self):
# validate the correct response/token is received
self.assertGrep("resp_jwt.out", "\[c8y/s/dat\] 71,", contains=True)
-
- def jwt_token_cleanup(self):
+
+ def jwt_token_cleanup(self):
# disconnect the test
- c8y_disconnect = self.startProcess(
- command=self.sudo,
- arguments=[self.tedge, "disconnect", "c8y"],
- stdouterr="c8y_disconnect",
- )
-
+ self.tedge_disconnect_c8y()
+
diff --git a/tests/PySys/environments/cumulocity.py b/tests/PySys/environments/cumulocity.py
new file mode 100644
index 00000000..37e62d4b
--- /dev/null
+++ b/tests/PySys/environments/cumulocity.py
@@ -0,0 +1,225 @@
+import base64
+import json
+import re
+import requests
+
+
+class Cumulocity(object):
+ """Class to retrieve information about Cumulocity.
+ TODO : Review if we download enough data -> pageSize
+ TODO : Documentation and test for all of these functions
+ TODO : Extract as separate package
+ TODO : Make this more bulletproof
+ """
+
+ c8y_url = ""
+ tenant_id = ""
+ username = ""
+ password = ""
+ auth = ""
+ timeout_req = ""
+
+ def __init__(self, c8y_url, tenant_id, username, password, log):
+ self.c8y_url = c8y_url
+ self.tenant_id = tenant_id
+ self.username = username
+ self.password = password
+ self.timeout_req = 60 # seconds, got timeout with 60s
+ self.log = log
+
+ self.auth = ("%s/%s" % (self.tenant_id, self.username), self.password)
+
+ def request(self, method, url_path, **kwargs) -> requests.Response:
+
+ return requests.request(
+ method, self.c8y_url + url_path, auth=self.auth, **kwargs
+ )
+
+ def get_all_devices(self) -> requests.Response:
+ params = {"fragmentType": "c8y_IsDevice"}
+ res = requests.get(
+ url=self.c8y_url + "/inventory/managedObjects",
+ params=params,
+ auth=self.auth,
+ )
+
+ return self.to_json_response(res)
+
+ def to_json_response(self, res: requests.Response):
+ if res.status_code != 200:
+ raise Exception(
+ "Received invalid response with exit code: {}, reason: {}".format(
+ res.status_code, res.reason
+ )
+ )
+ return json.loads(res.text)
+
+ def get_all_devices_by_type(self, type: str) -> requests.Response:
+ params = {
+ "fragmentType": "c8y_IsDevice",
+ "type": type,
+ "pageSize": 100,
+ }
+ res = requests.get(
+ url=self.c8y_url + "/inventory/managedObjects",
+ params=params,
+ auth=self.auth,
+ )
+ return self.to_json_response(res)
+
+ def get_all_thin_edge_devices(self) -> requests.Response:
+ return self.get_all_devices_by_type("thin-edge.io")
+
+ def get_thin_edge_device_by_name(self, device_id: str):
+ """
+ TODO: Update : What is returned here ? Its the json data structure from C8y -
+ Do we call this device fragment ?
+ Hint: Device_id is the name of the device
+ """
+
+ # Hint this will fail, when the device does not have the type set "thin-edge.io" in C8y
+ json_response = self.get_all_devices_by_type("thin-edge.io")
+
+ for device in json_response["managedObjects"]:
+ if device_id in device["name"]:
+ return device
+ return None
+
+ def get_header(self):
+ auth = bytes(f"{self.tenant_id}/{self.username}:{self.password}", "utf-8")
+ header = {
+ b"Authorization": b"Basic " + base64.b64encode(auth),
+ b"content-type": b"application/json",
+ b"Accept": b"application/json",
+ }
+ return header
+
+ def trigger_log_request(self, log_file_request_payload, device_id):
+ url = f"{self.c8y_url}/devicecontrol/operations"
+ log_file_request_payload = {
+ "deviceId": device_id,
+ "description": "Log file request",
+ "c8y_LogfileRequest": log_file_request_payload,
+ }
+ req = requests.post(
+ url,
+ json=log_file_request_payload,
+ headers=self.get_header(),
+ timeout=self.timeout_req,
+ )
+ jresponse = json.loads(req.text)
+
+ operation_id = jresponse.get("id")
+
+ if not operation_id:
+ raise SystemError("field id is missing in response")
+
+ return operation_id
+
+ def retrieve_log_file(self, operation_id):
+ """Check if log received"""
+
+ url = f"{self.c8y_url}/devicecontrol/operations/{operation_id}"
+ req = requests.get(url, headers=self.get_header(), timeout=self.timeout_req)
+
+ req.raise_for_status()
+
+ jresponse = json.loads(req.text)
+ ret = ""
+
+ log_response = jresponse.get("c8y_LogfileRequest")
+ # check if the response contains the logfile
+ log_file = log_response.get("file")
+ self.log.info("log response %s", log_file)
+ if log_file != None:
+ ret = log_file
+ return ret
+
+ def get_child_device_of_thin_edge_device_by_name(
+ self, thin_edge_device_id: str, child_device_id: str
+ ):
+ self.device_fragment = self.get_thin_edge_device_by_name(thin_edge_device_id)
+ internal_id = self.device_fragment["id"]
+ child_devices = self.to_json_response(
+ requests.get(
+ url="{}/inventory/managedObjects/{}/childDevices".format(
+ self.c8y_url, internal_id
+ ),
+ auth=self.auth,
+ )
+ )
+ for child_device in child_devices["references"]:
+ if child_device_id in child_device["managedObject"]["name"]:
+ if child_device["managedObject"] == None:
+ print("Oh no it is None")
+ print(f"Cannot find {child_device_id}")
+ return child_device["managedObject"]
+
+ return None
+
+ def get_last_measurements_from_device(self, device_internal_id: str):
+ return self.get_last_n_measurements_from_device(
+ device_internal_id=device_internal_id, target_size=1
+ )[0]
+
+ def get_last_n_measurements_from_device(
+ self, device_internal_id: int, target_size: int
+ ):
+ params = {
+ "source": device_internal_id,
+ "pageSize": target_size,
+ "dateFrom": "1970-01-01",
+ "revert": True,
+ }
+ res = requests.get(
+ url=self.c8y_url + "/measurement/measurements",
+ params=params,
+ auth=self.auth,
+ )
+ measurements_json = self.to_json_response(res)
+ return measurements_json["measurements"]
+
+ def get_last_n_alarms_from_device(
+ self,
+ device_internal_id: int,
+ target_size=2000,
+ status="ACTIVE",
+ date_from="1970-01-01",
+ ):
+ params = {
+ "source": device_internal_id,
+ "pageSize": target_size,
+ "dateFrom": "1970-01-01",
+ "status": status,
+ }
+ res = requests.get(
+ url=self.c8y_url + "/alarm/alarms", params=params, auth=self.auth
+ )
+ measurements_json = self.to_json_response(res)
+ return measurements_json["alarms"]
+
+ def get_last_alarm_from_device(self, device_internal_id: int):
+ return self.get_last_n_alarms_from_device(
+ device_internal_id=device_internal_id, target_size=1
+ )[0]
+
+ def clear_all_alarms_from_device(self, device_internal_id: int):
+ params = {
+ "source": device_internal_id,
+ }
+ payload = {"status": "CLEARED"}
+ res = requests.put(
+ url=self.c8y_url + "/alarm/alarms",
+ params=params,
+ json=payload,
+ auth=self.auth,
+ )
+ res.raise_for_status()
+
+ def delete_managed_object_by_internal_id(self, internal_id: str):
+ res = requests.delete(
+ url="{}/inventory/managedObjects/{}".format(self.c8y_url, internal_id),
+ auth=self.auth,
+ )
+ if res.status_code != 204 or res.status_code != 404:
+ res.raise_for_status()
diff --git a/tests/PySys/environments/environment_c8y.py b/tests/PySys/environments/environment_c8y.py
index 721f28f1..57b4a801 100644
--- a/tests/PySys/environments/environment_c8y.py
+++ b/tests/PySys/environments/environment_c8y.py
@@ -1,197 +1,34 @@
-import json
import base64
+import json
import re
-from pysys.constants import FAILED
import requests
+
+import psutil
+
from pysys.basetest import BaseTest
+from pysys.constants import FAILED
-"""
-Environment to manage automated connect and disconnect to c8y
+from cumulocity import Cumulocity
+from environment_tedge import TedgeEnvironment
-Tests that derive from class EnvironmentC8y use automated connect and
-disconnect to Cumulocity. Additional checks are made for the status of
-service mosquitto and service tedge-mapper.
+"""
+Environment to manage automated connects and disconnects to c8y
"""
-
-class Cumulocity(object):
- """Class to retrieve information about Cumulocity.
- TODO : Review if we download enough data -> pageSize
+class EnvironmentC8y(TedgeEnvironment):
"""
+ Pysys Environment to manage automated connect and disconnect to c8y
- c8y_url = ""
- tenant_id = ""
- username = ""
- password = ""
- auth = ""
- timeout_req = ""
-
- def __init__(self, c8y_url, tenant_id, username, password, log):
- self.c8y_url = c8y_url
- self.tenant_id = tenant_id
- self.username = username
- self.password = password
- self.timeout_req = 60 # seconds, got timeout with 60s
- self.log = log
-
- self.auth = ('%s/%s' % (self.tenant_id, self.username), self.password)
-
- def request(self, method, url_path, **kwargs) -> requests.Response:
-
- return requests.request(method, self.c8y_url + url_path, auth=self.auth, **kwargs)
-
- def get_all_devices(self) -> requests.Response:
- params = {
- "fragmentType": "c8y_IsDevice"
- }
- res = requests.get(
- url=self.c8y_url + "/inventory/managedObjects", params=params, auth=self.auth)
-
- return self.to_json_response(res)
-
- def to_json_response(self, res: requests.Response):
- if res.status_code != 200:
- raise Exception(
- "Received invalid response with exit code: {}, reason: {}".format(res.status_code, res.reason))
- return json.loads(res.text)
-
- def get_all_devices_by_type(self, type: str) -> requests.Response:
- params = {
- "fragmentType": "c8y_IsDevice",
- "type": type,
- "pageSize": 100,
- }
- res = requests.get(
- url=self.c8y_url + "/inventory/managedObjects", params=params, auth=self.auth)
- return self.to_json_response(res)
-
- def get_all_thin_edge_devices(self) -> requests.Response:
- return self.get_all_devices_by_type("thin-edge.io")
-
- def get_thin_edge_device_by_name(self, device_id: str):
- json_response = self.get_all_devices_by_type("thin-edge.io")
- for device in json_response['managedObjects']:
- if device_id in device['name']:
- return device
- return None
-
- def get_header(self):
- auth = bytes(
- f"{self.tenant_id}/{self.username}:{self.password}", "utf-8")
- header = {
- b"Authorization": b"Basic " + base64.b64encode(auth),
- b"content-type": b"application/json",
- b"Accept": b"application/json",
- }
- return header
-
- def trigger_log_request(self, log_file_request_payload, device_id):
- url = f"{self.c8y_url}/devicecontrol/operations"
- log_file_request_payload = {
- "deviceId": device_id,
- "description": "Log file request",
- "c8y_LogfileRequest": log_file_request_payload,
- }
- req = requests.post(
- url, json=log_file_request_payload, headers=self.get_header(), timeout=self.timeout_req
- )
- jresponse = json.loads(req.text)
-
- operation_id = jresponse.get("id")
-
- if not operation_id:
- raise SystemError("field id is missing in response")
-
- return operation_id
-
- def retrieve_log_file(self, operation_id):
- """Check if log received"""
-
- url = f"{self.c8y_url}/devicecontrol/operations/{operation_id}"
- req = requests.get(url, headers=self.get_header(),
- timeout=self.timeout_req)
-
- req.raise_for_status()
-
- jresponse = json.loads(req.text)
- ret = ""
-
- log_response = jresponse.get("c8y_LogfileRequest")
- # check if the response contains the logfile
- log_file = log_response.get("file")
- self.log.info("log response %s", log_file)
- if log_file != None:
- ret = log_file
- return ret
-
- def get_child_device_of_thin_edge_device_by_name(self, thin_edge_device_id: str, child_device_id: str):
- self.device_fragment = self.get_thin_edge_device_by_name(
- thin_edge_device_id)
- internal_id = self.device_fragment['id']
- child_devices = self.to_json_response(requests.get(
- url="{}/inventory/managedObjects/{}/childDevices".format(self.c8y_url, internal_id), auth=self.auth))
- for child_device in child_devices['references']:
- if child_device_id in child_device['managedObject']['name']:
- return child_device['managedObject']
-
- return None
-
- def get_last_measurements_from_device(self, device_internal_id: str):
- return self.get_last_n_measurements_from_device(
- device_internal_id=device_internal_id, target_size=1)[0]
-
- def get_last_n_measurements_from_device(self, device_internal_id: int, target_size: int):
- params = {
- "source": device_internal_id,
- "pageSize": target_size,
- "dateFrom": "1970-01-01",
- "revert": True
- }
- res = requests.get(
- url=self.c8y_url + "/measurement/measurements", params=params, auth=self.auth)
- measurements_json = self.to_json_response(res)
- return measurements_json['measurements']
-
- def get_last_n_alarms_from_device(self, device_internal_id: int, target_size=2000, status="ACTIVE", date_from="1970-01-01"):
- params = {
- "source": device_internal_id,
- "pageSize": target_size,
- "dateFrom": "1970-01-01",
- "status": status
- }
- res = requests.get(
- url=self.c8y_url + "/alarm/alarms", params=params, auth=self.auth)
- measurements_json = self.to_json_response(res)
- return measurements_json['alarms']
-
- def get_last_alarm_from_device(self, device_internal_id: int):
- return self.get_last_n_alarms_from_device(
- device_internal_id=device_internal_id, target_size=1)[0]
-
- def clear_all_alarms_from_device(self, device_internal_id: int):
- params = {
- "source": device_internal_id,
- }
- payload = {
- "status": "CLEARED"
- }
- res = requests.put(
- url=self.c8y_url + "/alarm/alarms", params=params, json=payload, auth=self.auth)
- res.raise_for_status()
-
- def delete_managed_object_by_internal_id(self, internal_id: str):
- res = requests.delete(
- url="{}/inventory/managedObjects/{}".format(self.c8y_url, internal_id), auth=self.auth)
- if res.status_code != 204 or res.status_code != 404:
- res.raise_for_status()
-
+ Tests that derive from class EnvironmentC8y use automated connect and
+ disconnect to Cumulocity. Additional checks are made for the status of
+ service mosquitto and service tedge-mapper.
+ """
-class EnvironmentC8y(BaseTest):
cumulocity: Cumulocity
def setup(self):
self.log.debug("EnvironmentC8y Setup")
-
+ super().setup()
if self.project.c8yurl == "":
self.abort(
FAILED, "Cumulocity tenant URL is not set. Set with the env variable C8YURL")
@@ -208,10 +45,6 @@ class EnvironmentC8y(BaseTest):
self.abort(
FAILED, "Device ID is not set. Set with the env variable C8YDEVICEID")
- self.tedge = "/usr/bin/tedge"
- self.tedge_mapper_c8y = "tedge-mapper-c8y"
- self.sudo = "/usr/bin/sudo"
- self.systemctl = "/usr/bin/systemctl"
self.log.info("EnvironmentC8y Setup")
self.addCleanupFunction(self.myenvcleanup)
@@ -224,18 +57,10 @@ class EnvironmentC8y(BaseTest):
)
# Connect the bridge
- connect = self.startProcess(
- command=self.sudo,
- arguments=[self.tedge, "connect", "c8y"],
- stdouterr="tedge_connect",
- )
+ self.tedge_connect_c8y()
# Test the bridge connection
- connect = self.startProcess(
- command=self.sudo,
- arguments=[self.tedge, "connect", "c8y", "--test"],
- stdouterr="tedge_connect_test",
- )
+ self.tedge_connect_c8y_test()
# Check if mosquitto is running well
serv_mosq = self.startProcess(
@@ -278,11 +103,7 @@ class EnvironmentC8y(BaseTest):
self.log.debug("EnvironmentC8y Cleanup")
# Disconnect Bridge
- disconnect = self.startProcess(
- command=self.sudo,
- arguments=[self.tedge, "disconnect", "c8y"],
- stdouterr="tedge_disconnect",
- )
+ self.tedge_disconnect_c8y()
# Check if tedge-mapper is disabled
serv_mosq = self.startProcess(
@@ -291,3 +112,4 @@ class EnvironmentC8y(BaseTest):
stdouterr="serv_mapper5",
expectedExitStatus="==3",
)
+
diff --git a/tests/PySys/environments/environment_tedge.py b/tests/PySys/environments/environment_tedge.py
new file mode 100644
index 00000000..9add9f6e
--- /dev/null
+++ b/tests/PySys/environments/environment_tedge.py
@@ -0,0 +1,81 @@
+import os
+import subprocess
+import time
+
+from pysys.basetest import BaseTest
+
+
+class TedgeEnvironment(BaseTest):
+ """Class with helper and convenicence methods for testing tedge"""
+
+ def setup(self):
+ self.sudo = "/usr/bin/sudo"
+ self.tedge = "/usr/bin/tedge"
+ self.tedge_mapper_c8y = "tedge-mapper-c8y"
+ self.sudo = "/usr/bin/sudo"
+ self.systemctl = "/usr/bin/systemctl"
+
+ def wait_if_restarting_mosquitto_too_frequently(self):
+ """Make sure we do not restart mosqiotto too frequently
+ Systemd will become suspicious when mosquitto is restarted more
+ freqeuntly than 5 seconds.
+ """
+ # Ideally we would expect 5 seconds here, but only waiting
+ # for 10 makes the issue diappear
+ minimum_time = 10
+
+ # Make sure we use the right path for ps, there seems to be an
+ # issue with injecting PATH in pysys, so we use an absolute path for now
+ if os.path.exists("/usr/bin/ps"):
+ path_ps = "/usr/bin/ps"
+ elif os.path.exists("/bin/ps"):
+ # the place where mythic beasts has the ps
+ path_ps = "/bin/ps"
+ else:
+ raise SystemError("Cannot find ps")
+
+ etimes = subprocess.check_output(
+ f"{path_ps} -o etimes $(pidof mosquitto)", shell=True
+ )
+ runtime = int(etimes.split()[1])
+ if runtime <= minimum_time:
+ self.log.info(
+ f"Restarting mosquitto too frequently in the last {minimum_time} seconds. It was only up for {runtime} seconds"
+ )
+ # Derive additional delay time and add one safety second
+ delay = minimum_time - runtime + 1
+ self.log.info(f"Delaying execution by {delay} seconds")
+ time.sleep(delay)
+
+ def tedge_connect_c8y(self, expectedExitStatus="==0"):
+
+ self.wait_if_restarting_mosquitto_too_frequently()
+
+ connect = self.startProcess(
+ command=self.sudo,
+ arguments=[self.tedge, "connect", "c8y"],
+ stdouterr="tedge_connect_c8y",
+ expectedExitStatus=expectedExitStatus,
+ )
+ return connect
+
+ def tedge_disconnect_c8y(self, expectedExitStatus="==0"):
+
+ self.wait_if_restarting_mosquitto_too_frequently()
+
+ connect = self.startProcess(
+ command=self.sudo,
+ arguments=[self.tedge, "disconnect", "c8y"],
+ stdouterr="tedge_disconnect_c8y",
+ expectedExitStatus=expectedExitStatus,
+ )
+ return connect
+
+ def tedge_connect_c8y_test(self, expectedExitStatus="==0"):
+ connect_c8y = self.startProcess(
+ command=self.sudo,
+ arguments=[self.tedge, "connect", "c8y", "--test"],
+ stdouterr="tedge_connect_c8y_test",
+ expectedExitStatus=expectedExitStatus,
+ )
+ return connect_c8y
diff --git a/tests/PySys/misc_features/invalid_device_id/run.py b/tests/PySys/misc_features/invalid_device_id/run.py
index b93780eb..692d5f58 100644
--- a/tests/PySys/misc_features/invalid_device_id/run.py
+++ b/tests/PySys/misc_features/invalid_device_id/run.py
@@ -17,7 +17,7 @@ class InvalidDeviceId(BaseTest):
def setup(self):
self.tedge = "/usr/bin/tedge"
self.sudo = "/usr/bin/sudo"
-
+
# create a custom certiticate directory for testing purpose
create_cert_dir = self.startProcess(
command=self.sudo,
diff --git a/tests/PySys/misc_features/mqtt_port_change_connection_works/run.py b/tests/PySys/misc_features/mqtt_port_change_connection_works/run.py
index 4487dc03..9ff427b6 100644
--- a/tests/PySys/misc_features/mqtt_port_change_connection_works/run.py
+++ b/tests/PySys/misc_features/mqtt_port_change_connection_works/run.py
@@ -21,18 +21,14 @@ Now validate the services that use the mqtt port
"""
-class MqttPortChangeConnectionWorks(BaseTest):
+class MqttPortChangeConnectionWorks(TedgeEnvironment):
def setup(self):
self.tedge = "/usr/bin/tedge"
self.sudo = "/usr/bin/sudo"
# disconnect from c8y cloud
- disconnect_c8y = self.startProcess(
- command=self.sudo,
- arguments=[self.tedge, "disconnect", "c8y"],
- stdouterr="disconnect_c8y",
- )
+ self.tedge_disconnect_c8y()
# set a new mqtt port for local communication
mqtt_port = self.startProcess(
command=self.sudo,
@@ -43,18 +39,10 @@ class MqttPortChangeConnectionWorks(BaseTest):
def execute(self):
# connect to c8y cloud
- connect_c8y = self.startProcess(
- command=self.sudo,
- arguments=[self.tedge, "connect", "c8y"],
- stdouterr="connect_c8y",
- )
+ self.tedge_connect_c8y()
# check connection
- connect_c8y = self.startProcess(
- command=self.sudo,
- arguments=[self.tedge, "connect", "c8y", "--test"],
- stdouterr="check_con_c8y",
- )
+ self.tedge_connect_c8y_test()
# subscribe for messages
mqtt_sub = self.startProcess(
@@ -69,7 +57,7 @@ class MqttPortChangeConnectionWorks(BaseTest):
# validate tedge mqtt pub/sub
self.validate_tedge_mqtt()
# validate c8y connection
- self.assertGrep("check_con_c8y.out",
+ self.assertGrep("tedge_connect_c8y_test.out",
"connection check is successful", contains=True)
# validate c8y mapper
self.validate_tedge_mapper_c8y()
@@ -168,11 +156,7 @@ class MqttPortChangeConnectionWorks(BaseTest):
# disconnect again
# disconnect Bridge
- c8y_disconnect = self.startProcess(
- command=self.sudo,
- arguments=[self.tedge, "disconnect", "c8y"],
- stdouterr="c8y_disconnect",
- )
+ self.tedge_disconnect_c8y()
# unset a new mqtt port, falls back to default port (1883)
mqtt_port = self.startProcess(
@@ -182,16 +166,7 @@ class MqttPortChangeConnectionWorks(BaseTest):
)
# connect Bridge
- c8y_connect = self.startProcess(
- command=self.sudo,
- arguments=[self.tedge, "connect", "c8y"],
- stdouterr="c8y_connect",
- )
+ self.tedge_connect_c8y()
# disconnect Bridge
- c8y_disconnect = self.startProcess(
- command=self.sudo,
- arguments=[self.tedge, "disconnect", "c8y"],
- stdouterr="c8y_disconnect",
- )
-
+ self.tedge_disconnect_c8y()
diff --git a/tests/PySys/misc_features/mqtt_port_set/run.py b/tests/PySys/misc_features/mqtt_port_set/run.py
index 72b3dbc4..61d237b5 100644
--- a/tests/PySys/misc_features/mqtt_port_set/run.py
+++ b/tests/PySys/misc_features/mqtt_port_set/r