###############################################################################
#                                                                             #
# Peekaboo Extended Email Attachment Behavior Observation Owl                 #
#                                                                             #
# server.py                                                                   #
###############################################################################
#                                                                             #
# Copyright (C) 2016-2019  science + computing ag                             #
#                                                                             #
# This program is free software: you can redistribute it and/or modify        #
# it under the terms of the GNU General Public License as published by        #
# the Free Software Foundation, either version 3 of the License, or (at       #
# your option) any later version.                                             #
#                                                                             #
# This program is distributed in the hope that it will be useful, but         #
# WITHOUT ANY WARRANTY; without even the implied warranty of                  #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU           #
# General Public License for more details.                                    #
#                                                                             #
# You should have received a copy of the GNU General Public License           #
# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
#                                                                             #
###############################################################################

""" This module implements the Peekaboo server, i.e. the frontend to the
client. """

import errno
import json
import logging
import os
import stat
import socket
import socketserver
from threading import Thread, Event
from peekaboo.ruleset import Result


logger = logging.getLogger(__name__)


class PeekabooStreamServer(socketserver.ThreadingUnixStreamServer):
    """
    Asynchronous server.

    @author: Sebastian Deiss
    """
    def __init__(self, server_address, request_handler_cls, job_queue,
                 sample_factory, bind_and_activate=True,
                 request_queue_size=10, status_change_timeout=60):
        self.server_address = server_address
        self.__job_queue = job_queue
        self.__sample_factory = sample_factory
        self.request_queue_size = request_queue_size
        self.allow_reuse_address = True
        self.status_change_timeout = status_change_timeout
        self.__shutdown_requested = False
        self.__request_triggers = {}

        # explicit base class call kept from the Python 2 era, when the
        # SocketServer classes were old-style and super() would not work
        logger.debug('Starting up server.')
        socketserver.ThreadingUnixStreamServer.__init__(
            self, server_address, request_handler_cls,
            bind_and_activate=bind_and_activate)

    @property
    def job_queue(self):
        """ Return this server's reference to the job queue. Used by handler
        threads to get access to it for submission of samples for processing.
        """
        return self.__job_queue

    @property
    def sample_factory(self):
        """ Return this server's reference to a factory that can create
        pre-configured sample objects. Used by handler threads to get access to
        it for creation of samples prior to submission for processing. """
        return self.__sample_factory

    @property
    def shutting_down(self):
        """ Return True if we've received a shutdown request. """
        return self.__shutdown_requested

    def register_request(self, thread, event):
        """ Register an event for a request being handled to trigger if we want
        it to shut down. """
        self.__request_triggers[thread] = event
        logger.debug('Request registered with server.')

    def deregister_request(self, thread):
        """ Deregister a request which has finished handling and does no logner
        need to be made aware that we want it to shut down. """
        logger.debug('Request deregistered from server.')
        del self.__request_triggers[thread]

    def shutdown(self):
        """ Shut down the server. In our case, notify requests which are
        currently being handled to shut down as well. """
        logger.debug('Server shutting down.')
        self.__shutdown_requested = True
        for thread in self.__request_triggers:
            # wake up the thread so it can see that we're shutting down
            self.__request_triggers[thread].set()

        socketserver.ThreadingUnixStreamServer.shutdown(self)

    def server_close(self):
        """ Finally completely close down the server. """
        # no new connections from this point on
        logger.debug('Removing connection socket %s', self.server_address)
        try:
            os.remove(self.server_address)
        except OSError as oserror:
            logger.warning('Removal of socket %s failed: %s',
                           self.server_address, oserror)

        logger.debug('Closing down server.')
        return socketserver.ThreadingUnixStreamServer.server_close(self)


class PeekabooStreamRequestHandler(socketserver.StreamRequestHandler):
    """
    Request handler used by PeekabooStreamServer to handle analysis requests.

    @author: Sebastian Deiss
    """
    def setup(self):
        socketserver.StreamRequestHandler.setup(self)
        self.job_queue = self.server.job_queue
        self.sample_factory = self.server.sample_factory
        self.status_change_timeout = self.server.status_change_timeout

        # create an event we will give to all the samples and our server to
        # wake us if they need our attention
        self.status_change = Event()
        self.status_change.clear()

    def handle(self):
        """ Handles an analysis request. """
        # catch wavering clients early on
        logger.debug('New connection incoming.')
        if not self.talk_back([_('Hello, this is Peekaboo.'), '']):
            return

        submitted = self.parse()
        if not submitted:
            return

        if not self.wait(submitted):
            # something went wrong while waiting, i.e. client closed connection
            # or we're shutting down
            return

        # here we know that all samples have reported back
        self.report(submitted)

    def parse(self):
        """ Reads and parses an analysis request. This is expected to be a JSON
        structure containing the path of the directory / file to analyse.
        Structure::

            [ { "full_name": "<path>",
                "name_declared": ...,
                ... },
              { ... },
              ... ]

        The maximum buffer size is 16 KiB, because JSON incurs some bloat.
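
        A minimal example request for a single file might look like this
        (path hypothetical)::

            [ { "full_name": "/tmp/peekaboo/sample-attachment.doc" } ]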
        """
        request = self.request.recv(1024 * 16).rstrip()

        try:
            parts = json.loads(request)
        except ValueError as error:
            self.talk_back(_('Error: Invalid JSON in request.'))
            logger.error('Invalid JSON in request: %s', error)
            return None

        if not isinstance(parts, (list, tuple)):
            self.talk_back(_('ERROR: Invalid data structure.'))
            logger.error('Invalid data structure.')
            return None

        submitted = []
        for part in parts:
            if 'full_name' not in part:
                self.talk_back(_('ERROR: Incomplete data structure.'))
                logger.error('Incomplete data structure.')
                return None

            path = part['full_name']
            logger.info("Got run_analysis request for %s", path)
            if not os.path.exists(path):
                self.talk_back(_('ERROR: Path does not exist or no '
                                 'permission to access it.'))
                logger.error('Path does not exist or no permission '
                             'to access it.')
                return None

            if not os.path.isfile(path):
                self.talk_back(_('ERROR: Input is not a file.'))
                logger.error('Input is not a file.')
                return None

            sample = self.sample_factory.make_sample(
                path, status_change=self.status_change, metainfo=part)
            if not self.job_queue.submit(sample, self.__class__):
                self.talk_back(_('Error submitting sample to job queue'))
                # submit will have logged an error
                return None

            submitted.append(sample)
            logger.debug('Created and submitted sample %s', sample)

        return submitted

    def wait(self, to_be_analysed):
        """ Wait for submitted analysis jobs to finished.

        @param to_be_analysed: samples that have been submitted for analysis
                               and which will report back to us when they're
                               finished.
        @type to_be_analysed: List of Sample objects
        """
        # register with our server so it can notify us if it wants us to shut
        # down
        # NOTE: Every exit point from this routine needs to deregister this
        # request from the server to avoid memory leaks. Unfortunately, the
        # server cannot do this itself on shutdown_request() because it does
        # not have any thread ID available there.
        self.server.register_request(self, self.status_change)

        # wait for results to come in
        while to_be_analysed:
            # wait for an event to signal that its status has changed or
            # timeout expires
            if not self.status_change.wait(self.status_change_timeout):
                # keep our client engaged
                # TODO: Impose maximum processing time of our own?
                if not self.talk_back(_('Files are being analyzed...')):
                    # Abort handling this request since no-one's interested
                    # any more. We could dequeue the samples here to avoid
                    # unnecessary work. Instead we'll have them run their
                    # course, assuming that we'll be much quicker
                    # responding if the client decides to resubmit them.
                    self.server.deregister_request(self)
                    return False

                logger.debug('Client informed that samples are still being '
                             'processed.')

                # Fall through here and evaluate all samples for paranoia's
                # sake in case our status change event has a race condition.
                # It shouldn't though, because we wait for it, then first clear
                # it and then look at all samples that might have set it. If
                # while doing that another sample sets it and we don't catch it
                # because we've already looked at it, the next wait will
                # immediately return and send us back into checking all samples
                # for status change.

            # see if our server is shutting down and follow it if so
            if self.server.shutting_down:
                self.talk_back(_('Peekaboo is shutting down.'))
                logger.debug('Request shutting down with server.')
                self.server.deregister_request(self)
                return False

            self.status_change.clear()

            # see which samples are done and which are still processing
            still_analysing = []
            for sample in to_be_analysed:
                # remove samples that are done
                if sample.done:
                    continue

                still_analysing.append(sample)

            to_be_analysed = still_analysing

        # deregister notification from server since we've exited our wait loop
        self.server.deregister_request(self)
        return True

    def report(self, done):
        """ Report individual files' and overall verdict to client.

        @param done: List of samples that are done processing and need
                     reporting.
        @type done: List of Sample objects.
        """
        # evaluate results into an overall result: We want to present the
        # client with an overall result instead of confusing them with
        # assertions about individual files. Particularly in the case of
        # AMaViS, this would otherwise lead to messages being passed on as
        # clean where a single attachment evaluated to "good" but analysis of
        # all the others failed.
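        # Example: a message with three attachments where two evaluate to
        # "good" but one analysis fails must be reported with the failure
        # result, not as "good".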
        result = Result.unchecked
        logger.debug('Determining final verdict to report to client.')
        for sample in done:
            # check if this sample's result is worse than what we know so far
            sample_result = sample.result
            logger.debug('Current overall result: %s, Sample result: %s',
                         result.name, sample_result.name)
            if sample_result >= result:
                result = sample_result

            # and unconditionally send out its report to the client (plus an
            # empty line)
            if not self.talk_back(sample.peekaboo_report + ['']):
                return

        # report overall result.
        logger.debug('Reporting batch as "%s" to client', result.name)
        loc_verdict = _('The file collection has been categorized "%s"')
        overall = [loc_verdict % result.name]

        # Add untranslated verdict (if the above actually got translated) for
        # potential pattern matching of the client to reliably latch on to.
        # Need to duplicate strings here for pygettext and pybabel extract to
        # find the translatable one in the above _().
        verdict = 'The file collection has been categorized "%s"'
        if verdict != loc_verdict:
            overall.append(verdict % result.name)

        # append newline and send
        overall.append('')
        if not self.talk_back(overall):
            return

        # shut down connection
        logger.debug('Results reported back to client - closing connection.')

    def talk_back(self, msgs):
        """ Send message(s) back to the client. Automatically appends newline
        to each message.

        @param msgs: message(s) to send to client.
        @type msgs: string or (list or tuple of strings)

        @returns: True on successful sending of all messages, False on error of
                  sending and None specifically if sending failed because the
                  client closed the connection. """
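        # Both forms used in this module work: talk_back('single message')
        # and talk_back(['multiple', 'messages']); each element goes out as
        # one newline-terminated line.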
        if isinstance(msgs, str):
            msgs = (msgs, )

        for msg in msgs:
            try:
                # FIXME: Hard-coded, arbitrary encoding since we have no
                # clearly defined protocol here.
                self.request.sendall(('%s\n' % msg).encode('utf-8'))
            except IOError as ioerror:
                if ioerror.errno == errno.EPIPE:
                    logger.warning('Client closed connection on us: %s',
                                   ioerror)
                    return None

                logger.warning('Error talking back to client: %s',
                               ioerror)
                return False

        return True