summaryrefslogtreecommitdiffstats
path: root/glances/plugins/ip/__init__.py
blob: 9c079ef9d1dcc4a3080fefcbc1a3c40e1af8a188 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# -*- coding: utf-8 -*-
#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <nicolas@nicolargo.com>
#
# SPDX-License-Identifier: LGPL-3.0-only
#

"""IP plugin."""

import threading
from ujson import loads

from glances.globals import queue, urlopen_auth
from glances.logger import logger
from glances.timer import Timer
from glances.timer import getTimeSinceLastUpdate
from glances.plugins.plugin.model import GlancesPluginModel

# Import plugin specific dependency
try:
    import netifaces
except ImportError as e:
    import_error_tag = True
    logger.warning("Missing Python Lib ({}), IP plugin is disabled".format(e))
else:
    import_error_tag = False

# Fields description
# description: human readable description
# short_name: shortname to use un UI
# unit: unit type
# rate: is it a rate ? If yes, // by time_since_update when displayed,
# min_symbol: Auto unit should be used if value > than 1 'X' (K, M, G)...
fields_description = {
    'address': {
        'description': 'Private IP address',
    },
    'mask': {
        'description': 'Private IP mask',
    },
    'mask_cidr': {
        'description': 'Private IP mask in CIDR format',
        'unit': 'number',
    },
    'gateway': {
        'description': 'Private IP gateway',
    },
    'public_address': {
        'description': 'Public IP address',
    },
    'public_info_human': {
        'description': 'Public IP information',
    },
}


class PluginModel(GlancesPluginModel):
    """Glances IP Plugin.

    stats is a dict
    """

    _default_public_refresh_interval = 300

    def __init__(self, args=None, config=None):
        """Init the plugin."""
        super(PluginModel, self).__init__(
            args=args, config=config,
            fields_description=fields_description
        )

        # We want to display the stat in the curse interface
        self.display_curse = True

        # Public information (see issue #2732)
        self.public_address = ""
        self.public_info = ""
        self.public_api = self.get_conf_value("public_api", default=[None])[0]
        self.public_username = self.get_conf_value("public_username", default=[None])[0]
        self.public_password = self.get_conf_value("public_password", default=[None])[0]
        self.public_field = self.get_conf_value("public_field", default=[None])
        self.public_template = self.get_conf_value("public_template", default=[None])[0]
        self.public_disabled = (
            self.get_conf_value('public_disabled', default='False')[0].lower() != 'false' or
            self.public_api is None or self.public_field is None
        )
        self.public_address_refresh_interval = self.get_conf_value(
            "public_refresh_interval", default=self._default_public_refresh_interval
        )

    @GlancesPluginModel._check_decorator
    @GlancesPluginModel._log_result_decorator
    def update(self):
        """Update IP stats using the input method.

        :return: the stats dict
        """
        # Init new stats
        stats = self.get_init_value()

        if self.input_method == 'local' and not import_error_tag:
            # Private IP address
            # Get the default gateway thanks to the netifaces lib
            try:
                default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
            except (KeyError, AttributeError) as e:
                logger.debug("Cannot grab default gateway IP address ({})".format(e))
                return self.get_init_value()
            else:
                stats['gateway'] = default_gw[0]
            # If multiple IP addresses are available, only the one with the default gateway is returned
            try:
                address = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['addr']
                mask = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['netmask']
            except (KeyError, AttributeError) as e:
                logger.debug("Cannot grab private IP address ({})".format(e))
                return self.get_init_value()
            else:
                stats['address'] = address
                stats['mask'] = mask
                stats['mask_cidr'] = self.ip_to_cidr(stats['mask'])

            # Public IP address
            time_since_update = getTimeSinceLastUpdate('public-ip')
            try:
                if not self.public_disabled and (
                    self.public_address == "" or time_since_update > self.public_address_refresh_interval
                ):
                    self.public_info = PublicIpInfo(
                        self.public_api, self.public_username, self.public_password
                    ).get()
                    self.public_address = self.public_info['ip']
            except (KeyError, AttributeError) as e:
                logger.debug("Cannot grab public IP information ({})".format(e))
            else:
                stats['public_address'] = (
                    self.public_address if not self.args.hide_public_info else self.__hide_ip(self.public_address)
                )
                stats['public_info_human'] = (
                    self.public_info_for_human(self.public_info)
                )

        elif self.input_method == 'snmp':
            # Not implemented yet
            pass

        # Update the stats
        self.stats = stats

        return self.stats

    def __hide_ip(self, ip):
        """Hide last to digit of the given IP address"""
        return '.'.join(ip.split('.')[0:2]) + '.*.*'

    def msg_curse(self, args=None, max_width=None):
        """Return the dict to display in the curse interface."""
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        if not self.stats or self.is_disabled() or import_error_tag:
            return ret

        # Build the string message
        msg = ' - '
        ret.append(self.curse_add_line(msg, optional=True))

        # Start with the private IP information
        msg = 'IP '
        ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
        if 'address' in self.stats:
            msg = '{}'.format(self.stats['address'])
            ret.append(self.curse_add_line(msg, optional=True))
        if 'mask_cidr' in self.stats:
            # VPN with no internet access (issue #842)
            msg = '/{}'.format(self.stats['mask_cidr'])
            ret.append(self.curse_add_line(msg, optional=True))

        # Then with the public IP information
        try:
            msg_pub = '{}'.format(self.stats['public_address'])
        except (UnicodeEncodeError, KeyError):
            # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469)
            pass
        else:
            if self.stats['public_address']:
                msg = ' Pub '
                ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
                ret.append(self.curse_add_line(msg_pub, optional=True))

            if self.stats['public_info_human']:
                ret.append(self.curse_add_line(' {}'.format(self.stats['public_info_human']), optional=True))

        return ret

    def public_info_for_human(self, public_info):
        """Return the data to pack to the client."""
        if not public_info:
            return ''

        return self.public_template.format(**public_info)

    @staticmethod
    def ip_to_cidr(ip):
        """Convert IP address to CIDR.

        Example: '255.255.255.0' will return 24
        """
        # Thanks to @Atticfire
        # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399
        if ip is None:
            # Correct issue #1528
            return 0
        return sum(bin(int(x)).count('1') for x in ip.split('.'))


class PublicIpInfo(object):
    """Get public IP information from online service."""

    def __init__(self, url, username, password, timeout=2):
        """Init the class."""
        self.url = url
        self.username = username
        self.password = password
        self.timeout = timeout

    def get(self):
        """Return the public IP information returned by one of the online service."""
        q = queue.Queue()

        t = threading.Thread(target=self._get_ip_public_info, args=(q, self.url, self.username, self.password))
        t.daemon = True
        t.start()

        timer = Timer(self.timeout)
        info = None
        while not timer.finished() and info is None:
            if q.qsize() > 0:
                info = q.get()

        return info

    def _get_ip_public_info(self, queue_target, url, username, password):
        """Request the url service and put the result in the queue_target."""
        try:
            response = urlopen_auth(url, username, password).read()
        except Exception as e:
            logger.debug("IP plugin - Cannot get public IP information from {} ({})".format(url, e))
            queue_target.put(None)
        else:
            try:
                queue_target.put(loads(response))
            except (ValueError, KeyError) as e:
                logger.debug("IP plugin - Cannot load public IP information from {} ({})".format(url, e))
                queue_target.put(None)