diff options
Diffstat (limited to 'peekaboo/ruleset/rules.py')
-rw-r--r-- | peekaboo/ruleset/rules.py | 180 |
1 files changed, 123 insertions, 57 deletions
diff --git a/peekaboo/ruleset/rules.py b/peekaboo/ruleset/rules.py index a631cc1..90f9dcb 100644 --- a/peekaboo/ruleset/rules.py +++ b/peekaboo/ruleset/rules.py @@ -30,7 +30,7 @@ import re import logging from peekaboo.ruleset import Result, RuleResult from peekaboo.exceptions import PeekabooAnalysisDeferred, \ - CuckooSubmitFailedException + CuckooSubmitFailedException, PeekabooRulesetConfigError logger = logging.getLogger(__name__) @@ -45,13 +45,10 @@ class Rule(object): def __init__(self, config=None, db_con=None): """ Initialize common configuration and resources """ self.db_con = db_con + self.config = config - # initialise and retain config as empty dict if no rule config is given - # to us so the rule can rely on it and does not need to do any type - # checking - self.config = {} - if config is not None: - self.config = config + # initialise and validate configuration + self.get_config() def result(self, result, reason, further_analysis): """ Construct a RuleResult for returning to the engine. """ @@ -68,6 +65,56 @@ class Rule(object): """ raise NotImplementedError + def get_config(self): + """ Extract this rule's configuration out of the ruleset configuration + object given at creation. To be overridden by child classes if they + have configuration options. """ + # pass + + # the following getters are somewhat boilerplate but unavoidable for now. + # They serve the purpose of keeping config access specifics out of rules for + # the sake of readablility. + def get_config_value(self, getter, option, *args, **kwargs): + """ Get a configuation value for this rule from the ruleset + configuration. Getter routine and option name to be provided by caller. + The rule's name is always used as configuration section name. + + @param getter: getter routine to use + @type getter: getter method of PeekabooConfigParser + @param option: name of option to read + @type option: string + @param args, kwargs: additional arguments passed to the getter routine, + such as fallback. + + @returns: configuration value read from config + """ + # additional common logic to go here + return getter(self.rule_name, option, *args, **kwargs) + + def get_config_int(self, option, default=None): + """ Get an integer from the ruleset configuration. See get_config_value + for parameters. """ + return self.get_config_value( + self.config.getint, option, fallback=default) + + def get_config_float(self, option, default=None): + """ Get a float from the ruleset configuration. See get_config_value + for parameters. """ + return self.get_config_value( + self.config.getfloat, option, fallback=default) + + def get_config_list(self, option, default=None): + """ Get a list from the ruleset configuration. See get_config_value + for parameters. """ + return self.get_config_value( + self.config.getlist, option, fallback=default) + + def get_config_relist(self, option, default=None): + """ Get a list of compiled regular expressions from the ruleset. See + get_config_value for parameters. """ + return self.get_config_value( + self.config.getrelist, option, fallback=default) + class KnownRule(Rule): """ A rule determining if a sample is known by looking at the database for @@ -92,12 +139,13 @@ class FileLargerThanRule(Rule): """ rule_name = 'file_larger_than' + def get_config(self): + self.size_threshold = self.get_config_int('bytes', 5) + def evaluate(self, sample): """ Evaluate whether the sample is larger than a certain threshold. Advise the engine to stop processing if the size is below the threshold. """ - size = int(self.config.get('bytes', 5)) - try: sample_size = sample.file_size except OSError as oserr: @@ -106,14 +154,15 @@ class FileLargerThanRule(Rule): _("Failure to determine sample file size: %s") % oserr, False) - if sample_size > size: + if sample_size > self.size_threshold: return self.result(Result.unknown, - _("File has more than %d bytes") % size, + _("File has more than %d bytes") + % self.size_threshold, True) return self.result( Result.ignored, - _("File is more than %d bytes long") % sample.file_size, + _("File is only %d bytes long") % sample_size, False) @@ -122,15 +171,18 @@ class FileTypeOnWhitelistRule(Rule): whitelist. """ rule_name = 'file_type_on_whitelist' + def get_config(self): + whitelist = self.get_config_list('whitelist') + if not whitelist: + raise PeekabooRulesetConfigError( + "Empty whitelist, check %s rule config." % self.rule_name) + + self.whitelist = set(whitelist) + def evaluate(self, sample): """ Ignore the file only if *all* of its mime types are on the whitelist and we could determine at least one. """ - whitelist = self.config.get('whitelist', ()) - if not whitelist: - logger.warning("Empty whitelist, check ruleset config.") - return self.result(Result.unknown, "Whitelist ist leer", True) - - if sample.mimetypes and sample.mimetypes.issubset(set(whitelist)): + if sample.mimetypes and sample.mimetypes.issubset(self.whitelist): return self.result(Result.ignored, _("File type is on whitelist"), False) @@ -145,15 +197,18 @@ class FileTypeOnGreylistRule(Rule): greylist, i.e. enabled for analysis. """ rule_name = 'file_type_on_greylist' + def get_config(self): + greylist = self.get_config_list('greylist') + if not greylist: + raise PeekabooRulesetConfigError( + "Empty greylist, check %s rule config." % self.rule_name) + + self.greylist = set(greylist) + def evaluate(self, sample): """ Continue analysis if any of the sample's MIME types are on the greylist or in case we don't have one. """ - greylist = self.config.get('greylist', ()) - if not greylist: - logger.warning("Empty greylist, check ruleset config.") - return self.result(Result.unknown, "Greylist is leer", False) - - if not sample.mimetypes or sample.mimetypes.intersection(set(greylist)): + if not sample.mimetypes or sample.mimetypes.intersection(self.greylist): return self.result(Result.unknown, _("File type is on the list of types to " "analyze"), @@ -232,18 +287,18 @@ class CuckooEvilSigRule(CuckooRule): of signatures considered bad. """ rule_name = 'cuckoo_evil_sig' - def evaluate_report(self, report): - """ Evaluate the sample against signatures that if matched mark a - sample as bad. """ + def get_config(self): # list all installed signatures # grep -o "description.*" -R . ~/cuckoo2.0/modules/signatures/ - bad_sigs = self.config.get('signature', ()) - if not bad_sigs: - logger.warning("Empty bad signature list, check ruleset config.") - return self.result(Result.unknown, - _("Empty list of malicious signatures"), - True) + self.bad_sigs = self.get_config_relist('signature') + if not self.bad_sigs: + raise PeekabooRulesetConfigError( + "Empty bad signature list, check %s rule config." % + self.rule_name) + def evaluate_report(self, report): + """ Evaluate the sample against signatures that if matched mark a + sample as bad. """ # look through matched signatures sigs = [] for descr in report.signatures: @@ -252,10 +307,15 @@ class CuckooEvilSigRule(CuckooRule): # check if there is a "bad" signatures and return bad matched_bad_sigs = [] - for sig in bad_sigs: - match = re.search(sig, "\n".join(sigs)) - if match: - matched_bad_sigs.append(sig) + for bad_sig in self.bad_sigs: + # iterate over each sig individually to allow regexes to use + # anchors such as ^ and $ and avoid mismatches, e.g. by ['foo', + # 'bar'] being stringified to "['foo', 'bar']" and matching + # /fo.*ar/. + for sig in sigs: + match = re.search(bad_sig, sig) + if match: + matched_bad_sigs.append(sig) if not matched_bad_sigs: return self.result(Result.unknown, @@ -274,20 +334,22 @@ class CuckooScoreRule(CuckooRule): threshold. """ rule_name = 'cuckoo_score' + def get_config(self): + self.score_threshold = self.get_config_float('higher_than', 4.0) + def evaluate_report(self, report): """ Evaluate the score reported by Cuckoo against the threshold from the configuration and report sample as bad if above. """ - threshold = float(self.config.get('higher_than', 4.0)) - if report.score >= threshold: + if report.score >= self.score_threshold: return self.result(Result.bad, _("Cuckoo score >= %s: %s") % - (threshold, report.score), + (self.score_threshold, report.score), False) return self.result(Result.unknown, _("Cuckoo score < %s: %s") % - (threshold, report.score), + (self.score_threshold, report.score), True) @@ -296,16 +358,19 @@ class RequestsEvilDomainRule(CuckooRule): Cuckoo against a blacklist. """ rule_name = 'requests_evil_domain' + def get_config(self): + self.evil_domains = self.get_config_list('domain') + if not self.evil_domains: + raise PeekabooRulesetConfigError( + "Empty evil domain list, check %s rule config." + % self.rule_name) + def evaluate_report(self, report): """ Report the sample as bad if one of the requested domains is on our list of evil domains. """ - evil_domains = self.config.get('domain', ()) - if not evil_domains: - logger.warning("Empty evil domain list, check ruleset config.") - return self.result(Result.unknown, _("Empty domain list"), True) for domain in report.requested_domains: - if domain in evil_domains: + if domain in self.evil_domains: return self.result(Result.bad, _("The file attempts to contact at least " "one domain on the blacklist (%s)") @@ -322,6 +387,11 @@ class CuckooAnalysisFailedRule(CuckooRule): """ A rule checking the final status reported by Cuckoo for success. """ rule_name = 'cuckoo_analysis_failed' + def get_config(self): + self.failure_matches = self.get_config_list('failure', []) + self.success_matches = self.get_config_list( + 'success', ['analysis completed successfully']) + def evaluate_report(self, report): """ Report the sample as bad if the Cuckoo indicates that the analysis has failed. """ @@ -332,19 +402,15 @@ class CuckooAnalysisFailedRule(CuckooRule): failure_reason = _("Behavioral analysis by Cuckoo has produced " "an error and did not finish successfully") - failure_matches = self.config.get('failure') - if failure_matches is not None: - for entry in report.cuckoo_server_messages: - for failure in failure_matches: - if failure in entry: - logger.debug('Failure indicator "%s" found in Cuckoo ' - 'messages', failure) - return self.result(Result.failed, failure_reason, False) + for entry in report.cuckoo_server_messages: + for failure in self.failure_matches: + if failure in entry: + logger.debug('Failure indicator "%s" found in Cuckoo ' + 'messages', failure) + return self.result(Result.failed, failure_reason, False) - success_matches = self.config.get( - 'success', ['analysis completed successfully']) for entry in report.cuckoo_server_messages: - for success in success_matches: + for success in self.success_matches: if success in entry: logger.debug('Success indicator "%s" found in Cuckoo ' 'messages', success) |