diff options
Diffstat (limited to 'test.py')
-rwxr-xr-x | test.py | 363 |
1 files changed, 194 insertions, 169 deletions
@@ -42,10 +42,12 @@ from datetime import datetime, timedelta # pylint: disable=wrong-import-position sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from peekaboo.exceptions import PeekabooConfigException -from peekaboo.config import PeekabooConfig, PeekabooRulesetConfig +from peekaboo.exceptions import PeekabooConfigException, \ + PeekabooRulesetConfigError +from peekaboo.config import PeekabooConfig, PeekabooConfigParser from peekaboo.sample import SampleFactory from peekaboo.ruleset import RuleResult, Result +from peekaboo.ruleset.engine import RulesetEngine from peekaboo.ruleset.rules import FileTypeOnWhitelistRule, \ FileTypeOnGreylistRule, CuckooAnalysisFailedRule from peekaboo.toolbox.cuckoo import CuckooReport @@ -53,33 +55,95 @@ from peekaboo.db import PeekabooDatabase, PeekabooDatabaseError # pylint: enable=wrong-import-position -class TestConfig(unittest.TestCase): - """ Base class for various tests of the configuration module. """ - config_class = PeekabooConfig - testconfig = None +class CreatingConfigMixIn(object): + """ A class for adding config file creation logic to any other class. """ + def create_config(self, content): + """ Create a configuration file with defined content and pass it to the + parent constructor for parsing. """ + _, self.created_config_file = tempfile.mkstemp() + with open(self.created_config_file, 'w') as file_desc: + file_desc.write(content) + + def remove_config(self): + """ Remove the configuration file we've created. """ + os.unlink(self.created_config_file) + + +class CreatingConfigParser(PeekabooConfigParser, CreatingConfigMixIn): + """ A special kind of config parser that creates the configuration file + with defined content. """ + def __init__(self, content=''): + self.created_config_file = None + self.create_config(content) + PeekabooConfigParser.__init__(self, self.created_config_file) + + def __del__(self): + self.remove_config() + +class CreatingPeekabooConfig(PeekabooConfig, CreatingConfigMixIn): + """ A special kind of Peekaboo config that creates the configuration file + with defined content. """ + def __init__(self, content=''): + self.created_config_file = None + self.create_config(content) + PeekabooConfig.__init__(self, self.created_config_file) + + def __del__(self): + self.remove_config() + + +class TestConfigParser(unittest.TestCase): + """ Test a configuration with all values different from the defaults. """ @classmethod def setUpClass(cls): """ Set up common test case resources. """ - cls.config_file = tempfile.mktemp() - with open(cls.config_file, 'w') as file_desc: - file_desc.write(cls.testconfig) + cls.config = CreatingConfigParser('''#[rule0] - cls.config = cls.config_class(config_file=cls.config_file) +[rule1] +option1: foo +option2.1: bar +option2.2: baz - @classmethod - def tearDownClass(cls): - """ Clean up after the tests. """ - os.unlink(cls.config_file) +[rules] +rule.1 : rule1 +#rule.2 : rule2 +rule.3 : rule3 +''') + def test_2_values(self): + """ Test rule configuration values """ + with self.assertRaises(KeyError): + self.config['rule0'] + self.assertEqual(self.config['rule1']['option1'], 'foo') + self.assertEqual(self.config['rule1'].getlist('option2'), + ['bar', 'baz']) + + def test_3_type_mismatch(self): + """ Test correct error is thrown if the option type is mismatched """ + config = '''[rule1] +option1: foo +option1.1: bar''' -class TestDefaultConfig(TestConfig): + with self.assertRaisesRegexp( + PeekabooConfigException, + 'Option option1 in section rule1 is supposed to be a list but ' + 'given as individual setting'): + CreatingConfigParser(config).getlist('rule1', 'option1') + + + +class TestDefaultConfig(unittest.TestCase): """ Test a configuration of all defaults. """ - testconfig = '' + @classmethod + def setUpClass(cls): + """ Set up common test case resources. """ + cls.config = CreatingPeekabooConfig() def test_1_default_settings(self): """ Test a configuration with just defaults """ - self.assertEqual(self.config.config_file, self.config_file) + self.assertEqual( + self.config.config_file, self.config.created_config_file) self.assertEqual(self.config.user, 'peekaboo') self.assertEqual(self.config.group, 'peekaboo') self.assertEqual( @@ -114,9 +178,12 @@ class TestDefaultConfig(TestConfig): self.assertEqual(self.config.cluster_duplicate_check_interval, 60) -class TestValidConfig(TestConfig): +class TestValidConfig(unittest.TestCase): """ Test a configuration with all values different from the defaults. """ - testconfig = '''[global] + @classmethod + def setUpClass(cls): + """ Set up common test case resources. """ + cls.config = CreatingPeekabooConfig('''[global] user : user1 group : group1 socket_file : /socket/1 @@ -151,11 +218,12 @@ poll_interval : 51 instance_id: 12 stale_in_flight_threshold: 31 duplicate_check_interval: 61 -''' +''') def test_1_read_settings(self): """ Test reading of configuration settings from file """ - self.assertEqual(self.config.config_file, self.config_file) + self.assertEqual( + self.config.config_file, self.config.created_config_file) self.assertEqual(self.config.user, 'user1') self.assertEqual(self.config.group, 'group1') self.assertEqual(self.config.sock_file, '/socket/1') @@ -182,146 +250,44 @@ duplicate_check_interval: 61 self.assertEqual(self.config.cluster_duplicate_check_interval, 61) -class TestInvalidConfigBase(unittest.TestCase): +class TestInvalidConfig(unittest.TestCase): """ Various tests of invalid config files. """ - config_class = None - - @classmethod - def setUpClass(cls): - """ Set up common test case resources. """ - cls.config_file = tempfile.mktemp() - - def write_config(self, testconfig): - """ Helper method for writing out a test config file. """ - with open(self.config_file, 'w') as file_desc: - file_desc.write(testconfig) - def test_1_section_header(self): """ Test correct error is thrown if section header syntax is wrong """ - self.write_config('''[global[ -user: peekaboo''') with self.assertRaisesRegexp( - PeekabooConfigException, - 'Configuration file "%s" can not be parsed: File contains no ' - 'section headers' % self.config_file): - self.config_class(config_file=self.config_file) + PeekabooConfigException, + 'Configuration file ".*" can not be parsed: File contains no ' + 'section headers'): + CreatingPeekabooConfig('''[global[ +user: peekaboo''') def test_2_value_separator(self): """ Test correct error is thrown if the value separator is wrong """ - self.write_config('''[global] -user; peekaboo''') with self.assertRaisesRegexp( - PeekabooConfigException, - 'Configuration file "%s" can not be parsed: (File|Source) ' - 'contains parsing errors:' % self.config_file): - self.config_class(config_file=self.config_file) + PeekabooConfigException, + 'Configuration file ".*" can not be parsed: (File|Source) ' + 'contains parsing errors:'): + CreatingPeekabooConfig('''[global] +user; peekaboo''') def test_3_section_header(self): """ Test correct error is thrown if the config file is missing """ - try: - os.unlink(self.config_file) - except OSError: - pass + _, config_file = tempfile.mkstemp() + os.unlink(config_file) with self.assertRaisesRegexp( - PeekabooConfigException, - 'Configuration file "%s" can not be opened for reading: ' - r'\[Errno 2\] No such file or directory' % self.config_file): - self.config_class(config_file=self.config_file) - - @classmethod - def tearDownClass(cls): - """ Clean up after the tests. """ - try: - os.unlink(cls.config_file) - except OSError: - pass - - -class TestInvalidConfig(TestInvalidConfigBase): - """ Various tests of invalid config files. """ - config_class = PeekabooConfig + PeekabooConfigException, + 'Configuration file "%s" can not be opened for reading: ' + r'\[Errno 2\] No such file or directory' % config_file): + PeekabooConfig(config_file) - def test_50_unknown_loglevel(self): + def test_4_unknown_loglevel(self): """ Test with an unknown log level """ - self.write_config('''[logging] -log_level: FOO''') with self.assertRaisesRegexp( - PeekabooConfigException, - 'Unknown log level FOO'): - self.config_class(config_file=self.config_file) - - -class TestValidRulesetConfig(TestConfig): - """ Test a configuration with all values different from the defaults. """ - config_class = PeekabooRulesetConfig - testconfig = '''#[rule0] - -[rule1] -option1: foo -option2.1: bar -option2.2: baz - -[rule2] -enabled = false - -[rule3] -enabled: true - -[rule4] -enabled = on - -[rule5] -enabled: off -''' - - def test_1_enabled(self): - """ Test disabling of rules """ - self.assertEqual(self.config.rule_enabled('rule0'), True) - self.assertEqual(self.config.rule_enabled('rule1'), True) - self.assertEqual(self.config.rule_enabled('rule2'), False) - self.assertEqual(self.config.rule_enabled('rule3'), True) - self.assertEqual(self.config.rule_enabled('rule4'), True) - self.assertEqual(self.config.rule_enabled('rule5'), False) - - def test_2_values(self): - """ Test rule configuration values """ - self.assertEqual(self.config.rule_config('rule0'), None) - self.assertEqual(self.config.rule_config('rule1')['option1'], 'foo') - self.assertEqual( - self.config.rule_config('rule1')['option2'], ['bar', 'baz']) - - -class TestInvalidRulesetConfig(TestInvalidConfigBase): - """ Various tests of invalid ruleset config files. """ - config_class = PeekabooRulesetConfig - - def test_50_type_mismatch(self): - """ Test correct error is thrown if the option type is mismatched """ - self.write_config('''[rule1] -option1: foo -option1.1: bar''') - with self.assertRaisesRegexp( - PeekabooConfigException, - 'Setting option1.1 in section rule1 specified as list as well ' - 'as individual setting'): - self.config_class(config_file=self.config_file) - -class PeekabooDummyConfig(object): - """ A dummy configuration for the test cases. """ - def __init__(self): - """ Initialize dummy configuration """ - self.job_hash_regex = r'/amavis/tmp/([^/]+)/parts/' - self.sample_base_dir = '/tmp' - - def get(self, option, default): - """ Return specific dummy settings. """ - config = { - 'whitelist':['text/plain', 'inode/x-empty'], - 'greylist' :['application/x-dosexec', 'application/msword', - 'application/vnd.ms-powerpoint'], - } - return config[option] + PeekabooConfigException, + 'Unknown log level FOO'): + CreatingPeekabooConfig('''[logging] +log_level: FOO''') class CreatingSampleFactory(SampleFactory): @@ -357,7 +323,7 @@ class TestDatabase(unittest.TestCase): def setUpClass(cls): """ Set up common test case resources. """ cls.test_db = os.path.abspath('./test.db') - cls.conf = PeekabooDummyConfig() + cls.conf = CreatingPeekabooConfig() cls.db_con = PeekabooDatabase('sqlite:///' + cls.test_db, instance_id=1, stale_in_flight_threshold=10) @@ -502,7 +468,7 @@ class TestSample(unittest.TestCase): def setUpClass(cls): """ Set up common test case resources. """ cls.test_db = os.path.abspath('./test.db') - cls.conf = PeekabooDummyConfig() + cls.conf = CreatingPeekabooConfig() cls.db_con = PeekabooDatabase('sqlite:///' + cls.test_db) cls.factory = CreatingSampleFactory( cuckoo=None, base_dir=cls.conf.sample_base_dir, @@ -629,6 +595,39 @@ class TestSample(unittest.TestCase): del cls.factory +class TestRulesetEngine(unittest.TestCase): + """ Unittests for the Ruleset Engine. """ + def test_no_rules_configured(self): + """ Test that correct error is shown if no rules are configured. """ + config = CreatingConfigParser() + with self.assertRaisesRegexp( + PeekabooRulesetConfigError, + r'No enabled rules found, check ruleset config.'): + RulesetEngine(ruleset_config=config, db_con=None) + + def test_unknown_rule_enabled(self): + """ Test that correct error is shown if an unknown rule is enabled. """ + config = CreatingConfigParser('''[rules] +rule.1: foo''') + with self.assertRaisesRegexp( + PeekabooRulesetConfigError, + r'Unknown rule\(s\) enabled: foo'): + RulesetEngine(ruleset_config=config, db_con=None) + + def test_invalid_type(self): + """ Test that correct error is shown if rule config option has wrong + type. """ + + config = CreatingConfigParser('''[rules] +rule.1: cuckoo_score + +[cuckoo_score] +higher_than: foo''') + with self.assertRaisesRegexp( + ValueError, + r'could not convert string to float: foo'): + RulesetEngine(ruleset_config=config, db_con=None) + class MimetypeSample(object): # pylint: disable=too-few-public-methods """ A dummy sample class that only contains a set of MIME types for testing whitelist and greylist rules with it. """ @@ -645,6 +644,21 @@ class CuckooReportSample(object): # pylint: disable=too-few-public-methods class TestRules(unittest.TestCase): """ Unittests for Rules. """ + @classmethod + def setUpClass(cls): + """ Set up common test case resources. """ + cls.config = CreatingConfigParser('''[file_type_on_whitelist] +whitelist.1 : text/plain + +[file_type_on_greylist] +greylist.1 : application/x-dosexec +greylist.2 : application/zip +greylist.3 : application/msword + +[cuckoo_analysis_failed] +failure.1: end of analysis reached! +success.1: analysis completed successfully''') + def test_rule_file_type_on_whitelist(self): """ Test whitelist rule. """ combinations = [ @@ -655,7 +669,7 @@ class TestRules(unittest.TestCase): [True, ['', 'asdfjkl', '93219843298']], [True, []], ] - rule = FileTypeOnWhitelistRule({'whitelist': ['text/plain']}) + rule = FileTypeOnWhitelistRule(self.config) for expected, types in combinations: result = rule.evaluate(MimetypeSample(types)) self.assertEqual(result.further_analysis, expected) @@ -671,42 +685,53 @@ class TestRules(unittest.TestCase): [False, ['', 'asdfjkl', '93219843298']], [True, []], ] - rule = FileTypeOnGreylistRule({ - 'greylist': [ - 'application/x-dosexec', - 'application/zip', - 'application/msword']}) + rule = FileTypeOnGreylistRule(self.config) for expected, types in combinations: result = rule.evaluate(MimetypeSample(types)) self.assertEqual(result.further_analysis, expected) def test_rule_analysis_failed(self): """ Test the Cuckoo analysis failed rule """ + # create some test samples + successful_sample = CuckooReportSample( + {'debug': {'cuckoo': ['analysis completed successfully']}}) + failed_sample = CuckooReportSample( + {'debug': {'cuckoo': ['analysis failed']}}) + reached_sample = CuckooReportSample( + {'debug': {'cuckoo': ['end of analysis reached!']}}) + everything_sample = CuckooReportSample( + {'debug': {'cuckoo': [ + 'end of analysis reached!', + 'analysis failed', + 'analysis completed successfully']}}) + # test defaults - rule = CuckooAnalysisFailedRule() - result = rule.evaluate(CuckooReportSample( - {'debug': {'cuckoo': ['analysis completed successfully']}})) + rule = CuckooAnalysisFailedRule(CreatingConfigParser('')) + result = rule.evaluate(successful_sample) self.assertEqual(result.result, Result.unknown) self.assertEqual(result.further_analysis, True) - result = rule.evaluate(CuckooReportSample( - {'debug': {'cuckoo': ['analysis failed']}})) + result = rule.evaluate(reached_sample) self.assertEqual(result.result, Result.failed) self.assertEqual(result.further_analysis, False) + result = rule.evaluate(failed_sample) + self.assertEqual(result.result, Result.failed) + self.assertEqual(result.further_analysis, False) + result = rule.evaluate(everything_sample) + self.assertEqual(result.result, Result.unknown) + self.assertEqual(result.further_analysis, True) # test with config - rule = CuckooAnalysisFailedRule({ - 'failure': ['end of analysis reached!'], - 'success': ['analysis completed successfully']}) - result = rule.evaluate(CuckooReportSample( - {'debug': {'cuckoo': ['analysis completed successfully']}})) + rule = CuckooAnalysisFailedRule(self.config) + result = rule.evaluate(successful_sample) self.assertEqual(result.result, Result.unknown) self.assertEqual(result.further_analysis, True) - result = rule.evaluate(CuckooReportSample( - {'debug': {'cuckoo': ['end of analysis reached!']}})) + result = rule.evaluate(reached_sample) + self.assertEqual(result.result, Result.failed) + self.assertEqual(result.further_analysis, False) + result = rule.evaluate(failed_sample) self.assertEqual(result.result, Result.failed) self.assertEqual(result.further_analysis, False) - result = rule.evaluate(CuckooReportSample( - {'debug': {'cuckoo': ['analysis failed']}})) + result = rule.evaluate(everything_sample) self.assertEqual(result.result, Result.failed) self.assertEqual(result.further_analysis, False) @@ -727,13 +752,13 @@ def main(): gettext.NullTranslations().install() suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(TestConfigParser)) suite.addTest(unittest.makeSuite(TestDefaultConfig)) suite.addTest(unittest.makeSuite(TestValidConfig)) suite.addTest(unittest.makeSuite(TestInvalidConfig)) - suite.addTest(unittest.makeSuite(TestValidRulesetConfig)) - suite.addTest(unittest.makeSuite(TestInvalidRulesetConfig)) suite.addTest(unittest.makeSuite(TestSample)) suite.addTest(unittest.makeSuite(TestDatabase)) + suite.addTest(unittest.makeSuite(TestRulesetEngine)) suite.addTest(unittest.makeSuite(TestRules)) # TODO: We need more tests!!! |