summaryrefslogtreecommitdiffstats
path: root/glances/plugins/wifi
diff options
context:
space:
mode:
Diffstat (limited to 'glances/plugins/wifi')
-rw-r--r--glances/plugins/wifi/__init__.py268
-rw-r--r--glances/plugins/wifi/model.py268
2 files changed, 268 insertions, 268 deletions
diff --git a/glances/plugins/wifi/__init__.py b/glances/plugins/wifi/__init__.py
index e69de29b..c221d688 100644
--- a/glances/plugins/wifi/__init__.py
+++ b/glances/plugins/wifi/__init__.py
@@ -0,0 +1,268 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Glances.
+#
+# SPDX-FileCopyrightText: 2023 Nicolas Hennion <nicolas@nicolargo.com>
+#
+# SPDX-License-Identifier: LGPL-3.0-only
+#
+
+"""Wifi plugin.
+
+Stats are retreived from the nmcli command line (Linux only):
+
+# nmcli -t -f active,ssid,signal,security,chan dev wifi
+
+# nmcli -t -f active,ssid,signal dev wifi
+no:Livebox-C820:77
+yes:Livebox-C820:72
+
+or the /proc/net/wireless file (Linux only):
+
+# cat /proc/net/wireless
+Inter-| sta-| Quality | Discarded packets | Missed | WE
+ face | tus | link level noise | nwid crypt frag retry misc | beacon | 22
+wlp2s0: 0000 51. -59. -256 0 0 0 0 5881 0
+"""
+
+import operator
+from shutil import which
+import threading
+import time
+
+from glances.globals import nativestr, file_exists
+from glances.plugins.plugin.model import GlancesPluginModel
+from glances.secure import secure_popen
+from glances.logger import logger
+
+# Test if the nmcli command exists and is executable
+# it allows to get the list of the available hotspots
+NMCLI_COMMAND = which('nmcli')
+NMCLI_ARGS = '-t -f active,ssid,signal,security dev wifi'
+nmcli_command_exists = NMCLI_COMMAND is not None
+
+# Backup solution is to use the /proc/net/wireless file
+# but it only give signal information about the current hotspot
+WIRELESS_FILE = '/proc/net/wireless'
+wireless_file_exists = file_exists(WIRELESS_FILE)
+
+if not nmcli_command_exists and not wireless_file_exists:
+ logger.debug("Wifi plugin is disabled (no %s command or %s file found)" % ('nmcli', WIRELESS_FILE))
+
+
+class PluginModel(GlancesPluginModel):
+ """Glances Wifi plugin.
+
+ Get stats of the current Wifi hotspots.
+ """
+
+ def __init__(self, args=None, config=None):
+ """Init the plugin."""
+ super(PluginModel, self).__init__(args=args, config=config, stats_init_value=[])
+
+ # We want to display the stat in the curse interface
+ self.display_curse = True
+
+ # Global Thread running all the scans
+ self._thread = None
+
+ def exit(self):
+ """Overwrite the exit method to close threads."""
+ if self._thread is not None:
+ self._thread.stop()
+ # Call the father class
+ super(PluginModel, self).exit()
+
+ def get_key(self):
+ """Return the key of the list.
+
+ :returns: string -- SSID is the dict key
+ """
+ return 'ssid'
+
+ @GlancesPluginModel._check_decorator
+ @GlancesPluginModel._log_result_decorator
+ def update(self):
+ """Update Wifi stats using the input method.
+
+ Stats is a list of dict (one dict per hotspot)
+
+ :returns: list -- Stats is a list of dict (hotspot)
+ """
+ # Init new stats
+ stats = self.get_init_value()
+
+ # Exist if we can not grab the stats
+ if not nmcli_command_exists and not wireless_file_exists:
+ return stats
+
+ if self.input_method == 'local' and nmcli_command_exists:
+ # Only refresh if there is not other scanning thread
+ if self._thread is None:
+ thread_is_running = False
+ else:
+ thread_is_running = self._thread.is_alive()
+ if not thread_is_running:
+ # Run hotspot scanner thanks to the nmcli command
+ self._thread = ThreadHotspot(self.get_refresh_time())
+ self._thread.start()
+ # Get the result (or [] if the scan is ongoing)
+ stats = self._thread.stats
+ elif self.input_method == 'local' and wireless_file_exists:
+ # As a backup solution, use the /proc/net/wireless file
+ with open(WIRELESS_FILE, 'r') as f:
+ # The first two lines are header
+ f.readline()
+ f.readline()
+ # Others lines are Wifi stats
+ wifi_stats = f.readline()
+ while wifi_stats != '':
+ # Extract the stats
+ wifi_stats = wifi_stats.split()
+ # Add the Wifi link to the list
+ stats.append(
+ {
+ 'key': self.get_key(),
+ 'ssid': wifi_stats[0][:-1],
+ 'signal': float(wifi_stats[3]),
+ 'security': '',
+ }
+ )
+ # Next line
+ wifi_stats = f.readline()
+
+ elif self.input_method == 'snmp':
+ # Update stats using SNMP
+
+ # Not implemented yet
+ pass
+
+ # Update the stats
+ self.stats = stats
+
+ return self.stats
+
+ def get_alert(self, value):
+ """Overwrite the default get_alert method.
+
+ Alert is on signal quality where lower is better...
+
+ :returns: string -- Signal alert
+ """
+ ret = 'OK'
+ try:
+ if value <= self.get_limit('critical', stat_name=self.plugin_name):
+ ret = 'CRITICAL'
+ elif value <= self.get_limit('warning', stat_name=self.plugin_name):
+ ret = 'WARNING'
+ elif value <= self.get_limit('careful', stat_name=self.plugin_name):
+ ret = 'CAREFUL'
+ except (TypeError, KeyError):
+ # Catch TypeError for issue1373
+ ret = 'DEFAULT'
+
+ return ret
+
+ def update_views(self):
+ """Update stats views."""
+ # Call the father's method
+ super(PluginModel, self).update_views()
+
+ # Add specifics information
+ # Alert on signal thresholds
+ for i in self.stats:
+ self.views[i[self.get_key()]]['signal']['decoration'] = self.get_alert(i['signal'])
+
+ def msg_curse(self, args=None, max_width=None):
+ """Return the dict to display in the curse interface."""
+ # Init the return message
+ ret = []
+
+ # Only process if stats exist and display plugin enable...
+ if not self.stats or not wireless_file_exists or self.is_disabled():
+ return ret
+
+ # Max size for the interface name
+ if_name_max_width = max_width - 5
+
+ # Build the string message
+ # Header
+ msg = '{:{width}}'.format('WIFI', width=if_name_max_width)
+ ret.append(self.curse_add_line(msg, "TITLE"))
+ msg = '{:>7}'.format('dBm')
+ ret.append(self.curse_add_line(msg))
+
+ # Hotspot list (sorted by name)
+ for i in sorted(self.stats, key=operator.itemgetter(self.get_key())):
+ # Do not display hotspot with no name (/ssid)...
+ # of ssid/signal None... See issue #1151 and #issue1973
+ if i['ssid'] == '' or i['ssid'] is None or i['signal'] is None:
+ continue
+ ret.append(self.curse_new_line())
+ # New hotspot
+ hotspot_name = i['ssid']
+ # Cut hotspot_name if it is too long
+ if len(hotspot_name) > if_name_max_width:
+ hotspot_name = '_' + hotspot_name[-if_name_max_width - len(i['security']) + 1 :]
+ # Add the new hotspot to the message
+ msg = '{:{width}} {security}'.format(
+ nativestr(hotspot_name), width=if_name_max_width - len(i['security']) - 1, security=i['security']
+ )
+ ret.append(self.curse_add_line(msg))
+ msg = '{:>7}'.format(
+ i['signal'],
+ )
+ ret.append(
+ self.curse_add_line(msg, self.get_views(item=i[self.get_key()], key='signal', option='decoration'))
+ )
+
+ return ret
+
+
+class ThreadHotspot(threading.Thread):
+ """
+ Specific thread for the Wifi hotspot scanner.
+ """
+
+ def __init__(self, refresh_time=2):
+ """Init the class."""
+ super(ThreadHotspot, self).__init__()
+ # Refresh time
+ self.refresh_time = refresh_time
+ # Event needed to stop properly the thread
+ self._stopper = threading.Event()
+ # Is part of Ports plugin
+ self.plugin_name = "wifi"
+
+ def run(self):
+ """Get hotspots stats using the nmcli command line"""
+ while not self.stopped():
+ # Run the nmcli command
+ nmcli_raw = secure_popen(NMCLI_COMMAND + ' ' + NMCLI_ARGS).split('\n')
+ nmcli_result = []
+ for h in nmcli_raw:
+ h = h.split(':')
+ if len(h) != 4 or h[0] != 'yes':
+ # Do not process the line if it is not the active hotspot
+ continue
+ nmcli_result.append({'key': 'ssid', 'ssid': h[1], 'signal': -float(h[2]), 'security': h[3]})
+ self.thread_stats = nmcli_result
+ # Wait refresh time until next scan
+ # Note: nmcli cache the result for x seconds
+ time.sleep(self.refresh_time)
+
+ @property
+ def stats(self):
+ """Stats getter."""
+ if hasattr(self, 'thread_stats'):
+ return self.thread_stats
+ else:
+ return []
+
+ def stop(self, timeout=None):
+ """Stop the thread."""
+ self._stopper.set()
+
+ def stopped(self):
+ """Return True is the thread is stopped."""
+ return self._stopper.is_set()
diff --git a/glances/plugins/wifi/model.py b/glances/plugins/wifi/model.py
deleted file mode 100644
index c221d688..00000000
--- a/glances/plugins/wifi/model.py
+++ /dev/null
@@ -1,268 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# This file is part of Glances.
-#
-# SPDX-FileCopyrightText: 2023 Nicolas Hennion <nicolas@nicolargo.com>
-#
-# SPDX-License-Identifier: LGPL-3.0-only
-#
-
-"""Wifi plugin.
-
-Stats are retreived from the nmcli command line (Linux only):
-
-# nmcli -t -f active,ssid,signal,security,chan dev wifi
-
-# nmcli -t -f active,ssid,signal dev wifi
-no:Livebox-C820:77
-yes:Livebox-C820:72
-
-or the /proc/net/wireless file (Linux only):
-
-# cat /proc/net/wireless
-Inter-| sta-| Quality | Discarded packets | Missed | WE
- face | tus | link level noise | nwid crypt frag retry misc | beacon | 22
-wlp2s0: 0000 51. -59. -256 0 0 0 0 5881 0
-"""
-
-import operator
-from shutil import which
-import threading
-import time
-
-from glances.globals import nativestr, file_exists
-from glances.plugins.plugin.model import GlancesPluginModel
-from glances.secure import secure_popen
-from glances.logger import logger
-
-# Test if the nmcli command exists and is executable
-# it allows to get the list of the available hotspots
-NMCLI_COMMAND = which('nmcli')
-NMCLI_ARGS = '-t -f active,ssid,signal,security dev wifi'
-nmcli_command_exists = NMCLI_COMMAND is not None
-
-# Backup solution is to use the /proc/net/wireless file
-# but it only give signal information about the current hotspot
-WIRELESS_FILE = '/proc/net/wireless'
-wireless_file_exists = file_exists(WIRELESS_FILE)
-
-if not nmcli_command_exists and not wireless_file_exists:
- logger.debug("Wifi plugin is disabled (no %s command or %s file found)" % ('nmcli', WIRELESS_FILE))
-
-
-class PluginModel(GlancesPluginModel):
- """Glances Wifi plugin.
-
- Get stats of the current Wifi hotspots.
- """
-
- def __init__(self, args=None, config=None):
- """Init the plugin."""
- super(PluginModel, self).__init__(args=args, config=config, stats_init_value=[])
-
- # We want to display the stat in the curse interface
- self.display_curse = True
-
- # Global Thread running all the scans
- self._thread = None
-
- def exit(self):
- """Overwrite the exit method to close threads."""
- if self._thread is not None:
- self._thread.stop()
- # Call the father class
- super(PluginModel, self).exit()
-
- def get_key(self):
- """Return the key of the list.
-
- :returns: string -- SSID is the dict key
- """
- return 'ssid'
-
- @GlancesPluginModel._check_decorator
- @GlancesPluginModel._log_result_decorator
- def update(self):
- """Update Wifi stats using the input method.
-
- Stats is a list of dict (one dict per hotspot)
-
- :returns: list -- Stats is a list of dict (hotspot)
- """
- # Init new stats
- stats = self.get_init_value()
-
- # Exist if we can not grab the stats
- if not nmcli_command_exists and not wireless_file_exists:
- return stats
-
- if self.input_method == 'local' and nmcli_command_exists:
- # Only refresh if there is not other scanning thread
- if self._thread is None:
- thread_is_running = False
- else:
- thread_is_running = self._thread.is_alive()
- if not thread_is_running:
- # Run hotspot scanner thanks to the nmcli command
- self._thread = ThreadHotspot(self.get_refresh_time())
- self._thread.start()
- # Get the result (or [] if the scan is ongoing)
- stats = self._thread.stats
- elif self.input_method == 'local' and wireless_file_exists:
- # As a backup solution, use the /proc/net/wireless file
- with open(WIRELESS_FILE, 'r') as f:
- # The first two lines are header
- f.readline()
- f.readline()
- # Others lines are Wifi stats
- wifi_stats = f.readline()
- while wifi_stats != '':
- # Extract the stats
- wifi_stats = wifi_stats.split()
- # Add the Wifi link to the list
- stats.append(
- {
- 'key': self.get_key(),
- 'ssid': wifi_stats[0][:-1],
- 'signal': float(wifi_stats[3]),
- 'security': '',
- }
- )
- # Next line
- wifi_stats = f.readline()
-
- elif self.input_method == 'snmp':
- # Update stats using SNMP
-
- # Not implemented yet
- pass
-
- # Update the stats
- self.stats = stats
-
- return self.stats
-
- def get_alert(self, value):
- """Overwrite the default get_alert method.
-
- Alert is on signal quality where lower is better...
-
- :returns: string -- Signal alert
- """
- ret = 'OK'
- try:
- if value <= self.get_limit('critical', stat_name=self.plugin_name):
- ret = 'CRITICAL'
- elif value <= self.get_limit('warning', stat_name=self.plugin_name):
- ret = 'WARNING'
- elif value <= self.get_limit('careful', stat_name=self.plugin_name):
- ret = 'CAREFUL'
- except (TypeError, KeyError):
- # Catch TypeError for issue1373
- ret = 'DEFAULT'
-
- return ret
-
- def update_views(self):
- """Update stats views."""
- # Call the father's method
- super(PluginModel, self).update_views()
-
- # Add specifics information
- # Alert on signal thresholds
- for i in self.stats:
- self.views[i[self.get_key()]]['signal']['decoration'] = self.get_alert(i['signal'])
-
- def msg_curse(self, args=None, max_width=None):
- """Return the dict to display in the curse interface."""
- # Init the return message
- ret = []
-
- # Only process if stats exist and display plugin enable...
- if not self.stats or not wireless_file_exists or self.is_disabled():
- return ret
-
- # Max size for the interface name
- if_name_max_width = max_width - 5
-
- # Build the string message
- # Header
- msg = '{:{width}}'.format('WIFI', width=if_name_max_width)
- ret.append(self.curse_add_line(msg, "TITLE"))
- msg = '{:>7}'.format('dBm')
- ret.append(self.curse_add_line(msg))
-
- # Hotspot list (sorted by name)
- for i in sorted(self.stats, key=operator.itemgetter(self.get_key())):
- # Do not display hotspot with no name (/ssid)...
- # of ssid/signal None... See issue #1151 and #issue1973
- if i['ssid'] == '' or i['ssid'] is None or i['signal'] is None:
- continue
- ret.append(self.curse_new_line())
- # New hotspot
- hotspot_name = i['ssid']
- # Cut hotspot_name if it is too long
- if len(hotspot_name) > if_name_max_width:
- hotspot_name = '_' + hotspot_name[-if_name_max_width - len(i['security']) + 1 :]
- # Add the new hotspot to the message
- msg = '{:{width}} {security}'.format(
- nativestr(hotspot_name), width=if_name_max_width - len(i['security']) - 1, security=i['security']
- )
- ret.append(self.curse_add_line(msg))
- msg = '{:>7}'.format(
- i['signal'],
- )
- ret.append(
- self.curse_add_line(msg, self.get_views(item=i[self.get_key()], key='signal', option='decoration'))
- )
-
- return ret
-
-
-class ThreadHotspot(threading.Thread):
- """
- Specific thread for the Wifi hotspot scanner.
- """
-
- def __init__(self, refresh_time=2):
- """Init the class."""
- super(ThreadHotspot, self).__init__()
- # Refresh time
- self.refresh_time = refresh_time
- # Event needed to stop properly the thread
- self._stopper = threading.Event()
- # Is part of Ports plugin
- self.plugin_name = "wifi"
-
- def run(self):
- """Get hotspots stats using the nmcli command line"""
- while not self.stopped():
- # Run the nmcli command
- nmcli_raw = secure_popen(NMCLI_COMMAND + ' ' + NMCLI_ARGS).split('\n')
- nmcli_result = []
- for h in nmcli_raw:
- h = h.split(':')
- if len(h) != 4 or h[0] != 'yes':
- # Do not process the line if it is not the active hotspot
- continue
- nmcli_result.append({'key': 'ssid', 'ssid': h[1], 'signal': -float(h[2]), 'security': h[3]})
- self.thread_stats = nmcli_result
- # Wait refresh time until next scan
- # Note: nmcli cache the result for x seconds
- time.sleep(self.refresh_time)
-
- @property
- def stats(self):
- """Stats getter."""
- if hasattr(self, 'thread_stats'):
- return self.thread_stats
- else:
- return []
-
- def stop(self, timeout=None):
- """Stop the thread."""
- self._stopper.set()
-
- def stopped(self):
- """Return True is the thread is stopped."""
- return self._stopper.is_set()