############################################################################### # # # Peekaboo Extended Email Attachment Behavior Observation Owl # # # # config.py # ############################################################################### # # # Copyright (C) 2016-2019 science + computing ag # # # # This program is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or (at # # your option) any later version. # # # # This program is distributed in the hope that it will be useful, but # # WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # # General Public License for more details. # # # # You should have received a copy of the GNU General Public License # # along with this program. If not, see . # # # ############################################################################### """ The configuration for the main program as well as the ruleset. Handles defaults as well as reading a configuration file. """ import re import sys import logging import configparser from peekaboo.exceptions import PeekabooConfigException logger = logging.getLogger(__name__) class PeekabooConfigParser( # pylint: disable=too-many-ancestors configparser.ConfigParser): """ A config parser that gives error feedback if a required file does not exist or cannot be opened. """ def __init__(self, config_file): # super() does not work here because ConfigParser uses old-style # classes in python 2 configparser.ConfigParser.__init__(self) try: self.read_file(open(config_file)) except IOError as ioerror: raise PeekabooConfigException( 'Configuration file "%s" can not be opened for reading: %s' % (config_file, ioerror)) except configparser.Error as cperror: raise PeekabooConfigException( 'Configuration file "%s" can not be parsed: %s' % (config_file, cperror)) self.lists = {} self.relists = {} def getlist(self, section, option, raw=False, vars=None, fallback=None): """ Special getter where multiple options in the config file distinguished by a . suffix form a list. Matches the signature for configparser getters. """ # cache results because the following is somewhat inefficient if section not in self.lists: self.lists[section] = {} if option in self.lists[section]: return self.lists[section][option] if section not in self: self.lists[section][option] = fallback return fallback # Go over all options in this section we want to allow "holes" in # the lists, i.e setting.1, setting.2 but no setting.3 followed by # setting.4. We use here that ConfigParser retains option order from # the file. value = [] for setting in self[section]: if not setting.startswith(option): continue # Parse 'setting' into (key) and 'setting.subscript' into # (key, subscript) and use it to determine if this setting is a # list. Note how we do not use the subscript at all here. name_parts = setting.split('.') key = name_parts[0] is_list = len(name_parts) > 1 if key != option: continue if not is_list: raise PeekabooConfigException( 'Option %s in section %s is supposed to be a list ' 'but given as individual setting' % (setting, section)) # Potential further checks: # - There are no duplicate settings with ConfigParser. The last # one always wins. value.append(self[section].get(setting, raw=raw, vars=vars)) # it's not gonna get any better on the next call, so cache even the # default if not value: value = fallback self.lists[section][option] = value return value def getrelist(self, section, option, raw=False, vars=None, fallback=None): """ Special getter for lists of regular expressions. Returns the compiled expression objects in a list ready for matching and searching. """ if section not in self.relists: self.relists[section] = {} if option in self.relists[section]: return self.relists[section][option] if section not in self: self.relists[section][option] = fallback return fallback strlist = self[section].getlist(option, raw=raw, vars=vars, fallback=fallback) if strlist is None: self.relists[section][option] = None return None compiled_res = [] for regex in strlist: try: compiled_res.append(re.compile(regex)) except (ValueError, TypeError) as error: raise PeekabooConfigException( 'Failed to compile regular expression "%s" (section %s, ' 'option %s): %s' % (re, section, option, error)) # it's not gonna get any better on the next call, so cache even the # default if not compiled_res: compiled_res = fallback self.relists[section][option] = compiled_res return compiled_res def get_log_level(self, section, option, raw=False, vars=None, fallback=None): """ Get the log level from the configuration file and parse the string into a logging loglevel such as logging.CRITICAL. Raises config exception if the log level is unknown. Options identical to get(). """ levels = { 'CRITICAL': logging.CRITICAL, 'ERROR': logging.ERROR, 'WARNING': logging.WARNING, 'INFO': logging.INFO, 'DEBUG': logging.DEBUG } level = self.get(section, option, raw=raw, vars=vars, fallback=None) if level is None: return fallback if level not in levels: raise PeekabooConfigException('Unknown log level %s' % level) return levels[level] def get_by_default_type(self, section, option, fallback=None, option_type=None): """ Get an option from the configuration file parser. Automatically detects the type from the type of the default if given and calls the right getter method to coerce the value to the correct type. @param section: Which section to look for option in. @type section: string @param option: The option to read. @type option: string @param fallback: (optional) Default value to return if option is not found. Defaults itself to None so that the method will return None if the option is not found. @type fallback: int, bool, str or None. @param option_type: Override the option type. @type option_type: int, bool, str or None. """ if option_type is None and fallback is not None: option_type = type(fallback) getter = { int: self.getint, bool: self.getboolean, str: self.get, None: self.get, } try: return getter[option_type](section, option) except configparser.NoSectionError: logger.debug('Configuration section %s not found - using ' 'default %s', section, fallback) except configparser.NoOptionError: logger.debug('Configuration option %s not found in section ' '%s - using default: %s', option, section, fallback) return fallback class PeekabooConfig(PeekabooConfigParser): """ This class represents the Peekaboo configuration. """ def __init__(self, config_file=None, log_level=None): """ Initialise the configuration with defaults, overwrite with command line options and finally read the configuration file. """ # hard defaults: The idea here is that every config option has a # default that would in principle enable Peekaboo to run. Code using # the option should still cope with no or an empty value being handed # to it. self.user = 'peekaboo' self.group = 'peekaboo' self.pid_file = '/var/run/peekaboo/peekaboo.pid' self.sock_file = '/var/run/peekaboo/peekaboo.sock' self.log_level = logging.INFO self.log_format = '%(asctime)s - %(name)s - (%(threadName)s) - ' \ '%(levelname)s - %(message)s' self.interpreter = '/usr/bin/python2 -u' self.worker_count = 3 self.sample_base_dir = '/tmp' self.job_hash_regex = '/amavis/tmp/([^/]+)/parts/' self.use_debug_module = False self.keep_mail_data = False self.processing_info_dir = '/var/lib/peekaboo/malware_reports' self.report_locale = None self.db_url = 'sqlite:////var/lib/peekaboo/peekaboo.db' self.db_log_level = logging.WARNING self.config_file = '/opt/peekaboo/etc/peekaboo.conf' self.ruleset_config = '/opt/peekaboo/etc/ruleset.conf' self.cuckoo_mode = "api" self.cuckoo_url = 'http://127.0.0.1:8090' self.cuckoo_poll_interval = 5 self.cuckoo_storage = '/var/lib/peekaboo/.cuckoo/storage' self.cuckoo_exec = '/opt/cuckoo/bin/cuckoo' self.cuckoo_submit = '/opt/cuckoo/bin/cuckoo submit' self.cluster_instance_id = 0 self.cluster_stale_in_flight_threshold = 1*60*60 self.cluster_duplicate_check_interval = 60 # section and option names for the configuration file. key is the above # variable name whose value will be overwritten by the configuration # file value. Third item can be getter function if special parsing is # required. config_options = { 'log_level': ['logging', 'log_level', self.get_log_level], 'log_format': ['logging', 'log_format'], 'user': ['global', 'user'], 'group': ['global', 'group'], 'pid_file': ['global', 'pid_file'], 'sock_file': ['global', 'socket_file'], 'interpreter': ['global', 'interpreter'], 'worker_count': ['global', 'worker_count'], 'sample_base_dir': ['global', 'sample_base_dir'], 'job_hash_regex': ['global', 'job_hash_regex'], 'use_debug_module': ['global', 'use_debug_module'], 'keep_mail_data': ['global', 'keep_mail_data'], 'processing_info_dir': ['global', 'processing_info_dir'], 'report_locale': ['global', 'report_locale'], 'db_url': ['db', 'url'], 'db_log_level': ['db', 'log_level', self.get_log_level], 'ruleset_config': ['ruleset', 'config'], 'cuckoo_mode': ['cuckoo', 'mode'], 'cuckoo_url': ['cuckoo', 'url'], 'cuckoo_poll_interval': ['cuckoo', 'poll_interval'], 'cuckoo_storage': ['cuckoo', 'storage_path'], 'cuckoo_exec': ['cuckoo', 'exec'], 'cuckoo_submit': ['cuckoo', 'submit'], 'cluster_instance_id': ['cluster', 'instance_id'], 'cluster_stale_in_flight_threshold': ['cluster', 'stale_in_flight_threshold'], 'cluster_duplicate_check_interval': ['cluster', 'duplicate_check_interval'], } # overrides from outside, e.g. by command line arguments whose values # are needed while reading the configuration file already (most notably # log level and path to the config file). if log_level: self.log_level = log_level if config_file: self.config_file = config_file # setup default logging to log any errors during the # parsing of the config file. self.setup_logging() # read configuration file. Note that we require a configuration file # here. We may change that if we decide that we want to allow the user # to run us with the above defaults only. PeekabooConfigParser.__init__(self, self.config_file) # overwrite above defaults in our member variables via indirect access settings = vars(self) for (option, config_names) in config_options.items(): # maybe use special getter getter = self.get_by_default_type if len(config_names) == 3: getter = config_names[2] # e.g.: # self.log_format = self.get('logging', 'log_format', # self.log_format) settings[option] = getter( config_names[0], config_names[1], fallback=settings[option]) # Update logging with what we just parsed from the config self.setup_logging() # here we could overwrite defaults and config file with additional # command line arguments if required def setup_logging(self): """ Setup logging to console by reconfiguring the root logger so that it affects all loggers everywhere. """ _logger = logging.getLogger() # Check if we already have a log handler if _logger.handlers: # Remove all handlers for handler in _logger.handlers: _logger.removeHandler(handler) # log format log_formatter = logging.Formatter(self.log_format) # create console handler and set level to debug to_console_log_handler = logging.StreamHandler(sys.stdout) to_console_log_handler.setFormatter(log_formatter) _logger.addHandler(to_console_log_handler) _logger.setLevel(self.log_level) def __str__(self): settings = {} for (option, value) in vars(self).items(): if not option.startswith('_'): settings[option] = value return '' % settings __repr__ = __str__