summaryrefslogtreecommitdiffstats
path: root/tests/test.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test.py')
-rwxr-xr-xtests/test.py990
1 files changed, 990 insertions, 0 deletions
diff --git a/tests/test.py b/tests/test.py
new file mode 100755
index 0000000..39318fb
--- /dev/null
+++ b/tests/test.py
@@ -0,0 +1,990 @@
+#!/usr/bin/env python
+
+###############################################################################
+# #
+# Peekaboo Extended Email Attachment Behavior Observation Owl #
+# #
+# test.py #
+###############################################################################
+# #
+# Copyright (C) 2016-2019 science + computing ag #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or (at #
+# your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, but #
+# WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
+# General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <http://www.gnu.org/licenses/>. #
+# #
+###############################################################################
+
+""" The testsuite. """
+
+from future.builtins import super # pylint: disable=wrong-import-order
+
+import gettext
+import sys
+import os
+import tempfile
+import logging
+import shutil
+import unittest
+from datetime import datetime, timedelta
+
+
+# Add Peekaboo to PYTHONPATH
+# pylint: disable=wrong-import-position
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from peekaboo.exceptions import PeekabooConfigException, \
+ PeekabooRulesetConfigError
+from peekaboo.config import PeekabooConfig, PeekabooConfigParser
+from peekaboo.sample import SampleFactory
+from peekaboo.ruleset import RuleResult, Result
+from peekaboo.ruleset.engine import RulesetEngine
+from peekaboo.ruleset.rules import FileTypeOnWhitelistRule, \
+ FileTypeOnGreylistRule, CuckooAnalysisFailedRule, \
+ KnownRule, FileLargerThanRule, CuckooEvilSigRule, \
+ CuckooScoreRule, RequestsEvilDomainRule, FinalRule, \
+ OfficeMacroRule, OfficeMacroWithSuspiciousKeyword
+from peekaboo.toolbox.cuckoo import CuckooReport
+from peekaboo.db import PeekabooDatabase, PeekabooDatabaseError
+# pylint: enable=wrong-import-position
+
+
+class CreatingConfigMixIn(object):
+ """ A class for adding config file creation logic to any other class. """
+ def create_config(self, content):
+ """ Create a configuration file with defined content and pass it to the
+ parent constructor for parsing. """
+ _, self.created_config_file = tempfile.mkstemp()
+ with open(self.created_config_file, 'w') as file_desc:
+ file_desc.write(content)
+
+ def remove_config(self):
+ """ Remove the configuration file we've created. """
+ os.unlink(self.created_config_file)
+
+
+class CreatingConfigParser(PeekabooConfigParser, CreatingConfigMixIn):
+ """ A special kind of config parser that creates the configuration file
+ with defined content. """
+ def __init__(self, content=''):
+ self.created_config_file = None
+ self.create_config(content)
+ PeekabooConfigParser.__init__(self, self.created_config_file)
+
+ def __del__(self):
+ self.remove_config()
+
+
+class CreatingPeekabooConfig(PeekabooConfig, CreatingConfigMixIn):
+ """ A special kind of Peekaboo config that creates the configuration file
+ with defined content. """
+ def __init__(self, content=''):
+ self.created_config_file = None
+ self.create_config(content)
+ PeekabooConfig.__init__(self, self.created_config_file)
+
+ def __del__(self):
+ self.remove_config()
+
+
+class TestConfigParser(unittest.TestCase):
+ """ Test a configuration with all values different from the defaults. """
+ @classmethod
+ def setUpClass(cls):
+ """ Set up common test case resources. """
+ cls.config = CreatingConfigParser('''#[rule0]
+
+[rule1]
+option1: foo
+option2.1: bar
+option2.2: baz
+
+[rules]
+rule.1 : rule1
+#rule.2 : rule2
+rule.3 : rule3
+''')
+
+ def test_2_values(self):
+ """ Test rule configuration values """
+ with self.assertRaises(KeyError):
+ self.config['rule0']
+ self.assertEqual(self.config['rule1']['option1'], 'foo')
+ self.assertEqual(self.config['rule1'].getlist('option2'),
+ ['bar', 'baz'])
+
+ def test_3_type_mismatch(self):
+ """ Test correct error is thrown if the option type is mismatched """
+ config = '''[rule1]
+option1: foo
+option1.1: bar'''
+
+ with self.assertRaisesRegexp(
+ PeekabooConfigException,
+ 'Option option1 in section rule1 is supposed to be a list but '
+ 'given as individual setting'):
+ CreatingConfigParser(config).getlist('rule1', 'option1')
+
+
+
+class TestDefaultConfig(unittest.TestCase):
+ """ Test a configuration of all defaults. """
+ @classmethod
+ def setUpClass(cls):
+ """ Set up common test case resources. """
+ cls.config = CreatingPeekabooConfig()
+
+ def test_1_default_settings(self):
+ """ Test a configuration with just defaults """
+ self.assertEqual(
+ self.config.config_file, self.config.created_config_file)
+ self.assertEqual(self.config.user, 'peekaboo')
+ self.assertEqual(self.config.group, 'peekaboo')
+ self.assertEqual(
+ self.config.sock_file, '/var/run/peekaboo/peekaboo.sock')
+ self.assertEqual(
+ self.config.pid_file, '/var/run/peekaboo/peekaboo.pid')
+ self.assertEqual(self.config.interpreter, '/usr/bin/python2 -u')
+ self.assertEqual(self.config.worker_count, 3)
+ self.assertEqual(self.config.sample_base_dir, '/tmp')
+ self.assertEqual(
+ self.config.job_hash_regex, '/amavis/tmp/([^/]+)/parts/')
+ self.assertEqual(self.config.use_debug_module, False)
+ self.assertEqual(self.config.keep_mail_data, False)
+ self.assertEqual(
+ self.config.processing_info_dir,
+ '/var/lib/peekaboo/malware_reports')
+ self.assertEqual(
+ self.config.ruleset_config, '/opt/peekaboo/etc/ruleset.conf')
+ self.assertEqual(self.config.log_level, logging.INFO)
+ self.assertEqual(
+ self.config.log_format, '%(asctime)s - %(name)s - '
+ '(%(threadName)s) - %(levelname)s - %(message)s')
+ self.assertEqual(self.config.db_url, 'sqlite:////var/lib/peekaboo/peekaboo.db')
+ self.assertEqual(self.config.cuckoo_mode, 'api')
+ self.assertEqual(self.config.cuckoo_exec, '/opt/cuckoo/bin/cuckoo')
+ self.assertEqual(self.config.cuckoo_submit, '/opt/cuckoo/bin/cuckoo submit')
+ self.assertEqual(self.config.cuckoo_storage, '/var/lib/peekaboo/.cuckoo/storage')
+ self.assertEqual(self.config.cuckoo_url, 'http://127.0.0.1:8090')
+ self.assertEqual(self.config.cuckoo_poll_interval, 5)
+ self.assertEqual(self.config.cluster_instance_id, 0)
+ self.assertEqual(self.config.cluster_stale_in_flight_threshold, 15*60)
+ self.assertEqual(self.config.cluster_duplicate_check_interval, 60)
+
+
+class TestValidConfig(unittest.TestCase):
+ """ Test a configuration with all values different from the defaults. """
+ @classmethod
+ def setUpClass(cls):
+ """ Set up common test case resources. """
+ cls.config = CreatingPeekabooConfig('''[global]
+user : user1
+group : group1
+socket_file : /socket/1
+pid_file : /pid/1
+interpreter : /inter/1
+worker_count : 18
+sample_base_dir : /tmp/1
+job_hash_regex : /var/2
+use_debug_module : yes
+keep_mail_data : yes
+processing_info_dir : /var/3
+
+[ruleset]
+config : /rules/1
+
+[logging]
+log_level : DEBUG
+log_format : format%%foo1
+
+[db]
+url : sqlite:////peekaboo.db1
+
+[cuckoo]
+mode : api1
+exec : /cuckoo/1
+submit : /submit/1
+storage_path : /storage/1
+url : http://api:1111
+poll_interval : 51
+
+[cluster]
+instance_id: 12
+stale_in_flight_threshold: 31
+duplicate_check_interval: 61
+''')
+
+ def test_1_read_settings(self):
+ """ Test reading of configuration settings from file """
+ self.assertEqual(
+ self.config.config_file, self.config.created_config_file)
+ self.assertEqual(self.config.user, 'user1')
+ self.assertEqual(self.config.group, 'group1')
+ self.assertEqual(self.config.sock_file, '/socket/1')
+ self.assertEqual(self.config.pid_file, '/pid/1')
+ self.assertEqual(self.config.interpreter, '/inter/1')
+ self.assertEqual(self.config.worker_count, 18)
+ self.assertEqual(self.config.sample_base_dir, '/tmp/1')
+ self.assertEqual(self.config.job_hash_regex, '/var/2')
+ self.assertEqual(self.config.use_debug_module, True)
+ self.assertEqual(self.config.keep_mail_data, True)
+ self.assertEqual(self.config.processing_info_dir, '/var/3')
+ self.assertEqual(self.config.ruleset_config, '/rules/1')
+ self.assertEqual(self.config.log_level, logging.DEBUG)
+ self.assertEqual(self.config.log_format, 'format%foo1')
+ self.assertEqual(self.config.db_url, 'sqlite:////peekaboo.db1')
+ self.assertEqual(self.config.cuckoo_mode, 'api1')
+ self.assertEqual(self.config.cuckoo_exec, '/cuckoo/1')
+ self.assertEqual(self.config.cuckoo_submit, '/submit/1')
+ self.assertEqual(self.config.cuckoo_storage, '/storage/1')
+ self.assertEqual(self.config.cuckoo_url, 'http://api:1111')
+ self.assertEqual(self.config.cuckoo_poll_interval, 51)
+ self.assertEqual(self.config.cluster_instance_id, 12)
+ self.assertEqual(self.config.cluster_stale_in_flight_threshold, 31)
+ self.assertEqual(self.config.cluster_duplicate_check_interval, 61)
+
+
+class TestInvalidConfig(unittest.TestCase):
+ """ Various tests of invalid config files. """
+ def test_1_section_header(self):
+ """ Test correct error is thrown if section header syntax is wrong """
+ with self.assertRaisesRegexp(
+ PeekabooConfigException,
+ 'Configuration file ".*" can not be parsed: File contains no '
+ 'section headers'):
+ CreatingPeekabooConfig('''[global[
+user: peekaboo''')
+
+ def test_2_value_separator(self):
+ """ Test correct error is thrown if the value separator is wrong """
+ with self.assertRaisesRegexp(
+ PeekabooConfigException,
+ 'Configuration file ".*" can not be parsed: (File|Source) '
+ 'contains parsing errors:'):
+ CreatingPeekabooConfig('''[global]
+user; peekaboo''')
+
+ def test_3_section_header(self):
+ """ Test correct error is thrown if the config file is missing """
+ _, config_file = tempfile.mkstemp()
+ os.unlink(config_file)
+
+ with self.assertRaisesRegexp(
+ PeekabooConfigException,
+ 'Configuration file "%s" can not be opened for reading: '
+ r'\[Errno 2\] No such file or directory' % config_file):
+ PeekabooConfig(config_file)
+
+ def test_4_unknown_section(self):
+ """ Test correct error is thrown if an unknown section name is given.
+ """
+ with self.assertRaisesRegexp(
+ PeekabooConfigException,
+ r'Unknown section\(s\) found in config: globl'):
+ CreatingPeekabooConfig('''[globl]''')
+
+ def test_5_unknown_option(self):
+ """ Test correct error is thrown if an unknown option name is given.
+ """
+ with self.assertRaisesRegexp(
+ PeekabooConfigException,
+ r'Unknown config option\(s\) found in section global: foo'):
+ CreatingPeekabooConfig('''[global]
+foo: bar''')
+
+ def test_6_unknown_loglevel(self):
+ """ Test with an unknown log level """
+ with self.assertRaisesRegexp(
+ PeekabooConfigException,
+ 'Unknown log level FOO'):
+ CreatingPeekabooConfig('''[logging]
+log_level: FOO''')
+
+
+class CreatingSampleFactory(SampleFactory):
+ """ A special kind of sample factory that creates the sample files with
+ defined content in a temporary directory and cleans up after itself. """
+ def __init__(self, *args, **kwargs):
+ self.directory = tempfile.mkdtemp()
+ super().__init__(*args, **kwargs)
+
+ def create_sample(self, relpath, content, *args, **kwargs):
+ """ Make a new sample with defined base name and content in the
+ previously created temporary directory. The given basename can
+ optionally be a path relative to the temporary directory and the
+ subdirectory will be created automatically. """
+ file_path = os.path.join(self.directory, relpath)
+ subdir = os.path.dirname(file_path)
+ if subdir != self.directory:
+ os.makedirs(subdir)
+ with open(file_path, 'w') as file_desc:
+ file_desc.write(content)
+
+ return super().make_sample(file_path, *args, **kwargs)
+
+ def __del__(self):
+ """ Remove the sample files we've created and the temporary directory
+ itself. """
+ shutil.rmtree(self.directory)
+
+
+class TestDatabase(unittest.TestCase):
+ """ Unittests for Peekaboo's database module. """
+ @classmethod
+ def setUpClass(cls):
+ """ Set up common test case resources. """
+ cls.test_db = os.path.abspath('./test.db')
+ cls.conf = CreatingPeekabooConfig()
+ cls.db_con = PeekabooDatabase('sqlite:///' + cls.test_db,
+ instance_id=1,
+ stale_in_flight_threshold=10)
+ cls.no_cluster_db = PeekabooDatabase('sqlite:///' + cls.test_db,
+ instance_id=0)
+ cls.factory = CreatingSampleFactory(
+ cuckoo=None, base_dir=cls.conf.sample_base_dir,
+ job_hash_regex=cls.conf.job_hash_regex, keep_mail_data=False,
+ processing_info_dir=None)
+ cls.sample = cls.factory.create_sample('test.py', 'test')
+ result = RuleResult('Unittest',
+ Result.failed,
+ 'This is just a test case.',
+ further_analysis=False)
+ cls.sample.add_rule_result(result)
+
+ def test_1_analysis_save(self):
+ """ Test saving of analysis results. """
+ self.db_con.analysis_save(self.sample)
+
+ def test_2_sample_info_fetch(self):
+ """ Test retrieval of analysis results. """
+ sample_info = self.db_con.sample_info_fetch(self.sample)
+ self.assertEqual(sample_info.sha256sum, self.sample.sha256sum)
+ self.assertEqual(sample_info.result, Result.failed)
+ self.assertEqual(sample_info.reason, 'This is just a test case.')
+
+ def test_5_in_flight_no_cluster(self):
+ """ Test that marking of samples as in-flight on a non-cluster-enabled
+ database are no-ops. """
+ self.assertTrue(self.no_cluster_db.mark_sample_in_flight(self.sample))
+ self.assertTrue(self.no_cluster_db.mark_sample_in_flight(self.sample))
+ self.assertIsNone(self.no_cluster_db.clear_sample_in_flight(self.sample))
+ self.assertIsNone(self.no_cluster_db.clear_sample_in_flight(self.sample))
+ self.assertIsNone(self.no_cluster_db.clear_in_flight_samples())
+
+ def test_6_in_flight_cluster(self):
+ """ Test marking of samples as in-flight. """
+ self.assertTrue(self.db_con.mark_sample_in_flight(self.sample, 1))
+ # re-locking the same sample should fail
+ self.assertFalse(self.db_con.mark_sample_in_flight(self.sample, 1))
+ self.assertIsNone(self.db_con.clear_sample_in_flight(self.sample, 1))
+ # unlocking twice should fail
+ self.assertRaisesRegexp(
+ PeekabooDatabaseError, "Unexpected inconsistency: Sample .* not "
+ "recoreded as in-flight upon clearing flag",
+ self.db_con.clear_sample_in_flight, self.sample, 1)
+
+ def test_7_in_flight_clear(self):
+ """ Test clearing of in-flight markers. """
+ sample2 = self.factory.create_sample('foo.pyc', 'foo')
+ sample3 = self.factory.create_sample('bar.pyc', 'bar')
+
+ self.assertTrue(self.db_con.mark_sample_in_flight(self.sample, 1))
+ self.assertTrue(self.db_con.mark_sample_in_flight(sample2, 1))
+ self.assertTrue(self.db_con.mark_sample_in_flight(sample3, 2))
+
+ # should only clear samples of instance 1
+ self.assertIsNone(self.db_con.clear_in_flight_samples(1))
+ self.assertTrue(self.db_con.mark_sample_in_flight(self.sample, 1))
+ self.assertTrue(self.db_con.mark_sample_in_flight(sample2, 1))
+ self.assertFalse(self.db_con.mark_sample_in_flight(sample3, 2))
+
+ # should only clear samples of instance 2
+ self.assertIsNone(self.db_con.clear_in_flight_samples(2))
+ self.assertFalse(self.db_con.mark_sample_in_flight(self.sample, 1))
+ self.assertFalse(self.db_con.mark_sample_in_flight(sample2, 1))
+ self.assertTrue(self.db_con.mark_sample_in_flight(sample3, 2))
+
+ # should clear all samples
+ self.assertIsNone(self.db_con.clear_in_flight_samples(-1))
+ self.assertTrue(self.db_con.mark_sample_in_flight(self.sample, 1))
+ self.assertTrue(self.db_con.mark_sample_in_flight(sample2, 1))
+ self.assertTrue(self.db_con.mark_sample_in_flight(sample3, 2))
+
+ # should be a no-op because there will never be any entries of instance
+ # 0
+ self.assertIsNone(self.db_con.clear_in_flight_samples(0))
+ self.assertFalse(self.db_con.mark_sample_in_flight(self.sample, 1))
+ self.assertFalse(self.db_con.mark_sample_in_flight(sample2, 1))
+ self.assertFalse(self.db_con.mark_sample_in_flight(sample3, 2))
+
+ # should be a no-op because this database is not cluster-enabled
+ self.assertIsNone(self.no_cluster_db.clear_in_flight_samples())
+ self.assertFalse(self.db_con.mark_sample_in_flight(self.sample, 1))
+ self.assertFalse(self.db_con.mark_sample_in_flight(sample2, 1))
+ self.assertFalse(self.db_con.mark_sample_in_flight(sample3, 2))
+
+ # leave as found
+ self.assertIsNone(self.db_con.clear_in_flight_samples(-1))
+
+ def test_8_stale_in_flight(self):
+ """ Test the cleaning of stale in-flight markers. """
+ stale = datetime.utcnow() - timedelta(seconds=20)
+ self.assertTrue(self.db_con.mark_sample_in_flight(
+ self.sample, 1, stale))
+ sample2 = self.factory.create_sample('baz.pyc', 'baz')
+ self.assertTrue(self.db_con.mark_sample_in_flight(sample2, 1))
+
+ # should not clear anything because the database is not cluster-enabled
+ self.assertTrue(self.no_cluster_db.clear_stale_in_flight_samples())
+ self.assertFalse(self.db_con.mark_sample_in_flight(self.sample, 1))
+ self.assertFalse(self.db_con.mark_sample_in_flight(sample2, 1))
+
+ # should clear sample marker because it is stale but not sample2
+ self.assertTrue(self.db_con.clear_stale_in_flight_samples())
+ self.assertTrue(self.db_con.mark_sample_in_flight(self.sample, 1))
+ self.assertFalse(self.db_con.mark_sample_in_flight(sample2, 1))
+
+ # should not clear anything because all markers are fresh
+ self.assertFalse(self.db_con.clear_stale_in_flight_samples())
+ self.assertFalse(self.db_con.mark_sample_in_flight(self.sample, 1))
+ self.assertFalse(self.db_con.mark_sample_in_flight(sample2, 1))
+
+ # set up new constellation
+ self.assertIsNone(self.db_con.clear_in_flight_samples(-1))
+ self.assertTrue(self.db_con.mark_sample_in_flight(
+ self.sample, 1, stale))
+ self.assertTrue(self.db_con.mark_sample_in_flight(sample2, 1, stale))
+
+ # should clear all markers because all are stale
+ self.assertTrue(self.db_con.clear_stale_in_flight_samples())
+ self.assertTrue(self.db_con.mark_sample_in_flight(
+ self.sample, 1, stale))
+ self.assertTrue(self.db_con.mark_sample_in_flight(sample2, 1, stale))
+
+ # leave as found
+ self.assertTrue(self.db_con.clear_stale_in_flight_samples())
+
+ @classmethod
+ def tearDownClass(cls):
+ """ Clean up after the tests. """
+ os.unlink(cls.test_db)
+ # test framework doesn't seem to give up reference so that __del__ is
+ # never run
+ del cls.factory
+
+
+class TestSample(unittest.TestCase):
+ """ Unittests for Samples. """
+ @classmethod
+ def setUpClass(cls):
+ """ Set up common test case resources. """
+ cls.test_db = os.path.abspath('./test.db')
+ cls.conf = CreatingPeekabooConfig()
+ cls.db_con = PeekabooDatabase('sqlite:///' + cls.test_db)
+ cls.factory = CreatingSampleFactory(
+ cuckoo=None, base_dir=cls.conf.sample_base_dir,
+ job_hash_regex=cls.conf.job_hash_regex, keep_mail_data=False,
+ processing_info_dir=None)
+ cls.sample = cls.factory.create_sample('test.py', 'test')
+
+ def test_job_hash_regex(self):
+ """ Test extraction of the job hash from the working directory path.
+ """
+ # class sample has no job hash in path and therefore generates one
+ # itself
+ self.assertIn('peekaboo-run_analysis', self.sample.job_hash)
+
+ # a new sample with a job hash in it's path should return it
+ job_hash = 'amavis-20170831T132736-07759-iSI0rJ4b'
+ path_with_job_hash = 'd/var/lib/amavis/tmp/%s/parts/file' % job_hash
+ sample = self.factory.make_sample(path_with_job_hash, 'file')
+ self.assertEqual(job_hash, sample.job_hash,
+ 'Job hash regex is not working')
+
+ legacy_factory = CreatingSampleFactory(
+ cuckoo=None, base_dir=self.conf.sample_base_dir,
+ job_hash_regex=r'/var/lib/amavis/tmp/([^/]+)/parts.*',
+ keep_mail_data=False, processing_info_dir=None)
+ sample = legacy_factory.make_sample(path_with_job_hash, 'file')
+ self.assertEqual(job_hash, sample.job_hash,
+ 'Job hash regex is not working')
+
+ def test_3_sample_attributes(self):
+ """ Test the various sample attribute getters. """
+ self.assertEqual(self.sample.file_path,
+ os.path.join(self.factory.directory, 'test.py'))
+ self.assertEqual(self.sample.filename, 'test.py')
+ self.assertEqual(self.sample.file_extension, 'py')
+ self.assertTrue(set(['text/x-python']).issubset(self.sample.mimetypes))
+ self.assertEqual(
+ self.sample.sha256sum,
+ '9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08')
+ self.assertEqual(self.sample.job_id, -1)
+ self.assertEqual(self.sample.result, Result.unchecked)
+ self.assertEqual(self.sample.reason, None)
+ self.assertRegexpMatches(
+ self.sample.peekaboo_report[0],
+ 'File "%s" is considered "unchecked"'
+ % self.sample.filename)
+ self.assertEqual(self.sample.cuckoo_report, None)
+ self.assertEqual(self.sample.done, False)
+ self.assertEqual(self.sample.submit_path, None)
+ self.assertEqual(self.sample.file_size, 4)
+
+ def test_4_initialised_sample_attributes(self):
+ """ Test the various sample attributes of an initialised sample. """
+ self.sample.init()
+ self.assertEqual(self.sample.file_path,
+ os.path.join(self.factory.directory, 'test.py'))
+ self.assertEqual(self.sample.filename, 'test.py')
+ self.assertEqual(self.sample.file_extension, 'py')
+ self.assertTrue(set(['text/x-python']).issubset(self.sample.mimetypes))
+ self.assertEqual(
+ self.sample.sha256sum,
+ '9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08')
+ self.assertEqual(self.sample.job_id, -1)
+ self.assertEqual(self.sample.result, Result.unchecked)
+ self.assertEqual(self.sample.reason, None)
+ self.assertRegexpMatches(
+ self.sample.peekaboo_report[0], 'File "%s" %s is being analyzed'
+ % (self.sample.filename, self.sample.sha256sum))
+ self.assertRegexpMatches(
+ self.sample.peekaboo_report[1],
+ 'File "%s" is considered "unchecked"'
+ % self.sample.filename)
+ self.assertEqual(self.sample.cuckoo_report, None)
+ self.assertEqual(self.sample.done, False)
+ self.assertRegexpMatches(
+ self.sample.submit_path, '/%s.py$' % self.sample.sha256sum)
+ self.assertEqual(self.sample.file_size, 4)
+
+ def test_5_mark_done(self):
+ """ Test the marking of a sample as done. """
+ self.sample.mark_done()
+ self.assertEqual(self.sample.done, True)
+
+ def test_6_add_rule_result(self):
+ """ Test the adding of a rule result. """
+ reason = 'This is just a test case.'
+ result = RuleResult('Unittest', Result.failed,
+ reason,
+ further_analysis=False)
+ self.sample.add_rule_result(result)
+ self.assertEqual(self.sample.result, Result.failed)
+ self.assertEqual(self.sample.reason, reason)
+
+ def test_sample_attributes_with_meta_info(self):
+ """ Test use of optional meta data. """
+ sample = self.factory.make_sample(
+ 'test.pyc', metainfo={
+ 'full_name': '/tmp/test.pyc',
+ 'name_declared': 'test.pyc',
+ 'type_declared': 'application/x-bytecode.python',
+ 'type_long': 'application/x-python-bytecode',
+ 'type_short': 'pyc',
+ 'size': '200'})
+ self.assertEqual(sample.file_extension, 'pyc')
+
+ def test_sample_without_suffix(self):
+ """ Test extraction of file extension from declared name. """
+ sample = self.factory.make_sample(
+ 'junk', metainfo={
+ 'full_name': '/tmp/junk',
+ 'name_declared': 'Report.docx',
+ 'type_declared': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
+ 'type_long': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
+ 'type_short': 'docx',
+ 'size': '212'})
+ self.assertEqual(sample.file_extension, 'docx')
+
+ @classmethod
+ def tearDownClass(cls):
+ """ Clean up after the tests. """
+ os.unlink(cls.test_db)
+ del cls.factory
+
+
+class TestRulesetEngine(unittest.TestCase):
+ """ Unittests for the Ruleset Engine. """
+ def test_no_rules_configured(self):
+ """ Test that correct error is shown if no rules are configured. """
+ config = CreatingConfigParser()
+ with self.assertRaisesRegexp(
+ PeekabooRulesetConfigError,
+ r'No enabled rules found, check ruleset config.'):
+ RulesetEngine(ruleset_config=config, db_con=None)
+
+ def test_unknown_rule_enabled(self):
+ """ Test that correct error is shown if an unknown rule is enabled. """
+ config = CreatingConfigParser('''[rules]
+rule.1: foo''')
+ with self.assertRaisesRegexp(
+ PeekabooRulesetConfigError,
+ r'Unknown rule\(s\) enabled: foo'):
+ RulesetEngine(ruleset_config=config, db_con=None)
+
+ def test_invalid_type(self):
+ """ Test that correct error is shown if rule config option has wrong
+ type. """
+
+ config = CreatingConfigParser('''[rules]
+rule.1: cuckoo_score
+
+[cuckoo_score]
+higher_than: foo''')
+ with self.assertRaisesRegexp(
+ ValueError,
+ r"could not convert string to float: '?foo'?"):
+ RulesetEngine(ruleset_config=config, db_con=None)
+
+ def test_disabled_config(self):
+ """ Test that no error is shown if disabled rule has config. """
+
+ config = CreatingConfigParser('''[rules]
+rule.1: known
+#rule.2: cuckoo_score
+
+[cuckoo_score]
+higher_than: 4.0''')
+ RulesetEngine(ruleset_config=config, db_con=None)
+
+
+class MimetypeSample(object): # pylint: disable=too-few-public-methods
+ """ A dummy sample class that only contains a set of MIME types for testing
+ whitelist and greylist rules with it. """
+ def __init__(self, types):
+ # don't even need to make it a property
+ self.mimetypes = set(types)
+
+
+class CuckooReportSample(object): # pylint: disable=too-few-public-methods
+ """ A dummy sample that only contains a configurable cuckoo report. """
+ def __init__(self, report):
+ self.cuckoo_report = CuckooReport(report)
+
+
+class TestRules(unittest.TestCase):
+ """ Unittests for Rules. """
+ @classmethod
+ def setUpClass(cls):
+ """ Set up common test case resources. """
+ cls.config = CreatingConfigParser('''[file_type_on_whitelist]
+whitelist.1 : text/plain
+
+[file_type_on_greylist]
+greylist.1 : application/x-dosexec
+greylist.2 : application/zip
+greylist.3 : application/msword
+
+[cuckoo_analysis_failed]
+failure.1: end of analysis reached!
+success.1: analysis completed successfully''')
+
+ def test_config_known(self): # pylint: disable=no-self-use
+ """ Test the known rule configuration. """
+ config = '''[known]
+unknown : baz'''
+ # there is no exception here since empty config is acceptable
+ KnownRule(CreatingConfigParser())
+ # there is no exception here since the known rule simply does
+ # not look at the configuration at all - maybe we should have a
+ # 'unknown section' error here
+ KnownRule(CreatingConfigParser(config))
+
+ def test_config_file_larger_than(self):
+ """ Test the file larger than rule configuration. """
+ config = '''[file_larger_than]
+bytes : 10
+unknown : baz'''
+ # there is no exception here since empty config is acceptable
+ FileLargerThanRule(CreatingConfigParser())
+
+ with self.assertRaisesRegexp(
+ PeekabooConfigException,
+ r'Unknown config option\(s\) found in section '
+ r'file_larger_than: unknown'):
+ FileLargerThanRule(CreatingConfigParser(config))
+
+ def test_rule_file_type_on_whitelist(self):
+ """ Test whitelist rule. """
+ combinations = [
+ [False, ['text/plain']],
+ [True, ['application/vnd.ms-excel']],
+ [True, ['text/plain', 'application/vnd.ms-excel']],
+ [True, ['image/png', 'application/zip', 'application/vnd.ms-excel']],
+ [True, ['', 'asdfjkl', '93219843298']],
+ [True, []],
+ ]
+ rule = FileTypeOnWhitelistRule(self.config)
+ for expected, types in combinations:
+ result = rule.evaluate(MimetypeSample(types))
+ self.assertEqual(result.further_analysis, expected)
+
+ def test_rule_office_ole(self):
+ """ Test rule office_ole. """
+ config = '''[office_macro_with_suspicious_keyword]
+ keyword.1 : AutoOpen
+ keyword.2 : AutoClose
+ keyword.3 : suSPi.ious'''
+ rule = OfficeMacroWithSuspiciousKeyword(CreatingConfigParser(config))
+ # sample factory to create samples from real files
+ factory1 = SampleFactory(
+ cuckoo=None, base_dir=None, job_hash_regex=None,
+ keep_mail_data=False, processing_info_dir=None)
+ # sampe factory to create samples with defined content
+ factory2 = CreatingSampleFactory(
+ cuckoo=None, base_dir=None, job_hash_regex=None,
+ keep_mail_data=False, processing_info_dir=None)
+ tests_data_dir = os.path.dirname(os.path.abspath(__file__))+"/test-data"
+
+ combinations = [
+ # no office document file extension
+ [Result.unknown, factory2.make_sample('test.nodoc', 'test')],
+ # test with empty file
+ [Result.unknown, factory1.make_sample(tests_data_dir+'/office/empty.doc')],
+ # office document with 'suspicious' in macro code
+ [Result.bad, factory1.make_sample(tests_data_dir+'/office/suspiciousMacro.doc')],
+ # test with blank word doc
+ [Result.unknown, factory1.make_sample(tests_data_dir+'/office/blank.doc')],
+ # test with legitimate macro
+ [Result.unknown, factory1.make_sample(tests_data_dir+'/office/legitmacro.xls')]
+ ]
+ for expected, sample in combinations:
+ result = rule.evaluate(sample)
+ self.assertEqual(result.result, expected)
+
+ # test if macro present
+ rule = OfficeMacroRule(CreatingConfigParser(config))
+ combinations = [
+ # no office document file extension
+ [Result.unknown, factory2.make_sample('test.nodoc', 'test')],
+ # test with empty file
+ [Result.unknown, factory1.make_sample(tests_data_dir+'/office/empty.doc')],
+ # office document with 'suspicious' in macro code
+ [Result.bad, factory1.make_sample(tests_data_dir+'/office/suspiciousMacro.doc')],
+ # test with blank word doc
+ [Result.unknown, factory1.make_sample(tests_data_dir+'/office/blank.doc')],
+ # test with legitimate macro
+ [Result.bad, factory1.make_sample(tests_data_dir+'/office/legitmacro.xls')]
+ ]
+ for expected, sample in combinations:
+ result = rule.evaluate(sample)
+ self.assertEqual(result.result, expected)
+
+ def test_config_file_type_on_whitelist(self):
+ """ Test whitelist rule configuration. """
+ config = '''[file_type_on_whitelist]
+whitelist.1 : foo/bar
+unknown : baz'''
+ with self.assertRaisesRegexp(
+ PeekabooRulesetConfigError,
+ r'Empty whitelist, check file_type_on_whitelist rule config.'):
+ FileTypeOnWhitelistRule(CreatingConfigParser())
+
+ with self.assertRaisesRegexp(
+ PeekabooConfigException,
+ r'Unknown config option\(s\) found in section '
+ r'file_type_on_whitelist: unknown'):
+ FileTypeOnWhitelistRule(CreatingConfigParser(config))
+
+ def test_rule_file_type_on_greylist(self):
+ """ Test greylist rule. """
+ combinations = [
+ [False, ['text/plain']],
+ [True, ['application/msword']],
+ [True, ['text/plain', 'application/x-dosexec']],
+ [True, ['image/png', 'application/zip', 'application/vnd.ms-excel',
+ 'application/vnd.ms-powerpoint']],
+ [False, ['', 'asdfjkl', '93219843298']],
+ [True, []],
+ ]
+ rule = FileTypeOnGreylistRule(self.config)
+ for expected, types in combinations:
+ result = rule.evaluate(MimetypeSample(types))
+ self.assertEqual(result.further_analysis, expected)
+
+ def test_config_file_type_on_greylist(self):
+ """ Test greylist rule configuration. """
+ config = '''[file_type_on_greylist]
+greylist.1 : foo/bar
+unknown : baz'''
+ with self.assertRaisesRegexp(
+ PeekabooRulesetConfigError,
+ r'Empty greylist, check file_type_on_greylist rule config.'):
+ FileTypeOnGreylistRule(CreatingConfigParser())
+
+ with self.assertRaisesRegexp(
+ PeekabooConfigException,
+ r'Unknown config option\(s\) found in section '
+ r'file_type_on_greylist: unknown'):
+ FileTypeOnGreylistRule(CreatingConfigParser(config))
+
+ def test_rule_analysis_failed(self):
+ """ Test the Cuckoo analysis failed rule """
+ # create some test samples
+ successful_sample = CuckooReportSample(
+ {'debug': {'cuckoo': ['analysis completed successfully']}})
+ failed_sample = CuckooReportSample(
+ {'debug': {'cuckoo': ['analysis failed']}})
+ reached_sample = CuckooReportSample(
+ {'debug': {'cuckoo': ['end of analysis reached!']}})
+ everything_sample = CuckooReportSample(
+ {'debug': {'cuckoo': [
+ 'end of analysis reached!',
+ 'analysis failed',
+ 'analysis completed successfully']}})
+
+ # test defaults
+ rule = CuckooAnalysisFailedRule(CreatingConfigParser(''))
+ result = rule.evaluate(successful_sample)
+ self.assertEqual(result.result, Result.unknown)
+ self.assertEqual(result.further_analysis, True)
+ result = rule.evaluate(reached_sample)
+ self.assertEqual(result.result, Result.failed)
+ self.assertEqual(result.further_analysis, False)
+ result = rule.evaluate(failed_sample)
+ self.assertEqual(result.result, Result.failed)
+ self.assertEqual(result.further_analysis, False)
+ result = rule.evaluate(everything_sample)
+ self.assertEqual(result.result, Result.unknown)
+ self.assertEqual(result.further_analysis, True)
+
+ # test with config
+ rule = CuckooAnalysisFailedRule(self.config)
+ result = rule.evaluate(successful_sample)
+ self.assertEqual(result.result, Result.unknown)
+ self.assertEqual(result.further_analysis, True)
+ result = rule.evaluate(reached_sample)
+ self.assertEqual(result.result, Result.failed)
+ self.assertEqual(result.further_analysis, False)
+ result = rule.evaluate(failed_sample)
+ self.assertEqual(result.result, Result.failed)
+ self.assertEqual(result.further_analysis, False)
+ result = rule.evaluate(everything_sample)
+ self.assertEqual(result.result, Result.failed)
+ self.assertEqual(result.further_analysis, False)
+
+ def test_config_evil_sig(self):
+ """ Test the Cuckoo evil signature rule configuration. """
+ config = '''[cuckoo_evil_sig]
+signature.1 : foo
+unknown : baz'''
+ with self.assertRaisesRegexp(
+ PeekabooRulesetConfigError,
+ r'Empty bad signature list, check cuckoo_evil_sig rule '
+ r'config.'):
+ CuckooEvilSigRule(CreatingConfigParser())
+
+ with self.assertRaisesRegexp(
+ PeekabooConfigException,
+ r'Unknown config option\(s\) found in section '
+ r'cuckoo_evil_sig: unknown'):
+ CuckooEvilSigRule(CreatingConfigParser(config))
+
+ def test_config_score(self):
+ """ Test the Cuckoo score rul