summaryrefslogtreecommitdiffstats
path: root/gping/pinger.py
blob: 8c27dc1c1c932ec3cbc5eefaa1fa29106bfa32b2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
from collections import namedtuple, deque
import itertools
import subprocess
import functools
import sys
import platform
import re
from itertools import islice

from colorama import Fore, init
init()

# Bounded buffer: holds at most 400 items; once full, adding an item discards
# one from the opposite end.
# NOTE(review): presumably the ping history that gets plotted - confirm against its users.
buff = deque(maxlen=400)

# A 2-D canvas coordinate. Canvas.line() and Canvas.box() read its .x and .y fields.
Point = namedtuple("Point", "x y")

# Prefer the package-qualified import and fall back to a top-level module
# import when that fails.
try:
    from gping.termsize import get_terminal_size
except ImportError:
    # python 2 compatibility
    from termsize import get_terminal_size

# colorama's winterm helper is optional: any failure while importing it
# leaves winterm set to None.
# NOTE(review): presumably only meaningful on Windows consoles - confirm against its users.
try:
    from colorama.ansitowin32 import winterm
except Exception:
    winterm = None


class Canvas(object):
    """A fixed-size grid of ``(character, color)`` cells that can be drawn on.

    Cells are written with ``canvas[x, y] = value``. ``x`` counts columns from
    the left. ``y`` is negated before indexing the row list, so ``y == 1`` is
    the last row yielded by ``lines``, ``y == 2`` the one before it, and so on
    (``y == 0`` therefore lands on the first row).
    """

    def __init__(self, width, height):
        # A blank canvas: ``height`` rows of ``width`` uncolored spaces.
        # Each cell is a (character, color) pair.
        rows = []
        for _ in range(height):
            rows.append([(" ", None)] * width)
        self.data = rows

    def __setitem__(self, key, value):
        """Store ``value`` at ``key == (x, y)``.

        ``value`` is either a bare character (stored uncolored) or a
        ``(character, color)`` tuple. A callable ``color`` is invoked as
        ``color(x, y)`` and its result is stored instead.
        """
        x, y = key

        if not isinstance(value, tuple):
            cell = (value, None)
        else:
            char, color = value
            cell = (char, color(x, y) if callable(color) else color)

        self.data[-y][x] = cell

    def horizontal_line(self, data, row, from_, to, paint=None):
        """Write ``data`` along ``row`` for columns ``from_`` up to, not including, ``to``.

        A one-character ``data`` is repeated for the whole span; longer data
        is consumed one character per column.
        """
        glyphs = itertools.cycle(data) if len(data) == 1 else iter(data)
        first, stop = int(from_), int(to)

        for x in range(first, stop):
            self[x, row] = (next(glyphs), paint)

    def vertical_line(self, data, column, from_, to, paint=None):
        """Write ``data`` along ``column`` for rows ``from_`` through ``to`` inclusive.

        A one-character ``data`` is repeated for the whole span; longer data
        is consumed one character per row.
        """
        glyphs = itertools.cycle(data) if len(data) == 1 else iter(data)
        first, last = int(from_), int(to)

        for y in range(first, last + 1):
            self[column, y] = (next(glyphs), paint)

    def line(self, from_, to, paint=None, character=None):
        """Draw an axis-aligned line between two points.

        The end points are sorted first, so their order does not matter.
        Vertical lines default to "|" and horizontal lines to "-". Lines that
        are neither vertical nor horizontal are silently ignored.
        """
        start, end = sorted([from_, to])

        if start.x == end.x:
            self.vertical_line(character or "|", start.x, start.y, end.y, paint)
        elif start.y == end.y:
            self.horizontal_line(character or "-", start.y, start.x, end.x, paint)

    def box(self, bottom_left_corner, top_right_corner, paint=None, blank=False):
        """Draw the rectangle spanned by the two corners.

        With ``blank=True`` the outline is drawn with spaces, which clears
        those cells instead of drawing a visible frame.
        """
        # Walk the outline corner to corner, ending back where we started.
        corners = [
            bottom_left_corner,
            Point(bottom_left_corner.x, top_right_corner.y),
            top_right_corner,
            Point(top_right_corner.x, bottom_left_corner.y),
            bottom_left_corner,
        ]
        edge_char = " " if blank else None

        for start, end in zip(corners, corners[1:]):
            self.line(start, end, paint=paint, character=edge_char)

    @property
    def lines(self):
        """Yield each row as a string, with color codes inserted where the color changes."""
        for row in self.data:
            pieces = []
            active = None
            # Emit a color code only when a visible character needs a color
            # other than the one currently active. Spaces never switch color,
            # which keeps the number of escape sequences low.
            for glyph, color in row:
                if glyph != " " and color != active:
                    pieces.append(Fore.RESET if color is None else color)
                    active = color
                pieces.append(glyph)

            yield "".join(pieces)


