###############################################################################
# #
# Peekaboo Extended Email Attachment Behavior Observation Owl #
# #
# config.py #
###############################################################################
# #
# Copyright (C) 2016-2019 science + computing ag #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or (at #
# your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
# General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <http://www.gnu.org/licenses/>. #
# #
###############################################################################
""" The configuration for the main program as well as the ruleset. Handles
defaults as well as reading a configuration file. """
import re
import sys
import logging
import configparser
from peekaboo.exceptions import PeekabooConfigException
# Module-level logger named after this module; used below to report
# configuration options that fall back to their defaults.
logger = logging.getLogger(__name__)
class PeekabooConfigParser(  # pylint: disable=too-many-ancestors
        configparser.ConfigParser):
    """ A config parser that gives error feedback if a required file does not
    exist or cannot be opened.

    On top of the standard getters it offers getlist() for options spread
    over several suffixed settings, getrelist() for lists of regular
    expressions, get_log_level() for symbolic log levels and
    get_by_default_type() which picks the getter from the type of the
    default value. """

    def __init__(self, config_file):
        """ Read and parse the given configuration file.

        @param config_file: Path of the configuration file to read.
        @type config_file: string
        @raises PeekabooConfigException: if the file cannot be opened or
                                         cannot be parsed. """
        # super() does not work here because ConfigParser uses old-style
        # classes in python 2
        configparser.ConfigParser.__init__(self)

        try:
            # use a context manager so the file handle is closed again, even
            # if parsing fails half-way through
            with open(config_file) as config_fp:
                self.read_file(config_fp)
        except IOError as ioerror:
            raise PeekabooConfigException(
                'Configuration file "%s" can not be opened for reading: %s' %
                (config_file, ioerror))
        except configparser.Error as cperror:
            raise PeekabooConfigException(
                'Configuration file "%s" can not be parsed: %s' %
                (config_file, cperror))

        # result caches of getlist() and getrelist(), keyed by section and
        # then by option
        self.lists = {}
        self.relists = {}

    def getlist(self, section, option, raw=False, vars=None, fallback=None):
        """ Special getter where multiple options in the config file
        distinguished by a .<subscript> suffix form a list. Matches the
        signature for configparser getters.

        Results are cached per section and option. The cache does not take
        raw, vars or fallback into account, so the first call for a given
        section and option determines what later calls return.

        @returns: list of the values of all settings named option.<subscript>
                  in file order or fallback if there are none or the section
                  does not exist.
        @raises PeekabooConfigException: if the option is present without a
                                         subscript. """
        # cache results because the following is somewhat inefficient
        if section not in self.lists:
            self.lists[section] = {}

        if option in self.lists[section]:
            return self.lists[section][option]

        if section not in self:
            self.lists[section][option] = fallback
            return fallback

        # Go over all options in this section. We want to allow "holes" in
        # the lists, i.e. setting.1, setting.2 but no setting.3 followed by
        # setting.4. We use here that ConfigParser retains option order from
        # the file.
        value = []
        for setting in self[section]:
            # cheap pre-filter before taking the name apart
            if not setting.startswith(option):
                continue

            # Parse 'setting' into (key) and 'setting.subscript' into
            # (key, subscript) and use it to determine if this setting is a
            # list. Note how we do not use the subscript at all here.
            name_parts = setting.split('.')
            key = name_parts[0]
            is_list = len(name_parts) > 1

            # skip options which merely share the prefix, e.g. 'settings.1'
            if key != option:
                continue

            if not is_list:
                raise PeekabooConfigException(
                    'Option %s in section %s is supposed to be a list '
                    'but given as individual setting' % (setting, section))

            # Potential further checks:
            # - There are no duplicate settings with ConfigParser. The last
            #   one always wins.
            value.append(self[section].get(setting, raw=raw, vars=vars))

        # it's not gonna get any better on the next call, so cache even the
        # default
        if not value:
            value = fallback

        self.lists[section][option] = value
        return value

    def getrelist(self, section, option, raw=False, vars=None, fallback=None):
        """ Special getter for lists of regular expressions. Returns the
        compiled expression objects in a list ready for matching and searching.

        Results are cached per section and option just like in getlist().

        @returns: list of compiled regular expressions or fallback if the
                  option is not configured.
        @raises PeekabooConfigException: if an expression does not compile.
        """
        if section not in self.relists:
            self.relists[section] = {}

        if option in self.relists[section]:
            return self.relists[section][option]

        if section not in self:
            self.relists[section][option] = fallback
            return fallback

        strlist = self.getlist(section, option, raw=raw, vars=vars,
                               fallback=fallback)
        if strlist is None:
            self.relists[section][option] = None
            return None

        compiled_res = []
        for regex in strlist:
            try:
                compiled_res.append(re.compile(regex))
            except (re.error, ValueError, TypeError) as error:
                # re.error is what the re module raises for invalid patterns
                # and is neither a ValueError nor a TypeError. Report the
                # offending expression, not the re module.
                raise PeekabooConfigException(
                    'Failed to compile regular expression "%s" (section %s, '
                    'option %s): %s' % (regex, section, option, error))

        # it's not gonna get any better on the next call, so cache even the
        # default
        if not compiled_res:
            compiled_res = fallback

        self.relists[section][option] = compiled_res
        return compiled_res

    def get_log_level(self, section, option, raw=False, vars=None,
                      fallback=None):
        """ Get the log level from the configuration file and parse the string
        into a logging loglevel such as logging.CRITICAL. Raises config
        exception if the log level is unknown. Options identical to get().

        @returns: the numeric log level or fallback if the option is not
                  configured.
        @raises PeekabooConfigException: if the configured level name is
                                         unknown. Names are case-sensitive. """
        levels = {
            'CRITICAL': logging.CRITICAL,
            'ERROR': logging.ERROR,
            'WARNING': logging.WARNING,
            'INFO': logging.INFO,
            'DEBUG': logging.DEBUG
        }

        # ask for None on absence so we can tell "not configured" apart and
        # hand back the caller's fallback untranslated
        level = self.get(section, option, raw=raw, vars=vars, fallback=None)
        if level is None:
            return fallback

        if level not in levels:
            raise PeekabooConfigException('Unknown log level %s' % level)

        return levels[level]

    def get_by_default_type(self, section, option, fallback=None,
                            option_type=None):
        """ Get an option from the configuration file parser. Automatically
        detects the type from the type of the default if given and calls the
        right getter method to coerce the value to the correct type.

        @param section: Which section to look for option in.
        @type section: string
        @param option: The option to read.
        @type option: string
        @param fallback: (optional) Default value to return if option is not
                         found. Defaults itself to None so that the method will
                         return None if the option is not found.
        @type fallback: int, float, bool, str or None.
        @param option_type: Override the option type.
        @type option_type: int, float, bool, str or None.
        @returns: the option value coerced to the detected type or fallback
                  if the section or option does not exist. """
        if option_type is None and fallback is not None:
            option_type = type(fallback)

        # exact type lookup on purpose: bool is a subclass of int but needs
        # its own getter
        getter = {
            int: self.getint,
            float: self.getfloat,
            bool: self.getboolean,
            str: self.get,
            None: self.get,
        }

        try:
            return getter[option_type](section, option)
        except configparser.NoSectionError:
            logger.debug('Configuration section %s not found - using '
                         'default %s', section, fallback)
        except configparser.NoOptionError:
            logger.debug('Configuration option %s not found in section '
                         '%s - using default: %s', option, section, fallback)

        return fallback
