summaryrefslogtreecommitdiffstats
path: root/glances/plugins/containers/stats_streamer.py
blob: 9218c4836579e70f64e4fcff73fcb77781048eaf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# -*- coding: utf-8 -*-
#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <nicolas@nicolargo.com>
#
# SPDX-License-Identifier: LGPL-3.0-only

import threading
import time

from glances.logger import logger


class StatsStreamer:
    """
    Utility class to stream an iterable using a background / daemon Thread

    Use `StatsStreamer.stats` to access the latest streamed results
    """

    def __init__(self, iterable, initial_stream_value=None):
        """
        iterable: an Iterable instance that needs to be streamed
        """
        self._iterable = iterable
        # Iterable results are stored here
        self._raw_result = initial_stream_value
        # Use a Thread to stream iterable (daemon=True to automatically kill thread when main process dies)
        self._thread = threading.Thread(target=self._stream_results, daemon=True)
        # Event needed to stop the thread manually
        self._stopper = threading.Event()
        # Lock to avoid the daemon thread updating stats when main thread reads the stats
        self.result_lock = threading.Lock()
        # Last result streamed time (initial val 0)
        self._last_update_time = 0

        self._thread.start()

    def stop(self):
        """Stop the thread."""
        self._stopper.set()

    def stopped(self):
        """Return True is the thread is stopped."""
        return self._stopper.is_set()

    def _stream_results(self):
        """Grab the stats.

        Infinite loop, should be stopped by calling the stop() method
        """
        try:
            for res in self._iterable:
                self._pre_update_hook()
                self._raw_result = res
                self._post_update_hook()

                time.sleep(0.1)
                if self.stopped():
                    break

        except Exception as e:
            logger.debug("docker plugin - Exception thrown during run ({})".format(e))
            self.stop()

    def _pre_update_hook(self):
        """Hook that runs before worker thread updates the raw_stats"""
        self.result_lock.acquire()

    def _post_update_hook(self):
        """Hook that runs after worker thread updates the raw_stats"""
        self._last_update_time = time.time()
        self.result_lock.release()

    @property
    def stats(self):
        """Raw Stats getter."""
        return self._raw_result

    @property
    def last_update_time(self):
        """Raw Stats getter."""
        return self._last_update_time