def plot(width, height, data, host):
    """Render ping samples as a bar chart with a statistics box.

    Args:
        width: Available width in characters.
        height: Available height in characters.
        data: Iterable of ping times, iterated more than once. ``-1`` marks a
            timeout. The first item is drawn in the left-most column, and the
            first non-timeout item is reported as "Cur".
        host: Label written across the top border; falsy to omit it.

    Returns:
        A Canvas holding the frame, the bars, the statistics box and the host
        label. If ``data`` contains no non-timeout sample, only the frame is
        drawn.
    """
    # We need space for a newline after each line, so the work area is
    # slightly smaller than the terminal.
    width, height = width - 1, height - 1
    canvas = Canvas(width, height)
    # Draw a box around the edges of the screen, one character in.
    canvas.box(
        Point(1, 1), Point(width - 1, height - 1)
    )
    # islice is used because a deque does not support ranged slicing.
    data_slice = list(islice(data, 0, width - 3))

    # Drop the -1's (timeouts) so they don't distort avg, min and max.
    filtered_data = [d for d in data if d != -1]
    if not filtered_data:
        return canvas

    average_ping = sum(filtered_data) / len(filtered_data)
    max_ping = max(filtered_data)

    # A single large outlier would flatten every other bar, so pull the scale in.
    if max_ping > (average_ping * 2):
        max_ping *= 0.75

    # Every sample can legitimately be 0 (sub-millisecond replies rounded
    # down). Without this guard the divisions below raise ZeroDivisionError.
    if max_ping <= 0:
        max_ping = 1

    # Tallest bar, in rows above the baseline.
    max_scaled = height - 4

    # Rows at which a 100 ms and a 50 ms ping would end on the current scale.
    yellow_zone_idx = round(max_scaled * (100 / max_ping))
    green_zone_idx = round(max_scaled * (50 / max_ping))

    # Paint callback: pick the color from the zone the cell's y value falls in.
    def _paint(x, y):
        if y <= green_zone_idx:
            return Fore.GREEN
        elif y <= yellow_zone_idx:
            return Fore.YELLOW
        else:
            return Fore.RED  # Danger zone

    for column, datum in enumerate(data_slice):
        if datum == -1:
            # A timeout is shown as a red question mark on the baseline.
            canvas[column + 2, 2] = ("?", Fore.RED)
            continue

        # Fraction of max_ping, capped at 1 so the bar never leaves the frame.
        percent = min(datum / max_ping, 1)
        bar_height = round(max_scaled * percent)

        canvas.vertical_line(
            "#", column + 2, 2, 2 + bar_height, paint=_paint
        )

    stats_box = [
        "Avg: {:6.0f}".format(average_ping),
        "Min: {:6.0f}".format(min(filtered_data)),
        "Max: {:6.0f}".format(max(filtered_data)),
        "Cur: {:6.0f}".format(filtered_data[0])
    ]
    # The statistics are placed around the middle of the canvas.
    midpoint = Point(
        round(width / 2),
        round(height / 2)
    )
    max_stats_len = max(len(s) for s in stats_box)
    # Blank out a frame around the statistics so the bars don't touch the text.
    canvas.box(
        Point(midpoint.x - round(max_stats_len / 2) - 1, midpoint.y + len(stats_box)),
        Point(midpoint.x + round(max_stats_len / 2) - 1, midpoint.y - 1),
        blank=True
    )
    # Write each statistics line; a larger idx means a larger y.
    for idx, stat in enumerate(stats_box):
        from_stat = midpoint.x - round(max_stats_len / 2)
        to_stat = from_stat + len(stat)
        canvas.horizontal_line(stat, midpoint.y + idx, from_stat, to_stat)

    # Write the host name, centred, over the top border.
    if host:
        host = " {} ".format(host)
        from_url = midpoint.x - round(len(host) / 2)
        to_url = from_url + len(host)
        canvas.horizontal_line(host, height - 1, from_url, to_url)

    return canvas


# Regexes used by the per-platform ping parsers further down in this module
# (contributed by nice people on github).

# Group 1 is the sixth run of digits on the line; windows_ping() converts it
# to an int and reports it as the ping result.
windows_re = re.compile('.*?\\d+.*?\\d+.*?\\d+.*?\\d+.*?\\d+.*?(\\d+)', re.IGNORECASE | re.DOTALL)

# Group 1 is the (optionally fractional) number between "time=" and "ms".
linux_re = re.compile(r'time=(\d+(?:\.\d+)?) *ms', re.IGNORECASE)

# Verbose pattern with five groups: byte count, dotted IPv4 address, icmp_seq,
# ttl and time. darwin_ping() reads group 5 (the time).
darwin_re = re.compile(r'''
    \s?([0-9]*) # capture the bytes of data
    \sbytes\sfrom\s # bytes from
    (\d+\.\d+\.\d+\.\d+):
    \s+icmp_seq=(\d+)  # capture icmp_seq
    \s+ttl=(\d+)  # capture ttl
    \s+time=(?:([0-9\.]+)\s+ms)  # capture time
    ''',
                       re.VERBOSE | re.IGNORECASE | re.DOTALL)


