summaryrefslogtreecommitdiffstats
path: root/peekaboo/toolbox/cuckoo.py
diff options
context:
space:
mode:
Diffstat (limited to 'peekaboo/toolbox/cuckoo.py')
-rw-r--r--peekaboo/toolbox/cuckoo.py100
1 files changed, 66 insertions, 34 deletions
diff --git a/peekaboo/toolbox/cuckoo.py b/peekaboo/toolbox/cuckoo.py
index d62fc85..1ede5f7 100644
--- a/peekaboo/toolbox/cuckoo.py
+++ b/peekaboo/toolbox/cuckoo.py
@@ -30,11 +30,15 @@ import locale
import logging
import json
import subprocess
-import requests
import random
-from twisted.internet import protocol, reactor, process
+
from time import sleep
-from peekaboo.exceptions import CuckooAnalysisFailedException
+from threading import RLock
+
+import requests
+from twisted.internet import protocol, reactor, process
+
+from peekaboo.exceptions import CuckooSubmitFailedException
logger = logging.getLogger(__name__)
@@ -46,13 +50,44 @@ class Cuckoo:
self.job_queue = job_queue
self.shutdown_requested = False
self.running_jobs = {}
+ self.running_jobs_lock = RLock()
+
+ def register_running_job(self, job_id, sample):
+ """ Register a job as running. Detect if another sample has already
+ been registered with the same job ID which obviously must never happen
+ because it corrupts our internal housekeeping. Guarded by a lock
+ because multiple worker threads will call this routine and check for
+ collision and update of job log might otherwise race each other.
+
+ @param job_id: ID of the job to register as running.
+ @type job_id: int
+ @param sample: Sample object to associate with this job ID
+ @type sample: Sample
+
+ @returns: None
+ @raises: CuckooSubmitFailedException on job id collision """
+ with self.running_jobs_lock:
+ if (job_id in self.running_jobs and
+ self.running_jobs[job_id] is not sample):
+ raise CuckooSubmitFailedException(
+ 'A job with ID %d is already registered as running '
+ 'for sample %s' % (job_id, self.running_jobs[job_id]))
+
+ self.running_jobs[job_id] = sample
def resubmit_with_report(self, job_id):
+ """ Resubmit a sample to the job queue after the report became
+ available. Retrieves the report from Cuckoo.
+
+ @param job_id: ID of job which has finished.
+ @type job_id: int
+
+ @returns: None """
logger.debug("Analysis done for task #%d" % job_id)
- # thread-safe, no locking required, revisit if splitting into
- # multiple operations
- sample = self.running_jobs.pop(job_id, None)
+ with self.running_jobs_lock:
+ sample = self.running_jobs.pop(job_id, None)
+
if sample is None:
logger.debug('No sample found for job ID %d', job_id)
return None
@@ -125,35 +160,34 @@ class CuckooEmbed(Cuckoo):
stderr=subprocess.PIPE,
universal_newlines=True)
p.wait()
- except Exception as e:
- raise CuckooAnalysisFailedException(e)
+ except Exception as error:
+ raise CuckooSubmitFailedException(error)
if not p.returncode == 0:
- raise CuckooAnalysisFailedException('cuckoo submit returned a non-zero return code.')
- else:
- out, err = p.communicate()
- logger.debug("cuckoo submit STDOUT: %s", out)
- logger.debug("cuckoo submit STDERR: %s", err)
-
- match = None
- pattern_no = 0
- for pattern in self.job_id_patterns:
- match = re.search(pattern, out)
- if match is not None:
- logger.debug('Pattern %d matched.' % pattern_no)
- break
+ raise CuckooSubmitFailedException(
+ 'cuckoo submit returned a non-zero return code.')
- pattern_no += 1
-
+ out, err = p.communicate()
+ logger.debug("cuckoo submit STDOUT: %s", out)
+ logger.debug("cuckoo submit STDERR: %s", err)
+
+ match = None
+ pattern_no = 0
+ for pattern in self.job_id_patterns:
+ match = re.search(pattern, out)
if match is not None:
- job_id = int(match.group(1))
- # thread-safe, no locking required, revisit if splitting into
- # multiple operations
- self.running_jobs[job_id] = sample
- return job_id
+ logger.debug('Pattern %d matched.', pattern_no)
+ break
+
+ pattern_no += 1
+
+ if match is not None:
+ job_id = int(match.group(1))
+ self.register_running_job(job_id, sample)
+ return job_id
- raise CuckooAnalysisFailedException(
- 'Unable to extract job ID from given string %s' % out)
+ raise CuckooSubmitFailedException(
+ 'Unable to extract job ID from given string %s' % out)
def get_report(self, job_id):
path = os.path.join(self.cuckoo_storage,
@@ -271,11 +305,9 @@ class CuckooApi(Cuckoo):
task_id = response["task_id"]
if task_id > 0:
- # thread-safe, no locking required, revisit if splitting into
- # multiple operations
- self.running_jobs[task_id] = sample
+ self.register_running_job(task_id, sample)
return task_id
- raise CuckooAnalysisFailedException(
+ raise CuckooSubmitFailedException(
'Unable to extract job ID from given string %s' % response)
def get_report(self, job_id):