summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--collectors/plugins.d/pluginsd_parser.c189
-rw-r--r--collectors/plugins.d/pluginsd_parser.h4
-rw-r--r--daemon/global_statistics.c3
-rw-r--r--daemon/main.c12
-rw-r--r--daemon/service.c11
-rw-r--r--daemon/static_threads.c31
-rw-r--r--database/rrd.h15
-rw-r--r--database/rrdcalc.c22
-rw-r--r--database/rrdcontext.c203
-rw-r--r--database/rrddim.c17
-rw-r--r--database/rrdhost.c64
-rw-r--r--database/rrdset.c15
-rw-r--r--database/sqlite/sqlite_functions.c109
-rw-r--r--database/sqlite/sqlite_functions.h5
-rw-r--r--health/health.c2
-rw-r--r--libnetdata/arrayalloc/arrayalloc.c2
-rw-r--r--libnetdata/dictionary/dictionary.c9
-rw-r--r--libnetdata/dictionary/dictionary.h8
-rw-r--r--libnetdata/ebpf/ebpf.c2
-rw-r--r--libnetdata/inlined.h2
-rw-r--r--libnetdata/socket/socket.c57
-rw-r--r--libnetdata/socket/socket.h2
-rw-r--r--libnetdata/worker_utilization/worker_utilization.c24
-rw-r--r--libnetdata/worker_utilization/worker_utilization.h3
-rw-r--r--parser/parser.c59
-rw-r--r--parser/parser.h13
-rw-r--r--streaming/compression.c236
-rw-r--r--streaming/receiver.c428
-rw-r--r--streaming/replication.c501
-rw-r--r--streaming/replication.h5
-rw-r--r--streaming/rrdpush.c3
-rw-r--r--streaming/rrdpush.h33
-rw-r--r--streaming/sender.c257
-rw-r--r--web/api/queries/query.c189
-rw-r--r--web/api/web_api_v1.c2
-rw-r--r--web/server/web_client.c6
36 files changed, 1672 insertions, 871 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 3e0f83422c..97d2dca984 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -13,20 +13,44 @@ static int send_to_plugin(const char *txt, void *data) {
#ifdef ENABLE_HTTPS
struct netdata_ssl *ssl = parser->ssl_output;
if(ssl) {
- if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
- size_t size = strlen(txt);
- return SSL_write(ssl->conn, txt, (int)size);
- }
+ if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
+ return (int)netdata_ssl_write(ssl->conn, (void *)txt, strlen(txt));
- error("cannot write to SSL connection - connection is not ready.");
+ error("PLUGINSD: cannot send command (SSL)");
return -1;
}
#endif
- FILE *fp = parser->output;
- int ret = fprintf(fp, "%s", txt);
- fflush(fp);
- return ret;
+ if(parser->fp_output) {
+ int bytes = fprintf(parser->fp_output, "%s", txt);
+ if(bytes <= 0) {
+ error("PLUGINSD: cannot send command (FILE)");
+ return -2;
+ }
+ fflush(parser->fp_output);
+ return bytes;
+ }
+
+ if(parser->fd != -1) {
+ size_t bytes = 0;
+ size_t total = strlen(txt);
+ ssize_t sent;
+
+ do {
+ sent = write(parser->fd, &txt[bytes], total - bytes);
+ if(sent <= 0) {
+ error("PLUGINSD: cannot send command (fd)");
+ return -3;
+ }
+ bytes += sent;
+ }
+ while(bytes < total);
+
+ return (int)bytes;
+ }
+
+ error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)");
+ return -4;
}
PARSER_RC pluginsd_set(char **words, size_t num_words, void *user)
@@ -293,9 +317,25 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
// rrdhost_hostname(host), rrdset_id(st),
// (unsigned long long)first_entry_child, (unsigned long long)last_entry_child);
- rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ bool ok = true;
+ if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ st->replay.start_streaming = false;
+ st->replay.after = 0;
+ st->replay.before = 0;
+#endif
+
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
+
+ ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child,
+ last_entry_child, 0, 0);
+ }
+ else {
+ internal_error(true, "RRDSET: not sending duplicate replication request for chart '%s'", rrdset_id(st));
+ }
- bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, 0, 0);
return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
}
@@ -425,7 +465,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
pf->sent_ut = now_realtime_usec();
if(ret < 0) {
- error("FUNCTION: failed to send function to plugin, fprintf() returned error %d", ret);
+ error("FUNCTION: failed to send function to plugin, error %d", ret);
rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED);
}
else {
@@ -847,42 +887,54 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
char *id = get_word(words, num_words, 1);
char *start_time_str = get_word(words, num_words, 2);
char *end_time_str = get_word(words, num_words, 3);
+ char *child_now_str = get_word(words, num_words, 4);
RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
if (unlikely(!id || (!st && !*id))) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " without a chart id for host '%s'. Disabling it.", rrdhost_hostname(host));
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " without a chart id for host '%s'. Disabling it.", rrdhost_hostname(host));
goto disable;
}
if(*id) {
st = rrdset_find(host, id);
if (unlikely(!st)) {
- error("requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s', which does not exist on host '%s'. Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s', which does not exist on host '%s'. Disabling it.",
id, rrdhost_hostname(host));
goto disable;
}
((PARSER_USER_OBJECT *) user)->st = st;
- ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
- ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
+ }
+
+ if(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE) && !rrdset_flag_check(st, RRDSET_FLAG_ARCHIVED)) {
+ error("REPLAY: chart '%s' on host '%s' has the OBSOLETE flag set, but it is collected.", rrdset_id(st), rrdhost_hostname(host));
+ rrdset_isnot_obsolete(st);
}
if(start_time_str && end_time_str) {
time_t start_time = strtol(start_time_str, NULL, 0);
time_t end_time = strtol(end_time_str, NULL, 0);
- if(start_time && end_time) {
- if (start_time > end_time) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', but timings are invalid (%ld to %ld). Disabling it.",
- rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time);
- goto disable;
- }
+ time_t wall_clock_time = 0, tolerance;
+ if(child_now_str) {
+ wall_clock_time = strtol(child_now_str, NULL, 0);
+ tolerance = 1;
+ }
+
+ if(wall_clock_time <= 0) {
+ wall_clock_time = now_realtime_sec();
+ tolerance = st->update_every + 60;
+ }
+
+ internal_error(
+ (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)),
+ "REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', from %ld to %ld, which does not match our request (%ld to %ld).",
+ rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time, st->replay.after, st->replay.before);
- if (end_time - start_time != st->update_every)
+ if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) {
+ if (unlikely(end_time - start_time != st->update_every))
rrdset_set_update_every(st, end_time - start_time);
st->last_collected_time.tv_sec = end_time;
@@ -891,11 +943,6 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
st->last_updated.tv_sec = end_time;
st->last_updated.tv_usec = 0;
- ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time;
- ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time;
- ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
- ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
-
st->counter++;
st->counter_done++;
@@ -903,9 +950,31 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
st->current_entry++;
if(st->current_entry >= st->entries)
st->current_entry -= st->entries;
+
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
+ ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = wall_clock_time;
+ ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = true;
+
+ return PARSER_RC_OK;
}
+
+ internal_error(true,
+ "REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', from %ld to %ld, but timestamps are invalid (now is %ld).",
+ rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time, wall_clock_time);
}
+ // the child sends an RBEGIN without any parameters initially
+ // setting rset_enabled to false, means the RSET should not store any metrics
+ // to store metrics, the RBEGIN needs to have timestamps
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false;
return PARSER_RC_OK;
disable:
@@ -915,6 +984,9 @@ disable:
PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
{
+ if(!((PARSER_USER_OBJECT *) user)->replay.rset_enabled)
+ return PARSER_RC_OK;
+
char *dimension = get_word(words, num_words, 1);
char *value_str = get_word(words, num_words, 2);
char *flags_str = get_word(words, num_words, 3);
@@ -923,20 +995,22 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
if (unlikely(!st)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
dimension, rrdhost_hostname(host));
goto disable;
}
if (unlikely(!dimension || !*dimension)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on chart '%s' of host '%s', without a dimension. Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " on chart '%s' of host '%s', without a dimension. Disabling it.",
rrdset_id(st), rrdhost_hostname(host));
goto disable;
}
if (unlikely(!((PARSER_USER_OBJECT *) user)->replay.start_time || !((PARSER_USER_OBJECT *) user)->replay.end_time)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without timings from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
- dimension, rrdhost_hostname(host));
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', with invalid timestamps %ld to %ld from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+ dimension, rrdhost_hostname(host),
+ ((PARSER_USER_OBJECT *) user)->replay.start_time,
+ ((PARSER_USER_OBJECT *) user)->replay.end_time);
goto disable;
}
@@ -946,14 +1020,11 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
if(unlikely(!flags_str))
flags_str = "";
- if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- debug(D_PLUGINSD, "REPLAY: is replaying dimension '%s'/'%s' to '%s'", rrdset_id(st), dimension, value_str);
-
if (likely(value_str)) {
RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
if(unlikely(!rd)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " to dimension '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.",
dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost));
goto disable;
}
@@ -961,7 +1032,7 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
if(unlikely(rd_flags & RRDDIM_FLAG_OBSOLETE)) {
- error("Dimension %s in chart '%s' has the OBSOLETE flag set, but it is collected.", rrddim_name(rd), rrdset_id(st));
+ error("REPLAY: dimension '%s' in chart '%s' has the OBSOLETE flag set, but it is collected.", rrddim_name(rd), rrdset_id(st));
rrddim_isnot_obsolete(st, rd);
}
@@ -998,7 +1069,7 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
rd->collections_counter++;
}
else
- error("Dimension %s in chart '%s' has the ARCHIVED flag set, but it is collected. Ignoring data.", rrddim_name(rd), rrdset_id(st));
+ error("REPLAY: dimension '%s' in chart '%s' has the ARCHIVED flag set, but it is collected. Ignoring data.", rrddim_name(rd), rrdset_id(st));
rrddim_acquired_release(rda);
}
@@ -1021,13 +1092,13 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
if (unlikely(!st)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
dimension, rrdhost_hostname(host));
goto disable;
}
if (unlikely(!dimension || !*dimension)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on chart '%s' of host '%s', without a dimension. Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on chart '%s' of host '%s', without a dimension. Disabling it.",
rrdset_id(st), rrdhost_hostname(host));
goto disable;
}
@@ -1035,7 +1106,7 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words
RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
if(unlikely(!rd)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.",
dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost));
goto disable;
}
@@ -1067,7 +1138,7 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
if (unlikely(!st)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
rrdhost_hostname(host));
goto disable;
}
@@ -1117,7 +1188,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
if (unlikely(!st)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_END " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_END " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
rrdhost_hostname(host));
return PARSER_RC_ERROR;
}
@@ -1132,15 +1203,41 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
((PARSER_USER_OBJECT *) user)->st = NULL;
((PARSER_USER_OBJECT *) user)->count++;
+ if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled && st->rrdhost->receiver) {
+ time_t now = now_realtime_sec();
+ time_t started = st->rrdhost->receiver->replication_first_time_t;
+ time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time;
+
+ worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
+ (NETDATA_DOUBLE)(current - started) * 100.0 / (NETDATA_DOUBLE)(now - started));
+ }
+
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false;
+
st->counter++;
st->counter_done++;
+#ifdef NETDATA_INTERNAL_CHECKS
+ st->replay.start_streaming = false;
+ st->replay.after = 0;
+ st->replay.before = 0;
+#endif
+
if (start_streaming) {
if (st->update_every != update_every_child)
rrdset_set_update_every(st, update_every_child);
rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
+
+ worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, 100.0);
+
return PARSER_RC_OK;
}
@@ -1191,7 +1288,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
};
// fp_plugin_output = our input; fp_plugin_input = our output
- PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, PARSER_INPUT_SPLIT, NULL);
+ PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL);
rrd_collector_started();
diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h
index a76ab211fe..e18b43e580 100644
--- a/collectors/plugins.d/pluginsd_parser.h
+++ b/collectors/plugins.d/pluginsd_parser.h
@@ -26,6 +26,10 @@ typedef struct parser_user_object {
usec_t start_time_ut;
usec_t end_time_ut;
+
+ time_t wall_clock_time;
+
+ bool rset_enabled;
} replay;
} PARSER_USER_OBJECT;
diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c
index e81fc49d11..2cdf1f71f3 100644
--- a/daemon/global_statistics.c
+++ b/daemon/global_statistics.c
@@ -1848,6 +1848,7 @@ static struct worker_utilization all_workers_utilization[] = {
{ .name = "TIMEX", .family = "workers plugin timex", .priority = 1000000 },
{ .name = "IDLEJITTER", .family = "workers plugin idlejitter", .priority = 1000000 },
{ .name = "RRDCONTEXT", .family = "workers contexts", .priority = 1000000 },
+ { .name = "REPLICATION", .family = "workers replication sender", .priority = 1000000 },
{ .name = "SERVICE", .family = "workers service", .priority = 1000000 },
// has to be terminated with a NULL
@@ -2203,7 +2204,7 @@ static void workers_utilization_update_chart(struct worker_utilization *wu) {
{
size_t i;
for (i = 0; i < WORKER_UTILIZATION_MAX_JOB_TYPES; i++) {
- if(wu->per_job_type[i].type != WORKER_METRIC_INCREMENTAL)
+ if(wu->per_job_type[i].type != WORKER_METRIC_INCREMENT && wu->per_job_type[i].type != WORKER_METRIC_INCREMENTAL_TOTAL)
continue;
if(!wu->per_job_type[i].count_value)
diff --git a/daemon/main.c b/daemon/main.c
index 5c437d208d..67d24d6977 100644
--- a/daemon/main.c
+++ b/daemon/main.c
@@ -4,6 +4,7 @@
#include "buildinfo.h"
#include "static_threads.h"
+bool unittest_running = false;
int netdata_zero_metrics_enabled;
int netdata_anonymous_statistics_enabled;
@@ -678,7 +679,7 @@ static void get_netdata_configured_variables() {
// ------------------------------------------------------------------------
// get default Database Engine page cache size in MiB
- db_engine_use_malloc = config_get_boolean(CONFIG_SECTION_DB, "dbengine page cache with malloc", CONFIG_BOOLEAN_NO);
+ db_engine_use_malloc = config_get_boolean(CONFIG_SECTION_DB, "dbengine page cache with malloc", CONFIG_BOOLEAN_YES);
default_rrdeng_page_cache_mb = (int) config_get_number(CONFIG_SECTION_DB, "dbengine page cache size MB", default_rrdeng_page_cache_mb);
if(default_rrdeng_page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB) {
error("Invalid page cache size %d given. Defaulting to %d.", default_rrdeng_page_cache_mb, RRDENG_MIN_PAGE_CACHE_SIZE_MB);
@@ -982,6 +983,8 @@ int main(int argc, char **argv) {
}
if(strcmp(optarg, "unittest") == 0) {
+ unittest_running = true;
+
if (unit_test_static_threads())
return 1;
if (unit_test_buffer())
@@ -1028,24 +1031,31 @@ int main(int argc, char **argv) {
#endif
#ifdef ENABLE_DBENGINE
else if(strcmp(optarg, "mctest") == 0) {
+ unittest_running = true;
return mc_unittest();
}
else if(strcmp(optarg, "ctxtest") == 0) {
+ unittest_running = true;
return ctx_unittest();
}
else if(strcmp(optarg, "dicttest") == 0) {
+ unittest_running = true;
return dictionary_unittest(10000);
}
else if(strcmp(optarg, "araltest") == 0) {
+ unittest_running = true;
return aral_unittest(10000);
}
else if(strcmp(optarg, "stringtest") == 0) {
+ unittest_running = true;
return string_unittest(10000);
}
else if(strcmp(optarg, "rrdlabelstest") == 0) {
+ unittest_running = true;
return rrdlabels_unittest();
}
else if(strcmp(optarg, "metatest") == 0) {
+ unittest_running = true;
return metadata_unittest();
}
else if(strncmp(optarg, createdataset_string, strlen(createdataset_string)) == 0) {
diff --git a/daemon/service.c b/daemon/service.c
index be1aad8164..28fdc84756 100644
--- a/daemon/service.c
+++ b/daemon/service.c
@@ -156,17 +156,20 @@ static void svc_rrdhost_cleanup_obsolete_charts(RRDHOST *host) {
static void svc_rrdset_check_obsoletion(RRDHOST *host) {
worker_is_busy(WORKER_JOB_CHILD_CHART_OBSOLETION_CHECK);
+ time_t now = now_realtime_sec();
time_t last_entry_t;
RRDSET *st;
rrdset_foreach_read(st, host) {
+ if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED))
+ continue;
+
last_entry_t = rrdset_last_entry_t(st);
- if(last_entry_t && last_entry_t < host->senders_connect_time && host->senders_connect_time
- + TIME_TO_RUN_OBSOLETIONS_ON_CHILD_CONNECT + ITERATIONS_TO_RUN_OBSOLETIONS_ON_CHILD_CONNECT * st->update_every
- < now_realtime_sec())
+ if(last_entry_t && last_entry_t < host->senders_connect_time &&
+ host->senders_connect_time + TIME_TO_RUN_OBSOLETIONS_ON_CHILD_CONNECT + ITERATIONS_TO_RUN_OBSOLETIONS_ON_CHILD_CONNECT * st->update_every
+ < now)
rrdset_is_obsolete(st);
-
}
rrdset_foreach_done(st);
}
diff --git a/daemon/static_threads.c b/daemon/static_threads.c
index 707866ee68..9d2f30c355 100644
--- a/daemon/static_threads.c
+++ b/daemon/static_threads.c
@@ -2,16 +2,17 @@
#include "common.h"
-extern void *aclk_main(void *ptr);
-extern void *analytics_main(void *ptr);
-extern void *checks_main(void *ptr);
-extern void *cpuidlejitter_main(void *ptr);
-extern void *global_statistics_main(void *ptr);
-extern void *health_main(void *ptr);
-extern void *pluginsd_main(void *ptr);
-extern void *service_main(void *ptr);
-extern void *statsd_main(void *ptr);
-extern void *timex_main(void *ptr);
+void *aclk_main(void *ptr);
+void *analytics_main(void *ptr);
+void *checks_main(void *ptr);
+void *cpuidlejitter_main(void *ptr);
+void *global_statistics_main(void *ptr);
+void *health_main(void *ptr);
+void *pluginsd_main(void *ptr);
+void *service_main(void *ptr);
+void *statsd_main(void *ptr);
+void *timex_main(void *ptr);
+void *replication_thread_main(void *ptr __maybe_unused);
extern bool global_statistics_enabled;
@@ -140,6 +141,16 @@ const struct netdata_static_thread static_threads_common[] = {
.start_routine = rrdcontext_main
},
+ {
+ .name = "replication",
+ .config_section = NULL,
+ .config_name = NULL,
+ .enabled = 1,
+ .thread = NULL,
+ .init_routine = NULL,
+ .start_routine = replication_thread_main
+ },
+
// terminator
{
.name = NULL,
diff --git a/database/rrd.h b/database/rrd.h
index 0069715b90..b548aa7169 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -55,6 +55,7 @@ struct pg_cache_page_index;
#include "sqlite/sqlite_health.h"
#include "rrdcontext.h"
+extern bool unittest_running;
extern bool dbengine_enabled;
extern size_t storage_tiers;
extern size_t storage_tiers_grouping_iterations[RRD_STORAGE_TIERS];
@@ -533,8 +534,9 @@ typedef enum rrdset_flags {
RRDSET_FLAG_SENDER_REPLICATION_FINISHED = (1 << 22), // the sending side has completed replication
RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED = (1 << 23), // the receiving side has completed replication
+ RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS = (1 << 24), // the receiving side has replication in progress
- RRDSET_FLAG_UPSTREAM_SEND_VARIABLES = (1 << 24), // a custom variable has been updated and needs to be exposed to parent
+ RRDSET_FLAG_UPSTREAM_SEND_VARIABLES = (1 << 25), // a custom variable has been updated and needs to be exposed to parent
} RRDSET_FLAGS;
#define rrdset_flag_check(st, flag) (__atomic_load_n(&((st)->flags), __ATOMIC_SEQ_CST) & (flag))
@@ -658,6 +660,14 @@ struct rrdset {
netdata_rwlock_t rwlock; // protection for RRDCALC *base
RRDCALC *base; // double linked list of RRDCALC related to this RRDSET
} alerts;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ struct {
+ bool start_streaming;
+ time_t after;
+ time_t before;
+ } replay;
+#endif
};
#define rrdset_plugin_name(st) string2str((st)->plugin_name)
@@ -757,6 +767,8 @@ typedef enum {
// Configuration options
RRDHOST_OPTION_DELETE_OBSOLETE_CHARTS = (1 << 3), // delete files of obsolete charts
RRDHOST_OPTION_DELETE_ORPHAN_HOST = (1 << 4), // delete the entire host when orphan
+
+ RRDHOST_OPTION_REPLICATION