# Simple ping decorator. Takes a list of default arguments to be passed to ping (used in the windows pinger)
def pinger(default_options=None):
    """Turn a line parser into a generator that runs ``ping`` and yields results.

    Args:
        default_options: Optional sequence of arguments placed directly after
            ``ping`` on the command line.

    Returns:
        A decorator. The decorated function must accept one decoded line of
        ping output and return ``None`` to skip the line (a header, say),
        ``-1`` for a timeout, or the ping value. Calling the decorated
        function returns a generator of those non-``None`` results. The
        generator ends, after printing the last lines seen, when ``ping``
        stops producing output.
    """
    # We return the inner decorator (a bit verbose I know)
    def _wrapper(func):
        # Make the wrapped function...
        @functools.wraps(func)
        def _inner(*args):
            # Only the first element of each positional argument is passed to ping.
            # NOTE(review): presumably callers pass one option list whose first
            # item is the host - confirm against the call site.
            command = ["ping"] + list(default_options or []) + [arg[0] for arg in args]
            ping = subprocess.Popen(command, stdout=subprocess.PIPE)
            # Store the last 5 lines of output in case ping unexpectedly quits
            last_5 = deque(maxlen=5)
            try:
                while True:
                    # Undecodable bytes (for example a localized console code
                    # page) are replaced instead of aborting the whole run.
                    line = ping.stdout.readline().decode("utf-8", "replace")

                    if line == "":
                        print("ping quit unexpectedly. Last 5 lines of output:")
                        print("\n".join(last_5))
                        return

                    last_5.append(line)
                    result = func(line)
                    # A none result means no result (i.e a header or something).
                    # -1 means timeout, otherwise it's the ping.
                    if result is None:
                        continue

                    yield result
            finally:
                # Runs when ping exits, when the consumer closes the generator
                # and when an exception propagates: release the pipe and make
                # sure the child process does not outlive us.
                ping.stdout.close()
                if ping.poll() is None:
                    ping.terminate()
                ping.wait()

        return _inner

    return _wrapper


@pinger(["-t"])
def windows_ping(line):
    """Parse one line of Windows ``ping -t`` output.

    Returns the value captured by ``windows_re`` as an int for a reply line,
    ``-1`` for a timeout, a failure or a reply that carries no time, and
    ``None`` for any other line.
    """
    if line.startswith("Reply from"):
        match = windows_re.search(line)
        if match is None:
            # Replies such as "Reply from 10.0.0.1: Destination host
            # unreachable." start the same way but carry no time field.
            # Count them as a failed ping instead of crashing on None.
            return -1
        return int(match.group(1))
    elif "timed out" in line or "failure" in line:
        return -1


@pinger()
def linux_ping(line):
    """Parse one line of Linux ``ping`` output.

    Returns the ``time=`` value rounded to a whole number for lines starting
    with "64 bytes from", and ``None`` for every other line.
    """
    if not line.startswith("64 bytes from"):
        return None

    elapsed = float(linux_re.search(line).group(1))
    return round(elapsed)


@pinger()
def darwin_ping(line):
    """Parse one line of macOS ``ping`` output.

    Returns the ``time=`` value rounded to a whole number for lines starting
    with "64 bytes from", ``-1`` for lines starting with "Request timeout",
    and ``None`` for every other line.
    """
    if line.startswith("Request timeout"):
        return -1
    if not line.startswith("64 bytes from"):
        return None

    elapsed = float(darwin_re.search(line).group(5))
    return round(elapsed)


def simulate_ping():
    """Yield an endless random walk of fake ping values.

    Each value is an int strictly between 25 and 500, drawn uniformly from a
    window centred on the previously yielded value. Candidates outside the
    allowed range are discarded and redrawn.
    """
    import random
    last = random.randint(25, 150)
    while True:
        # Integer arithmetic on purpose: with true division the bounds become
        # floats, which random.randint() does not accept.
        spread = (last // 10) * 20
        curr = random.randint(last - spread, last + spread)
        if not 25 < curr < 500:
            continue
        last = curr
        yield curr


def run():
    """Entry point: call ``_run()`` and swallow ``KeyboardInterrupt``."""
    # Kept as a thin wrapper so the command line stub installed by setup.py is
    # covered by the try/except, without indenting the whole body of _run().
    try:
        _run()
    except KeyboardInterrupt:
        # The interrupt is deliberately ignored: return without a traceback.
        return


def _run():
    if len(sys.argv) == 1:
        options = ["google.com"]
        host = options[0]
    else:
        options = sys.argv[1:]
        host