summaryrefslogtreecommitdiffstats
path: root/ci
diff options
context:
space:
mode:
authorMichael Abel <75477722+abelikt@users.noreply.github.com>2021-11-22 13:44:02 +0100
committerGitHub <noreply@github.com>2021-11-22 13:44:02 +0100
commita534f4920b6297eece3255846b513718a21bfd08 (patch)
treead361db05fe66415494efe60a58d8e57db0afb0b /ci
parent7e67ebda4485dad5da927f43fef4b65961a1c28b (diff)
[CIT-455] Finalize azure roundtrip (#581)
* Checkout changes from CI branch * Fix typos and switch to main repo * Add forgotten file * Add device name to topic * Add secret to workflow * Ignore Azure for now * Extract settings to GH Secrets * Revert "Ignore Azure for now" This reverts commit 92b5ec93a437e33ecfb9d027355a080129f447be. * Use more pythonic way of accessing the environment * Further review comments and improvements * Bugfix + run black * Rename c8y smoketest * Rewrite get_auth_token * Rewrite generate_sas_token
Diffstat (limited to 'ci')
-rwxr-xr-xci/az_upload_device_cert.py163
-rwxr-xr-xci/ci_run_all_tests.sh13
-rwxr-xr-xci/ci_smoke_test_az.sh41
-rwxr-xr-xci/ci_smoke_test_c8y.sh (renamed from ci/ci_smoke_test.sh)22
-rwxr-xr-xci/configure_bridge.sh26
-rwxr-xr-xci/roundtrip_local_to_az.py344
6 files changed, 589 insertions, 20 deletions
diff --git a/ci/az_upload_device_cert.py b/ci/az_upload_device_cert.py
new file mode 100755
index 00000000..b659419a
--- /dev/null
+++ b/ci/az_upload_device_cert.py
@@ -0,0 +1,163 @@
+#!/usr/bin/python3
+"""Upload a device certificate for Azure
+
+See also:
+https://docs.microsoft.com/en-us/rest/api/iothub/
+https://docs.microsoft.com/en-us/rest/api/iothub/service/devices/create-or-update-identity
+
+
+call example:
+$ ./az_upload_device_cert.py -d devpi3 -t 01F...222 -u ThinEdgeHub -s iothubowner
+
+Export environment variable SASKEYIOTHUB to the Shared access key of your IoT Hub.
+"""
+
+import argparse
+import base64
+import hashlib
+import hmac
+import os
+import sys
+import time
+import urllib
+
+import requests
+
+
+def generate_sas_token(uri, key, policy_name, expiry=3600):
+ """Generate Shared Access Token
+ Analog to:
+ https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-dev-guide-sas?tabs=python
+ """
+ ttlive = int(time.time() + expiry)
+ newuri = urllib.parse.quote_plus(uri)
+ skey = "%s\n%d" % ((newuri), ttlive)
+ code = hmac.HMAC(
+ base64.b64decode(key), skey.encode("utf-8"), hashlib.sha256
+ ).digest()
+ sign = base64.b64encode(code)
+ token = {"sr": uri, "sig": sign, "se": str(ttlive)}
+ if policy_name is not None:
+ token["skn"] = policy_name
+ final_token = "SharedAccessSignature " + urllib.parse.urlencode(token)
+ return final_token
+
+
+def delete_device(devname, hub, sas_name):
+ """Delete the device"""
+
+ try:
+ sas_policy_primary_key_iothub = os.environ["SASKEYIOTHUB"]
+ except KeyError:
+ print("Error environment variable SASKEYIOTHUB not set")
+ sys.exit(1)
+
+ expiry = 3600
+
+ uri = f"{hub}.azure-devices.net"
+
+ # generate a shared access token
+ token = generate_sas_token(uri, sas_policy_primary_key_iothub, sas_name, expiry)
+
+ url = f"https://{hub}.azure-devices.net/devices/{devname}"
+
+ headers = {
+ "Content-Type": "application/json",
+ "Content-Encoding": "utf-8",
+ "Authorization": token,
+ "If-Match": "*",
+ }
+
+ params = {"api-version": "2020-05-31-preview"}
+ req = requests.delete(url, params=params, headers=headers)
+
+ if req.status_code == 200:
+ print("Deleted the device")
+ print("Device Properties: ", req.text)
+ elif req.status_code == 204:
+ print("Unconditionally deleted the device")
+ print("Deleted Device Properties: ", req.text)
+ elif req.status_code == 404:
+ print("Device is not there, not deleted")
+ else:
+ print(f"Error: {req.status_code}")
+ print(f"Response Properties {req.text}")
+ req.raise_for_status()
+
+
+def upload_device_cert(devname, thprint, hub, sas_name, verbose):
+ """Upload device certificate
+ first generate an SAS access token, then upload the certificate"""
+
+ try:
+ sas_policy_primary_key_iothub = os.environ["SASKEYIOTHUB"]
+ except KeyError:
+ print("Error environment variable SASKEYIOTHUB not set")
+ sys.exit(1)
+
+ expiry = 3600
+
+ uri = f"{hub}.azure-devices.net"
+
+ # generate a sharec access token
+ token = generate_sas_token(uri, sas_policy_primary_key_iothub, sas_name, expiry)
+
+ # Now upload the certificate
+
+ url = f"https://{hub}.azure-devices.net/devices/{devname}"
+
+ headers = {
+ "Content-Type": "application/json",
+ "Content-Encoding": "utf-8",
+ "Authorization": token,
+ }
+
+ params = {"api-version": "2020-05-31-preview"}
+
+ data = (
+ '{"deviceId":"%s", "authentication": {"type" : "selfSigned",' % devname
+ + '"x509Thumbprint": { "primaryThumbprint":"%s", "secondaryThumbprint":"%s" }}}'
+ % (thprint, thprint)
+ )
+
+ req = requests.put(url, data, params=params, headers=headers)
+
+ if req.status_code == 200:
+ print("Uploaded device certificate")
+ if verbose:
+ print("Uploaded Device Properties : ", req.text)
+ else:
+ print(f"Error: {req.status_code}")
+ print("Response Properties", req.text)
+
+
+def main():
+ """Main entry point"""
+ parser = argparse.ArgumentParser()
+ parser.add_argument("-d", "--device", help="Device name")
+ parser.add_argument("-t", "--thumbprint", help="Device thumbprint")
+ parser.add_argument("-u", "--hub", help="IoT Hub")
+ parser.add_argument("-s", "--name", help="Name of the IoT hub SAS policy")
+
+ parser.add_argument("-v", "--verbose", help="Verbosity", action="count", default=0)
+ args = parser.parse_args()
+
+ try:
+ os.environ["SASKEYIOTHUB"]
+ except KeyError:
+ print("Error environment variable SASKEYIOTHUB not set")
+ sys.exit(1)
+
+ devname = args.device
+ thprint = args.thumbprint
+ hub = args.hub
+ sas_name = args.name
+ verbose = args.verbose
+
+ delete_device(devname, hub, sas_name)
+
+ upload_device_cert(devname, thprint, hub, sas_name, verbose)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/ci/ci_run_all_tests.sh b/ci/ci_run_all_tests.sh
index fb544492..9c57d12a 100755
--- a/ci/ci_run_all_tests.sh
+++ b/ci/ci_run_all_tests.sh
@@ -14,13 +14,26 @@
# EXAMPLEDIR : The direcory of the sawtooth example
# C8YURL : e.g. https://thin-edge-io.eu-latest.cumulocity.com
+# Adding sbin seems to be necessary for non Raspberry P OS systems as Debian or Ubuntu
+PATH=$PATH:/usr/sbin
+
+echo "Disconnect old bridge"
+
+# Disconnect - may fail if not there
+sudo tedge disconnect c8y
+
+# From now on exit if a command exits with a non-zero status.
+# Commands above are allowed to fail
set -e
+
cd $TEBASEDIR
# Check if clients are installed
dpkg -s mosquitto-clients
+./ci/configure_bridge.sh
+
# Run all PySys tests
python3 -m venv ~/env-pysys
diff --git a/ci/ci_smoke_test_az.sh b/ci/ci_smoke_test_az.sh
new file mode 100755
index 00000000..e6cf5c00
--- /dev/null
+++ b/ci/ci_smoke_test_az.sh
@@ -0,0 +1,41 @@
+#!/usr/bin/bash
+
+# Smoke test for Azure IoT
+# The bridge should be already configured (done by configure_bridge.sh)
+# lets avoid to create a new certifiate in this script as it is shared with C8y.
+
+# This script is intended to be executed by a GitHub self-hosted runner
+# on a Raspberry Pi.
+
+# Disconnect - may fail if not there
+sudo tedge disconnect az
+sudo tedge disconnect c8y
+
+set -e
+
+# The bridge should be already configured
+# lets avoid to create a new certifiate here ()
+# ./ci/configure_bridge.sh
+
+# Read device thumbprint from command line
+THUMB=$(sudo tedge cert show | grep Thumb | cut -c13-)
+echo "DEVICE Thumbprint is " $THUMB
+
+
+python3 -m venv ~/env-eventhub
+source ~/env-eventhub/bin/activate
+pip install azure-eventhub
+
+./ci/az_upload_device_cert.py -d $C8YDEVICE -t $THUMB -u $IOTHUBNAME -s iothubowner
+
+sudo tedge connect az
+
+# Get messages from a service bus
+#./ci/roundtrip_local_to_az.py -p sas_policy2 -b thinedgebus -q testqueue2
+# Use Azure SDK to access the IoT Hub
+./ci/roundtrip_local_to_az.py eventhub
+
+sudo tedge disconnect az
+
+deactivate
+
diff --git a/ci/ci_smoke_test.sh b/ci/ci_smoke_test_c8y.sh
index 3a14f344..e56a94b3 100755
--- a/ci/ci_smoke_test.sh
+++ b/ci/ci_smoke_test_c8y.sh
@@ -10,7 +10,7 @@
# on a Raspberry Pi.
# Command line parameters:
-# ci_smoke_test.sh <timezone>
+# ci_smoke_test_c8y.sh <timezone>
# Environment variables:
# C8YDEVICE
# C8YUSERNAME
@@ -92,25 +92,7 @@ sudo tedge disconnect c8y
# Commands above are allowed to fail
set -e
-echo "Configuring Bridge"
-
-sudo tedge cert remove
-
-sudo tedge cert create --device-id=$C8YDEVICE
-
-sudo tedge cert show
-
-sudo tedge config set c8y.url thin-edge-io.eu-latest.cumulocity.com
-
-sudo tedge config set c8y.root.cert.path /etc/ssl/certs
-
-sudo tedge config list
-
-# Note: This will always upload a new certificate. From time to time
-# we should delete the old ones in c8y
-sudo -E tedge cert upload c8y --user $C8YUSERNAME
-
-cat /etc/mosquitto/mosquitto.conf
+./ci/configure_bridge.sh
echo "Connect again"
sudo tedge connect c8y
diff --git a/ci/configure_bridge.sh b/ci/configure_bridge.sh
new file mode 100755
index 00000000..5c1f9c77
--- /dev/null
+++ b/ci/configure_bridge.sh
@@ -0,0 +1,26 @@
+
+set -e
+
+echo "Configuring Bridge"
+
+sudo tedge cert remove
+
+sudo tedge cert create --device-id=$C8YDEVICE
+
+sudo tedge cert show
+
+sudo tedge config set c8y.url thin-edge-io.eu-latest.cumulocity.com
+
+sudo tedge config set c8y.root.cert.path /etc/ssl/certs
+
+sudo tedge config set az.url $IOTHUBNAME.azure-devices.net
+
+sudo tedge config set az.root.cert.path /etc/ssl/certs/Baltimore_CyberTrust_Root.pem
+
+sudo tedge config list
+
+# Note: This will always upload a new certificate. From time to time
+# we should delete the old ones in c8y
+sudo -E tedge cert upload c8y --user $C8YUSERNAME
+
+cat /etc/mosquitto/mosquitto.conf
diff --git a/ci/roundtrip_local_to_az.py b/ci/roundtrip_local_to_az.py
new file mode 100755
index 00000000..bace0d60
--- /dev/null
+++ b/ci/roundtrip_local_to_az.py
@@ -0,0 +1,344 @@
+#!/usr/bin/env python3
+
+"""Perform a full roundtrip of messages from thin-edge to Azure IoT.
+
+We publish with thin-edge to Azure IoT; then route the messages to a
+Service Bus Queue; from there we retrieve the messages via a REST
+Interface and compare them with what we have sent in the beginning.
+
+Alternatively, we can use the Azure SDK to access the IoT Hub directly.
+
+When this script is called you need to be already connected to Azure.
+
+Call example:
+$ ./roundtrip_local_to_az.py -a 10 -p sas_policy -b thinedgebus -q testqueue
+ Set Env:
+ - SASKEYQUEUE : Shared Access Key to the service bus queue
+
+Alternatively:
+./ci/roundtrip_local_to_az.py eventhub
+ Set Env:
+ - AZUREENDPOINT : Endpoint descritpion string copied from the Azure UI
+ - AZUREEVENTHUB : Name of the IoT Hub
+"""
+
+import argparse
+import base64
+import json
+import json.decoder
+import hashlib
+import hmac
+import os
+import sys
+import subprocess
+import time
+import urllib
+
+import requests
+
+import logging
+from azure.eventhub import EventHubConsumerClient
+import datetime
+
+debug = False
+if debug:
+ logging.basicConfig(level=logging.INFO)
+else:
+ logging.basicConfig()
+
+logger = logging.getLogger("roundtrip")
+logger.setLevel(level=logging.INFO)
+
+
+def publish_az(amount, topic, key):
+ """Publish to Azure topic"""
+
+ logger.info(f"Publishing messages to topic {topic}")
+
+ for i in range(amount):
+ message = f'{{"{key}": {i} }}'
+
+ cmd = ["/usr/bin/tedge", "mqtt", "pub", topic, message]
+
+ try:
+ ret = subprocess.run(cmd, check=True)
+ except subprocess.CalledProcessError as e:
+ logger.error("Failed to publish %s", e)
+ sys.exit(1)
+ ret.check_returncode()
+
+ logger.info("Published message: %s" % message)
+ time.sleep(0.05)
+
+
+def get_auth_token(sb_name, eh_name, sas_name, sas_value):
+ """Create authentication token
+ Analog to:
+ https://docs.microsoft.com/en-us/rest/api/eventhub/generate-sas-token
+ """
+ newuri = urllib.parse.quote_plus(
+ f"https://{sb_name}.servicebus.windows.net/{eh_name}"
+ )
+ sas_enc = sas_value.encode("utf-8")
+ expiry = str(int(time.time()) + 10000)
+ str_sign = newuri + "\n" + expiry
+ signed_hmac = hmac.HMAC(sas_enc, str_sign.encode("utf-8"), hashlib.sha256)
+ signature = urllib.parse.quote(base64.b64encode(signed_hmac.digest()))
+ ret = {
+ "sb_name": sb_name,
+ "eh_name": eh_name,
+ "token": f"SharedAccessSignature sr={newuri}&sig={signature}&se={expiry}&skn={sas_name}",
+ }
+ return ret
+
+
+def retrieve_queue_az(
+ sas_policy_name, service_bus_name, queue_name, amount, verbose, key
+):
+ """Get the published messages back from a service bus queue
+ Probably soon obsolete.
+ """
+
+ try:
+ sas_policy_primary_key = os.environ["SASKEYQUEUE"]
+ except KeyError:
+ print("Error environment variable SASKEYQUEUE not set")
+ sys.exit(1)
+
+ tokendict = get_auth_token(
+ service_bus_name, queue_name, sas_policy_name, sas_policy_primary_key
+ )
+
+ token = tokendict["token"]
+
+ if verbose:
+ print("Token", token)
+
+ # See also:
+ # https://docs.microsoft.com/en-us/rest/api/servicebus/receive-and-delete-message-destructive-read
+
+ url = (
+ f"https://{service_bus_name}.servicebus.windows.net/{queue_name}/messages/head"
+ )
+
+ print(f"Downloading mesages from {url}")
+ headers = {
+ "Accept": "application/json",
+ "Content-Type": "application/json;charset=utf-8",
+ "Authorization": token,
+ }
+ messages = []
+
+ while True:
+
+ try:
+ req = requests.delete(url, headers=headers)
+ except requests.exceptions.ConnectionError as e:
+ print("Exception: ", e)
+ print("Connection error: We wait for some seconds and then continue ...")
+ time.sleep(10)
+ continue
+
+ if req.status_code == 200:
+ text = req.text
+ props = json.loads(req.headers["BrokerProperties"])
+ number = props["SequenceNumber"]
+ queuetime = props["EnqueuedTimeUtc"]
+
+ try:
+ data = json.loads(text)
+ value = data[key]
+ except json.decoder.JSONDecodeError:
+ print("Json Parsing Error: ", text)
+ value = None
+ except KeyError:
+ print("Parsing Error: ", text)
+ value = None
+
+ print(
+ f'Got message {number} from {queuetime} message is "{text}" value: "{value}"'
+ )
+ messages.append(value)
+
+ elif req.status_code == 204:
+ print("Queue Empty: HTTP status: ", req.status_code)
+ break
+ elif req.status_code == 401:
+ print("Token Expired: HTTP status: ", req.status_code)
+ raise SystemError("Token Expired")
+ else:
+ print(req)
+ print("Error HTTP status: ", req.status_code)
+ raise SystemError("HTTP Error")
+
+ if messages == list(range(amount)):
+ print("Validation PASSED")
+ return True
+ else:
+ print("Validation FAILED")
+ return False
+
+
+class EventHub:
+ """Class to host all properties and access functions for an IoT Hub/ Eventhub
+ Needs https://pypi.org/project/azure-eventhub
+
+ Docs:
+ https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-read-builtin
+ https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/latest/azure.eventhub.html
+ https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/latest/azure.eventhub.html#azure.eventhub.EventData
+ """
+
+ def __init__(self, message_key, amount):
+
+ try:
+ connection_str = os.environ["AZUREENDPOINT"]
+ except KeyError:
+ logger.error("Error environment variable AZUREENDPOINT not set")
+ sys.exit(1)
+
+ try:
+ eventhub_name = os.environ["AZUREEVENTHUB"]
+ except KeyError:
+ logger.error("Error environment variable AZUREEVENTHUB not set")
+ sys.exit(1)
+
+ self.message_key = message_key
+ self.amount = amount
+ consumer_group = "$Default"
+ timeout = 10 # 10s : minimum timeout
+
+ self.client = EventHubConsumerClient.from_connection_string(
+ connection_str,
+ consumer_group,
+ eventhub_name=eventhub_name,
+ idle_timeout=timeout,
+ )
+
+ self.received_messages = []
+
+ def on_error(self, partition_context, event):
+ logger.error(
+ "Received Error from partition {}".format(partition_context.partition_id)
+ )
+ logger.error(f"Event: {event}")
+
+ def on_event(self, partition_context, event):
+ logger.debug(
+ "Received event from partition {}".format(partition_context.partition_id)
+ )
+ logger.debug(f"Event: {event}")
+
+ if event == None:
+ logger.debug("Timeout: Exiting event loop ... ")
+ self.client.close()
+ return
+
+ partition_context.update_checkpoint(event)
+
+ jevent = event.body_as_json()
+
+ message = jevent.get(self.message_key)
+ if message != None:
+ logger.info("Matched key: %s" % message)
+ self.received_messages.append(message)
+ else:
+ logger.info("Not matched key: %s" % jevent)
+
+ def read_from_hub(self, start):
+ """Read data from the event hub
+
+ Possible values for start:
+ start = "-1" : Read all messages
+ start = "@latest" : Read only the latest messages
+ start = datetime.datetime.now(tz=datetime.timezone.utc) : use current sdate
+
+ When no messages are received the client.receive will return.
+ """
+
+ with self.client:
+ self.client.receive(
+ on_event=self.on_event,
+ on_error=self.on_error,
+ starting_position=start,
+ max_wait_time=10,
+ )
+ logger.info("Exiting event loop")
+
+ def validate(self):
+ """Validate the messages that we have received against"""
+
+ if self.received_messages == list(range(self.amount)):
+ print("Validation PASSED")
+ return True
+ else:
+ print("Validation FAILED")
+ return False
+
+
+def main():
+ """Main entry point"""
+ parser = argparse.ArgumentParser()
+ parser.add_argument("method", choices=["eventhub", "servicebus"])
+ parser.add_argument("-b", "--bus", help="Service Bus Name")
+ parser.add_argument("-p", "--policy", help="SAS Policy Name")
+ parser.add_argument("-q", "--queue", help="Queue Name")
+ parser.add_argument(
+ "-a", "--amount", help="Amount of messages to send", type=int, default=20
+ )
+ parser.add_argument("-v", "--verbose", help="Verbosity", action="count", default=0)
+ args = parser.parse_args()
+
+ amount = args.amount
+ sas_policy_name = args.policy
+ service_bus_name = args.bus
+ queue_name = args.queue
+ verbose = args.verbose
+ method = args.method
+
+ if method == "servicebus":
+ try:
+ os.environ["SASKEYQUEUE"]
+ except KeyError:
+ print("Error environment variable SASKEYQUEUE not set")
+ sys.exit(1)
+
+ try:
+ device = os.environ["C8YDEVICE"]
+ except KeyError:
+ print("Error environment variable C8YDEVICE not set")
+ sys.exit(1)
+
+ # Send roundtrip via the tedge mapper
+ mqtt_topic = "tedge/measurements"
+ # In case that we want to avoid the azure mapper
+ # mqtt_topic = "az/messages/events/"
+
+ message_key = "thin-edge-azure-roundtrip-" + device
+
+ if method == "eventhub":
+
+ eh = EventHub(message_key=message_key, amount=amount)
+
+ start = datetime.datetime.now(tz=datetime.timezone.utc)
+
+ publish_az(amount, mqtt_topic, message_key)
+
+ eh.read_from_hub(start)
+ if not eh.validate():
+ sys.exit(1)
+
+ elif method == "servicebus":
+
+ publish_az(amount, mqtt_topic, message_key)
+
+ result = retrieve_queue_az(
+ sas_policy_name, service_bus_name, queue_name, amount, verbose, message_key
+ )
+
+ if not result:
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ main()