summaryrefslogtreecommitdiffstats
path: root/glances/plugins/wifi/model.py
diff options
context:
space:
mode:
Diffstat (limited to 'glances/plugins/wifi/model.py')
-rw-r--r--glances/plugins/wifi/model.py268
1 files changed, 0 insertions, 268 deletions
diff --git a/glances/plugins/wifi/model.py b/glances/plugins/wifi/model.py
deleted file mode 100644
index c221d688..00000000
--- a/glances/plugins/wifi/model.py
+++ /dev/null
@@ -1,268 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# This file is part of Glances.
-#
-# SPDX-FileCopyrightText: 2023 Nicolas Hennion <nicolas@nicolargo.com>
-#
-# SPDX-License-Identifier: LGPL-3.0-only
-#
-
-"""Wifi plugin.
-
-Stats are retreived from the nmcli command line (Linux only):
-
-# nmcli -t -f active,ssid,signal,security,chan dev wifi
-
-# nmcli -t -f active,ssid,signal dev wifi
-no:Livebox-C820:77
-yes:Livebox-C820:72
-
-or the /proc/net/wireless file (Linux only):
-
-# cat /proc/net/wireless
-Inter-| sta-| Quality | Discarded packets | Missed | WE
- face | tus | link level noise | nwid crypt frag retry misc | beacon | 22
-wlp2s0: 0000 51. -59. -256 0 0 0 0 5881 0
-"""
-
-import operator
-from shutil import which
-import threading
-import time
-
-from glances.globals import nativestr, file_exists
-from glances.plugins.plugin.model import GlancesPluginModel
-from glances.secure import secure_popen
-from glances.logger import logger
-
-# Test if the nmcli command exists and is executable
-# it allows to get the list of the available hotspots
-NMCLI_COMMAND = which('nmcli')
-NMCLI_ARGS = '-t -f active,ssid,signal,security dev wifi'
-nmcli_command_exists = NMCLI_COMMAND is not None
-
-# Backup solution is to use the /proc/net/wireless file
-# but it only give signal information about the current hotspot
-WIRELESS_FILE = '/proc/net/wireless'
-wireless_file_exists = file_exists(WIRELESS_FILE)
-
-if not nmcli_command_exists and not wireless_file_exists:
- logger.debug("Wifi plugin is disabled (no %s command or %s file found)" % ('nmcli', WIRELESS_FILE))
-
-
-class PluginModel(GlancesPluginModel):
- """Glances Wifi plugin.
-
- Get stats of the current Wifi hotspots.
- """
-
- def __init__(self, args=None, config=None):
- """Init the plugin."""
- super(PluginModel, self).__init__(args=args, config=config, stats_init_value=[])
-
- # We want to display the stat in the curse interface
- self.display_curse = True
-
- # Global Thread running all the scans
- self._thread = None
-
- def exit(self):
- """Overwrite the exit method to close threads."""
- if self._thread is not None:
- self._thread.stop()
- # Call the father class
- super(PluginModel, self).exit()
-
- def get_key(self):
- """Return the key of the list.
-
- :returns: string -- SSID is the dict key
- """
- return 'ssid'
-
- @GlancesPluginModel._check_decorator
- @GlancesPluginModel._log_result_decorator
- def update(self):
- """Update Wifi stats using the input method.
-
- Stats is a list of dict (one dict per hotspot)
-
- :returns: list -- Stats is a list of dict (hotspot)
- """
- # Init new stats
- stats = self.get_init_value()
-
- # Exist if we can not grab the stats
- if not nmcli_command_exists and not wireless_file_exists:
- return stats
-
- if self.input_method == 'local' and nmcli_command_exists:
- # Only refresh if there is not other scanning thread
- if self._thread is None:
- thread_is_running = False
- else:
- thread_is_running = self._thread.is_alive()
- if not thread_is_running:
- # Run hotspot scanner thanks to the nmcli command
- self._thread = ThreadHotspot(self.get_refresh_time())
- self._thread.start()
- # Get the result (or [] if the scan is ongoing)
- stats = self._thread.stats
- elif self.input_method == 'local' and wireless_file_exists:
- # As a backup solution, use the /proc/net/wireless file
- with open(WIRELESS_FILE, 'r') as f:
- # The first two lines are header
- f.readline()
- f.readline()
- # Others lines are Wifi stats
- wifi_stats = f.readline()
- while wifi_stats != '':
- # Extract the stats
- wifi_stats = wifi_stats.split()
- # Add the Wifi link to the list
- stats.append(
- {
- 'key': self.get_key(),
- 'ssid': wifi_stats[0][:-1],
- 'signal': float(wifi_stats[3]),
- 'security': '',
- }
- )
- # Next line
- wifi_stats = f.readline()
-
- elif self.input_method == 'snmp':
- # Update stats using SNMP
-
- # Not implemented yet
- pass
-
- # Update the stats
- self.stats = stats
-
- return self.stats
-
- def get_alert(self, value):
- """Overwrite the default get_alert method.
-
- Alert is on signal quality where lower is better...
-
- :returns: string -- Signal alert
- """
- ret = 'OK'
- try:
- if value <= self.get_limit('critical', stat_name=self.plugin_name):
- ret = 'CRITICAL'
- elif value <= self.get_limit('warning', stat_name=self.plugin_name):
- ret = 'WARNING'
- elif value <= self.get_limit('careful', stat_name=self.plugin_name):
- ret = 'CAREFUL'
- except (TypeError, KeyError):
- # Catch TypeError for issue1373
- ret = 'DEFAULT'
-
- return ret
-
- def update_views(self):
- """Update stats views."""
- # Call the father's method
- super(PluginModel, self).update_views()
-
- # Add specifics information
- # Alert on signal thresholds
- for i in self.stats:
- self.views[i[self.get_key()]]['signal']['decoration'] = self.get_alert(i['signal'])
-
- def msg_curse(self, args=None, max_width=None):
- """Return the dict to display in the curse interface."""
- # Init the return message
- ret = []
-
- # Only process if stats exist and display plugin enable...
- if not self.stats or not wireless_file_exists or self.is_disabled():
- return ret
-
- # Max size for the interface name
- if_name_max_width = max_width - 5
-
- # Build the string message
- # Header
- msg = '{:{width}}'.format('WIFI', width=if_name_max_width)
- ret.append(self.curse_add_line(msg, "TITLE"))
- msg = '{:>7}'.format('dBm')
- ret.append(self.curse_add_line(msg))
-
- # Hotspot list (sorted by name)
- for i in sorted(self.stats, key=operator.itemgetter(self.get_key())):
- # Do not display hotspot with no name (/ssid)...
- # of ssid/signal None... See issue #1151 and #issue1973
- if i['ssid'] == '' or i['ssid'] is None or i['signal'] is None:
- continue
- ret.append(self.curse_new_line())
- # New hotspot
- hotspot_name = i['ssid']
- # Cut hotspot_name if it is too long
- if len(hotspot_name) > if_name_max_width:
- hotspot_name = '_' + hotspot_name[-if_name_max_width - len(i['security']) + 1 :]
- # Add the new hotspot to the message
- msg = '{:{width}} {security}'.format(
- nativestr(hotspot_name), width=if_name_max_width - len(i['security']) - 1, security=i['security']
- )
- ret.append(self.curse_add_line(msg))
- msg = '{:>7}'.format(
- i['signal'],
- )
- ret.append(
- self.curse_add_line(msg, self.get_views(item=i[self.get_key()], key='signal', option='decoration'))
- )
-
- return ret
-
-
-class ThreadHotspot(threading.Thread):
- """
- Specific thread for the Wifi hotspot scanner.
- """
-
- def __init__(self, refresh_time=2):
- """Init the class."""
- super(ThreadHotspot, self).__init__()
- # Refresh time
- self.refresh_time = refresh_time
- # Event needed to stop properly the thread
- self._stopper = threading.Event()
- # Is part of Ports plugin
- self.plugin_name = "wifi"
-
- def run(self):
- """Get hotspots stats using the nmcli command line"""
- while not self.stopped():
- # Run the nmcli command
- nmcli_raw = secure_popen(NMCLI_COMMAND + ' ' + NMCLI_ARGS).split('\n')
- nmcli_result = []
- for h in nmcli_raw:
- h = h.split(':')
- if len(h) != 4 or h[0] != 'yes':
- # Do not process the line if it is not the active hotspot
- continue
- nmcli_result.append({'key': 'ssid', 'ssid': h[1], 'signal': -float(h[2]), 'security': h[3]})
- self.thread_stats = nmcli_result
- # Wait refresh time until next scan
- # Note: nmcli cache the result for x seconds
- time.sleep(self.refresh_time)
-
- @property
- def stats(self):
- """Stats getter."""
- if hasattr(self, 'thread_stats'):
- return self.thread_stats
- else:
- return []
-
- def stop(self, timeout=None):
- """Stop the thread."""
- self._stopper.set()
-
- def stopped(self):
- """Return True is the thread is stopped."""
- return self._stopper.is_set()