summaryrefslogtreecommitdiffstats
path: root/libnetdata
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-05-09 16:34:31 +0300
committerGitHub <noreply@github.com>2022-05-09 16:34:31 +0300
commiteb216a1f4bbb26e1f18537b30d22e8ad8711f42c (patch)
tree353938a0f71da7b04d4f9b67769d2a38ba6db2cb /libnetdata
parent0b3ee50c76dcc3b8dcdd13cec0e432394d3c6964 (diff)
Workers utilization charts (#12807)
* initial version of worker utilization * working example * without mutexes * monitoring DBENGINE, ACLKSYNC, WEB workers * added charts to monitor worker usage * fixed charts units * updated contexts * updated priorities * added documentation * converted threads to stacked chart * One query per query thread * Revert "One query per query thread" This reverts commit 6aeb391f5987c3c6ba2864b559fd7f0cd64b14d3. * fixed priority for web charts * read worker cpu utilization from proc * read workers cpu utilization via /proc/self/task/PID/stat, so that we have cpu utilization even when the jobs are too long to finish within our update_every frequency * disabled web server cpu utilization monitoring - it is now monitored by worker utilization * tight integration of worker utilization to web server * monitoring statsd worker threads * code cleanup and renaming of variables * constrained worker and statistics conflict to just one variable * support for rendering jobs per type * better priorities and removed the total jobs chart * added busy time in ms per job type * added proc.plugin monitoring, switch clock to MONOTONIC_RAW if available, global statistics now cleans up old worker threads * isolated worker thread families * added cgroups.plugin workers * remove unneeded dimensions when the expected worker is just one * plugins.d and streaming monitoring * rebased; support worker_is_busy() to be called one after another * added diskspace plugin monitoring * added tc.plugin monitoring * added ML threads monitoring * dont create dimensions and charts that are not needed * fix crash when job types are added on the fly * added timex and idlejitter plugins; collected heartbeat statistics; reworked heartbeat according to the POSIX * the right name is heartbeat for this chart * monitor streaming senders * added streaming senders to global stats * prevent division by zero * added clock_init() to external C plugins * added freebsd and macos plugins * added freebsd and macos to global 
statistics * dont use new as a variable; address compiler warnings on FreeBSD and MacOS * refactored contexts to be unique; added health threads monitoring Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'libnetdata')
-rw-r--r--libnetdata/Makefile.am1
-rw-r--r--libnetdata/clocks/clocks.c245
-rw-r--r--libnetdata/clocks/clocks.h25
-rw-r--r--libnetdata/libnetdata.h1
-rw-r--r--libnetdata/worker_utilization/Makefile.am8
-rw-r--r--libnetdata/worker_utilization/README.md58
-rw-r--r--libnetdata/worker_utilization/worker_utilization.c201
-rw-r--r--libnetdata/worker_utilization/worker_utilization.h22
8 files changed, 476 insertions, 85 deletions
diff --git a/libnetdata/Makefile.am b/libnetdata/Makefile.am
index ea24601498..167d05caa1 100644
--- a/libnetdata/Makefile.am
+++ b/libnetdata/Makefile.am
@@ -26,6 +26,7 @@ SUBDIRS = \
storage_number \
threads \
url \
+ worker_utilization \
tests \
$(NULL)
diff --git a/libnetdata/clocks/clocks.c b/libnetdata/clocks/clocks.c
index 5dfd93753a..85f4eff418 100644
--- a/libnetdata/clocks/clocks.c
+++ b/libnetdata/clocks/clocks.c
@@ -7,6 +7,9 @@
static clockid_t clock_boottime_to_use = CLOCK_MONOTONIC;
static clockid_t clock_monotonic_to_use = CLOCK_MONOTONIC;
+usec_t clock_monotonic_resolution = 1000;
+usec_t clock_realtime_resolution = 1000;
+
#ifndef HAVE_CLOCK_GETTIME
inline int clock_gettime(clockid_t clk_id, struct timespec *ts) {
struct timeval tv;
@@ -20,15 +23,19 @@ inline int clock_gettime(clockid_t clk_id, struct timespec *ts) {
}
#endif
-// When running a binary with CLOCK_MONOTONIC_COARSE defined on a system with a linux kernel older than Linux 2.6.32 the
-// clock_gettime(2) system call fails with EINVAL. In that case it must fall-back to CLOCK_MONOTONIC.
+// Similar to CLOCK_MONOTONIC, but provides access to a raw hardware-based time that is not subject to NTP adjustments
+// or the incremental adjustments performed by adjtime(3). This clock does not count time that the system is suspended.
-static void test_clock_monotonic_coarse(void) {
+static void test_clock_monotonic_raw(void) {
+#ifdef CLOCK_MONOTONIC_RAW
struct timespec ts;
- if(clock_gettime(CLOCK_MONOTONIC_COARSE, &ts) == -1 && errno == EINVAL)
+ if(clock_gettime(CLOCK_MONOTONIC_RAW, &ts) == -1 && errno == EINVAL)
clock_monotonic_to_use = CLOCK_MONOTONIC;
else
- clock_monotonic_to_use = CLOCK_MONOTONIC_COARSE;
+ clock_monotonic_to_use = CLOCK_MONOTONIC_RAW;
+#else
+ clock_monotonic_to_use = CLOCK_MONOTONIC;
+#endif
}
// When running a binary with CLOCK_BOOTTIME defined on a system with a linux kernel older than Linux 2.6.39 the
@@ -42,14 +49,31 @@ static void test_clock_boottime(void) {
clock_boottime_to_use = CLOCK_BOOTTIME;
}
+static usec_t get_clock_resolution(clockid_t clock) {
+ struct timespec ts;
+ clock_getres(clock, &ts);
+ return ts.tv_sec * USEC_PER_SEC + ts.tv_nsec * NSEC_PER_USEC;
+}
+
// perform any initializations required for clocks
void clocks_init(void) {
- // monotonic coarse has to be tested before boottime
- test_clock_monotonic_coarse();
+ // monotonic raw has to be tested before boottime
+ test_clock_monotonic_raw();
// boottime has to be tested after monotonic coarse
test_clock_boottime();
+
+ clock_monotonic_resolution = get_clock_resolution(clock_monotonic_to_use);
+ clock_realtime_resolution = get_clock_resolution(CLOCK_REALTIME);
+
+ // if for any reason these are zero, netdata will crash
+    // since we use them as divisors in modulo calculations
+ if(!clock_realtime_resolution)
+ clock_realtime_resolution = 1000;
+
+ if(!clock_monotonic_resolution)
+ clock_monotonic_resolution = 1000;
}
inline time_t now_sec(clockid_t clk_id) {
@@ -155,8 +179,110 @@ inline usec_t dt_usec(struct timeval *now, struct timeval *old) {
return (ts1 > ts2) ? (ts1 - ts2) : (ts2 - ts1);
}
+void sleep_to_absolute_time(usec_t usec) {
+ static int einval_printed = 0, enotsup_printed = 0, eunknown_printed = 0;
+ clockid_t clock = CLOCK_REALTIME;
+
+ struct timespec req = {
+ .tv_sec = (time_t)(usec / USEC_PER_SEC),
+ .tv_nsec = (suseconds_t)((usec % USEC_PER_SEC) * NSEC_PER_USEC)
+ };
+
+ int ret = 0;
+ while( (ret = clock_nanosleep(clock, TIMER_ABSTIME, &req, NULL)) != 0 ) {
+ if(ret == EINTR) continue;
+ else {
+ if (ret == EINVAL) {
+ if (!einval_printed) {
+ einval_printed++;
+ error(
+ "Invalid time given to clock_nanosleep(): clockid = %d, tv_sec = %ld, tv_nsec = %ld",
+ clock,
+ req.tv_sec,
+ req.tv_nsec);
+ }
+ } else if (ret == ENOTSUP) {
+ if (!enotsup_printed) {
+ enotsup_printed++;
+ error(
+ "Invalid clock id given to clock_nanosleep(): clockid = %d, tv_sec = %ld, tv_nsec = %ld",
+ clock,
+ req.tv_sec,
+ req.tv_nsec);
+ }
+ } else {
+ if (!eunknown_printed) {
+ eunknown_printed++;
+ error(
+ "Unknown return value %d from clock_nanosleep(): clockid = %d, tv_sec = %ld, tv_nsec = %ld",
+ ret,
+ clock,
+ req.tv_sec,
+ req.tv_nsec);
+ }
+ }
+ sleep_usec(usec);
+ }
+ }
+};
+
+#define HEARTBEAT_ALIGNMENT_STATISTICS_SIZE 10
+netdata_mutex_t heartbeat_alignment_mutex = NETDATA_MUTEX_INITIALIZER;
+static size_t heartbeat_alignment_id = 0;
+
+struct heartbeat_thread_statistics {
+ size_t sequence;
+ usec_t dt;
+};
+static struct heartbeat_thread_statistics heartbeat_alignment_values[HEARTBEAT_ALIGNMENT_STATISTICS_SIZE] = { 0 };
+
+void heartbeat_statistics(usec_t *min_ptr, usec_t *max_ptr, usec_t *average_ptr, size_t *count_ptr) {
+ struct heartbeat_thread_statistics current[HEARTBEAT_ALIGNMENT_STATISTICS_SIZE];
+ static struct heartbeat_thread_statistics old[HEARTBEAT_ALIGNMENT_STATISTICS_SIZE] = { 0 };
+
+ memcpy(current, heartbeat_alignment_values, sizeof(struct heartbeat_thread_statistics) * HEARTBEAT_ALIGNMENT_STATISTICS_SIZE);
+
+ usec_t min = 0, max = 0, total = 0, average = 0;
+ size_t i, count = 0;
+ for(i = 0; i < HEARTBEAT_ALIGNMENT_STATISTICS_SIZE ;i++) {
+ if(current[i].sequence == old[i].sequence) continue;
+ usec_t value = current[i].dt - old[i].dt;
+
+ if(!count) {
+ min = max = total = value;
+ count = 1;
+ }
+ else {
+ total += value;
+ if(value < min) min = value;
+ if(value > max) max = value;
+ count++;
+ }
+ }
+ average = total / count;
+
+ if(min_ptr) *min_ptr = min;
+ if(max_ptr) *max_ptr = max;
+ if(average_ptr) *average_ptr = average;
+ if(count_ptr) *count_ptr = count;
+
+ memcpy(old, current, sizeof(struct heartbeat_thread_statistics) * HEARTBEAT_ALIGNMENT_STATISTICS_SIZE);
+}
+
inline void heartbeat_init(heartbeat_t *hb) {
- hb->monotonic = hb->realtime = 0ULL;
+ hb->realtime = 0ULL;
+ hb->randomness = 250 * USEC_PER_MS + ((now_realtime_usec() * clock_realtime_resolution) % (250 * USEC_PER_MS));
+ hb->randomness -= (hb->randomness % clock_realtime_resolution);
+
+ netdata_mutex_lock(&heartbeat_alignment_mutex);
+ hb->statistics_id = heartbeat_alignment_id;
+ heartbeat_alignment_id++;
+ netdata_mutex_unlock(&heartbeat_alignment_mutex);
+
+ if(hb->statistics_id < HEARTBEAT_ALIGNMENT_STATISTICS_SIZE) {
+ heartbeat_alignment_values[hb->statistics_id].dt = 0;
+ heartbeat_alignment_values[hb->statistics_id].sequence = 0;
+ }
}
// waits for the next heartbeat
@@ -164,96 +290,73 @@ inline void heartbeat_init(heartbeat_t *hb) {
// it returns the dt using the realtime clock
usec_t heartbeat_next(heartbeat_t *hb, usec_t tick) {
- heartbeat_t now;
- now.monotonic = now_monotonic_usec();
- now.realtime = now_realtime_usec();
-
- usec_t next_monotonic = now.monotonic - (now.monotonic % tick) + tick;
-
- while(now.monotonic < next_monotonic) {
- sleep_usec(next_monotonic - now.monotonic);
- now.monotonic = now_monotonic_usec();
- now.realtime = now_realtime_usec();
+ if(unlikely(hb->randomness > tick / 2)) {
+ // TODO: The heartbeat tick should be specified at the heartbeat_init() function
+ usec_t tmp = (now_realtime_usec() * clock_realtime_resolution) % (tick / 2);
+ info("heartbeat randomness of %llu is too big for a tick of %llu - setting it to %llu", hb->randomness, tick, tmp);
+ hb->randomness = tmp;
}
- if(likely(hb->realtime != 0ULL)) {
- usec_t dt_monotonic = now.monotonic - hb->monotonic;
- usec_t dt_realtime = now.realtime - hb->realtime;
+ usec_t dt;
+ usec_t now = now_realtime_usec();
+ usec_t next = now - (now % tick) + tick + hb->randomness;
- hb->monotonic = now.monotonic;
- hb->realtime = now.realtime;
+ // align the next time we want to the clock resolution
+ if(next % clock_realtime_resolution)
+ next = next - (next % clock_realtime_resolution) + clock_realtime_resolution;
- if(unlikely(dt_monotonic >= tick + tick / 2)) {
- errno = 0;
- error("heartbeat missed %llu monotonic microseconds", dt_monotonic - tick);
- }
+ // sleep_usec() has a loop to guarantee we will sleep for at least the requested time.
+    // According to the specs, when we sleep for a relative time, clock adjustments should not affect the duration
+ // we sleep.
+ sleep_usec(next - now);
+ now = now_realtime_usec();
+ dt = now - hb->realtime;
- return dt_realtime;
+ if(hb->statistics_id < HEARTBEAT_ALIGNMENT_STATISTICS_SIZE) {
+ heartbeat_alignment_values[hb->statistics_id].dt += now - next;
+ heartbeat_alignment_values[hb->statistics_id].sequence++;
}
- else {
- hb->monotonic = now.monotonic;
- hb->realtime = now.realtime;
- return 0ULL;
+
+ if(unlikely(now < next)) {
+ errno = 0;
+ error("heartbeat clock: woke up %llu microseconds earlier than expected (can be due to the CLOCK_REALTIME set to the past).", next - now);
+ }
+ else if(unlikely(now - next > tick / 2)) {
+ errno = 0;
+ error("heartbeat clock: woke up %llu microseconds later than expected (can be due to system load or the CLOCK_REALTIME set to the future).", now - next);
}
-}
-// returned the elapsed time, since the last heartbeat
-// using the monotonic clock
+ if(unlikely(!hb->realtime)) {
+ // the first time return zero
+ dt = 0;
+ }
-inline usec_t heartbeat_monotonic_dt_to_now_usec(heartbeat_t *hb) {
- if(!hb || !hb->monotonic) return 0ULL;
- return now_monotonic_usec() - hb->monotonic;
+ hb->realtime = now;
+ return dt;
}
-int sleep_usec(usec_t usec) {
-
-#ifndef NETDATA_WITH_USLEEP
+void sleep_usec(usec_t usec) {
// we expect microseconds (1.000.000 per second)
// but timespec is nanoseconds (1.000.000.000 per second)
struct timespec rem, req = {
- .tv_sec = (time_t) (usec / 1000000),
- .tv_nsec = (suseconds_t) ((usec % 1000000) * 1000)
+ .tv_sec = (time_t) (usec / USEC_PER_SEC),
+ .tv_nsec = (suseconds_t) ((usec % USEC_PER_SEC) * NSEC_PER_USEC)
};
- while (nanosleep(&req, &rem) == -1) {
+ while ((errno = clock_nanosleep(CLOCK_REALTIME, 0, &req, &rem)) != 0) {
if (likely(errno == EINTR)) {
- debug(D_SYSTEM, "nanosleep() interrupted (while sleeping for %llu microseconds).", usec);
req.tv_sec = rem.tv_sec;
req.tv_nsec = rem.tv_nsec;
} else {
- error("Cannot nanosleep() for %llu microseconds.", usec);
+ error("Cannot clock_nanosleep(CLOCK_REALTIME) for %llu microseconds.", usec);
break;
}
}
-
- return 0;
-#else
- int ret = usleep(usec);
- if(unlikely(ret == -1 && errno == EINVAL)) {
- // on certain systems, usec has to be up to 999999
- if(usec > 999999) {
- int counter = usec / 999999;
- while(counter--)
- usleep(999999);
-
- usleep(usec % 999999);
- }
- else {
- error("Cannot usleep() for %llu microseconds.", usec);
- return ret;
- }
- }
-
- if(ret != 0)
- error("usleep() failed for %llu microseconds.", usec);
-
- return ret;
-#endif
}
static inline collected_number uptime_from_boottime(void) {
#ifdef CLOCK_BOOTTIME_IS_AVAILABLE
- return now_boottime_usec() / 1000;
+ return (collected_number)(now_boottime_usec() / USEC_PER_MS);
#else
error("uptime cannot be read from CLOCK_BOOTTIME on this system.");
return 0;
diff --git a/libnetdata/clocks/clocks.h b/libnetdata/clocks/clocks.h
index 3c9ee28bad..53c036ece9 100644
--- a/libnetdata/clocks/clocks.h
+++ b/libnetdata/clocks/clocks.h
@@ -22,8 +22,9 @@ typedef unsigned long long usec_t;
typedef long long susec_t;
typedef struct heartbeat {
- usec_t monotonic;
usec_t realtime;
+ usec_t randomness;
+ size_t statistics_id;
} heartbeat_t;
/* Linux value is as good as any other */
@@ -36,20 +37,14 @@ typedef struct heartbeat {
#define CLOCK_MONOTONIC CLOCK_REALTIME
#endif
-/* Prefer CLOCK_MONOTONIC_COARSE where available to reduce overhead. It has the same semantics as CLOCK_MONOTONIC */
-#ifndef CLOCK_MONOTONIC_COARSE
-/* fallback to CLOCK_MONOTONIC if not available */
-#define CLOCK_MONOTONIC_COARSE CLOCK_MONOTONIC
-#endif
-
#ifndef CLOCK_BOOTTIME
#ifdef CLOCK_UPTIME
/* CLOCK_BOOTTIME falls back to CLOCK_UPTIME on FreeBSD */
#define CLOCK_BOOTTIME CLOCK_UPTIME
#else // CLOCK_UPTIME
-/* CLOCK_BOOTTIME falls back to CLOCK_MONOTONIC */
-#define CLOCK_BOOTTIME CLOCK_MONOTONIC_COARSE
+/* CLOCK_BOOTTIME falls back to CLOCK_REALTIME */
+#define CLOCK_BOOTTIME CLOCK_REALTIME
#endif // CLOCK_UPTIME
#else // CLOCK_BOOTTIME
@@ -115,8 +110,6 @@ extern int clock_gettime(clockid_t clk_id, struct timespec *ts);
* All now_*_sec() functions return the time in seconds from the appropriate clock, or 0 on error.
* All now_*_usec() functions return the time in microseconds from the appropriate clock, or 0 on error.
*
- * Most functions will attempt to use CLOCK_MONOTONIC_COARSE if available to reduce contention overhead and improve
- * performance scaling. If high precision is required please use one of the available now_*_high_precision_* functions.
*/
extern int now_realtime_timeval(struct timeval *tv);
extern time_t now_realtime_sec(void);
@@ -146,10 +139,9 @@ extern void heartbeat_init(heartbeat_t *hb);
*/
extern usec_t heartbeat_next(heartbeat_t *hb, usec_t tick);
-/* Returns elapsed time in microseconds since last heartbeat */
-extern usec_t heartbeat_monotonic_dt_to_now_usec(heartbeat_t *hb);
+extern void heartbeat_statistics(usec_t *min_ptr, usec_t *max_ptr, usec_t *average_ptr, size_t *count_ptr);
-extern int sleep_usec(usec_t usec);
+extern void sleep_usec(usec_t usec);
extern void clocks_init(void);
@@ -160,4 +152,9 @@ extern int now_timeval(clockid_t clk_id, struct timeval *tv);
extern collected_number uptime_msec(char *filename);
+extern usec_t clock_monotonic_resolution;
+extern usec_t clock_realtime_resolution;
+
+extern void sleep_to_absolute_time(usec_t usec);
+
#endif /* NETDATA_CLOCKS_H */
diff --git a/libnetdata/libnetdata.h b/libnetdata/libnetdata.h
index d197f3f7c6..34062f2a65 100644
--- a/libnetdata/libnetdata.h
+++ b/libnetdata/libnetdata.h
@@ -346,6 +346,7 @@ extern char *netdata_configured_host_prefix;
#include "health/health.h"
#include "string/utf8.h"
#include "onewayalloc/onewayalloc.h"
+#include "worker_utilization/worker_utilization.h"
// BEWARE: Outside of the C code this also exists in alarm-notify.sh
#define DEFAULT_CLOUD_BASE_URL "https://app.netdata.cloud"
diff --git a/libnetdata/worker_utilization/Makefile.am b/libnetdata/worker_utilization/Makefile.am
new file mode 100644
index 0000000000..161784b8f6
--- /dev/null
+++ b/libnetdata/worker_utilization/Makefile.am
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/libnetdata/worker_utilization/README.md b/libnetdata/worker_utilization/README.md
new file mode 100644
index 0000000000..85e532ed10
--- /dev/null
+++ b/libnetdata/worker_utilization/README.md
@@ -0,0 +1,58 @@
+<!--
+title: "Worker Utilization"
+custom_edit_url: https://github.com/netdata/netdata/edit/master/libnetdata/worker_utilization/README.md
+-->
+
+# Worker Utilization
+
+This library is to be used when there are one or more worker threads accepting requests of some kind and servicing them.
+The goal is to provide a very simple way to monitor worker threads utilization, as a percentage of the time they are busy and the number of requests served.
+
+## How to use
+
+When a working thread starts, call:
+
+```c
+void worker_register(const char *name);
+```
+
+This will create the necessary structures for the library to work.
+No need to keep a pointer to them. They are allocated as `__thread` variables.
+
+When the thread stops, call:
+
+```c
+void worker_unregister(void)
+```
+
+Again, no parameters, or return values.
+
+When you are about to do some work in the working thread, call:
+
+```c
+void worker_is_busy(size_t job_id)
+```
+
+When you finish doing the job, call:
+
+```c
+void worker_is_idle(void)
+```
+
+Calls to `worker_is_busy()` can be made one after another (without calling
+`worker_is_idle()` between them) to switch jobs without losing any time between
+them and eliminating one of the 2 clock calls involved.
+
+## Implementation details
+
+Totally lockless, extremely fast, it should not introduce any kind of problems to the workers.
+Every time `worker_is_busy()` or `worker_is_idle()` is called, a call to `now_realtime_usec()`
+is done and a couple of variables are updated. That's it!
+
+The worker does not need to update the variables regularly. Based on the last status of the worker,
+the statistics collector of netdata will calculate if the thread is busy or idle all the time or
+part of the time. Works well for both thousands of jobs per second and unlimited working time
+(being totally busy with a single request for ages).
+
+The statistics collector is called by the global statistics thread of netdata. So, even if the workers
+are extremely busy with their jobs, netdata will be able to know how busy they are.
diff --git a/libnetdata/worker_utilization/worker_utilization.c b/libnetdata/worker_utilization/worker_utilization.c
new file mode 100644
index 0000000000..459df2f265
--- /dev/null
+++ b/libnetdata/worker_utilization/worker_utilization.c
@@ -0,0 +1,201 @@
+#include "worker_utilization.h"
+
+#define WORKER_IDLE 'I'
+#define WORKER_BUSY 'B'
+
+struct worker_job_type {
+ char name[WORKER_UTILIZATION_MAX_JOB_NAME_LENGTH + 1];
+ size_t worker_jobs_started;
+ usec_t worker_busy_time;
+
+ size_t statistics_jobs_started;
+ usec_t statistics_busy_time;
+};
+
+struct worker {
+ pid_t pid;
+ const char *tag;
+ const char *workname;
+ uint32_t workname_hash;
+
+ // only one variable is set by our statistics callers
+ usec_t statistics_last_checkpoint;
+ size_t statistics_last_jobs_started;
+ usec_t statistics_last_busy_time;
+
+ // the worker controlled variables
+ size_t job_id;
+ volatile size_t jobs_started;
+ volatile usec_t busy_time;
+ volatile usec_t last_action_timestamp;
+ volatile char last_action;
+
+ struct worker_job_type per_job_type[WORKER_UTILIZATION_MAX_JOB_TYPES];
+
+ struct worker *next;
+};
+
+static netdata_mutex_t base_lock = NETDATA_MUTEX_INITIALIZER;
+static struct worker *base = NULL;
+static __thread struct worker *worker = NULL;
+
+void worker_register(const char *workname) {
+ if(unlikely(worker)) return;
+
+ worker = callocz(1, sizeof(struct worker));
+ worker->pid = gettid();
+ worker->tag = strdupz(netdata_thread_tag());
+ worker->workname = strdupz(workname);
+ worker->workname_hash = simple_hash(worker->workname);
+
+ usec_t now = now_realtime_usec();
+ worker->statistics_last_checkpoint = now;
+ worker->last_action_timestamp = now;
+ worker->last_action = WORKER_IDLE;
+
+ netdata_mutex_lock(&base_lock);
+ worker->next = base;
+ base = worker;
+ netdata_mutex_unlock(&base_lock);
+}
+
+void worker_register_job_name(size_t job_id, const char *name) {
+ if(unlikely(!worker)) return;
+
+ if(unlikely(job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES)) {
+ error("WORKER_UTILIZATION: job_id %zu is too big. Max is %zu", job_id, (size_t)(WORKER_UTILIZATION_MAX_JOB_TYPES - 1));
+ return;
+ }
+
+ strncpy(worker->per_job_type[job_id].name, name, WORKER_UTILIZATION_MAX_JOB_NAME_LENGTH);
+}
+
+void worker_unregister(void) {
+ if(unlikely(!worker)) return;
+
+ netdata_mutex_lock(&base_lock);
+ if(base == worker)
+ base = worker->next;
+ else {
+ struct worker *p;
+ for(p = base; p && p->next && p->next != worker ;p = p->next);
+ if(p && p->next == worker)
+ p->next = worker->next;
+ }
+ netdata_mutex_unlock(&base_lock);
+
+ freez((void *)worker->tag);
+ freez((void *)worker->workname);
+ freez(worker);
+
+ worker = NULL;
+}
+
+static inline void worker_is_idle_with_time(usec_t now) {
+ usec_t delta = now - worker->last_action_timestamp;
+ worker->busy_time += delta;
+ worker->per_job_type[worker->job_id].worker_busy_time += delta;
+
+ // the worker was busy
+ // set it to idle before we set the timestamp
+
+ worker->last_action = WORKER_IDLE;
+ if(likely(worker->last_action_timestamp < now))
+ worker->last_action_timestamp = now;
+}
+
+void worker_is_idle(void) {
+ if(unlikely(!worker)) return;
+ if(unlikely(worker->last_action != WORKER_BUSY)) return;
+
+ worker_is_idle_with_time(now_realtime_usec());
+}
+
+void worker_is_busy(size_t job_id) {
+ if(unlikely(!worker)) return;
+ if(unlikely(job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES))
+ job_id = 0;
+
+ usec_t now = now_realtime_usec();
+
+ if(worker->last_action == WORKER_BUSY)
+ worker_is_idle_with_time(now);
+
+ // the worker was idle
+ // set the timestamp and then set it to busy
+
+ worker->job_id = job_id;
+ worker->per_job_type[job_id].worker_jobs_started++;
+ worker->jobs_started++;
+ worker->last_action_timestamp = now;
+ worker->last_action = WORKER_BUSY;
+}
+
+
+// statistics interface
+
+void workers_foreach(const char *workname, void (*callback)(void *data, pid_t pid, const char *thread_tag, size_t utilization_usec, size_t duration_usec, size_t jobs_started, size_t is_running, const char **job_types_names, size_t *job_types_jobs_started, usec_t *job_types_busy_time), void *data) {
+ netdata_mutex_lock(&base_lock);
+ uint32_t hash = simple_hash(workname);
+ usec_t busy_time, delta;
+ size_t i, jobs_started, jobs_running;
+
+ struct worker *p;
+ for(p = base; p ; p = p->next) {
+ if(hash != p->workname_hash || strcmp(workname, p->workname)) continue;
+
+ usec_t now = now_realtime_usec();
+
+ // find per job type statistics
+ const char *per_job_type_name[WORKER_UTILIZATION_MAX_JOB_TYPES];
+ size_t per_job_type_jobs_started[WORKER_UTILIZATION_MAX_JOB_TYPES];
+ usec_t per_job_type_busy_time[WORKER_UTILIZATION_MAX_JOB_TYPES];
+ for(i = 0; i < WORKER_UTILIZATION_MAX_JOB_TYPES ;i++) {
+ per_job_type_name[i] = p->per_job_type[i].name;
+
+ size_t tmp_jobs_started = p->per_job_type[i].worker_jobs_started;
+ per_job_type_jobs_started[i] = tmp_jobs_started - p->per_job_type[i].statistics_jobs_started;
+ p->per_job_type[i].statistics_jobs_started = tmp_jobs_started;
+
+ usec_t tmp_busy_time = p->per_job_type[i].worker_busy_time;
+ per_job_type_busy_time[i] = tmp_busy_time - p->per_job_type[i].statistics_busy_time;
+ p->per_job_type[i].statistics_busy_time = tmp_busy_time;
+ }
+
+ // get a copy of the worker variables
+ usec_t worker_busy_time = p->busy_time;
+ size_t worker_jobs_started = p->jobs_started;
+ char worker_last_action = p->last_action;
+ usec_t worker_last_action_timestamp = p->last_action_timestamp;
+
+ // this is the only variable both the worker thread and the statistics thread are writing
+ // we set this only when the worker is busy, so that worker will not
+ // accumulate all the busy time, but only the time after the point we collected statistics
+ if(worker_last_action == WORKER_BUSY && p->last_action_timestamp == worker_last_action_timestamp && p->last_action == WORKER_BUSY)
+ p->last_action_timestamp = now;
+
+ // calculate delta busy time
+ busy_time = worker_busy_time - p->statistics_last_busy_time;
+ p->statistics_last_busy_time = worker_busy_time;
+
+ // calculate delta jobs done
+ jobs_started = worker_jobs_started - p->statistics_last_jobs_started;
+ p->statistics_last_jobs_started = worker_jobs_started;
+
+ jobs_running = 0;
+ if(worker_last_action == WORKER_BUSY) {
+ // the worker is still busy with something
+ // let's add that busy time to the reported one
+ busy_time += now - worker_last_action_timestamp;
+ jobs_running = 1;
+ }
+
+ delta = now - p->statistics_last_checkpoint;
+
+ p->statistics_last_checkpoint = now;
+
+ callback(data, p->pid, p->tag, busy_time, delta, jobs_started, jobs_running, per_job_type_name, per_job_type_jobs_started, per_job_type_busy_time);
+ }
+
+ netdata_mutex_unlock(&base_lock);
+}
diff --git a/libnetdata/worker_utilization/worker_utilization.h b/libnetdata/worker_utilization/worker_utilization.h
new file mode 100644
index 0000000000..8f16fe0549
--- /dev/null
+++ b/libnetdata/worker_utilization/worker_utilization.h
@@ -0,0 +1,22 @@
+#ifndef WORKER_UTILIZATION_H
+#define WORKER_UTILIZATION_H 1
+
+#include "../libnetdata.h"
+
+// workers interfaces
+
+#define WORKER_UTILIZATION_MAX_JOB_TYPES 50
+#define WORKER_UTILIZATION_MAX_JOB_NAME_LENGTH 25
+
+extern void worker_register(const char *workname);
+extern void worker_register_job_name(size_t job_id, const char *name);
+extern void worker_unregister(void);
+
+extern void worker_is_idle(void);
+extern void worker_is_busy(size_t job_id);
+
+// statistics interface
+
+extern void workers_foreach(const char *workname, void (*callback)(void *data, pid_t pid, const char *thread_tag, size_t utilization_usec, size_t duration_usec, size_t jobs_started, size_t is_running, const char **job_types_names, size_t *job_types_jobs_started, usec_t *job_types_busy_time), void *data);
+
+#endif // WORKER_UTILIZATION_H