diff options
Diffstat (limited to 'glances/plugins/ip/model.py')
-rw-r--r-- | glances/plugins/ip/model.py | 300 |
1 files changed, 0 insertions, 300 deletions
diff --git a/glances/plugins/ip/model.py b/glances/plugins/ip/model.py deleted file mode 100644 index 02d884eb..00000000 --- a/glances/plugins/ip/model.py +++ /dev/null @@ -1,300 +0,0 @@ -# -*- coding: utf-8 -*- -# -# This file is part of Glances. -# -# SPDX-FileCopyrightText: 2022 Nicolas Hennion <nicolas@nicolargo.com> -# -# SPDX-License-Identifier: LGPL-3.0-only -# - -"""IP plugin.""" - -import threading -from ujson import loads - -from glances.globals import urlopen, queue, urlopen_auth -from glances.logger import logger -from glances.timer import Timer -from glances.timer import getTimeSinceLastUpdate -from glances.plugins.plugin.model import GlancesPluginModel - -# Import plugin specific dependency -try: - import netifaces -except ImportError as e: - import_error_tag = True - logger.warning("Missing Python Lib ({}), IP plugin is disabled".format(e)) -else: - import_error_tag = False - -# List of online services to retrieve public IP address -# List of tuple (url, json, key) -# - url: URL of the Web site -# - json: service return a JSON (True) or string (False) -# - key: key of the IP address in the JSON structure -urls = [ - ('https://httpbin.org/ip', True, 'origin'), - ('https://api.ipify.org/?format=json', True, 'ip'), - ('https://ipv4.jsonip.com', True, 'ip'), -] - - -class PluginModel(GlancesPluginModel): - """Glances IP Plugin. - - stats is a dict - """ - - _default_public_refresh_interval = 300 - _default_public_ip_disabled = ["False"] - - def __init__(self, args=None, config=None): - """Init the plugin.""" - super(PluginModel, self).__init__(args=args, config=config) - - # We want to display the stat in the curse interface - self.display_curse = True - - # For public IP address - self.public_address = "" - self.public_address_refresh_interval = self.get_conf_value( - "public_refresh_interval", default=self._default_public_refresh_interval - ) - - public_ip_disabled = self.get_conf_value("public_ip_disabled", default=self._default_public_ip_disabled) - self.public_ip_disabled = True if public_ip_disabled == ["True"] else False - - # For the Censys options (see issue #2105) - self.public_info = "" - self.censys_url = self.get_conf_value("censys_url", default=[None])[0] - self.censys_username = self.get_conf_value("censys_username", default=[None])[0] - self.censys_password = self.get_conf_value("censys_password", default=[None])[0] - self.censys_fields = self.get_conf_value("censys_fields", default=[None]) - self.public_info_disabled = ( - self.censys_url is None - or self.censys_username is None - or self.censys_password is None - or self.censys_fields is None - ) - - @GlancesPluginModel._check_decorator - @GlancesPluginModel._log_result_decorator - def update(self): - """Update IP stats using the input method. - - :return: the stats dict - """ - # Init new stats - stats = self.get_init_value() - - if self.input_method == 'local' and not import_error_tag: - # Update stats using the netifaces lib - # Start with the default IP gateway - try: - default_gw = netifaces.gateways()['default'][netifaces.AF_INET] - except (KeyError, AttributeError) as e: - logger.debug("Cannot grab default gateway IP address ({})".format(e)) - return {} - else: - stats['gateway'] = default_gw[0] - - # Then the private IP address - # If multiple IP addresses are available, only the one with the default gateway is returned - try: - address = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['addr'] - mask = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['netmask'] - except (KeyError, AttributeError) as e: - logger.debug("Cannot grab private IP address ({})".format(e)) - return {} - else: - stats['address'] = address - stats['mask'] = mask - stats['mask_cidr'] = self.ip_to_cidr(stats['mask']) - - # Finally with the public IP address - time_since_update = getTimeSinceLastUpdate('public-ip') - try: - if not self.public_ip_disabled and ( - self.stats.get('address') != address or time_since_update > self.public_address_refresh_interval - ): - self.public_address = PublicIpAddress().get() - if not self.public_info_disabled: - self.public_info = PublicIpInfo( - self.public_address, self.censys_url, self.censys_username, self.censys_password - ).get() - except (KeyError, AttributeError) as e: - logger.debug("Cannot grab public IP information ({})".format(e)) - else: - stats['public_address'] = self.public_address - # Too much information provided in the public_info - # Limit it to public_info_for_human - # stats['public_info'] = self.public_info - stats['public_info_human'] = self.public_info_for_human(self.public_info) - - elif self.input_method == 'snmp': - # Not implemented yet - pass - - # Update the stats - self.stats = stats - - return self.stats - - def msg_curse(self, args=None, max_width=None): - """Return the dict to display in the curse interface.""" - # Init the return message - ret = [] - - # Only process if stats exist and display plugin enable... - if not self.stats or self.is_disabled() or import_error_tag: - return ret - - # Build the string message - msg = ' - ' - ret.append(self.curse_add_line(msg, optional=True)) - - # Start with the private IP information - msg = 'IP ' - ret.append(self.curse_add_line(msg, 'TITLE', optional=True)) - if 'address' in self.stats: - msg = '{}'.format(self.stats['address']) - ret.append(self.curse_add_line(msg, optional=True)) - if 'mask_cidr' in self.stats: - # VPN with no internet access (issue #842) - msg = '/{}'.format(self.stats['mask_cidr']) - ret.append(self.curse_add_line(msg, optional=True)) - - # Then with the public IP information - try: - msg_pub = '{}'.format(self.stats['public_address']) - except (UnicodeEncodeError, KeyError): - # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469) - pass - else: - if self.stats['public_address']: - msg = ' Pub ' - ret.append(self.curse_add_line(msg, 'TITLE', optional=True)) - ret.append(self.curse_add_line(msg_pub, optional=True)) - - if self.stats['public_info_human']: - ret.append(self.curse_add_line(' {}'.format(self.stats['public_info_human']), optional=True)) - - return ret - - def public_info_for_human(self, public_info): - """Return the data to pack to the client.""" - if not public_info: - return '' - - field_result = [] - for f in self.censys_fields: - field = f.split(':') - if len(field) == 1 and field[0] in public_info: - field_result.append('{}'.format(public_info[field[0]])) - elif len(field) == 2 and field[0] in public_info and field[1] in public_info[field[0]]: - field_result.append('{}'.format(public_info[field[0]][field[1]])) - return '/'.join(field_result) - - @staticmethod - def ip_to_cidr(ip): - """Convert IP address to CIDR. - - Example: '255.255.255.0' will return 24 - """ - # Thanks to @Atticfire - # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399 - if ip is None: - # Correct issue #1528 - return 0 - return sum(bin(int(x)).count('1') for x in ip.split('.')) - - -class PublicIpAddress(object): - """Get public IP address from online services.""" - - def __init__(self, timeout=2): - """Init the class.""" - self.timeout = timeout - - def get(self): - """Get the first public IP address returned by one of the online services.""" - q = queue.Queue() - - for u, j, k in urls: - t = threading.Thread(target=self._get_ip_public, args=(q, u, j, k)) - t.daemon = True - t.start() - - timer = Timer(self.timeout) - ip = None - while not timer.finished() and ip is None: - if q.qsize() > 0: - ip = q.get() - - if ip is None: - return None - - return ', '.join(set([x.strip() for x in ip.split(',')])) - - def _get_ip_public(self, queue_target, url, json=False, key=None): - """Request the url service and put the result in the queue_target.""" - try: - response = urlopen(url, timeout=self.timeout).read().decode('utf-8') - except Exception as e: - logger.debug("IP plugin - Cannot open URL {} ({})".format(url, e)) - queue_target.put(None) - else: - # Request depend on service - try: - if not json: - queue_target.put(response) - else: - queue_target.put(loads(response)[key]) - except ValueError: - queue_target.put(None) - - -class PublicIpInfo(object): - """Get public IP information from Censys online service.""" - - def __init__(self, ip, url, username, password, timeout=2): - """Init the class.""" - self.ip = ip - self.url = url - self.username = username - self.password = password - self.timeout = timeout - - def get(self): - """Return the public IP information returned by one of the online service.""" - q = queue.Queue() - - t = threading.Thread(target=self._get_ip_public_info, args=(q, self.ip, self.url, self.username, self.password)) - t.daemon = True - t.start() - - timer = Timer(self.timeout) - info = None - while not timer.finished() and info is None: - if q.qsize() > 0: - info = q.get() - - if info is None: - return None - - return info - - def _get_ip_public_info(self, queue_target, ip, url, username, password): - """Request the url service and put the result in the queue_target.""" - request_url = "{}/v2/hosts/{}".format(url, ip) - try: - response = urlopen_auth(request_url, username, password).read() - except Exception as e: - logger.debug("IP plugin - Cannot open URL {} ({})".format(request_url, e)) - queue_target.put(None) - else: - try: - queue_target.put(loads(response)['result']) - except (ValueError, KeyError) as e: - logger.debug("IP plugin - Cannot get result field from {} ({})".format(request_url, e)) - queue_target.put(None) |