diff options
author | Michael Abel <75477722+abelikt@users.noreply.github.com> | 2022-02-08 12:41:53 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-08 12:41:53 +0100 |
commit | 0eb940bf48840b3eb2b57aa18a2468288e1f6144 (patch) | |
tree | 9a6cbf2634ac7eb713f5bd0f41c42b5b18aeefe6 /tests | |
parent | c15cfc4bee97473411ed173903f808c1132b8d8c (diff) |
[#738] Frequent mosquitto restarts cause test failures as systemd interferes (#802)
* Add a first working version within a separate base class
* Move Cumumlocity class to own file
* Add connection management methods to the TedgeEnvironment
* Use tedge management methods in c8y environment
* Rename file as preparation of having a separate package
* Use new environment and replace all tedge connect calls etc
* Make all the standard tests pass
* Add psutil to requirements
* Fix merge issue
* Run black for moved code
* Remove rolldice in case it is installed
* Avoid an additional misplaced check
* Avoid misleading comment
* Make comment clearer
* Use "frequently" instead of "often"
* Add explanation about why to wait 10s instead of 5s
* Detect path of ps ...
... due to differing locations on different platforms
Diffstat (limited to 'tests')
18 files changed, 409 insertions, 357 deletions
diff --git a/tests/PySys/cloud_c8y/c8y_jwt_token_retrieval/run.py b/tests/PySys/cloud_c8y/c8y_jwt_token_retrieval/run.py index 3ef6b4eb..923dad18 100644 --- a/tests/PySys/cloud_c8y/c8y_jwt_token_retrieval/run.py +++ b/tests/PySys/cloud_c8y/c8y_jwt_token_retrieval/run.py @@ -2,6 +2,8 @@ from pysys.basetest import BaseTest import time +from environment_tedge import TedgeEnvironment + """ Validate retrieved JWT Token from cumulocity cloud @@ -14,28 +16,20 @@ Cleanup the Certificate and Key path and delete the temporary directory """ -class ValidateJWTTokenRetrieval(BaseTest): +class ValidateJWTTokenRetrieval(TedgeEnvironment): def setup(self): self.tedge = "/usr/bin/tedge" self.sudo = "/usr/bin/sudo" # disconnect the device from cloud - c8y_disconnect = self.startProcess( - command=self.sudo, - arguments=[self.tedge, "disconnect", "c8y"], - stdouterr="c8y_disconnect", - ) - + self.tedge_disconnect_c8y() + self.addCleanupFunction(self.jwt_token_cleanup) def execute(self): # connect the device to cloud - c8y_disconnect = self.startProcess( - command=self.sudo, - arguments=[self.tedge, "connect", "c8y"], - stdouterr="c8y_connect", - ) - + self.tedge_connect_c8y() + # Subscribe for the jwt token response topic resp_jwt_token = self.startProcess( command=self.sudo, @@ -64,12 +58,8 @@ class ValidateJWTTokenRetrieval(BaseTest): def validate(self): # validate the correct response/token is received self.assertGrep("resp_jwt.out", "\[c8y/s/dat\] 71,", contains=True) - - def jwt_token_cleanup(self): + + def jwt_token_cleanup(self): # disconnect the test - c8y_disconnect = self.startProcess( - command=self.sudo, - arguments=[self.tedge, "disconnect", "c8y"], - stdouterr="c8y_disconnect", - ) - + self.tedge_disconnect_c8y() + diff --git a/tests/PySys/environments/cumulocity.py b/tests/PySys/environments/cumulocity.py new file mode 100644 index 00000000..37e62d4b --- /dev/null +++ b/tests/PySys/environments/cumulocity.py @@ -0,0 +1,225 @@ +import base64 +import json +import re +import requests + + +class Cumulocity(object): + """Class to retrieve information about Cumulocity. + TODO : Review if we download enough data -> pageSize + TODO : Documentation and test for all of these functions + TODO : Extract as separate package + TODO : Make this more bulletproof + """ + + c8y_url = "" + tenant_id = "" + username = "" + password = "" + auth = "" + timeout_req = "" + + def __init__(self, c8y_url, tenant_id, username, password, log): + self.c8y_url = c8y_url + self.tenant_id = tenant_id + self.username = username + self.password = password + self.timeout_req = 60 # seconds, got timeout with 60s + self.log = log + + self.auth = ("%s/%s" % (self.tenant_id, self.username), self.password) + + def request(self, method, url_path, **kwargs) -> requests.Response: + + return requests.request( + method, self.c8y_url + url_path, auth=self.auth, **kwargs + ) + + def get_all_devices(self) -> requests.Response: + params = {"fragmentType": "c8y_IsDevice"} + res = requests.get( + url=self.c8y_url + "/inventory/managedObjects", + params=params, + auth=self.auth, + ) + + return self.to_json_response(res) + + def to_json_response(self, res: requests.Response): + if res.status_code != 200: + raise Exception( + "Received invalid response with exit code: {}, reason: {}".format( + res.status_code, res.reason + ) + ) + return json.loads(res.text) + + def get_all_devices_by_type(self, type: str) -> requests.Response: + params = { + "fragmentType": "c8y_IsDevice", + "type": type, + "pageSize": 100, + } + res = requests.get( + url=self.c8y_url + "/inventory/managedObjects", + params=params, + auth=self.auth, + ) + return self.to_json_response(res) + + def get_all_thin_edge_devices(self) -> requests.Response: + return self.get_all_devices_by_type("thin-edge.io") + + def get_thin_edge_device_by_name(self, device_id: str): + """ + TODO: Update : What is returned here ? Its the json data structure from C8y - + Do we call this device fragment ? + Hint: Device_id is the name of the device + """ + + # Hint this will fail, when the device does not have the type set "thin-edge.io" in C8y + json_response = self.get_all_devices_by_type("thin-edge.io") + + for device in json_response["managedObjects"]: + if device_id in device["name"]: + return device + return None + + def get_header(self): + auth = bytes(f"{self.tenant_id}/{self.username}:{self.password}", "utf-8") + header = { + b"Authorization": b"Basic " + base64.b64encode(auth), + b"content-type": b"application/json", + b"Accept": b"application/json", + } + return header + + def trigger_log_request(self, log_file_request_payload, device_id): + url = f"{self.c8y_url}/devicecontrol/operations" + log_file_request_payload = { + "deviceId": device_id, + "description": "Log file request", + "c8y_LogfileRequest": log_file_request_payload, + } + req = requests.post( + url, + json=log_file_request_payload, + headers=self.get_header(), + timeout=self.timeout_req, + ) + jresponse = json.loads(req.text) + + operation_id = jresponse.get("id") + + if not operation_id: + raise SystemError("field id is missing in response") + + return operation_id + + def retrieve_log_file(self, operation_id): + """Check if log received""" + + url = f"{self.c8y_url}/devicecontrol/operations/{operation_id}" + req = requests.get(url, headers=self.get_header(), timeout=self.timeout_req) + + req.raise_for_status() + + jresponse = json.loads(req.text) + ret = "" + + log_response = jresponse.get("c8y_LogfileRequest") + # check if the response contains the logfile + log_file = log_response.get("file") + self.log.info("log response %s", log_file) + if log_file != None: + ret = log_file + return ret + + def get_child_device_of_thin_edge_device_by_name( + self, thin_edge_device_id: str, child_device_id: str + ): + self.device_fragment = self.get_thin_edge_device_by_name(thin_edge_device_id) + internal_id = self.device_fragment["id"] + child_devices = self.to_json_response( + requests.get( + url="{}/inventory/managedObjects/{}/childDevices".format( + self.c8y_url, internal_id + ), + auth=self.auth, + ) + ) + for child_device in child_devices["references"]: + if child_device_id in child_device["managedObject"]["name"]: + if child_device["managedObject"] == None: + print("Oh no it is None") + print(f"Cannot find {child_device_id}") + return child_device["managedObject"] + + return None + + def get_last_measurements_from_device(self, device_internal_id: str): + return self.get_last_n_measurements_from_device( + device_internal_id=device_internal_id, target_size=1 + )[0] + + def get_last_n_measurements_from_device( + self, device_internal_id: int, target_size: int + ): + params = { + "source": device_internal_id, + "pageSize": target_size, + "dateFrom": "1970-01-01", + "revert": True, + } + res = requests.get( + url=self.c8y_url + "/measurement/measurements", + params=params, + auth=self.auth, + ) + measurements_json = self.to_json_response(res) + return measurements_json["measurements"] + + def get_last_n_alarms_from_device( + self, + device_internal_id: int, + target_size=2000, + status="ACTIVE", + date_from="1970-01-01", + ): + params = { + "source": device_internal_id, + "pageSize": target_size, + "dateFrom": "1970-01-01", + "status": status, + } + res = requests.get( + url=self.c8y_url + "/alarm/alarms", params=params, auth=self.auth + ) + measurements_json = self.to_json_response(res) + return measurements_json["alarms"] + + def get_last_alarm_from_device(self, device_internal_id: int): + return self.get_last_n_alarms_from_device( + device_internal_id=device_internal_id, target_size=1 + )[0] + + def clear_all_alarms_from_device(self, device_internal_id: int): + params = { + "source": device_internal_id, + } + payload = {"status": "CLEARED"} + res = requests.put( + url=self.c8y_url + "/alarm/alarms", + params=params, + json=payload, + auth=self.auth, + ) + res.raise_for_status() + + def delete_managed_object_by_internal_id(self, internal_id: str): + res = requests.delete( + url="{}/inventory/managedObjects/{}".format(self.c8y_url, internal_id), + auth=self.auth, + ) + if res.status_code != 204 or res.status_code != 404: + res.raise_for_status() diff --git a/tests/PySys/environments/environment_c8y.py b/tests/PySys/environments/environment_c8y.py index 721f28f1..57b4a801 100644 --- a/tests/PySys/environments/environment_c8y.py +++ b/tests/PySys/environments/environment_c8y.py @@ -1,197 +1,34 @@ -import json import base64 +import json import re -from pysys.constants import FAILED import requests + +import psutil + from pysys.basetest import BaseTest +from pysys.constants import FAILED -""" -Environment to manage automated connect and disconnect to c8y +from cumulocity import Cumulocity +from environment_tedge import TedgeEnvironment -Tests that derive from class EnvironmentC8y use automated connect and -disconnect to Cumulocity. Additional checks are made for the status of -service mosquitto and service tedge-mapper. +""" +Environment to manage automated connects and disconnects to c8y """ - -class Cumulocity(object): - """Class to retrieve information about Cumulocity. - TODO : Review if we download enough data -> pageSize +class EnvironmentC8y(TedgeEnvironment): """ + Pysys Environment to manage automated connect and disconnect to c8y - c8y_url = "" - tenant_id = "" - username = "" - password = "" - auth = "" - timeout_req = "" - - def __init__(self, c8y_url, tenant_id, username, password, log): - self.c8y_url = c8y_url - self.tenant_id = tenant_id - self.username = username - self.password = password - self.timeout_req = 60 # seconds, got timeout with 60s - self.log = log - - self.auth = ('%s/%s' % (self.tenant_id, self.username), self.password) - - def request(self, method, url_path, **kwargs) -> requests.Response: - - return requests.request(method, self.c8y_url + url_path, auth=self.auth, **kwargs) - - def get_all_devices(self) -> requests.Response: - params = { - "fragmentType": "c8y_IsDevice" - } - res = requests.get( - url=self.c8y_url + "/inventory/managedObjects", params=params, auth=self.auth) - - return self.to_json_response(res) - - def to_json_response(self, res: requests.Response): - if res.status_code != 200: - raise Exception( - "Received invalid response with exit code: {}, reason: {}".format(res.status_code, res.reason)) - return json.loads(res.text) - - def get_all_devices_by_type(self, type: str) -> requests.Response: - params = { - "fragmentType": "c8y_IsDevice", - "type": type, - "pageSize": 100, - } - res = requests.get( - url=self.c8y_url + "/inventory/managedObjects", params=params, auth=self.auth) - return self.to_json_response(res) - - def get_all_thin_edge_devices(self) -> requests.Response: - return self.get_all_devices_by_type("thin-edge.io") - - def get_thin_edge_device_by_name(self, device_id: str): - json_response = self.get_all_devices_by_type("thin-edge.io") - for device in json_response['managedObjects']: - if device_id in device['name']: - return device - return None - - def get_header(self): - auth = bytes( - f"{self.tenant_id}/{self.username}:{self.password}", "utf-8") - header = { - b"Authorization": b"Basic " + base64.b64encode(auth), - b"content-type": b"application/json", - b"Accept": b"application/json", - } - return header - - def trigger_log_request(self, log_file_request_payload, device_id): - url = f"{self.c8y_url}/devicecontrol/operations" - log_file_request_payload = { - "deviceId": device_id, - "description": "Log file request", - "c8y_LogfileRequest": log_file_request_payload, - } - req = requests.post( - url, json=log_file_request_payload, headers=self.get_header(), timeout=self.timeout_req - ) - jresponse = json.loads(req.text) - - operation_id = jresponse.get("id") - - if not operation_id: - raise SystemError("field id is missing in response") - - return operation_id - - def retrieve_log_file(self, operation_id): - """Check if log received""" - - url = f"{self.c8y_url}/devicecontrol/operations/{operation_id}" - req = requests.get(url, headers=self.get_header(), - timeout=self.timeout_req) - - req.raise_for_status() - - jresponse = json.loads(req.text) - ret = "" - - log_response = jresponse.get("c8y_LogfileRequest") - # check if the response contains the logfile - log_file = log_response.get("file") - self.log.info("log response %s", log_file) - if log_file != None: - ret = log_file - return ret - - def get_child_device_of_thin_edge_device_by_name(self, thin_edge_device_id: str, child_device_id: str): - self.device_fragment = self.get_thin_edge_device_by_name( - thin_edge_device_id) - internal_id = self.device_fragment['id'] - child_devices = self.to_json_response(requests.get( - url="{}/inventory/managedObjects/{}/childDevices".format(self.c8y_url, internal_id), auth=self.auth)) - for child_device in child_devices['references']: - if child_device_id in child_device['managedObject']['name']: - return child_device['managedObject'] - - return None - - def get_last_measurements_from_device(self, device_internal_id: str): - return self.get_last_n_measurements_from_device( - device_internal_id=device_internal_id, target_size=1)[0] - - def get_last_n_measurements_from_device(self, device_internal_id: int, target_size: int): - params = { - "source": device_internal_id, - "pageSize": target_size, - "dateFrom": "1970-01-01", - "revert": True - } - res = requests.get( - url=self.c8y_url + "/measurement/measurements", params=params, auth=self.auth) - measurements_json = self.to_json_response(res) - return measurements_json['measurements'] - - def get_last_n_alarms_from_device(self, device_internal_id: int, target_size=2000, status="ACTIVE", date_from="1970-01-01"): - params = { - "source": device_internal_id, - "pageSize": target_size, - "dateFrom": "1970-01-01", - "status": status - } - res = requests.get( - url=self.c8y_url + "/alarm/alarms", params=params, auth=self.auth) - measurements_json = self.to_json_response(res) - return measurements_json['alarms'] - - def get_last_alarm_from_device(self, device_internal_id: int): - return self.get_last_n_alarms_from_device( - device_internal_id=device_internal_id, target_size=1)[0] - - def clear_all_alarms_from_device(self, device_internal_id: int): - params = { - "source": device_internal_id, - } - payload = { - "status": "CLEARED" - } - res = requests.put( - url=self.c8y_url + "/alarm/alarms", params=params, json=payload, auth=self.auth) - res.raise_for_status() - - def delete_managed_object_by_internal_id(self, internal_id: str): - res = requests.delete( - url="{}/inventory/managedObjects/{}".format(self.c8y_url, internal_id), auth=self.auth) - if res.status_code != 204 or res.status_code != 404: - res.raise_for_status() - + Tests that derive from class EnvironmentC8y use automated connect and + disconnect to Cumulocity. Additional checks are made for the status of + service mosquitto and service tedge-mapper. + """ -class EnvironmentC8y(BaseTest): cumulocity: Cumulocity def setup(self): self.log.debug("EnvironmentC8y Setup") - + super().setup() if self.project.c8yurl == "": self.abort( FAILED, "Cumulocity tenant URL is not set. Set with the env variable C8YURL") @@ -208,10 +45,6 @@ class EnvironmentC8y(BaseTest): self.abort( FAILED, "Device ID is not set. Set with the env variable C8YDEVICEID") - self.tedge = "/usr/bin/tedge" - self.tedge_mapper_c8y = "tedge-mapper-c8y" - self.sudo = "/usr/bin/sudo" - self.systemctl = "/usr/bin/systemctl" self.log.info("EnvironmentC8y Setup") self.addCleanupFunction(self.myenvcleanup) @@ -224,18 +57,10 @@ class EnvironmentC8y(BaseTest): ) # Connect the bridge - connect = self.startProcess( - command=self.sudo, - arguments=[self.tedge, "connect", "c8y"], - stdouterr="tedge_connect", - ) + self.tedge_connect_c8y() # Test the bridge connection - connect = self.startProcess( - command=self.sudo, - arguments=[self.tedge, "connect", "c8y", "--test"], - stdouterr="tedge_connect_test", - ) + self.tedge_connect_c8y_test() # Check if mosquitto is running well serv_mosq = self.startProcess( @@ -278,11 +103,7 @@ class EnvironmentC8y(BaseTest): self.log.debug("EnvironmentC8y Cleanup") # Disconnect Bridge - disconnect = self.startProcess( - command=self.sudo, - arguments=[self.tedge, "disconnect", "c8y"], - stdouterr="tedge_disconnect", - ) + self.tedge_disconnect_c8y() # Check if tedge-mapper is disabled serv_mosq = self.startProcess( @@ -291,3 +112,4 @@ class EnvironmentC8y(BaseTest): stdouterr="serv_mapper5", expectedExitStatus="==3", ) + diff --git a/tests/PySys/environments/environment_tedge.py b/tests/PySys/environments/environment_tedge.py new file mode 100644 index 00000000..9add9f6e --- /dev/null +++ b/tests/PySys/environments/environment_tedge.py @@ -0,0 +1,81 @@ +import os +import subprocess +import time + +from pysys.basetest import BaseTest + + +class TedgeEnvironment(BaseTest): + """Class with helper and convenicence methods for testing tedge""" + + def setup(self): + self.sudo = "/usr/bin/sudo" + self.tedge = "/usr/bin/tedge" + self.tedge_mapper_c8y = "tedge-mapper-c8y" + self.sudo = "/usr/bin/sudo" + self.systemctl = "/usr/bin/systemctl" + + def wait_if_restarting_mosquitto_too_frequently(self): + """Make sure we do not restart mosqiotto too frequently + Systemd will become suspicious when mosquitto is restarted more + freqeuntly than 5 seconds. + """ + # Ideally we would expect 5 seconds here, but only waiting + # for 10 makes the issue diappear + minimum_time = 10 + + # Make sure we use the right path for ps, there seems to be an + # issue with injecting PATH in pysys, so we use an absolute path for now + if os.path.exists("/usr/bin/ps"): + path_ps = "/usr/bin/ps" + elif os.path.exists("/bin/ps"): + # the place where mythic beasts has the ps + path_ps = "/bin/ps" + else: + raise SystemError("Cannot find ps") + + etimes = subprocess.check_output( + f"{path_ps} -o etimes $(pidof mosquitto)", shell=True + ) + runtime = int(etimes.split()[1]) + if runtime <= minimum_time: + self.log.info( + f"Restarting mosquitto too frequently in the last {minimum_time} seconds. It was only up for {runtime} seconds" + ) + # Derive additional delay time and add one safety second + delay = minimum_time - runtime + 1 + self.log.info(f"Delaying execution by {delay} seconds") + time.sleep(delay) + + def tedge_connect_c8y(self, expectedExitStatus="==0"): + + self.wait_if_restarting_mosquitto_too_frequently() + + connect = self.startProcess( + command=self.sudo, + arguments=[self.tedge, "connect", "c8y"], + stdouterr="tedge_connect_c8y", + expectedExitStatus=expectedExitStatus, + ) + return connect + + def tedge_disconnect_c8y(self, expectedExitStatus="==0"): + + self.wait_if_restarting_mosquitto_too_frequently() + + connect = self.startProcess( + command=self.sudo, + arguments=[self.tedge, "disconnect", "c8y"], + stdouterr="tedge_disconnect_c8y", + expectedExitStatus=expectedExitStatus, + ) + return connect + + def tedge_connect_c8y_test(self, expectedExitStatus="==0"): + connect_c8y = self.startProcess( + command=self.sudo, + arguments=[self.tedge, "connect", "c8y", "--test"], + stdouterr="tedge_connect_c8y_test", + expectedExitStatus=expectedExitStatus, + ) + return connect_c8y diff --git a/tests/PySys/misc_features/invalid_device_id/run.py b/tests/PySys/misc_features/invalid_device_id/run.py index b93780eb..692d5f58 100644 --- a/tests/PySys/misc_features/invalid_device_id/run.py +++ b/tests/PySys/misc_features/invalid_device_id/run.py @@ -17,7 +17,7 @@ class InvalidDeviceId(BaseTest): def setup(self): self.tedge = "/usr/bin/tedge" self.sudo = "/usr/bin/sudo" - + # create a custom certiticate directory for testing purpose create_cert_dir = self.startProcess( command=self.sudo, diff --git a/tests/PySys/misc_features/mqtt_port_change_connection_works/run.py b/tests/PySys/misc_features/mqtt_port_change_connection_works/run.py index 4487dc03..9ff427b6 100644 --- a/tests/PySys/misc_features/mqtt_port_change_connection_works/run.py +++ b/tests/PySys/misc_features/mqtt_port_change_connection_works/run.py @@ -21,18 +21,14 @@ Now validate the services that use the mqtt port """ -class MqttPortChangeConnectionWorks(BaseTest): +class MqttPortChangeConnectionWorks(TedgeEnvironment): def setup(self): self.tedge = "/usr/bin/tedge" self.sudo = "/usr/bin/sudo" # disconnect from c8y cloud - disconnect_c8y = self.startProcess( - command=self.sudo, - arguments=[self.tedge, "disconnect", "c8y"], - stdouterr="disconnect_c8y", - ) + self.tedge_disconnect_c8y() # set a new mqtt port for local communication mqtt_port = self.startProcess( command=self.sudo, @@ -43,18 +39,10 @@ class MqttPortChangeConnectionWorks(BaseTest): def execute(self): # connect to c8y cloud - connect_c8y = self.startProcess( - command=self.sudo, - arguments=[self.tedge, "connect", "c8y"], - stdouterr="connect_c8y", - ) + self.tedge_connect_c8y() # check connection - connect_c8y = self.startProcess( - command=self.sudo, - arguments=[self.tedge, "connect", "c8y", "--test"], - stdouterr="check_con_c8y", - ) + self.tedge_connect_c8y_test() # subscribe for messages mqtt_sub = self.startProcess( @@ -69,7 +57,7 @@ class MqttPortChangeConnectionWorks(BaseTest): # validate tedge mqtt pub/sub self.validate_tedge_mqtt() # validate c8y connection - self.assertGrep("check_con_c8y.out", + self.assertGrep("tedge_connect_c8y_test.out", "connection check is successful", contains=True) # validate c8y mapper self.validate_tedge_mapper_c8y() @@ -168,11 +156,7 @@ class MqttPortChangeConnectionWorks(BaseTest): # disconnect again # disconnect Bridge - c8y_disconnect = self.startProcess( - command=self.sudo, - arguments=[self.tedge, "disconnect", "c8y"], - stdouterr="c8y_disconnect", - ) + self.tedge_disconnect_c8y() # unset a new mqtt port, falls back to default port (1883) mqtt_port = self.startProcess( @@ -182,16 +166,7 @@ class MqttPortChangeConnectionWorks(BaseTest): ) # connect Bridge - c8y_connect = self.startProcess( - command=self.sudo, - arguments=[self.tedge, "connect", "c8y"], - stdouterr="c8y_connect", - ) + self.tedge_connect_c8y() # disconnect Bridge - c8y_disconnect = self.startProcess( - command=self.sudo, - arguments=[self.tedge, "disconnect", "c8y"], - stdouterr="c8y_disconnect", - ) - + self.tedge_disconnect_c8y() diff --git a/tests/PySys/misc_features/mqtt_port_set/run.py b/tests/PySys/misc_features/mqtt_port_set/run.py index 72b3dbc4..61d237b5 100644 --- a/tests/PySys/misc_features/mqtt_port_set/run.py +++ b/tests/PySys/misc_features/mqtt_port_set/r |