summaryrefslogtreecommitdiffstats
path: root/ci/roundtrip_local_to_az.py
blob: 6986daf28cc87c7d02917163bd16623e6894f337 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#!/usr/bin/env python3

"""Perform a full roundtrip of messages from thin-edge to Azure IoT.

We publish with thin-edge to Azure IoT; then route the messages to a
Service Bus Queue; from there we retrieve the messages via a REST
Interface and compare them with what we have sent in the beginning.

Alternatively, we can use the Azure SDK to access the IoT Hub directly.

When this script is called you need to be already connected to Azure.

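Call example (Service Bus queue):
$ ./roundtrip_local_to_az.py servicebus -a 10 -p sas_policy -b thinedgebus -q testqueue
    Set Env:
    - SASKEYQUEUE : Shared Access Key to the service bus queue
    - C8YDEVICE : Device name; it becomes part of the message key

Alternatively (IoT Hub / Event Hub):
$ ./ci/roundtrip_local_to_az.py eventhub
    Set Env:
    - AZUREENDPOINT : Endpoint description string copied from the Azure UI
    - AZUREEVENTHUB : Name of the IoT Hub
    - C8YDEVICE : Device name; it becomes part of the message key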
"""

import argparse
import base64
import json
import json.decoder
import hashlib
import hmac
import os
import sys
import subprocess
import time
import urllib

import requests

import logging
from azure.eventhub import EventHubConsumerClient
import datetime

# Flip to True to configure the root logger at INFO level; otherwise the
# root logger keeps the default level chosen by logging.basicConfig().
debug = False
logging.basicConfig(**({"level": logging.INFO} if debug else {}))

# The script's own logger always reports at INFO, independent of `debug`.
logger = logging.getLogger("roundtrip")
logger.setLevel(logging.INFO)


def publish_az(amount, topic, key):
    """Publish numbered JSON messages to a local MQTT topic via tedge.

    For every i in range(amount) the JSON object {key: i} is published with
    `/usr/bin/tedge mqtt pub <topic> <message>`.

    Args:
        amount: Number of messages to publish; values run from 0 to amount-1.
        topic: MQTT topic that is handed to `tedge mqtt pub`.
        key: Name of the single JSON field that carries the message number.

    The process exits with status 1 as soon as one tedge call returns a
    non-zero exit code.
    """

    logger.info("Publishing messages to topic %s", topic)

    for i in range(amount):
        # json.dumps escapes the key, so quotes or backslashes in it cannot
        # produce an invalid JSON payload.
        message = json.dumps({key: i})

        cmd = ["/usr/bin/tedge", "mqtt", "pub", topic, message]

        try:
            # check=True already raises on a non-zero exit code, no separate
            # check of the return code is needed afterwards.
            subprocess.run(cmd, check=True)
        except subprocess.CalledProcessError as e:
            logger.error("Failed to publish %s", e)
            sys.exit(1)

        logger.info("Published message: %s", message)
        # Short pause between two messages.
        time.sleep(0.05)


def get_auth_token(sb_name, eh_name, sas_name, sas_value, ttl_seconds=10000):
    """Create a Shared Access Signature token for a service bus entity.

    Analog to:
        https://docs.microsoft.com/en-us/rest/api/eventhub/generate-sas-token

    Args:
        sb_name: Name of the service bus namespace (host name prefix).
        eh_name: Name of the entity (queue / event hub) inside the namespace.
        sas_name: Name of the shared access policy (becomes `skn`).
        sas_value: Key of the shared access policy, used as HMAC secret.
        ttl_seconds: Validity of the token in seconds, counted from now.

    Returns:
        A dict with the keys "sb_name", "eh_name" and "token"; "token" is the
        complete value for the HTTP Authorization header.
    """
    # `import urllib` alone does not guarantee that the `parse` submodule is
    # loaded, so import it explicitly where it is used.
    import urllib.parse

    resource_uri = urllib.parse.quote_plus(
        f"https://{sb_name}.servicebus.windows.net/{eh_name}"
    )
    expiry = str(int(time.time()) + int(ttl_seconds))

    # The signature covers the URL-encoded resource URI and the expiry
    # timestamp, separated by a newline.
    string_to_sign = resource_uri + "\n" + expiry
    signed_hmac = hmac.new(
        sas_value.encode("utf-8"), string_to_sign.encode("utf-8"), hashlib.sha256
    )
    signature = urllib.parse.quote(base64.b64encode(signed_hmac.digest()))

    return {
        "sb_name": sb_name,
        "eh_name": eh_name,
        "token": f"SharedAccessSignature sr={resource_uri}&sig={signature}&se={expiry}&skn={sas_name}",
    }


def retrieve_queue_az(
    sas_policy_name, service_bus_name, queue_name, amount, verbose, key
):
    """Drain a service bus queue and validate the values found in it.

    Messages are fetched one by one with a destructive read (HTTP DELETE on
    the queue head) until the queue reports that it is empty. From every
    message the field `key` is extracted; messages that are not JSON or do
    not contain `key` are recorded as None.

    Returns True when the collected values are exactly 0 .. amount-1 in that
    order, False otherwise. Exits with status 1 when SASKEYQUEUE is not set
    and raises SystemError on an unexpected HTTP status.

    Probably soon obsolete.
    """

    primary_key = os.environ.get("SASKEYQUEUE")
    if primary_key is None:
        print("Error environment variable SASKEYQUEUE not set")
        sys.exit(1)

    auth_header = get_auth_token(
        service_bus_name, queue_name, sas_policy_name, primary_key
    )["token"]

    if verbose:
        print("Token", auth_header)

    # See also:
    # https://docs.microsoft.com/en-us/rest/api/servicebus/receive-and-delete-message-destructive-read

    endpoint = (
        f"https://{service_bus_name}.servicebus.windows.net/{queue_name}/messages/head"
    )
    print(f"Downloading messages from {endpoint}")

    request_headers = {
        "Accept": "application/json",
        "Content-Type": "application/json;charset=utf-8",
        "Authorization": auth_header,
    }
    received = []

    while True:
        try:
            response = requests.delete(endpoint, headers=request_headers)
        except requests.exceptions.ConnectionError as err:
            # Connection problems are retried after a pause.
            print("Exception: ", err)
            print("Connection error: We wait for some seconds and then continue ...")
            time.sleep(10)
            continue

        status = response.status_code

        if status == 204:
            # Nothing left to read, stop polling.
            print("Queue Empty:  HTTP status: ", status)
            break

        if status == 401:
            print("Token Expired:  HTTP status: ", status)
            raise SystemError("Token Expired")

        if status != 200:
            print(response)
            print("Error HTTP status: ", status)
            raise SystemError("HTTP Error")

        # status == 200: one message was delivered
        body = response.text
        broker_props = json.loads(response.headers["BrokerProperties"])
        sequence_number = broker_props["SequenceNumber"]
        enqueued_at = broker_props["EnqueuedTimeUtc"]

        # Stays None when the body cannot be parsed or lacks the key.
        value = None
        try:
            value = json.loads(body)[key]
        except json.decoder.JSONDecodeError:
            print("Json Parsing Error: ", body)
        except KeyError:
            print("Parsing Error: ", body)

        print(
            f'Got message {sequence_number} from {enqueued_at} message is "{body}" value: "{value}"'
        )
        received.append(value)

    passed = received == list(range(amount))
    print("Validation PASSED" if passed else "Validation FAILED")
    return passed


class EventHub:
    """Class to host all properties and access functions for an IoT Hub/ Eventhub
    Needs https://pypi.org/project/azure-eventhub

    Docs:
        https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-read-builtin
        https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/latest/azure.eventhub.html
        https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/latest/azure.eventhub.html#azure.eventhub.EventData
    """

    def __init__(self, message_key, amount):
        """Create the consumer client from the environment.

        Args:
            message_key: JSON field name that identifies our own messages.
            amount: Number of messages that are expected to arrive.

        Reads the connection string from AZUREENDPOINT and the hub name from
        AZUREEVENTHUB; exits with status 1 when one of them is not set.
        """

        try:
            connection_str = os.environ["AZUREENDPOINT"]
        except KeyError:
            logger.error("Error environment variable AZUREENDPOINT not set")
            sys.exit(1)

        try:
            eventhub_name = os.environ["AZUREEVENTHUB"]
        except KeyError:
            logger.error("Error environment variable AZUREEVENTHUB not set")
            sys.exit(1)

        self.message_key = message_key
        self.amount = amount
        consumer_group = "$Default"
        timeout = 10  # 10s : minimum timeout

        self.client = EventHubConsumerClient.from_connection_string(
            connection_str,
            consumer_group,
            eventhub_name=eventhub_name,
            idle_timeout=timeout,
        )

        # Values of `message_key`, in the order in which they were received
        self.received_messages = []

    def on_error(self, partition_context, event):
        """Error callback for client.receive: log the error.

        The second argument is the error object handed over by the SDK.
        """
        # The SDK may pass None as partition context, e.g. for errors that
        # are raised during load balancing.
        if partition_context is None:
            logger.error("Received Error without partition context")
        else:
            logger.error(
                "Received Error from partition %s", partition_context.partition_id
            )
        logger.error("Event: %s", event)

    def on_event(self, partition_context, event):
        """Event callback for client.receive: collect our own messages.

        An event of None signals that no event arrived within the wait time;
        the client is closed then. Otherwise the value stored under
        `message_key` in the JSON body is appended to `received_messages`;
        events without that key are only logged.
        """
        logger.debug(
            "Received event from partition %s", partition_context.partition_id
        )
        logger.debug("Event: %s", event)

        if event is None:
            logger.debug("Timeout: Exiting event loop ... ")
            self.client.close()
            return

        partition_context.update_checkpoint(event)

        jevent = event.body_as_json()

        message = jevent.get(self.message_key)
        if message is not None:
            logger.info("Matched key: %s", message)
            self.received_messages.append(message)
        else:
            logger.info("Not matched key: %s", jevent)

    def read_from_hub(self, start):
        """Read data from the event hub

        Possible values for start:
        start = "-1" : Read all messages
        start = "@latest" : Read only the latest messages
        start = datetime.datetime.now(tz=datetime.timezone.utc) : use current date

        When no messages are received the client.receive will return.
        """

        with self.client:
            self.client.receive(
                on_event=self.on_event,
                on_error=self.on_error,
                starting_position=start,
                max_wait_time=10,
            )
            logger.info("Exiting event loop")

    def validate(self):
        """Validate the received messages against the expected sequence.

        Returns True when `received_messages` is exactly 0 .. amount-1 in
        that order, False otherwise. The verdict is also printed.
        """

        if self.received_messages == list(range(self.amount)):
            print("Validation PASSED")
            return True
        else:
            print("Validation FAILED")
            return False


def main():
    """Main entry point"""
    parser = argparse.ArgumentParser()
    parser.add_argument("method", choices=["eventhub", "servicebus"])
    parser.add_argument("-b", "--bus", help="Service Bus Name")
    parser.add_argument("-p", "--policy", help="SAS Policy Name")
    parser.add_argument("-q", "--queue", help="Queue Name")
    parser.add_argument(
        "-a", "--amount", help="Amount of messages to send", type=int, default=20
    )
    parser.add_argument("-v", "--verbose", help="Verbosity", action="count", default=0)
    args = parser.parse_args()

    amount = args.amount
    sas_policy_name = args.policy
    service_bus_name = args.bus
    queue_name = args.queue
    verbose = args.verbose
    method = args.method

    if method == "servicebus":
        try:
            os.environ["SASKEYQUEUE"]
        except KeyError:
            print("Error environment variable SASKEYQUEUE not set")
            sys.exit(1)

    try:
        device = os.environ["C8YDEVICE"]
    except KeyError:
        print("Error environment variable C8YDEVICE not set")
        sys.exit(1)

    # Send roundtrip via the tedge mapper
    mqtt_topic = "tedge/measurements"
    # In case that we want to avoid the azure mapper
    # mqtt_topic = "az/messages/events/"

    message_key = "thin-edge-azure-roundtrip-" + device

    if method == "eventhub":

        eh = EventHub(message_key=message_key, amount=amount)

        start = datetime.datetime.now(tz=datetime.timezone.utc)

        publish_az(amount, mqtt_topic, message_key)

        eh.read_from_hub(start)
        if not eh