diff options
Diffstat (limited to 'peekaboo/toolbox/cuckoo.py')
-rw-r--r-- | peekaboo/toolbox/cuckoo.py | 100 |
1 files changed, 66 insertions, 34 deletions
diff --git a/peekaboo/toolbox/cuckoo.py b/peekaboo/toolbox/cuckoo.py index d62fc85..1ede5f7 100644 --- a/peekaboo/toolbox/cuckoo.py +++ b/peekaboo/toolbox/cuckoo.py @@ -30,11 +30,15 @@ import locale import logging import json import subprocess -import requests import random -from twisted.internet import protocol, reactor, process + from time import sleep -from peekaboo.exceptions import CuckooAnalysisFailedException +from threading import RLock + +import requests +from twisted.internet import protocol, reactor, process + +from peekaboo.exceptions import CuckooSubmitFailedException logger = logging.getLogger(__name__) @@ -46,13 +50,44 @@ class Cuckoo: self.job_queue = job_queue self.shutdown_requested = False self.running_jobs = {} + self.running_jobs_lock = RLock() + + def register_running_job(self, job_id, sample): + """ Register a job as running. Detect if another sample has already + been registered with the same job ID which obviously must never happen + because it corrupts our internal housekeeping. Guarded by a lock + because multiple worker threads will call this routine and check for + collision and update of job log might otherwise race each other. + + @param job_id: ID of the job to register as running. + @type job_id: int + @param sample: Sample object to associate with this job ID + @type sample: Sample + + @returns: None + @raises: CuckooSubmitFailedException on job id collision """ + with self.running_jobs_lock: + if (job_id in self.running_jobs and + self.running_jobs[job_id] is not sample): + raise CuckooSubmitFailedException( + 'A job with ID %d is already registered as running ' + 'for sample %s' % (job_id, self.running_jobs[job_id])) + + self.running_jobs[job_id] = sample def resubmit_with_report(self, job_id): + """ Resubmit a sample to the job queue after the report became + available. Retrieves the report from Cuckoo. + + @param job_id: ID of job which has finished. + @type job_id: int + + @returns: None """ logger.debug("Analysis done for task #%d" % job_id) - # thread-safe, no locking required, revisit if splitting into - # multiple operations - sample = self.running_jobs.pop(job_id, None) + with self.running_jobs_lock: + sample = self.running_jobs.pop(job_id, None) + if sample is None: logger.debug('No sample found for job ID %d', job_id) return None @@ -125,35 +160,34 @@ class CuckooEmbed(Cuckoo): stderr=subprocess.PIPE, universal_newlines=True) p.wait() - except Exception as e: - raise CuckooAnalysisFailedException(e) + except Exception as error: + raise CuckooSubmitFailedException(error) if not p.returncode == 0: - raise CuckooAnalysisFailedException('cuckoo submit returned a non-zero return code.') - else: - out, err = p.communicate() - logger.debug("cuckoo submit STDOUT: %s", out) - logger.debug("cuckoo submit STDERR: %s", err) - - match = None - pattern_no = 0 - for pattern in self.job_id_patterns: - match = re.search(pattern, out) - if match is not None: - logger.debug('Pattern %d matched.' % pattern_no) - break + raise CuckooSubmitFailedException( + 'cuckoo submit returned a non-zero return code.') - pattern_no += 1 - + out, err = p.communicate() + logger.debug("cuckoo submit STDOUT: %s", out) + logger.debug("cuckoo submit STDERR: %s", err) + + match = None + pattern_no = 0 + for pattern in self.job_id_patterns: + match = re.search(pattern, out) if match is not None: - job_id = int(match.group(1)) - # thread-safe, no locking required, revisit if splitting into - # multiple operations - self.running_jobs[job_id] = sample - return job_id + logger.debug('Pattern %d matched.', pattern_no) + break + + pattern_no += 1 + + if match is not None: + job_id = int(match.group(1)) + self.register_running_job(job_id, sample) + return job_id - raise CuckooAnalysisFailedException( - 'Unable to extract job ID from given string %s' % out) + raise CuckooSubmitFailedException( + 'Unable to extract job ID from given string %s' % out) def get_report(self, job_id): path = os.path.join(self.cuckoo_storage, @@ -271,11 +305,9 @@ class CuckooApi(Cuckoo): task_id = response["task_id"] if task_id > 0: - # thread-safe, no locking required, revisit if splitting into - # multiple operations - self.running_jobs[task_id] = sample + self.register_running_job(task_id, sample) return task_id - raise CuckooAnalysisFailedException( + raise CuckooSubmitFailedException( 'Unable to extract job ID from given string %s' % response) def get_report(self, job_id): |