summaryrefslogtreecommitdiffstats
path: root/glances/plugins/ip/__init__.py
blob: da644f2c6acbd3274d5a97606d7af502cf061218 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# -*- coding: utf-8 -*-
#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <nicolas@nicolargo.com>
#
# SPDX-License-Identifier: LGPL-3.0-only
#

"""IP plugin."""

import threading
from ujson import loads

from glances.globals import urlopen, queue, urlopen_auth
from glances.logger import logger
from glances.timer import Timer
from glances.timer import getTimeSinceLastUpdate
from glances.plugins.plugin.model import GlancesPluginModel

# Import plugin specific dependency
try:
    import netifaces
except ImportError as e:
    import_error_tag = True
    logger.warning("Missing Python Lib ({}), IP plugin is disabled".format(e))
else:
    import_error_tag = False

# List of online services to retrieve public IP address
# List of tuple (url, json, key)
# - url: URL of the Web site
# - json: service return a JSON (True) or string (False)
# - key: key of the IP address in the JSON structure
urls = [
    ('https://httpbin.org/ip', True, 'origin'),
    ('https://api.ipify.org/?format=json', True, 'ip'),
    ('https://ipv4.jsonip.com', True, 'ip'),
]

# Fields description
# description: human readable description
# short_name: shortname to use un UI
# unit: unit type
# rate: is it a rate ? If yes, // by time_since_update when displayed,
# min_symbol: Auto unit should be used if value > than 1 'X' (K, M, G)...
fields_description = {
    'address': {
        'description': 'Private IP address',
    },
    'mask': {
        'description': 'Private IP mask',
    },
    'mask_cidr': {
        'description': 'Private IP mask in CIDR format',
        'unit': 'number',
    },
    'gateway': {
        'description': 'Private IP gateway',
    },
    'public_address': {
        'description': 'Public IP address',
    },
    'public_info_human': {
        'description': 'Public IP information',
    },
}


