diff options
author | Michael Weiser <michael.weiser@gmx.de> | 2019-05-07 18:58:21 +0000 |
---|---|---|
committer | Michael Weiser <michael.weiser@gmx.de> | 2019-05-09 08:14:56 +0000 |
commit | 0e066d44f63260881d2b72a05282e49e105c1791 (patch) | |
tree | a7f1abcd9696a1f4816af7572eda896ce0c80cfe | |
parent | 91712f60b19ab2227601a33364946e0482b58c58 (diff) |
More robustly poll Cuckoo REST API jobs
Downloading the full list of jobs gets less and less efficient as the
number of current and past jobs increases. There is no way to filter
down to specific job IDs. The limit and offset parameters of the list
action of the API cannot be used to achieve a similar effect because the
job list is not sorted by job/task ID and the parameters seem only meant
to iterate over the whole list in blocks, not to extract specific jobs
from it.
The previous logic of determining the highest job ID at startup and
requesting the next million entries from that offset on was therefore
likely not working as expected and making us "blind" to status changes
of jobs which end up below our offset in the job list.
This change adjusts the CuckooAPI to make use of the list of running
jobs we've had for some time now to work around this. Instead of getting
a list of all jobs starting from the highest job id we saw at startup we
just get each job's status individually. While this makes for more
requests, it should, over a longer runtime, make for less network traffic
and reliably get us the data we need about our jobs.
Also, turn the shutdown_requested flag into an event so we can use its
wait() method to also implement the poll interval and get immediate
reaction to a shutdown request.
Finally, we switch to endless retrying of failed job status requests
paired with the individual request retry logic introduced earlier. On
submission we still fail the submission process after timeouts or
retries on the assumption that without the job being submitted to
Cuckoo, our feedback to the client that the analysis failed will cause it to
resubmit and still avoid duplicates.
Closes #43.
-rw-r--r-- | peekaboo/toolbox/cuckoo.py | 104 |
1 files changed, 62 insertions, 42 deletions
diff --git a/peekaboo/toolbox/cuckoo.py b/peekaboo/toolbox/cuckoo.py index 6528c24..a802cfc 100644 --- a/peekaboo/toolbox/cuckoo.py +++ b/peekaboo/toolbox/cuckoo.py @@ -36,9 +36,8 @@ import random import requests import urllib3.util.retry +from threading import RLock, Event from time import sleep -from threading import RLock - from twisted.internet import protocol, reactor, process from peekaboo.exceptions import CuckooSubmitFailedException @@ -51,7 +50,8 @@ class Cuckoo(object): """ Parent class, defines interface to Cuckoo. """ def __init__(self, job_queue): self.job_queue = job_queue - self.shutdown_requested = False + self.shutdown_requested = Event() + self.shutdown_requested.clear() self.running_jobs = {} self.running_jobs_lock = RLock() @@ -110,7 +110,8 @@ class Cuckoo(object): self.job_queue.submit(sample, self.__class__) def shut_down(self): - self.shutdown_requested = True + """ Request the module to shut down. """ + self.shutdown_requested.set() def reap_children(self): pass @@ -234,10 +235,10 @@ class CuckooEmbed(Cuckoo): # the resulting ReactorNotRunning exception is foiled by the fact that # sigTerm defers the call through a queue into another thread which # insists on logging it - if not self.shutdown_requested: + if not self.shutdown_requested.is_set(): reactor.sigTerm(0) - self.shutdown_requested = True + self.shutdown_requested.set() self.exit_code = exit_code def reap_children(self): @@ -301,9 +302,6 @@ class CuckooApi(Cuckoo): self.session.mount('http://', retry_adapter) self.session.mount('https://', retry_adapter) - self.reported = self.__status()["tasks"]["reported"] - logger.info("Connection to Cuckoo seems to work, %i reported tasks seen", self.reported) - def __get(self, path): request_url = "%s/%s" % (self.url, path) logger.debug("Getting %s", request_url) @@ -328,9 +326,6 @@ class CuckooApi(Cuckoo): return json_resp - def __status(self): - return self.__get("cuckoo/status") - def submit(self, sample): path = sample.submit_path filename = 
os.path.basename(path) @@ -356,10 +351,11 @@ class CuckooApi(Cuckoo): 'Invalid JSON in response when creating Cuckoo task: %s' % error) - task_id = json_resp["task_id"] - if task_id > 0: - self.register_running_job(task_id, sample) - return task_id + if "task_id" in json_resp: + task_id = json_resp["task_id"] + if task_id > 0: + self.register_running_job(task_id, sample) + return task_id raise CuckooSubmitFailedException( 'Unable to extract job ID from response %s' % json_resp) @@ -369,32 +365,56 @@ class CuckooApi(Cuckoo): return self.__get("tasks/report/%d" % job_id) def do(self): - # do the polling for finished jobs - # record analysis count and call status over and over again - # logger ...... - - limit = 1000000 - offset = self.__status()["tasks"]["total"] - - while not self.shutdown_requested: - cuckoo_tasks_list = None - try: - cuckoo_tasks_list = self.__get("tasks/list/%i/%i" % (limit, offset)) - except Exception as e: - logger.warn('Unable to communicate with Cuckoo API: %s' % e) - pass - - #maxJobID = cuckoo_tasks_list[-1]["id"] - - first = True - if cuckoo_tasks_list: - for j in cuckoo_tasks_list["tasks"]: - if j["status"] == "reported": - job_id = j["id"] - self.resubmit_with_report(job_id) - #self.reported = reported - sleep(float(self.poll_interval)) - + """ Do the polling for finished jobs. """ + # do a simple initial test of connectivity. 
With this we require the + # API to be reachable at startup (with the usual retries to account for + # a bit of a race condition in parallel startup) but later on hope + # that all errors are transient and retry endlessly + status = self.__get("cuckoo/status") + if status is None: + logger.critical("Connection to Cuckoo REST API failed") + return 1 + if "tasks" not in status or "reported" not in status["tasks"]: + logger.critical("Invalid status JSON structure from Cuckoo REST " + "API: %s", status) + return 1 + + reported = status["tasks"]["reported"] + logger.info("Connection to Cuckoo seems to work, " + "%i reported tasks seen", reported) + + while not self.shutdown_requested.wait(self.poll_interval): + # no lock, atomic, copy() because keys() returns an iterable view + # instead of a fresh new list in python3 + running_jobs = self.running_jobs.copy().keys() + + # somewhat inefficient for the potential number of requests per + # poll round but more efficient in that it does not download an + # ever growing list of jobs using tasks/list. Might also take + # quite a while to get to all jobs if retries happen on each + # request. + # A call to get data about multiple tasks in one go would be nice + # here. tasks/list could be used with the minimum job number as + # offset and spread between highest and lowest job id as limit *if* + # its output was sorted by job ID. Apparently limit and offset are + # only meant to iterate over the job list in blocks but not to + # return data about a specific range of job IDs from that list. 
+ for job_id in running_jobs: + job = self.__get("tasks/view/%i" % job_id) + if job is None: + # ignore and retry on next polling run + continue + + # but fail hard if we get invalid stuff + if "task" not in job or "status" not in job["task"]: + logger.error("Invalid JSON structure from Cuckoo REST " + "API: %s", job) + return 1 + + if job["task"]["status"] == "reported": + self.resubmit_with_report(job_id) + + logger.debug("Shutting down.") return 0 class CuckooServer(protocol.ProcessProtocol): |