From 18d0ac2148d76aaf1dfe659c1a11cf74021ebae9 Mon Sep 17 00:00:00 2001 From: Felix Bauer Date: Tue, 20 Aug 2019 10:47:36 +0200 Subject: Add an additional static test to the ruleset to check for office macros with suspicious keywords (#87) The new rule is deactivated by default; it uses oletools to check SUSPICIOUSKEYWORDS in the macro code of office documents. Ole is now an analyser module much like Cuckoo inside the toolbox. All logic has been moved there. Sample is merely there for caching. Evaluation of rules uses the toolbox and reports back. Regex-based matching of MS office files for configurable keywords. Also detection of macros has been improved. Tests for correct handling of non-office file extension Tests for correct handling of empty file with correct extension Tests for correct detection of office file with suspicious macro Tests for correct pass of blank office document Tests for correct handling of empty word doc. Tests for correct non-detection of Excel file with macro. --- peekaboo/config.py | 13 +- peekaboo/locale/de/LC_MESSAGES/peekaboo.mo | Bin 4074 -> 4392 bytes peekaboo/locale/de/LC_MESSAGES/peekaboo.po | 73 ++- peekaboo/locale/peekaboo.pot | 66 +- peekaboo/ruleset/engine.py | 9 +- peekaboo/ruleset/rules.py | 61 +- peekaboo/sample.py | 20 +- peekaboo/toolbox/ms_office.py | 61 -- peekaboo/toolbox/ole.py | 120 ++++ ruleset.conf.sample | 19 +- test.py | 940 --------------------------- tests/test-data/office/blank.doc | Bin 0 -> 22528 bytes tests/test-data/office/empty.doc | 0 tests/test-data/office/legitmacro.xls | Bin 0 -> 35840 bytes tests/test-data/office/suspiciousMacro.doc | Bin 0 -> 28672 bytes tests/test.py | 990 +++++++++++++++++++++++++++++ 16 files changed, 1292 insertions(+), 1080 deletions(-) delete mode 100644 peekaboo/toolbox/ms_office.py create mode 100644 peekaboo/toolbox/ole.py delete mode 100755 test.py create mode 100644 tests/test-data/office/blank.doc create mode 100644 tests/test-data/office/empty.doc create mode 100644 
tests/test-data/office/legitmacro.xls create mode 100644 tests/test-data/office/suspiciousMacro.doc create mode 100755 tests/test.py diff --git a/peekaboo/config.py b/peekaboo/config.py index 5c182bb..af5b81f 100644 --- a/peekaboo/config.py +++ b/peekaboo/config.py @@ -41,6 +41,7 @@ class PeekabooConfigParser( # pylint: disable=too-many-ancestors exist or cannot be opened. """ LOG_LEVEL = object() RELIST = object() + IRELIST = object() def __init__(self, config_file): # super() does not work here because ConfigParser uses old-style @@ -114,7 +115,14 @@ class PeekabooConfigParser( # pylint: disable=too-many-ancestors self.lists[section][option] = value return value - def getrelist(self, section, option, raw=False, vars=None, fallback=None): + def getirelist(self, section, option, raw=False, vars=None, fallback=None, flags=None): + """ Special getter for lists of regular expressions that are compiled to match + case insesitive (IGNORECASE). Returns the compiled expression objects in a + list ready for matching and searching. + """ + return self.getrelist(section, option, raw=raw, vars=vars, fallback=fallback, flags=re.IGNORECASE) + + def getrelist(self, section, option, raw=False, vars=None, fallback=None, flags=0): """ Special getter for lists of regular expressions. Returns the compiled expression objects in a list ready for matching and searching. 
""" @@ -137,7 +145,7 @@ class PeekabooConfigParser( # pylint: disable=too-many-ancestors compiled_res = [] for regex in strlist: try: - compiled_res.append(re.compile(regex)) + compiled_res.append(re.compile(regex, flags)) except (ValueError, TypeError) as error: raise PeekabooConfigException( 'Failed to compile regular expression "%s" (section %s, ' @@ -203,6 +211,7 @@ class PeekabooConfigParser( # pylint: disable=too-many-ancestors # these only work when given explicitly as option_type self.LOG_LEVEL: self.get_log_level, self.RELIST: self.getrelist, + self.IRELIST: self.getirelist, } return getter[option_type](section, option, fallback=fallback) diff --git a/peekaboo/locale/de/LC_MESSAGES/peekaboo.mo b/peekaboo/locale/de/LC_MESSAGES/peekaboo.mo index c89b24b..af5a962 100644 Binary files a/peekaboo/locale/de/LC_MESSAGES/peekaboo.mo and b/peekaboo/locale/de/LC_MESSAGES/peekaboo.mo differ diff --git a/peekaboo/locale/de/LC_MESSAGES/peekaboo.po b/peekaboo/locale/de/LC_MESSAGES/peekaboo.po index d82f699..77ab127 100644 --- a/peekaboo/locale/de/LC_MESSAGES/peekaboo.po +++ b/peekaboo/locale/de/LC_MESSAGES/peekaboo.po @@ -6,7 +6,7 @@ msgid "" msgstr "" "Project-Id-Version: PeekabooAV 1.6.2\n" "Report-Msgid-Bugs-To: EMAIL@ADDRESS\n" -"POT-Creation-Date: 2019-04-17 09:26+0000\n" +"POT-Creation-Date: 2019-08-20 10:35+0200\n" "PO-Revision-Date: 2019-02-14 22:02+0000\n" "Last-Translator: Michael Weiser \n" "Language: de\n" @@ -15,28 +15,28 @@ msgstr "" "MIME-Version: 1.0\n" "Content-Type: text/plain; charset=utf-8\n" "Content-Transfer-Encoding: 8bit\n" -"Generated-By: Babel 2.4.0\n" +"Generated-By: Babel 2.7.0\n" #: peekaboo/queuing.py:382 msgid "Sample initialization failed" msgstr "Initialisierung der zu analysierenden Datei fehlgeschlagen" -#: peekaboo/sample.py:186 +#: peekaboo/sample.py:185 #, python-format msgid "File \"%s\" %s is being analyzed" msgstr "Datei \"%s\" %s wird analysiert" -#: peekaboo/sample.py:239 +#: peekaboo/sample.py:238 #, python-format msgid "File 
\"%s\" is considered \"%s\"" msgstr "Die Datei \"%s\" wird als \"%s\" betrachtet" -#: peekaboo/sample.py:299 +#: peekaboo/sample.py:298 #, python-format msgid "File \"%s\": %s" msgstr "Datei \"%s\": %s" -#: peekaboo/sample.py:495 +#: peekaboo/sample.py:497 #, python-format msgid "Sample %s successfully submitted to Cuckoo as job %d" msgstr "Erfolgreich an Cuckoo gegeben %s als Job %d" @@ -100,55 +100,73 @@ msgstr "Ja" msgid "No" msgstr "Nein" -#: peekaboo/ruleset/engine.py:118 +#: peekaboo/ruleset/engine.py:147 msgid "Rule aborted with error" msgstr "Regel mit Fehler abgebrochen" -#: peekaboo/ruleset/rules.py:133 +#: peekaboo/ruleset/rules.py:122 msgid "File is not yet known to the system" msgstr "Datei ist dem System noch nicht bekannt" -#: peekaboo/ruleset/rules.py:154 +#: peekaboo/ruleset/rules.py:143 #, python-format msgid "Failure to determine sample file size: %s" msgstr "Ermittlung der Dateigröße fehlgeschlagen: %s" -#: peekaboo/ruleset/rules.py:159 +#: peekaboo/ruleset/rules.py:148 #, python-format msgid "File has more than %d bytes" msgstr "Datei hat mehr als %d bytes" -#: peekaboo/ruleset/rules.py:165 +#: peekaboo/ruleset/rules.py:154 #, python-format msgid "File is only %d bytes long" -msgstr "" +msgstr "Die Datei ist nur %d bytes groß" -#: peekaboo/ruleset/rules.py:187 +#: peekaboo/ruleset/rules.py:176 msgid "File type is on whitelist" msgstr "Dateityp ist auf Whitelist" -#: peekaboo/ruleset/rules.py:191 +#: peekaboo/ruleset/rules.py:180 msgid "File type is not on whitelist" msgstr "Dateityp ist nicht auf Whitelist" -#: peekaboo/ruleset/rules.py:213 +#: peekaboo/ruleset/rules.py:202 msgid "File type is on the list of types to analyze" msgstr "Dateityp ist auf der Liste der zu analysiserenden Typen" -#: peekaboo/ruleset/rules.py:218 +#: peekaboo/ruleset/rules.py:207 #, python-format msgid "File type is not on the list of types to analyse (%s)" msgstr "Dateityp ist nicht auf der Liste der zu analysierenden Typen (%s)" -#: peekaboo/ruleset/rules.py:231 +#: 
peekaboo/ruleset/rules.py:223 +msgid "File is not an office document" +msgstr "Die Datei ist kein Office Dokument" + +#: peekaboo/ruleset/rules.py:247 msgid "The file contains an Office macro" msgstr "Die Datei beinhaltet ein Office-Makro" -#: peekaboo/ruleset/rules.py:235 +#: peekaboo/ruleset/rules.py:251 msgid "The file does not contain a recognizable Office macro" msgstr "Die Datei beinhaltet kein erkennbares Office-Makro" -#: peekaboo/ruleset/rules.py:265 peekaboo/ruleset/rules.py:402 +#: peekaboo/ruleset/rules.py:272 +msgid "The file contains an Office macro which runs at document open" +msgstr "" +"Die Datei beinhaltet ein Office Makro welches beim Öffnen der Datei " +"ausgeführt wird" + +#: peekaboo/ruleset/rules.py:277 +msgid "" +"The file does not contain a recognizable Office macro that is run at " +"document open" +msgstr "" +"Die Datei beinhaltet kein erkennbares Office Makro welches beim Öffnen " +"ausgeführt wird" + +#: peekaboo/ruleset/rules.py:307 peekaboo/ruleset/rules.py:445 msgid "" "Behavioral analysis by Cuckoo has produced an error and did not finish " "successfully" @@ -156,40 +174,41 @@ msgstr "" "Die Verhaltensanalyse durch Cuckoo hat einen Fehler produziert und konnte" " nicht erfolgreich abgeschlossen werden" -#: peekaboo/ruleset/rules.py:322 +#: peekaboo/ruleset/rules.py:365 msgid "No signature suggesting malware detected" msgstr "Keine Signatur erkannt die auf Schadcode hindeutet" -#: peekaboo/ruleset/rules.py:327 +#: peekaboo/ruleset/rules.py:370 #, python-format msgid "The following signatures have been recognized: %s" msgstr "Folgende Signaturen wurden erkannt: %s" -#: peekaboo/ruleset/rules.py:346 +#: peekaboo/ruleset/rules.py:389 #, python-format msgid "Cuckoo score >= %s: %s" msgstr "" -#: peekaboo/ruleset/rules.py:351 +#: peekaboo/ruleset/rules.py:394 #, python-format msgid "Cuckoo score < %s: %s" msgstr "" -#: peekaboo/ruleset/rules.py:375 +#: peekaboo/ruleset/rules.py:418 #, python-format msgid "The file attempts to contact at 
least one domain on the blacklist (%s)" msgstr "" "Die Datei versucht mindestens eine Domain aus der Blacklist zu " "kontaktieren (%s)" -#: peekaboo/ruleset/rules.py:381 +#: peekaboo/ruleset/rules.py:424 msgid "File does not seem to attempt contact with domains on the blacklist" msgstr "Datei scheint keine Domains aus der Blacklist kontaktieren zu wollen" -#: peekaboo/ruleset/rules.py:418 +#: peekaboo/ruleset/rules.py:461 msgid "Behavioral analysis by Cuckoo completed successfully" msgstr "Die Verhaltensanalyse durch Cuckoo wurde erfolgreich abgeschlossen" -#: peekaboo/ruleset/rules.py:435 +#: peekaboo/ruleset/rules.py:478 msgid "File does not seem to exhibit recognizable malicious behaviour" msgstr "Datei scheint keine erkennbaren Schadroutinen zu starten" + diff --git a/peekaboo/locale/peekaboo.pot b/peekaboo/locale/peekaboo.pot index 56b6113..085c447 100644 --- a/peekaboo/locale/peekaboo.pot +++ b/peekaboo/locale/peekaboo.pot @@ -8,35 +8,35 @@ msgid "" msgstr "" "Project-Id-Version: PROJECT VERSION\n" "Report-Msgid-Bugs-To: EMAIL@ADDRESS\n" -"POT-Creation-Date: 2019-04-17 09:26+0000\n" +"POT-Creation-Date: 2019-08-20 10:35+0200\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "Last-Translator: FULL NAME \n" "Language-Team: LANGUAGE \n" "MIME-Version: 1.0\n" "Content-Type: text/plain; charset=utf-8\n" "Content-Transfer-Encoding: 8bit\n" -"Generated-By: Babel 2.4.0\n" +"Generated-By: Babel 2.7.0\n" #: peekaboo/queuing.py:382 msgid "Sample initialization failed" msgstr "" -#: peekaboo/sample.py:186 +#: peekaboo/sample.py:185 #, python-format msgid "File \"%s\" %s is being analyzed" msgstr "" -#: peekaboo/sample.py:239 +#: peekaboo/sample.py:238 #, python-format msgid "File \"%s\" is considered \"%s\"" msgstr "" -#: peekaboo/sample.py:299 +#: peekaboo/sample.py:298 #, python-format msgid "File \"%s\": %s" msgstr "" -#: peekaboo/sample.py:495 +#: peekaboo/sample.py:497 #, python-format msgid "Sample %s successfully submitted to Cuckoo as job %d" msgstr "" @@ -99,93 
+99,107 @@ msgstr "" msgid "No" msgstr "" -#: peekaboo/ruleset/engine.py:118 +#: peekaboo/ruleset/engine.py:147 msgid "Rule aborted with error" msgstr "" -#: peekaboo/ruleset/rules.py:133 +#: peekaboo/ruleset/rules.py:122 msgid "File is not yet known to the system" msgstr "" -#: peekaboo/ruleset/rules.py:154 +#: peekaboo/ruleset/rules.py:143 #, python-format msgid "Failure to determine sample file size: %s" msgstr "" -#: peekaboo/ruleset/rules.py:159 +#: peekaboo/ruleset/rules.py:148 #, python-format msgid "File has more than %d bytes" msgstr "" -#: peekaboo/ruleset/rules.py:165 +#: peekaboo/ruleset/rules.py:154 #, python-format msgid "File is only %d bytes long" msgstr "" -#: peekaboo/ruleset/rules.py:187 +#: peekaboo/ruleset/rules.py:176 msgid "File type is on whitelist" msgstr "" -#: peekaboo/ruleset/rules.py:191 +#: peekaboo/ruleset/rules.py:180 msgid "File type is not on whitelist" msgstr "" -#: peekaboo/ruleset/rules.py:213 +#: peekaboo/ruleset/rules.py:202 msgid "File type is on the list of types to analyze" msgstr "" -#: peekaboo/ruleset/rules.py:218 +#: peekaboo/ruleset/rules.py:207 #, python-format msgid "File type is not on the list of types to analyse (%s)" msgstr "" -#: peekaboo/ruleset/rules.py:231 +#: peekaboo/ruleset/rules.py:223 +msgid "File is not an office document" +msgstr "" + +#: peekaboo/ruleset/rules.py:247 msgid "The file contains an Office macro" msgstr "" -#: peekaboo/ruleset/rules.py:235 +#: peekaboo/ruleset/rules.py:251 msgid "The file does not contain a recognizable Office macro" msgstr "" -#: peekaboo/ruleset/rules.py:265 peekaboo/ruleset/rules.py:402 +#: peekaboo/ruleset/rules.py:272 +msgid "The file contains an Office macro which runs at document open" +msgstr "" + +#: peekaboo/ruleset/rules.py:277 +msgid "" +"The file does not contain a recognizable Office macro that is run at " +"document open" +msgstr "" + +#: peekaboo/ruleset/rules.py:307 peekaboo/ruleset/rules.py:445 msgid "" "Behavioral analysis by Cuckoo has produced an error 
and did not finish " "successfully" msgstr "" -#: peekaboo/ruleset/rules.py:322 +#: peekaboo/ruleset/rules.py:365 msgid "No signature suggesting malware detected" msgstr "" -#: peekaboo/ruleset/rules.py:327 +#: peekaboo/ruleset/rules.py:370 #, python-format msgid "The following signatures have been recognized: %s" msgstr "" -#: peekaboo/ruleset/rules.py:346 +#: peekaboo/ruleset/rules.py:389 #, python-format msgid "Cuckoo score >= %s: %s" msgstr "" -#: peekaboo/ruleset/rules.py:351 +#: peekaboo/ruleset/rules.py:394 #, python-format msgid "Cuckoo score < %s: %s" msgstr "" -#: peekaboo/ruleset/rules.py:375 +#: peekaboo/ruleset/rules.py:418 #, python-format msgid "The file attempts to contact at least one domain on the blacklist (%s)" msgstr "" -#: peekaboo/ruleset/rules.py:381 +#: peekaboo/ruleset/rules.py:424 msgid "File does not seem to attempt contact with domains on the blacklist" msgstr "" -#: peekaboo/ruleset/rules.py:418 +#: peekaboo/ruleset/rules.py:461 msgid "Behavioral analysis by Cuckoo completed successfully" msgstr "" -#: peekaboo/ruleset/rules.py:435 +#: peekaboo/ruleset/rules.py:478 msgid "File does not seem to exhibit recognizable malicious behaviour" msgstr "" diff --git a/peekaboo/ruleset/engine.py b/peekaboo/ruleset/engine.py index 96af1c6..6e736f5 100644 --- a/peekaboo/ruleset/engine.py +++ b/peekaboo/ruleset/engine.py @@ -49,6 +49,7 @@ class RulesetEngine(object): CuckooEvilSigRule, CuckooScoreRule, OfficeMacroRule, + OfficeMacroWithSuspiciousKeyword, RequestsEvilDomainRule, CuckooAnalysisFailedRule, ContainsPeekabooYarRule, @@ -127,7 +128,7 @@ class RulesetEngine(object): rule wrapper for in/out logging and reporting """ rule_name = rule_class.rule_name - logger.debug("Processing rule '%s' for %s" % (rule_name, sample)) + logger.debug("Processing rule '%s' for %s", rule_name, sample) try: rule = rule_class(config=self.config, db_con=self.db_con) @@ -138,8 +139,8 @@ class RulesetEngine(object): raise # catch all other exceptions for this rule 
except Exception as e: - logger.warning("Unexpected error in '%s' for %s" % (rule_name, - sample)) + logger.warning("Unexpected error in '%s' for %s", rule_name, + sample) logger.exception(e) # create "fake" RuleResult result = RuleResult("RulesetEngine", result=Result.failed, @@ -147,5 +148,5 @@ class RulesetEngine(object): further_analysis=False) sample.add_rule_result(result) - logger.info("Rule '%s' processed for %s" % (rule_name, sample)) + logger.info("Rule '%s' processed for %s", rule_name, sample) return result diff --git a/peekaboo/ruleset/rules.py b/peekaboo/ruleset/rules.py index 797858a..6635c0c 100644 --- a/peekaboo/ruleset/rules.py +++ b/peekaboo/ruleset/rules.py @@ -31,6 +31,8 @@ import logging from peekaboo.ruleset import Result, RuleResult from peekaboo.exceptions import PeekabooAnalysisDeferred, \ CuckooSubmitFailedException, PeekabooRulesetConfigError +from peekaboo.toolbox.ole import Oletools, OletoolsReport, \ + OleNotAnOfficeDocumentException logger = logging.getLogger(__name__) @@ -207,13 +209,40 @@ class FileTypeOnGreylistRule(Rule): False) -class OfficeMacroRule(Rule): +class OleRule(Rule): + """ A common base class for rules that evaluate the Ole report. """ + def evaluate(self, sample): + """ Report the sample as bad if it contains a macro. """ + if sample.oletools_report is None: + try: + ole = Oletools() + report = ole.get_report(sample) + sample.register_oletools_report(OletoolsReport(report)) + except OleNotAnOfficeDocumentException: + return self.result(Result.unknown, + _("File is not an office document"), + True) + except Exception: + raise + + return self.evaluate_report(sample.oletools_report) + + def evaluate_report(self, report): + """ Evaluate an Ole report. + + @param report: The Ole report. + @returns: RuleResult containing verdict. + """ + raise NotImplementedError + + +class OfficeMacroRule(OleRule): """ A rule checking the sample for Office macros. 
""" rule_name = 'office_macro' - def evaluate(self, sample): + def evaluate_report(self, report): """ Report the sample as bad if it contains a macro. """ - if sample.office_macros: + if report.has_office_macros(): return self.result(Result.bad, _("The file contains an Office macro"), False) @@ -224,6 +253,32 @@ class OfficeMacroRule(Rule): True) +class OfficeMacroWithSuspiciousKeyword(OleRule): + """ A rule checking the sample for Office macros. """ + rule_name = 'office_macro_with_suspicious_keyword' + + def get_config(self): + # get list of keywords from config file + self.suspicious_keyword_list = self.get_config_value( + 'keyword', [], option_type=self.config.IRELIST) + if not self.suspicious_keyword_list: + raise PeekabooRulesetConfigError( + "Empty suspicious keyword list, check %s rule config." % + self.rule_name) + + def evaluate_report(self, report): + if report.has_office_macros_with_suspicious_keyword(self.suspicious_keyword_list): + return self.result(Result.bad, + _("The file contains an Office macro which " + "runs at document open"), + False) + + return self.result(Result.unknown, + _("The file does not contain a recognizable " + "Office macro that is run at document open"), + True) + + class CuckooRule(Rule): """ A common base class for rules that evaluate the Cuckoo report. 
""" def evaluate(self, sample): diff --git a/peekaboo/sample.py b/peekaboo/sample.py index 35c661d..d4c5a62 100644 --- a/peekaboo/sample.py +++ b/peekaboo/sample.py @@ -38,7 +38,6 @@ from builtins import open from datetime import datetime from peekaboo.toolbox.files import guess_mime_type_from_file_contents, \ guess_mime_type_from_filename -from peekaboo.toolbox.ms_office import has_office_macros from peekaboo.ruleset import Result @@ -91,6 +90,7 @@ class Sample(object): self.__submit_path = None self.__cuckoo_job_id = -1 self.__cuckoo_report = None + self.__oletools_report = None self.__done = False self.__status_change = status_change self.__result = Result.unchecked @@ -101,7 +101,6 @@ class Sample(object): self.__sha256sum = None self.__mimetypes = None self.__file_extension = None - self.__office_macros = None self.__base_dir = base_dir self.__job_hash = None self.__job_hash_regex = job_hash_regex @@ -461,14 +460,6 @@ class Sample(object): def job_id(self): return self.__cuckoo_job_id - @property - def office_macros(self): - """ Determines if this sample contains any office macros. """ - if not self.__office_macros: - self.__office_macros = has_office_macros(self.__path) - - return self.__office_macros - @property def file_size(self): """ Determine and cache sample file size @@ -484,6 +475,11 @@ class Sample(object): """ Returns the cuckoo report """ return self.__cuckoo_report + @property + def oletools_report(self): + """ Returns the oletools report """ + return self.__oletools_report + @property def submit_path(self): """ Returns the path to use for submission to Cuckoo """ @@ -506,6 +502,10 @@ class Sample(object): """ Records a Cuckoo report for later evaluation. """ self.__cuckoo_report = report + def register_oletools_report(self, report): + """ Records a Oletools report for alter evaluation. """ + self.__oletools_report = report + def cleanup(self): """ Clean up after the sample has been analysed, removing a potentially created workdir. 
""" diff --git a/peekaboo/toolbox/ms_office.py b/peekaboo/toolbox/ms_office.py deleted file mode 100644 index b5ab902..0000000 --- a/peekaboo/toolbox/ms_office.py +++ /dev/null @@ -1,61 +0,0 @@ -############################################################################### -# # -# Peekaboo Extended Email Attachment Behavior Observation Owl # -# # -# toolbox/ # -# ms_office.py # -############################################################################### -# # -# Copyright (C) 2016-2019 science + computing ag # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or (at # -# your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, but # -# WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # -# General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see . # -# # -############################################################################### - -""" Tool functions for handling office macros. """ - -import logging -from oletools.olevba import VBA_Parser - - -logger = logging.getLogger(__name__) - -MS_OFFICE_EXTENSIONS = [ - ".doc", ".docm", ".dotm", ".docx", - ".ppt", ".pptm", ".pptx", ".potm", ".ppam", ".ppsm", - ".xls", ".xlsm", ".xlsx", -] - - -def has_office_macros(office_file): - """ - Detects macros in Microsoft Office documents. - - @param office_file: The MS Office document to check for macros. - @return: True if macros where found, otherwise False. - If VBA_Parser crashes it returns False too. 
- """ - file_extension = office_file.split('.')[-1] - if file_extension not in MS_OFFICE_EXTENSIONS: - return False - try: - # VBA_Parser reports macros for office documents - vbaparser = VBA_Parser(office_file) - return vbaparser.detect_vba_macros() - except TypeError: - # The given file is not an office document. - return False - except Exception as error: - logger.exception(error) - return False diff --git a/peekaboo/toolbox/ole.py b/peekaboo/toolbox/ole.py new file mode 100644 index 0000000..e4a3ce0 --- /dev/null +++ b/peekaboo/toolbox/ole.py @@ -0,0 +1,120 @@ +############################################################################### +# # +# Peekaboo Extended Email Attachment Behavior Observation Owl # +# # +# toolbox/ # +# ole.py # +############################################################################### +# # +# Copyright (C) 2016-2019 science + computing ag # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or (at # +# your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # +# General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +############################################################################### + + +import logging +import re +from oletools.olevba import VBA_Parser + +logger = logging.getLogger(__name__) + + +class OleNotAnOfficeDocumentException(Exception): + pass + +class Oletools(object): + """ Parent class, defines interface to Oletools. 
""" + def __init__(self): + self.MS_OFFICE_EXTENSIONS = [ + "doc", "docm", "dotm", "docx", + "ppt", "pptm", "pptx", "potm", "ppam", "ppsm", + "xls", "xlsm", "xlsx", + ] + + def get_report(self, sample): + """ Return oletools report or create if not already cached. """ + if sample.oletools_report != None: + return sample.oletools_report + + report = {} + if sample.file_extension not in self.MS_OFFICE_EXTENSIONS: + raise OleNotAnOfficeDocumentException(sample.file_extension) + + try: + vbaparser = VBA_Parser(sample.file_path) + + # List from oletools/olevba.py#L553 + oletype = ('OLE', 'OpenXML', 'FlatOPC_XML', 'Word2003_XML', 'MHTML', 'PPT') + + # check if ole detects it as an office file + if vbaparser.type not in oletype: + raise OleNotAnOfficeDocumentException(sample.file_extension) + + # VBA_Parser reports macros for office documents + report['has_macros'] = vbaparser.detect_vba_macros() or vbaparser.detect_xlm_macros() + try: + report['vba'] = vbaparser.reveal() + except TypeError: + # no macros + pass + vbaparser.close() + except IOError: + raise + except TypeError: + # The given file is not an office document. + pass + except Exception as error: + logger.exception(error) + sample.register_oletools_report(report) + return report + + +class OletoolsReport(object): + """ Represents a custom Oletools report. """ + def __init__(self, report): + self.report = report + + def has_office_macros(self): + """ + Detects macros in Microsoft Office documents. + + @return: True if macros where found, otherwise False. + If VBA_Parser crashes it returns False too. + """ + + try: + return self.report['has_macros'] + except KeyError: + return False + + def has_office_macros_with_suspicious_keyword(self, suspicious_keywords): + """ + Detects macros with supplied suspicious keywords in Microsoft Office documents. + + @param suspicious_keywords: List of suspicious keyword regexes. + @return: True if macros with keywords where found, otherwise False. 
+ If VBA_Parser crashes it returns False too. + """ + suspicious = False + try: + vba = self.report['vba'] + for w in suspicious_keywords: + if re.search(w, vba): + suspicious = True + break + except KeyError: + return False + + return suspicious diff --git a/ruleset.conf.sample b/ruleset.conf.sample index c8909a9..9a9ea73 100644 --- a/ruleset.conf.sample +++ b/ruleset.conf.sample @@ -9,13 +9,14 @@ rule.1 : known rule.2 : file_larger_than rule.3 : file_type_on_whitelist rule.4 : file_type_on_greylist -rule.5 : cuckoo_evil_sig -rule.6 : cuckoo_score -rule.7 : office_macro -#rule.8 : requests_evil_domain -rule.9 : cuckoo_analysis_failed -#rule.10 : contains_peekabooyar -rule.11 : final_rule +#rule.5 : office_macro +#rule.6 : office_macro_with_suspicious_keyword +rule.7 : cuckoo_evil_sig +rule.8 : cuckoo_score +#rule.9 : requests_evil_domain +rule.10 : cuckoo_analysis_failed +#rule.11 : contains_peekabooyar +rule.12 : final_rule # rule specific configuration options # the section name equals the name of the rule @@ -71,6 +72,10 @@ greylist.34 : application/vnd.ms-excel.template.macroEnabled.12 greylist.35 : application/vnd.ms-excel greylist.36 : application/msword +[office_macro_with_suspicious_keyword] +keyword.1 : AutoOpen +keyword.2 : AutoClose + [cuckoo_evil_sig] signature.1 : A potential heapspray has been detected. .* signature.2 : A process attempted to delay the analysis task. 
diff --git a/test.py b/test.py deleted file mode 100755 index 8321ac7..0000000 --- a/test.py +++ /dev/null @@ -1,940 +0,0 @@ -#!/usr/bin/env python - -############################################################################### -# # -# Peekaboo Extended Email Attachment Behavior Observation Owl # -# # -# test.py # -############################################################################### -# # -# Copyright (C) 2016-2019 science + computing ag # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or (at # -# your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, but # -# WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # -# General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see . # -# # -############################################################################### - -""" The testsuite. 
""" - -from future.builtins import super # pylint: disable=wrong-import-order - -import gettext -import sys -import os -import tempfile -import logging -import shutil -import unittest -from datetime import datetime, timedelta - - -# Add Peekaboo to PYTHONPATH -# pylint: disable=wrong-import-position -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -from peekaboo.exceptions import PeekabooConfigException, \ - PeekabooRulesetConfigError -from peekaboo.config import PeekabooConfig, PeekabooConfigParser -from peekaboo.sample import SampleFactory -from peekaboo.ruleset import RuleResult, Result -from peekaboo.ruleset.engine import RulesetEngine -from peekaboo.ruleset.rules import FileTypeOnWhitelistRule, \ - FileTypeOnGreylistRule, CuckooAnalysisFailedRule, \ - KnownRule, FileLargerThanRule, CuckooEvilSigRule, \ - CuckooScoreRule, RequestsEvilDomainRule, FinalRule -from peekaboo.toolbox.cuckoo import CuckooReport -from peekaboo.db import PeekabooDatabase, PeekabooDatabaseError -# pylint: enable=wrong-import-position - - -class CreatingConfigMixIn(object): - """ A class for adding config file creation logic to any other class. """ - def create_config(self, content): - """ Create a configuration file with defined content and pass it to the - parent constructor for parsing. """ - _, self.created_config_file = tempfile.mkstemp() - with open(self.created_config_file, 'w') as file_desc: - file_desc.write(content) - - def remove_config(self): - """ Remove the configuration file we've created. """ - os.unlink(self.created_config_file) - - -class CreatingConfigParser(PeekabooConfigParser, CreatingConfigMixIn): - """ A special kind of config parser that creates the configuration file - with defined content. 
""" - def __init__(self, content=''): - self.created_config_file = None - self.create_config(content) - PeekabooConfigParser.__init__(self, self.created_config_file) - - def __del__(self): - self.remove_config() - - -class CreatingPeekabooConfig(PeekabooConfig, CreatingConfigMixIn): - """ A special kind of Peekaboo config that creates the configuration file - with defined content. """ - def __init__(self, content=''): - self.created_config_file = None - self.create_config(content) - PeekabooConfig.__init__(self, self.created_config_file) - - def __del__(self): - self.remove_config() - - -class TestConfigParser(unittest.TestCase): - """ Test a configuration with all values different from the defaults. """ - @classmethod - def setUpClass(cls): - """ Set up common test case resources. """ - cls.config = CreatingConfigParser('''#[rule0] - -[rule1] -option1: foo -option2.1: bar -option2.2: baz - -[rules] -rule.1 : rule1 -#rule.2 : rule2 -rule.3 : rule3 -''') - - def test_2_values(self): - """ Test rule configuration values """ - with self.assertRaises(KeyError): - self.config['rule0'] - self.assertEqual(self.config['rule1']['option1'], 'foo') - self.assertEqual(self.config['rule1'].getlist('option2'), - ['bar', 'baz']) - - def test_3_type_mismatch(self): - """ Test correct error is thrown if the option type is mismatched """ - config = '''[rule1] -option1: foo -option1.1: bar''' - - with self.assertRaisesRegexp( - PeekabooConfigException, - 'Option option1 in section rule1 is supposed to be a list but ' - 'given as individual setting'): - CreatingConfigParser(config).getlist('rule1', 'option1') - - - -class TestDefaultConfig(unittest.TestCase): - """ Test a configuration of all defaults. """ - @classmethod - def setUpClass(cls): - """ Set up common test case resources. 
""" - cls.config = CreatingPeekabooConfig() - - def test_1_default_settings(self): - """ Test a configuration with just defaults """ - self.assertEqual( - self.config.config_file, self.config.created_config_file) - self.assertEqual(self.config.user, 'peekaboo') - self.assertEqual(self.config.group, 'peekaboo') - self.assertEqual( - self.config.sock_file, '/var/run/peekaboo/peekaboo.sock') - self.assertEqual( - self.config.pid_file, '/var/run/peekaboo/peekaboo.pid') - self.assertEqual(self.config.interpreter, '/usr/bin/python2 -u') - self.assertEqual(self.config.worker_count, 3) - self.assertEqual(self.config.sample_base_dir, '/tmp') - self.assertEqual( - self.config.job_hash_regex, '/amavis/tmp/([^/]+)/parts/') - self.assertEqual(self.config.use_debug_module, False) - self.assertEqual(self.config.keep_mail_data, False) - self.assertEqual( - self.config.processing_info_dir, - '/var/lib/peekaboo/malware_reports') - self.assertEqual( - self.config.ruleset_config, '/opt/peekaboo/etc/ruleset.conf') - self.assertEqual(self.config.log_level, logging.INFO) - self.assertEqual( - self.config.log_format, '%(asctime)s - %(name)s - ' - '(%(threadName)s) - %(levelname)s - %(message)s') - self.assertEqual(self.config.db_url, 'sqlite:////var/lib/peekaboo/peekaboo.db') - self.assertEqual(self.config.cuckoo_mode, 'api') - self.assertEqual(self.config.cuckoo_exec, '/opt/cuckoo/bin/cuckoo') - self.assertEqual(self.config.cuckoo_submit, '/opt/cuckoo/bin/cuckoo submit') - self.assertEqual(self.config.cuckoo_storage, '/var/lib/peekaboo/.cuckoo/storage') - self.assertEqual(self.config.cuckoo_url, 'http://127.0.0.1:8090') - self.assertEqual(self.config.cuckoo_poll_interval, 5) - self.assertEqual(self.config.cluster_instance_id, 0) - self.assertEqual(self.config.cluster_stale_in_flight_threshold, 15*60) - self.assertEqual(self.config.cluster_duplicate_check_interval, 60) - - -class TestValidConfig(unittest.TestCase): - """ Test a configuration with all values different from the defaults. 
""" - @classmethod - def setUpClass(cls): - """ Set up common test case resources. """ - cls.config = CreatingPeekabooConfig('''[global] -user : user1 -group : group1 -socket_file : /socket/1 -pid_file : /pid/1 -interpreter : /inter/1 -worker_count : 18 -sample_base_dir : /tmp/1 -job_hash_regex : /var/2 -use_debug_module : yes -keep_mail_data : yes -processing_info_dir : /var/3 - -[ruleset] -config : /rules/1 - -[logging] -log_level : DEBUG -log_format : format%%foo1 - -[db] -url : sqlite:////peekaboo.db1 - -[cuckoo] -mode : api1 -exec : /cuckoo/1 -submit : /submit/1 -storage_path : /storage/1 -url : http://api:1111 -poll_interval : 51 - -[cluster] -instance_id: 12 -stale_in_flight_threshold: 31 -duplicate_check_interval: 61 -''') - - def test_1_read_settings(self): - """ Test reading of configuration settings from file """ - self.assertEqual( - self.config.config_file, self.config.created_config_file) - self.assertEqual(self.config.user, 'user1') - self.assertEqual(self.config.group, 'group1') - self.assertEqual(self.config.sock_file, '/socket/1') - self.assertEqual(self.config.pid_file, '/pid/1') - self.assertEqual(self.config.interpreter, '/inter/1') - self.assertEqual(self.config.worker_count, 18) - self.assertEqual(self.config.sample_base_dir, '/tmp/1') - self.assertEqual(self.config.job_hash_regex, '/var/2') - self.assertEqual(self.config.use_debug_module, True) - self.assertEqual(self.config.keep_mail_data, True) - self.assertEqual(self.config.processing_info_dir, '/var/3') - self.assertEqual(self.config.ruleset_config, '/rules/1') - self.assertEqual(self.config.log_level, logging.DEBUG) - self.assertEqual(self.config.log_format, 'format%foo1') - self.assertEqual(self.config.db_url, 'sqlite:////peekaboo.db1') - self.assertEqual(self.config.cuckoo_mode, 'api1') - self.assertEqual(self.config.cuckoo_exec, '/cuckoo/1') - self.assertEqual(self.config.cuckoo_submit, '/submit/1') - self.assertEqual(self.config.cuckoo_storage, '/storage/1') - 
self.assertEqual(self.config.cuckoo_url, 'http://api:1111') - self.assertEqual(self.config.cuckoo_poll_interval, 51) - self.assertEqual(self.config.cluster_instance_id, 12) - self.assertEqual(self.config.cluster_stale_in_flight_threshold, 31) - self.assertEqual(self.config.cluster_duplicate_check_interval, 61) - - -class TestInvalidConfig(unittest.TestCase): - """ Various tests of invalid config files. """ - def test_1_section_header(self): - """ Test correct error is thrown if section header syntax is wrong """ - with self.assertRaisesRegexp( - PeekabooConfigException, - 'Configuration file ".*" can not be parsed: File contains no ' - 'section headers'): - CreatingPeekabooConfig('''[global[ -user: peekaboo''') - - def test_2_value_separator(self): - """ Test correct error is thrown if the value separator is wrong """ - with self.assertRaisesRegexp( - PeekabooConfigException, - 'Configuration file ".*" can not be parsed: (File|Source) ' - 'contains parsing errors:'): - CreatingPeekabooConfig('''[global] -user; peekaboo''') - - def test_3_section_header(self): - """ Test correct error is thrown if the config file is missing """ - _, config_file = tempfile.mkstemp() - os.unlink(config_file) - - with self.assertRaisesRegexp( - PeekabooConfigException, - 'Configuration file "%s" can not be opened for reading: ' - r'\[Errno 2\] No such file or directory' % config_file): - PeekabooConfig(config_file) - - def test_4_unknown_section(self): - """ Test correct error is thrown if an unknown section name is given. - """ - with self.assertRaisesRegexp( - PeekabooConfigException, - r'Unknown section\(s\) found in config: globl'): - CreatingPeekabooConfig('''[globl]''') - - def test_5_unknown_option(self): - """ Test correct error is thrown if an unknown option name is given. 
- """ - with self.assertRaisesRegexp( - PeekabooConfigException, - r'Unknown config option\(s\) found in section global: foo'): - CreatingPeekabooConfig('''[global] -foo: bar''') - - def test_6_unknown_loglevel(self): - """ Test with an unknown log level """ - with self.assertRaisesRegexp( - PeekabooConfigException, - 'Unknown log level FOO'): - CreatingPeekabooConfig('''[logging] -log_level: FOO''') - - -class CreatingSampleFactory(SampleFactory): - """ A special kind of sample factory that creates the sample files with - defined content in a temporary directory and cleans up after itself. """ - def __init__(self, *args, **kwargs): - self.directory = tempfile.mkdtemp() - super().__init__(*args, **kwargs) - - def create_sample(self, relpath, content, *args, **kwargs): - """ Make a new sample with defined base name and content in the - previously created temporary directory. The given basename can - optionally be a path relative to the temporary directory and the - subdirectory will be created automatically. """ - file_path = os.path.join(self.directory, relpath) - subdir = os.path.dirname(file_path) - if subdir != self.directory: - os.makedirs(subdir) - with open(file_path, 'w') as file_desc: - file_desc.write(content) - - return super().make_sample(file_path, *args, **kwargs) - - def __del__(self): - """ Remove the sample files we've created and the temporary directory - itself. """ - shutil.rmtree(self.directory) - - -class TestDatabase(unittest.TestCase): - """ Unittests for Peekaboo's database module. """ - @classmethod - def setUpClass(cls): - """ Set up common test case resources. 
""" - cls.test_db = os.path.abspath('./test.db') - cls.conf = CreatingPeekabooConfig() - cls.db_con = PeekabooDatabase('sqlite:///' + cls.test_db, - instance_id=1, - stale_in_flight_threshold=10) - cls.no_cluster_db = PeekabooDatabase('sqlite:///' + cls.test_db, - instance_id=0) - cls.factory = CreatingSampleFactory( - cuckoo=None, base_dir=cls.conf.sample_base_dir, - job_hash_regex=cls.conf.job_hash_regex, keep_mail_data=False, - processing_info_dir=None) - cls.sample = cls.factory.create_sample('test.py', 'test') - result = RuleResult('Unittest', - Result.failed, - 'This is just a test case.', - further_analysis=False) - cls.sample.add_rule_result(result) - - def test_1_analysis_save(self): - """ Test saving of analysis results. """ - self.db_con.analysis_save(self.sample) - - def test_2_sample_info_fetch(self): - """ Test retrieval of analysis results. """ - sample_info = self.db_con.sample_info_fetch(self.sample) - self.assertEqual(sample_info.sha256sum, self.sample.sha256sum) - self.assertEqual(sample_info.result, Result.failed) - self.assertEqual(sample_info.reason, 'This is just a test case.') - - def test_5_in_flight_no_cluster(self): - """ Test that marking of samples as in-flight on a non-cluster-enabled - database are no-ops. """ - self.assertTrue(self.no_cluster_db.mark_sample_in_flight(self.sample)) - self.assertTrue(self.no_cluster_db.mark_sample_in_flight(self.sample)) - self.assertIsNone(self.no_cluster_db.clear_sample_in_flight(self.sample)) - self.assertIsNone(self.no_cluster_db.clear_sample_in_flight(self.sample)) - self.assertIsNone(self.no_cluster_db.clear_in_flight_samples()) - - def test_6_in_flight_cluster(self): - """ Test marking of samples as in-flight. 
""" - self.assertTrue(self.db_con.mark_sample_in_flight(self.sample, 1)) - # re-locking the same sample should fail - self.assertFalse(self.db_con.mark_sample_in_flight(self.sample, 1)) - self.assertIsNone(self.db_con.clear_sample_in_flight(self.sample, 1)) - # unlocking twice should fail - self.assertRaisesRegexp( - PeekabooDatabaseError, "Unexpected inconsistency: Sample .* not " - "recoreded as in-flight upon clearing flag", - self.db_con.clear_sample_in_flight, self.sample, 1) - - def test_7_in_flight_clear(self): - """ Test clearing of in-flight markers. """ - sample2 = self.factory.create_sample('foo.pyc', 'foo') - sample3 = self.factory.create_sample('bar.pyc', 'bar') - - self.assertTrue(self.db_con.mark_sample_in_flight(self.sample, 1)) - self.assertTrue(self.db_con.mark_sample_in_flight(sample2, 1)) - self.assertTrue(self.db_con.mark_sample_in_flight(sample3, 2)) - - # should only clear samples of instance 1 - self.assertIsNone(self.db_con.clear_in_flight_samples(1)) - self.assertTrue(self.db_con.mark_sample_in_flight(self.sample, 1)) - self.assertTrue(self.db_con.mark_sample_in_flight(sample2, 1)) - self.assertFalse(self.db_con.mark_sample_in_flight(sample3, 2)) - - # should only clear samples of instance 2 - self.assertIsNone(self.db_con.clear_in_flight_samples(2)) - self.assertFalse(self.db_con.mark_sample_in_flight(self.sample, 1)) - self.assertFalse(self.db_con.mark_sample_in_flight(sample2, 1)) - self.assertTrue(self.db_con.mark_sample_in_flight(sample3, 2)) - - # should clear all samples - self.assertIsNone(self.db_con.clear_in_flight_samples(-1)) - self.assertTrue(self.db_con.mark_sample_in_flight(self.sample, 1)) - self.assertTrue(self.db_con.mark_sample_in_flight(sample2, 1)) - self.assertTrue(self.db_con.mark_sample_in_flight(sample3, 2)) - - # should be a no-op because there will never be any entries of instance - # 0 - self.assertIsNone(self.db_con.clear_in_flight_samples(0)) - self.assertFalse(self.db_con.mark_sample_in_flight(self.sample, 
1)) - self.assertFalse(self.db_con.mark_sample_in_flight(sample2, 1)) - self.assertFalse(self.db_con.mark_sample_in_flight(sample3, 2)) - - # should be a no-op because this database is not cluster-enabled - self.assertIsNone(self.no_cluster_db.clear_in_flight_samples()) - self.assertFalse(self.db_con.mark_sample_in_flight(self.sample, 1)) - self.assertFalse(self.db_con.mark_sample_in_flight(sample2, 1)) - self.assertFalse(self.db_con.mark_sample_in_flight(sample3, 2)) - - # leave as found - self.assertIsNone(self.db_con.clear_in_flight_samples(-1)) - - def test_8_stale_in_flight(self): - """ Test the cleaning of stale in-flight markers. """ - stale = datetime.utcnow() - timedelta(seconds=20) - self.assertTrue(self.db_con.mark_sample_in_flight( - self.sample, 1, stale)) - sample2 = self.factory.create_sample('baz.pyc', 'baz') - self.assertTrue(self.db_con.mark_sample_in_flight(sample2, 1)) - - # should not clear anything because the database is not cluster-enabled - self.assertTrue(self.no_cluster_db.clear_stale_in_flight_samples()) - self.assertFalse(self.db_con.mark_sample_in_flight(self.sample, 1)) - self.assertFalse(self.db_con.mark_sample_in_flight(sample2, 1)) - - # should clear sample marker because it is stale but not sample2 - self.assertTrue(self.db_con.clear_stale_in_flight_samples()) - self.assertTrue(self.db_con.mark_sample_in_flight(self.sample, 1)) - self.assertFalse(self.db_con.mark_sample_in_flight(sample2, 1)) - - # should not clear anything because all markers are fresh - self.assertFalse(self.db_con.clear_stale_in_flight_samples()) - self.assertFalse(self.db_con.mark_sample_in_flight(self.sample, 1)) - self.assertFalse(self.db_con.mark_sample_in_flight(sample2, 1)) - - # set up new constellation - self.assertIsNone(self.db_con.clear_in_flight_samples(-1)) - self.assertTrue(self.db_con.mark_sample_in_flight( - self.sample, 1, stale)) - self.assertTrue(self.db_con.mark_sample_in_flight(sample2, 1, stale)) - - # should clear all markers because all 
are stale - self.assertTrue(self.db_con.clear_stale_in_flight_samples()) - self.assertTrue(self.db_con.mark_sample_in_flight( - self.sample, 1, stale)) - self.assertTrue(self.db_con.mark_sample_in_flight(sample2, 1, stale)) - - # leave as found - self.assertTrue(self.db_con.clear_stale_in_flight_samples()) - - @classmethod - def tearDownClass(cls): - """ Clean up after the tests. """ - os.unlink(cls.test_db) - # test framework doesn't seem to give up reference so that __del__ is - # never run - del cls.factory - - -class TestSample(unittest.TestCase): - """ Unittests for Samples. """ - @classmethod - def setUpClass(cls): - """ Set up common test case resources. """ - cls.test_db = os.path.abspath('./test.db') - cls.conf = CreatingPeekabooConfig() - cls.db_con = PeekabooDatabase('sqlite:///' + cls.test_db) - cls.factory = CreatingSampleFactory( - cuckoo=None, base_dir=cls.conf.sample_base_dir, - job_hash_regex=cls.conf.job_hash_regex, keep_mail_data=False, - processing_info_dir=None) - cls.sample = cls.factory.create_sample('test.py', 'test') - - def test_job_hash_regex(self): - """ Test extraction of the job hash from the working directory path. 
- """ - # class sample has no job hash in path and therefore generates one - # itself - self.assertIn('peekaboo-run_analysis', self.sample.job_hash) - - # a new sample with a job hash in it's path should return it - job_hash = 'amavis-20170831T132736-07759-iSI0rJ4b' - path_with_job_hash = 'd/var/lib/amavis/tmp/%s/parts/file' % job_hash - sample = self.factory.make_sample(path_with_job_hash, 'file') - self.assertEqual(job_hash, sample.job_hash, - 'Job hash regex is not working') - - legacy_factory = CreatingSampleFactory( - cuckoo=None, base_dir=self.conf.sample_base_dir, - job_hash_regex=r'/var/lib/amavis/tmp/([^/]+)/parts.*', - keep_mail_data=False, processing_info_dir=None) - sample = legacy_factory.make_sample(path_with_job_hash, 'file') - self.assertEqual(job_hash, sample.job_hash, - 'Job hash regex is not working') - - def test_3_sample_attributes(self): - """ Test the various sample attribute getters. """ - self.assertEqual(self.sample.file_path, - os.path.join(self.factory.directory, 'test.py')) - self.assertEqual(self.sample.filename, 'test.py') - self.assertEqual(self.sample.file_extension, 'py') - self.assertTrue(set(['text/x-python']).issubset(self.sample.mimetypes)) - self.assertEqual( - self.sample.sha256sum, - '9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08') - self.assertEqual(self.sample.job_id, -1) - self.assertEqual(self.sample.result, Result.unchecked) - self.assertEqual(self.sample.reason, None) - self.assertRegexpMatches( - self.sample.peekaboo_report[0], - 'File "%s" is considered "unchecked"' - % self.sample.filename) - self.assertEqual(self.sample.cuckoo_report, None) - self.assertEqual(self.sample.done, False) - self.assertEqual(self.sample.submit_path, None) - self.assertFalse(self.sample.office_macros) - self.assertEqual(self.sample.file_size, 4) - - def test_4_initialised_sample_attributes(self): - """ Test the various sample attributes of an initialised sample. 
""" - self.sample.init() - self.assertEqual(self.sample.file_path, - os.path.join(self.factory.directory, 'test.py')) - self.assertEqual(self.sample.filename, 'test.py') - self.assertEqual(self.sample.file_extension, 'py') - self.assertTrue(set(['text/x-python']).issubset(self.sample.mimetypes)) - self.assertEqual( - self.sample.sha256sum, - '9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08') - self.assertEqual(self.sample.job_id, -1) - self.assertEqual(self.sample.result, Result.unchecked) - self.assertEqual(self.sample.reason, None) - self.assertRegexpMatches( - self.sample.peekaboo_report[0], 'File "%s" %s is being analyzed' - % (self.sample.filename, self.sample.sha256sum)) - self.assertRegexpMatches( - self.sample.peekaboo_report[1], - 'File "%s" is considered "unchecked"' - % self.sample.filename) - self.assertEqual(self.sample.cuckoo_report, None) - self.assertEqual(self.sample.done, False) - self.assertRegexpMatches( - self.sample.submit_path, '/%s.py$' % self.sample.sha256sum) - self.assertFalse(self.sample.office_macros) - self.assertEqual(self.sample.file_size, 4) - - def test_5_mark_done(self): - """ Test the marking of a sample as done. """ - self.sample.mark_done() - self.assertEqual(self.sample.done, True) - - def test_6_add_rule_result(self): - """ Test the adding of a rule result. """ - reason = 'This is just a test case.' - result = RuleResult('Unittest', Result.failed, - reason, - further_analysis=False) - self.sample.add_rule_result(result) - self.assertEqual(self.sample.result, Result.failed) - self.assertEqual(self.sample.reason, reason) - - def test_sample_attributes_with_meta_info(self): - """ Test use of optional meta data. 
""" - sample = self.factory.make_sample( - 'test.pyc', metainfo={ - 'full_name': '/tmp/test.pyc', - 'name_declared': 'test.pyc', - 'type_declared': 'application/x-bytecode.python', - 'type_long': 'application/x-python-bytecode', - 'type_short': 'pyc', - 'size': '200'}) - self.assertEqual(sample.file_extension, 'pyc') - - def test_sample_without_suffix(self): - """ Test extraction of file extension from declared name. """ - sample = self.factory.make_sample( - 'junk', metainfo={ - 'full_name': '/tmp/junk', - 'name_declared': 'Report.docx', - 'type_declared': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', - 'type_long': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', - 'type_short': 'docx', - 'size': '212'}) - self.assertEqual(sample.file_extension, 'docx') - - @classmethod - def tearDownClass(cls): - """ Clean up after the tests. """ - os.unlink(cls.test_db) - del cls.factory - - -class TestRulesetEngine(unittest.TestCase): - """ Unittests for the Ruleset Engine. """ - def test_no_rules_configured(self): - """ Test that correct error is shown if no rules are configured. """ - config = CreatingConfigParser() - with self.assertRaisesRegexp( - PeekabooRulesetConfigError, - r'No enabled rules found, check ruleset config.'): - RulesetEngine(ruleset_config=config, db_con=None) - - def test_unknown_rule_enabled(self): - """ Test that correct error is shown if an unknown rule is enabled. """ - config = CreatingConfigParser('''[rules] -rule.1: foo''') - with self.assertRaisesRegexp( - PeekabooRulesetConfigError, - r'Unknown rule\(s\) enabled: foo'): - RulesetEngine(ruleset_config=config, db_con=None) - - def test_invalid_type(self): - """ Test that correct error is shown if rule config option has wrong - type. 
""" - - config = CreatingConfigParser('''[rules] -rule.1: cuckoo_score - -[cuckoo_score] -higher_than: foo''') - with self.assertRaisesRegexp( - ValueError, - r"could not convert string to float: '?foo'?"): - RulesetEngine(ruleset_config=config, db_con=None) - - def test_disabled_config(self): - """ Test that no error is shown if disabled rule has config. """ - - config = CreatingConfigParser('''[rules] -rule.1: known -#rule.2: cuckoo_score - -[cuckoo_score] -higher_than: 4.0''') - RulesetEngine(ruleset_config=config, db_con=None) - - -class MimetypeSample(object): # pylint: disable=too-few-public-methods - """ A dummy sample class that only contains a set of MIME types for testing - whitelist and greylist rules with it. """ - def __init__(self, types): - # don't even need to make it a property - self.mimetypes = set(types) - - -class CuckooReportSample(object): # pylint: disable=too-few-public-methods - """ A dummy sample that only contains a configurable cuckoo report. """ - def __init__(self, report): - self.cuckoo_report = CuckooReport(report) - - -class TestRules(unittest.TestCase): - """ Unittests for Rules. """ - @classmethod - def setUpClass(cls): - """ Set up common test case resources. """ - cls.config = CreatingConfigParser('''[file_type_on_whitelist] -whitelist.1 : text/plain - -[file_type_on_greylist] -greylist.1 : application/x-dosexec -greylist.2 : application/zip -greylist.3 : application/msword - -[cuckoo_analysis_failed] -failure.1: end of analysis reached! -success.1: analysis completed successfully''') - - def test_config_known(self): # pylint: disable=no-self-use - """ Test the known rule configuration. 
""" - config = '''[known] -unknown : baz''' - # there is no exception here since empty config is acceptable - KnownRule(CreatingConfigParser()) - # there is no exception here since the known rule simply does - # not look at the configuration at all - maybe we should have a - # 'unknown section' error here - KnownRule(CreatingConfigParser(config)) - - def test_config_file_larger_than(self): - """ Test the file larger than rule configuration. """ - config = '''[file_larger_than] -bytes : 10 -unknown : baz''' - # there is no exception here since empty config is acceptable - FileLargerThanRule(CreatingConfigParser()) - - with self.assertRaisesRegexp( - PeekabooConfigException, - r'Unknown config option\(s\) found in section ' - r'file_larger_than: unknown'): - FileLargerThanRule(CreatingConfigParser(config)) - - def test_rule_file_type_on_whitelist(self): - """ Test whitelist rule. """ - combinations = [ - [False, ['text/plain']], - [True, ['application/vnd.ms-excel']], - [True, ['text/plain', 'application/vnd.ms-excel']], - [True, ['image/png', 'application/zip', 'application/vnd.ms-excel']], - [True, ['', 'asdfjkl', '93219843298']], - [True, []], - ] - rule = FileTypeOnWhitelistRule(self.config) - for expected, types in combinations: - result = rule.evaluate(MimetypeSample(types)) - self.assertEqual(result.further_analysis, expected) - - def test_config_file_type_on_whitelist(self): - """ Test whitelist rule configuration. """