diff options
Diffstat (limited to 'peekaboo/toolbox/cuckoo.py')
-rw-r--r-- | peekaboo/toolbox/cuckoo.py | 104 |
1 files changed, 62 insertions, 42 deletions
diff --git a/peekaboo/toolbox/cuckoo.py b/peekaboo/toolbox/cuckoo.py index 6528c24..a802cfc 100644 --- a/peekaboo/toolbox/cuckoo.py +++ b/peekaboo/toolbox/cuckoo.py @@ -36,9 +36,8 @@ import random import requests import urllib3.util.retry +from threading import RLock, Event from time import sleep -from threading import RLock - from twisted.internet import protocol, reactor, process from peekaboo.exceptions import CuckooSubmitFailedException @@ -51,7 +50,8 @@ class Cuckoo(object): """ Parent class, defines interface to Cuckoo. """ def __init__(self, job_queue): self.job_queue = job_queue - self.shutdown_requested = False + self.shutdown_requested = Event() + self.shutdown_requested.clear() self.running_jobs = {} self.running_jobs_lock = RLock() @@ -110,7 +110,8 @@ class Cuckoo(object): self.job_queue.submit(sample, self.__class__) def shut_down(self): - self.shutdown_requested = True + """ Request the module to shut down. """ + self.shutdown_requested.set() def reap_children(self): pass @@ -234,10 +235,10 @@ class CuckooEmbed(Cuckoo): # the resulting ReactorNotRunning exception is foiled by the fact that # sigTerm defers the call through a queue into another thread which # insists on logging it - if not self.shutdown_requested: + if not self.shutdown_requested.is_set(): reactor.sigTerm(0) - self.shutdown_requested = True + self.shutdown_requested.set() self.exit_code = exit_code def reap_children(self): @@ -301,9 +302,6 @@ class CuckooApi(Cuckoo): self.session.mount('http://', retry_adapter) self.session.mount('https://', retry_adapter) - self.reported = self.__status()["tasks"]["reported"] - logger.info("Connection to Cuckoo seems to work, %i reported tasks seen", self.reported) - def __get(self, path): request_url = "%s/%s" % (self.url, path) logger.debug("Getting %s", request_url) @@ -328,9 +326,6 @@ class CuckooApi(Cuckoo): return json_resp - def __status(self): - return self.__get("cuckoo/status") - def submit(self, sample): path = sample.submit_path filename = os.path.basename(path) @@ -356,10 +351,11 @@ class CuckooApi(Cuckoo): 'Invalid JSON in response when creating Cuckoo task: %s' % error) - task_id = json_resp["task_id"] - if task_id > 0: - self.register_running_job(task_id, sample) - return task_id + if "task_id" in json_resp: + task_id = json_resp["task_id"] + if task_id > 0: + self.register_running_job(task_id, sample) + return task_id raise CuckooSubmitFailedException( 'Unable to extract job ID from response %s' % json_resp) @@ -369,32 +365,56 @@ class CuckooApi(Cuckoo): return self.__get("tasks/report/%d" % job_id) def do(self): - # do the polling for finished jobs - # record analysis count and call status over and over again - # logger ...... - - limit = 1000000 - offset = self.__status()["tasks"]["total"] - - while not self.shutdown_requested: - cuckoo_tasks_list = None - try: - cuckoo_tasks_list = self.__get("tasks/list/%i/%i" % (limit, offset)) - except Exception as e: - logger.warn('Unable to communicate with Cuckoo API: %s' % e) - pass - - #maxJobID = cuckoo_tasks_list[-1]["id"] - - first = True - if cuckoo_tasks_list: - for j in cuckoo_tasks_list["tasks"]: - if j["status"] == "reported": - job_id = j["id"] - self.resubmit_with_report(job_id) - #self.reported = reported - sleep(float(self.poll_interval)) - + """ Do the polling for finished jobs. """ + # do a simple initial test of connectivity. With this we require the + # API to be reachable at startup (with the usual retries to account for + # a bit of a race condition in parallel startup) but later on hope + # that all errors are transient and retry endlessly + status = self.__get("cuckoo/status") + if status is None: + logger.critical("Connection to Cuckoo REST API failed") + return 1 + if "tasks" not in status or "reported" not in status["tasks"]: + logger.critical("Invalid status JSON structure from Cuckoo REST " + "API: %s", status) + return 1 + + reported = status["tasks"]["reported"] + logger.info("Connection to Cuckoo seems to work, " + "%i reported tasks seen", reported) + + while not self.shutdown_requested.wait(self.poll_interval): + # no lock, atomic, copy() because keys() returns an iterable view + # instead of a fresh new list in python3 + running_jobs = self.running_jobs.copy().keys() + + # somewhat inefficient for the potential number of requests per + # poll round but more efficient in that it does not download an + # ever growing list of jobs using tasks/list. Might also take + # quite a while to get to all jobs if retries happen on each + # request. + # A call to get data about multiple tasks in one go would be nice + # here. tasks/list could be used with the minimum job number as + # offset and spread between highest and lowest job id as limit *if* + # its output was sorted by job ID. Apparently limit and offset are + # only meant to iterate over the job list in blocks but not to + # return data about a specific range of job IDs from that list. + for job_id in running_jobs: + job = self.__get("tasks/view/%i" % job_id) + if job is None: + # ignore and retry on next polling run + continue + + # but fail hard if we get invalid stuff + if "task" not in job or "status" not in job["task"]: + logger.error("Invalid JSON structure from Cuckoo REST " + "API: %s", job) + return 1 + + if job["task"]["status"] == "reported": + self.resubmit_with_report(job_id) + + logger.debug("Shutting down.") return 0 class CuckooServer(protocol.ProcessProtocol): |