summaryrefslogtreecommitdiffstats
path: root/glances/plugins/wifi/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'glances/plugins/wifi/__init__.py')
-rw-r--r--glances/plugins/wifi/__init__.py268
1 files changed, 268 insertions, 0 deletions
diff --git a/glances/plugins/wifi/__init__.py b/glances/plugins/wifi/__init__.py
index e69de29b..c221d688 100644
--- a/glances/plugins/wifi/__init__.py
+++ b/glances/plugins/wifi/__init__.py
@@ -0,0 +1,268 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Glances.
+#
+# SPDX-FileCopyrightText: 2023 Nicolas Hennion <nicolas@nicolargo.com>
+#
+# SPDX-License-Identifier: LGPL-3.0-only
+#
+
+"""Wifi plugin.
+
+Stats are retreived from the nmcli command line (Linux only):
+
+# nmcli -t -f active,ssid,signal,security,chan dev wifi
+
+# nmcli -t -f active,ssid,signal dev wifi
+no:Livebox-C820:77
+yes:Livebox-C820:72
+
+or the /proc/net/wireless file (Linux only):
+
+# cat /proc/net/wireless
+Inter-| sta-| Quality | Discarded packets | Missed | WE
+ face | tus | link level noise | nwid crypt frag retry misc | beacon | 22
+wlp2s0: 0000 51. -59. -256 0 0 0 0 5881 0
+"""
+
+import operator
+from shutil import which
+import threading
+import time
+
+from glances.globals import nativestr, file_exists
+from glances.plugins.plugin.model import GlancesPluginModel
+from glances.secure import secure_popen
+from glances.logger import logger
+
+# Test if the nmcli command exists and is executable
+# it allows to get the list of the available hotspots
+NMCLI_COMMAND = which('nmcli')
+NMCLI_ARGS = '-t -f active,ssid,signal,security dev wifi'
+nmcli_command_exists = NMCLI_COMMAND is not None
+
+# Backup solution is to use the /proc/net/wireless file
+# but it only give signal information about the current hotspot
+WIRELESS_FILE = '/proc/net/wireless'
+wireless_file_exists = file_exists(WIRELESS_FILE)
+
+if not nmcli_command_exists and not wireless_file_exists:
+ logger.debug("Wifi plugin is disabled (no %s command or %s file found)" % ('nmcli', WIRELESS_FILE))
+
+
+class PluginModel(GlancesPluginModel):
+ """Glances Wifi plugin.
+
+ Get stats of the current Wifi hotspots.
+ """
+
+ def __init__(self, args=None, config=None):
+ """Init the plugin."""
+ super(PluginModel, self).__init__(args=args, config=config, stats_init_value=[])
+
+ # We want to display the stat in the curse interface
+ self.display_curse = True
+
+ # Global Thread running all the scans
+ self._thread = None
+
+ def exit(self):
+ """Overwrite the exit method to close threads."""
+ if self._thread is not None:
+ self._thread.stop()
+ # Call the father class
+ super(PluginModel, self).exit()
+
+ def get_key(self):
+ """Return the key of the list.
+
+ :returns: string -- SSID is the dict key
+ """
+ return 'ssid'
+
+ @GlancesPluginModel._check_decorator
+ @GlancesPluginModel._log_result_decorator
+ def update(self):
+ """Update Wifi stats using the input method.
+
+ Stats is a list of dict (one dict per hotspot)
+
+ :returns: list -- Stats is a list of dict (hotspot)
+ """
+ # Init new stats
+ stats = self.get_init_value()
+
+ # Exist if we can not grab the stats
+ if not nmcli_command_exists and not wireless_file_exists:
+ return stats
+
+ if self.input_method == 'local' and nmcli_command_exists:
+ # Only refresh if there is not other scanning thread
+ if self._thread is None:
+ thread_is_running = False
+ else:
+ thread_is_running = self._thread.is_alive()
+ if not thread_is_running:
+ # Run hotspot scanner thanks to the nmcli command
+ self._thread = ThreadHotspot(self.get_refresh_time())
+ self._thread.start()
+ # Get the result (or [] if the scan is ongoing)
+ stats = self._thread.stats
+ elif self.input_method == 'local' and wireless_file_exists:
+ # As a backup solution, use the /proc/net/wireless file
+ with open(WIRELESS_FILE, 'r') as f:
+ # The first two lines are header
+ f.readline()
+ f.readline()
+ # Others lines are Wifi stats
+ wifi_stats = f.readline()
+ while wifi_stats != '':
+ # Extract the stats
+ wifi_stats = wifi_stats.split()
+ # Add the Wifi link to the list
+ stats.append(
+ {
+ 'key': self.get_key(),
+ 'ssid': wifi_stats[0][:-1],
+ 'signal': float(wifi_stats[3]),
+ 'security': '',
+ }
+ )
+ # Next line
+ wifi_stats = f.readline()
+
+ elif self.input_method == 'snmp':
+ # Update stats using SNMP
+
+ # Not implemented yet
+ pass
+
+ # Update the stats
+ self.stats = stats
+
+ return self.stats
+
+ def get_alert(self, value):
+ """Overwrite the default get_alert method.
+
+ Alert is on signal quality where lower is better...
+
+ :returns: string -- Signal alert
+ """
+ ret = 'OK'
+ try:
+ if value <= self.get_limit('critical', stat_name=self.plugin_name):
+ ret = 'CRITICAL'
+ elif value <= self.get_limit('warning', stat_name=self.plugin_name):
+ ret = 'WARNING'
+ elif value <= self.get_limit('careful', stat_name=self.plugin_name):
+ ret = 'CAREFUL'
+ except (TypeError, KeyError):
+ # Catch TypeError for issue1373
+ ret = 'DEFAULT'
+
+ return ret
+
+ def update_views(self):
+ """Update stats views."""
+ # Call the father's method
+ super(PluginModel, self).update_views()
+
+ # Add specifics information
+ # Alert on signal thresholds
+ for i in self.stats:
+ self.views[i[self.get_key()]]['signal']['decoration'] = self.get_alert(i['signal'])
+
+ def msg_curse(self, args=None, max_width=None):
+ """Return the dict to display in the curse interface."""
+ # Init the return message
+ ret = []
+
+ # Only process if stats exist and display plugin enable...
+ if not self.stats or not wireless_file_exists or self.is_disabled():
+ return ret
+
+ # Max size for the interface name
+ if_name_max_width = max_width - 5
+
+ # Build the string message
+ # Header
+ msg = '{:{width}}'.format('WIFI', width=if_name_max_width)
+ ret.append(self.curse_add_line(msg, "TITLE"))
+ msg = '{:>7}'.format('dBm')
+ ret.append(self.curse_add_line(msg))
+
+ # Hotspot list (sorted by name)
+ for i in sorted(self.stats, key=operator.itemgetter(self.get_key())):
+ # Do not display hotspot with no name (/ssid)...
+ # of ssid/signal None... See issue #1151 and #issue1973
+ if i['ssid'] == '' or i['ssid'] is None or i['signal'] is None:
+ continue
+ ret.append(self.curse_new_line())
+ # New hotspot
+ hotspot_name = i['ssid']
+ # Cut hotspot_name if it is too long
+ if len(hotspot_name) > if_name_max_width:
+ hotspot_name = '_' + hotspot_name[-if_name_max_width - len(i['security']) + 1 :]
+ # Add the new hotspot to the message
+ msg = '{:{width}} {security}'.format(
+ nativestr(hotspot_name), width=if_name_max_width - len(i['security']) - 1, security=i['security']
+ )
+ ret.append(self.curse_add_line(msg))
+ msg = '{:>7}'.format(
+ i['signal'],
+ )
+ ret.append(
+ self.curse_add_line(msg, self.get_views(item=i[self.get_key()], key='signal', option='decoration'))
+ )
+
+ return ret
+
+
+class ThreadHotspot(threading.Thread):
+ """
+ Specific thread for the Wifi hotspot scanner.
+ """
+
+ def __init__(self, refresh_time=2):
+ """Init the class."""
+ super(ThreadHotspot, self).__init__()
+ # Refresh time
+ self.refresh_time = refresh_time
+ # Event needed to stop properly the thread
+ self._stopper = threading.Event()
+ # Is part of Ports plugin
+ self.plugin_name = "wifi"
+
+ def run(self):
+ """Get hotspots stats using the nmcli command line"""
+ while not self.stopped():
+ # Run the nmcli command
+ nmcli_raw = secure_popen(NMCLI_COMMAND + ' ' + NMCLI_ARGS).split('\n')
+ nmcli_result = []
+ for h in nmcli_raw:
+ h = h.split(':')
+ if len(h) != 4 or h[0] != 'yes':
+ # Do not process the line if it is not the active hotspot
+ continue
+ nmcli_result.append({'key': 'ssid', 'ssid': h[1], 'signal': -float(h[2]), 'security': h[3]})
+ self.thread_stats = nmcli_result
+ # Wait refresh time until next scan
+ # Note: nmcli cache the result for x seconds
+ time.sleep(self.refresh_time)
+
+ @property
+ def stats(self):
+ """Stats getter."""
+ if hasattr(self, 'thread_stats'):
+ return self.thread_stats
+ else:
+ return []
+
+ def stop(self, timeout=None):
+ """Stop the thread."""
+ self._stopper.set()
+
+ def stopped(self):
+ """Return True is the thread is stopped."""
+ return self._stopper.is_set()