class PeekabooConfig(PeekabooConfigParser):
    """ This class represents the Peekaboo configuration. Every setting is
    available as an attribute of the object, initialised with a hard default
    and overwritten with the value from the configuration file if present. """

    def __init__(self, config_file=None, log_level=None):
        """ Initialise the configuration with defaults, overwrite with command
        line options and finally read the configuration file.

        @param config_file: (optional) Path of the configuration file to read
                            instead of the default location.
        @type config_file: string
        @param log_level: (optional) Log level to use while reading the
                          configuration file. A log level given in the
                          configuration file replaces it afterwards.
        @type log_level: int
        @raises PeekabooConfigException: if the configuration file cannot be
                                         read or contains invalid values. """
        # hard defaults: The idea here is that every config option has a
        # default that would in principle enable Peekaboo to run. Code using
        # the option should still cope with no or an empty value being handed
        # to it.
        self.user = 'peekaboo'
        self.group = 'peekaboo'
        self.pid_file = '/var/run/peekaboo/peekaboo.pid'
        self.sock_file = '/var/run/peekaboo/peekaboo.sock'
        self.log_level = logging.INFO
        self.log_format = '%(asctime)s - %(name)s - (%(threadName)s) - ' \
                          '%(levelname)s - %(message)s'
        self.interpreter = '/usr/bin/python2 -u'
        self.worker_count = 3
        self.sample_base_dir = '/tmp'
        self.job_hash_regex = '/amavis/tmp/([^/]+)/parts/'
        self.use_debug_module = False
        self.keep_mail_data = False
        self.processing_info_dir = '/var/lib/peekaboo/malware_reports'
        self.report_locale = None
        self.db_url = 'sqlite:////var/lib/peekaboo/peekaboo.db'
        self.db_log_level = logging.WARNING
        self.config_file = '/opt/peekaboo/etc/peekaboo.conf'
        self.ruleset_config = '/opt/peekaboo/etc/ruleset.conf'
        self.cuckoo_mode = "api"
        self.cuckoo_url = 'http://127.0.0.1:8090'
        self.cuckoo_poll_interval = 5
        self.cuckoo_storage = '/var/lib/peekaboo/.cuckoo/storage'
        self.cuckoo_exec = '/opt/cuckoo/bin/cuckoo'
        self.cuckoo_submit = '/opt/cuckoo/bin/cuckoo submit'
        self.cluster_instance_id = 0
        self.cluster_stale_in_flight_threshold = 1*60*60
        self.cluster_duplicate_check_interval = 60

        # section and option names for the configuration file. key is the above
        # variable name whose value will be overwritten by the configuration
        # file value. Third item can be getter function if special parsing is
        # required.
        config_options = {
            'log_level': ['logging', 'log_level', self.get_log_level],
            'log_format': ['logging', 'log_format'],
            'user': ['global', 'user'],
            'group': ['global', 'group'],
            'pid_file': ['global', 'pid_file'],
            'sock_file': ['global', 'socket_file'],
            'interpreter': ['global', 'interpreter'],
            'worker_count': ['global', 'worker_count'],
            'sample_base_dir': ['global', 'sample_base_dir'],
            'job_hash_regex': ['global', 'job_hash_regex'],
            'use_debug_module': ['global', 'use_debug_module'],
            'keep_mail_data': ['global', 'keep_mail_data'],
            'processing_info_dir': ['global', 'processing_info_dir'],
            'report_locale': ['global', 'report_locale'],
            'db_url': ['db', 'url'],
            'db_log_level': ['db', 'log_level', self.get_log_level],
            'ruleset_config': ['ruleset', 'config'],
            'cuckoo_mode': ['cuckoo', 'mode'],
            'cuckoo_url': ['cuckoo', 'url'],
            'cuckoo_poll_interval': ['cuckoo', 'poll_interval'],
            'cuckoo_storage': ['cuckoo', 'storage_path'],
            'cuckoo_exec': ['cuckoo', 'exec'],
            'cuckoo_submit': ['cuckoo', 'submit'],
            'cluster_instance_id': ['cluster', 'instance_id'],
            'cluster_stale_in_flight_threshold': [
                'cluster', 'stale_in_flight_threshold'],
            'cluster_duplicate_check_interval': [
                'cluster', 'duplicate_check_interval'],
        }

        # overrides from outside, e.g. by command line arguments whose values
        # are needed while reading the configuration file already (most notably
        # log level and path to the config file).
        if log_level:
            self.log_level = log_level
        if config_file:
            self.config_file = config_file

        # setup default logging to log any errors during the
        # parsing of the config file.
        self.setup_logging()

        # read configuration file. Note that we require a configuration file
        # here. We may change that if we decide that we want to allow the user
        # to run us with the above defaults only.
        PeekabooConfigParser.__init__(self, self.config_file)

        # overwrite above defaults in our member variables via indirect access
        for (option, config_names) in config_options.items():
            # maybe use special getter
            getter = self.get_by_default_type
            if len(config_names) == 3:
                getter = config_names[2]

            # e.g.:
            # self.log_format = self.get('logging', 'log_format',
            #                            self.log_format)
            setattr(self, option, getter(
                config_names[0], config_names[1],
                fallback=getattr(self, option)))

        # Update logging with what we just parsed from the config
        self.setup_logging()

        # here we could overwrite defaults and config file with additional
        # command line arguments if required

    def setup_logging(self):
        """ Setup logging to console by reconfiguring the root logger so that
        it affects all loggers everywhere. Uses the current values of
        self.log_format and self.log_level and can be called repeatedly. """
        root_logger = logging.getLogger()

        # Remove all handlers we or others may have installed before. Iterate
        # over a copy because removeHandler() modifies the handler list and
        # iterating the list itself would skip every other handler.
        for handler in list(root_logger.handlers):
            root_logger.removeHandler(handler)

        # log format
        log_formatter = logging.Formatter(self.log_format)

        # create console handler writing to stdout; filtering by level is
        # done by the root logger itself
        to_console_log_handler = logging.StreamHandler(sys.stdout)
        to_console_log_handler.setFormatter(log_formatter)
        root_logger.addHandler(to_console_log_handler)
        root_logger.setLevel(self.log_level)

    def __str__(self):
        """ Return a printable representation listing all settings whose
        attribute name does not start with an underscore. """
        settings = {}
        for (option, value) in vars(self).items():
            if not option.startswith('_'):
                settings[option] = value

        # wrap in a 1-tuple so the dict is formatted as a single value
        # instead of being used as a mapping for named placeholders
        return '<PeekabooConfig(%s)>' % (settings,)

    __repr__ = __str__