summaryrefslogtreecommitdiffstats
path: root/glances/plugins/ip/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'glances/plugins/ip/__init__.py')
-rw-r--r--glances/plugins/ip/__init__.py300
1 files changed, 300 insertions, 0 deletions
diff --git a/glances/plugins/ip/__init__.py b/glances/plugins/ip/__init__.py
index e69de29b..02d884eb 100644
--- a/glances/plugins/ip/__init__.py
+++ b/glances/plugins/ip/__init__.py
@@ -0,0 +1,300 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Glances.
+#
+# SPDX-FileCopyrightText: 2022 Nicolas Hennion <nicolas@nicolargo.com>
+#
+# SPDX-License-Identifier: LGPL-3.0-only
+#
+
+"""IP plugin."""
+
+import threading
+from ujson import loads
+
+from glances.globals import urlopen, queue, urlopen_auth
+from glances.logger import logger
+from glances.timer import Timer
+from glances.timer import getTimeSinceLastUpdate
+from glances.plugins.plugin.model import GlancesPluginModel
+
+# Import plugin specific dependency
+try:
+ import netifaces
+except ImportError as e:
+ import_error_tag = True
+ logger.warning("Missing Python Lib ({}), IP plugin is disabled".format(e))
+else:
+ import_error_tag = False
+
+# List of online services to retrieve public IP address
+# List of tuple (url, json, key)
+# - url: URL of the Web site
+# - json: service return a JSON (True) or string (False)
+# - key: key of the IP address in the JSON structure
+urls = [
+ ('https://httpbin.org/ip', True, 'origin'),
+ ('https://api.ipify.org/?format=json', True, 'ip'),
+ ('https://ipv4.jsonip.com', True, 'ip'),
+]
+
+
+class PluginModel(GlancesPluginModel):
+ """Glances IP Plugin.
+
+ stats is a dict
+ """
+
+ _default_public_refresh_interval = 300
+ _default_public_ip_disabled = ["False"]
+
+ def __init__(self, args=None, config=None):
+ """Init the plugin."""
+ super(PluginModel, self).__init__(args=args, config=config)
+
+ # We want to display the stat in the curse interface
+ self.display_curse = True
+
+ # For public IP address
+ self.public_address = ""
+ self.public_address_refresh_interval = self.get_conf_value(
+ "public_refresh_interval", default=self._default_public_refresh_interval
+ )
+
+ public_ip_disabled = self.get_conf_value("public_ip_disabled", default=self._default_public_ip_disabled)
+ self.public_ip_disabled = True if public_ip_disabled == ["True"] else False
+
+ # For the Censys options (see issue #2105)
+ self.public_info = ""
+ self.censys_url = self.get_conf_value("censys_url", default=[None])[0]
+ self.censys_username = self.get_conf_value("censys_username", default=[None])[0]
+ self.censys_password = self.get_conf_value("censys_password", default=[None])[0]
+ self.censys_fields = self.get_conf_value("censys_fields", default=[None])
+ self.public_info_disabled = (
+ self.censys_url is None
+ or self.censys_username is None
+ or self.censys_password is None
+ or self.censys_fields is None
+ )
+
+ @GlancesPluginModel._check_decorator
+ @GlancesPluginModel._log_result_decorator
+ def update(self):
+ """Update IP stats using the input method.
+
+ :return: the stats dict
+ """
+ # Init new stats
+ stats = self.get_init_value()
+
+ if self.input_method == 'local' and not import_error_tag:
+ # Update stats using the netifaces lib
+ # Start with the default IP gateway
+ try:
+ default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
+ except (KeyError, AttributeError) as e:
+ logger.debug("Cannot grab default gateway IP address ({})".format(e))
+ return {}
+ else:
+ stats['gateway'] = default_gw[0]
+
+ # Then the private IP address
+ # If multiple IP addresses are available, only the one with the default gateway is returned
+ try:
+ address = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['addr']
+ mask = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['netmask']
+ except (KeyError, AttributeError) as e:
+ logger.debug("Cannot grab private IP address ({})".format(e))
+ return {}
+ else:
+ stats['address'] = address
+ stats['mask'] = mask
+ stats['mask_cidr'] = self.ip_to_cidr(stats['mask'])
+
+ # Finally with the public IP address
+ time_since_update = getTimeSinceLastUpdate('public-ip')
+ try:
+ if not self.public_ip_disabled and (
+ self.stats.get('address') != address or time_since_update > self.public_address_refresh_interval
+ ):
+ self.public_address = PublicIpAddress().get()
+ if not self.public_info_disabled:
+ self.public_info = PublicIpInfo(
+ self.public_address, self.censys_url, self.censys_username, self.censys_password
+ ).get()
+ except (KeyError, AttributeError) as e:
+ logger.debug("Cannot grab public IP information ({})".format(e))
+ else:
+ stats['public_address'] = self.public_address
+ # Too much information provided in the public_info
+ # Limit it to public_info_for_human
+ # stats['public_info'] = self.public_info
+ stats['public_info_human'] = self.public_info_for_human(self.public_info)
+
+ elif self.input_method == 'snmp':
+ # Not implemented yet
+ pass
+
+ # Update the stats
+ self.stats = stats
+
+ return self.stats
+
+ def msg_curse(self, args=None, max_width=None):
+ """Return the dict to display in the curse interface."""
+ # Init the return message
+ ret = []
+
+ # Only process if stats exist and display plugin enable...
+ if not self.stats or self.is_disabled() or import_error_tag:
+ return ret
+
+ # Build the string message
+ msg = ' - '
+ ret.append(self.curse_add_line(msg, optional=True))
+
+ # Start with the private IP information
+ msg = 'IP '
+ ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
+ if 'address' in self.stats:
+ msg = '{}'.format(self.stats['address'])
+ ret.append(self.curse_add_line(msg, optional=True))
+ if 'mask_cidr' in self.stats:
+ # VPN with no internet access (issue #842)
+ msg = '/{}'.format(self.stats['mask_cidr'])
+ ret.append(self.curse_add_line(msg, optional=True))
+
+ # Then with the public IP information
+ try:
+ msg_pub = '{}'.format(self.stats['public_address'])
+ except (UnicodeEncodeError, KeyError):
+ # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469)
+ pass
+ else:
+ if self.stats['public_address']:
+ msg = ' Pub '
+ ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
+ ret.append(self.curse_add_line(msg_pub, optional=True))
+
+ if self.stats['public_info_human']:
+ ret.append(self.curse_add_line(' {}'.format(self.stats['public_info_human']), optional=True))
+
+ return ret
+
+ def public_info_for_human(self, public_info):
+ """Return the data to pack to the client."""
+ if not public_info:
+ return ''
+
+ field_result = []
+ for f in self.censys_fields:
+ field = f.split(':')
+ if len(field) == 1 and field[0] in public_info:
+ field_result.append('{}'.format(public_info[field[0]]))
+ elif len(field) == 2 and field[0] in public_info and field[1] in public_info[field[0]]:
+ field_result.append('{}'.format(public_info[field[0]][field[1]]))
+ return '/'.join(field_result)
+
+ @staticmethod
+ def ip_to_cidr(ip):
+ """Convert IP address to CIDR.
+
+ Example: '255.255.255.0' will return 24
+ """
+ # Thanks to @Atticfire
+ # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399
+ if ip is None:
+ # Correct issue #1528
+ return 0
+ return sum(bin(int(x)).count('1') for x in ip.split('.'))
+
+
+class PublicIpAddress(object):
+ """Get public IP address from online services."""
+
+ def __init__(self, timeout=2):
+ """Init the class."""
+ self.timeout = timeout
+
+ def get(self):
+ """Get the first public IP address returned by one of the online services."""
+ q = queue.Queue()
+
+ for u, j, k in urls:
+ t = threading.Thread(target=self._get_ip_public, args=(q, u, j, k))
+ t.daemon = True
+ t.start()
+
+ timer = Timer(self.timeout)
+ ip = None
+ while not timer.finished() and ip is None:
+ if q.qsize() > 0:
+ ip = q.get()
+
+ if ip is None:
+ return None
+
+ return ', '.join(set([x.strip() for x in ip.split(',')]))
+
+ def _get_ip_public(self, queue_target, url, json=False, key=None):
+ """Request the url service and put the result in the queue_target."""
+ try:
+ response = urlopen(url, timeout=self.timeout).read().decode('utf-8')
+ except Exception as e:
+ logger.debug("IP plugin - Cannot open URL {} ({})".format(url, e))
+ queue_target.put(None)
+ else:
+ # Request depend on service
+ try:
+ if not json:
+ queue_target.put(response)
+ else:
+ queue_target.put(loads(response)[key])
+ except ValueError:
+ queue_target.put(None)
+
+
+class PublicIpInfo(object):
+ """Get public IP information from Censys online service."""
+
+ def __init__(self, ip, url, username, password, timeout=2):
+ """Init the class."""
+ self.ip = ip
+ self.url = url
+ self.username = username
+ self.password = password
+ self.timeout = timeout
+
+ def get(self):
+ """Return the public IP information returned by one of the online service."""
+ q = queue.Queue()
+
+ t = threading.Thread(target=self._get_ip_public_info, args=(q, self.ip, self.url, self.username, self.password))
+ t.daemon = True
+ t.start()
+
+ timer = Timer(self.timeout)
+ info = None
+ while not timer.finished() and info is None:
+ if q.qsize() > 0:
+ info = q.get()
+
+ if info is None:
+ return None
+
+ return info
+
+ def _get_ip_public_info(self, queue_target, ip, url, username, password):
+ """Request the url service and put the result in the queue_target."""
+ request_url = "{}/v2/hosts/{}".format(url, ip)
+ try:
+ response = urlopen_auth(request_url, username, password).read()
+ except Exception as e:
+ logger.debug("IP plugin - Cannot open URL {} ({})".format(request_url, e))
+ queue_target.put(None)
+ else:
+ try:
+ queue_target.put(loads(response)['result'])
+ except (ValueError, KeyError) as e:
+ logger.debug("IP plugin - Cannot get result field from {} ({})".format(request_url, e))
+ queue_target.put(None)