summaryrefslogtreecommitdiffstats
path: root/test.py
diff options
context:
space:
mode:
authorMichael Weiser <michael.weiser@gmx.de>2019-04-18 08:57:15 +0000
committerMichael Weiser <michael.weiser@gmx.de>2019-04-25 12:20:20 +0000
commit77d2ff1348a6f1ade05b54b297fc771abd0e14cd (patch)
treee7914e3ad89f36b65aced27f53132244f894832f /test.py
parente6c44a8ca3c2216904731e4d889e53473691c0dc (diff)
Validate ruleset config
Validate the ruleset configuration at startup to inform the user about misconfiguration and exit immediately instead of giving warnings during seemingly normal operation. This also gives us a chance to pre-compile regexes for more efficient matching later on. We give rules a new method get_config() which retrieves their configuration. This is called for each rule by new method the validate_config() of the ruleset engine to catch errors. This way the layout and extent of configuration is still completely governed by the rule and we can interview it about its happiness with what's provided in the configuration file. As an incidental cleanup, merge class PeekabooRulesetConfig into PeekabooRulesetParser because there's nothing left where it could and would need to help the rules with an abstraction of the config file. Also switch class PeekabooConfig to be a subclass of PeekabooConfigParser so it can (potentially) benefit from the list parsing code there. By moving the special log level and by-default-type getters over there as well we end up with nicely generic config classes that can benefit directly from improvements in the configparser module. Update the test suite to test and use this new functionality. Incidentally, remove the convoluted inheritance-based config testing layout in favour of creating subclasses of the config classes.
Diffstat (limited to 'test.py')
-rwxr-xr-xtest.py363
1 files changed, 194 insertions, 169 deletions
diff --git a/test.py b/test.py
index d70ffce..6309efa 100755
--- a/test.py
+++ b/test.py
@@ -42,10 +42,12 @@ from datetime import datetime, timedelta
# pylint: disable=wrong-import-position
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-from peekaboo.exceptions import PeekabooConfigException
-from peekaboo.config import PeekabooConfig, PeekabooRulesetConfig
+from peekaboo.exceptions import PeekabooConfigException, \
+ PeekabooRulesetConfigError
+from peekaboo.config import PeekabooConfig, PeekabooConfigParser
from peekaboo.sample import SampleFactory
from peekaboo.ruleset import RuleResult, Result
+from peekaboo.ruleset.engine import RulesetEngine
from peekaboo.ruleset.rules import FileTypeOnWhitelistRule, \
FileTypeOnGreylistRule, CuckooAnalysisFailedRule
from peekaboo.toolbox.cuckoo import CuckooReport
@@ -53,33 +55,95 @@ from peekaboo.db import PeekabooDatabase, PeekabooDatabaseError
# pylint: enable=wrong-import-position
-class TestConfig(unittest.TestCase):
- """ Base class for various tests of the configuration module. """
- config_class = PeekabooConfig
- testconfig = None
+class CreatingConfigMixIn(object):
+ """ A class for adding config file creation logic to any other class. """
+ def create_config(self, content):
+ """ Create a configuration file with defined content and pass it to the
+ parent constructor for parsing. """
+ _, self.created_config_file = tempfile.mkstemp()
+ with open(self.created_config_file, 'w') as file_desc:
+ file_desc.write(content)
+
+ def remove_config(self):
+ """ Remove the configuration file we've created. """
+ os.unlink(self.created_config_file)
+
+
+class CreatingConfigParser(PeekabooConfigParser, CreatingConfigMixIn):
+ """ A special kind of config parser that creates the configuration file
+ with defined content. """
+ def __init__(self, content=''):
+ self.created_config_file = None
+ self.create_config(content)
+ PeekabooConfigParser.__init__(self, self.created_config_file)
+
+ def __del__(self):
+ self.remove_config()
+
+class CreatingPeekabooConfig(PeekabooConfig, CreatingConfigMixIn):
+ """ A special kind of Peekaboo config that creates the configuration file
+ with defined content. """
+ def __init__(self, content=''):
+ self.created_config_file = None
+ self.create_config(content)
+ PeekabooConfig.__init__(self, self.created_config_file)
+
+ def __del__(self):
+ self.remove_config()
+
+
+class TestConfigParser(unittest.TestCase):
+ """ Test a configuration with all values different from the defaults. """
@classmethod
def setUpClass(cls):
""" Set up common test case resources. """
- cls.config_file = tempfile.mktemp()
- with open(cls.config_file, 'w') as file_desc:
- file_desc.write(cls.testconfig)
+ cls.config = CreatingConfigParser('''#[rule0]
- cls.config = cls.config_class(config_file=cls.config_file)
+[rule1]
+option1: foo
+option2.1: bar
+option2.2: baz
- @classmethod
- def tearDownClass(cls):
- """ Clean up after the tests. """
- os.unlink(cls.config_file)
+[rules]
+rule.1 : rule1
+#rule.2 : rule2
+rule.3 : rule3
+''')
+ def test_2_values(self):
+ """ Test rule configuration values """
+ with self.assertRaises(KeyError):
+ self.config['rule0']
+ self.assertEqual(self.config['rule1']['option1'], 'foo')
+ self.assertEqual(self.config['rule1'].getlist('option2'),
+ ['bar', 'baz'])
+
+ def test_3_type_mismatch(self):
+ """ Test correct error is thrown if the option type is mismatched """
+ config = '''[rule1]
+option1: foo
+option1.1: bar'''
-class TestDefaultConfig(TestConfig):
+ with self.assertRaisesRegexp(
+ PeekabooConfigException,
+ 'Option option1 in section rule1 is supposed to be a list but '
+ 'given as individual setting'):
+ CreatingConfigParser(config).getlist('rule1', 'option1')
+
+
+
+class TestDefaultConfig(unittest.TestCase):
""" Test a configuration of all defaults. """
- testconfig = ''
+ @classmethod
+ def setUpClass(cls):
+ """ Set up common test case resources. """
+ cls.config = CreatingPeekabooConfig()
def test_1_default_settings(self):
""" Test a configuration with just defaults """
- self.assertEqual(self.config.config_file, self.config_file)
+ self.assertEqual(
+ self.config.config_file, self.config.created_config_file)
self.assertEqual(self.config.user, 'peekaboo')
self.assertEqual(self.config.group, 'peekaboo')
self.assertEqual(
@@ -114,9 +178,12 @@ class TestDefaultConfig(TestConfig):
self.assertEqual(self.config.cluster_duplicate_check_interval, 60)
-class TestValidConfig(TestConfig):
+class TestValidConfig(unittest.TestCase):
""" Test a configuration with all values different from the defaults. """
- testconfig = '''[global]
+ @classmethod
+ def setUpClass(cls):
+ """ Set up common test case resources. """
+ cls.config = CreatingPeekabooConfig('''[global]
user : user1
group : group1
socket_file : /socket/1
@@ -151,11 +218,12 @@ poll_interval : 51
instance_id: 12
stale_in_flight_threshold: 31
duplicate_check_interval: 61
-'''
+''')
def test_1_read_settings(self):
""" Test reading of configuration settings from file """
- self.assertEqual(self.config.config_file, self.config_file)
+ self.assertEqual(
+ self.config.config_file, self.config.created_config_file)
self.assertEqual(self.config.user, 'user1')
self.assertEqual(self.config.group, 'group1')
self.assertEqual(self.config.sock_file, '/socket/1')
@@ -182,146 +250,44 @@ duplicate_check_interval: 61
self.assertEqual(self.config.cluster_duplicate_check_interval, 61)
-class TestInvalidConfigBase(unittest.TestCase):
+class TestInvalidConfig(unittest.TestCase):
""" Various tests of invalid config files. """
- config_class = None
-
- @classmethod
- def setUpClass(cls):
- """ Set up common test case resources. """
- cls.config_file = tempfile.mktemp()
-
- def write_config(self, testconfig):
- """ Helper method for writing out a test config file. """
- with open(self.config_file, 'w') as file_desc:
- file_desc.write(testconfig)
-
def test_1_section_header(self):
""" Test correct error is thrown if section header syntax is wrong """
- self.write_config('''[global[
-user: peekaboo''')
with self.assertRaisesRegexp(
- PeekabooConfigException,
- 'Configuration file "%s" can not be parsed: File contains no '
- 'section headers' % self.config_file):
- self.config_class(config_file=self.config_file)
+ PeekabooConfigException,
+ 'Configuration file ".*" can not be parsed: File contains no '
+ 'section headers'):
+ CreatingPeekabooConfig('''[global[
+user: peekaboo''')
def test_2_value_separator(self):
""" Test correct error is thrown if the value separator is wrong """
- self.write_config('''[global]
-user; peekaboo''')
with self.assertRaisesRegexp(
- PeekabooConfigException,
- 'Configuration file "%s" can not be parsed: (File|Source) '
- 'contains parsing errors:' % self.config_file):
- self.config_class(config_file=self.config_file)
+ PeekabooConfigException,
+ 'Configuration file ".*" can not be parsed: (File|Source) '
+ 'contains parsing errors:'):
+ CreatingPeekabooConfig('''[global]
+user; peekaboo''')
def test_3_section_header(self):
""" Test correct error is thrown if the config file is missing """
- try:
- os.unlink(self.config_file)
- except OSError:
- pass
+ _, config_file = tempfile.mkstemp()
+ os.unlink(config_file)
with self.assertRaisesRegexp(
- PeekabooConfigException,
- 'Configuration file "%s" can not be opened for reading: '
- r'\[Errno 2\] No such file or directory' % self.config_file):
- self.config_class(config_file=self.config_file)
-
- @classmethod
- def tearDownClass(cls):
- """ Clean up after the tests. """
- try:
- os.unlink(cls.config_file)
- except OSError:
- pass
-
-
-class TestInvalidConfig(TestInvalidConfigBase):
- """ Various tests of invalid config files. """
- config_class = PeekabooConfig
+ PeekabooConfigException,
+ 'Configuration file "%s" can not be opened for reading: '
+ r'\[Errno 2\] No such file or directory' % config_file):
+ PeekabooConfig(config_file)
- def test_50_unknown_loglevel(self):
+ def test_4_unknown_loglevel(self):
""" Test with an unknown log level """
- self.write_config('''[logging]
-log_level: FOO''')
with self.assertRaisesRegexp(
- PeekabooConfigException,
- 'Unknown log level FOO'):
- self.config_class(config_file=self.config_file)
-
-
-class TestValidRulesetConfig(TestConfig):
- """ Test a configuration with all values different from the defaults. """
- config_class = PeekabooRulesetConfig
- testconfig = '''#[rule0]
-
-[rule1]
-option1: foo
-option2.1: bar
-option2.2: baz
-
-[rule2]
-enabled = false
-
-[rule3]
-enabled: true
-
-[rule4]
-enabled = on
-
-[rule5]
-enabled: off
-'''
-
- def test_1_enabled(self):
- """ Test disabling of rules """
- self.assertEqual(self.config.rule_enabled('rule0'), True)
- self.assertEqual(self.config.rule_enabled('rule1'), True)
- self.assertEqual(self.config.rule_enabled('rule2'), False)
- self.assertEqual(self.config.rule_enabled('rule3'), True)
- self.assertEqual(self.config.rule_enabled('rule4'), True)
- self.assertEqual(self.config.rule_enabled('rule5'), False)
-
- def test_2_values(self):
- """ Test rule configuration values """
- self.assertEqual(self.config.rule_config('rule0'), None)
- self.assertEqual(self.config.rule_config('rule1')['option1'], 'foo')
- self.assertEqual(
- self.config.rule_config('rule1')['option2'], ['bar', 'baz'])
-
-
-class TestInvalidRulesetConfig(TestInvalidConfigBase):
- """ Various tests of invalid ruleset config files. """
- config_class = PeekabooRulesetConfig
-
- def test_50_type_mismatch(self):
- """ Test correct error is thrown if the option type is mismatched """
- self.write_config('''[rule1]
-option1: foo
-option1.1: bar''')
- with self.assertRaisesRegexp(
- PeekabooConfigException,
- 'Setting option1.1 in section rule1 specified as list as well '
- 'as individual setting'):
- self.config_class(config_file=self.config_file)
-
-class PeekabooDummyConfig(object):
- """ A dummy configuration for the test cases. """
- def __init__(self):
- """ Initialize dummy configuration """
- self.job_hash_regex = r'/amavis/tmp/([^/]+)/parts/'
- self.sample_base_dir = '/tmp'
-
- def get(self, option, default):
- """ Return specific dummy settings. """
- config = {
- 'whitelist':['text/plain', 'inode/x-empty'],
- 'greylist' :['application/x-dosexec', 'application/msword',
- 'application/vnd.ms-powerpoint'],
- }
- return config[option]
+ PeekabooConfigException,
+ 'Unknown log level FOO'):
+ CreatingPeekabooConfig('''[logging]
+log_level: FOO''')
class CreatingSampleFactory(SampleFactory):
@@ -357,7 +323,7 @@ class TestDatabase(unittest.TestCase):
def setUpClass(cls):
""" Set up common test case resources. """
cls.test_db = os.path.abspath('./test.db')
- cls.conf = PeekabooDummyConfig()
+ cls.conf = CreatingPeekabooConfig()
cls.db_con = PeekabooDatabase('sqlite:///' + cls.test_db,
instance_id=1,
stale_in_flight_threshold=10)
@@ -502,7 +468,7 @@ class TestSample(unittest.TestCase):
def setUpClass(cls):
""" Set up common test case resources. """
cls.test_db = os.path.abspath('./test.db')
- cls.conf = PeekabooDummyConfig()
+ cls.conf = CreatingPeekabooConfig()
cls.db_con = PeekabooDatabase('sqlite:///' + cls.test_db)
cls.factory = CreatingSampleFactory(
cuckoo=None, base_dir=cls.conf.sample_base_dir,
@@ -629,6 +595,39 @@ class TestSample(unittest.TestCase):
del cls.factory
+class TestRulesetEngine(unittest.TestCase):
+ """ Unittests for the Ruleset Engine. """
+ def test_no_rules_configured(self):
+ """ Test that correct error is shown if no rules are configured. """
+ config = CreatingConfigParser()
+ with self.assertRaisesRegexp(
+ PeekabooRulesetConfigError,
+ r'No enabled rules found, check ruleset config.'):
+ RulesetEngine(ruleset_config=config, db_con=None)
+
+ def test_unknown_rule_enabled(self):
+ """ Test that correct error is shown if an unknown rule is enabled. """
+ config = CreatingConfigParser('''[rules]
+rule.1: foo''')
+ with self.assertRaisesRegexp(
+ PeekabooRulesetConfigError,
+ r'Unknown rule\(s\) enabled: foo'):
+ RulesetEngine(ruleset_config=config, db_con=None)
+
+ def test_invalid_type(self):
+ """ Test that correct error is shown if rule config option has wrong
+ type. """
+
+ config = CreatingConfigParser('''[rules]
+rule.1: cuckoo_score
+
+[cuckoo_score]
+higher_than: foo''')
+ with self.assertRaisesRegexp(
+ ValueError,
+ r'could not convert string to float: foo'):
+ RulesetEngine(ruleset_config=config, db_con=None)
+
class MimetypeSample(object): # pylint: disable=too-few-public-methods
""" A dummy sample class that only contains a set of MIME types for testing
whitelist and greylist rules with it. """
@@ -645,6 +644,21 @@ class CuckooReportSample(object): # pylint: disable=too-few-public-methods
class TestRules(unittest.TestCase):
""" Unittests for Rules. """
+ @classmethod
+ def setUpClass(cls):
+ """ Set up common test case resources. """
+ cls.config = CreatingConfigParser('''[file_type_on_whitelist]
+whitelist.1 : text/plain
+
+[file_type_on_greylist]
+greylist.1 : application/x-dosexec
+greylist.2 : application/zip
+greylist.3 : application/msword
+
+[cuckoo_analysis_failed]
+failure.1: end of analysis reached!
+success.1: analysis completed successfully''')
+
def test_rule_file_type_on_whitelist(self):
""" Test whitelist rule. """
combinations = [
@@ -655,7 +669,7 @@ class TestRules(unittest.TestCase):
[True, ['', 'asdfjkl', '93219843298']],
[True, []],
]
- rule = FileTypeOnWhitelistRule({'whitelist': ['text/plain']})
+ rule = FileTypeOnWhitelistRule(self.config)
for expected, types in combinations:
result = rule.evaluate(MimetypeSample(types))
self.assertEqual(result.further_analysis, expected)
@@ -671,42 +685,53 @@ class TestRules(unittest.TestCase):
[False, ['', 'asdfjkl', '93219843298']],
[True, []],
]
- rule = FileTypeOnGreylistRule({
- 'greylist': [
- 'application/x-dosexec',
- 'application/zip',
- 'application/msword']})
+ rule = FileTypeOnGreylistRule(self.config)
for expected, types in combinations:
result = rule.evaluate(MimetypeSample(types))
self.assertEqual(result.further_analysis, expected)
def test_rule_analysis_failed(self):
""" Test the Cuckoo analysis failed rule """
+ # create some test samples
+ successful_sample = CuckooReportSample(
+ {'debug': {'cuckoo': ['analysis completed successfully']}})
+ failed_sample = CuckooReportSample(
+ {'debug': {'cuckoo': ['analysis failed']}})
+ reached_sample = CuckooReportSample(
+ {'debug': {'cuckoo': ['end of analysis reached!']}})
+ everything_sample = CuckooReportSample(
+ {'debug': {'cuckoo': [
+ 'end of analysis reached!',
+ 'analysis failed',
+ 'analysis completed successfully']}})
+
# test defaults
- rule = CuckooAnalysisFailedRule()
- result = rule.evaluate(CuckooReportSample(
- {'debug': {'cuckoo': ['analysis completed successfully']}}))
+ rule = CuckooAnalysisFailedRule(CreatingConfigParser(''))
+ result = rule.evaluate(successful_sample)
self.assertEqual(result.result, Result.unknown)
self.assertEqual(result.further_analysis, True)
- result = rule.evaluate(CuckooReportSample(
- {'debug': {'cuckoo': ['analysis failed']}}))
+ result = rule.evaluate(reached_sample)
self.assertEqual(result.result, Result.failed)
self.assertEqual(result.further_analysis, False)
+ result = rule.evaluate(failed_sample)
+ self.assertEqual(result.result, Result.failed)
+ self.assertEqual(result.further_analysis, False)
+ result = rule.evaluate(everything_sample)
+ self.assertEqual(result.result, Result.unknown)
+ self.assertEqual(result.further_analysis, True)
# test with config
- rule = CuckooAnalysisFailedRule({
- 'failure': ['end of analysis reached!'],
- 'success': ['analysis completed successfully']})
- result = rule.evaluate(CuckooReportSample(
- {'debug': {'cuckoo': ['analysis completed successfully']}}))
+ rule = CuckooAnalysisFailedRule(self.config)
+ result = rule.evaluate(successful_sample)
self.assertEqual(result.result, Result.unknown)
self.assertEqual(result.further_analysis, True)
- result = rule.evaluate(CuckooReportSample(
- {'debug': {'cuckoo': ['end of analysis reached!']}}))
+ result = rule.evaluate(reached_sample)
+ self.assertEqual(result.result, Result.failed)
+ self.assertEqual(result.further_analysis, False)
+ result = rule.evaluate(failed_sample)
self.assertEqual(result.result, Result.failed)
self.assertEqual(result.further_analysis, False)
- result = rule.evaluate(CuckooReportSample(
- {'debug': {'cuckoo': ['analysis failed']}}))
+ result = rule.evaluate(everything_sample)
self.assertEqual(result.result, Result.failed)
self.assertEqual(result.further_analysis, False)
@@ -727,13 +752,13 @@ def main():
gettext.NullTranslations().install()
suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(TestConfigParser))
suite.addTest(unittest.makeSuite(TestDefaultConfig))
suite.addTest(unittest.makeSuite(TestValidConfig))
suite.addTest(unittest.makeSuite(TestInvalidConfig))
- suite.addTest(unittest.makeSuite(TestValidRulesetConfig))
- suite.addTest(unittest.makeSuite(TestInvalidRulesetConfig))
suite.addTest(unittest.makeSuite(TestSample))
suite.addTest(unittest.makeSuite(TestDatabase))
+ suite.addTest(unittest.makeSuite(TestRulesetEngine))
suite.addTest(unittest.makeSuite(TestRules))
# TODO: We need more tests!!!