summaryrefslogtreecommitdiffstats
path: root/test.py
diff options
context:
space:
mode:
Diffstat (limited to 'test.py')
-rwxr-xr-xtest.py363
1 files changed, 194 insertions, 169 deletions
diff --git a/test.py b/test.py
index d70ffce..6309efa 100755
--- a/test.py
+++ b/test.py
@@ -42,10 +42,12 @@ from datetime import datetime, timedelta
# pylint: disable=wrong-import-position
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-from peekaboo.exceptions import PeekabooConfigException
-from peekaboo.config import PeekabooConfig, PeekabooRulesetConfig
+from peekaboo.exceptions import PeekabooConfigException, \
+ PeekabooRulesetConfigError
+from peekaboo.config import PeekabooConfig, PeekabooConfigParser
from peekaboo.sample import SampleFactory
from peekaboo.ruleset import RuleResult, Result
+from peekaboo.ruleset.engine import RulesetEngine
from peekaboo.ruleset.rules import FileTypeOnWhitelistRule, \
FileTypeOnGreylistRule, CuckooAnalysisFailedRule
from peekaboo.toolbox.cuckoo import CuckooReport
@@ -53,33 +55,95 @@ from peekaboo.db import PeekabooDatabase, PeekabooDatabaseError
# pylint: enable=wrong-import-position
-class TestConfig(unittest.TestCase):
- """ Base class for various tests of the configuration module. """
- config_class = PeekabooConfig
- testconfig = None
+class CreatingConfigMixIn(object):
+ """ A class for adding config file creation logic to any other class. """
+ def create_config(self, content):
+ """ Create a configuration file with defined content and pass it to the
+ parent constructor for parsing. """
+ _, self.created_config_file = tempfile.mkstemp()
+ with open(self.created_config_file, 'w') as file_desc:
+ file_desc.write(content)
+
+ def remove_config(self):
+ """ Remove the configuration file we've created. """
+ os.unlink(self.created_config_file)
+
+
+class CreatingConfigParser(PeekabooConfigParser, CreatingConfigMixIn):
+ """ A special kind of config parser that creates the configuration file
+ with defined content. """
+ def __init__(self, content=''):
+ self.created_config_file = None
+ self.create_config(content)
+ PeekabooConfigParser.__init__(self, self.created_config_file)
+
+ def __del__(self):
+ self.remove_config()
+
+class CreatingPeekabooConfig(PeekabooConfig, CreatingConfigMixIn):
+ """ A special kind of Peekaboo config that creates the configuration file
+ with defined content. """
+ def __init__(self, content=''):
+ self.created_config_file = None
+ self.create_config(content)
+ PeekabooConfig.__init__(self, self.created_config_file)
+
+ def __del__(self):
+ self.remove_config()
+
+
+class TestConfigParser(unittest.TestCase):
+ """ Test a configuration with all values different from the defaults. """
@classmethod
def setUpClass(cls):
""" Set up common test case resources. """
- cls.config_file = tempfile.mktemp()
- with open(cls.config_file, 'w') as file_desc:
- file_desc.write(cls.testconfig)
+ cls.config = CreatingConfigParser('''#[rule0]
- cls.config = cls.config_class(config_file=cls.config_file)
+[rule1]
+option1: foo
+option2.1: bar
+option2.2: baz
- @classmethod
- def tearDownClass(cls):
- """ Clean up after the tests. """
- os.unlink(cls.config_file)
+[rules]
+rule.1 : rule1
+#rule.2 : rule2
+rule.3 : rule3
+''')
+ def test_2_values(self):
+ """ Test rule configuration values """
+ with self.assertRaises(KeyError):
+ self.config['rule0']
+ self.assertEqual(self.config['rule1']['option1'], 'foo')
+ self.assertEqual(self.config['rule1'].getlist('option2'),
+ ['bar', 'baz'])
+
+ def test_3_type_mismatch(self):
+ """ Test correct error is thrown if the option type is mismatched """
+ config = '''[rule1]
+option1: foo
+option1.1: bar'''
-class TestDefaultConfig(TestConfig):
+ with self.assertRaisesRegexp(
+ PeekabooConfigException,
+ 'Option option1 in section rule1 is supposed to be a list but '
+ 'given as individual setting'):
+ CreatingConfigParser(config).getlist('rule1', 'option1')
+
+
+
+class TestDefaultConfig(unittest.TestCase):
""" Test a configuration of all defaults. """
- testconfig = ''
+ @classmethod
+ def setUpClass(cls):
+ """ Set up common test case resources. """
+ cls.config = CreatingPeekabooConfig()
def test_1_default_settings(self):
""" Test a configuration with just defaults """
- self.assertEqual(self.config.config_file, self.config_file)
+ self.assertEqual(
+ self.config.config_file, self.config.created_config_file)
self.assertEqual(self.config.user, 'peekaboo')
self.assertEqual(self.config.group, 'peekaboo')
self.assertEqual(
@@ -114,9 +178,12 @@ class TestDefaultConfig(TestConfig):
self.assertEqual(self.config.cluster_duplicate_check_interval, 60)
-class TestValidConfig(TestConfig):
+class TestValidConfig(unittest.TestCase):
""" Test a configuration with all values different from the defaults. """
- testconfig = '''[global]
+ @classmethod
+ def setUpClass(cls):
+ """ Set up common test case resources. """
+ cls.config = CreatingPeekabooConfig('''[global]
user : user1
group : group1
socket_file : /socket/1
@@ -151,11 +218,12 @@ poll_interval : 51
instance_id: 12
stale_in_flight_threshold: 31
duplicate_check_interval: 61
-'''
+''')
def test_1_read_settings(self):
""" Test reading of configuration settings from file """
- self.assertEqual(self.config.config_file, self.config_file)
+ self.assertEqual(
+ self.config.config_file, self.config.created_config_file)
self.assertEqual(self.config.user, 'user1')
self.assertEqual(self.config.group, 'group1')
self.assertEqual(self.config.sock_file, '/socket/1')
@@ -182,146 +250,44 @@ duplicate_check_interval: 61
self.assertEqual(self.config.cluster_duplicate_check_interval, 61)
-class TestInvalidConfigBase(unittest.TestCase):
+class TestInvalidConfig(unittest.TestCase):
""" Various tests of invalid config files. """
- config_class = None
-
- @classmethod
- def setUpClass(cls):
- """ Set up common test case resources. """
- cls.config_file = tempfile.mktemp()
-
- def write_config(self, testconfig):
- """ Helper method for writing out a test config file. """
- with open(self.config_file, 'w') as file_desc:
- file_desc.write(testconfig)
-
def test_1_section_header(self):
""" Test correct error is thrown if section header syntax is wrong """
- self.write_config('''[global[
-user: peekaboo''')
with self.assertRaisesRegexp(
- PeekabooConfigException,
- 'Configuration file "%s" can not be parsed: File contains no '
- 'section headers' % self.config_file):
- self.config_class(config_file=self.config_file)
+ PeekabooConfigException,
+ 'Configuration file ".*" can not be parsed: File contains no '
+ 'section headers'):
+ CreatingPeekabooConfig('''[global[
+user: peekaboo''')
def test_2_value_separator(self):
""" Test correct error is thrown if the value separator is wrong """
- self.write_config('''[global]
-user; peekaboo''')
with self.assertRaisesRegexp(
- PeekabooConfigException,
- 'Configuration file "%s" can not be parsed: (File|Source) '
- 'contains parsing errors:' % self.config_file):
- self.config_class(config_file=self.config_file)
+ PeekabooConfigException,
+ 'Configuration file ".*" can not be parsed: (File|Source) '
+ 'contains parsing errors:'):
+ CreatingPeekabooConfig('''[global]
+user; peekaboo''')
def test_3_section_header(self):
""" Test correct error is thrown if the config file is missing """
- try:
- os.unlink(self.config_file)
- except OSError:
- pass
+ _, config_file = tempfile.mkstemp()
+ os.unlink(config_file)
with self.assertRaisesRegexp(
- PeekabooConfigException,
- 'Configuration file "%s" can not be opened for reading: '
- r'\[Errno 2\] No such file or directory' % self.config_file):
- self.config_class(config_file=self.config_file)
-
- @classmethod
- def tearDownClass(cls):
- """ Clean up after the tests. """
- try:
- os.unlink(cls.config_file)
- except OSError:
- pass
-
-
-class TestInvalidConfig(TestInvalidConfigBase):
- """ Various tests of invalid config files. """
- config_class = PeekabooConfig
+ PeekabooConfigException,
+ 'Configuration file "%s" can not be opened for reading: '
+ r'\[Errno 2\] No such file or directory' % config_file):
+ PeekabooConfig(config_file)
- def test_50_unknown_loglevel(self):
+ def test_4_unknown_loglevel(self):
""" Test with an unknown log level """
- self.write_config('''[logging]
-log_level: FOO''')
with self.assertRaisesRegexp(
- PeekabooConfigException,
- 'Unknown log level FOO'):
- self.config_class(config_file=self.config_file)
-
-
-class TestValidRulesetConfig(TestConfig):
- """ Test a configuration with all values different from the defaults. """
- config_class = PeekabooRulesetConfig
- testconfig = '''#[rule0]
-
-[rule1]
-option1: foo
-option2.1: bar
-option2.2: baz
-
-[rule2]
-enabled = false
-
-[rule3]
-enabled: true
-
-[rule4]
-enabled = on
-
-[rule5]
-enabled: off
-'''
-
- def test_1_enabled(self):
- """ Test disabling of rules """
- self.assertEqual(self.config.rule_enabled('rule0'), True)
- self.assertEqual(self.config.rule_enabled('rule1'), True)
- self.assertEqual(self.config.rule_enabled('rule2'), False)
- self.assertEqual(self.config.rule_enabled('rule3'), True)
- self.assertEqual(self.config.rule_enabled('rule4'), True)
- self.assertEqual(self.config.rule_enabled('rule5'), False)
-
- def test_2_values(self):
- """ Test rule configuration values """
- self.assertEqual(self.config.rule_config('rule0'), None)
- self.assertEqual(self.config.rule_config('rule1')['option1'], 'foo')
- self.assertEqual(
- self.config.rule_config('rule1')['option2'], ['bar', 'baz'])
-
-
-class TestInvalidRulesetConfig(TestInvalidConfigBase):
- """ Various tests of invalid ruleset config files. """
- config_class = PeekabooRulesetConfig
-
- def test_50_type_mismatch(self):
- """ Test correct error is thrown if the option type is mismatched """
- self.write_config('''[rule1]
-option1: foo
-option1.1: bar''')
- with self.assertRaisesRegexp(
- PeekabooConfigException,
- 'Setting option1.1 in section rule1 specified as list as well '
- 'as individual setting'):
- self.config_class(config_file=self.config_file)
-
-class PeekabooDummyConfig(object):
- """ A dummy configuration for the test cases. """
- def __init__(self):
- """ Initialize dummy configuration """
- self.job_hash_regex = r'/amavis/tmp/([^/]+)/parts/'
- self.sample_base_dir = '/tmp'
-
- def get(self, option, default):
- """ Return specific dummy settings. """
- config = {
- 'whitelist':['text/plain', 'inode/x-empty'],
- 'greylist' :['application/x-dosexec', 'application/msword',
- 'application/vnd.ms-powerpoint'],
- }
- return config[option]
+ PeekabooConfigException,
+ 'Unknown log level FOO'):
+ CreatingPeekabooConfig('''[logging]
+log_level: FOO''')
class CreatingSampleFactory(SampleFactory):
@@ -357,7 +323,7 @@ class TestDatabase(unittest.TestCase):
def setUpClass(cls):
""" Set up common test case resources. """
cls.test_db = os.path.abspath('./test.db')
- cls.conf = PeekabooDummyConfig()
+ cls.conf = CreatingPeekabooConfig()
cls.db_con = PeekabooDatabase('sqlite:///' + cls.test_db,
instance_id=1,
stale_in_flight_threshold=10)
@@ -502,7 +468,7 @@ class TestSample(unittest.TestCase):
def setUpClass(cls):
""" Set up common test case resources. """
cls.test_db = os.path.abspath('./test.db')
- cls.conf = PeekabooDummyConfig()
+ cls.conf = CreatingPeekabooConfig()
cls.db_con = PeekabooDatabase('sqlite:///' + cls.test_db)
cls.factory = CreatingSampleFactory(
cuckoo=None, base_dir=cls.conf.sample_base_dir,
@@ -629,6 +595,39 @@ class TestSample(unittest.TestCase):
del cls.factory
+class TestRulesetEngine(unittest.TestCase):
+ """ Unittests for the Ruleset Engine. """
+ def test_no_rules_configured(self):
+ """ Test that correct error is shown if no rules are configured. """
+ config = CreatingConfigParser()
+ with self.assertRaisesRegexp(
+ PeekabooRulesetConfigError,
+ r'No enabled rules found, check ruleset config.'):
+ RulesetEngine(ruleset_config=config, db_con=None)
+
+ def test_unknown_rule_enabled(self):
+ """ Test that correct error is shown if an unknown rule is enabled. """
+ config = CreatingConfigParser('''[rules]
+rule.1: foo''')
+ with self.assertRaisesRegexp(
+ PeekabooRulesetConfigError,
+ r'Unknown rule\(s\) enabled: foo'):
+ RulesetEngine(ruleset_config=config, db_con=None)
+
+ def test_invalid_type(self):
+ """ Test that correct error is shown if rule config option has wrong
+ type. """
+
+ config = CreatingConfigParser('''[rules]
+rule.1: cuckoo_score
+
+[cuckoo_score]
+higher_than: foo''')
+ with self.assertRaisesRegexp(
+ ValueError,
+ r'could not convert string to float: foo'):
+ RulesetEngine(ruleset_config=config, db_con=None)
+
class MimetypeSample(object): # pylint: disable=too-few-public-methods
""" A dummy sample class that only contains a set of MIME types for testing
whitelist and greylist rules with it. """
@@ -645,6 +644,21 @@ class CuckooReportSample(object): # pylint: disable=too-few-public-methods
class TestRules(unittest.TestCase):
""" Unittests for Rules. """
+ @classmethod
+ def setUpClass(cls):
+ """ Set up common test case resources. """
+ cls.config = CreatingConfigParser('''[file_type_on_whitelist]
+whitelist.1 : text/plain
+
+[file_type_on_greylist]
+greylist.1 : application/x-dosexec
+greylist.2 : application/zip
+greylist.3 : application/msword
+
+[cuckoo_analysis_failed]
+failure.1: end of analysis reached!
+success.1: analysis completed successfully''')
+
def test_rule_file_type_on_whitelist(self):
""" Test whitelist rule. """
combinations = [
@@ -655,7 +669,7 @@ class TestRules(unittest.TestCase):
[True, ['', 'asdfjkl', '93219843298']],
[True, []],
]
- rule = FileTypeOnWhitelistRule({'whitelist': ['text/plain']})
+ rule = FileTypeOnWhitelistRule(self.config)
for expected, types in combinations:
result = rule.evaluate(MimetypeSample(types))
self.assertEqual(result.further_analysis, expected)
@@ -671,42 +685,53 @@ class TestRules(unittest.TestCase):
[False, ['', 'asdfjkl', '93219843298']],
[True, []],
]
- rule = FileTypeOnGreylistRule({
- 'greylist': [
- 'application/x-dosexec',
- 'application/zip',
- 'application/msword']})
+ rule = FileTypeOnGreylistRule(self.config)
for expected, types in combinations:
result = rule.evaluate(MimetypeSample(types))
self.assertEqual(result.further_analysis, expected)
def test_rule_analysis_failed(self):
""" Test the Cuckoo analysis failed rule """
+ # create some test samples
+ successful_sample = CuckooReportSample(
+ {'debug': {'cuckoo': ['analysis completed successfully']}})
+ failed_sample = CuckooReportSample(
+ {'debug': {'cuckoo': ['analysis failed']}})
+ reached_sample = CuckooReportSample(
+ {'debug': {'cuckoo': ['end of analysis reached!']}})
+ everything_sample = CuckooReportSample(
+ {'debug': {'cuckoo': [
+ 'end of analysis reached!',
+ 'analysis failed',
+ 'analysis completed successfully']}})
+
# test defaults
- rule = CuckooAnalysisFailedRule()
- result = rule.evaluate(CuckooReportSample(
- {'debug': {'cuckoo': ['analysis completed successfully']}}))
+ rule = CuckooAnalysisFailedRule(CreatingConfigParser(''))
+ result = rule.evaluate(successful_sample)
self.assertEqual(result.result, Result.unknown)
self.assertEqual(result.further_analysis, True)
- result = rule.evaluate(CuckooReportSample(
- {'debug': {'cuckoo': ['analysis failed']}}))
+ result = rule.evaluate(reached_sample)
self.assertEqual(result.result, Result.failed)
self.assertEqual(result.further_analysis, False)
+ result = rule.evaluate(failed_sample)
+ self.assertEqual(result.result, Result.failed)
+ self.assertEqual(result.further_analysis, False)
+ result = rule.evaluate(everything_sample)
+ self.assertEqual(result.result, Result.unknown)
+ self.assertEqual(result.further_analysis, True)
# test with config
- rule = CuckooAnalysisFailedRule({
- 'failure': ['end of analysis reached!'],
- 'success': ['analysis completed successfully']})
- result = rule.evaluate(CuckooReportSample(
- {'debug': {'cuckoo': ['analysis completed successfully']}}))
+ rule = CuckooAnalysisFailedRule(self.config)
+ result = rule.evaluate(successful_sample)
self.assertEqual(result.result, Result.unknown)
self.assertEqual(result.further_analysis, True)
- result = rule.evaluate(CuckooReportSample(
- {'debug': {'cuckoo': ['end of analysis reached!']}}))
+ result = rule.evaluate(reached_sample)
+ self.assertEqual(result.result, Result.failed)
+ self.assertEqual(result.further_analysis, False)
+ result = rule.evaluate(failed_sample)
self.assertEqual(result.result, Result.failed)
self.assertEqual(result.further_analysis, False)
- result = rule.evaluate(CuckooReportSample(
- {'debug': {'cuckoo': ['analysis failed']}}))
+ result = rule.evaluate(everything_sample)
self.assertEqual(result.result, Result.failed)
self.assertEqual(result.further_analysis, False)
@@ -727,13 +752,13 @@ def main():
gettext.NullTranslations().install()
suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(TestConfigParser))
suite.addTest(unittest.makeSuite(TestDefaultConfig))
suite.addTest(unittest.makeSuite(TestValidConfig))
suite.addTest(unittest.makeSuite(TestInvalidConfig))
- suite.addTest(unittest.makeSuite(TestValidRulesetConfig))
- suite.addTest(unittest.makeSuite(TestInvalidRulesetConfig))
suite.addTest(unittest.makeSuite(TestSample))
suite.addTest(unittest.makeSuite(TestDatabase))
+ suite.addTest(unittest.makeSuite(TestRulesetEngine))
suite.addTest(unittest.makeSuite(TestRules))
# TODO: We need more tests!!!