summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Weiser <michael.weiser@gmx.de>2018-09-05 12:09:49 +0100
committerMichael Weiser <michael.weiser@gmx.de>2018-09-06 19:14:42 +0100
commit705de1791ae0044cdf21334953e9f31f99904ddd (patch)
tree6aa72cf7cc15639010f39224a44464f2c267d307
parentb41f93529a6a5c512652440b1f4e561af431f7fa (diff)
Replace OneAnalysis plugin with explicit submit backlog
Introduce an explicit submit backlog replacing the OneAnalysis plugin. This backlog still has the task of avoiding analysing the same sample multiple times in parallel. Instead it keeps duplicate samples in the backlog until one analysis has completed. Then it submits the others in the expectation of them being recognised as known and being analysed much faster and without a roundtrip into cuckoo. The explicit backlog avoids one instance of resubmission of samples to the job queue from within the ruleset. It also introduces a first idea of a sample state which could possibly lead to a more explicit sample state handling in general. As a side effect a lot of expensive sequential searching in the connection map is avoided at the price of a hash and a lock keeping track of in-flight samples in the job queue. Remove dead code from ConnectionMap.
-rw-r--r--peekaboo/queuing.py92
-rw-r--r--peekaboo/ruleset/engine.py13
-rw-r--r--peekaboo/toolbox/plugins/__init__.py0
-rw-r--r--peekaboo/toolbox/plugins/oneanalysis.py101
-rw-r--r--peekaboo/toolbox/sampletools.py69
5 files changed, 85 insertions, 190 deletions
diff --git a/peekaboo/queuing.py b/peekaboo/queuing.py
index e35d55b..b4bffef 100644
--- a/peekaboo/queuing.py
+++ b/peekaboo/queuing.py
@@ -24,7 +24,7 @@
import logging
-from threading import Thread
+from threading import Thread, Lock
from Queue import Queue, Empty
from peekaboo import Singleton
from peekaboo.ruleset.engine import RulesetEngine
@@ -57,8 +57,17 @@ class JobQueue(Singleton):
workers = []
jobs = Queue()
+ queue_timeout = 300
+
+ # keep a backlog of samples with hashes identical to samples currently
+ # in analysis to avoid analysis multiple identical samples
+ # simultaneously. Once one analysis has finished, we can submit the
+ # others and the ruleset will notice that we already know the result.
+ duplicates = {}
+ duplock = Lock()
+
@staticmethod
- def submit(sample, submitter, timeout=300):
+ def submit(sample, submitter):
"""
Adds a Sample object to the job queue.
If the queue is full, we block for 300 seconds and then throw an exception.
@@ -69,9 +78,70 @@ class JobQueue(Singleton):
if the job has not been submitted.
:raises Full: if the queue is full.
"""
- logger.debug("New sample submitted to job queue by %s. %s" % (submitter, sample))
- # thread safe most likely no race condition possible
- JobQueue.jobs.put(sample, True, timeout)
+ sample_hash = sample.sha256sum
+ sample_str = str(sample)
+ duplicate = None
+ resubmit = None
+ # we have to lock this down because apart from callbacks from our
+ # Workers we're also called from the ThreadingUnixStreamServer
+ with JobQueue.duplock:
+ # check if a sample with same hash is currently in flight
+ duplicates = JobQueue.duplicates.get(sample_hash)
+ if duplicates is not None:
+ # we are regularly resubmitting samples, e.g. after we've
+ # noticed that cuckoo is finished analysing them. This
+ # obviously isn't a duplicate but continued processing of the
+ # same sample.
+ if duplicates['master'] == sample:
+ resubmit = sample_str
+ JobQueue.jobs.put(sample, True, JobQueue.queue_timeout)
+ else:
+ # record the to-be-submitted sample as duplicate and do nothing
+ duplicate = sample_str
+ duplicates['duplicates'].append(sample)
+ else:
+ # initialise a per-duplicate backlog for this sample which also
+ # serves as in-flight marker and submit to queue
+ JobQueue.duplicates[sample_hash] = { 'master': sample, 'duplicates': [] }
+ JobQueue.jobs.put(sample, True, JobQueue.queue_timeout)
+
+ if duplicate:
+ logger.debug("Sample from %s is duplicate and waiting for "
+ "running analysis to finish: %s" % (submitter, duplicate))
+ elif resubmit:
+ logger.debug("Resubmitted sample to job queue for %s: %s" %
+ (submitter, resubmit))
+ else:
+ logger.debug("New sample submitted to job queue by %s. %s" %
+ (submitter, sample_str))
+
+ @staticmethod
+ def done(sample_hash):
+ submitted_duplicates = []
+ with JobQueue.duplock:
+ # duplicates which have been submitted from the backlog still
+ # report done but do not get registered as potentially having
+ # duplicates because we expect the ruleset to identify them as
+ # already known and process them quickly now that the first
+ # instance has gone through full analysis
+ if not JobQueue.duplicates.has_key(sample_hash):
+ return
+
+ # submit all samples which have accumulated in the backlog
+ for s in JobQueue.duplicates[sample_hash]['duplicates']:
+ submitted_duplicates.append(str(s))
+ JobQueue.jobs.put(s, True, JobQueue.queue_timeout)
+
+ sample_str = str(JobQueue.duplicates[sample_hash]['master'])
+ del JobQueue.duplicates[sample_hash]
+
+ logger.debug("Cleared sample %s from in-flight list" % sample_str)
+ if len(submitted_duplicates) > 0:
+ logger.debug("Submitted duplicates from backlog: %s" % submitted_duplicates)
+
+ @staticmethod
+ def dequeue(timeout):
+ return JobQueue.jobs.get(True, timeout)
class Worker(Thread):
@@ -80,15 +150,18 @@ class Worker(Thread):
@author: Sebastian Deiss
"""
- def __init__(self, wid):
+
+ def __init__(self, wid, dequeue_timeout = 5):
self.active = True
self.worker_id = wid
+ self.dequeue_timeout = dequeue_timeout
Thread.__init__(self)
def run(self):
while self.active:
try:
- sample = JobQueue.jobs.get(True, 5) # wait blocking for next job (thread safe)
+ # wait blocking for next job (thread safe) with timeout
+ sample = JobQueue.dequeue(self.dequeue_timeout)
except Empty:
continue
logger.info('Worker %d: Processing sample %s' % (self.worker_id, sample))
@@ -99,10 +172,15 @@ class Worker(Thread):
engine = RulesetEngine(sample)
engine.run()
engine.report()
+ JobQueue.done(sample.sha256sum)
except CuckooReportPendingException:
+ logger.debug("Report for sample %s still pending" % sample)
pass
except Exception as e:
logger.exception(e)
+ # it's no longer in-flight even though processing seems to have
+ # failed
+ JobQueue.done(sample.sha256sum)
logger.debug('Worker is ready')
logger.info('Worker %d: Stopping' % self.worker_id)
diff --git a/peekaboo/ruleset/engine.py b/peekaboo/ruleset/engine.py
index a235161..9367e5a 100644
--- a/peekaboo/ruleset/engine.py
+++ b/peekaboo/ruleset/engine.py
@@ -32,7 +32,6 @@ from peekaboo.ruleset import Result, RuleResult
from peekaboo.ruleset.rules import *
from peekaboo.toolbox.peekabooyar import contains_peekabooyar
from peekaboo.exceptions import CuckooReportPendingException
-from peekaboo.toolbox.plugins.oneanalysis import OneAnalysis
logger = logging.getLogger(__name__)
@@ -65,18 +64,8 @@ class RulesetEngine(object):
ruleset_config.parse()
self.sample = sample
self.config = ruleset_config.get_config()
- self.one_analysis_tool = OneAnalysis()
def run(self):
- # TODO: Integrate this special rule in the base ruleset.
- result = self.__exec_rule(
- self.config,
- self.sample,
- self.one_analysis_tool.already_in_progress
- )
- if not result.further_analysis:
- return
-
for rule in RulesetEngine.rules:
result = self.__exec_rule(self.config, self.sample, rule)
if not result.further_analysis:
@@ -90,8 +79,6 @@ class RulesetEngine(object):
if self.sample.get_result() == Result.bad:
dump_processing_info(self.sample)
self.sample.save_result()
- # TODO: might be better to move this call to a separate function, since it's not related to reporting.
- self.one_analysis_tool.queue_identical_samples(self.sample) # depends on already_in_progress
def __exec_rule(self, config, sample, rule_function):
"""
diff --git a/peekaboo/toolbox/plugins/__init__.py b/peekaboo/toolbox/plugins/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/peekaboo/toolbox/plugins/__init__.py
+++ /dev/null
diff --git a/peekaboo/toolbox/plugins/oneanalysis.py b/peekaboo/toolbox/plugins/oneanalysis.py
deleted file mode 100644
index ba3ca70..0000000
--- a/peekaboo/toolbox/plugins/oneanalysis.py
+++ /dev/null
@@ -1,101 +0,0 @@
-###############################################################################
-# #
-# Peekaboo Extended Email Attachment Behavior Observation Owl #
-# #
-# toolbox/ #
-# plugins/ #
-# oneanalysis.py #
-###############################################################################
-# #
-# Copyright (C) 2016-2018 science + computing ag #
-# #
-# This program is free software: you can redistribute it and/or modify #
-# it under the terms of the GNU General Public License as published by #
-# the Free Software Foundation, either version 3 of the License, or (at #
-# your option) any later version. #
-# #
-# This program is distributed in the hope that it will be useful, but #
-# WITHOUT ANY WARRANTY; without even the implied warranty of #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
-# General Public License for more details. #
-# #
-# You should have received a copy of the GNU General Public License #
-# along with this program. If not, see <http://www.gnu.org/licenses/>. #
-# #
-###############################################################################
-
-
-import threading
-import traceback
-import sys
-import logging
-import peekaboo.queuing
-from peekaboo.ruleset import RuleResult
-from peekaboo.exceptions import CuckooReportPendingException
-from peekaboo.toolbox.sampletools import ConnectionMap
-
-
-logger = logging.getLogger(__name__)
-
-
-def singleton(class_):
- instances = {}
-
- def getinstance(*args, **kwargs):
- if class_ not in instances:
- instances[class_] = class_(*args, **kwargs)
- return instances[class_]
- return getinstance
-
-
-@singleton
-class OneAnalysis(object):
- """
- @author: Felix Bauer
- """
- __in_use = threading.Lock()
-
- def already_in_progress(self, config, s):
- with self.__in_use:
- logger.debug("enter already_in_progress")
- tb = traceback.extract_stack()
- tb = tb[-1]
- position = "%s:%s" % (tb[2], tb[1])
-
- if s.has_attr('pending'):
- s.set_attr('pending', False)
- return RuleResult(position,
- result=s.get_result(),
- reason='Datei wird jetzt Analysiert',
- further_analysis=True)
-
- l = []
- for sample in ConnectionMap.get_samples_by_sha256(s.sha256sum):
- if sample != s:
- if not sample.has_attr('pending') or not sample.get_attr('pending') is True:
- l.append(sample)
-
- if len(l) == 0:
- s.set_attr("pending", False)
- logger.debug("no second analysis present")
- return RuleResult(position,
- result=s.get_result(),
- reason='Datei wird jetzt Analysiert',
- further_analysis=True)
-
- logger.debug("there is another same sample")
- logger.debug("I'll be off until needed")
- s.set_attr("pending", True)
- # stop worker
- sys.stdout.flush()
- logger.debug("leave already_in_progress")
- raise CuckooReportPendingException()
-
- def queue_identical_samples(self, s):
- with self.__in_use:
- logger.debug("queueing identical samples")
- for sample in ConnectionMap.get_samples_by_sha256(s.sha256sum):
- pending = sample.get_attr('pending')
- if pending:
- sample.set_attr('pending', False)
- peekaboo.queuing.JobQueue.submit(sample, self.__class__)
diff --git a/peekaboo/toolbox/sampletools.py b/peekaboo/toolbox/sampletools.py
index 3a46d5a..86021ed 100644
--- a/peekaboo/toolbox/sampletools.py
+++ b/peekaboo/toolbox/sampletools.py
@@ -133,75 +133,6 @@ class ConnectionMap(Singleton):
logger.debug('Found %s for job ID %d' % (sample, job_id))
return sample
- @staticmethod
- def get_sample_by_sha256(sha256sum):
- """
- Get a Sample object from the map by its SHA-256 checksum.
-
- :param sha256sum: The SHA-256 checksum of the file represented by a Sample object.
- :return: The Sample object with the given SHA-256 checksum or None.
- """
- with ConnectionMap.__lock:
- logger.debug(
- 'Searching for a sample with SHA-256 checksum %s' % sha256sum
- )
- for __, samples in ConnectionMap.__map.iteritems():
- logger.debug('Samples for this connection: %s' % samples)
- for sample in samples:
- if sha256sum == sample.sha256sum:
- logger.debug(
- 'Found %s for SHA-256 hash %s' % (sample, sha256sum)
- )
- return sample
-
- @staticmethod
- def get_samples_by_sha256(sha256sum):
- """
- Get all Sample objects from the map with the same SHA-256 checksum.
-
- :param sha256sum: A SHA-256 checksum to search for.
- :return: The Sample objects with the given SHA-256 checksum or None.
- """
- with ConnectionMap.__lock:
- logger.debug('Searching for all samples with SHA-256 checksum %s' % sha256sum)
- matching_samples = []
- for __, samples in ConnectionMap.__map.iteritems():
- logger.debug('Samples for this connection: %s' % samples)
- for sample in samples:
- if sha256sum == sample.sha256sum:
- logger.debug('Found %s for SHA-256 hash %s' % (sample, sha256sum))
- matching_samples.append(sample)
- return matching_samples
-
- @staticmethod
- def get_samples_by_connection(socket):
- """
- Get all Sample objects for a specific socket connection.
-
- :param socket: The socket object to get all Samples objects for.
- :return: A list of all Sample objects for the given socket.
- """
- with ConnectionMap.__lock:
- matching_samples = []
- for sock in ConnectionMap.__map.iteritems():
- if sock == socket:
- matching_samples = ConnectionMap.__map[sock]
- return matching_samples
-
- @staticmethod
- def in_progress(sha256sum):
- """
- Check if a Sample object in the map has the state 'inProgress'.
-
- :param sha256sum: The SHA-256 checksum of the file represented by a Sample object.
- :return: True if the Sample object with the given SHA-256 checksum is
- 'inProgress' otherwise False.
- """
- sample = ConnectionMap.get_sample_by_sha256(sha256sum)
- if sample is not None and sample.get_result() == Result.inProgress:
- return True
- return False
-
def next_job_hash(size=8):
"""