summaryrefslogtreecommitdiffstats
path: root/glances/filter.py
blob: 55950060434a193ecca6e02e73ab65ca28fe775b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# -*- coding: utf-8 -*-
#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <nicolas@nicolargo.com>
#
# SPDX-License-Identifier: LGPL-3.0-only
#

import re

from glances.logger import logger


class GlancesFilter(object):

    """Allow Glances to filter processes

    >>> f = GlancesFilter()
    >>> f.filter = '.*python.*'
    >>> f.filter
    '.*python.*'
    >>> f.key
    None
    >>> f.filter = 'user:nicolargo'
    >>> f.filter
    'nicolargo'
    >>> f.key
    'user'
    >>> f.filter = 'username:.*nico.*'
    >>> f.filter
    '.*nico.*'
    >>> f.key
    'username'
    """

    def __init__(self):
        # Filter entered by the user (string)
        self._filter_input = None
        # Filter to apply
        self._filter = None
        # Filter regular expression
        self._filter_re = None
        # Dict key where the filter should be applied
        # Default is None: search on command line and process name
        self._filter_key = None

    @property
    def filter_input(self):
        """Return the filter given by the user (as a string)"""
        return self._filter_input

    @property
    def filter(self):
        """Return the current filter to be applied"""
        return self._filter

    @filter.setter
    def filter(self, value):
        """Set the filter (as a string) and compute the regular expression

        A filter could be one of the following:
        - python > Process name of cmd start with python
        - .*python.* > Process name of cmd contain python
        - username:nicolargo > Process of nicolargo user
        """
        self._filter_input = value
        if value is None:
            self._filter = None
            self._filter_key = None
        else:
            new_filter = value.split(':')
            if len(new_filter) == 1:
                self._filter = new_filter[0]
                self._filter_key = None
            else:
                self._filter = new_filter[1]
                self._filter_key = new_filter[0]

        self._filter_re = None
        if self.filter is not None:
            logger.info("Set filter to {} on key {}".format(self.filter, self.filter_key))
            # Compute the regular expression
            try:
                self._filter_re = re.compile(self.filter)
                logger.debug("Filter regex compilation OK: {}".format(self.filter))
            except Exception as e:
                logger.error("Cannot compile filter regex: {} ({})".format(self.filter, e))
                self._filter = None
                self._filter_re = None
                self._filter_key = None

    @property
    def filter_re(self):
        """Return the filter regular expression"""
        return self._filter_re

    @property
    def filter_key(self):
        """key where the filter should be applied"""
        return self._filter_key

    def is_filtered(self, process):
        """Return True if the process item match the current filter

        :param process: A dict corresponding to the process item.
        """
        if self.filter is None:
            # No filter => Not filtered
            return False

        if self.filter_key is None:
            # Apply filter on command line and process name
            return self._is_process_filtered(process, key='name') or self._is_process_filtered(process, key='cmdline')
        else:
            # Apply filter on <key>
            return self._is_process_filtered(process)

    def _is_process_filtered(self, process, key=None):
        """Return True if the process[key] should be filtered according to the current filter"""
        if key is None:
            key = self.filter_key
        try:
            # If the item process[key] is a list, convert it to a string
            # in order to match it with the current regular expression
            if isinstance(process[key], list):
                value = ' '.join(process[key])
            else:
                value = process[key]
        except KeyError:
            # If the key did not exist
            return False
        try:
            return self._filter_re.fullmatch(value) is None
        except (AttributeError, TypeError):
            # AttributeError -  Filter processes crashes with a bad regular expression pattern (issue #665)
            # TypeError - Filter processes crashes if value is None (issue #1105)
            return False