diff options
Diffstat (limited to 'python.d')
-rw-r--r-- | python.d/nginx.chart.py | 8 | ||||
-rwxr-xr-x[-rw-r--r--] | python.d/phpfpm.chart.py | 15 | ||||
-rw-r--r-- | python.d/python_modules/base.py | 277 | ||||
-rw-r--r-- | python.d/redis.chart.py | 118 |
4 files changed, 270 insertions, 148 deletions
diff --git a/python.d/nginx.chart.py b/python.d/nginx.chart.py index de3da7c142..88849a921c 100644 --- a/python.d/nginx.chart.py +++ b/python.d/nginx.chart.py @@ -22,24 +22,24 @@ ORDER = ['connections', 'requests', 'connection_status', 'connect_rate'] CHARTS = { 'connections': { - 'options': [None, 'nginx Active Connections', 'connections', 'Active Connections', 'nginx.connections', 'line'], + 'options': [None, 'nginx Active Connections', 'connections', 'active connections', 'nginx.connections', 'line'], 'lines': [ ["active"] ]}, 'requests': { - 'options': [None, 'nginx Requests', 'requests/s', 'Requests', 'nginx.requests', 'line'], + 'options': [None, 'nginx Requests', 'requests/s', 'requests', 'nginx.requests', 'line'], 'lines': [ ["requests", None, 'incremental'] ]}, 'connection_status': { - 'options': [None, 'nginx Active Connections by Status', 'connections', 'Active Connection Status', 'nginx.connection_status', 'line'], + 'options': [None, 'nginx Active Connections by Status', 'connections', 'status', 'nginx.connection_status', 'line'], 'lines': [ ["reading"], ["writing"], ["waiting", "idle"] ]}, 'connect_rate': { - 'options': [None, 'nginx Connections Rate', 'connections/s', 'Connections Rate', 'nginx.connect_rate', 'line'], + 'options': [None, 'nginx Connections Rate', 'connections/s', 'connections rate', 'nginx.connect_rate', 'line'], 'lines': [ ["accepts", "accepted", "incremental"], ["handled", None, "incremental"] diff --git a/python.d/phpfpm.chart.py b/python.d/phpfpm.chart.py index e2a5355b0f..b79a35d754 100644..100755 --- a/python.d/phpfpm.chart.py +++ b/python.d/phpfpm.chart.py @@ -23,37 +23,37 @@ ORDER = ['connections', 'requests', 'performance', 'request_duration', 'request_ CHARTS = { 'connections': { - 'options': [None, 'PHP-FPM Active Connections', 'connections', 'Active Connections', 'phpfpm.connections', 'line'], + 'options': [None, 'PHP-FPM Active Connections', 'connections', 'active connections', 'phpfpm.connections', 'line'], 'lines': [ ["active"], ["maxActive", 'max active'], ["idle"] ]}, 'requests': { - 'options': [None, 'PHP-FPM Requests', 'requests/s', 'Requests', 'phpfpm.requests', 'line'], + 'options': [None, 'PHP-FPM Requests', 'requests/s', 'requests', 'phpfpm.requests', 'line'], 'lines': [ ["requests", None, "incremental"] ]}, 'performance': { - 'options': [None, 'PHP-FPM Performance', 'status', 'Performance', 'phpfpm.performance', 'line'], + 'options': [None, 'PHP-FPM Performance', 'status', 'performance', 'phpfpm.performance', 'line'], 'lines': [ ["reached", 'max children reached'], ["slow", 'slow requests'] ]}, 'request_duration': { - 'options': [None, 'PHP-FPM Request Duration', 'milliseconds', 'Request Duration', 'phpfpm.request_duration', 'line'], + 'options': [None, 'PHP-FPM Request Duration', 'milliseconds', 'request duration', 'phpfpm.request_duration', 'line'], 'lines': [ ["maxReqDur", 'max request duration'], ["avgReqDur", 'average request duration'] ]}, 'request_cpu': { - 'options': [None, 'PHP-FPM Request CPU', 'percent', 'Request CPU', 'phpfpm.request_cpu', 'line'], + 'options': [None, 'PHP-FPM Request CPU', 'percent', 'request CPU', 'phpfpm.request_cpu', 'line'], 'lines': [ ["maxReqCPU", 'max request cpu'], ["avgReqCPU", 'average request cpu'] ]}, 'request_mem': { - 'options': [None, 'PHP-FPM Request Memory', 'kilobytes', 'Request Memory', 'phpfpm.request_mem', 'line'], + 'options': [None, 'PHP-FPM Request Memory', 'kilobytes', 'request memory', 'phpfpm.request_mem', 'line'], 'lines': [ ["maxReqMem", 'max request memory'], ["avgReqMem", 'average request memory'] @@ -137,6 +137,3 @@ class Service(UrlService): if len(data) == 0: return None return data - - def check(self): - return UrlService.check(self) diff --git a/python.d/python_modules/base.py b/python.d/python_modules/base.py index aecba8ea18..8940b8731a 100644 --- a/python.d/python_modules/base.py +++ b/python.d/python_modules/base.py @@ -116,33 +116,21 @@ class SimpleService(threading.Thread): Return value presents exit status of update() :return: boolean """ - t_start = time.time() - timetable = self.timetable + t_start = float(time.time()) chart_name = self.chart_name - # check if it is time to execute job update() function - if timetable['next'] > t_start: - self.debug(chart_name, "will be run in", str(int((timetable['next'] - t_start) * 1000)), "ms") - return True - since_last = int((t_start - timetable['last']) * 1000000) - self.debug(chart_name, - "ready to run, after", str(int((t_start - timetable['last']) * 1000)), - "ms (update_every:", str(timetable['freq'] * 1000), - "ms, latency:", str(int((t_start - timetable['next']) * 1000)), "ms") + since_last = int((t_start - self.timetable['last']) * 1000000) if self.__first_run: since_last = 0 + if not self.update(since_last): self.error("update function failed.") return False - t_end = time.time() - self.timetable['next'] = t_end - (t_end % timetable['freq']) + timetable['freq'] + # draw performance graph - run_time = str(int((t_end - t_start) * 1000)) - # noinspection SqlNoDataSourceInspection + run_time = int((time.time() - t_start) * 1000) print("BEGIN netdata.plugin_pythond_%s %s\nSET run_time = %s\nEND\n" % - (self.chart_name, str(since_last), run_time)) - # sys.stdout.write("BEGIN netdata.plugin_pythond_%s %s\nSET run_time = %s\nEND\n" % - # (self.chart_name, str(since_last), run_time)) + (self.chart_name, str(since_last), str(run_time))) self.debug(chart_name, "updated in", str(run_time), "ms") self.timetable['last'] = t_start @@ -155,23 +143,49 @@ class SimpleService(threading.Thread): Exits when job failed or timed out. :return: None """ - self.timetable['last'] = time.time() + step = float(self.timetable['freq']) + penalty = 0 + self.timetable['last'] = float(time.time() - step) + self.debug("starting data collection - update frequency:", str(step), " retries allowed:", str(self.retries)) while True: # run forever, unless something is wrong + now = float(time.time()) + next = self.timetable['next'] = now - (now % step) + step + penalty + + # it is important to do this in a loop + # sleep() is interruptable + while now < next: + self.debug("sleeping for", str(next - now), "secs to reach frequency of", str(step), "secs, now:", str(now), " next:", str(next), " penalty:", str(penalty)) + time.sleep(next - now) + now = float(time.time()) + + # do the job try: status = self._run_once() except Exception as e: - self.error("Something wrong: ", str(e)) + self.alert("internal error - aborting data collection: " + str(e)) return - if status: # handle retries if update failed - time.sleep(self.timetable['next'] - time.time()) + + if status: + # it is good self.retries_left = self.retries + penalty = 0 else: + # it failed self.retries_left -= 1 if self.retries_left <= 0: - self.error("no more retries. Exiting") - return + if penalty == 0: + penalty = float(self.retries * step) / 2 + else: + penalty *= 1.5 + + if penalty > 600: + penalty = 600 + + self.retries_left = self.retries + self.alert("failed to collect data for " + str(self.retries) + " times - increasing penalty to " + str(penalty) + " sec and trying again") + else: - time.sleep(self.timetable['freq']) + self.error("failed to collect data - " + str(self.retries_left) + " retries left - penalty: " + str(penalty) + " sec") # --- CHART --- @@ -318,6 +332,12 @@ class SimpleService(threading.Thread): """ msg.error(self.chart_name, *params) + def alert(self, *params): + """ + Show error message on stderr + """ + msg.alert(self.chart_name, *params) + def debug(self, *params): """ Show debug message on stderr @@ -345,10 +365,18 @@ class SimpleService(threading.Thread): :return: boolean """ self.debug("Module", str(self.__module__), "doesn't implement check() function. Using default.") - if self._get_data() is None or len(self._get_data()) == 0: + data = self._get_data() + + if data is None: + self.debug("failed to receive data during check().") return False - else: - return True + + if len(data) == 0: + self.debug("empty data during check().") + return False + + self.debug("successfully received data during check(): '" + str(data) + "'") + return True def create(self): """ @@ -357,6 +385,7 @@ class SimpleService(threading.Thread): """ data = self._get_data() if data is None: + self.debug("failed to receive data during create().") return False idx = 0 @@ -380,7 +409,7 @@ class SimpleService(threading.Thread): """ data = self._get_data() if data is None: - self.debug("_get_data() returned no data") + self.debug("failed to receive data during update().") return False updated = False @@ -506,8 +535,83 @@ class SocketService(SimpleService): self.unix_socket = None self.request = "" self.__socket_config = None + self.__empty_request = "".encode() SimpleService.__init__(self, configuration=configuration, name=name) + def _socketerror(self, message=None): + if self.unix_socket is not None: + self.error("unix socket '" + self.unix_socket + "':", message) + else: + if self.__socket_config is not None: + af, socktype, proto, canonname, sa = self.__socket_config + self.error("socket to '" + str(sa[0]) + "' port " + str(sa[1]) + ":", message) + else: + self.error("unknown socket:", message) + + def _connect2socket(self, res=None): + """ + Connect to a socket, passing the result of getaddrinfo() + :return: boolean + """ + if res is None: + res = self.__socket_config + if res is None: + self.error("Cannot create socket to 'None':") + return False + + af, socktype, proto, canonname, sa = res + try: + self.debug("creating socket to '" + str(sa[0]) + "', port " + str(sa[1])) + self._sock = socket.socket(af, socktype, proto) + except socket.error as e: + self.error("Failed to create socket to '" + str(sa[0]) + "', port " + str(sa[1]) + ":", str(e)) + self._sock = None + self.__socket_config = None + return False + + try: + self.debug("connecting socket to '" + str(sa[0]) + "', port " + str(sa[1])) + self._sock.connect(sa) + except socket.error as e: + self.error("Failed to connect to '" + str(sa[0]) + "', port " + str(sa[1]) + ":", str(e)) + self._disconnect() + self.__socket_config = None + return False + + self.debug("connected to '" + str(sa[0]) + "', port " + str(sa[1])) + self.__socket_config = res + return True + + def _connect2unixsocket(self): + """ + Connect to a unix socket, given its filename + :return: boolean + """ + if self.unix_socket is None: + self.error("cannot connect to unix socket 'None'") + return False + + try: + self.debug("attempting DGRAM unix socket '" + str(self.unix_socket) + "'") + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self._sock.connect(self.unix_socket) + self.debug("connected DGRAM unix socket '" + str(self.unix_socket) + "'") + return True + except socket.error as e: + self.debug("Failed to connect DGRAM unix socket '" + str(self.unix_socket) + "':", str(e)) + + try: + self.debug("attempting STREAM unix socket '" + str(self.unix_socket) + "'") + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self._sock.connect(self.unix_socket) + self.debug("connected STREAM unix socket '" + str(self.unix_socket) + "'") + return True + except socket.error as e: + self.debug("Failed to connect STREAM unix socket '" + str(self.unix_socket) + "':", str(e)) + self.error("Failed to connect to unix socket '" + str(self.unix_socket) + "':", str(e)) + self._sock = None + return False + def _connect(self): """ Recreate socket and connect to it since sockets cannot be reused after closing @@ -515,63 +619,38 @@ class SocketService(SimpleService): :return: """ try: - if self.unix_socket is None: - if self.__socket_config is None: - # establish ipv6 or ipv4 connection. - for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM): - try: - # noinspection SpellCheckingInspection - af, socktype, proto, canonname, sa = res - self._sock = socket.socket(af, socktype, proto) - except socket.error as e: - self.debug("Cannot create socket:", str(e)) - self._sock = None - continue - try: - self._sock.connect(sa) - except socket.error as e: - self.debug("Cannot connect to socket:", str(e)) - self._disconnect() - continue - self.__socket_config = res - break - else: - # connect to socket with previously established configuration - try: - af, socktype, proto, canonname, sa = self.__socket_config - self._sock = socket.socket(af, socktype, proto) - self._sock.connect(sa) - except socket.error as e: - self.debug("Cannot create or connect to socket:", str(e)) - self._disconnect() + if self.unix_socket is not None: + self._connect2unixsocket() + else: - # connect to unix socket - try: - self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - self._sock.connect(self.unix_socket) - except socket.error: - self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self._sock.connect(self.unix_socket) + if self.__socket_config is not None: + self._connect2socket() + else: + for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM): + if self._connect2socket(res): break except Exception as e: - self.error(str(e), - "Cannot create socket with following configuration: host:", str(self.host), - "port:", str(self.port), - "socket:", str(self.unix_socket)) self._sock = None - self._sock.setblocking(0) + self.__socket_config = None + + if self._sock is not None: + self._sock.setblocking(0) + self._sock.settimeout(5) + self.debug("set socket timeout to: " + str(self._sock.gettimeout())) def _disconnect(self): """ Close socket connection :return: """ - try: - self._sock.shutdown(2) # 0 - read, 1 - write, 2 - all - self._sock.close() - except Exception: - pass - self._sock = None + if self._sock is not None: + try: + self.debug("closing socket") + self._sock.shutdown(2) # 0 - read, 1 - write, 2 - all + self._sock.close() + except Exception: + pass + self._sock = None def _send(self): """ @@ -579,15 +658,13 @@ class SocketService(SimpleService): :return: boolean """ # Send request if it is needed - if self.request != "".encode(): + if self.request != self.__empty_request: try: + self.debug("sending request:", str(self.request)) self._sock.send(self.request) except Exception as e: + self._socketerror("error sending request:" + str(e)) self._disconnect() - self.error(str(e), - "used configuration: host:", str(self.host), - "port:", str(self.port), - "socket:", str(self.unix_socket)) return False return True @@ -598,24 +675,28 @@ class SocketService(SimpleService): """ data = "" while True: + self.debug("receiving response") try: - ready_to_read, _, in_error = select.select([self._sock], [], [], 5) + buf = self._sock.recv(4096) except Exception as e: - self.debug("SELECT", str(e)) + self._socketerror("failed to receive response:" + str(e)) self._disconnect() break - if len(ready_to_read) > 0: - buf = self._sock.recv(4096) - if len(buf) == 0 or buf is None: # handle server disconnect - break - data += buf.decode('utf-8', 'ignore') - if self._check_raw_data(data): - break - else: - self.error("Socket timed out.") + + if buf is None or len(buf) == 0: # handle server disconnect + if data == "": + self._socketerror("unexpectedly disconnected") + else: + self.debug("server closed the connection") self._disconnect() break + self.debug("received data:", str(buf)) + data += buf.decode('utf-8', 'ignore') + if self._check_raw_data(data): + break + + self.debug("final response:", str(data)) return data def _get_raw_data(self): @@ -625,6 +706,8 @@ class SocketService(SimpleService): """ if self._sock is None: self._connect() + if self._sock is None: + return None # Send request if it is needed if not self._send(): @@ -654,6 +737,7 @@ class SocketService(SimpleService): self.name = "" else: self.name = str(self.name) + try: self.unix_socket = str(self.configuration['socket']) except (KeyError, TypeError): @@ -667,10 +751,12 @@ class SocketService(SimpleService): self.port = int(self.configuration['port']) except (KeyError, TypeError): self.debug("No port specified. Using: '" + str(self.port) + "'") + try: self.request = str(self.configuration['request']) except (KeyError, TypeError): self.debug("No request specified. Using: '" + str(self.request) + "'") + self.request = self.request.encode() def check(self): @@ -724,7 +810,7 @@ class LogService(SimpleService): try: self.log_path = str(self.configuration['path']) except (KeyError, TypeError): - self.error("No path to log specified. Using: '" + self.log_path + "'") + self.info("No path to log specified. Using: '" + self.log_path + "'") if os.access(self.log_path, os.R_OK): return True @@ -779,13 +865,14 @@ class ExecutableService(SimpleService): try: self.command = str(self.configuration['command']) except (KeyError, TypeError): - self.error("No command specified. Using: '" + self.command + "'") + self.info("No command specified. Using: '" + self.command + "'") command = self.command.split(' ') for arg in command[1:]: if any(st in arg for st in self.bad_substrings): self.error("Bad command argument:" + " ".join(self.command[1:])) return False + # test command and search for it in /usr/sbin or /sbin when failed base = command[0].split('/')[-1] if self._get_raw_data() is None: @@ -793,8 +880,10 @@ class ExecutableService(SimpleService): command[0] = prefix + base if os.path.isfile(command[0]): break + self.command = command if self._get_data() is None or len(self._get_data()) == 0: self.error("Command", self.command, "returned no data") return False + return True diff --git a/python.d/redis.chart.py b/python.d/redis.chart.py index cb64a33bb7..56561ee095 100644 --- a/python.d/redis.chart.py +++ b/python.d/redis.chart.py @@ -19,40 +19,58 @@ retries = 60 # 'unix_socket': None # }} -ORDER = ['operations', 'hit_rate', 'memory', 'keys', 'clients', 'slaves'] +ORDER = ['operations', 'hit_rate', 'memory', 'keys', 'net', 'connections', 'clients', 'slaves', 'persistence'] CHARTS = { 'operations': { - 'options': [None, 'Operations', 'operations/s', 'Statistics', 'redis.operations', 'line'], + 'options': [None, 'Redis Operations', 'operations/s', 'operations', 'redis.operations', 'line'], 'lines': [ + ['total_commands_processed', 'commands', 'incremental'], ['instantaneous_ops_per_sec', 'operations', 'absolute'] ]}, 'hit_rate': { - 'options': [None, 'Hit rate', 'percent', 'Statistics', 'redis.hit_rate', 'line'], + 'options': [None, 'Redis Hit rate', 'percent', 'hits', 'redis.hit_rate', 'line'], 'lines': [ ['hit_rate', 'rate', 'absolute'] ]}, 'memory': { - 'options': [None, 'Memory utilization', 'kilobytes', 'Memory', 'redis.memory', 'line'], + 'options': [None, 'Redis Memory utilization', 'kilobytes', 'memory', 'redis.memory', 'line'], 'lines': [ ['used_memory', 'total', 'absolute', 1, 1024], ['used_memory_lua', 'lua', 'absolute', 1, 1024] ]}, + 'net': { + 'options': [None, 'Redis Bandwidth', 'kilobits/s', 'network', 'redis.net', 'area'], + 'lines': [ + ['total_net_input_bytes', 'in', 'incremental', 8, 1024], + ['total_net_output_bytes', 'out', 'incremental', -8, 1024] + ]}, 'keys': { - 'options': [None, 'Database keys', 'keys', 'Keys', 'redis.keys', 'line'], + 'options': [None, 'Redis Keys per Database', 'keys', 'keys', 'redis.keys', 'line'], 'lines': [ # lines are created dynamically in `check()` method ]}, + 'connections': { + 'options': [None, 'Redis Connections', 'connections/s', 'connections', 'redis.connections', 'line'], + 'lines': [ + ['total_connections_received', 'received', 'incremental', 1], + ['rejected_connections', 'rejected', 'incremental', -1] + ]}, 'clients': { - 'options': [None, 'Clients', 'clients', 'Clients', 'redis.clients', 'line'], + 'options': [None, 'Redis Clients', 'clients', 'connections', 'redis.clients', 'line'], 'lines': [ - ['connected_clients', 'connected', 'absolute'], - ['blocked_clients', 'blocked', 'absolute'] + ['connected_clients', 'connected', 'absolute', 1], + ['blocked_clients', 'blocked', 'absolute', -1] ]}, 'slaves': { - 'options': [None, 'Slaves', 'slaves', 'Replication', 'redis.slaves', 'line'], + 'options': [None, 'Redis Slaves', 'slaves', 'replication', 'redis.slaves', 'line'], 'lines': [ ['connected_slaves', 'connected', 'absolute'] + ]}, + 'persistence': { + 'options': [None, 'Redis Persistence Changes Since Last Save', 'changes', 'persistence', 'redis.rdb_changes', 'line'], + 'lines': [ + ['rdb_changes_since_last_save', 'changes', 'absolute'] ]} } @@ -61,57 +79,74 @@ class Service(SocketService): def __init__(self, configuration=None, name=None): SocketService.__init__(self, configuration=configuration, name=name) self.request = "INFO\r\n" - self.host = "localhost" - self.port = 6379 - self.unix_socket = None self.order = ORDER self.definitions = CHARTS self._keep_alive = True self.chart_name = "" - self.passwd = None + self.passwd = None + self.port = 6379 + if 'port' in configuration: + self.port = configuration['port'] if 'pass' in configuration: self.passwd = configuration['pass'] + if 'host' in configuration: + self.host = configuration['host'] + if 'socket' in configuration: + self.unix_socket = configuration['socket'] def _get_data(self): """ Get data from socket :return: dict """ + if self.passwd: + info_request = self.request + self.request = "AUTH " + self.passwd + "\r\n" + raw = self._get_raw_data().strip() + if raw != "+OK": + self.error("invalid password") + return None + self.request = info_request + + response = self._get_raw_data() + if response is None: + # error has already been logged + return None + try: - if self.passwd: - info_request = self.request - self.request = "AUTH " + self.passwd + "\r\n" - raw = self._get_raw_data().strip() - if raw != "+OK": - self.error("invalid password") - return None - self.request = info_request - raw = self._get_raw_data().split("\n") + parsed = response.split("\n") except AttributeError: - self.error("no data received") + self.error("response is invalid/empty") return None + data = {} - for line in raw: - if line.startswith(('instantaneous', 'keyspace', 'used_memory', 'connected', 'blocked')): - try: - t = line.split(':') - data[t[0]] = int(t[1]) - except (IndexError, ValueError): - pass - elif line.startswith('db'): + for line in parsed: + if len(line) < 5 or line[0] == '$' or line[0] == '#': + continue + + if line.startswith('db'): tmp = line.split(',')[0].replace('keys=', '') record = tmp.split(':') - data[record[0]] = int(record[1]) + data[record[0]] = record[1] + continue + + try: + t = line.split(':') + data[t[0]] = t[1] + except (IndexError, ValueError): + self.debug("invalid line received: " + str(line)) + pass + + if len(data) == 0: + self.error("received data doesn't have any records") + return None + try: - data['hit_rate'] = int((data['keyspace_hits'] / float(data['keyspace_hits'] + data['keyspace_misses'])) * 100) + data['hit_rate'] = (int(data['keyspace_hits']) * 100) / (int(data['keyspace_hits']) + int(data['keyspace_misses'])) except: data['hit_rate'] = 0 - if len(data) == 0: - self.error("received data doesn't have needed records") - return None - else: - return data + return data def _check_raw_data(self, data): """ @@ -123,14 +158,15 @@ class Service(SocketService): length = len(data) supposed = data.split('\n')[0][1:] offset = len(supposed) + 4 # 1 dollar sing, 1 new line character + 1 ending sequence '\r\n' - if (not supposed.isdigit()) : + if not supposed.isdigit(): return True supposed = int(supposed) + if length - offset >= supposed: + self.debug("received full response from redis") return True - else: - return False + self.debug("waiting more data from redis") return False def check(self): |