summaryrefslogtreecommitdiffstats
path: root/peekaboo/queuing.py
blob: 33e04de41d2694913f1465137f0767bf93467ad4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
###############################################################################
#                                                                             #
# Peekaboo Extended Email Attachment Behavior Observation Owl                 #
#                                                                             #
# queuing.py                                                                  #
###############################################################################
#                                                                             #
# Copyright (C) 2016-2018  science + computing ag                             #
#                                                                             #
# This program is free software: you can redistribute it and/or modify        #
# it under the terms of the GNU General Public License as published by        #
# the Free Software Foundation, either version 3 of the License, or (at       #
# your option) any later version.                                             #
#                                                                             #
# This program is distributed in the hope that it will be useful, but         #
# WITHOUT ANY WARRANTY; without even the implied warranty of                  #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU           #
# General Public License for more details.                                    #
#                                                                             #
# You should have received a copy of the GNU General Public License           #
# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
#                                                                             #
###############################################################################


import logging
from threading import Thread
from Queue import Queue
from peekaboo import Singleton
from peekaboo.ruleset.engine import RulesetEngine
from peekaboo.exceptions import CuckooReportPendingException


logger = logging.getLogger(__name__)


def create_workers(worker_count=4):
    """
    Spawn the Peekaboo worker threads that process queued samples.

    Every new worker is first registered in JobQueue.workers and then
    started, so the registry also contains workers that are just starting up.

    :param worker_count: Number of worker threads to create. Defaults to 4.
    """
    for worker_id in range(worker_count):
        logger.debug("Create Worker %d" % worker_id)
        worker = Worker(worker_id)
        # register before starting, same order as the thread's life cycle
        JobQueue.workers.append(worker)
        worker.start()

    logger.info('Created %d Workers.' % worker_count)


class JobQueue(Singleton):
    """
    Peekaboo's queuing system.

    Holds the shared queue of samples waiting to be processed and the list
    of worker threads created by create_workers().

    @author: Sebastian Deiss
    """
    # worker threads registered by create_workers()
    workers = []
    # queue of samples; created without a maxsize, i.e. unbounded
    jobs = Queue()

    @staticmethod
    def submit(sample, submitter, timeout=300):
        """
        Add a Sample object to the job queue.

        If the queue is full, the call blocks for up to ``timeout`` seconds
        and then raises an exception.

        :param sample: The Sample object to add to the queue.
        :param submitter: The name of the class / module that wants to submit
                          the sample. Only used for logging.
        :param timeout: Maximum number of seconds to block while waiting for
                        a free slot before an exception is thrown.
                        Defaults to 300.
        :raises Full: if the queue is still full after the timeout expired.
        """
        message = "New sample submitted to job queue by %s. %s" % (submitter, sample)
        logger.debug(message)
        # Queue.put() does its own locking, so no extra synchronisation here
        JobQueue.jobs.put(sample, block=True, timeout=timeout)


class Worker(Thread):
    """
    A worker thread that takes samples from the job queue and processes them.

    @author: Sebastian Deiss
    """
    def __init__(self, wid):
        """
        Set up the worker without starting it.

        :param wid: Identifier of this worker, used in log messages.
        """
        Thread.__init__(self)
        # run() keeps looping for as long as this flag is True
        self.active = True
        self.worker_id = wid

    def run(self):
        """
        Fetch samples from JobQueue.jobs and run the ruleset engine on them.

        Loops until self.active is cleared. Errors raised while initialising
        or analysing a sample are logged and do not terminate the worker.
        """
        while self.active:
            logger.debug('Worker is ready')
            sample = JobQueue.jobs.get(True)  # wait blocking for next job (thread safe)
            logger.info('Worker %d: Processing sample %s' % (self.worker_id, sample))

            try:
                # init() is part of the guarded section: a failure here must
                # not kill the thread and silently shrink the worker pool.
                sample.init()

                engine = RulesetEngine(sample)
                engine.run()
                engine.report()
            except CuckooReportPendingException:
                # Deliberately ignored.
                # NOTE(review): presumably the sample is handled again once
                # the Cuckoo report is available - confirm.
                pass
            except Exception as e:
                logger.exception(e)

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Clear the active flag so that run() leaves its loop.

        A worker that is blocked waiting for the next job only notices the
        change after it has received and processed one more sample.

        NOTE(review): this class defines no __enter__, so it cannot be used
        in a ``with`` statement as is.
        """
        self.active = False