summaryrefslogtreecommitdiffstats
path: root/peekaboo/ruleset/rules.py
diff options
context:
space:
mode:
authorMichael Weiser <michael.weiser@gmx.de>2019-04-18 08:57:15 +0000
committerMichael Weiser <michael.weiser@gmx.de>2019-04-25 12:20:20 +0000
commit77d2ff1348a6f1ade05b54b297fc771abd0e14cd (patch)
treee7914e3ad89f36b65aced27f53132244f894832f /peekaboo/ruleset/rules.py
parente6c44a8ca3c2216904731e4d889e53473691c0dc (diff)
Validate ruleset config
Validate the ruleset configuration at startup to inform the user about misconfiguration and exit immediately instead of giving warnings during seemingly normal operation. This also gives us a chance to pre-compile regexes for more efficient matching later on. We give rules a new method get_config() which retrieves their configuration. This is called for each rule by new method the validate_config() of the ruleset engine to catch errors. This way the layout and extent of configuration is still completely governed by the rule and we can interview it about its happiness with what's provided in the configuration file. As an incidental cleanup, merge class PeekabooRulesetConfig into PeekabooRulesetParser because there's nothing left where it could and would need to help the rules with an abstraction of the config file. Also switch class PeekabooConfig to be a subclass of PeekabooConfigParser so it can (potentially) benefit from the list parsing code there. By moving the special log level and by-default-type getters over there as well we end up with nicely generic config classes that can benefit directly from improvements in the configparser module. Update the test suite to test and use this new functionality. Incidentally, remove the convoluted inheritance-based config testing layout in favour of creating subclasses of the config classes.
Diffstat (limited to 'peekaboo/ruleset/rules.py')
-rw-r--r--peekaboo/ruleset/rules.py180
1 files changed, 123 insertions, 57 deletions
diff --git a/peekaboo/ruleset/rules.py b/peekaboo/ruleset/rules.py
index a631cc1..90f9dcb 100644
--- a/peekaboo/ruleset/rules.py
+++ b/peekaboo/ruleset/rules.py
@@ -30,7 +30,7 @@ import re
import logging
from peekaboo.ruleset import Result, RuleResult
from peekaboo.exceptions import PeekabooAnalysisDeferred, \
- CuckooSubmitFailedException
+ CuckooSubmitFailedException, PeekabooRulesetConfigError
logger = logging.getLogger(__name__)
@@ -45,13 +45,10 @@ class Rule(object):
def __init__(self, config=None, db_con=None):
""" Initialize common configuration and resources """
self.db_con = db_con
+ self.config = config
- # initialise and retain config as empty dict if no rule config is given
- # to us so the rule can rely on it and does not need to do any type
- # checking
- self.config = {}
- if config is not None:
- self.config = config
+ # initialise and validate configuration
+ self.get_config()
def result(self, result, reason, further_analysis):
""" Construct a RuleResult for returning to the engine. """
@@ -68,6 +65,56 @@ class Rule(object):
"""
raise NotImplementedError
+ def get_config(self):
+ """ Extract this rule's configuration out of the ruleset configuration
+ object given at creation. To be overridden by child classes if they
+ have configuration options. """
+ # pass
+
+ # the following getters are somewhat boilerplate but unavoidable for now.
+ # They serve the purpose of keeping config access specifics out of rules for
+ # the sake of readablility.
+ def get_config_value(self, getter, option, *args, **kwargs):
+ """ Get a configuation value for this rule from the ruleset
+ configuration. Getter routine and option name to be provided by caller.
+ The rule's name is always used as configuration section name.
+
+ @param getter: getter routine to use
+ @type getter: getter method of PeekabooConfigParser
+ @param option: name of option to read
+ @type option: string
+ @param args, kwargs: additional arguments passed to the getter routine,
+ such as fallback.
+
+ @returns: configuration value read from config
+ """
+ # additional common logic to go here
+ return getter(self.rule_name, option, *args, **kwargs)
+
+ def get_config_int(self, option, default=None):
+ """ Get an integer from the ruleset configuration. See get_config_value
+ for parameters. """
+ return self.get_config_value(
+ self.config.getint, option, fallback=default)
+
+ def get_config_float(self, option, default=None):
+ """ Get a float from the ruleset configuration. See get_config_value
+ for parameters. """
+ return self.get_config_value(
+ self.config.getfloat, option, fallback=default)
+
+ def get_config_list(self, option, default=None):
+ """ Get a list from the ruleset configuration. See get_config_value
+ for parameters. """
+ return self.get_config_value(
+ self.config.getlist, option, fallback=default)
+
+ def get_config_relist(self, option, default=None):
+ """ Get a list of compiled regular expressions from the ruleset. See
+ get_config_value for parameters. """
+ return self.get_config_value(
+ self.config.getrelist, option, fallback=default)
+
class KnownRule(Rule):
""" A rule determining if a sample is known by looking at the database for
@@ -92,12 +139,13 @@ class FileLargerThanRule(Rule):
"""
rule_name = 'file_larger_than'
+ def get_config(self):
+ self.size_threshold = self.get_config_int('bytes', 5)
+
def evaluate(self, sample):
""" Evaluate whether the sample is larger than a certain threshold.
Advise the engine to stop processing if the size is below the
threshold. """
- size = int(self.config.get('bytes', 5))
-
try:
sample_size = sample.file_size
except OSError as oserr:
@@ -106,14 +154,15 @@ class FileLargerThanRule(Rule):
_("Failure to determine sample file size: %s") % oserr,
False)
- if sample_size > size:
+ if sample_size > self.size_threshold:
return self.result(Result.unknown,
- _("File has more than %d bytes") % size,
+ _("File has more than %d bytes")
+ % self.size_threshold,
True)
return self.result(
Result.ignored,
- _("File is more than %d bytes long") % sample.file_size,
+ _("File is only %d bytes long") % sample_size,
False)
@@ -122,15 +171,18 @@ class FileTypeOnWhitelistRule(Rule):
whitelist. """
rule_name = 'file_type_on_whitelist'
+ def get_config(self):
+ whitelist = self.get_config_list('whitelist')
+ if not whitelist:
+ raise PeekabooRulesetConfigError(
+ "Empty whitelist, check %s rule config." % self.rule_name)
+
+ self.whitelist = set(whitelist)
+
def evaluate(self, sample):
""" Ignore the file only if *all* of its mime types are on the
whitelist and we could determine at least one. """
- whitelist = self.config.get('whitelist', ())
- if not whitelist:
- logger.warning("Empty whitelist, check ruleset config.")
- return self.result(Result.unknown, "Whitelist ist leer", True)
-
- if sample.mimetypes and sample.mimetypes.issubset(set(whitelist)):
+ if sample.mimetypes and sample.mimetypes.issubset(self.whitelist):
return self.result(Result.ignored,
_("File type is on whitelist"),
False)
@@ -145,15 +197,18 @@ class FileTypeOnGreylistRule(Rule):
greylist, i.e. enabled for analysis. """
rule_name = 'file_type_on_greylist'
+ def get_config(self):
+ greylist = self.get_config_list('greylist')
+ if not greylist:
+ raise PeekabooRulesetConfigError(
+ "Empty greylist, check %s rule config." % self.rule_name)
+
+ self.greylist = set(greylist)
+
def evaluate(self, sample):
""" Continue analysis if any of the sample's MIME types are on the
greylist or in case we don't have one. """
- greylist = self.config.get('greylist', ())
- if not greylist:
- logger.warning("Empty greylist, check ruleset config.")
- return self.result(Result.unknown, "Greylist is leer", False)
-
- if not sample.mimetypes or sample.mimetypes.intersection(set(greylist)):
+ if not sample.mimetypes or sample.mimetypes.intersection(self.greylist):
return self.result(Result.unknown,
_("File type is on the list of types to "
"analyze"),
@@ -232,18 +287,18 @@ class CuckooEvilSigRule(CuckooRule):
of signatures considered bad. """
rule_name = 'cuckoo_evil_sig'
- def evaluate_report(self, report):
- """ Evaluate the sample against signatures that if matched mark a
- sample as bad. """
+ def get_config(self):
# list all installed signatures
# grep -o "description.*" -R . ~/cuckoo2.0/modules/signatures/
- bad_sigs = self.config.get('signature', ())
- if not bad_sigs:
- logger.warning("Empty bad signature list, check ruleset config.")
- return self.result(Result.unknown,
- _("Empty list of malicious signatures"),
- True)
+ self.bad_sigs = self.get_config_relist('signature')
+ if not self.bad_sigs:
+ raise PeekabooRulesetConfigError(
+ "Empty bad signature list, check %s rule config." %
+ self.rule_name)
+ def evaluate_report(self, report):
+ """ Evaluate the sample against signatures that if matched mark a
+ sample as bad. """
# look through matched signatures
sigs = []
for descr in report.signatures:
@@ -252,10 +307,15 @@ class CuckooEvilSigRule(CuckooRule):
# check if there is a "bad" signatures and return bad
matched_bad_sigs = []
- for sig in bad_sigs:
- match = re.search(sig, "\n".join(sigs))
- if match:
- matched_bad_sigs.append(sig)
+ for bad_sig in self.bad_sigs:
+ # iterate over each sig individually to allow regexes to use
+ # anchors such as ^ and $ and avoid mismatches, e.g. by ['foo',
+ # 'bar'] being stringified to "['foo', 'bar']" and matching
+ # /fo.*ar/.
+ for sig in sigs:
+ match = re.search(bad_sig, sig)
+ if match:
+ matched_bad_sigs.append(sig)
if not matched_bad_sigs:
return self.result(Result.unknown,
@@ -274,20 +334,22 @@ class CuckooScoreRule(CuckooRule):
threshold. """
rule_name = 'cuckoo_score'
+ def get_config(self):
+ self.score_threshold = self.get_config_float('higher_than', 4.0)
+
def evaluate_report(self, report):
""" Evaluate the score reported by Cuckoo against the threshold from
the configuration and report sample as bad if above. """
- threshold = float(self.config.get('higher_than', 4.0))
- if report.score >= threshold:
+ if report.score >= self.score_threshold:
return self.result(Result.bad,
_("Cuckoo score >= %s: %s") %
- (threshold, report.score),
+ (self.score_threshold, report.score),
False)
return self.result(Result.unknown,
_("Cuckoo score < %s: %s") %
- (threshold, report.score),
+ (self.score_threshold, report.score),
True)
@@ -296,16 +358,19 @@ class RequestsEvilDomainRule(CuckooRule):
Cuckoo against a blacklist. """
rule_name = 'requests_evil_domain'
+ def get_config(self):
+ self.evil_domains = self.get_config_list('domain')
+ if not self.evil_domains:
+ raise PeekabooRulesetConfigError(
+ "Empty evil domain list, check %s rule config."
+ % self.rule_name)
+
def evaluate_report(self, report):
""" Report the sample as bad if one of the requested domains is on our
list of evil domains. """
- evil_domains = self.config.get('domain', ())
- if not evil_domains:
- logger.warning("Empty evil domain list, check ruleset config.")
- return self.result(Result.unknown, _("Empty domain list"), True)
for domain in report.requested_domains:
- if domain in evil_domains:
+ if domain in self.evil_domains:
return self.result(Result.bad,
_("The file attempts to contact at least "
"one domain on the blacklist (%s)")
@@ -322,6 +387,11 @@ class CuckooAnalysisFailedRule(CuckooRule):
""" A rule checking the final status reported by Cuckoo for success. """
rule_name = 'cuckoo_analysis_failed'
+ def get_config(self):
+ self.failure_matches = self.get_config_list('failure', [])
+ self.success_matches = self.get_config_list(
+ 'success', ['analysis completed successfully'])
+
def evaluate_report(self, report):
""" Report the sample as bad if the Cuckoo indicates that the analysis
has failed. """
@@ -332,19 +402,15 @@ class CuckooAnalysisFailedRule(CuckooRule):
failure_reason = _("Behavioral analysis by Cuckoo has produced "
"an error and did not finish successfully")
- failure_matches = self.config.get('failure')
- if failure_matches is not None:
- for entry in report.cuckoo_server_messages:
- for failure in failure_matches:
- if failure in entry:
- logger.debug('Failure indicator "%s" found in Cuckoo '
- 'messages', failure)
- return self.result(Result.failed, failure_reason, False)
+ for entry in report.cuckoo_server_messages:
+ for failure in self.failure_matches:
+ if failure in entry:
+ logger.debug('Failure indicator "%s" found in Cuckoo '
+ 'messages', failure)
+ return self.result(Result.failed, failure_reason, False)
- success_matches = self.config.get(
- 'success', ['analysis completed successfully'])
for entry in report.cuckoo_server_messages:
- for success in success_matches:
+ for success in self.success_matches:
if success in entry:
logger.debug('Success indicator "%s" found in Cuckoo '
'messages', success)