summaryrefslogtreecommitdiffstats
path: root/peekaboo/ruleset/rules.py
diff options
context:
space:
mode:
Diffstat (limited to 'peekaboo/ruleset/rules.py')
-rw-r--r--peekaboo/ruleset/rules.py180
1 files changed, 123 insertions, 57 deletions
diff --git a/peekaboo/ruleset/rules.py b/peekaboo/ruleset/rules.py
index a631cc1..90f9dcb 100644
--- a/peekaboo/ruleset/rules.py
+++ b/peekaboo/ruleset/rules.py
@@ -30,7 +30,7 @@ import re
import logging
from peekaboo.ruleset import Result, RuleResult
from peekaboo.exceptions import PeekabooAnalysisDeferred, \
- CuckooSubmitFailedException
+ CuckooSubmitFailedException, PeekabooRulesetConfigError
logger = logging.getLogger(__name__)
@@ -45,13 +45,10 @@ class Rule(object):
def __init__(self, config=None, db_con=None):
""" Initialize common configuration and resources """
self.db_con = db_con
+ self.config = config
- # initialise and retain config as empty dict if no rule config is given
- # to us so the rule can rely on it and does not need to do any type
- # checking
- self.config = {}
- if config is not None:
- self.config = config
+ # initialise and validate configuration
+ self.get_config()
def result(self, result, reason, further_analysis):
""" Construct a RuleResult for returning to the engine. """
@@ -68,6 +65,56 @@ class Rule(object):
"""
raise NotImplementedError
+ def get_config(self):
+ """ Extract this rule's configuration out of the ruleset configuration
+ object given at creation. To be overridden by child classes if they
+ have configuration options. """
+ # pass
+
+ # the following getters are somewhat boilerplate but unavoidable for now.
+ # They serve the purpose of keeping config access specifics out of rules for
+ # the sake of readablility.
+ def get_config_value(self, getter, option, *args, **kwargs):
+ """ Get a configuation value for this rule from the ruleset
+ configuration. Getter routine and option name to be provided by caller.
+ The rule's name is always used as configuration section name.
+
+ @param getter: getter routine to use
+ @type getter: getter method of PeekabooConfigParser
+ @param option: name of option to read
+ @type option: string
+ @param args, kwargs: additional arguments passed to the getter routine,
+ such as fallback.
+
+ @returns: configuration value read from config
+ """
+ # additional common logic to go here
+ return getter(self.rule_name, option, *args, **kwargs)
+
+ def get_config_int(self, option, default=None):
+ """ Get an integer from the ruleset configuration. See get_config_value
+ for parameters. """
+ return self.get_config_value(
+ self.config.getint, option, fallback=default)
+
+ def get_config_float(self, option, default=None):
+ """ Get a float from the ruleset configuration. See get_config_value
+ for parameters. """
+ return self.get_config_value(
+ self.config.getfloat, option, fallback=default)
+
+ def get_config_list(self, option, default=None):
+ """ Get a list from the ruleset configuration. See get_config_value
+ for parameters. """
+ return self.get_config_value(
+ self.config.getlist, option, fallback=default)
+
+ def get_config_relist(self, option, default=None):
+ """ Get a list of compiled regular expressions from the ruleset. See
+ get_config_value for parameters. """
+ return self.get_config_value(
+ self.config.getrelist, option, fallback=default)
+
class KnownRule(Rule):
""" A rule determining if a sample is known by looking at the database for
@@ -92,12 +139,13 @@ class FileLargerThanRule(Rule):
"""
rule_name = 'file_larger_than'
+ def get_config(self):
+ self.size_threshold = self.get_config_int('bytes', 5)
+
def evaluate(self, sample):
""" Evaluate whether the sample is larger than a certain threshold.
Advise the engine to stop processing if the size is below the
threshold. """
- size = int(self.config.get('bytes', 5))
-
try:
sample_size = sample.file_size
except OSError as oserr:
@@ -106,14 +154,15 @@ class FileLargerThanRule(Rule):
_("Failure to determine sample file size: %s") % oserr,
False)
- if sample_size > size:
+ if sample_size > self.size_threshold:
return self.result(Result.unknown,
- _("File has more than %d bytes") % size,
+ _("File has more than %d bytes")
+ % self.size_threshold,
True)
return self.result(
Result.ignored,
- _("File is more than %d bytes long") % sample.file_size,
+ _("File is only %d bytes long") % sample_size,
False)
@@ -122,15 +171,18 @@ class FileTypeOnWhitelistRule(Rule):
whitelist. """
rule_name = 'file_type_on_whitelist'
+ def get_config(self):
+ whitelist = self.get_config_list('whitelist')
+ if not whitelist:
+ raise PeekabooRulesetConfigError(
+ "Empty whitelist, check %s rule config." % self.rule_name)
+
+ self.whitelist = set(whitelist)
+
def evaluate(self, sample):
""" Ignore the file only if *all* of its mime types are on the
whitelist and we could determine at least one. """
- whitelist = self.config.get('whitelist', ())
- if not whitelist:
- logger.warning("Empty whitelist, check ruleset config.")
- return self.result(Result.unknown, "Whitelist ist leer", True)
-
- if sample.mimetypes and sample.mimetypes.issubset(set(whitelist)):
+ if sample.mimetypes and sample.mimetypes.issubset(self.whitelist):
return self.result(Result.ignored,
_("File type is on whitelist"),
False)
@@ -145,15 +197,18 @@ class FileTypeOnGreylistRule(Rule):
greylist, i.e. enabled for analysis. """
rule_name = 'file_type_on_greylist'
+ def get_config(self):
+ greylist = self.get_config_list('greylist')
+ if not greylist:
+ raise PeekabooRulesetConfigError(
+ "Empty greylist, check %s rule config." % self.rule_name)
+
+ self.greylist = set(greylist)
+
def evaluate(self, sample):
""" Continue analysis if any of the sample's MIME types are on the
greylist or in case we don't have one. """
- greylist = self.config.get('greylist', ())
- if not greylist:
- logger.warning("Empty greylist, check ruleset config.")
- return self.result(Result.unknown, "Greylist is leer", False)
-
- if not sample.mimetypes or sample.mimetypes.intersection(set(greylist)):
+ if not sample.mimetypes or sample.mimetypes.intersection(self.greylist):
return self.result(Result.unknown,
_("File type is on the list of types to "
"analyze"),
@@ -232,18 +287,18 @@ class CuckooEvilSigRule(CuckooRule):
of signatures considered bad. """
rule_name = 'cuckoo_evil_sig'
- def evaluate_report(self, report):
- """ Evaluate the sample against signatures that if matched mark a
- sample as bad. """
+ def get_config(self):
# list all installed signatures
# grep -o "description.*" -R . ~/cuckoo2.0/modules/signatures/
- bad_sigs = self.config.get('signature', ())
- if not bad_sigs:
- logger.warning("Empty bad signature list, check ruleset config.")
- return self.result(Result.unknown,
- _("Empty list of malicious signatures"),
- True)
+ self.bad_sigs = self.get_config_relist('signature')
+ if not self.bad_sigs:
+ raise PeekabooRulesetConfigError(
+ "Empty bad signature list, check %s rule config." %
+ self.rule_name)
+ def evaluate_report(self, report):
+ """ Evaluate the sample against signatures that if matched mark a
+ sample as bad. """
# look through matched signatures
sigs = []
for descr in report.signatures:
@@ -252,10 +307,15 @@ class CuckooEvilSigRule(CuckooRule):
# check if there is a "bad" signatures and return bad
matched_bad_sigs = []
- for sig in bad_sigs:
- match = re.search(sig, "\n".join(sigs))
- if match:
- matched_bad_sigs.append(sig)
+ for bad_sig in self.bad_sigs:
+ # iterate over each sig individually to allow regexes to use
+ # anchors such as ^ and $ and avoid mismatches, e.g. by ['foo',
+ # 'bar'] being stringified to "['foo', 'bar']" and matching
+ # /fo.*ar/.
+ for sig in sigs:
+ match = re.search(bad_sig, sig)
+ if match:
+ matched_bad_sigs.append(sig)
if not matched_bad_sigs:
return self.result(Result.unknown,
@@ -274,20 +334,22 @@ class CuckooScoreRule(CuckooRule):
threshold. """
rule_name = 'cuckoo_score'
+ def get_config(self):
+ self.score_threshold = self.get_config_float('higher_than', 4.0)
+
def evaluate_report(self, report):
""" Evaluate the score reported by Cuckoo against the threshold from
the configuration and report sample as bad if above. """
- threshold = float(self.config.get('higher_than', 4.0))
- if report.score >= threshold:
+ if report.score >= self.score_threshold:
return self.result(Result.bad,
_("Cuckoo score >= %s: %s") %
- (threshold, report.score),
+ (self.score_threshold, report.score),
False)
return self.result(Result.unknown,
_("Cuckoo score < %s: %s") %
- (threshold, report.score),
+ (self.score_threshold, report.score),
True)
@@ -296,16 +358,19 @@ class RequestsEvilDomainRule(CuckooRule):
Cuckoo against a blacklist. """
rule_name = 'requests_evil_domain'
+ def get_config(self):
+ self.evil_domains = self.get_config_list('domain')
+ if not self.evil_domains:
+ raise PeekabooRulesetConfigError(
+ "Empty evil domain list, check %s rule config."
+ % self.rule_name)
+
def evaluate_report(self, report):
""" Report the sample as bad if one of the requested domains is on our
list of evil domains. """
- evil_domains = self.config.get('domain', ())
- if not evil_domains:
- logger.warning("Empty evil domain list, check ruleset config.")
- return self.result(Result.unknown, _("Empty domain list"), True)
for domain in report.requested_domains:
- if domain in evil_domains:
+ if domain in self.evil_domains:
return self.result(Result.bad,
_("The file attempts to contact at least "
"one domain on the blacklist (%s)")
@@ -322,6 +387,11 @@ class CuckooAnalysisFailedRule(CuckooRule):
""" A rule checking the final status reported by Cuckoo for success. """
rule_name = 'cuckoo_analysis_failed'
+ def get_config(self):
+ self.failure_matches = self.get_config_list('failure', [])
+ self.success_matches = self.get_config_list(
+ 'success', ['analysis completed successfully'])
+
def evaluate_report(self, report):
""" Report the sample as bad if the Cuckoo indicates that the analysis
has failed. """
@@ -332,19 +402,15 @@ class CuckooAnalysisFailedRule(CuckooRule):
failure_reason = _("Behavioral analysis by Cuckoo has produced "
"an error and did not finish successfully")
- failure_matches = self.config.get('failure')
- if failure_matches is not None:
- for entry in report.cuckoo_server_messages:
- for failure in failure_matches:
- if failure in entry:
- logger.debug('Failure indicator "%s" found in Cuckoo '
- 'messages', failure)
- return self.result(Result.failed, failure_reason, False)
+ for entry in report.cuckoo_server_messages:
+ for failure in self.failure_matches:
+ if failure in entry:
+ logger.debug('Failure indicator "%s" found in Cuckoo '
+ 'messages', failure)
+ return self.result(Result.failed, failure_reason, False)
- success_matches = self.config.get(
- 'success', ['analysis completed successfully'])
for entry in report.cuckoo_server_messages:
- for success in success_matches:
+ for success in self.success_matches:
if success in entry:
logger.debug('Success indicator "%s" found in Cuckoo '
'messages', success)