diff options
author | Michael Abel <75477722+abelikt@users.noreply.github.com> | 2021-11-22 13:44:02 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-22 13:44:02 +0100 |
commit | a534f4920b6297eece3255846b513718a21bfd08 (patch) | |
tree | ad361db05fe66415494efe60a58d8e57db0afb0b /ci | |
parent | 7e67ebda4485dad5da927f43fef4b65961a1c28b (diff) |
[CIT-455] Finalize azure roundtrip (#581)
* Checkout changes from CI branch
* Fix typos and switch to main repo
* Add forgotten file
* Add device name to topic
* Add secret to workflow
* Ignore Azure for now
* Extract settings to GH Secrets
* Revert "Ignore Azure for now"
This reverts commit 92b5ec93a437e33ecfb9d027355a080129f447be.
* Use more pythonic way of accessing the environment
* Further review comments and improvements
* Bugfix + run black
* Rename c8y smoketest
* Rewrite get_auth_token
* Rewrite generate_sas_token
Diffstat (limited to 'ci')
-rwxr-xr-x | ci/az_upload_device_cert.py | 163 | ||||
-rwxr-xr-x | ci/ci_run_all_tests.sh | 13 | ||||
-rwxr-xr-x | ci/ci_smoke_test_az.sh | 41 | ||||
-rwxr-xr-x | ci/ci_smoke_test_c8y.sh (renamed from ci/ci_smoke_test.sh) | 22 | ||||
-rwxr-xr-x | ci/configure_bridge.sh | 26 | ||||
-rwxr-xr-x | ci/roundtrip_local_to_az.py | 344 |
6 files changed, 589 insertions, 20 deletions
diff --git a/ci/az_upload_device_cert.py b/ci/az_upload_device_cert.py new file mode 100755 index 00000000..b659419a --- /dev/null +++ b/ci/az_upload_device_cert.py @@ -0,0 +1,163 @@ +#!/usr/bin/python3 +"""Upload a device certificate for Azure + +See also: +https://docs.microsoft.com/en-us/rest/api/iothub/ +https://docs.microsoft.com/en-us/rest/api/iothub/service/devices/create-or-update-identity + + +call example: +$ ./az_upload_device_cert.py -d devpi3 -t 01F...222 -u ThinEdgeHub -s iothubowner + +Export environment variable SASKEYIOTHUB to the Shared access key of your IoT Hub. +""" + +import argparse +import base64 +import hashlib +import hmac +import os +import sys +import time +import urllib + +import requests + + +def generate_sas_token(uri, key, policy_name, expiry=3600): + """Generate Shared Access Token + Analog to: + https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-dev-guide-sas?tabs=python + """ + ttlive = int(time.time() + expiry) + newuri = urllib.parse.quote_plus(uri) + skey = "%s\n%d" % ((newuri), ttlive) + code = hmac.HMAC( + base64.b64decode(key), skey.encode("utf-8"), hashlib.sha256 + ).digest() + sign = base64.b64encode(code) + token = {"sr": uri, "sig": sign, "se": str(ttlive)} + if policy_name is not None: + token["skn"] = policy_name + final_token = "SharedAccessSignature " + urllib.parse.urlencode(token) + return final_token + + +def delete_device(devname, hub, sas_name): + """Delete the device""" + + try: + sas_policy_primary_key_iothub = os.environ["SASKEYIOTHUB"] + except KeyError: + print("Error environment variable SASKEYIOTHUB not set") + sys.exit(1) + + expiry = 3600 + + uri = f"{hub}.azure-devices.net" + + # generate a shared access token + token = generate_sas_token(uri, sas_policy_primary_key_iothub, sas_name, expiry) + + url = f"https://{hub}.azure-devices.net/devices/{devname}" + + headers = { + "Content-Type": "application/json", + "Content-Encoding": "utf-8", + "Authorization": token, + "If-Match": "*", + } + + params = {"api-version": "2020-05-31-preview"} + req = requests.delete(url, params=params, headers=headers) + + if req.status_code == 200: + print("Deleted the device") + print("Device Properties: ", req.text) + elif req.status_code == 204: + print("Unconditionally deleted the device") + print("Deleted Device Properties: ", req.text) + elif req.status_code == 404: + print("Device is not there, not deleted") + else: + print(f"Error: {req.status_code}") + print(f"Response Properties {req.text}") + req.raise_for_status() + + +def upload_device_cert(devname, thprint, hub, sas_name, verbose): + """Upload device certificate + first generate an SAS access token, then upload the certificate""" + + try: + sas_policy_primary_key_iothub = os.environ["SASKEYIOTHUB"] + except KeyError: + print("Error environment variable SASKEYIOTHUB not set") + sys.exit(1) + + expiry = 3600 + + uri = f"{hub}.azure-devices.net" + + # generate a sharec access token + token = generate_sas_token(uri, sas_policy_primary_key_iothub, sas_name, expiry) + + # Now upload the certificate + + url = f"https://{hub}.azure-devices.net/devices/{devname}" + + headers = { + "Content-Type": "application/json", + "Content-Encoding": "utf-8", + "Authorization": token, + } + + params = {"api-version": "2020-05-31-preview"} + + data = ( + '{"deviceId":"%s", "authentication": {"type" : "selfSigned",' % devname + + '"x509Thumbprint": { "primaryThumbprint":"%s", "secondaryThumbprint":"%s" }}}' + % (thprint, thprint) + ) + + req = requests.put(url, data, params=params, headers=headers) + + if req.status_code == 200: + print("Uploaded device certificate") + if verbose: + print("Uploaded Device Properties : ", req.text) + else: + print(f"Error: {req.status_code}") + print("Response Properties", req.text) + + +def main(): + """Main entry point""" + parser = argparse.ArgumentParser() + parser.add_argument("-d", "--device", help="Device name") + parser.add_argument("-t", "--thumbprint", help="Device thumbprint") + parser.add_argument("-u", "--hub", help="IoT Hub") + parser.add_argument("-s", "--name", help="Name of the IoT hub SAS policy") + + parser.add_argument("-v", "--verbose", help="Verbosity", action="count", default=0) + args = parser.parse_args() + + try: + os.environ["SASKEYIOTHUB"] + except KeyError: + print("Error environment variable SASKEYIOTHUB not set") + sys.exit(1) + + devname = args.device + thprint = args.thumbprint + hub = args.hub + sas_name = args.name + verbose = args.verbose + + delete_device(devname, hub, sas_name) + + upload_device_cert(devname, thprint, hub, sas_name, verbose) + + +if __name__ == "__main__": + main() diff --git a/ci/ci_run_all_tests.sh b/ci/ci_run_all_tests.sh index fb544492..9c57d12a 100755 --- a/ci/ci_run_all_tests.sh +++ b/ci/ci_run_all_tests.sh @@ -14,13 +14,26 @@ # EXAMPLEDIR : The direcory of the sawtooth example # C8YURL : e.g. https://thin-edge-io.eu-latest.cumulocity.com +# Adding sbin seems to be necessary for non Raspberry P OS systems as Debian or Ubuntu +PATH=$PATH:/usr/sbin + +echo "Disconnect old bridge" + +# Disconnect - may fail if not there +sudo tedge disconnect c8y + +# From now on exit if a command exits with a non-zero status. +# Commands above are allowed to fail set -e + cd $TEBASEDIR # Check if clients are installed dpkg -s mosquitto-clients +./ci/configure_bridge.sh + # Run all PySys tests python3 -m venv ~/env-pysys diff --git a/ci/ci_smoke_test_az.sh b/ci/ci_smoke_test_az.sh new file mode 100755 index 00000000..e6cf5c00 --- /dev/null +++ b/ci/ci_smoke_test_az.sh @@ -0,0 +1,41 @@ +#!/usr/bin/bash + +# Smoke test for Azure IoT +# The bridge should be already configured (done by configure_bridge.sh) +# lets avoid to create a new certifiate in this script as it is shared with C8y. + +# This script is intended to be executed by a GitHub self-hosted runner +# on a Raspberry Pi. + +# Disconnect - may fail if not there +sudo tedge disconnect az +sudo tedge disconnect c8y + +set -e + +# The bridge should be already configured +# lets avoid to create a new certifiate here () +# ./ci/configure_bridge.sh + +# Read device thumbprint from command line +THUMB=$(sudo tedge cert show | grep Thumb | cut -c13-) +echo "DEVICE Thumbprint is " $THUMB + + +python3 -m venv ~/env-eventhub +source ~/env-eventhub/bin/activate +pip install azure-eventhub + +./ci/az_upload_device_cert.py -d $C8YDEVICE -t $THUMB -u $IOTHUBNAME -s iothubowner + +sudo tedge connect az + +# Get messages from a service bus +#./ci/roundtrip_local_to_az.py -p sas_policy2 -b thinedgebus -q testqueue2 +# Use Azure SDK to access the IoT Hub +./ci/roundtrip_local_to_az.py eventhub + +sudo tedge disconnect az + +deactivate + diff --git a/ci/ci_smoke_test.sh b/ci/ci_smoke_test_c8y.sh index 3a14f344..e56a94b3 100755 --- a/ci/ci_smoke_test.sh +++ b/ci/ci_smoke_test_c8y.sh @@ -10,7 +10,7 @@ # on a Raspberry Pi. # Command line parameters: -# ci_smoke_test.sh <timezone> +# ci_smoke_test_c8y.sh <timezone> # Environment variables: # C8YDEVICE # C8YUSERNAME @@ -92,25 +92,7 @@ sudo tedge disconnect c8y # Commands above are allowed to fail set -e -echo "Configuring Bridge" - -sudo tedge cert remove - -sudo tedge cert create --device-id=$C8YDEVICE - -sudo tedge cert show - -sudo tedge config set c8y.url thin-edge-io.eu-latest.cumulocity.com - -sudo tedge config set c8y.root.cert.path /etc/ssl/certs - -sudo tedge config list - -# Note: This will always upload a new certificate. From time to time -# we should delete the old ones in c8y -sudo -E tedge cert upload c8y --user $C8YUSERNAME - -cat /etc/mosquitto/mosquitto.conf +./ci/configure_bridge.sh echo "Connect again" sudo tedge connect c8y diff --git a/ci/configure_bridge.sh b/ci/configure_bridge.sh new file mode 100755 index 00000000..5c1f9c77 --- /dev/null +++ b/ci/configure_bridge.sh @@ -0,0 +1,26 @@ + +set -e + +echo "Configuring Bridge" + +sudo tedge cert remove + +sudo tedge cert create --device-id=$C8YDEVICE + +sudo tedge cert show + +sudo tedge config set c8y.url thin-edge-io.eu-latest.cumulocity.com + +sudo tedge config set c8y.root.cert.path /etc/ssl/certs + +sudo tedge config set az.url $IOTHUBNAME.azure-devices.net + +sudo tedge config set az.root.cert.path /etc/ssl/certs/Baltimore_CyberTrust_Root.pem + +sudo tedge config list + +# Note: This will always upload a new certificate. From time to time +# we should delete the old ones in c8y +sudo -E tedge cert upload c8y --user $C8YUSERNAME + +cat /etc/mosquitto/mosquitto.conf diff --git a/ci/roundtrip_local_to_az.py b/ci/roundtrip_local_to_az.py new file mode 100755 index 00000000..bace0d60 --- /dev/null +++ b/ci/roundtrip_local_to_az.py @@ -0,0 +1,344 @@ +#!/usr/bin/env python3 + +"""Perform a full roundtrip of messages from thin-edge to Azure IoT. + +We publish with thin-edge to Azure IoT; then route the messages to a +Service Bus Queue; from there we retrieve the messages via a REST +Interface and compare them with what we have sent in the beginning. + +Alternatively, we can use the Azure SDK to access the IoT Hub directly. + +When this script is called you need to be already connected to Azure. + +Call example: +$ ./roundtrip_local_to_az.py -a 10 -p sas_policy -b thinedgebus -q testqueue + Set Env: + - SASKEYQUEUE : Shared Access Key to the service bus queue + +Alternatively: +./ci/roundtrip_local_to_az.py eventhub + Set Env: + - AZUREENDPOINT : Endpoint descritpion string copied from the Azure UI + - AZUREEVENTHUB : Name of the IoT Hub +""" + +import argparse +import base64 +import json +import json.decoder +import hashlib +import hmac +import os +import sys +import subprocess +import time +import urllib + +import requests + +import logging +from azure.eventhub import EventHubConsumerClient +import datetime + +debug = False +if debug: + logging.basicConfig(level=logging.INFO) +else: + logging.basicConfig() + +logger = logging.getLogger("roundtrip") +logger.setLevel(level=logging.INFO) + + +def publish_az(amount, topic, key): + """Publish to Azure topic""" + + logger.info(f"Publishing messages to topic {topic}") + + for i in range(amount): + message = f'{{"{key}": {i} }}' + + cmd = ["/usr/bin/tedge", "mqtt", "pub", topic, message] + + try: + ret = subprocess.run(cmd, check=True) + except subprocess.CalledProcessError as e: + logger.error("Failed to publish %s", e) + sys.exit(1) + ret.check_returncode() + + logger.info("Published message: %s" % message) + time.sleep(0.05) + + +def get_auth_token(sb_name, eh_name, sas_name, sas_value): + """Create authentication token + Analog to: + https://docs.microsoft.com/en-us/rest/api/eventhub/generate-sas-token + """ + newuri = urllib.parse.quote_plus( + f"https://{sb_name}.servicebus.windows.net/{eh_name}" + ) + sas_enc = sas_value.encode("utf-8") + expiry = str(int(time.time()) + 10000) + str_sign = newuri + "\n" + expiry + signed_hmac = hmac.HMAC(sas_enc, str_sign.encode("utf-8"), hashlib.sha256) + signature = urllib.parse.quote(base64.b64encode(signed_hmac.digest())) + ret = { + "sb_name": sb_name, + "eh_name": eh_name, + "token": f"SharedAccessSignature sr={newuri}&sig={signature}&se={expiry}&skn={sas_name}", + } + return ret + + +def retrieve_queue_az( + sas_policy_name, service_bus_name, queue_name, amount, verbose, key +): + """Get the published messages back from a service bus queue + Probably soon obsolete. + """ + + try: + sas_policy_primary_key = os.environ["SASKEYQUEUE"] + except KeyError: + print("Error environment variable SASKEYQUEUE not set") + sys.exit(1) + + tokendict = get_auth_token( + service_bus_name, queue_name, sas_policy_name, sas_policy_primary_key + ) + + token = tokendict["token"] + + if verbose: + print("Token", token) + + # See also: + # https://docs.microsoft.com/en-us/rest/api/servicebus/receive-and-delete-message-destructive-read + + url = ( + f"https://{service_bus_name}.servicebus.windows.net/{queue_name}/messages/head" + ) + + print(f"Downloading mesages from {url}") + headers = { + "Accept": "application/json", + "Content-Type": "application/json;charset=utf-8", + "Authorization": token, + } + messages = [] + + while True: + + try: + req = requests.delete(url, headers=headers) + except requests.exceptions.ConnectionError as e: + print("Exception: ", e) + print("Connection error: We wait for some seconds and then continue ...") + time.sleep(10) + continue + + if req.status_code == 200: + text = req.text + props = json.loads(req.headers["BrokerProperties"]) + number = props["SequenceNumber"] + queuetime = props["EnqueuedTimeUtc"] + + try: + data = json.loads(text) + value = data[key] + except json.decoder.JSONDecodeError: + print("Json Parsing Error: ", text) + value = None + except KeyError: + print("Parsing Error: ", text) + value = None + + print( + f'Got message {number} from {queuetime} message is "{text}" value: "{value}"' + ) + messages.append(value) + + elif req.status_code == 204: + print("Queue Empty: HTTP status: ", req.status_code) + break + elif req.status_code == 401: + print("Token Expired: HTTP status: ", req.status_code) + raise SystemError("Token Expired") + else: + print(req) + print("Error HTTP status: ", req.status_code) + raise SystemError("HTTP Error") + + if messages == list(range(amount)): + print("Validation PASSED") + return True + else: + print("Validation FAILED") + return False + + +class EventHub: + """Class to host all properties and access functions for an IoT Hub/ Eventhub + Needs https://pypi.org/project/azure-eventhub + + Docs: + https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-read-builtin + https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/latest/azure.eventhub.html + https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/latest/azure.eventhub.html#azure.eventhub.EventData + """ + + def __init__(self, message_key, amount): + + try: + connection_str = os.environ["AZUREENDPOINT"] + except KeyError: + logger.error("Error environment variable AZUREENDPOINT not set") + sys.exit(1) + + try: + eventhub_name = os.environ["AZUREEVENTHUB"] + except KeyError: + logger.error("Error environment variable AZUREEVENTHUB not set") + sys.exit(1) + + self.message_key = message_key + self.amount = amount + consumer_group = "$Default" + timeout = 10 # 10s : minimum timeout + + self.client = EventHubConsumerClient.from_connection_string( + connection_str, + consumer_group, + eventhub_name=eventhub_name, + idle_timeout=timeout, + ) + + self.received_messages = [] + + def on_error(self, partition_context, event): + logger.error( + "Received Error from partition {}".format(partition_context.partition_id) + ) + logger.error(f"Event: {event}") + + def on_event(self, partition_context, event): + logger.debug( + "Received event from partition {}".format(partition_context.partition_id) + ) + logger.debug(f"Event: {event}") + + if event == None: + logger.debug("Timeout: Exiting event loop ... ") + self.client.close() + return + + partition_context.update_checkpoint(event) + + jevent = event.body_as_json() + + message = jevent.get(self.message_key) + if message != None: + logger.info("Matched key: %s" % message) + self.received_messages.append(message) + else: + logger.info("Not matched key: %s" % jevent) + + def read_from_hub(self, start): + """Read data from the event hub + + Possible values for start: + start = "-1" : Read all messages + start = "@latest" : Read only the latest messages + start = datetime.datetime.now(tz=datetime.timezone.utc) : use current sdate + + When no messages are received the client.receive will return. + """ + + with self.client: + self.client.receive( + on_event=self.on_event, + on_error=self.on_error, + starting_position=start, + max_wait_time=10, + ) + logger.info("Exiting event loop") + + def validate(self): + """Validate the messages that we have received against""" + + if self.received_messages == list(range(self.amount)): + print("Validation PASSED") + return True + else: + print("Validation FAILED") + return False + + +def main(): + """Main entry point""" + parser = argparse.ArgumentParser() + parser.add_argument("method", choices=["eventhub", "servicebus"]) + parser.add_argument("-b", "--bus", help="Service Bus Name") + parser.add_argument("-p", "--policy", help="SAS Policy Name") + parser.add_argument("-q", "--queue", help="Queue Name") + parser.add_argument( + "-a", "--amount", help="Amount of messages to send", type=int, default=20 + ) + parser.add_argument("-v", "--verbose", help="Verbosity", action="count", default=0) + args = parser.parse_args() + + amount = args.amount + sas_policy_name = args.policy + service_bus_name = args.bus + queue_name = args.queue + verbose = args.verbose + method = args.method + + if method == "servicebus": + try: + os.environ["SASKEYQUEUE"] + except KeyError: + print("Error environment variable SASKEYQUEUE not set") + sys.exit(1) + + try: + device = os.environ["C8YDEVICE"] + except KeyError: + print("Error environment variable C8YDEVICE not set") + sys.exit(1) + + # Send roundtrip via the tedge mapper + mqtt_topic = "tedge/measurements" + # In case that we want to avoid the azure mapper + # mqtt_topic = "az/messages/events/" + + message_key = "thin-edge-azure-roundtrip-" + device + + if method == "eventhub": + + eh = EventHub(message_key=message_key, amount=amount) + + start = datetime.datetime.now(tz=datetime.timezone.utc) + + publish_az(amount, mqtt_topic, message_key) + + eh.read_from_hub(start) + if not eh.validate(): + sys.exit(1) + + elif method == "servicebus": + + publish_az(amount, mqtt_topic, message_key) + + result = retrieve_queue_az( + sas_policy_name, service_bus_name, queue_name, amount, verbose, message_key + ) + + if not result: + sys.exit(1) + + +if __name__ == "__main__": + main() |