diff options
Diffstat (limited to 'glances/plugins/wifi')
-rw-r--r-- | glances/plugins/wifi/__init__.py | 268 | ||||
-rw-r--r-- | glances/plugins/wifi/model.py | 268 |
2 files changed, 268 insertions, 268 deletions
diff --git a/glances/plugins/wifi/__init__.py b/glances/plugins/wifi/__init__.py index e69de29b..c221d688 100644 --- a/glances/plugins/wifi/__init__.py +++ b/glances/plugins/wifi/__init__.py @@ -0,0 +1,268 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Glances. +# +# SPDX-FileCopyrightText: 2023 Nicolas Hennion <nicolas@nicolargo.com> +# +# SPDX-License-Identifier: LGPL-3.0-only +# + +"""Wifi plugin. + +Stats are retreived from the nmcli command line (Linux only): + +# nmcli -t -f active,ssid,signal,security,chan dev wifi + +# nmcli -t -f active,ssid,signal dev wifi +no:Livebox-C820:77 +yes:Livebox-C820:72 + +or the /proc/net/wireless file (Linux only): + +# cat /proc/net/wireless +Inter-| sta-| Quality | Discarded packets | Missed | WE + face | tus | link level noise | nwid crypt frag retry misc | beacon | 22 +wlp2s0: 0000 51. -59. -256 0 0 0 0 5881 0 +""" + +import operator +from shutil import which +import threading +import time + +from glances.globals import nativestr, file_exists +from glances.plugins.plugin.model import GlancesPluginModel +from glances.secure import secure_popen +from glances.logger import logger + +# Test if the nmcli command exists and is executable +# it allows to get the list of the available hotspots +NMCLI_COMMAND = which('nmcli') +NMCLI_ARGS = '-t -f active,ssid,signal,security dev wifi' +nmcli_command_exists = NMCLI_COMMAND is not None + +# Backup solution is to use the /proc/net/wireless file +# but it only give signal information about the current hotspot +WIRELESS_FILE = '/proc/net/wireless' +wireless_file_exists = file_exists(WIRELESS_FILE) + +if not nmcli_command_exists and not wireless_file_exists: + logger.debug("Wifi plugin is disabled (no %s command or %s file found)" % ('nmcli', WIRELESS_FILE)) + + +class PluginModel(GlancesPluginModel): + """Glances Wifi plugin. + + Get stats of the current Wifi hotspots. + """ + + def __init__(self, args=None, config=None): + """Init the plugin.""" + super(PluginModel, self).__init__(args=args, config=config, stats_init_value=[]) + + # We want to display the stat in the curse interface + self.display_curse = True + + # Global Thread running all the scans + self._thread = None + + def exit(self): + """Overwrite the exit method to close threads.""" + if self._thread is not None: + self._thread.stop() + # Call the father class + super(PluginModel, self).exit() + + def get_key(self): + """Return the key of the list. + + :returns: string -- SSID is the dict key + """ + return 'ssid' + + @GlancesPluginModel._check_decorator + @GlancesPluginModel._log_result_decorator + def update(self): + """Update Wifi stats using the input method. + + Stats is a list of dict (one dict per hotspot) + + :returns: list -- Stats is a list of dict (hotspot) + """ + # Init new stats + stats = self.get_init_value() + + # Exist if we can not grab the stats + if not nmcli_command_exists and not wireless_file_exists: + return stats + + if self.input_method == 'local' and nmcli_command_exists: + # Only refresh if there is not other scanning thread + if self._thread is None: + thread_is_running = False + else: + thread_is_running = self._thread.is_alive() + if not thread_is_running: + # Run hotspot scanner thanks to the nmcli command + self._thread = ThreadHotspot(self.get_refresh_time()) + self._thread.start() + # Get the result (or [] if the scan is ongoing) + stats = self._thread.stats + elif self.input_method == 'local' and wireless_file_exists: + # As a backup solution, use the /proc/net/wireless file + with open(WIRELESS_FILE, 'r') as f: + # The first two lines are header + f.readline() + f.readline() + # Others lines are Wifi stats + wifi_stats = f.readline() + while wifi_stats != '': + # Extract the stats + wifi_stats = wifi_stats.split() + # Add the Wifi link to the list + stats.append( + { + 'key': self.get_key(), + 'ssid': wifi_stats[0][:-1], + 'signal': float(wifi_stats[3]), + 'security': '', + } + ) + # Next line + wifi_stats = f.readline() + + elif self.input_method == 'snmp': + # Update stats using SNMP + + # Not implemented yet + pass + + # Update the stats + self.stats = stats + + return self.stats + + def get_alert(self, value): + """Overwrite the default get_alert method. + + Alert is on signal quality where lower is better... + + :returns: string -- Signal alert + """ + ret = 'OK' + try: + if value <= self.get_limit('critical', stat_name=self.plugin_name): + ret = 'CRITICAL' + elif value <= self.get_limit('warning', stat_name=self.plugin_name): + ret = 'WARNING' + elif value <= self.get_limit('careful', stat_name=self.plugin_name): + ret = 'CAREFUL' + except (TypeError, KeyError): + # Catch TypeError for issue1373 + ret = 'DEFAULT' + + return ret + + def update_views(self): + """Update stats views.""" + # Call the father's method + super(PluginModel, self).update_views() + + # Add specifics information + # Alert on signal thresholds + for i in self.stats: + self.views[i[self.get_key()]]['signal']['decoration'] = self.get_alert(i['signal']) + + def msg_curse(self, args=None, max_width=None): + """Return the dict to display in the curse interface.""" + # Init the return message + ret = [] + + # Only process if stats exist and display plugin enable... + if not self.stats or not wireless_file_exists or self.is_disabled(): + return ret + + # Max size for the interface name + if_name_max_width = max_width - 5 + + # Build the string message + # Header + msg = '{:{width}}'.format('WIFI', width=if_name_max_width) + ret.append(self.curse_add_line(msg, "TITLE")) + msg = '{:>7}'.format('dBm') + ret.append(self.curse_add_line(msg)) + + # Hotspot list (sorted by name) + for i in sorted(self.stats, key=operator.itemgetter(self.get_key())): + # Do not display hotspot with no name (/ssid)... + # of ssid/signal None... See issue #1151 and #issue1973 + if i['ssid'] == '' or i['ssid'] is None or i['signal'] is None: + continue + ret.append(self.curse_new_line()) + # New hotspot + hotspot_name = i['ssid'] + # Cut hotspot_name if it is too long + if len(hotspot_name) > if_name_max_width: + hotspot_name = '_' + hotspot_name[-if_name_max_width - len(i['security']) + 1 :] + # Add the new hotspot to the message + msg = '{:{width}} {security}'.format( + nativestr(hotspot_name), width=if_name_max_width - len(i['security']) - 1, security=i['security'] + ) + ret.append(self.curse_add_line(msg)) + msg = '{:>7}'.format( + i['signal'], + ) + ret.append( + self.curse_add_line(msg, self.get_views(item=i[self.get_key()], key='signal', option='decoration')) + ) + + return ret + + +class ThreadHotspot(threading.Thread): + """ + Specific thread for the Wifi hotspot scanner. + """ + + def __init__(self, refresh_time=2): + """Init the class.""" + super(ThreadHotspot, self).__init__() + # Refresh time + self.refresh_time = refresh_time + # Event needed to stop properly the thread + self._stopper = threading.Event() + # Is part of Ports plugin + self.plugin_name = "wifi" + + def run(self): + """Get hotspots stats using the nmcli command line""" + while not self.stopped(): + # Run the nmcli command + nmcli_raw = secure_popen(NMCLI_COMMAND + ' ' + NMCLI_ARGS).split('\n') + nmcli_result = [] + for h in nmcli_raw: + h = h.split(':') + if len(h) != 4 or h[0] != 'yes': + # Do not process the line if it is not the active hotspot + continue + nmcli_result.append({'key': 'ssid', 'ssid': h[1], 'signal': -float(h[2]), 'security': h[3]}) + self.thread_stats = nmcli_result + # Wait refresh time until next scan + # Note: nmcli cache the result for x seconds + time.sleep(self.refresh_time) + + @property + def stats(self): + """Stats getter.""" + if hasattr(self, 'thread_stats'): + return self.thread_stats + else: + return [] + + def stop(self, timeout=None): + """Stop the thread.""" + self._stopper.set() + + def stopped(self): + """Return True is the thread is stopped.""" + return self._stopper.is_set() diff --git a/glances/plugins/wifi/model.py b/glances/plugins/wifi/model.py deleted file mode 100644 index c221d688..00000000 --- a/glances/plugins/wifi/model.py +++ /dev/null @@ -1,268 +0,0 @@ -# -*- coding: utf-8 -*- -# -# This file is part of Glances. -# -# SPDX-FileCopyrightText: 2023 Nicolas Hennion <nicolas@nicolargo.com> -# -# SPDX-License-Identifier: LGPL-3.0-only -# - -"""Wifi plugin. - -Stats are retreived from the nmcli command line (Linux only): - -# nmcli -t -f active,ssid,signal,security,chan dev wifi - -# nmcli -t -f active,ssid,signal dev wifi -no:Livebox-C820:77 -yes:Livebox-C820:72 - -or the /proc/net/wireless file (Linux only): - -# cat /proc/net/wireless -Inter-| sta-| Quality | Discarded packets | Missed | WE - face | tus | link level noise | nwid crypt frag retry misc | beacon | 22 -wlp2s0: 0000 51. -59. -256 0 0 0 0 5881 0 -""" - -import operator -from shutil import which -import threading -import time - -from glances.globals import nativestr, file_exists -from glances.plugins.plugin.model import GlancesPluginModel -from glances.secure import secure_popen -from glances.logger import logger - -# Test if the nmcli command exists and is executable -# it allows to get the list of the available hotspots -NMCLI_COMMAND = which('nmcli') -NMCLI_ARGS = '-t -f active,ssid,signal,security dev wifi' -nmcli_command_exists = NMCLI_COMMAND is not None - -# Backup solution is to use the /proc/net/wireless file -# but it only give signal information about the current hotspot -WIRELESS_FILE = '/proc/net/wireless' -wireless_file_exists = file_exists(WIRELESS_FILE) - -if not nmcli_command_exists and not wireless_file_exists: - logger.debug("Wifi plugin is disabled (no %s command or %s file found)" % ('nmcli', WIRELESS_FILE)) - - -class PluginModel(GlancesPluginModel): - """Glances Wifi plugin. - - Get stats of the current Wifi hotspots. - """ - - def __init__(self, args=None, config=None): - """Init the plugin.""" - super(PluginModel, self).__init__(args=args, config=config, stats_init_value=[]) - - # We want to display the stat in the curse interface - self.display_curse = True - - # Global Thread running all the scans - self._thread = None - - def exit(self): - """Overwrite the exit method to close threads.""" - if self._thread is not None: - self._thread.stop() - # Call the father class - super(PluginModel, self).exit() - - def get_key(self): - """Return the key of the list. - - :returns: string -- SSID is the dict key - """ - return 'ssid' - - @GlancesPluginModel._check_decorator - @GlancesPluginModel._log_result_decorator - def update(self): - """Update Wifi stats using the input method. - - Stats is a list of dict (one dict per hotspot) - - :returns: list -- Stats is a list of dict (hotspot) - """ - # Init new stats - stats = self.get_init_value() - - # Exist if we can not grab the stats - if not nmcli_command_exists and not wireless_file_exists: - return stats - - if self.input_method == 'local' and nmcli_command_exists: - # Only refresh if there is not other scanning thread - if self._thread is None: - thread_is_running = False - else: - thread_is_running = self._thread.is_alive() - if not thread_is_running: - # Run hotspot scanner thanks to the nmcli command - self._thread = ThreadHotspot(self.get_refresh_time()) - self._thread.start() - # Get the result (or [] if the scan is ongoing) - stats = self._thread.stats - elif self.input_method == 'local' and wireless_file_exists: - # As a backup solution, use the /proc/net/wireless file - with open(WIRELESS_FILE, 'r') as f: - # The first two lines are header - f.readline() - f.readline() - # Others lines are Wifi stats - wifi_stats = f.readline() - while wifi_stats != '': - # Extract the stats - wifi_stats = wifi_stats.split() - # Add the Wifi link to the list - stats.append( - { - 'key': self.get_key(), - 'ssid': wifi_stats[0][:-1], - 'signal': float(wifi_stats[3]), - 'security': '', - } - ) - # Next line - wifi_stats = f.readline() - - elif self.input_method == 'snmp': - # Update stats using SNMP - - # Not implemented yet - pass - - # Update the stats - self.stats = stats - - return self.stats - - def get_alert(self, value): - """Overwrite the default get_alert method. - - Alert is on signal quality where lower is better... - - :returns: string -- Signal alert - """ - ret = 'OK' - try: - if value <= self.get_limit('critical', stat_name=self.plugin_name): - ret = 'CRITICAL' - elif value <= self.get_limit('warning', stat_name=self.plugin_name): - ret = 'WARNING' - elif value <= self.get_limit('careful', stat_name=self.plugin_name): - ret = 'CAREFUL' - except (TypeError, KeyError): - # Catch TypeError for issue1373 - ret = 'DEFAULT' - - return ret - - def update_views(self): - """Update stats views.""" - # Call the father's method - super(PluginModel, self).update_views() - - # Add specifics information - # Alert on signal thresholds - for i in self.stats: - self.views[i[self.get_key()]]['signal']['decoration'] = self.get_alert(i['signal']) - - def msg_curse(self, args=None, max_width=None): - """Return the dict to display in the curse interface.""" - # Init the return message - ret = [] - - # Only process if stats exist and display plugin enable... - if not self.stats or not wireless_file_exists or self.is_disabled(): - return ret - - # Max size for the interface name - if_name_max_width = max_width - 5 - - # Build the string message - # Header - msg = '{:{width}}'.format('WIFI', width=if_name_max_width) - ret.append(self.curse_add_line(msg, "TITLE")) - msg = '{:>7}'.format('dBm') - ret.append(self.curse_add_line(msg)) - - # Hotspot list (sorted by name) - for i in sorted(self.stats, key=operator.itemgetter(self.get_key())): - # Do not display hotspot with no name (/ssid)... - # of ssid/signal None... See issue #1151 and #issue1973 - if i['ssid'] == '' or i['ssid'] is None or i['signal'] is None: - continue - ret.append(self.curse_new_line()) - # New hotspot - hotspot_name = i['ssid'] - # Cut hotspot_name if it is too long - if len(hotspot_name) > if_name_max_width: - hotspot_name = '_' + hotspot_name[-if_name_max_width - len(i['security']) + 1 :] - # Add the new hotspot to the message - msg = '{:{width}} {security}'.format( - nativestr(hotspot_name), width=if_name_max_width - len(i['security']) - 1, security=i['security'] - ) - ret.append(self.curse_add_line(msg)) - msg = '{:>7}'.format( - i['signal'], - ) - ret.append( - self.curse_add_line(msg, self.get_views(item=i[self.get_key()], key='signal', option='decoration')) - ) - - return ret - - -class ThreadHotspot(threading.Thread): - """ - Specific thread for the Wifi hotspot scanner. - """ - - def __init__(self, refresh_time=2): - """Init the class.""" - super(ThreadHotspot, self).__init__() - # Refresh time - self.refresh_time = refresh_time - # Event needed to stop properly the thread - self._stopper = threading.Event() - # Is part of Ports plugin - self.plugin_name = "wifi" - - def run(self): - """Get hotspots stats using the nmcli command line""" - while not self.stopped(): - # Run the nmcli command - nmcli_raw = secure_popen(NMCLI_COMMAND + ' ' + NMCLI_ARGS).split('\n') - nmcli_result = [] - for h in nmcli_raw: - h = h.split(':') - if len(h) != 4 or h[0] != 'yes': - # Do not process the line if it is not the active hotspot - continue - nmcli_result.append({'key': 'ssid', 'ssid': h[1], 'signal': -float(h[2]), 'security': h[3]}) - self.thread_stats = nmcli_result - # Wait refresh time until next scan - # Note: nmcli cache the result for x seconds - time.sleep(self.refresh_time) - - @property - def stats(self): - """Stats getter.""" - if hasattr(self, 'thread_stats'): - return self.thread_stats - else: - return [] - - def stop(self, timeout=None): - """Stop the thread.""" - self._stopper.set() - - def stopped(self): - """Return True is the thread is stopped.""" - return self._stopper.is_set() |