diff options
author | Tom <tom@tomforb.es> | 2015-10-30 00:14:11 +0000 |
---|---|---|
committer | Tom <tom@tomforb.es> | 2015-10-30 00:15:33 +0000 |
commit | 958dc6308b2dcd3f54c20f321dfa3f6271ec7fa3 (patch) | |
tree | f6d94e1094e88471fecfa43c644461d79672f400 | |
parent | 5e83b5f8d14ac79493a67934389e2fba0e73331d (diff) |
Cleanup the codecleanup
-rw-r--r-- | gping/pinger.py | 439 | ||||
-rw-r--r-- | requirements.txt | 1 |
2 files changed, 226 insertions, 214 deletions
diff --git a/gping/pinger.py b/gping/pinger.py index 1278f79..4d32412 100644 --- a/gping/pinger.py +++ b/gping/pinger.py @@ -1,98 +1,73 @@ -import os -import subprocess -import re -import collections +from collections import namedtuple, deque import itertools +import subprocess +import functools +import sys import platform -import io +import re +from itertools import islice from colorama import Fore, init +init() -try: - from colorama.ansitowin32 import winterm -except Exception: - winterm = None -import sys +buff = deque(maxlen=400) + +Point = namedtuple("Point", "x y") try: from gping.termsize import get_terminal_size except ImportError: - #python 2 compatibility + # python 2 compatibility from termsize import get_terminal_size -init() -windows_re = re.compile('.*?\\d+.*?\\d+.*?\\d+.*?\\d+.*?\\d+.*?(\\d+)', re.IGNORECASE | re.DOTALL) - -linux_re = re.compile(r'time=(\d+(?:\.\d+)?) *ms', re.IGNORECASE) - -darwin_re = re.compile(r''' - \s?([0-9]*) # capture the bytes of data - \sbytes\sfrom\s # bytes from - (\d+\.\d+\.\d+\.\d+): - \s+icmp_seq=(\d+) # capture icmp_seq - \s+ttl=(\d+) # capture ttl - \s+time=(?:([0-9\.]+)\s+ms) # capture time - ''', - re.VERBOSE | re.IGNORECASE | re.DOTALL) - -buff = collections.deque([0 for _ in range(20)], maxlen=400) - -P = collections.namedtuple("Point", "x y") -hidden = object() +try: + from colorama.ansitowin32 import winterm +except Exception: + winterm = None -class Bitmap(object): - def __init__(self, width, height, default=" "): - ''' the pane on which we are going to draw''' - self.width = width - self.height = height - self._bitmap = [ - [default for _ in range(width + 1)] - for _ in range(height + 1) +class Canvas(object): + def __init__(self, width, height): + # Each item in each row is a tuple, the first element is the single character that will be printed + # and the second is the characters color. + self.data = [ + [(" ", None) for i in range(width)] + for i in range(height) ] - def __setitem__(self, point, value): - ''' we set the value at the given point - raise an error if its not Point Object - ''' - if isinstance(point, P): - self._bitmap[self.height - int(point.y)][int(point.x)] = value - else: - raise RuntimeError("Can only use __setitem__ with a point") + def __setitem__(self, key, value): + x, y = key + if isinstance(value, tuple): + data, color = value + if callable(color): + color = color(x, y) -class ConsoleCanvas(object): - def __init__(self, width, height): - self.bitmap = Bitmap(width, height) - self.colors = Bitmap(width, height, default="") - - def point(self, point, data, paint=None): - ''' Args : - point - point where the data shall be placed - data - paint - ''' - self.bitmap[point] = data - if isinstance(paint, str): - self.colors[point] = paint + data = (data, color) else: - self.colors[point] = paint(point) if paint else "" + data = (value, None) + + self.data[-y][x] = data - # Yes, these two methods could be refactored :/ def horizontal_line(self, data, row, from_, to, paint=None): + if len(data) == 1: + data = itertools.cycle(data) + from_ = int(from_) to = int(to) data_iter = iter(data) - for i in range(from_,to): - p = P(i, row) - self.point(p, next(data_iter), paint) + for x in range(from_, to): + self[x, row] = (next(data_iter), paint) + + def vertical_line(self, data, column, from_, to, paint=None): + if len(data) == 1: + data = itertools.cycle(data) - def vertical_line(self, character, column, from_, to, paint=None): - to = int(to) from_ = int(from_) - for i in range(from_,to + 1): - p = P(column, i) - self.point(p, character, paint) + to = int(to) + data_iter = iter(data) + for y in range(from_, to + 1): + self[column, y] = (next(data_iter), paint) def line(self, from_, to, paint=None, character=None): from_, to = sorted([from_, to]) @@ -103,213 +78,249 @@ class ConsoleCanvas(object): elif from_.y == to.y: # Horizontal line. Just fill in the right buffer character = character or "-" - self.horizontal_line(itertools.cycle(character), from_.y, from_.x, to.x, paint) - else: - raise RuntimeError("Diagonal lines are not supported") + self.horizontal_line(character, from_.y, from_.x, to.x, paint) def box(self, bottom_left_corner, top_right_corner, paint=None, blank=False): ''' creates the visual frame/box in which we place the graph ''' path = [ bottom_left_corner, - P(bottom_left_corner.x, top_right_corner.y), + Point(bottom_left_corner.x, top_right_corner.y), top_right_corner, - P(top_right_corner.x, bottom_left_corner.y), + Point(top_right_corner.x, bottom_left_corner.y), bottom_left_corner ] - #use the bottom left corner as the starting point + # use the bottom left corner as the starting point last_point = bottom_left_corner for idx, point in enumerate(path): - #skipping the first item because we use it as starting point + # skipping the first item because we use it as starting point if idx != 0: self.line(last_point, point, paint=paint, character=" " if blank else None) last_point = point - def process_colors(self): - # Try and optimize colours. Maybe not needed on *nix? - for row_idx, color_row in enumerate(self.colors._bitmap): - last_color = None - r = io.StringIO() - for col_idx, color_item in enumerate(color_row): - d = self.bitmap._bitmap[row_idx][col_idx] - d = u'{}'.format(d) - if d and d != " ": - if color_item: - if color_item != last_color: - r.write(u'{}'.format(color_item)) - last_color = color_item - elif last_color: - r.write(u'{}'.format(Fore.RESET)) - r.write(d if d is not hidden else " ") - if not color_item: - if last_color: - r.write(u'{}'.format(Fore.RESET)) - last_color = None - else: - r.write(d) - yield r.getvalue() - - -def plot(url, data, width, height): - canvas = ConsoleCanvas(width, height) + @property + def lines(self): + for line in self.data: + data = [] + current_color = None + # Output lines but keep track of color changes. Needed to stop colors bleeding over to other characters. + # Only output color changes when needed, keeps this snappy on windows. + for char, color in line: + if color != current_color and char != " ": + if color is None: + data.append(Fore.RESET) + else: + data.append(color) + current_color = color + data.append(char) + + yield "".join(data) + + +def plot(width, height, data, host): + # We space for a newline after each line, so restrict the hight/width a bit + width, height = width - 1, height - 1 # Our work area is slightly smaller + canvas = Canvas(width, height) + # Draw a box around the edges of the screen, one character in. canvas.box( - P(1, 1), P(width, height) + Point(1, 1), Point(width - 1, height - 1) ) + # We use islice to slice the data because we can't do a ranged slice on a dequeue :( + data_slice = list(islice(data, 0, width - 3)) - data_slice = list(itertools.islice(data, 1, width - 3)) - stats_data = [d for d in data_slice if d] - if not stats_data: + # Filter the -1's (timeouts) from the data so it doesn't impact avg, sum etc. + filtered_data = [d for d in data if d != -1] + if not filtered_data: return canvas - max_ping = max(max(stats_data), 100) - min_scaled, max_scaled = 0, height - 3 + average_ping = sum(filtered_data) / len(filtered_data) + max_ping = min(max(filtered_data), average_ping * 2) + + # Scale the chart. + min_scaled, max_scaled = 0, height - 4 yellow_zone_idx = round(max_scaled * (100 / max_ping)) green_zone_idx = round(max_scaled * (50 / max_ping)) - for column, datum in enumerate(data_slice, 2): - if datum is None: - canvas.point(P(column, 2), "?", Fore.RED) - continue - elif datum is 0: + for column, datum in enumerate(data_slice): + if datum == -1: + # If it's a timeout then do a red questionmark + canvas[column + 2, 2] = ("?", Fore.RED) continue - # bar percentage - percent = (datum / max_ping) - # percent of max + + # What percentage of the max_ping are we? 0 -> 1 + percent = min(datum / max_ping, 100) if datum < max_ping else 1 bar_height = round(max_scaled * percent) - if bar_height == 0: - bar_height = 1 - def _paint(point): - y = point.y + # Our paint callback, we check if the y value of the point is in any of our zones, + # if it is paint the appropriate color. + def _paint(x, y): if y <= green_zone_idx: return Fore.GREEN elif y <= yellow_zone_idx: return Fore.YELLOW else: - return Fore.RED + return Fore.RED # Danger zone canvas.vertical_line( - "#", column, 2, 2 + bar_height, paint=_paint + "#", column + 2, 2, 2 + bar_height, paint=_paint ) - if stats_data: - average = sum(stats_data)/len(stats_data) - stats_box = [ - "Avg: {:6.0f}".format(average), - "Min: {:6.0f}".format(min(d for d in stats_data if d)), # Filter None values - "Max: {:6.0f}".format(max(stats_data)), - "Cur: {:6.0f}".format(stats_data[0]) - ] - max_stats_len = max(len(s) for s in stats_box) - - #this part was used to place the info box in the upper right corner - #if False: - # for idx, stat in enumerate(stats_box): - # canvas.horizontal_line(stat, height - 2 - idx, width - max_stats_len - 2) - - # canvas.box( - # P(width - max_stats_len - len(stats_box), height - 2 - len(stats_box)), - # P(width - 1, height - 1) - # ) - #creating the box for the ping information in the middle - midpoint = P( - round(width / 2), - round(height / 2) - ) + stats_box = [ + "Avg: {:6.0f}".format(average_ping), + "Min: {:6.0f}".format(min(filtered_data)), # Filter None values + "Max: {:6.0f}".format(max(filtered_data)), + "Cur: {:6.0f}".format(filtered_data[0]) + ] + # creating the box for the ping information in the middle + midpoint = Point( + round(width / 2), + round(height / 2) + ) + max_stats_len = max(len(s) for s in stats_box) + # Draw a box around the outside of the stats box. We do this to stop the bars from touching the text, + # it looks weird. We need a blank area around it. + canvas.box( + Point(midpoint.x - round(max_stats_len / 2) - 1, midpoint.y + len(stats_box)), + Point(midpoint.x + round(max_stats_len / 2) - 1, midpoint.y - 1), + blank=True + ) + # Paint each of the statistics lines + for idx, stat in enumerate(stats_box): + from_stat = midpoint.x - round(max_stats_len / 2) + to_stat = from_stat + len(stat) + canvas.horizontal_line(stat, midpoint.y + idx, from_stat, to_stat) + + # adding the url to the top + if host: + host = " {} ".format(host) + from_url = midpoint.x - round(len(host) / 2) + to_url = from_url + len(host) + canvas.horizontal_line(host, height - 1, from_url, to_url) - canvas.box( - P(midpoint.x - round(max_stats_len / 2) - 1, midpoint.y + len(stats_box)), - P(midpoint.x + round(max_stats_len / 2) - 1, midpoint.y - 1), - blank=True - ) + return canvas - for idx, stat in enumerate(stats_box): - from_stat = midpoint.x - round(max_stats_len / 2) - to_stat = from_stat + len(stat) - canvas.horizontal_line(stat, midpoint.y + idx, from_stat ,to_stat ) - #adding the url to the top - from_url = midpoint.x - round(len(url) / 2) - to_url = from_url + len(url) - canvas.horizontal_line(url, height, from_url ,to_url) +# A bunch of regexes nice people on github made. +windows_re = re.compile('.*?\\d+.*?\\d+.*?\\d+.*?\\d+.*?\\d+.*?(\\d+)', re.IGNORECASE | re.DOTALL) - return canvas +linux_re = re.compile(r'time=(\d+(?:\.\d+)?) *ms', re.IGNORECASE) + +darwin_re = re.compile(r''' + \s?([0-9]*) # capture the bytes of data + \sbytes\sfrom\s # bytes from + (\d+\.\d+\.\d+\.\d+): + \s+icmp_seq=(\d+) # capture icmp_seq + \s+ttl=(\d+) # capture ttl + \s+time=(?:([0-9\.]+)\s+ms) # capture time + ''', + re.VERBOSE | re.IGNORECASE | re.DOTALL) -def _windows(url): - ping = subprocess.Popen(["ping", "-t", url], stdout=subprocess.PIPE) - while True: - line = ping.stdout.readline().decode() - if line.startswith("Reply from"): - yield int(windows_re.search(line).group(1)) - elif "timed out" in line or "failure" in line: - yield None +# Simple ping decorator. Takes a list of default arguments to be passed to ping (used in the windows pinger) +def pinger(default_options=None): + # We return the inner decorator (a bit verbose I know) + def _wrapper(func): + # Make the wrapped function... + @functools.wraps(func) + def _inner(*args): + args = ["ping"] + (default_options or []) + list(arg[0] for arg in args) + ping = subprocess.Popen(args, stdout=subprocess.PIPE) + # Store the last 5 lines of output in case ping unexpectedly quits + last_5 = deque(maxlen=5) + while True: + line = ping.stdout.readline().decode() + if line == "": + print("ping quit unexpectedly. Last 5 lines of output:") + print("\n".join(last_5)) + return -def _linux(url): - ping = subprocess.Popen(["ping", url], stdout=subprocess.PIPE) - while True: - line = ping.stdout.readline().decode() - if line.startswith("64 bytes from"): - yield round(float(linux_re.search(line).group(1))) + last_5.append(line) + result = func(line) + # A none result means no result (i.e a header or something). -1 means timeout, otherwise it's the ping. + if result is None: + continue + yield result + + return _inner + + return _wrapper + + +@pinger(["-t"]) +def windows_ping(line): + if line.startswith("Reply from"): + return int(windows_re.search(line).group(1)) + elif "timed out" in line or "failure" in line: + return -1 -def _darwin(url): - ping = subprocess.Popen(["ping", url], stdout=subprocess.PIPE) - while True: - line = ping.stdout.readline().decode() - if line.startswith("64 bytes from"): - yield round(float(darwin_re.search(line).group(5))) - elif line.startswith("Request timeout"): - yield None +@pinger() +def linux_ping(line): + if line.startswith("64 bytes from"): + return round(float(linux_re.search(line).group(1))) -def _simulate(url): - import time, random + +@pinger() +def darwin_ping(line): + if line.startswith("64 bytes from"): + return round(float(darwin_re.search(line).group(5))) + elif line.startswith("Request timeout"): + return -1 + + +def simulate_ping(): + import random last = random.randint(25, 150) while True: curr = random.randint(last - ((last / 10) * 20), last + ((last / 10) * 20)) - if not 25 < curr < 150: + if not 25 < curr < 500: continue last = curr yield curr - time.sleep(0.1) + # time.sleep(0.1) -def _run(): +def run(): + # We do this so the command line stub installed by setup.py is surrounded by try/catch. Also neater than + # wrapping the whole contents of _run(). try: - url = sys.argv[1] - except IndexError: - url = "google.com" + _run() + except KeyboardInterrupt: + pass + - if url == "--sim": - it = _simulate +def _run(): + if len(sys.argv) == 1: + options = ["google.com"] + host = options[0] else: - system = platform.system() - if system == "Windows": - it = _windows - elif system == "Darwin": - it = _darwin - else: - it = _linux + options = sys.argv[1:] + host = "" + + system = platform.system() + if system == "Windows": + it = windows_ping + elif system == "Darwin": + it = darwin_ping + else: + it = linux_ping + + for line in it(options): + buff.appendleft(line) - for ping in it(url): - buff.appendleft(ping) - if winterm: - winterm.set_cursor_position((1, 1)) - else: - os.system("cls" if platform.system() == "Windows" else "clear") width, height = get_terminal_size() - c = plot(url, buff, width - 2, height - 2) - print("\n".join(c.process_colors())) + plotted = plot(width, height, buff, host) -def run(): - try: - _run() - except KeyboardInterrupt: - pass + if winterm and system == "Windows": + winterm.set_cursor_position((1, 1)) + else: + print(chr(27) + "[2J") + + print("\n".join(plotted.lines)) if __name__ == "__main__": diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..3d90aaa --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +colorama
\ No newline at end of file |