summaryrefslogtreecommitdiffstats
path: root/collectors
diff options
context:
space:
mode:
Diffstat (limited to 'collectors')
-rw-r--r--collectors/all.h7
-rw-r--r--collectors/cgroups.plugin/sys_fs_cgroup.c27
-rw-r--r--collectors/diskspace.plugin/plugin_diskspace.c16
-rw-r--r--collectors/idlejitter.plugin/plugin_idlejitter.c2
-rw-r--r--collectors/plugins.d/plugins_d.c8
-rw-r--r--collectors/plugins.d/pluginsd_parser.c26
-rw-r--r--collectors/proc.plugin/plugin_proc.c6
-rw-r--r--collectors/proc.plugin/proc_net_dev.c4
-rw-r--r--collectors/statsd.plugin/statsd.c29
-rw-r--r--collectors/tc.plugin/plugin_tc.c6
-rw-r--r--collectors/timex.plugin/plugin_timex.c2
11 files changed, 83 insertions, 50 deletions
diff --git a/collectors/all.h b/collectors/all.h
index 8774eb702a..958ed7c214 100644
--- a/collectors/all.h
+++ b/collectors/all.h
@@ -382,4 +382,11 @@
#define NETDATA_ML_CHART_PRIO_TRAINING_TIME_STATS 890007
#define NETDATA_ML_CHART_PRIO_TRAINING_RESULTS 890008
+#define NETDATA_ML_CHART_FAMILY "ml - machine learning"
+#define NETDATA_ML_PLUGIN "ml.plugin"
+#define NETDATA_ML_MODULE_TRAINING "training"
+#define NETDATA_ML_MODULE_DETECTION "detection"
+#define NETDATA_ML_MODULE_PREDICTION "prediction"
+
+
#endif //NETDATA_ALL_H
diff --git a/collectors/cgroups.plugin/sys_fs_cgroup.c b/collectors/cgroups.plugin/sys_fs_cgroup.c
index 8f7548286c..f9dc085b46 100644
--- a/collectors/cgroups.plugin/sys_fs_cgroup.c
+++ b/collectors/cgroups.plugin/sys_fs_cgroup.c
@@ -2754,10 +2754,20 @@ static inline void discovery_find_all_cgroups() {
debug(D_CGROUP, "done searching for cgroups");
}
+static void cgroup_discovery_cleanup(void *ptr) {
+ UNUSED(ptr);
+
+ discovery_thread.exited = 1;
+ worker_unregister();
+ service_exits();
+}
+
void cgroup_discovery_worker(void *ptr)
{
UNUSED(ptr);
+ netdata_thread_cleanup_push(cgroup_discovery_cleanup, ptr);
+
worker_register("CGROUPSDISC");
worker_register_job_name(WORKER_DISCOVERY_INIT, "init");
worker_register_job_name(WORKER_DISCOVERY_FIND, "find");
@@ -2777,24 +2787,23 @@ void cgroup_discovery_worker(void *ptr)
NULL,
SIMPLE_PATTERN_EXACT);
- while (!netdata_exit) {
+ while (service_running(SERVICE_COLLECTORS)) {
worker_is_idle();
uv_mutex_lock(&discovery_thread.mutex);
- while (!discovery_thread.start_discovery)
+ while (!discovery_thread.start_discovery && service_running(SERVICE_COLLECTORS))
uv_cond_wait(&discovery_thread.cond_var, &discovery_thread.mutex);
discovery_thread.start_discovery = 0;
uv_mutex_unlock(&discovery_thread.mutex);
- if (unlikely(netdata_exit))
+ if (unlikely(!service_running(SERVICE_COLLECTORS)))
break;
discovery_find_all_cgroups();
}
- discovery_thread.exited = 1;
- worker_unregister();
-}
+ netdata_thread_cleanup_pop(1);
+}
// ----------------------------------------------------------------------------
// generate charts
@@ -4853,11 +4862,11 @@ void *cgroups_main(void *ptr) {
usec_t step = cgroup_update_every * USEC_PER_SEC;
usec_t find_every = cgroup_check_for_new_every * USEC_PER_SEC, find_dt = 0;
- while(!netdata_exit) {
+ while(service_running(SERVICE_COLLECTORS)) {
worker_is_idle();
usec_t hb_dt = heartbeat_next(&hb, step);
- if(unlikely(netdata_exit)) break;
+ if(unlikely(!service_running(SERVICE_COLLECTORS))) break;
find_dt += hb_dt;
if (unlikely(find_dt >= find_every || (!is_inside_k8s && cgroups_check))) {
@@ -4872,9 +4881,11 @@ void *cgroups_main(void *ptr) {
worker_is_busy(WORKER_CGROUPS_READ);
read_all_discovered_cgroups(cgroup_root);
+ if(unlikely(!service_running(SERVICE_COLLECTORS))) break;
worker_is_busy(WORKER_CGROUPS_CHART);
update_cgroup_charts(cgroup_update_every);
+ if(unlikely(!service_running(SERVICE_COLLECTORS))) break;
worker_is_idle();
uv_mutex_unlock(&cgroup_root_mutex);
diff --git a/collectors/diskspace.plugin/plugin_diskspace.c b/collectors/diskspace.plugin/plugin_diskspace.c
index e806a33602..81604e2048 100644
--- a/collectors/diskspace.plugin/plugin_diskspace.c
+++ b/collectors/diskspace.plugin/plugin_diskspace.c
@@ -515,7 +515,7 @@ void *diskspace_slow_worker(void *ptr)
heartbeat_t hb;
heartbeat_init(&hb);
- while(!netdata_exit) {
+ while(service_running(SERVICE_COLLECTORS)) {
worker_is_idle();
heartbeat_next(&hb, USEC_PER_SEC);
@@ -530,7 +530,7 @@ void *diskspace_slow_worker(void *ptr)
if (!dict_mountpoints)
continue;
- if(unlikely(netdata_exit)) break;
+ if(unlikely(!service_running(SERVICE_COLLECTORS))) break;
// --------------------------------------------------------------------------
// disk space metrics
@@ -547,10 +547,10 @@ void *diskspace_slow_worker(void *ptr)
for(bmi = slow_mountinfo_root; bmi; bmi = bmi->next) {
do_slow_disk_space_stats(bmi, slow_update_every);
- if(unlikely(netdata_exit)) break;
+ if(unlikely(!service_running(SERVICE_COLLECTORS))) break;
}
- if(unlikely(netdata_exit)) break;
+ if(unlikely(!service_running(SERVICE_COLLECTORS))) break;
worker_is_busy(WORKER_JOB_SLOW_CLEANUP);
@@ -640,11 +640,11 @@ void *diskspace_main(void *ptr) {
usec_t step = update_every * USEC_PER_SEC;
heartbeat_t hb;
heartbeat_init(&hb);
- while(!netdata_exit) {
+ while(service_running(SERVICE_COLLECTORS)) {
worker_is_idle();
/* usec_t hb_dt = */ heartbeat_next(&hb, step);
- if(unlikely(netdata_exit)) break;
+ if(unlikely(!service_running(SERVICE_COLLECTORS))) break;
// --------------------------------------------------------------------------
// this is smart enough not to reload it every time
@@ -671,11 +671,11 @@ void *diskspace_main(void *ptr) {
worker_is_busy(WORKER_JOB_MOUNTPOINT);
do_disk_space_stats(mi, update_every);
- if(unlikely(netdata_exit)) break;
+ if(unlikely(!service_running(SERVICE_COLLECTORS))) break;
}
netdata_mutex_unlock(&slow_mountinfo_mutex);
- if(unlikely(netdata_exit)) break;
+ if(unlikely(!service_running(SERVICE_COLLECTORS))) break;
if(dict_mountpoints) {
worker_is_busy(WORKER_JOB_CLEANUP);
diff --git a/collectors/idlejitter.plugin/plugin_idlejitter.c b/collectors/idlejitter.plugin/plugin_idlejitter.c
index b6339cc0fc..df0f9b9515 100644
--- a/collectors/idlejitter.plugin/plugin_idlejitter.c
+++ b/collectors/idlejitter.plugin/plugin_idlejitter.c
@@ -48,7 +48,7 @@ void *cpuidlejitter_main(void *ptr) {
usec_t update_every_ut = localhost->rrd_update_every * USEC_PER_SEC;
struct timeval before, after;
- while (!netdata_exit) {
+ while (service_running(SERVICE_COLLECTORS)) {
int iterations = 0;
usec_t error_total = 0,
error_min = 0,
diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c
index 79abc70708..34130efff3 100644
--- a/collectors/plugins.d/plugins_d.c
+++ b/collectors/plugins.d/plugins_d.c
@@ -120,7 +120,7 @@ void *pluginsd_worker_thread(void *arg)
cd->obsolete = 0;
size_t count = 0;
- while (!netdata_exit) {
+ while (service_running(SERVICE_COLLECTORS)) {
FILE *fp_child_input = NULL;
FILE *fp_child_output = netdata_popen(cd->cmd, &cd->pid, &fp_child_input);
if (unlikely(!fp_child_input || !fp_child_output)) {
@@ -186,12 +186,12 @@ void *pluginsd_main(void *ptr)
// so that we don't log broken directories on each loop
int directory_errors[PLUGINSD_MAX_DIRECTORIES] = { 0 };
- while (!netdata_exit) {
+ while (service_running(SERVICE_COLLECTORS)) {
int idx;
const char *directory_name;
for (idx = 0; idx < PLUGINSD_MAX_DIRECTORIES && (directory_name = plugin_directories[idx]); idx++) {
- if (unlikely(netdata_exit))
+ if (unlikely(!service_running(SERVICE_COLLECTORS)))
break;
errno = 0;
@@ -206,7 +206,7 @@ void *pluginsd_main(void *ptr)
struct dirent *file = NULL;
while (likely((file = readdir(dir)))) {
- if (unlikely(netdata_exit))
+ if (unlikely(!service_running(SERVICE_COLLECTORS)))
break;
debug(D_PLUGINSD, "examining file '%s'", file->d_name);
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 5501c12fad..264d0eca41 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -441,19 +441,20 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
} else
rrddim_isnot_obsolete(st, rd);
+ bool should_update_dimension = false;
+
if (likely(unhide_dimension)) {
rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN);
- if (rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) {
- rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN);
- metaqueue_dimension_update_flags(rd);
- }
+ should_update_dimension = rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
}
else {
rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN);
- if (!rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) {
- rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN);
- metaqueue_dimension_update_flags(rd);
- }
+ should_update_dimension = !rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
+ }
+
+ if (should_update_dimension) {
+ rrddim_flag_set(rd, RRDDIM_FLAG_METADATA_UPDATE);
+ rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
}
return PARSER_RC_OK;
@@ -883,7 +884,7 @@ PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __may
host->rrdlabels = rrdlabels_create();
rrdlabels_migrate_to_these(host->rrdlabels, (DICTIONARY *) (((PARSER_USER_OBJECT *)user)->new_host_labels));
- metaqueue_store_host_labels(host->machine_guid);
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels);
((PARSER_USER_OBJECT *)user)->new_host_labels = NULL;
@@ -991,7 +992,7 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) {
if (unlikely(end_time - start_time != st->update_every))
- rrdset_set_update_every(st, end_time - start_time);
+ rrdset_set_update_every_s(st, end_time - start_time);
st->last_collected_time.tv_sec = end_time;
st->last_collected_time.tv_usec = 0;
@@ -1251,6 +1252,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
st->counter++;
st->counter_done++;
+ store_metric_collection_completed();
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
st->replay.start_streaming = false;
@@ -1262,7 +1264,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
if (start_streaming) {
if (st->update_every != update_every_child)
- rrdset_set_update_every(st, update_every_child);
+ rrdset_set_update_every_s(st, update_every_child);
if(rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
@@ -1339,7 +1341,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
user.parser = parser;
while (likely(!parser_next(parser))) {
- if (unlikely(netdata_exit || parser_action(parser, NULL)))
+ if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, NULL)))
break;
}
diff --git a/collectors/proc.plugin/plugin_proc.c b/collectors/proc.plugin/plugin_proc.c
index 1b24df45fd..220f9cb3db 100644
--- a/collectors/proc.plugin/plugin_proc.c
+++ b/collectors/proc.plugin/plugin_proc.c
@@ -128,15 +128,15 @@ void *proc_main(void *ptr)
heartbeat_t hb;
heartbeat_init(&hb);
- while (!netdata_exit) {
+ while (service_running(SERVICE_COLLECTORS)) {
worker_is_idle();
usec_t hb_dt = heartbeat_next(&hb, step);
- if (unlikely(netdata_exit))
+ if (unlikely(!service_running(SERVICE_COLLECTORS)))
break;
for (i = 0; proc_modules[i].name; i++) {
- if (unlikely(netdata_exit))
+ if (unlikely(!service_running(SERVICE_COLLECTORS)))
break;
struct proc_module *pm = &proc_modules[i];
diff --git a/collectors/proc.plugin/proc_net_dev.c b/collectors/proc.plugin/proc_net_dev.c
index 4f8a86cb9f..82f3ec1946 100644
--- a/collectors/proc.plugin/proc_net_dev.c
+++ b/collectors/proc.plugin/proc_net_dev.c
@@ -1504,11 +1504,11 @@ void *netdev_main(void *ptr)
heartbeat_t hb;
heartbeat_init(&hb);
- while (!netdata_exit) {
+ while (service_running(SERVICE_COLLECTORS)) {
worker_is_idle();
usec_t hb_dt = heartbeat_next(&hb, step);
- if (unlikely(netdata_exit))
+ if (unlikely(!service_running(SERVICE_COLLECTORS)))
break;
worker_is_busy(0);
diff --git a/collectors/statsd.plugin/statsd.c b/collectors/statsd.plugin/statsd.c
index 67d7ed2e26..b8a62fb9b1 100644
--- a/collectors/statsd.plugin/statsd.c
+++ b/collectors/statsd.plugin/statsd.c
@@ -234,7 +234,8 @@ typedef struct statsd_app {
// global statsd data
struct collection_thread_status {
- int status;
+ SPINLOCK spinlock;
+ bool running;
size_t max_sockets;
netdata_thread_t thread;
@@ -875,7 +876,7 @@ struct statsd_tcp {
#ifdef HAVE_RECVMMSG
struct statsd_udp {
- int *running;
+ struct collection_thread_status *status;
STATSD_SOCKET_DATA_TYPE type;
size_t size;
struct iovec *iovecs;
@@ -1097,7 +1098,9 @@ static int statsd_snd_callback(POLLINFO *pi, short int *events) {
void statsd_collector_thread_cleanup(void *data) {
struct statsd_udp *d = data;
- *d->running = 0;
+ netdata_spinlock_lock(&d->status->spinlock);
+ d->status->running = false;
+ netdata_spinlock_unlock(&d->status->spinlock);
info("cleaning up...");
@@ -1114,9 +1117,15 @@ void statsd_collector_thread_cleanup(void *data) {
worker_unregister();
}
+static bool statsd_should_stop(void) {
+ return !service_running(SERVICE_COLLECTORS);
+}
+
void *statsd_collector_thread(void *ptr) {
struct collection_thread_status *status = ptr;
- status->status = 1;
+ netdata_spinlock_lock(&status->spinlock);
+ status->running = true;
+ netdata_spinlock_unlock(&status->spinlock);
worker_register("STATSD");
worker_register_job_name(WORKER_JOB_TYPE_TCP_CONNECTED, "tcp connect");
@@ -1127,7 +1136,7 @@ void *statsd_collector_thread(void *ptr) {
info("STATSD collector thread started with taskid %d", gettid());
struct statsd_udp *d = callocz(sizeof(struct statsd_udp), 1);
- d->running = &status->status;
+ d->status = status;
netdata_thread_cleanup_push(statsd_collector_thread_cleanup, d);
@@ -1152,6 +1161,7 @@ void *statsd_collector_thread(void *ptr) {
, statsd_rcv_callback
, statsd_snd_callback
, NULL
+ , statsd_should_stop
, NULL // No access control pattern
, 0 // No dns lookups for access control pattern
, (void *)d
@@ -2358,13 +2368,15 @@ static void statsd_main_cleanup(void *data) {
if (statsd.collection_threads_status) {
int i;
for (i = 0; i < statsd.threads; i++) {
- if(statsd.collection_threads_status[i].status) {
+ netdata_spinlock_lock(&statsd.collection_threads_status[i].spinlock);
+ if(statsd.collection_threads_status[i].running) {
info("STATSD: stopping data collection thread %d...", i + 1);
netdata_thread_cancel(statsd.collection_threads_status[i].thread);
}
else {
info("STATSD: data collection thread %d found stopped.", i + 1);
}
+ netdata_spinlock_unlock(&statsd.collection_threads_status[i].spinlock);
}
}
@@ -2537,6 +2549,7 @@ void *statsd_main(void *ptr) {
statsd.collection_threads_status[i].max_sockets = max_sockets / statsd.threads;
char tag[NETDATA_THREAD_TAG_MAX + 1];
snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STATSD_COLLECTOR[%d]", i + 1);
+ netdata_spinlock_init(&statsd.collection_threads_status[i].spinlock);
netdata_thread_create(&statsd.collection_threads_status[i].thread, tag, NETDATA_THREAD_OPTION_DEFAULT, statsd_collector_thread, &statsd.collection_threads_status[i]);
}
@@ -2753,7 +2766,7 @@ void *statsd_main(void *ptr) {
usec_t step = statsd.update_every * USEC_PER_SEC;
heartbeat_t hb;
heartbeat_init(&hb);
- while(!netdata_exit) {
+ while(service_running(SERVICE_COLLECTORS)) {
worker_is_idle();
heartbeat_next(&hb, step);
@@ -2781,7 +2794,7 @@ void *statsd_main(void *ptr) {
worker_is_busy(WORKER_STATSD_FLUSH_STATS);
statsd_update_all_app_charts();
- if(unlikely(netdata_exit))
+ if(unlikely(!service_running(SERVICE_COLLECTORS)))
break;
if(global_statistics_enabled) {
diff --git a/collectors/tc.plugin/plugin_tc.c b/collectors/tc.plugin/plugin_tc.c
index a2e72ee339..331a787f34 100644
--- a/collectors/tc.plugin/plugin_tc.c
+++ b/collectors/tc.plugin/plugin_tc.c
@@ -929,7 +929,7 @@ void *tc_main(void *ptr) {
snprintfz(command, TC_LINE_MAX, "%s/tc-qos-helper.sh", netdata_configured_primary_plugins_dir);
char *tc_script = config_get("plugin:tc", "script to run to get tc values", command);
- while(!netdata_exit) {
+ while(service_running(SERVICE_COLLECTORS)) {
FILE *fp_child_input, *fp_child_output;
struct tc_device *device = NULL;
struct tc_class *class = NULL;
@@ -945,7 +945,7 @@ void *tc_main(void *ptr) {
char buffer[TC_LINE_MAX+1] = "";
while(fgets(buffer, TC_LINE_MAX, fp_child_output) != NULL) {
- if(unlikely(netdata_exit)) break;
+ if(unlikely(!service_running(SERVICE_COLLECTORS))) break;
buffer[TC_LINE_MAX] = '\0';
// debug(D_TC_LOOP, "TC: read '%s'", buffer);
@@ -1162,7 +1162,7 @@ void *tc_main(void *ptr) {
class = NULL;
}
- if(unlikely(netdata_exit))
+ if(unlikely(!service_running(SERVICE_COLLECTORS)))
goto cleanup;
if(code == 1 || code == 127) {
diff --git a/collectors/timex.plugin/plugin_timex.c b/collectors/timex.plugin/plugin_timex.c
index 46cfc57967..84147c8513 100644
--- a/collectors/timex.plugin/plugin_timex.c
+++ b/collectors/timex.plugin/plugin_timex.c
@@ -64,7 +64,7 @@ void *timex_main(void *ptr)
usec_t step = update_every * USEC_PER_SEC;
heartbeat_t hb;
heartbeat_init(&hb);
- while (!netdata_exit) {
+ while (service_running(SERVICE_COLLECTORS)) {
worker_is_idle();
heartbeat_next(&hb, step);
worker_is_busy(0);