author    Michael Weiser <michael.weiser@gmx.de>  2019-05-07 18:58:21 +0000
committer Michael Weiser <michael.weiser@gmx.de>  2019-05-09 08:14:56 +0000
commit    0e066d44f63260881d2b72a05282e49e105c1791
tree      a7f1abcd9696a1f4816af7572eda896ce0c80cfe
parent    91712f60b19ab2227601a33364946e0482b58c58
More robustly poll Cuckoo REST API jobs
Downloading the full list of jobs gets less and less efficient as the number of current and past jobs increases. There is no way to filter down to specific job IDs. The limit and offset parameters of the list action of the API cannot be used to achieve a similar effect because the job list is not sorted by job/task ID and the parameters seem only meant to iterate over the whole list in blocks, not to extract specific jobs from it. The previous logic of determining the highest job ID at startup and requesting the next million entries from that offset on was therefore likely not working as expected, making us "blind" to status changes of jobs which end up below our offset in the job list.

This change adjusts CuckooApi to work around this by making use of the list of running jobs we've had for some time now. Instead of getting a list of all jobs starting from the highest job ID we saw at startup, we get each job's status individually. While this makes for more requests, over a longer runtime it should produce less network traffic and reliably get us the data we need about our jobs.

Also, turn the shutdown_requested flag into an event so we can use its wait() method to also implement the poll interval and get an immediate reaction to a shutdown request.

Finally, we switch to endless retrying of failed job status requests, paired with the individual request retry logic introduced earlier. On submission we still fail the submission process after timeouts or retries, on the assumption that without the job having been submitted to Cuckoo, our feedback to the client that the analysis failed will cause it to resubmit and still avoid duplicates.

Closes #43.
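A minimal sketch of the polling scheme described above, assuming hypothetical helper names (get_job_status, handle_reported) and a running_jobs dict as in the class; it is not the actual Peekaboo code, but shows how Event.wait() doubles as the poll-interval sleep and the shutdown check:

from threading import Event

shutdown_requested = Event()

def poll(poll_interval, running_jobs, get_job_status, handle_reported):
    # wait() returns True once the event is set and False on timeout, so a
    # single call both paces the polling and reacts immediately to shutdown
    while not shutdown_requested.wait(poll_interval):
        # iterate over a copy of the keys so the dict can change concurrently
        for job_id in list(running_jobs):
            status = get_job_status(job_id)  # e.g. GET tasks/view/<job_id>
            if status is None:
                continue  # transient error, retry on the next polling run
            if status == "reported":
                handle_reported(job_id)

Failed requests are thus simply retried on the next round, matching the "endless retrying" the message describes, while submission failures are still reported to the client.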
-rw-r--r--  peekaboo/toolbox/cuckoo.py | 104
1 file changed, 62 insertions(+), 42 deletions(-)
diff --git a/peekaboo/toolbox/cuckoo.py b/peekaboo/toolbox/cuckoo.py
index 6528c24..a802cfc 100644
--- a/peekaboo/toolbox/cuckoo.py
+++ b/peekaboo/toolbox/cuckoo.py
@@ -36,9 +36,8 @@ import random
import requests
import urllib3.util.retry
+from threading import RLock, Event
from time import sleep
-from threading import RLock
-
from twisted.internet import protocol, reactor, process
from peekaboo.exceptions import CuckooSubmitFailedException
@@ -51,7 +50,8 @@ class Cuckoo(object):
""" Parent class, defines interface to Cuckoo. """
def __init__(self, job_queue):
self.job_queue = job_queue
- self.shutdown_requested = False
+ self.shutdown_requested = Event()
+ self.shutdown_requested.clear()
self.running_jobs = {}
self.running_jobs_lock = RLock()
@@ -110,7 +110,8 @@ class Cuckoo(object):
self.job_queue.submit(sample, self.__class__)
def shut_down(self):
- self.shutdown_requested = True
+ """ Request the module to shut down. """
+ self.shutdown_requested.set()
def reap_children(self):
pass
@@ -234,10 +235,10 @@ class CuckooEmbed(Cuckoo):
# the resulting ReactorNotRunning exception is foiled by the fact that
# sigTerm defers the call through a queue into another thread which
# insists on logging it
- if not self.shutdown_requested:
+ if not self.shutdown_requested.is_set():
reactor.sigTerm(0)
- self.shutdown_requested = True
+ self.shutdown_requested.set()
self.exit_code = exit_code
def reap_children(self):
@@ -301,9 +302,6 @@ class CuckooApi(Cuckoo):
self.session.mount('http://', retry_adapter)
self.session.mount('https://', retry_adapter)
- self.reported = self.__status()["tasks"]["reported"]
- logger.info("Connection to Cuckoo seems to work, %i reported tasks seen", self.reported)
-
def __get(self, path):
request_url = "%s/%s" % (self.url, path)
logger.debug("Getting %s", request_url)
@@ -328,9 +326,6 @@ class CuckooApi(Cuckoo):
return json_resp
- def __status(self):
- return self.__get("cuckoo/status")
-
def submit(self, sample):
path = sample.submit_path
filename = os.path.basename(path)
@@ -356,10 +351,11 @@ class CuckooApi(Cuckoo):
'Invalid JSON in response when creating Cuckoo task: %s'
% error)
- task_id = json_resp["task_id"]
- if task_id > 0:
- self.register_running_job(task_id, sample)
- return task_id
+ if "task_id" in json_resp:
+ task_id = json_resp["task_id"]
+ if task_id > 0:
+ self.register_running_job(task_id, sample)
+ return task_id
raise CuckooSubmitFailedException(
'Unable to extract job ID from response %s' % json_resp)
@@ -369,32 +365,56 @@ class CuckooApi(Cuckoo):
return self.__get("tasks/report/%d" % job_id)
def do(self):
- # do the polling for finished jobs
- # record analysis count and call status over and over again
- # logger ......
-
- limit = 1000000
- offset = self.__status()["tasks"]["total"]
-
- while not self.shutdown_requested:
- cuckoo_tasks_list = None
- try:
- cuckoo_tasks_list = self.__get("tasks/list/%i/%i" % (limit, offset))
- except Exception as e:
- logger.warn('Unable to communicate with Cuckoo API: %s' % e)
- pass
-
- #maxJobID = cuckoo_tasks_list[-1]["id"]
-
- first = True
- if cuckoo_tasks_list:
- for j in cuckoo_tasks_list["tasks"]:
- if j["status"] == "reported":
- job_id = j["id"]
- self.resubmit_with_report(job_id)
- #self.reported = reported
- sleep(float(self.poll_interval))
-
+ """ Do the polling for finished jobs. """
+ # do a simple initial test of connectivity. With this we require the
+ # API to be reachable at startup (with the usual retries to account for
+ # a bit of a race condition in parallel startup) but later on hope
+ # that all errors are transient and retry endlessly
+ status = self.__get("cuckoo/status")
+ if status is None:
+ logger.critical("Connection to Cuckoo REST API failed")
+ return 1
+ if "tasks" not in status or "reported" not in status["tasks"]:
+ logger.critical("Invalid status JSON structure from Cuckoo REST "
+ "API: %s", status)
+ return 1
+
+ reported = status["tasks"]["reported"]
+ logger.info("Connection to Cuckoo seems to work, "
+ "%i reported tasks seen", reported)
+
+ while not self.shutdown_requested.wait(self.poll_interval):
+ # no lock, atomic, copy() because keys() returns an iterable view
+ # instead of a fresh new list in python3
+ running_jobs = self.running_jobs.copy().keys()
+
+ # somewhat inefficient for the potential number of requests per
+ # poll round but more efficient in that it does not download an
+ # ever growing list of jobs using tasks/list. Might also take
+ # quite a while to get to all jobs if retries happen on each
+ # request.
+ # A call to get data about multiple tasks in one go would be nice
+ # here. tasks/list could be used with the minimum job number as
+ # offset and spread between highest and lowest job id as limit *if*
+ # its output was sorted by job ID. Apparently limit and offset are
+ # only meant to iterate over the job list in blocks but not to
+ # return data about a specific range of job IDs from that list.
+ for job_id in running_jobs:
+ job = self.__get("tasks/view/%i" % job_id)
+ if job is None:
+ # ignore and retry on next polling run
+ continue
+
+ # but fail hard if we get invalid stuff
+ if "task" not in job or "status" not in job["task"]:
+ logger.error("Invalid JSON structure from Cuckoo REST "
+ "API: %s", job)
+ return 1
+
+ if job["task"]["status"] == "reported":
+ self.resubmit_with_report(job_id)
+
+ logger.debug("Shutting down.")
return 0
class CuckooServer(protocol.ProcessProtocol):
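For context, a sketch of the per-request retry setup the hunks above refer to (the retry_adapter mounted on the session), using requests with urllib3's Retry; the particular counts and backoff values are illustrative assumptions, not Peekaboo's actual configuration:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retry_adapter = HTTPAdapter(max_retries=Retry(
    total=5,                # illustrative: retry each request a few times
    backoff_factor=0.5,     # exponential backoff between attempts
    status_forcelist=[500, 502, 503, 504]))
session.mount('http://', retry_adapter)
session.mount('https://', retry_adapter)

# a request that still fails after all retries surfaces as an exception,
# which the polling loop treats as transient and retries on the next round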