From 160ebc6b7e41b5f6434713bba4f375f5e003ba71 Mon Sep 17 00:00:00 2001 From: Felix Bauer Date: Tue, 27 Aug 2019 11:53:16 +0200 Subject: Add implementation of generic rules (expressions) (#96) Generic rules allow to access Sample attributes and Cuckoo Report attributes from the ruleset configuration and construct expressions that evaluate to a result. Implement a parser for generic rules based on pyparsing. Implement a rule that can use generic logical expressions to categorize samples. Smime signatures can now directly be ignored in the ruleset as a combination of declared name and declared type (closes #83, closes #42). --- docs/source/ruleset.rst | 88 +++++++ peekaboo/ruleset/engine.py | 1 + peekaboo/ruleset/expressions.py | 515 ++++++++++++++++++++++++++++++++++++++++ peekaboo/ruleset/rules.py | 104 ++++++-- peekaboo/sample.py | 19 -- peekaboo/toolbox/cuckoo.py | 14 ++ requirements.txt | 1 + ruleset.conf.sample | 21 +- tests/test.py | 83 ++++++- 9 files changed, 803 insertions(+), 43 deletions(-) create mode 100644 docs/source/ruleset.rst create mode 100644 peekaboo/ruleset/expressions.py diff --git a/docs/source/ruleset.rst b/docs/source/ruleset.rst new file mode 100644 index 0000000..46d4bd9 --- /dev/null +++ b/docs/source/ruleset.rst @@ -0,0 +1,88 @@ +======= +Ruleset +======= + +This chapter explains how to use and take care of the ruleset. We assume you +have peekaboo up and running and want to tweak or understand the default +ruleset. + +We also assume you are familiar with python config parser. + +Section: rules +============== + +Here rules can be disabled by putting a ``#`` (comment) in front. Also the +order in which the rules will be processed can be changed by changing how +the rules are listed (note that the trailing number is not relevant). + +Following sections +================== + +The following sections are processed (if enabled in rules section) and +contain for example the whitelist mime types.
Individual entries within +for example the whitelist can be disabled by putting a ``#`` in front. + +Expressions +=========== + +* rule : a rule of the ruleset, e.g. ``cuckoo_evil_sig`` or ``expressions`` +* expression : an expression of the expression rule +* condition : the condition before ``->`` + +Expressions will be tried one after another until one matches. The general +structure of an expression is: ``<condition> -> <result>``. If condition +evaluates to true, the expression will be considered matching and result will +be returned by the rule. + +Possible results are: ``unknown``, ``ignore``, ``good`` and ``bad``. The +latter three will terminate ruleset processing and use the result as final +analysis result while ``unknown`` will continue on with the next rule of the +ruleset. + +The expression syntax is a lot like Python itself. + +Expressions can contain operators: +``+ - * ** / // % << >> . < <= > >= == != in not in is is not isdisjoint and or`` + +Datatypes are: +``boolean, integer, real, string, regex, identifier, result`` + +Rules can then be constructed like: + +.. code-block:: shell + + expression.1 : sample.mimetypes <= {'text/plain', 'inode/x-empty', 'image/jpeg'} -> ignore + expression.2 : sample.meta_info_name_declared == 'smime.p7s' + and sample.meta_info_type_declared in { + 'application/pkcs7-signature', + 'application/x-pkcs7-signature', + 'application/pkcs7-mime', + 'application/x-pkcs7-mime' + } -> ignore + expression.3 : /DDE/ in cuckooreport.signature_descriptions -> bad + +Attributes of sample +-------------------- + +.. code-block:: shell + + filename + sha256sum + name_declared + file_extension + mimetypes + file_size + meta_info_name_declared + meta_info_type_declared + +Attributes of cuckooreport +-------------------------- + +.. 
code-block:: shell + + requested_domains + signatures + signature_descriptions + score + errors + cuckoo_server_messages diff --git a/peekaboo/ruleset/engine.py b/peekaboo/ruleset/engine.py index 6e736f5..c615e2b 100644 --- a/peekaboo/ruleset/engine.py +++ b/peekaboo/ruleset/engine.py @@ -44,6 +44,7 @@ class RulesetEngine(object): known_rules = [ KnownRule, FileLargerThanRule, + ExpressionRule, FileTypeOnWhitelistRule, FileTypeOnGreylistRule, CuckooEvilSigRule, diff --git a/peekaboo/ruleset/expressions.py b/peekaboo/ruleset/expressions.py new file mode 100644 index 0000000..e4fe0ad --- /dev/null +++ b/peekaboo/ruleset/expressions.py @@ -0,0 +1,515 @@ +############################################################################### +# # +# Peekaboo Extended Email Attachment Behavior Observation Owl # +# # +# ruleset/ # +# expressions.py # +############################################################################### +# # +# Copyright (C) 2016-2019 science + computing ag # +# Based on pyparsing's eval_arith.py. +# Copyright 2009, 2011 Paul McGuire +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or (at # +# your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # +# General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <http://www.gnu.org/licenses/>. # +# # +############################################################################### + +""" A simple expression grammar used for writing generic rules. 
""" + +from future.builtins import super + +import logging +import operator +import re +from pyparsing import nums, alphas, alphanums, Word, Combine, Suppress, \ + oneOf, opAssoc, infixNotation, Literal, Keyword, Group, \ + delimitedList, QuotedString, ParserElement, ParseException +from peekaboo.ruleset import Result + + +logger = logging.getLogger(__name__) + + +class EvalBase(object): + """ Base class of evaluatable objects providing common infrastructure. """ + def __init__(self, tokens): + """ Just store the tokens for later evaluation. Expects all relevant + tokens to be grouped together in the first element of the token list + passed. This is the default for operand+operator+operand+... constructs + with infixNotation and can be forced for others using Group(): + + rvallist = Group(Suppress('[') + delimitedList(rval) + Suppress(']')) + """ + self.value = self.token = tokens[0] + self.context = None + self.convert() + self.string_repr_format = "(%s)" + + def convert(self): + """ Method to (optionally) convert the input token(s) into something + else. Particularly used for conversion to base types. """ + self.value = self.token + + def feedback(self, info): + """ Accept and process feedback evaluation children. """ + # by default propagate feedback upwards if we have a context + if self.context and 'parent' in self.context: + self.context['parent'].feedback(info) + + def subeval(self, expression, update=None): + """ Evaluate a subexpression with an updated evaluation context + containing common metadata such as that we're it's parent and optional + additional data. """ + context = self.context.copy() + context['parent'] = self + if update: + context.update(update) + return expression.eval(context) + + def set_context(self, context): + """ Save an evaluation context internally for later use by e.g. + feedback(). """ + self.context = context + + def eval(self, context): + """ Evaluate the object content against a context. 
Just return the + stored (and optionally converted) value by default and remember our + context for possible feedback to our parent or from our children. """ + self.set_context(context) + return self.value + + def __str__(self): + return self.string_repr_format % ( + " ".join(["%s" % x for x in self.token])) + + +class EvalBoolean(EvalBase): + """ Class to evaluate a parsed boolean constant """ + def convert(self): + logger.debug("Boolean: %s", self.value) + self.value = self.token == "True" + + +class EvalInteger(EvalBase): + """ Class to evaluate a parsed integer constant """ + def convert(self): + logger.debug("Integer: %s", self.token) + self.value = int(self.token) + + +class EvalReal(EvalBase): + """ Class to evaluate a parsed real constant """ + def convert(self): + logger.debug("Real: %s", self.token) + self.value = float(self.token) + + +class EvalString(EvalBase): + """ Class to evaluate a parsed string constant """ + def convert(self): + logger.debug("String: %s", self.token) + self.value = self.token + + +class OperatorRegex(object): + """ A class implementing operators on regular expressions. """ + def __init__(self, string): + self.regex = re.compile(string) + + @staticmethod + def compare_op_impl(function, other): + """ Implement handling of iterable operands. 
""" + if isinstance(other, (list, set)): + for val in other: + logger.debug("Regular expression match: %s == %s", + function, val) + if function(val): + return True + return False + + return function(other) + + def __eq__(self, other): + """ Implement equality using re.match """ + logger.debug("Regular expression match: %s == %s", self.regex, other) + return self.compare_op_impl(self.regex.match, other) + + def __contains__(self, other): + """ Implement membership using re.search """ + logger.debug("Regular expression search: %s in %s", self.regex, other) + return self.compare_op_impl(self.regex.search, other) + + +class EvalRegex(EvalBase): + """ Class to evaluate a regular expression """ + def convert(self): + logger.debug("Regular expression: %s", self.token) + self.value = OperatorRegex(self.token) + + def eval(self, context): + self.set_context(context) + self.feedback({'regex_parsed': True}) + return self.value + + +class RegexIterableMixIn(object): + """ Common functionality for lists and sets containing regular expressions + with different behaviour of membership operators. """ + def __eq__(self, other): + if not isinstance(other, (list, set)): + other = [other] + + # in contrast to normal lists, a list of regexes compared to a list + # of strings is considered equal if any regex matches any string + for regex in self: + logger.debug("Eval regex: %s == %s", regex, other) + if regex == other: + return True + + return False + + def __contains__(self, item): + for regex in self: + logger.debug("Eval regex: %s in %s", regex, item) + # we implement "regex in string" of our grammar as "string in + # regex" so that our overridden operator + # regex.__contains__(string) is called and searching can be + # done. Otherwise error "TypeError: 'in <string>' requires + # string as left operand, not OperatorRegex" would ensue. 
+ if item in regex: + return True + + return False + + +class RegexList(RegexIterableMixIn, list): + """ A list containing regular expressions with different behaviour of + membership operators. """ + + +class RegexSet(RegexIterableMixIn, set): + """ A set containing regular expressions with different behaviour of + membership operators. """ + + +class EvalRegexIterableMixIn(object): + """ Common functionality for iterables which may contain regular + expressions. """ + def __init__(self, tokens): + super().__init__(tokens) + self.contains_regexes = False + + def feedback(self, info): + """ Mark this object as containing regular expressions if a child + object reports so in its feedback to us. """ + if 'regex_parsed' in info: + self.contains_regexes = True + del info['regex_parsed'] + + super().feedback(info) + + +class EvalList(EvalRegexIterableMixIn, EvalBase): + """ Class to evaluate a parsed list """ + def __init__(self, token): + super().__init__(token) + self.string_repr_format = "[%s]" + + def eval(self, context): + self.set_context(context) + logger.debug("List: %s", self.value) + ret = [] + for val in self.value: + ret.append(self.subeval(val)) + if self.contains_regexes: + return RegexList(ret) + return ret + + +class EvalSet(EvalRegexIterableMixIn, EvalBase): + """ Class to evaluate a parsed list """ + def __init__(self, token): + super().__init__(token) + self.string_repr_format = "{%s}" + + def eval(self, context): + self.set_context(context) + logger.debug("Set: %s", self.value) + ret = set() + for val in self.value: + ret.add(self.subeval(val)) + if self.contains_regexes: + return RegexSet(ret) + return ret + + +class IdentifierMissingException(KeyError): + pass + + +class EvalIdentifier(EvalBase): + """ Class to evaluate a parsed object name """ + def eval(self, context): + logger.debug("Identifier: %s", self.value) + if 'member' in context and context['member']: + return self.value + + try: + return context['variables'][self.value] + except 
KeyError as error: + raise IdentifierMissingException(error.args[0]) + + +class EvalResult(EvalBase): + """ Class to evaluate an analysis result """ + def convert(self): + logger.debug("Result: %s", self.token) + result_map = { + 'fail': Result.failed, + 'ignore': Result.ignored, + } + + if self.token in result_map: + self.value = result_map[self.token] + else: + self.value = Result[self.token] + + +class EvalModifier(EvalBase): + """ Class to evaluate typical single-operand modifier expressions such as + explicit sign change, bitwise and logical not. """ + def __init__(self, tokens): + super().__init__(tokens) + self.operator, self.value = tokens[0] + + def eval(self, context): + self.set_context(context) + val = self.subeval(self.value) + if self.operator == '+': + return val + elif self.operator == '-': + return -val + elif self.operator == '~': + return ~val + elif self.operator == 'not': + return not val + + raise ValueError('Invalid operator %s' % self.operator) + + +class EvalPower(EvalBase): + """ Class to evaluate exponentiation expressions """ + def eval(self, context): + self.set_context(context) + res = self.subeval(self.value[-1]) + for val in self.value[-3::-2]: + res = self.subeval(val)**res + + return res + + +def operator_operands(tokenlist): + """ Generator to extract operators and operands in pairs """ + iterator = iter(tokenlist) + while True: + try: + yield (next(iterator), next(iterator)) + except StopIteration: + break + + +class EvalArith(EvalBase): + """ Class to evaluate typical arithmetic and bitwise operations like + addition, multiplication, division and shifts expressions. Operator + precedence is handled by the order in which they're evaluated by the + parser, i.e. given to infixNotation. 
""" + def eval(self, context): + self.set_context(context) + ret = self.subeval(self.value[0]) + for op, val in operator_operands(self.value[1:]): + if op == '+': + ret += self.subeval(val) + elif op == '-': + ret -= self.subeval(val) + elif op == '*': + ret *= self.subeval(val) + elif op == '/': + ret /= self.subeval(val) + elif op == '//': + ret //= self.subeval(val) + elif op == '%': + ret %= self.subeval(val) + elif op == '<<': + ret <<= self.subeval(val) + elif op == '>>': + ret >>= self.subeval(val) + elif op == '.': + ret = getattr(ret, self.subeval(val, update={'member': True})) + elif op == "->": + if ret: + ret = self.subeval(val) + else: + ret = None + else: + raise ValueError('Invalid operator %s' % op) + + return ret + + +class EvalLogic(EvalBase): + """ Class to evaluate comparison expressions """ + def __init__(self, tokens): + super().__init__(tokens) + self.operator_map = { + "<": operator.lt, + "<=": operator.le, + ">": operator.gt, + ">=": operator.ge, + "==": operator.eq, + "!=": operator.ne, + "in": EvalLogic.in_, + "not in": EvalLogic.not_in, + "is": operator.is_, + "is not": operator.is_not, + "isdisjoint": lambda a, b: a.isdisjoint(b), + "and": operator.and_, + "or": operator.or_, + } + + @staticmethod + def in_(a, b): + """ Literally implement membership test. Make it a static method so we + can do identity checks. Do not use operator.contains because it needs + operands swapped. """ + return a in b + + @staticmethod + def not_in(a, b): + """ Naively implement non-membership test. """ + return a not in b + + @staticmethod + def handle_regexes(function, val1, val2): + """ Special handling of equality and membership checks for regular + expressions. 
""" + if (function in (operator.eq, operator.ne) + and isinstance(val2, (OperatorRegex, RegexIterableMixIn))): + # swap operands around in case the first does not contain any regex + # but the other does to reliably reroute to our overridden __eq__ + # operator, just do that always to keep checks simple since + # (in)equality is commutative anyway + val1, val2 = val2, val1 + elif (function in (EvalLogic.in_, EvalLogic.not_in) + and isinstance(val1, (OperatorRegex, RegexIterableMixIn))): + # "<regex> in <string>|<list of strings>" of our grammar directly + # implemented using the "in" operator would call + # <string>|<list of strings>.__contains__(<regex>) which we cannot + # override with reasonable effort. To get a call of + # <regex>.__contains__(<string>|<list of strings>) we need to + # switch operands. Otherwise error "TypeError: 'in <string>' + # requires string as left operand, not OperatorRegex" would ensue. + val1, val2 = val2, val1 + + # nothing special + return function(val1, val2) + + def eval(self, context): + self.set_context(context) + val1 = self.subeval(self.value[0]) + for op, parseobj in operator_operands(self.value[1:]): + val2 = self.subeval(parseobj) + logger.debug("Comparison: %s %s %s", val1, op, val2) + function = self.operator_map[op] + if not self.handle_regexes(function, val1, val2): + break + val1 = val2 + else: + return True + + return False + + +class ExpressionParser(object): + """ Define and run the parser. """ + def __init__(self): + # speed up infixNotation considerably at the price of some cache memory + ParserElement.enablePackrat() + + boolean = Keyword('True') | Keyword('False') + integer = Word(nums) + real = Combine(Word(nums) + "." 
+ Word(nums)) + string = (QuotedString('"', escChar='\\') + | QuotedString("'", escChar='\\')) + regex = QuotedString('/', escChar='\\') + identifier = Word(alphas, alphanums + '_') + dereference = infixNotation(identifier, [ + (Literal('.'), 2, opAssoc.LEFT, EvalArith), + ]) + result = (Keyword('bad') | Keyword('fail') | Keyword('good') + | Keyword('ignore') | Keyword('unknown')) + rval = boolean | real | integer | string | regex | result | dereference + rvallist = Group(Suppress('[') + delimitedList(rval) + Suppress(']')) + rvalset = Group(Suppress('{') + delimitedList(rval) + Suppress('}')) + operand = rval | rvallist | rvalset + + # parse actions replace the parsed tokens with an instantiated object + # which we can later call into for evaluation of its content + boolean.setParseAction(EvalBoolean) + integer.setParseAction(EvalInteger) + real.setParseAction(EvalReal) + string.setParseAction(EvalString) + regex.setParseAction(EvalRegex) + identifier.setParseAction(EvalIdentifier) + result.setParseAction(EvalResult) + rvallist.setParseAction(EvalList) + rvalset.setParseAction(EvalSet) + + identity_test = Keyword('is') + ~Keyword('not') | Combine( + Keyword('is') + Keyword('not'), adjacent=False, joinString=' ') + membership_test = Keyword('in') | Combine( + Keyword('not') + Keyword('in'), adjacent=False, joinString=' ') + comparison_op = oneOf('< <= > >= != == isdisjoint') + comparison = identity_test | membership_test | comparison_op + + self.parser = infixNotation(operand, [ + (Literal('**'), 2, opAssoc.LEFT, EvalPower), + (oneOf('+ - ~'), 1, opAssoc.RIGHT, EvalModifier), + (oneOf('* / // %'), 2, opAssoc.LEFT, EvalArith), + (oneOf('+ -'), 2, opAssoc.LEFT, EvalArith), + (oneOf('<< >>'), 2, opAssoc.LEFT, EvalArith), + (Literal('&'), 2, opAssoc.LEFT, EvalArith), + (Literal('^'), 2, opAssoc.LEFT, EvalArith), + (Literal('|'), 2, opAssoc.LEFT, EvalArith), + (comparison, 2, opAssoc.LEFT, EvalLogic), + (Keyword('not'), 1, opAssoc.RIGHT, EvalModifier), + 
(Keyword('and'), 2, opAssoc.LEFT, EvalLogic), + (Keyword('or'), 2, opAssoc.LEFT, EvalLogic), + (Keyword('->'), 2, opAssoc.LEFT, EvalArith), + ]) + + def parse(self, expression): + """ Parse an expression and return an object supporting evaluation of + that expression against a context. """ + try: + return self.parser.parseString(expression, parseAll=True)[0] + except ParseException as parse_error: + col = parse_error.col + raise SyntaxError( + "Expression parse error near character %d: %s>>%s<<%s" % ( + parse_error.col, expression[0:col], expression[col], + expression[col+1:])) + + +if __name__ == '__main__': + print(ExpressionParser().parse('foo == (bar - blub)')) diff --git a/peekaboo/ruleset/rules.py b/peekaboo/ruleset/rules.py index 6635c0c..36daa1c 100644 --- a/peekaboo/ruleset/rules.py +++ b/peekaboo/ruleset/rules.py @@ -29,6 +29,8 @@ import re import logging from peekaboo.ruleset import Result, RuleResult +from peekaboo.ruleset.expressions import ExpressionParser, \ + IdentifierMissingException from peekaboo.exceptions import PeekabooAnalysisDeferred, \ CuckooSubmitFailedException, PeekabooRulesetConfigError from peekaboo.toolbox.ole import Oletools, OletoolsReport, \ @@ -104,6 +106,33 @@ class Rule(object): return self.config.get_by_type( self.rule_name, option, fallback=default, option_type=option_type) + def get_cuckoo_report(self, sample): + """ Get the samples cuckoo_report or submit the sample for analysis by + Cuckoo. 
+ + @returns: CuckooReport + """ + report = sample.cuckoo_report + if report is not None: + return report + + try: + job_id = sample.submit_to_cuckoo() + except CuckooSubmitFailedException as failed: + logger.error("Submit to Cuckoo failed: %s", failed) + # exception message intentionally not present in message + # delivered back to client as to not disclose internal + # information, should request user to contact admin instead + return self.result( + Result.failed, + _("Behavioral analysis by Cuckoo has produced an error " + "and did not finish successfully"), + False) + + logger.info('Sample submitted to Cuckoo. Job ID: %s. ' + 'Sample: %s', job_id, sample) + raise PeekabooAnalysisDeferred() + class KnownRule(Rule): """ A rule determining if a sample is known by looking at the database for @@ -293,24 +322,7 @@ class CuckooRule(Rule): @raises PeekabooAnalysisDeferred: if the sample was submitted to Cuckoo @returns: RuleResult containing verdict. """ - report = sample.cuckoo_report - if report is None: - try: - job_id = sample.submit_to_cuckoo() - except CuckooSubmitFailedException as failed: - logger.error("Submit to Cuckoo failed: %s", failed) - # exception message intentionally not present in message - # delivered back to client as to not disclose internal - # information, should request user to contact admin instead - return self.result( - Result.failed, - _("Behavioral analysis by Cuckoo has produced an error " - "and did not finish successfully"), - False) - - logger.info('Sample submitted to Cuckoo. Job ID: %s. ' - 'Sample: %s', job_id, sample) - raise PeekabooAnalysisDeferred() + report = self.get_cuckoo_report(sample) # call report evaluation function if we get here return self.evaluate_report(report) @@ -467,6 +479,62 @@ class CuckooAnalysisFailedRule(CuckooRule): return self.result(Result.failed, failure_reason, False) +class ExpressionRule(Rule): + """ A rule checking the sample and cuckoo report against an almost + arbitrary logical expression. 
""" + rule_name = 'expressions' + + def get_config(self): + expressions = self.get_config_value('expression', []) + if not expressions: + raise PeekabooRulesetConfigError( + "List of expressions empty, check %s rule config." + % self.rule_name) + + self.rules = [] + parser = ExpressionParser() + for expr in expressions: + try: + rule = parser.parse(expr) + logger.debug("EXPR: %s", expr) + logger.debug("RULE: %s", rule) + self.rules.append(rule) + except SyntaxError as error: + raise PeekabooRulesetConfigError(error) + + def evaluate(self, sample): + """ Match what rules report against our known result status names. """ + for rule in self.rules: + result = None + context = {'variables': {'sample': sample}} + + while result is None: + try: + result = rule.eval(context = context) + # otherwise this is an endless loop + if result is None: + break + except IdentifierMissingException as error: + if error.message == "cuckooreport": + context['variables']['cuckooreport'] = self.get_cuckoo_report(sample) + # here elif for other reports + else: + return self.result( + Result.failed, + _("Evaluation of expression uses undefined identifier."), + False) + + if result: + return self.result(result, + _("A rule classified the sample as %s") + % result, + False) + + return self.result(Result.unknown, + _("No rule classified the sample in any way."), + True) + + class FinalRule(Rule): """ A catch-all rule. """ rule_name = 'final_rule' diff --git a/peekaboo/sample.py b/peekaboo/sample.py index d4c5a62..58099a7 100644 --- a/peekaboo/sample.py +++ b/peekaboo/sample.py @@ -423,25 +423,6 @@ class Sample(object): if self.name_declared: declared_filename = self.name_declared - # check if the sample is an S/MIME signature (smime.p7s) - # If so, don't overwrite the MIME type since we do not want to analyse - # S/MIME signatures. - # FIXME: This is oddly specific for this generic routine. Should it be - # some sort of callback or plugin? 
- leave_alone_types = { - 'p7s': [ - 'application/pkcs7-signature', - 'application/x-pkcs7-signature', - 'application/pkcs7-mime', - 'application/x-pkcs7-mime', - ] - } - - if declared_filename == 'smime.p7s' and declared_mt in leave_alone_types['p7s']: - logger.info('S/MIME signature detected. Using declared MIME type over detected ones.') - self.__mimetypes = set([declared_mt]) - return self.__mimetypes - # determine mime on original p[0-9]* file # result of __submit_path would be "inode/symlink" content_based_mime_type = guess_mime_type_from_file_contents(self.__path) diff --git a/peekaboo/toolbox/cuckoo.py b/peekaboo/toolbox/cuckoo.py index 47966aa..8b0b646 100644 --- a/peekaboo/toolbox/cuckoo.py +++ b/peekaboo/toolbox/cuckoo.py @@ -534,6 +534,20 @@ class CuckooReport(object): except KeyError: return [] + @property + def signature_descriptions(self): + """ + Gets the description of triggered Cuckoo signatures from report. + + @returns: The description of triggered signatures from the Cuckoo + report or empty list if there was an error parsing the + Cuckoo report. 
+ """ + descriptions = [] + for sig in self.signatures: + descriptions.append(sig['description']) + return descriptions + @property def score(self): """ diff --git a/requirements.txt b/requirements.txt index 4681594..d814a70 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ yara-python>=3.6.3 requests>=2.19.0 configparser future +pyparsing diff --git a/ruleset.conf.sample b/ruleset.conf.sample index 9a9ea73..789a45f 100644 --- a/ruleset.conf.sample +++ b/ruleset.conf.sample @@ -11,11 +11,12 @@ rule.3 : file_type_on_whitelist rule.4 : file_type_on_greylist #rule.5 : office_macro #rule.6 : office_macro_with_suspicious_keyword -rule.7 : cuckoo_evil_sig -rule.8 : cuckoo_score -#rule.9 : requests_evil_domain -rule.10 : cuckoo_analysis_failed -#rule.11 : contains_peekabooyar +rule.7 : expressions +rule.8 : cuckoo_evil_sig +rule.9 : cuckoo_score +#rule.10 : requests_evil_domain +rule.11 : cuckoo_analysis_failed +#rule.12 : contains_peekabooyar rule.12 : final_rule # rule specific configuration options @@ -76,6 +77,16 @@ greylist.36 : application/msword keyword.1 : AutoOpen keyword.2 : AutoClose +[expressions] +expression.1 : sample.mimetypes <= {'text/plain', 'inode/x-empty'} -> ignore +expression.2 : sample.meta_info_name_declared == 'smime.p7s' + and sample.meta_info_type_declared in { + 'application/pkcs7-signature', + 'application/x-pkcs7-signature', + 'application/pkcs7-mime', + 'application/x-pkcs7-mime' + } -> ignore + [cuckoo_evil_sig] signature.1 : A potential heapspray has been detected. .* signature.2 : A process attempted to delay the analysis task. 
diff --git a/tests/test.py b/tests/test.py index 39318fb..667255f 100755 --- a/tests/test.py +++ b/tests/test.py @@ -52,7 +52,9 @@ from peekaboo.ruleset.rules import FileTypeOnWhitelistRule, \ FileTypeOnGreylistRule, CuckooAnalysisFailedRule, \ KnownRule, FileLargerThanRule, CuckooEvilSigRule, \ CuckooScoreRule, RequestsEvilDomainRule, FinalRule, \ - OfficeMacroRule, OfficeMacroWithSuspiciousKeyword + OfficeMacroRule, OfficeMacroWithSuspiciousKeyword, \ + ExpressionRule + from peekaboo.toolbox.cuckoo import CuckooReport from peekaboo.db import PeekabooDatabase, PeekabooDatabaseError # pylint: enable=wrong-import-position @@ -780,6 +782,85 @@ unknown : baz''' result = rule.evaluate(sample) self.assertEqual(result.result, expected) + def test_rule_ignore_generic_whitelist(self): + """ Test rule to ignore file types on whitelist. """ + config = '''[expressions] + expression.4 : sample.mimetypes <= {'text/plain', 'inode/x-empty', 'image/jpeg'} -> ignore + ''' + factory = CreatingSampleFactory( + cuckoo=None, base_dir="", + job_hash_regex="", keep_mail_data=False, + processing_info_dir=None) + + sample = factory.create_sample('file.txt', 'abc') + rule = ExpressionRule(CreatingConfigParser(config)) + result = rule.evaluate(sample) + self.assertEqual(result.result, Result.ignored) + + sample = factory.create_sample('file.html', ' ignore''' + + part = { "full_name": "p001", + "name_declared": "smime.p7s", + "type_declared": "application/pkcs7-signature" + } + + factory = SampleFactory( + cuckoo=None, base_dir=None, job_hash_regex=None, + keep_mail_data=False, processing_info_dir=None) + + sample = factory.make_sample('', metainfo=part) + rule = ExpressionRule(CreatingConfigParser(config)) + result = rule.evaluate(sample) + self.assertEqual(result.result, Result.ignored) + + sample.meta_info_name_declared = "file" + rule = ExpressionRule(CreatingConfigParser(config)) + result = rule.evaluate(sample) + self.assertEqual(result.result, Result.unknown) + + def 
test_rule_expressions(self): + """ Test generic rule on cuckoo report. """ + config = '''[expressions] + expression.1 : /DDE/ in cuckooreport.signature_descriptions -> bad + ''' + + report = { + "signatures": [ + { "description": "Malicious document featuring Office DDE has been identified" } + ] + } + cuckooreport = CuckooReport(report) + + factory = SampleFactory( + cuckoo=None, base_dir=None, job_hash_regex=None, + keep_mail_data=False, processing_info_dir=None) + + sample = factory.make_sample('') + sample.register_cuckoo_report(cuckooreport) + rule = ExpressionRule(CreatingConfigParser(config)) + result = rule.evaluate(sample) + self.assertEqual(result.result, Result.bad) + def test_config_file_type_on_whitelist(self): """ Test whitelist rule configuration. """ config = '''[file_type_on_whitelist] -- cgit v1.2.3