summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTom <tom@tomforb.es>2015-10-30 00:14:11 +0000
committerTom <tom@tomforb.es>2015-10-30 00:15:33 +0000
commit958dc6308b2dcd3f54c20f321dfa3f6271ec7fa3 (patch)
treef6d94e1094e88471fecfa43c644461d79672f400
parent5e83b5f8d14ac79493a67934389e2fba0e73331d (diff)
Cleanup the codecleanup
-rw-r--r--gping/pinger.py439
-rw-r--r--requirements.txt1
2 files changed, 226 insertions, 214 deletions
diff --git a/gping/pinger.py b/gping/pinger.py
index 1278f79..4d32412 100644
--- a/gping/pinger.py
+++ b/gping/pinger.py
@@ -1,98 +1,73 @@
-import os
-import subprocess
-import re
-import collections
+from collections import namedtuple, deque
import itertools
+import subprocess
+import functools
+import sys
import platform
-import io
+import re
+from itertools import islice
from colorama import Fore, init
+init()
-try:
- from colorama.ansitowin32 import winterm
-except Exception:
- winterm = None
-import sys
+buff = deque(maxlen=400)
+
+Point = namedtuple("Point", "x y")
try:
from gping.termsize import get_terminal_size
except ImportError:
- #python 2 compatibility
+ # python 2 compatibility
from termsize import get_terminal_size
-init()
-windows_re = re.compile('.*?\\d+.*?\\d+.*?\\d+.*?\\d+.*?\\d+.*?(\\d+)', re.IGNORECASE | re.DOTALL)
-
-linux_re = re.compile(r'time=(\d+(?:\.\d+)?) *ms', re.IGNORECASE)
-
-darwin_re = re.compile(r'''
- \s?([0-9]*) # capture the bytes of data
- \sbytes\sfrom\s # bytes from
- (\d+\.\d+\.\d+\.\d+):
- \s+icmp_seq=(\d+) # capture icmp_seq
- \s+ttl=(\d+) # capture ttl
- \s+time=(?:([0-9\.]+)\s+ms) # capture time
- ''',
- re.VERBOSE | re.IGNORECASE | re.DOTALL)
-
-buff = collections.deque([0 for _ in range(20)], maxlen=400)
-
-P = collections.namedtuple("Point", "x y")
-hidden = object()
+try:
+ from colorama.ansitowin32 import winterm
+except Exception:
+ winterm = None
-class Bitmap(object):
- def __init__(self, width, height, default=" "):
- ''' the pane on which we are going to draw'''
- self.width = width
- self.height = height
- self._bitmap = [
- [default for _ in range(width + 1)]
- for _ in range(height + 1)
+class Canvas(object):
+ def __init__(self, width, height):
+ # Each item in each row is a tuple, the first element is the single character that will be printed
+ # and the second is the characters color.
+ self.data = [
+ [(" ", None) for i in range(width)]
+ for i in range(height)
]
- def __setitem__(self, point, value):
- ''' we set the value at the given point
- raise an error if its not Point Object
- '''
- if isinstance(point, P):
- self._bitmap[self.height - int(point.y)][int(point.x)] = value
- else:
- raise RuntimeError("Can only use __setitem__ with a point")
+ def __setitem__(self, key, value):
+ x, y = key
+ if isinstance(value, tuple):
+ data, color = value
+ if callable(color):
+ color = color(x, y)
-class ConsoleCanvas(object):
- def __init__(self, width, height):
- self.bitmap = Bitmap(width, height)
- self.colors = Bitmap(width, height, default="")
-
- def point(self, point, data, paint=None):
- ''' Args :
- point - point where the data shall be placed
- data
- paint
- '''
- self.bitmap[point] = data
- if isinstance(paint, str):
- self.colors[point] = paint
+ data = (data, color)
else:
- self.colors[point] = paint(point) if paint else ""
+ data = (value, None)
+
+ self.data[-y][x] = data
- # Yes, these two methods could be refactored :/
def horizontal_line(self, data, row, from_, to, paint=None):
+ if len(data) == 1:
+ data = itertools.cycle(data)
+
from_ = int(from_)
to = int(to)
data_iter = iter(data)
- for i in range(from_,to):
- p = P(i, row)
- self.point(p, next(data_iter), paint)
+ for x in range(from_, to):
+ self[x, row] = (next(data_iter), paint)
+
+ def vertical_line(self, data, column, from_, to, paint=None):
+ if len(data) == 1:
+ data = itertools.cycle(data)
- def vertical_line(self, character, column, from_, to, paint=None):
- to = int(to)
from_ = int(from_)
- for i in range(from_,to + 1):
- p = P(column, i)
- self.point(p, character, paint)
+ to = int(to)
+ data_iter = iter(data)
+ for y in range(from_, to + 1):
+ self[column, y] = (next(data_iter), paint)
def line(self, from_, to, paint=None, character=None):
from_, to = sorted([from_, to])
@@ -103,213 +78,249 @@ class ConsoleCanvas(object):
elif from_.y == to.y:
# Horizontal line. Just fill in the right buffer
character = character or "-"
- self.horizontal_line(itertools.cycle(character), from_.y, from_.x, to.x, paint)
- else:
- raise RuntimeError("Diagonal lines are not supported")
+ self.horizontal_line(character, from_.y, from_.x, to.x, paint)
def box(self, bottom_left_corner, top_right_corner, paint=None, blank=False):
''' creates the visual frame/box in which we place the graph '''
path = [
bottom_left_corner,
- P(bottom_left_corner.x, top_right_corner.y),
+ Point(bottom_left_corner.x, top_right_corner.y),
top_right_corner,
- P(top_right_corner.x, bottom_left_corner.y),
+ Point(top_right_corner.x, bottom_left_corner.y),
bottom_left_corner
]
- #use the bottom left corner as the starting point
+ # use the bottom left corner as the starting point
last_point = bottom_left_corner
for idx, point in enumerate(path):
- #skipping the first item because we use it as starting point
+ # skipping the first item because we use it as starting point
if idx != 0:
self.line(last_point, point, paint=paint, character=" " if blank else None)
last_point = point
- def process_colors(self):
- # Try and optimize colours. Maybe not needed on *nix?
- for row_idx, color_row in enumerate(self.colors._bitmap):
- last_color = None
- r = io.StringIO()
- for col_idx, color_item in enumerate(color_row):
- d = self.bitmap._bitmap[row_idx][col_idx]
- d = u'{}'.format(d)
- if d and d != " ":
- if color_item:
- if color_item != last_color:
- r.write(u'{}'.format(color_item))
- last_color = color_item
- elif last_color:
- r.write(u'{}'.format(Fore.RESET))
- r.write(d if d is not hidden else " ")
- if not color_item:
- if last_color:
- r.write(u'{}'.format(Fore.RESET))
- last_color = None
- else:
- r.write(d)
- yield r.getvalue()
-
-
-def plot(url, data, width, height):
- canvas = ConsoleCanvas(width, height)
+ @property
+ def lines(self):
+ for line in self.data:
+ data = []
+ current_color = None
+ # Output lines but keep track of color changes. Needed to stop colors bleeding over to other characters.
+ # Only output color changes when needed, keeps this snappy on windows.
+ for char, color in line:
+ if color != current_color and char != " ":
+ if color is None:
+ data.append(Fore.RESET)
+ else:
+ data.append(color)
+ current_color = color
+ data.append(char)
+
+ yield "".join(data)
+
+
+def plot(width, height, data, host):
+ # We space for a newline after each line, so restrict the hight/width a bit
+ width, height = width - 1, height - 1 # Our work area is slightly smaller
+ canvas = Canvas(width, height)
+ # Draw a box around the edges of the screen, one character in.
canvas.box(
- P(1, 1), P(width, height)
+ Point(1, 1), Point(width - 1, height - 1)
)
+ # We use islice to slice the data because we can't do a ranged slice on a dequeue :(
+ data_slice = list(islice(data, 0, width - 3))
- data_slice = list(itertools.islice(data, 1, width - 3))
- stats_data = [d for d in data_slice if d]
- if not stats_data:
+ # Filter the -1's (timeouts) from the data so it doesn't impact avg, sum etc.
+ filtered_data = [d for d in data if d != -1]
+ if not filtered_data:
return canvas
- max_ping = max(max(stats_data), 100)
- min_scaled, max_scaled = 0, height - 3
+ average_ping = sum(filtered_data) / len(filtered_data)
+ max_ping = min(max(filtered_data), average_ping * 2)
+
+ # Scale the chart.
+ min_scaled, max_scaled = 0, height - 4
yellow_zone_idx = round(max_scaled * (100 / max_ping))
green_zone_idx = round(max_scaled * (50 / max_ping))
- for column, datum in enumerate(data_slice, 2):
- if datum is None:
- canvas.point(P(column, 2), "?", Fore.RED)
- continue
- elif datum is 0:
+ for column, datum in enumerate(data_slice):
+ if datum == -1:
+ # If it's a timeout then do a red questionmark
+ canvas[column + 2, 2] = ("?", Fore.RED)
continue
- # bar percentage
- percent = (datum / max_ping)
- # percent of max
+
+ # What percentage of the max_ping are we? 0 -> 1
+ percent = min(datum / max_ping, 100) if datum < max_ping else 1
bar_height = round(max_scaled * percent)
- if bar_height == 0:
- bar_height = 1
- def _paint(point):
- y = point.y
+ # Our paint callback, we check if the y value of the point is in any of our zones,
+ # if it is paint the appropriate color.
+ def _paint(x, y):
if y <= green_zone_idx:
return Fore.GREEN
elif y <= yellow_zone_idx:
return Fore.YELLOW
else:
- return Fore.RED
+ return Fore.RED # Danger zone
canvas.vertical_line(
- "#", column, 2, 2 + bar_height, paint=_paint
+ "#", column + 2, 2, 2 + bar_height, paint=_paint
)
- if stats_data:
- average = sum(stats_data)/len(stats_data)
- stats_box = [
- "Avg: {:6.0f}".format(average),
- "Min: {:6.0f}".format(min(d for d in stats_data if d)), # Filter None values
- "Max: {:6.0f}".format(max(stats_data)),
- "Cur: {:6.0f}".format(stats_data[0])
- ]
- max_stats_len = max(len(s) for s in stats_box)
-
- #this part was used to place the info box in the upper right corner
- #if False:
- # for idx, stat in enumerate(stats_box):
- # canvas.horizontal_line(stat, height - 2 - idx, width - max_stats_len - 2)
-
- # canvas.box(
- # P(width - max_stats_len - len(stats_box), height - 2 - len(stats_box)),
- # P(width - 1, height - 1)
- # )
- #creating the box for the ping information in the middle
- midpoint = P(
- round(width / 2),
- round(height / 2)
- )
+ stats_box = [
+ "Avg: {:6.0f}".format(average_ping),
+ "Min: {:6.0f}".format(min(filtered_data)), # Filter None values
+ "Max: {:6.0f}".format(max(filtered_data)),
+ "Cur: {:6.0f}".format(filtered_data[0])
+ ]
+ # creating the box for the ping information in the middle
+ midpoint = Point(
+ round(width / 2),
+ round(height / 2)
+ )
+ max_stats_len = max(len(s) for s in stats_box)
+ # Draw a box around the outside of the stats box. We do this to stop the bars from touching the text,
+ # it looks weird. We need a blank area around it.
+ canvas.box(
+ Point(midpoint.x - round(max_stats_len / 2) - 1, midpoint.y + len(stats_box)),
+ Point(midpoint.x + round(max_stats_len / 2) - 1, midpoint.y - 1),
+ blank=True
+ )
+ # Paint each of the statistics lines
+ for idx, stat in enumerate(stats_box):
+ from_stat = midpoint.x - round(max_stats_len / 2)
+ to_stat = from_stat + len(stat)
+ canvas.horizontal_line(stat, midpoint.y + idx, from_stat, to_stat)
+
+ # adding the url to the top
+ if host:
+ host = " {} ".format(host)
+ from_url = midpoint.x - round(len(host) / 2)
+ to_url = from_url + len(host)
+ canvas.horizontal_line(host, height - 1, from_url, to_url)
- canvas.box(
- P(midpoint.x - round(max_stats_len / 2) - 1, midpoint.y + len(stats_box)),
- P(midpoint.x + round(max_stats_len / 2) - 1, midpoint.y - 1),
- blank=True
- )
+ return canvas
- for idx, stat in enumerate(stats_box):
- from_stat = midpoint.x - round(max_stats_len / 2)
- to_stat = from_stat + len(stat)
- canvas.horizontal_line(stat, midpoint.y + idx, from_stat ,to_stat )
- #adding the url to the top
- from_url = midpoint.x - round(len(url) / 2)
- to_url = from_url + len(url)
- canvas.horizontal_line(url, height, from_url ,to_url)
+# A bunch of regexes nice people on github made.
+windows_re = re.compile('.*?\\d+.*?\\d+.*?\\d+.*?\\d+.*?\\d+.*?(\\d+)', re.IGNORECASE | re.DOTALL)
- return canvas
+linux_re = re.compile(r'time=(\d+(?:\.\d+)?) *ms', re.IGNORECASE)
+
+darwin_re = re.compile(r'''
+ \s?([0-9]*) # capture the bytes of data
+ \sbytes\sfrom\s # bytes from
+ (\d+\.\d+\.\d+\.\d+):
+ \s+icmp_seq=(\d+) # capture icmp_seq
+ \s+ttl=(\d+) # capture ttl
+ \s+time=(?:([0-9\.]+)\s+ms) # capture time
+ ''',
+ re.VERBOSE | re.IGNORECASE | re.DOTALL)
-def _windows(url):
- ping = subprocess.Popen(["ping", "-t", url], stdout=subprocess.PIPE)
- while True:
- line = ping.stdout.readline().decode()
- if line.startswith("Reply from"):
- yield int(windows_re.search(line).group(1))
- elif "timed out" in line or "failure" in line:
- yield None
+# Simple ping decorator. Takes a list of default arguments to be passed to ping (used in the windows pinger)
+def pinger(default_options=None):
+ # We return the inner decorator (a bit verbose I know)
+ def _wrapper(func):
+ # Make the wrapped function...
+ @functools.wraps(func)
+ def _inner(*args):
+ args = ["ping"] + (default_options or []) + list(arg[0] for arg in args)
+ ping = subprocess.Popen(args, stdout=subprocess.PIPE)
+ # Store the last 5 lines of output in case ping unexpectedly quits
+ last_5 = deque(maxlen=5)
+ while True:
+ line = ping.stdout.readline().decode()
+ if line == "":
+ print("ping quit unexpectedly. Last 5 lines of output:")
+ print("\n".join(last_5))
+ return
-def _linux(url):
- ping = subprocess.Popen(["ping", url], stdout=subprocess.PIPE)
- while True:
- line = ping.stdout.readline().decode()
- if line.startswith("64 bytes from"):
- yield round(float(linux_re.search(line).group(1)))
+ last_5.append(line)
+ result = func(line)
+ # A none result means no result (i.e a header or something). -1 means timeout, otherwise it's the ping.
+ if result is None:
+ continue
+ yield result
+
+ return _inner
+
+ return _wrapper
+
+
+@pinger(["-t"])
+def windows_ping(line):
+ if line.startswith("Reply from"):
+ return int(windows_re.search(line).group(1))
+ elif "timed out" in line or "failure" in line:
+ return -1
-def _darwin(url):
- ping = subprocess.Popen(["ping", url], stdout=subprocess.PIPE)
- while True:
- line = ping.stdout.readline().decode()
- if line.startswith("64 bytes from"):
- yield round(float(darwin_re.search(line).group(5)))
- elif line.startswith("Request timeout"):
- yield None
+@pinger()
+def linux_ping(line):
+ if line.startswith("64 bytes from"):
+ return round(float(linux_re.search(line).group(1)))
-def _simulate(url):
- import time, random
+
+@pinger()
+def darwin_ping(line):
+ if line.startswith("64 bytes from"):
+ return round(float(darwin_re.search(line).group(5)))
+ elif line.startswith("Request timeout"):
+ return -1
+
+
+def simulate_ping():
+ import random
last = random.randint(25, 150)
while True:
curr = random.randint(last - ((last / 10) * 20), last + ((last / 10) * 20))
- if not 25 < curr < 150:
+ if not 25 < curr < 500:
continue
last = curr
yield curr
- time.sleep(0.1)
+ # time.sleep(0.1)
-def _run():
+def run():
+ # We do this so the command line stub installed by setup.py is surrounded by try/catch. Also neater than
+ # wrapping the whole contents of _run().
try:
- url = sys.argv[1]
- except IndexError:
- url = "google.com"
+ _run()
+ except KeyboardInterrupt:
+ pass
+
- if url == "--sim":
- it = _simulate
+def _run():
+ if len(sys.argv) == 1:
+ options = ["google.com"]
+ host = options[0]
else:
- system = platform.system()
- if system == "Windows":
- it = _windows
- elif system == "Darwin":
- it = _darwin
- else:
- it = _linux
+ options = sys.argv[1:]
+ host = ""
+
+ system = platform.system()
+ if system == "Windows":
+ it = windows_ping
+ elif system == "Darwin":
+ it = darwin_ping
+ else:
+ it = linux_ping
+
+ for line in it(options):
+ buff.appendleft(line)
- for ping in it(url):
- buff.appendleft(ping)
- if winterm:
- winterm.set_cursor_position((1, 1))
- else:
- os.system("cls" if platform.system() == "Windows" else "clear")
width, height = get_terminal_size()
- c = plot(url, buff, width - 2, height - 2)
- print("\n".join(c.process_colors()))
+ plotted = plot(width, height, buff, host)
-def run():
- try:
- _run()
- except KeyboardInterrupt:
- pass
+ if winterm and system == "Windows":
+ winterm.set_cursor_position((1, 1))
+ else:
+ print(chr(27) + "[2J")
+
+ print("\n".join(plotted.lines))
if __name__ == "__main__":
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..3d90aaa
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1 @@
+colorama \ No newline at end of file