summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlya Mashchenko <ilya@netdata.cloud>2020-07-21 09:46:31 +0300
committerGitHub <noreply@github.com>2020-07-21 09:46:31 +0300
commitdef03a151cd042748b2687729a922d2f93fbd570 (patch)
tree39ec86975f9fa425d216d5e862ad7eb1c1011373
parent79ea229a81099f374cad17018fff5a04dc2fac11 (diff)
python.d: add job file lock registry (#9564)
-rw-r--r--REDISTRIBUTED.md4
-rw-r--r--collectors/python.d.plugin/Makefile.am1
-rw-r--r--collectors/python.d.plugin/python.d.plugin.in65
-rw-r--r--collectors/python.d.plugin/python_modules/third_party/filelock.py451
4 files changed, 521 insertions, 0 deletions
diff --git a/REDISTRIBUTED.md b/REDISTRIBUTED.md
index bc1ce59ca8..5a3c328709 100644
--- a/REDISTRIBUTED.md
+++ b/REDISTRIBUTED.md
@@ -178,4 +178,8 @@ connectivity is not available.
Copyright 2014, 2015, 2016 Ori Livneh [ori@wikimedia.org](mailto:ori@wikimedia.org)
[Apache-2.0](http://www.apache.org/licenses/LICENSE-2.0)
+- [filelock](https://github.com/benediktschmitt/py-filelock)
+
+ Copyright 2015, Benedikt Schmitt [Unlicense License](https://unlicense.org/)
+
[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2FREDISTRIBUTED&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>)
diff --git a/collectors/python.d.plugin/Makefile.am b/collectors/python.d.plugin/Makefile.am
index e678f86a21..176bd3cb01 100644
--- a/collectors/python.d.plugin/Makefile.am
+++ b/collectors/python.d.plugin/Makefile.am
@@ -141,6 +141,7 @@ dist_third_party_DATA = \
python_modules/third_party/mcrcon.py \
python_modules/third_party/boinc_client.py \
python_modules/third_party/monotonic.py \
+ python_modules/third_party/filelock.py \
$(NULL)
pythonyaml2dir=$(pythonmodulesdir)/pyyaml2
diff --git a/collectors/python.d.plugin/python.d.plugin.in b/collectors/python.d.plugin/python.d.plugin.in
index b289def994..106a77475e 100644
--- a/collectors/python.d.plugin/python.d.plugin.in
+++ b/collectors/python.d.plugin/python.d.plugin.in
@@ -52,6 +52,7 @@ ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR'
ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
+ENV_NETDATA_LOCKS_DIR = 'NETDATA_LOCKS_DIR'
def add_pythond_packages():
@@ -66,6 +67,7 @@ add_pythond_packages()
from bases.collection import safe_print
from bases.loggers import PythonDLogger
from bases.loaders import load_config
+from third_party import filelock
try:
from collections import OrderedDict
@@ -90,6 +92,10 @@ def dirs():
ENV_NETDATA_PLUGINS_DIR,
os.path.dirname(__file__),
)
+ locks = os.getenv(
+ ENV_NETDATA_LOCKS_DIR,
+ # TODO: add '@locksdir_POST@
+ )
modules_user_config = os.path.join(plugin_user_config, 'python.d')
modules_stock_config = os.path.join(plugin_stock_config, 'python.d')
modules = os.path.abspath(pluginsd + '/../python.d')
@@ -103,6 +109,7 @@ def dirs():
'modules_stock_config',
'modules',
'var_lib',
+ 'locks',
]
)
return Dirs(
@@ -112,6 +119,7 @@ def dirs():
modules_stock_config,
modules,
var_lib,
+ locks,
)
@@ -304,6 +312,9 @@ class Job(threading.Thread):
def init(self):
self.job = self.service(configuration=copy.deepcopy(self.config))
+ def full_name(self):
+ return self.job.name
+
def check(self):
ok = self.job.check()
self.checks -= self.checks != self.inf and not ok
@@ -445,6 +456,41 @@ class PluginConfig(dict):
return self['default_run']
+class FileLockRegistry:
+ def __init__(self, path):
+ self.path = path
+ self.locks = dict()
+
+ def register(self, name):
+ if name in self.locks:
+ return
+ file = os.path.join(self.path, name)
+ lock = filelock.FileLock(file)
+ lock.acquire(timeout=0)
+ self.locks[name] = lock
+
+ def unregister(self, name):
+ if name not in self.locks:
+ return
+ lock = self.locks[name]
+ lock.release()
+ del self.locks[name]
+
+
+class DummyRegistry:
+ def register(self, name):
+ pass
+
+ def unregister(self, name):
+ pass
+
+
+def create_jobs_registry():
+ if not DIRS.locks:
+ return DummyRegistry()
+ return FileLockRegistry(DIRS.locks)
+
+
class Plugin:
config_name = 'python.d.conf'
jobs_status_dump_name = 'pythond-jobs-statuses.json'
@@ -454,6 +500,7 @@ class Plugin:
self.min_update_every = min_update_every
self.config = PluginConfig(PLUGIN_BASE_CONF)
self.log = PythonDLogger()
+ self.registry = create_jobs_registry()
self.started_jobs = collections.defaultdict(dict)
self.jobs = list()
self.saver = None
@@ -605,11 +652,29 @@ class Plugin:
self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name))
try:
+ self.registry.register(job.full_name())
+ except filelock.Timeout as error:
+ self.log.info('{0}[{1}] : already registered by another process, skipping the job ({2})'.format(
+ job.module_name, job.real_name, error))
+ job.status = JOB_STATUS_DROPPED
+ continue
+ except Exception as error:
+ self.log.warning('{0}[{1}] : registration failed: {2}, skipping the job'.format(
+ job.module_name, job.real_name, error))
+ job.status = JOB_STATUS_DROPPED
+ continue
+
+ try:
job.create()
except Exception as error:
self.log.warning("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format(
job.module_name, job.real_name, repr(error)))
job.status = JOB_STATUS_DROPPED
+ try:
+ self.registry.unregister(job.full_name())
+ except Exception as error:
+ self.log.warning('{0}[{1}] : deregistration failed: {2}'.format(
+ job.module_name, job.real_name, error))
continue
self.started_jobs[job.module_name] = job.actual_name
diff --git a/collectors/python.d.plugin/python_modules/third_party/filelock.py b/collectors/python.d.plugin/python_modules/third_party/filelock.py
new file mode 100644
index 0000000000..4c981672bc
--- /dev/null
+++ b/collectors/python.d.plugin/python_modules/third_party/filelock.py
@@ -0,0 +1,451 @@
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+#
+# For more information, please refer to <http://unlicense.org>
+
+"""
+A platform independent file lock that supports the with-statement.
+"""
+
+
+# Modules
+# ------------------------------------------------
+import logging
+import os
+import threading
+import time
+try:
+ import warnings
+except ImportError:
+ warnings = None
+
+try:
+ import msvcrt
+except ImportError:
+ msvcrt = None
+
+try:
+ import fcntl
+except ImportError:
+ fcntl = None
+
+
+# Backward compatibility
+# ------------------------------------------------
+try:
+ TimeoutError
+except NameError:
+ TimeoutError = OSError
+
+
+# Data
+# ------------------------------------------------
+__all__ = [
+ "Timeout",
+ "BaseFileLock",
+ "WindowsFileLock",
+ "UnixFileLock",
+ "SoftFileLock",
+ "FileLock"
+]
+
+__version__ = "3.0.12"
+
+
+_logger = None
+def logger():
+ """Returns the logger instance used in this module."""
+ global _logger
+ _logger = _logger or logging.getLogger(__name__)
+ return _logger
+
+
+# Exceptions
+# ------------------------------------------------
+class Timeout(TimeoutError):
+ """
+ Raised when the lock could not be acquired in *timeout*
+ seconds.
+ """
+
+ def __init__(self, lock_file):
+ """
+ """
+ #: The path of the file lock.
+ self.lock_file = lock_file
+ return None
+
+ def __str__(self):
+ temp = "The file lock '{}' could not be acquired."\
+ .format(self.lock_file)
+ return temp
+
+
+# Classes
+# ------------------------------------------------
+
+# This is a helper class which is returned by :meth:`BaseFileLock.acquire`
+# and wraps the lock to make sure __enter__ is not called twice when entering
+# the with statement.
+# If we would simply return *self*, the lock would be acquired again
+# in the *__enter__* method of the BaseFileLock, but not released again
+# automatically.
+#
+# :seealso: issue #37 (memory leak)
+class _Acquire_ReturnProxy(object):
+
+ def __init__(self, lock):
+ self.lock = lock
+ return None
+
+ def __enter__(self):
+ return self.lock
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.lock.release()
+ return None
+
+
+class BaseFileLock(object):
+ """
+ Implements the base class of a file lock.
+ """
+
+ def __init__(self, lock_file, timeout = -1):
+ """
+ """
+ # The path to the lock file.
+ self._lock_file = lock_file
+
+ # The file descriptor for the *_lock_file* as it is returned by the
+ # os.open() function.
+ # This file lock is only NOT None, if the object currently holds the
+ # lock.
+ self._lock_file_fd = None
+
+ # The default timeout value.
+ self.timeout = timeout
+
+ # We use this lock primarily for the lock counter.
+ self._thread_lock = threading.Lock()
+
+ # The lock counter is used for implementing the nested locking
+ # mechanism. Whenever the lock is acquired, the counter is increased and
+ # the lock is only released, when this value is 0 again.
+ self._lock_counter = 0
+ return None
+
+ @property
+ def lock_file(self):
+ """
+ The path to the lock file.
+ """
+ return self._lock_file
+
+ @property
+ def timeout(self):
+ """
+ You can set a default timeout for the filelock. It will be used as
+ fallback value in the acquire method, if no timeout value (*None*) is
+ given.
+
+ If you want to disable the timeout, set it to a negative value.
+
+ A timeout of 0 means, that there is exactly one attempt to acquire the
+ file lock.
+
+ .. versionadded:: 2.0.0
+ """
+ return self._timeout
+
+ @timeout.setter
+ def timeout(self, value):
+ """
+ """
+ self._timeout = float(value)
+ return None
+
+ # Platform dependent locking
+ # --------------------------------------------
+
+ def _acquire(self):
+ """
+ Platform dependent. If the file lock could be
+ acquired, self._lock_file_fd holds the file descriptor
+ of the lock file.
+ """
+ raise NotImplementedError()
+
+ def _release(self):
+ """
+ Releases the lock and sets self._lock_file_fd to None.
+ """
+ raise NotImplementedError()
+
+ # Platform independent methods
+ # --------------------------------------------
+
+ @property
+ def is_locked(self):
+ """
+ True, if the object holds the file lock.
+
+ .. versionchanged:: 2.0.0
+
+ This was previously a method and is now a property.
+ """
+ return self._lock_file_fd is not None
+
+ def acquire(self, timeout=None, poll_intervall=0.05):
+ """
+ Acquires the file lock or fails with a :exc:`Timeout` error.
+
+ .. code-block:: python
+
+ # You can use this method in the context manager (recommended)
+ with lock.acquire():
+ pass
+
+ # Or use an equivalent try-finally construct:
+ lock.acquire()
+ try:
+ pass
+ finally:
+ lock.release()
+
+ :arg float timeout:
+ The maximum time waited for the file lock.
+ If ``timeout < 0``, there is no timeout and this method will
+ block until the lock could be acquired.
+ If ``timeout`` is None, the default :attr:`~timeout` is used.
+
+ :arg float poll_intervall:
+ We check once in *poll_intervall* seconds if we can acquire the
+ file lock.
+
+ :raises Timeout:
+ if the lock could not be acquired in *timeout* seconds.
+
+ .. versionchanged:: 2.0.0
+
+ This method returns now a *proxy* object instead of *self*,
+ so that it can be used in a with statement without side effects.
+ """
+ # Use the default timeout, if no timeout is provided.
+ if timeout is None:
+ timeout = self.timeout
+
+ # Increment the number right at the beginning.
+ # We can still undo it, if something fails.
+ with self._thread_lock:
+ self._lock_counter += 1
+
+ lock_id = id(self)
+ lock_filename = self._lock_file
+ start_time = time.time()
+ try:
+ while True:
+ with self._thread_lock:
+ if not self.is_locked:
+ logger().debug('Attempting to acquire lock %s on %s', lock_id, lock_filename)
+ self._acquire()
+
+ if self.is_locked:
+ logger().info('Lock %s acquired on %s', lock_id, lock_filename)
+ break
+ elif timeout >= 0 and time.time() - start_time > timeout:
+ logger().debug('Timeout on acquiring lock %s on %s', lock_id, lock_filename)
+ raise Timeout(self._lock_file)
+ else:
+ logger().debug(
+ 'Lock %s not acquired on %s, waiting %s seconds ...',
+ lock_id, lock_filename, poll_intervall
+ )
+ time.sleep(poll_intervall)
+ except:
+ # Something did go wrong, so decrement the counter.
+ with self._thread_lock:
+ self._lock_counter = max(0, self._lock_counter - 1)
+
+ raise
+ return _Acquire_ReturnProxy(lock = self)
+
+ def release(self, force = False):
+ """
+ Releases the file lock.
+
+ Please note, that the lock is only completly released, if the lock
+ counter is 0.
+
+ Also note, that the lock file itself is not automatically deleted.
+
+ :arg bool force:
+ If true, the lock counter is ignored and the lock is released in
+ every case.
+ """
+ with self._thread_lock:
+
+ if self.is_locked:
+ self._lock_counter -= 1
+
+ if self._lock_counter == 0 or force:
+ lock_id = id(self)
+ lock_filename = self._lock_file
+
+ logger().debug('Attempting to release lock %s on %s', lock_id, lock_filename)
+ self._release()
+ self._lock_counter = 0
+ logger().info('Lock %s released on %s', lock_id, lock_filename)
+
+ return None
+
+ def __enter__(self):
+ self.acquire()
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.release()
+ return None
+
+ def __del__(self):
+ self.release(force = True)
+ return None
+
+
+# Windows locking mechanism
+# ~~~~~~~~~~~~~~~~~~~~~~~~~
+
+class WindowsFileLock(BaseFileLock):
+ """
+ Uses the :func:`msvcrt.locking` function to hard lock the lock file on
+ windows systems.
+ """
+
+ def _acquire(self):
+ open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
+
+ try:
+ fd = os.open(self._lock_file, open_mode)
+ except OSError:
+ pass
+ else:
+ try:
+ msvcrt.locking(fd, msvcrt.LK_NBLCK, 1)
+ except (IOError, OSError):
+ os.close(fd)
+ else:
+ self._lock_file_fd = fd
+ return None
+
+ def _release(self):
+ fd = self._lock_file_fd
+ self._lock_file_fd = None
+ msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
+ os.close(fd)
+
+ try:
+ os.remove(self._lock_file)
+ # Probably another instance of the application
+ # that acquired the file lock.
+ except OSError:
+ pass
+ return None
+
+# Unix locking mechanism
+# ~~~~~~~~~~~~~~~~~~~~~~
+
+class UnixFileLock(BaseFileLock):
+ """
+ Uses the :func:`fcntl.flock` to hard lock the lock file on unix systems.
+ """
+
+ def _acquire(self):
+ open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
+ fd = os.open(self._lock_file, open_mode)
+
+ try:
+ fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except (IOError, OSError):
+ os.close(fd)
+ else:
+ self._lock_file_fd = fd
+ return None
+
+ def _release(self):
+ # Do not remove the lockfile:
+ #
+ # https://github.com/benediktschmitt/py-filelock/issues/31
+ # https://stackoverflow.com/questions/17708885/flock-removing-locked-file-without-race-condition
+ fd = self._lock_file_fd
+ self._lock_file_fd = None
+ fcntl.flock(fd, fcntl.LOCK_UN)
+ os.close(fd)
+ return None
+
+# Soft lock
+# ~~~~~~~~~
+
+class SoftFileLock(BaseFileLock):
+ """
+ Simply watches the existence of the lock file.
+ """
+
+ def _acquire(self):
+ open_mode = os.O_WRONLY | os.O_CREAT | os.O_EXCL | os.O_TRUNC
+ try:
+ fd = os.open(self._lock_file, open_mode)
+ except (IOError, OSError):
+ pass
+ else:
+ self._lock_file_fd = fd
+ return None
+
+ def _release(self):
+ os.close(self._lock_file_fd)
+ self._lock_file_fd = None
+
+ try:
+ os.remove(self._lock_file)
+ # The file is already deleted and that's what we want.
+ except OSError:
+ pass
+ return None
+
+
+# Platform filelock
+# ~~~~~~~~~~~~~~~~~
+
+#: Alias for the lock, which should be used for the current platform. On
+#: Windows, this is an alias for :class:`WindowsFileLock`, on Unix for
+#: :class:`UnixFileLock` and otherwise for :class:`SoftFileLock`.
+FileLock = None
+
+if msvcrt:
+ FileLock = WindowsFileLock
+elif fcntl:
+ FileLock = UnixFileLock
+else:
+ FileLock = SoftFileLock
+
+ if warnings is not None:
+ warnings.warn("only soft file lock is available")