class PluginModel(GlancesPluginModel):
    """Glances IP Plugin.

    stats is a dict
    """

    _default_public_refresh_interval = 300
    _default_public_ip_disabled = ["False"]

    def __init__(self, args=None, config=None):
        """Init the plugin."""
        super(PluginModel, self).__init__(
            args=args, config=config,
            fields_description=fields_description
        )

        # We want to display the stat in the curse interface
        self.display_curse = True

        # For public IP address
        self.public_address = ""
        self.public_address_refresh_interval = self.get_conf_value(
            "public_refresh_interval", default=self._default_public_refresh_interval
        )

        public_ip_disabled = self.get_conf_value("public_ip_disabled", default=self._default_public_ip_disabled)
        self.public_ip_disabled = True if public_ip_disabled == ["True"] else False

        # For the Censys options (see issue #2105)
        self.public_info = ""
        self.censys_url = self.get_conf_value("censys_url", default=[None])[0]
        self.censys_username = self.get_conf_value("censys_username", default=[None])[0]
        self.censys_password = self.get_conf_value("censys_password", default=[None])[0]
        self.censys_fields = self.get_conf_value("censys_fields", default=[None])
        self.public_info_disabled = (
            self.censys_url is None
            or self.censys_username is None
            or self.censys_password is None
            or self.censys_fields is None
        )

    @GlancesPluginModel._check_decorator
    @GlancesPluginModel._log_result_decorator
    def update(self):
        """Update IP stats using the input method.

        :return: the stats dict
        """
        # Init new stats
        stats = self.get_init_value()

        if self.input_method == 'local' and not import_error_tag:
            # Update stats using the netifaces lib
            # Start with the default IP gateway
            try:
                default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
            except (KeyError, AttributeError) as e:
                logger.debug("Cannot grab default gateway IP address ({})".format(e))
                return {}
            else:
                stats['gateway'] = default_gw[0]

            # Then the private IP address
            # If multiple IP addresses are available, only the one with the default gateway is returned
            try:
                address = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['addr']
                mask = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['netmask']
            except (KeyError, AttributeError) as e:
                logger.debug("Cannot grab private IP address ({})".format(e))
                return {}
            else:
                stats['address'] = address
                stats['mask'] = mask
                stats['mask_cidr'] = self.ip_to_cidr(stats['mask'])

            # Finally with the public IP address
            time_since_update = getTimeSinceLastUpdate('public-ip')
            try:
                if not self.public_ip_disabled and (
                    self.stats.get('address') != address or time_since_update > self.public_address_refresh_interval
                ):
                    self.public_address = PublicIpAddress().get()
                    if not self.public_info_disabled:
                        self.public_info = PublicIpInfo(
                            self.public_address, self.censys_url, self.censys_username, self.censys_password
                        ).get()
            except (KeyError, AttributeError) as e:
                logger.debug("Cannot grab public IP information ({})".format(e))
            else:
                stats['public_address'] = self.public_address
                # Too much information provided in the public_info
                # Limit it to public_info_for_human
                # stats['public_info'] = self.public_info
                stats['public_info_human'] = self.public_info_for_human(self.public_info)

        elif self.input_method == 'snmp':
            # Not implemented yet
            pass

        # Update the stats
        self.stats = stats

        return self.stats

    def msg_curse(self, args=None, max_width=None):
        """Return the dict to display in the curse interface."""
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        if not self.stats or self.is_disabled() or import_error_tag:
            return ret

        # Build the string message
        msg = ' - '
        ret.append(self.curse_add_line(msg, optional=True))

        # Start with the private IP information
        msg = 'IP '
        ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
        if 'address' in self.stats:
            msg = '{}'.format(self.stats['address'])
            ret.append(self.curse_add_line(msg, optional=True))
        if 'mask_cidr' in self.stats:
            # VPN with no internet access (issue #842)
            msg = '/{}'.format(self.stats['mask_cidr'])
            ret.append(self.curse_add_line(msg, optional=True))

        # Then with the public IP information
        try:
            msg_pub = '{}'.format(self.stats['public_address'])
        except (UnicodeEncodeError, KeyError):
            # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469)
            pass
        else:
            if self.stats['public_address']:
                msg = ' Pub '
                ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
                ret.append(self.curse_add_line(msg_pub, optional=True))

            if self.stats['public_info_human']:
                ret.append(self.curse_add_line(' {}'.format(self.stats['public_info_human']), optional=True))

        return ret

    def public_info_for_human(self, public_info):
        """Return the data to pack to the client."""
        if not public_info:
            return ''

        field_result = []
        for f in self.censys_fields:
            field = f.split(':')
            if len(field) == 1 and field[0] in public_info:
                field_result.append('{}'.format(public_info[field[0]]))
            elif len(field) == 2 and field[0] in public_info and field[1] in public_info[field[0]]:
                field_result.append('{}'.format(public_info[field[0]][field[1]]))
        return '/'.join(field_result)

    @staticmethod
    def ip_to_cidr(ip):
        """Convert IP address to CIDR.

        Example: '255.255.255.0' will return 24
        """
        # Thanks to @Atticfire
        # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399
        if ip is None:
            # Correct issue #1528
            return 0
        return sum(bin(int(x)).count('1') for x in ip.split('.'))


class PublicIpAddress(object):
    """Get public IP address from online services."""

    def __init__(self, timeout=2):
        """Init the class."""
        self.timeout = timeout

    def get(self):
        """Get the first public IP address returned by one of the online services."""
        q = queue.Queue()

        for u, j, k in urls:
            t = threading.Thread(target=self._get_ip_public, args=(q, u, j, k))
            t.daemon = True
            t.start()

        timer = Timer(self.timeout)
        ip = None
        while not timer.finished() and ip is None:
            if q.qsize() > 0:
                ip = q.get()

        if ip is None:
            return None

        return ', '.join(set([x.strip() for x in ip.split(',')]))

    def _get_ip_public(self, queue_target, url, json=False, key=None):
        """Request the url service and put the result in the queue_target."""
        try:
            response = urlopen(url, timeout=self.timeout).read().decode('utf-8')
        except Exception as e:
            logger.debug("IP plugin - Cannot open URL {} ({})".format(url, e))
            queue_target.put(None)
        else:
            # Request depend on service
            try:
                if not json:
                    queue_target.put(response)
                else:
                    queue_target.put(loads(response)[key])
            except ValueError:
                queue_target.put(