summary refs log tree commit diff stats
path: root/peekaboo/toolbox/cuckoo.py
diff options
context:
space:
mode:
Diffstat (limited to 'peekaboo/toolbox/cuckoo.py')
-rw-r--r-- peekaboo/toolbox/cuckoo.py 104
1 files changed, 62 insertions, 42 deletions
diff --git a/peekaboo/toolbox/cuckoo.py b/peekaboo/toolbox/cuckoo.py
index 6528c24..a802cfc 100644
--- a/peekaboo/toolbox/cuckoo.py
+++ b/peekaboo/toolbox/cuckoo.py
@@ -36,9 +36,8 @@ import random
import requests
import urllib3.util.retry
+from threading import RLock, Event
from time import sleep
-from threading import RLock
-
from twisted.internet import protocol, reactor, process
from peekaboo.exceptions import CuckooSubmitFailedException
@@ -51,7 +50,8 @@ class Cuckoo(object):
""" Parent class, defines interface to Cuckoo. """
def __init__(self, job_queue):
self.job_queue = job_queue
- self.shutdown_requested = False
+ self.shutdown_requested = Event()
+ self.shutdown_requested.clear()
self.running_jobs = {}
self.running_jobs_lock = RLock()
@@ -110,7 +110,8 @@ class Cuckoo(object):
self.job_queue.submit(sample, self.__class__)
def shut_down(self):
- self.shutdown_requested = True
+ """ Request the module to shut down. """
+ self.shutdown_requested.set()
def reap_children(self):
pass
@@ -234,10 +235,10 @@ class CuckooEmbed(Cuckoo):
# the resulting ReactorNotRunning exception is foiled by the fact that
# sigTerm defers the call through a queue into another thread which
# insists on logging it
- if not self.shutdown_requested:
+ if not self.shutdown_requested.is_set():
reactor.sigTerm(0)
- self.shutdown_requested = True
+ self.shutdown_requested.set()
self.exit_code = exit_code
def reap_children(self):
@@ -301,9 +302,6 @@ class CuckooApi(Cuckoo):
self.session.mount('http://', retry_adapter)
self.session.mount('https://', retry_adapter)
- self.reported = self.__status()["tasks"]["reported"]
- logger.info("Connection to Cuckoo seems to work, %i reported tasks seen", self.reported)
-
def __get(self, path):
request_url = "%s/%s" % (self.url, path)
logger.debug("Getting %s", request_url)
@@ -328,9 +326,6 @@ class CuckooApi(Cuckoo):
return json_resp
- def __status(self):
- return self.__get("cuckoo/status")
-
def submit(self, sample):
path = sample.submit_path
filename = os.path.basename(path)
@@ -356,10 +351,11 @@ class CuckooApi(Cuckoo):
'Invalid JSON in response when creating Cuckoo task: %s'
% error)
- task_id = json_resp["task_id"]
- if task_id > 0:
- self.register_running_job(task_id, sample)
- return task_id
+ if "task_id" in json_resp:
+ task_id = json_resp["task_id"]
+ if task_id > 0:
+ self.register_running_job(task_id, sample)
+ return task_id
raise CuckooSubmitFailedException(
'Unable to extract job ID from response %s' % json_resp)
@@ -369,32 +365,56 @@ class CuckooApi(Cuckoo):
return self.__get("tasks/report/%d" % job_id)
def do(self):
- # do the polling for finished jobs
- # record analysis count and call status over and over again
- # logger ......
-
- limit = 1000000
- offset = self.__status()["tasks"]["total"]
-
- while not self.shutdown_requested:
- cuckoo_tasks_list = None
- try:
- cuckoo_tasks_list = self.__get("tasks/list/%i/%i" % (limit, offset))
- except Exception as e:
- logger.warn('Unable to communicate with Cuckoo API: %s' % e)
- pass
-
- #maxJobID = cuckoo_tasks_list[-1]["id"]
-
- first = True
- if cuckoo_tasks_list:
- for j in cuckoo_tasks_list["tasks"]:
- if j["status"] == "reported":
- job_id = j["id"]
- self.resubmit_with_report(job_id)
- #self.reported = reported
- sleep(float(self.poll_interval))
-
+ """ Do the polling for finished jobs. """
+ # do a simple initial test of connectivity. With this we require the
+ # API to be reachable at startup (with the usual retries to account for
+ # a bit of a race condition in parallel startup) but later on hope
+ # that all errors are transient and retry endlessly
+ status = self.__get("cuckoo/status")
+ if status is None:
+ logger.critical("Connection to Cuckoo REST API failed")
+ return 1
+ if "tasks" not in status or "reported" not in status["tasks"]:
+ logger.critical("Invalid status JSON structure from Cuckoo REST "
+ "API: %s", status)
+ return 1
+
+ reported = status["tasks"]["reported"]
+ logger.info("Connection to Cuckoo seems to work, "
+ "%i reported tasks seen", reported)
+
+ while not self.shutdown_requested.wait(self.poll_interval):
+ # no lock, atomic, copy() because keys() returns an iterable view
+ # instead of a fresh new list in python3
+ running_jobs = self.running_jobs.copy().keys()
+
+ # somewhat inefficient for the potential number of requests per
+ # poll round but more efficient in that it does not download an
+ # ever growing list of jobs using tasks/list. Might also take
+ # quite a while to get to all jobs if retries happen on each
+ # request.
+ # A call to get data about multiple tasks in one go would be nice
+ # here. tasks/list could be used with the minimum job number as
+ # offset and spread between highest and lowest job id as limit *if*
+ # its output was sorted by job ID. Apparently limit and offset are
+ # only meant to iterate over the job list in blocks but not to
+ # return data about a specific range of job IDs from that list.
+ for job_id in running_jobs:
+ job = self.__get("tasks/view/%i" % job_id)
+ if job is None:
+ # ignore and retry on next polling run
+ continue
+
+ # but fail hard if we get invalid stuff
+ if "task" not in job or "status" not in job["task"]:
+ logger.error("Invalid JSON structure from Cuckoo REST "
+ "API: %s", job)
+ return 1
+
+ if job["task"]["status"] == "reported":
+ self.resubmit_with_report(job_id)
+
+ logger.debug("Shutting down.")
return 0
class CuckooServer(protocol.ProcessProtocol):