summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFelix Bauer <jack@ai4me.de>2019-08-27 11:53:16 +0200
committerGitHub <noreply@github.com>2019-08-27 11:53:16 +0200
commit160ebc6b7e41b5f6434713bba4f375f5e003ba71 (patch)
tree4f14a2994bfadc02d76e9c7b99e0fc5f1fc349bf
parent18d0ac2148d76aaf1dfe659c1a11cf74021ebae9 (diff)
Add implementation of generic rules (expressions) (#96)
Generic rules allow accessing Sample attributes and Cuckoo Report attributes from the ruleset configuration and constructing expressions that evaluate to a result. Implement a parser for generic rules based on pyparsing. Implement a rule that can use generic logical expressions to categorize samples. S/MIME signatures can now be ignored directly in the ruleset as a combination of declared name and declared type (closes #83, closes #42).
-rw-r--r--docs/source/ruleset.rst88
-rw-r--r--peekaboo/ruleset/engine.py1
-rw-r--r--peekaboo/ruleset/expressions.py515
-rw-r--r--peekaboo/ruleset/rules.py104
-rw-r--r--peekaboo/sample.py19
-rw-r--r--peekaboo/toolbox/cuckoo.py14
-rw-r--r--requirements.txt1
-rw-r--r--ruleset.conf.sample21
-rwxr-xr-xtests/test.py83
9 files changed, 803 insertions, 43 deletions
diff --git a/docs/source/ruleset.rst b/docs/source/ruleset.rst
new file mode 100644
index 0000000..46d4bd9
--- /dev/null
+++ b/docs/source/ruleset.rst
@@ -0,0 +1,88 @@
+=======
+Ruleset
+=======
+
+This chapter explains how to use and take care of the ruleset. We assume you
+have peekaboo up and running and want to tweak or understand the default
+ruleset.
+
+We also assume you are familiar with Python's config parser.
+
+Section: rules
+==============
+
+Here rules can be disabled by putting a ``#`` (comment) in front. Also the
+order in which the rules will be processed can be changed by changing how
+the rules are listed (note that the trailing number is not relevant).
+
+Following sections
+==================
+
+The following sections are processed (if enabled in rules section) and
+contain for example the whitelist mime types. Individual entries within
+for example the whitelist can be disabled by putting a ``#`` in front.
+
+Expressions
+===========
+
+* rule : a rule of the ruleset, e.g. evil_sig or expression
+* expression : an expression of the expression rule
+* condition : the condition before ``->``
+
+Expressions will be tried one after another until one matches. The general
+structure of an expression is: ``<condition> -> <result>``. If condition
+evaluates to true, the expression will be considered matching and result will
+be returned by the rule.
+
+Possible results are: ``unknown``, ``ignore``, ``good`` and ``bad``. The
+latter three will terminate ruleset processing and use the result as final
+analysis result while the former will continue on with the next rule of the
+ruleset.
+
+The expression syntax is a lot like Python itself.
+
+Expressions can contain the following operators:
+``+ - * ** / // % << >> . < <= > >= == != in not in is is not isdisjoint and or``
+
+Datatypes are:
+``boolean, integer, real, string, regex, identifier, result``
+
+Rules can then be constructed like:
+
+.. code-block:: shell
+
+ expression.1 : sample.mimetypes <= {'text/plain', 'inode/x-empty', 'image/jpeg'} -> ignore
+ expression.2 : sample.meta_info_name_declared == 'smime.p7s'
+ and sample.meta_info_type_declared in {
+ 'application/pkcs7-signature',
+ 'application/x-pkcs7-signature',
+ 'application/pkcs7-mime',
+ 'application/x-pkcs7-mime'
+ } -> ignore
+ expression.3 : /DDE/ in cuckooreport.signature_descriptions -> bad
+
+Attributes of sample
+--------------------
+
+.. code-block:: shell
+
+ filename
+ sha256sum
+ name_declared
+ file_extension
+ mimetypes
+ file_size
+ meta_info_name_declared
+ meta_info_type_declared
+
+Attributes of cuckooreport
+--------------------------
+
+.. code-block:: shell
+
+ requested_domains
+ signatures
+ signature_descriptions
+ score
+ errors
+ cuckoo_server_messages
diff --git a/peekaboo/ruleset/engine.py b/peekaboo/ruleset/engine.py
index 6e736f5..c615e2b 100644
--- a/peekaboo/ruleset/engine.py
+++ b/peekaboo/ruleset/engine.py
@@ -44,6 +44,7 @@ class RulesetEngine(object):
known_rules = [
KnownRule,
FileLargerThanRule,
+ ExpressionRule,
FileTypeOnWhitelistRule,
FileTypeOnGreylistRule,
CuckooEvilSigRule,
diff --git a/peekaboo/ruleset/expressions.py b/peekaboo/ruleset/expressions.py
new file mode 100644
index 0000000..e4fe0ad
--- /dev/null
+++ b/peekaboo/ruleset/expressions.py
@@ -0,0 +1,515 @@
+###############################################################################
+# #
+# Peekaboo Extended Email Attachment Behavior Observation Owl #
+# #
+# ruleset/ #
+# expressions.py #
+###############################################################################
+# #
+# Copyright (C) 2016-2019 science + computing ag #
+# Based on pyparsing's eval_arith.py.
+# Copyright 2009, 2011 Paul McGuire
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or (at #
+# your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, but #
+# WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
+# General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <http://www.gnu.org/licenses/>. #
+# #
+###############################################################################
+
+""" A simple expression grammar used for writing generic rules. """
+
+from future.builtins import super
+
+import logging
+import operator
+import re
+from pyparsing import nums, alphas, alphanums, Word, Combine, Suppress, \
+ oneOf, opAssoc, infixNotation, Literal, Keyword, Group, \
+ delimitedList, QuotedString, ParserElement, ParseException
+from peekaboo.ruleset import Result
+
+
+logger = logging.getLogger(__name__)
+
+
+class EvalBase(object):
+    """ Common ancestor of all nodes of a parsed expression. It keeps the
+    parsed token(s), an optionally converted value and the evaluation context
+    last used. """
+    def __init__(self, tokens):
+        """ Remember the parsed token(s) for later evaluation.
+
+        Only the first element of the token list is looked at, so everything
+        belonging to this node has to be grouped in there. infixNotation does
+        that for operand+operator+operand+... constructs, for other constructs
+        it can be forced using Group():
+
+        rvallist = Group(Suppress('[') + delimitedList(rval) + Suppress(']'))
+        """
+        grouped = tokens[0]
+        self.token = grouped
+        self.value = grouped
+        self.context = None
+        self.convert()
+        self.string_repr_format = "(%s)"
+
+    def convert(self):
+        """ Hook for subclasses to turn the raw token(s) into something else,
+        typically a base type. The default keeps the token as value. """
+        self.value = self.token
+
+    def feedback(self, info):
+        """ Receive feedback from an evaluated child. Without any special
+        handling it is simply handed on to our own parent, if we have one. """
+        if not self.context or 'parent' not in self.context:
+            return
+
+        self.context['parent'].feedback(info)
+
+    def subeval(self, expression, update=None):
+        """ Evaluate a child expression. The child gets a copy of our context
+        in which we are registered as its parent and which is extended by the
+        optional additional entries of update. """
+        child_context = self.context.copy()
+        child_context['parent'] = self
+        if update:
+            child_context.update(update)
+        return expression.eval(child_context)
+
+    def set_context(self, context):
+        """ Keep a reference to the evaluation context, e.g. for later use by
+        feedback(). """
+        self.context = context
+
+    def eval(self, context):
+        """ Evaluate this node against a context. By default this remembers
+        the context (for feedback to our parent or from our children) and
+        returns the stored, possibly converted, value. """
+        self.set_context(context)
+        return self.value
+
+    def __str__(self):
+        parts = " ".join("%s" % part for part in self.token)
+        return self.string_repr_format % parts
+
+
+class EvalBoolean(EvalBase):
+    """ Leaf node for the boolean literals True and False. """
+    def convert(self):
+        """ Turn the literal's text into a Python bool: only the exact text
+        "True" yields True. """
+        logger.debug("Boolean: %s", self.value)
+        is_true = self.token == "True"
+        self.value = is_true
+
+
+class EvalInteger(EvalBase):
+    """ Leaf node for an integer literal. """
+    def convert(self):
+        """ Turn the literal's text into a Python int. """
+        logger.debug("Integer: %s", self.token)
+        number = int(self.token)
+        self.value = number
+
+
+class EvalReal(EvalBase):
+    """ Leaf node for a real (floating point) literal. """
+    def convert(self):
+        """ Turn the literal's text into a Python float. """
+        logger.debug("Real: %s", self.token)
+        number = float(self.token)
+        self.value = number
+
+
+class EvalString(EvalBase):
+    """ Leaf node for a quoted string literal. """
+    def convert(self):
+        """ Use the parsed string unchanged as the value. """
+        logger.debug("String: %s", self.token)
+        text = self.token
+        self.value = text
+
+
+class OperatorRegex(object):
+    """ A compiled regular expression which implements the comparison
+    operators of the expression grammar:
+
+    - regex == string: re.match of the string
+    - string in regex: re.search in the string
+
+    If the other operand is a list or set, the operation is true if it is
+    true for any of its elements. """
+    def __init__(self, string):
+        """ Compile the pattern given as string. """
+        self.regex = re.compile(string)
+
+    @staticmethod
+    def compare_op_impl(function, other):
+        """ Apply function (a bound match or search method) to other, or to
+        each element of other if it is a list or set.
+
+        @returns: True if function matched other or any element of it, False
+                  otherwise.
+        """
+        if isinstance(other, (list, set)):
+            for val in other:
+                logger.debug("Regular expression match: %s == %s",
+                             function, val)
+                if function(val):
+                    return True
+            return False
+
+        # hand out a proper boolean instead of leaking the match object
+        return bool(function(other))
+
+    def __eq__(self, other):
+        """ Implement equality using re.match """
+        if isinstance(other, OperatorRegex):
+            # two regexes are equal if their patterns are. This is also what
+            # sets fall back to for elements with identical hash and would
+            # otherwise end in a TypeError from re.match.
+            return self.regex.pattern == other.regex.pattern
+
+        logger.debug("Regular expression match: %s == %s", self.regex, other)
+        return self.compare_op_impl(self.regex.match, other)
+
+    def __ne__(self, other):
+        """ Implement inequality as the inverse of equality. Python 2 does not
+        derive it from __eq__ automatically. """
+        return not self.__eq__(other)
+
+    def __hash__(self):
+        """ Hash by pattern. Defining __eq__ alone makes a class unhashable on
+        Python 3 and regexes need to be storable in sets. """
+        return hash(self.regex.pattern)
+
+    def __contains__(self, other):
+        """ Implement membership using re.search """
+        logger.debug("Regular expression search: %s in %s", self.regex, other)
+        return self.compare_op_impl(self.regex.search, other)
+
+
+class EvalRegex(EvalBase):
+    """ Leaf node for a regular expression literal. """
+    def convert(self):
+        """ Wrap the pattern text in an OperatorRegex. """
+        logger.debug("Regular expression: %s", self.token)
+        compiled = OperatorRegex(self.token)
+        self.value = compiled
+
+    def eval(self, context):
+        """ Return the wrapped regex and report upwards that a regular
+        expression was encountered. """
+        self.set_context(context)
+        notice = {'regex_parsed': True}
+        self.feedback(notice)
+        return self.value
+
+
+class RegexIterableMixIn(object):
+    """ Common functionality for lists and sets containing regular expressions
+    with different behaviour of the equality and membership operators. """
+    def __eq__(self, other):
+        """ Compare to a value or a list/set of values. Returns True as soon
+        as any contained regex compares equal to other. """
+        if not isinstance(other, (list, set)):
+            other = [other]
+
+        # in contrast to normal lists, a list of regexes compared to a list
+        # of strings is considered equal if any regex matches any string
+        for regex in self:
+            logger.debug("Eval regex: %s == %s", regex, other)
+            if regex == other:
+                return True
+
+        return False
+
+    def __ne__(self, other):
+        """ Inverse of __eq__. Needs to be spelled out because otherwise the
+        element-wise __ne__ of the list or set base class would be used and
+        disagree with our overridden equality. """
+        return not self.__eq__(other)
+
+    def __contains__(self, item):
+        """ Returns True if item is "in" any of the contained regexes. """
+        for regex in self:
+            logger.debug("Eval regex: %s in %s", regex, item)
+            # we implement "regex in string" of our grammar as "string in
+            # regex" so that our overridden operator
+            # regex.__contains__(string) is called and searching can be
+            # done. Otherwise error "TypeError: 'in <string>' requires
+            # string as left operand, not OperatorRegex" would ensue.
+            if item in regex:
+                return True
+
+        return False
+
+
+class RegexList(RegexIterableMixIn, list):
+    """ A list of regular expressions whose equality and membership operators
+    are the ones of RegexIterableMixIn. """
+
+
+class RegexSet(RegexIterableMixIn, set):
+    """ A set of regular expressions whose equality and membership operators
+    are the ones of RegexIterableMixIn. """
+
+
+class EvalRegexIterableMixIn(object):
+    """ Shared behaviour of iterable nodes which may contain regular
+    expressions. """
+    def __init__(self, tokens):
+        """ Start out assuming that no regular expressions are contained. """
+        super().__init__(tokens)
+        self.contains_regexes = False
+
+    def feedback(self, info):
+        """ Take note if a child reports a regular expression. That piece of
+        information is consumed here, everything else is passed on. """
+        if 'regex_parsed' in info:
+            info.pop('regex_parsed')
+            self.contains_regexes = True
+
+        super().feedback(info)
+
+
+class EvalList(EvalRegexIterableMixIn, EvalBase):
+    """ Node for a list literal. """
+    def __init__(self, token):
+        super().__init__(token)
+        self.string_repr_format = "[%s]"
+
+    def eval(self, context):
+        """ Evaluate all elements in order. The result is a RegexList if any
+        element reported a regular expression, a plain list otherwise. """
+        self.set_context(context)
+        logger.debug("List: %s", self.value)
+        elements = [self.subeval(element) for element in self.value]
+        return RegexList(elements) if self.contains_regexes else elements
+
+
+class EvalSet(EvalRegexIterableMixIn, EvalBase):
+    """ Node for a set literal. """
+    def __init__(self, token):
+        super().__init__(token)
+        self.string_repr_format = "{%s}"
+
+    def eval(self, context):
+        """ Evaluate all elements. The result is a RegexSet if any element
+        reported a regular expression, a plain set otherwise. """
+        self.set_context(context)
+        logger.debug("Set: %s", self.value)
+        elements = {self.subeval(element) for element in self.value}
+        return RegexSet(elements) if self.contains_regexes else elements
+
+
+class IdentifierMissingException(KeyError):
+    """ Raised if an identifier of an expression cannot be resolved from the
+    variables of the evaluation context. The name looked up is the first
+    exception argument. """
+
+
+class EvalIdentifier(EvalBase):
+    """ Node for an identifier, i.e. the name of a variable or of an attribute
+    of one. """
+    def eval(self, context):
+        """ Resolve the identifier.
+
+        If the context flags it as member of a dereference, the bare name is
+        returned for the parent to look up. Otherwise the name is looked up
+        in the variables of the context.
+
+        @raises IdentifierMissingException: if the lookup fails.
+        """
+        logger.debug("Identifier: %s", self.value)
+        if context.get('member'):
+            return self.value
+
+        try:
+            resolved = context['variables'][self.value]
+        except KeyError as error:
+            raise IdentifierMissingException(error.args[0])
+
+        return resolved
+
+
+class EvalResult(EvalBase):
+    """ Leaf node for an analysis result keyword. """
+    def convert(self):
+        """ Map the keyword to a member of Result. Keywords which differ from
+        the member name are translated first, all others are looked up by
+        name. """
+        logger.debug("Result: %s", self.token)
+        aliases = {
+            'fail': Result.failed,
+            'ignore': Result.ignored,
+        }
+
+        self.value = (aliases[self.token] if self.token in aliases
+                      else Result[self.token])
+
+
+class EvalModifier(EvalBase):
+    """ Node for prefix operators with a single operand: explicit sign,
+    bitwise inversion and logical not. """
+    def __init__(self, tokens):
+        """ Split the grouped tokens into operator and operand. """
+        super().__init__(tokens)
+        self.operator, self.value = tokens[0]
+
+    def eval(self, context):
+        """ Evaluate the operand and apply the operator to it.
+
+        @raises ValueError: if the operator is not one of +, -, ~ and not.
+        """
+        self.set_context(context)
+        operand = self.subeval(self.value)
+        handlers = {
+            '+': lambda value: value,
+            '-': operator.neg,
+            '~': operator.invert,
+            'not': operator.not_,
+        }
+        if self.operator not in handlers:
+            raise ValueError('Invalid operator %s' % self.operator)
+
+        return handlers[self.operator](operand)
+
+
+class EvalPower(EvalBase):
+    """ Node for (chains of) exponentiation. """
+    def eval(self, context):
+        """ Evaluate from right to left: start with the last operand and use
+        the running result as exponent of each operand before it. """
+        self.set_context(context)
+        exponent = self.subeval(self.value[-1])
+        # every second token walking backwards is an operand, the ones in
+        # between are the ** operators
+        bases = self.value[-3::-2]
+        for base in bases:
+            exponent = pow(self.subeval(base), exponent)
+
+        return exponent
+
+
+def operator_operands(tokenlist):
+    """ Generator yielding (operator, operand) tuples from a flat sequence of
+    alternating operators and operands. A trailing token without partner is
+    dropped. """
+    tokens = iter(tokenlist)
+    for op in tokens:
+        try:
+            operand = next(tokens)
+        except StopIteration:
+            return
+        yield (op, operand)
+
+
+class EvalArith(EvalBase):
+    """ Class to evaluate typical arithmetic and bitwise operations like
+    addition, multiplication, division, shifts and bitwise and/xor/or as well
+    as attribute dereference (.) and the implication (->) of a rule. Operator
+    precedence is handled by the order in which they're evaluated by the
+    parser, i.e. given to infixNotation. """
+    # Plain binary operators. They are deliberately not applied in-place (+=
+    # and friends) because that would modify mutable operands such as a list
+    # attribute of an object from the evaluation context.
+    _binary_operators = {
+        '+': operator.add,
+        '-': operator.sub,
+        '*': operator.mul,
+        # spelled out so that the division semantics of this module apply
+        '/': lambda dividend, divisor: dividend / divisor,
+        '//': operator.floordiv,
+        '%': operator.mod,
+        '<<': operator.lshift,
+        '>>': operator.rshift,
+        '&': operator.and_,
+        '^': operator.xor,
+        '|': operator.or_,
+    }
+
+    def eval(self, context):
+        """ Evaluate the operands from left to right, combining the running
+        result with each following operand.
+
+        @returns: the result of the operation. For -> this is the evaluated
+                  right-hand side if the left-hand side is true, None
+                  otherwise.
+        @raises ValueError: if an operator is not supported.
+        """
+        self.set_context(context)
+        ret = self.subeval(self.value[0])
+        for op, val in operator_operands(self.value[1:]):
+            if op in self._binary_operators:
+                ret = self._binary_operators[op](ret, self.subeval(val))
+            elif op == '.':
+                # the right-hand side is the name of an attribute, not a
+                # variable to resolve
+                ret = getattr(ret, self.subeval(val, update={'member': True}))
+            elif op == "->":
+                ret = self.subeval(val) if ret else None
+            else:
+                raise ValueError('Invalid operator %s' % op)
+
+        return ret
+
+
+class EvalLogic(EvalBase):
+    """ Class to evaluate comparison expressions and the logical operators
+    and/or. """
+    def __init__(self, tokens):
+        super().__init__(tokens)
+        self.operator_map = {
+            "<": operator.lt,
+            "<=": operator.le,
+            ">": operator.gt,
+            ">=": operator.ge,
+            "==": operator.eq,
+            "!=": operator.ne,
+            "in": EvalLogic.in_,
+            "not in": EvalLogic.not_in,
+            "is": operator.is_,
+            "is not": operator.is_not,
+            "isdisjoint": lambda a, b: a.isdisjoint(b),
+            "and": EvalLogic.and_,
+            "or": EvalLogic.or_,
+        }
+
+    @staticmethod
+    def in_(a, b):
+        """ Literally implement membership test. Make it a static method so we
+        can do identity checks. Do not use operator.contains because it needs
+        operands swapped. """
+        return a in b
+
+    @staticmethod
+    def not_in(a, b):
+        """ Naively implement non-membership test. """
+        return a not in b
+
+    @staticmethod
+    def and_(a, b):
+        """ Logical and based on the truth value of the operands. The bitwise
+        operator.and_ would raise a TypeError for e.g. strings. """
+        return bool(a) and bool(b)
+
+    @staticmethod
+    def or_(a, b):
+        """ Logical or based on the truth value of the operands. The bitwise
+        operator.or_ would raise a TypeError for e.g. strings. """
+        return bool(a) or bool(b)
+
+    @staticmethod
+    def handle_regexes(function, val1, val2):
+        """ Special handling of equality and membership checks for regular
+        expressions. """
+        if (function in (operator.eq, operator.ne)
+                and isinstance(val2, (OperatorRegex, RegexIterableMixIn))):
+            # swap operands around in case the first does not contain any regex
+            # but the other does to reliably reroute to our overridden __eq__
+            # operator, just do that always to keep checks simple since
+            # (in)equality is commutative anyway
+            val1, val2 = val2, val1
+        elif (function in (EvalLogic.in_, EvalLogic.not_in)
+              and isinstance(val1, (OperatorRegex, RegexIterableMixIn))):
+            # "<regex> in <string>|<list-of-strings>" of our grammar directly
+            # implemented using the "in" operator would call
+            # <string>|<list-of-strings>.__contains__(<regex>) which we cannot
+            # override with reasonable effort. To get a call of
+            # <regex>.__contains__(<string>|<list-of-strings>) we need to
+            # switch operands. Otherwise error "TypeError: 'in <string>'
+            # requires string as left operand, not OperatorRegex" would ensue.
+            val1, val2 = val2, val1
+
+        # nothing special
+        return function(val1, val2)
+
+    def eval(self, context):
+        """ Evaluate the operands from left to right.
+
+        Comparisons are chained like in Python: a < b < c is true if every
+        neighbouring pair compares true and evaluation stops at the first
+        pair which does not. The logical operators and/or instead fold the
+        running result with each following operand, because treating them
+        like a comparison chain gives wrong results for or (e.g. for
+        False or False or True).
+
+        @returns: True or False
+        """
+        self.set_context(context)
+        val1 = self.subeval(self.value[0])
+        logical = False
+        for op, parseobj in operator_operands(self.value[1:]):
+            val2 = self.subeval(parseobj)
+            logger.debug("Comparison: %s %s %s", val1, op, val2)
+            function = self.operator_map[op]
+            if op in ('and', 'or'):
+                logical = True
+                val1 = function(val1, val2)
+                if op == 'and' and not val1:
+                    # cannot become true any more
+                    return False
+                continue
+
+            if not self.handle_regexes(function, val1, val2):
+                return False
+            val1 = val2
+
+        if logical:
+            return val1
+
+        return True
+
+
+class ExpressionParser(object):
+    """ Define and run the parser. """
+    def __init__(self):
+        """ Build the pyparsing grammar of expressions and attach the Eval*
+        classes as parse actions. """
+        # speed up infixNotation considerably at the price of some cache memory
+        ParserElement.enablePackrat()
+
+        boolean = Keyword('True') | Keyword('False')
+        integer = Word(nums)
+        real = Combine(Word(nums) + "." + Word(nums))
+        string = (QuotedString('"', escChar='\\')
+                  | QuotedString("'", escChar='\\'))
+        regex = QuotedString('/', escChar='\\')
+        identifier = Word(alphas, alphanums + '_')
+        dereference = infixNotation(identifier, [
+            (Literal('.'), 2, opAssoc.LEFT, EvalArith),
+        ])
+        result = (Keyword('bad') | Keyword('fail') | Keyword('good')
+                  | Keyword('ignore') | Keyword('unknown'))
+        rval = boolean | real | integer | string | regex | result | dereference
+        rvallist = Group(Suppress('[') + delimitedList(rval) + Suppress(']'))
+        rvalset = Group(Suppress('{') + delimitedList(rval) + Suppress('}'))
+        operand = rval | rvallist | rvalset
+
+        # parse actions replace the parsed tokens with an instantiated object
+        # which we can later call into for evaluation of its content
+        boolean.setParseAction(EvalBoolean)
+        integer.setParseAction(EvalInteger)
+        real.setParseAction(EvalReal)
+        string.setParseAction(EvalString)
+        regex.setParseAction(EvalRegex)
+        identifier.setParseAction(EvalIdentifier)
+        result.setParseAction(EvalResult)
+        rvallist.setParseAction(EvalList)
+        rvalset.setParseAction(EvalSet)
+
+        identity_test = Keyword('is') + ~Keyword('not') | Combine(
+            Keyword('is') + Keyword('not'), adjacent=False, joinString=' ')
+        membership_test = Keyword('in') | Combine(
+            Keyword('not') + Keyword('in'), adjacent=False, joinString=' ')
+        comparison_op = oneOf('< <= > >= != == isdisjoint')
+        comparison = identity_test | membership_test | comparison_op
+
+        # operators in order of decreasing precedence
+        self.parser = infixNotation(operand, [
+            (Literal('**'), 2, opAssoc.LEFT, EvalPower),
+            (oneOf('+ - ~'), 1, opAssoc.RIGHT, EvalModifier),
+            (oneOf('* / // %'), 2, opAssoc.LEFT, EvalArith),
+            (oneOf('+ -'), 2, opAssoc.LEFT, EvalArith),
+            (oneOf('<< >>'), 2, opAssoc.LEFT, EvalArith),
+            (Literal('&'), 2, opAssoc.LEFT, EvalArith),
+            (Literal('^'), 2, opAssoc.LEFT, EvalArith),
+            (Literal('|'), 2, opAssoc.LEFT, EvalArith),
+            (comparison, 2, opAssoc.LEFT, EvalLogic),
+            (Keyword('not'), 1, opAssoc.RIGHT, EvalModifier),
+            (Keyword('and'), 2, opAssoc.LEFT, EvalLogic),
+            (Keyword('or'), 2, opAssoc.LEFT, EvalLogic),
+            (Keyword('->'), 2, opAssoc.LEFT, EvalArith),
+        ])
+
+    def parse(self, expression):
+        """ Parse an expression and return an object supporting evaluation of
+        that expression against a context.
+
+        @raises SyntaxError: if the expression cannot be parsed. The message
+                             contains the expression with the offending
+                             character marked by >> <<.
+        """
+        try:
+            return self.parser.parseString(expression, parseAll=True)[0]
+        except ParseException as parse_error:
+            # Use the zero-based offset into the whole expression. The column
+            # is one-based and relative to the line of the error, so it cannot
+            # be used to index a multi-line expression. Slicing instead of
+            # indexing also copes with an error at the very end of the
+            # expression.
+            loc = parse_error.loc
+            raise SyntaxError(
+                "Expression parse error near character %d: %s>>%s<<%s" % (
+                    loc + 1, expression[:loc], expression[loc:loc + 1],
+                    expression[loc + 1:]))
+
+
+if __name__ == '__main__':
+    # small manual smoke test: parse a sample expression and print its tree
+    demo_parser = ExpressionParser()
+    print(demo_parser.parse('foo == (bar - blub)'))
diff --git a/peekaboo/ruleset/rules.py b/peekaboo/ruleset/rules.py
index 6635c0c..36daa1c 100644
--- a/peekaboo/ruleset/rules.py
+++ b/peekaboo/ruleset/rules.py
@@ -29,6 +29,8 @@
import re
import logging
from peekaboo.ruleset import Result, RuleResult
+from peekaboo.ruleset.expressions import ExpressionParser, \
+ IdentifierMissingException
from peekaboo.exceptions import PeekabooAnalysisDeferred, \
CuckooSubmitFailedException, PeekabooRulesetConfigError
from peekaboo.toolbox.ole import Oletools, OletoolsReport, \
@@ -104,6 +106,33 @@ class Rule(object):
return self.config.get_by_type(
self.rule_name, option, fallback=default, option_type=option_type)
+    def get_cuckoo_report(self, sample):
+        """ Get the sample's cuckoo_report or, if it does not have one yet,
+        submit the sample for analysis by Cuckoo.
+
+        @raises PeekabooAnalysisDeferred: if the sample was submitted to
+                                          Cuckoo successfully.
+        @returns: the sample's Cuckoo report if present. If submission to
+                  Cuckoo failed, a rule result with Result.failed is returned
+                  instead, so callers need to be prepared for either.
+        """
+        existing_report = sample.cuckoo_report
+        if existing_report is not None:
+            return existing_report
+
+        try:
+            job_id = sample.submit_to_cuckoo()
+        except CuckooSubmitFailedException as failed:
+            logger.error("Submit to Cuckoo failed: %s", failed)
+            # exception message intentionally not present in message
+            # delivered back to client as to not disclose internal
+            # information, should request user to contact admin instead
+            failure_message = _(
+                "Behavioral analysis by Cuckoo has produced an error "
+                "and did not finish successfully")
+            return self.result(Result.failed, failure_message, False)
+        else:
+            logger.info('Sample submitted to Cuckoo. Job ID: %s. '
+                        'Sample: %s', job_id, sample)
+            raise PeekabooAnalysisDeferred()
+
class KnownRule(Rule):
""" A rule determining if a sample is known by looking at the database for
@@ -293,24 +322,7 @@ class CuckooRule(Rule):
@raises PeekabooAnalysisDeferred: if the sample was submitted to Cuckoo
@returns: RuleResult containing verdict.
"""
- report = sample.cuckoo_report
- if report is None:
- try:
- job_id = sample.submit_to_cuckoo()
- except CuckooSubmitFailedException as failed:
- logger.error("Submit to Cuckoo failed: %s", failed)
- # exception message intentionally not present in message
- # delivered back to client as to not disclose internal
- # information, should request user to contact admin instead
- return self.result(
- Result.failed,
- _("Behavioral analysis by Cuckoo has produced an error "
- "and did not finish successfully"),
- False)
-
- logger.info('Sample submitted to Cuckoo. Job ID: %s. '
- 'Sample: %s', job_id, sample)
- raise PeekabooAnalysisDeferred()
+ report = self.get_cuckoo_report(sample)
# call report evaluation function if we get here
return self.evaluate_report(report)
@@ -467,6 +479,62 @@ class CuckooAnalysisFailedRule(CuckooRule):
return self.result(Result.failed, failure_reason, False)
+class ExpressionRule(Rule):
+    """ A rule checking the sample and cuckoo report against an almost
+    arbitrary logical expression. """
+    rule_name = 'expressions'
+
+    def get_config(self):
+        """ Read the list of expressions from the rule's config section and
+        parse each of them into an evaluatable rule.
+
+        @raises PeekabooRulesetConfigError: if no expressions are configured
+                                            or one of them cannot be parsed.
+        """
+        expressions = self.get_config_value('expression', [])
+        if not expressions:
+            raise PeekabooRulesetConfigError(
+                "List of expressions empty, check %s rule config."
+                % self.rule_name)
+
+        self.rules = []
+        parser = ExpressionParser()
+        for expr in expressions:
+            try:
+                rule = parser.parse(expr)
+            except SyntaxError as error:
+                raise PeekabooRulesetConfigError(error)
+
+            logger.debug("EXPR: %s", expr)
+            logger.debug("RULE: %s", rule)
+            self.rules.append(rule)
+
+    def evaluate(self, sample):
+        """ Evaluate the expressions one after another against the sample and
+        return the result of the first one which yields a true result.
+
+        The Cuckoo report is only requested once an expression actually
+        refers to it, which may submit the sample to Cuckoo.
+
+        @raises PeekabooAnalysisDeferred: if the sample was submitted to
+                                          Cuckoo.
+        @returns: the rule result of the first expression yielding a true
+                  result, Result.failed if an expression uses an unknown
+                  identifier or the Cuckoo report cannot be obtained,
+                  Result.unknown if no expression yields a true result.
+        """
+        for rule in self.rules:
+            context = {'variables': {'sample': sample}}
+
+            while True:
+                try:
+                    result = rule.eval(context=context)
+                    break
+                except IdentifierMissingException as error:
+                    # the name of the missing identifier is the exception's
+                    # only argument, exceptions have no message attribute on
+                    # Python 3
+                    identifier = error.args[0]
+                    if identifier == "cuckooreport":
+                        report = self.get_cuckoo_report(sample)
+                        if isinstance(report, RuleResult):
+                            # submission failed and we got a failure result
+                            # instead of a report: hand that out instead of
+                            # evaluating the expression against it
+                            return report
+
+                        # retry the evaluation with the report available
+                        context['variables']['cuckooreport'] = report
+                    # here elif for other reports
+                    else:
+                        return self.result(
+                            Result.failed,
+                            _("Evaluation of expression uses undefined identifier."),
+                            False)
+
+            if result:
+                return self.result(result,
+                                   _("A rule classified the sample as %s")
+                                   % result,
+                                   False)
+
+        return self.result(Result.unknown,
+                           _("No rule classified the sample in any way."),
+                           True)
+
+
class FinalRule(Rule):
""" A catch-all rule. """
rule_name = 'final_rule'
diff --git a/peekaboo/sample.py b/peekaboo/sample.py
index d4c5a62..58099a7 100644
--- a/peekaboo/sample.py
+++ b/peekaboo/sample.py
@@ -423,25 +423,6 @@ class Sample(object):
if self.name_declared:
declared_filename = self.name_declared
- # check if the sample is an S/MIME signature (smime.p7s)
- # If so, don't overwrite the MIME type since we do not want to analyse
- # S/MIME signatures.
- # FIXME: This is oddly specific for this generic routine. Should it be
- # some sort of callback or plugin?
- leave_alone_types = {
- 'p7s': [
- 'application/pkcs7-signature',
- 'application/x-pkcs7-signature',
- 'application/pkcs7-mime',
- 'application/x-pkcs7-mime',
- ]
- }
-
- if declared_filename == 'smime.p7s' and declared_mt in leave_alone_types['p7s']:
- logger.info('S/MIME signature detected. Using declared MIME type over detected ones.')
- self.__mimetypes = set([declared_mt])
- return self.__mimetypes
-
# determine mime on original p[0-9]* file
# result of __submit_path would be "inode/symlink"
content_based_mime_type = guess_mime_type_from_file_contents(self.__path)
diff --git a/peekaboo/toolbox/cuckoo.py b/peekaboo/toolbox/cuckoo.py
index 47966aa..8b0b646 100644
--- a/peekaboo/toolbox/cuckoo.py
+++ b/peekaboo/toolbox/cuckoo.py
@@ -535,6 +535,20 @@ class CuckooReport(object):
return []
@property
+    def signature_descriptions(self):
+        """
+        Gets the descriptions of the triggered Cuckoo signatures.
+
+        @returns: A new list with the 'description' entry of every signature
+                  in self.signatures, in the same order. It is empty if
+                  self.signatures is.
+        """
+        return [sig['description'] for sig in self.signatures]
+
+ @property
def score(self):
"""
Gets the score from the Cuckoo report.
diff --git a/requirements.txt b/requirements.txt
index 4681594..d814a70 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,3 +9,4 @@ yara-python>=3.6.3
requests>=2.19.0
configparser
future
+pyparsing
diff --git a/ruleset.conf.sample b/ruleset.conf.sample
index 9a9ea73..789a45f 100644
--- a/ruleset.conf.sample
+++ b/ruleset.conf.sample
@@ -11,11 +11,12 @@ rule.3 : file_type_on_whitelist
rule.4 : file_type_on_greylist
#rule.5 : office_macro
#rule.6 : office_macro_with_suspicious_keyword
-rule.7 : cuckoo_evil_sig
-rule.8 : cuckoo_score
-#rule.9 : requests_evil_domain
-rule.10 : cuckoo_analysis_failed
-#rule.11 : contains_peekabooyar
+rule.7 : expressions
+rule.8 : cuckoo_evil_sig
+rule.9 : cuckoo_score
+#rule.10 : requests_evil_domain
+rule.11 : cuckoo_analysis_failed
+#rule.12 : contains_peekabooyar
rule.12 : final_rule
# rule specific configuration options
@@ -76,6 +77,16 @@ greylist.36 : application/msword
keyword.1 : AutoOpen
keyword.2 : AutoClose
+[expressions]
+expression.1 : sample.mimetypes <= {'text/plain', 'inode/x-empty'} -> ignore
+expression.2 : sample.meta_info_name_declared == 'smime.p7s'
+ and sample.meta_info_type_declared in {
+ 'application/pkcs7-signature',
+ 'application/x-pkcs7-signature',
+ 'application/pkcs7-mime',
+ 'application/x-pkcs7-mime'
+ } -> ignore
+
[cuckoo_evil_sig]
signature.1 : A potential heapspray has been detected. .*
signature.2 : A process attempted to delay the analysis task.
diff --git a/tests/test.py b/tests/test.py
index 39318fb..667255f 100755
--- a/tests/test.py
+++ b/tests/test.py
@@ -52,7 +52,9 @@ from peekaboo.ruleset.rules import FileTypeOnWhitelistRule, \
FileTypeOnGreylistRule, CuckooAnalysisFailedRule, \
KnownRule, FileLargerThanRule, CuckooEvilSigRule, \
CuckooScoreRule, RequestsEvilDomainRule, FinalRule, \
- OfficeMacroRule, OfficeMacroWithSuspiciousKeyword
+ OfficeMacroRule, OfficeMacroWithSuspiciousKeyword, \
+ ExpressionRule
+
from peekaboo.toolbox.cuckoo import CuckooReport
from peekaboo.db import PeekabooDatabase, PeekabooDatabaseError
# pylint: enable=wrong-import-position
@@ -780,6 +782,85 @@ unknown : baz'''
result = rule.evaluate(sample)
self.assertEqual(result.result, expected)
+ def test_rule_ignore_generic_whitelist(self):
+ """ Test rule to ignore file types on whitelist. """
+ config = '''[expressions]
+ expression.4 : sample.mimetypes <= {'text/plain', 'inode/x-empty', 'image/jpeg'} -> ignore
+ '''
+ factory = CreatingSampleFactory(
+ cuckoo=None, base_dir="",
+ job_hash_regex="", keep_mail_data=False,
+ processing_info_dir=None)
+
+ sample = factory.create_sample('file.txt', 'abc')
+ rule = ExpressionRule(CreatingConfigParser(config))
+ result = rule.evaluate(sample)
+ self.assertEqual(result.result, Result.ignored)
+
+ sample = factory.create_sample('file.html', '<html')
+ rule = ExpressionRule(CreatingConfigParser(config))
+ result = rule.evaluate(sample)
+ self.assertEqual(result.result, Result.unknown)
+
+ # bzip2 compressed data
+ sample = factory.create_sample('file.txt', 'BZh91AY=')
+ rule = ExpressionRule(CreatingConfigParser(config))