# Source: root/glances/ports_list.py (blob 6f824cb526d422fd23e1044640b18dc48e6c2940)
# -*- coding: utf-8 -*-
#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <nicolas@nicolargo.com>
#
# SPDX-License-Identifier: LGPL-3.0-only
#

"""Manage the Glances ports list (Ports plugin)."""

from glances.logger import logger
from glances.globals import BSD

# netifaces is an optional dependency: netifaces_tag tells the rest of the
# module whether it could be imported.
# XXX *BSDs: the import is skipped on purpose because it ends in a
# "Segmentation fault (core dumped)"
# -- https://bitbucket.org/al45tair/netifaces/issues/15
# Also used in the glances_ip plugin
netifaces_tag = False
if not BSD:
    try:
        import netifaces
    except ImportError:
        # Library not installed: keep the tag to False
        pass
    else:
        netifaces_tag = True


class GlancesPortsList(object):

    """Manage the ports list for the ports plugin.

    The list is built once, when the object is created, from the [ports]
    section of the Glances configuration file.
    """

    _section = "ports"
    _default_refresh = 60
    _default_timeout = 3
    # Highest index N scanned for 'port_N_*' keys in the configuration file
    _max_ports = 255

    def __init__(self, config=None, args=None):
        """Build the ports list from the configuration.

        :param config: configuration object providing has_section() and get_value(), or None
        :param args: not used by this class; kept so that the constructor signature is unchanged
        """
        # ports_list is a list of dict (JSON compliant)
        # [ {'host': 'www.google.fr', 'port': 443, 'refresh': 30, 'description': Internet, 'status': True} ... ]
        self._ports_list = self.load(config)

    def load(self, config):
        """Load the ports list from the configuration file.

        :param config: configuration object, or None if no configuration file is available
        :return: list of dict, one per host to check (empty if there is nothing to load)
        """
        ports_list = []

        if config is None:
            logger.debug("No configuration file available. Cannot load ports list.")
            return ports_list

        if not config.has_section(self._section):
            logger.debug("No [%s] section in the configuration file. Cannot load ports list.", self._section)
            return ports_list

        logger.debug("Start reading the [%s] section in the configuration file", self._section)

        # Section-wide values: a malformed number falls back to the class default
        refresh = self._get_int(config, 'refresh', self._default_refresh)
        timeout = self._get_int(config, 'timeout', self._default_timeout)

        # Add default gateway on top of the ports_list lists
        default_gateway = config.get_value(self._section, 'port_default_gateway', default='False')
        # netifaces_tag is only evaluated when the option is enabled
        if str(default_gateway).lower().startswith('true') and netifaces_tag:
            ports_list.append(self._default_gateway_port(refresh, timeout))

        # Read the scan list (indexes without a host key are skipped)
        for index in range(1, self._max_ports + 1):
            new_port = self._read_port(config, index, refresh, timeout)
            if new_port is not None:
                ports_list.append(new_port)

        # Ports list loaded
        logger.debug("Ports list loaded: %s", ports_list)

        return ports_list

    def _get_int(self, config, key, default):
        """Return the integer value of key in the section, or default if it is not a valid integer."""
        raw = config.get_value(self._section, key, default=default)
        try:
            return int(raw)
        except (TypeError, ValueError):
            # A typo in the configuration file should not abort the whole loading
            logger.warning(
                "Invalid value %r for %s in the [%s] section. Use %s instead.", raw, key, self._section, default
            )
            return default

    def _default_gateway_port(self, refresh, timeout):
        """Return the ICMP entry (indice port_0) for the default IPv4 gateway."""
        try:
            host = netifaces.gateways()['default'][netifaces.AF_INET][0]
        except KeyError:
            # No default IPv4 gateway reported: the entry is kept, with no host
            host = None
        new_port = {
            'host': host,
            # ICMP
            'port': 0,
            'description': 'DefaultGateway',
            'refresh': refresh,
            'timeout': timeout,
            'status': None,
            'rtt_warning': None,
            'indice': 'port_0',
        }
        logger.debug("Add default gateway %s to the static list", new_port['host'])
        return new_port

    def _read_port(self, config, index, refresh, timeout):
        """Return the entry built from the port_<index>_* keys, or None if no host is configured."""
        prefix = 'port_%s_' % index

        # Read mandatory configuration key: host
        host = config.get_value(self._section, prefix + 'host')
        if host is None:
            return None

        # Read optionals configuration keys
        # Port is set to 0 by default. 0 mean ICMP check instead of TCP check
        # (the configured value is stored as returned by the configuration object)
        port = config.get_value(self._section, prefix + 'port', 0)
        description = config.get_value(self._section, prefix + 'description', default="%s:%s" % (host, port))

        # RTT warning: read as an integer and converted to second (divided by 1000)
        rtt_warning = config.get_value(self._section, prefix + 'rtt_warning', default=None)
        if rtt_warning is not None:
            try:
                rtt_warning = int(rtt_warning) / 1000.0
            except (TypeError, ValueError):
                logger.warning(
                    "Invalid value %r for %srtt_warning in the [%s] section. RTT warning is disabled.",
                    rtt_warning,
                    prefix,
                    self._section,
                )
                rtt_warning = None

        new_port = {
            'host': host,
            'port': port,
            'description': description,
            # Default status
            'status': None,
            # Refresh rate in second
            'refresh': refresh,
            # Timeout in second (the section timeout is used when not set or invalid)
            'timeout': self._get_int(config, prefix + 'timeout', timeout),
            'rtt_warning': rtt_warning,
            'indice': 'port_%s' % index,
        }

        # Add the server to the list
        logger.debug("Add port %s:%s to the static list", new_port['host'], new_port['port'])
        return new_port

    def get_ports_list(self):
        """Return the current ports list (list of dict)."""
        return self._ports_list

    def set_server(self, pos, key, value):
        """Set the key to the value for the pos (position in the list)."""
        self._ports_list[pos][key] = value