summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-25 20:37:15 +0200
committerGitHub <noreply@github.com>2022-11-25 20:37:15 +0200
commit2e874e79163771856e4e756b176b729f7d8b0f0f (patch)
treeeeb1ea10af039001e3290090d5a2d365f99f63c7
parent870acd61123ece7c074242e1b02d47cb7c667e38 (diff)
replication fixes #6 (#14046)
use the faster monotonic clock in workers and replication; avoid unecessary statistics function on every request on replication - gather them all together once every second; check the chart flags on all mirrored hosts, not only the ones that have a sender; cleanup and unify replication logs; added child world time to REND; fix first BEGIN been transmitted when replication starts;
-rw-r--r--collectors/plugins.d/pluginsd_parser.c127
-rw-r--r--database/rrd.h2
-rw-r--r--database/sqlite/sqlite_metadata.c2
-rw-r--r--libnetdata/worker_utilization/worker_utilization.c8
-rw-r--r--streaming/replication.c460
-rw-r--r--streaming/replication.h2
-rw-r--r--streaming/rrdpush.c60
7 files changed, 401 insertions, 260 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 70ce02b420..5501c12fad 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -151,6 +151,20 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user)
if (microseconds_txt && *microseconds_txt)
microseconds = str2ull(microseconds_txt);
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ if(st->replay.log_next_data_collection) {
+ st->replay.log_next_data_collection = false;
+
+ internal_error(true,
+ "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu",
+ rrdhost_hostname(host), rrdset_id(st),
+ st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec,
+ st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec,
+ microseconds
+ );
+ }
+#endif
+
if (likely(st->counter_done)) {
if (likely(microseconds)) {
if (((PARSER_USER_OBJECT *)user)->trust_durations)
@@ -312,6 +326,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
{
const char *first_entry_txt = get_word(words, num_words, 1);
const char *last_entry_txt = get_word(words, num_words, 2);
+ const char *world_time_txt = get_word(words, num_words, 3);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
@@ -319,22 +334,14 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
- if(unlikely(!first_entry_txt || !last_entry_txt)) {
- error("PLUGINSD: 'host:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " without first or last entry. Disabling it.",
- rrdhost_hostname(host));
- return PLUGINSD_DISABLE_PLUGIN(user);
- }
+ time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
+ time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
+ time_t child_world_time = (world_time_txt && *world_time_txt) ? (time_t)str2ul(world_time_txt) : now_realtime_sec();
- long first_entry_child = str2l(first_entry_txt);
- long last_entry_child = str2l(last_entry_txt);
-
- internal_error(
- (first_entry_child != 0 || last_entry_child != 0)
- && (first_entry_child == 0 || last_entry_child == 0),
- "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %llu, last time %llu).",
- rrdhost_hostname(host), rrdset_id(st),
- (unsigned long long)first_entry_child, (unsigned long long)last_entry_child
- );
+ if((first_entry_child != 0 || last_entry_child != 0) && (first_entry_child == 0 || last_entry_child == 0))
+ error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %ld, last time %ld, world time %ld).",
+ rrdhost_hostname(host), rrdset_id(st),
+ first_entry_child, last_entry_child, child_world_time);
bool ok = true;
if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
@@ -350,8 +357,9 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);
PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser;
- ok = replicate_chart_request(send_to_plugin, parser, host, st, first_entry_child,
- last_entry_child, 0, 0);
+ ok = replicate_chart_request(send_to_plugin, parser, host, st,
+ first_entry_child, last_entry_child, child_world_time,
+ 0, 0);
}
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
else {
@@ -910,7 +918,7 @@ PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words _
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT);
if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
debug(D_PLUGINSD, "requested to commit chart labels");
@@ -950,28 +958,35 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
((PARSER_USER_OBJECT *) user)->st = st;
if(start_time_str && end_time_str) {
- time_t start_time = strtol(start_time_str, NULL, 0);
- time_t end_time = strtol(end_time_str, NULL, 0);
+ time_t start_time = (time_t)str2ul(start_time_str);
+ time_t end_time = (time_t)str2ul(end_time_str);
time_t wall_clock_time = 0, tolerance;
bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child;
if(child_now_str) {
- wall_clock_time = strtol(child_now_str, NULL, 0);
+ wall_clock_time = (time_t)str2ul(child_now_str);
tolerance = st->update_every + 1;
wall_clock_comes_from_child = true;
}
if(wall_clock_time <= 0) {
wall_clock_time = now_realtime_sec();
- tolerance = st->update_every + 60;
+ tolerance = st->update_every + 5;
wall_clock_comes_from_child = false;
}
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
internal_error(
(!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)),
- "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).",
+ "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).",
rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, st->replay.after, st->replay.before);
+
+ internal_error(
+ true,
+ "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, child wall clock is %ld (%s), had requested %ld to %ld",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ start_time, end_time, wall_clock_time, wall_clock_comes_from_child ? "from child" : "parent time",
+ st->replay.after, st->replay.before);
#endif
if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) {
@@ -1002,10 +1017,9 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
return PARSER_RC_OK;
}
- internal_error(true,
- "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, but timestamps are invalid (now is %ld [%s], tolerance %ld).",
- rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
- wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance);
+ error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, but timestamps are invalid (now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
+ wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance);
}
// the child sends an RBEGIN without any parameters initially
@@ -1051,7 +1065,7 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
dimension,
((PARSER_USER_OBJECT *) user)->replay.start_time,
((PARSER_USER_OBJECT *) user)->replay.end_time);
- return PARSER_RC_ERROR;
+ return PLUGINSD_DISABLE_PLUGIN(user);
}
if (unlikely(!value_str || !*value_str))
@@ -1065,15 +1079,6 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
- if(unlikely(rd_flags & RRDDIM_FLAG_OBSOLETE)) {
- error("PLUGINSD: 'host:%s/chart:%s/dim:%s' has the OBSOLETE flag set, but it is collected.",
- rrdhost_hostname(st->rrdhost),
- rrdset_id(st),
- rrddim_id(rd)
- );
- rrddim_isnot_obsolete(st, rd);
- }
-
if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) {
NETDATA_DOUBLE value = strtondd(value_str, NULL);
SN_FLAGS flags = SN_FLAG_NONE;
@@ -1106,9 +1111,11 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
rd->last_collected_time.tv_usec = 0;
rd->collections_counter++;
}
- else
- error("PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is collected. Ignoring data.",
- rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd));
+ else {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. Ignoring data.",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd));
+ }
}
rrddim_acquired_release(rda);
@@ -1180,18 +1187,29 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words
PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
{
- if (num_words < 7) {
+ if (num_words < 7) { // accepts 7, but the 7th is optional
error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
return PARSER_RC_ERROR;
}
- time_t update_every_child = str2l(get_word(words, num_words, 1));
- time_t first_entry_child = (time_t)str2ull(get_word(words, num_words, 2));
- time_t last_entry_child = (time_t)str2ull(get_word(words, num_words, 3));
+ const char *update_every_child_txt = get_word(words, num_words, 1);
+ const char *first_entry_child_txt = get_word(words, num_words, 2);
+ const char *last_entry_child_txt = get_word(words, num_words, 3);
+ const char *start_streaming_txt = get_word(words, num_words, 4);
+ const char *first_entry_requested_txt = get_word(words, num_words, 5);
+ const char *last_entry_requested_txt = get_word(words, num_words, 6);
+ const char *child_world_time_txt = get_word(words, num_words, 7); // optional
+
+ time_t update_every_child = (time_t)str2ul(update_every_child_txt);
+ time_t first_entry_child = (time_t)str2ul(first_entry_child_txt);
+ time_t last_entry_child = (time_t)str2ul(last_entry_child_txt);
- bool start_streaming = (strcmp(get_word(words, num_words, 4), "true") == 0);
- time_t first_entry_requested = (time_t)str2ull(get_word(words, num_words, 5));
- time_t last_entry_requested = (time_t)str2ull(get_word(words, num_words, 6));
+ bool start_streaming = (strcmp(start_streaming_txt, "true") == 0);
+ time_t first_entry_requested = (time_t)str2ul(first_entry_requested_txt);
+ time_t last_entry_requested = (time_t)str2ul(last_entry_requested_txt);
+
+ // the optional child world time
+ time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t)str2ul(child_world_time_txt) : now_realtime_sec();
PARSER_USER_OBJECT *user_object = user;
@@ -1201,13 +1219,15 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
-#ifdef NETDATATA_LOG_REPLICATION_REQUESTS
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
internal_error(true,
- "PLUGINSD: 'host:%s/chart:%s': received " PLUGINSD_KEYWORD_REPLAY_END " child first_t = %llu, last_t = %llu, start_streaming = %s, requested first_t = %llu, last_t = %llu",
+ "PLUGINSD REPLAY: 'host:%s/chart:%s': got a " PLUGINSD_KEYWORD_REPLAY_END " child db from %llu to %llu, start_streaming %s, had requested from %llu to %llu, wall clock %llu",
rrdhost_hostname(host), rrdset_id(st),
(unsigned long long)first_entry_child, (unsigned long long)last_entry_child,
start_streaming?"true":"false",
- (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested);
+ (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested,
+ (unsigned long long)child_world_time
+ );
#endif
((PARSER_USER_OBJECT *) user)->st = NULL;
@@ -1236,6 +1256,8 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
st->replay.start_streaming = false;
st->replay.after = 0;
st->replay.before = 0;
+ if(start_streaming)
+ st->replay.log_next_data_collection = true;
#endif
if (start_streaming) {
@@ -1250,7 +1272,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
}
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
else
- internal_error(true, "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.",
+ internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.",
rrdhost_hostname(host), rrdset_id(st));
#endif
worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, 100.0);
@@ -1260,7 +1282,8 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
rrdcontext_updated_retention_rrdset(st);
- bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child,
+ bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st,
+ first_entry_child, last_entry_child, child_world_time,
first_entry_requested, last_entry_requested);
return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
}
diff --git a/database/rrd.h b/database/rrd.h
index 12bfb8660e..187c9a5fca 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -673,6 +673,7 @@ struct rrdset {
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
struct {
+ bool log_next_data_collection;
bool start_streaming;
time_t after;
time_t before;
@@ -1078,6 +1079,7 @@ extern RRDHOST *localhost;
#define rrdhost_sender_replicating_charts_minus_one(host) (__atomic_sub_fetch(&((host)->rrdpush_sender_replicating_charts), 1, __ATOMIC_RELAXED))
#define rrdhost_sender_replicating_charts_zero(host) (__atomic_store_n(&((host)->rrdpush_sender_replicating_charts), 0, __ATOMIC_RELAXED))
+extern DICTIONARY *rrdhost_root_index;
long rrdhost_hosts_available(void);
// ----------------------------------------------------------------------------
diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c
index d85bc55a3e..4eb212152b 100644
--- a/database/sqlite/sqlite_metadata.c
+++ b/database/sqlite/sqlite_metadata.c
@@ -2,8 +2,6 @@
#include "sqlite_metadata.h"
-extern DICTIONARY *rrdhost_root_index;
-
// SQL statements
#define SQL_STORE_CLAIM_ID "insert into node_instance " \
diff --git a/libnetdata/worker_utilization/worker_utilization.c b/libnetdata/worker_utilization/worker_utilization.c
index 700f88a736..14b8926e04 100644
--- a/libnetdata/worker_utilization/worker_utilization.c
+++ b/libnetdata/worker_utilization/worker_utilization.c
@@ -56,7 +56,7 @@ void worker_register(const char *workname) {
worker->tag = strdupz(netdata_thread_tag());
worker->workname = strdupz(workname);
- usec_t now = now_realtime_usec();
+ usec_t now = now_monotonic_usec();
worker->statistics_last_checkpoint = now;
worker->last_action_timestamp = now;
worker->last_action = WORKER_IDLE;
@@ -145,14 +145,14 @@ static inline void worker_is_idle_with_time(usec_t now) {
void worker_is_idle(void) {
if(unlikely(!worker || worker->last_action != WORKER_BUSY)) return;
- worker_is_idle_with_time(now_realtime_usec());
+ worker_is_idle_with_time(now_monotonic_usec());
}
void worker_is_busy(size_t job_id) {
if(unlikely(!worker || job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES))
return;
- usec_t now = now_realtime_usec();
+ usec_t now = now_monotonic_usec();
if(worker->last_action == WORKER_BUSY)
worker_is_idle_with_time(now);
@@ -215,7 +215,7 @@ void workers_foreach(const char *workname, void (*callback)(
struct worker *p;
DOUBLE_LINKED_LIST_FOREACH_FORWARD(base, p, prev, next) {
- usec_t now = now_realtime_usec();
+ usec_t now = now_monotonic_usec();
// find per job type statistics
STRING *per_job_type_name[WORKER_UTILIZATION_MAX_JOB_TYPES];
diff --git a/streaming/replication.c b/streaming/replication.c
index 6e3b145507..d88095761e 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -6,7 +6,10 @@
#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20
#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
-static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) {
+// ----------------------------------------------------------------------------
+// sending replication replies
+
+static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) {
size_t dimensions = rrdset_number_of_dimensions(st);
struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops;
@@ -23,7 +26,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
memset(data, 0, sizeof(data));
if(enable_streaming && st->last_updated.tv_sec > before) {
- internal_error(true, "REPLAY: 'host:%s/chart:%s' overwriting replication before from %llu to %llu",
+ internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' has start_streaming = true, adjusting replication before timestamp from %llu to %llu",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
(unsigned long long)before,
(unsigned long long)st->last_updated.tv_sec
@@ -35,8 +38,11 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
{
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if (rd_dfe.counter >= dimensions)
+ if (rd_dfe.counter >= dimensions) {
+ internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
break;
+ }
if(rd->exposed) {
data[rd_dfe.counter].dict = rd_dfe.dict;
@@ -65,7 +71,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
data[i].sp = ops->next_metric(&data[i].handle);
internal_error(max_skip <= 0,
- "REPLAY: 'host:%s/chart:%s', dimension '%s': db does not advance the query beyond time %llu",
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu",
rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long) now);
if(data[i].sp.end_time < now)
@@ -81,10 +87,9 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
}
}
- time_t wall_clock_time = now_realtime_sec();
- if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + 1) {
+ if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1) {
internal_error(true,
- "REPLAY: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
(unsigned long long)min_start_time,
(unsigned long long)min_end_time,
@@ -95,7 +100,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
if(min_end_time < now) {
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
internal_error(true,
- "REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
+ "STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now);
#endif // NETDATA_LOG_REPLICATION_REQUESTS
break;
@@ -138,14 +143,14 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
internal_error(true,
- "REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
+ "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
(unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
(unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
}
else
internal_error(true,
- "REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
+ "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
(unsigned long long)after, (unsigned long long)before);
#endif // NETDATA_LOG_REPLICATION_REQUESTS
@@ -195,7 +200,7 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
time_t first_entry_local = rrdset_first_entry_t(st);
if(first_entry_local > now + tolerance) {
internal_error(true,
- "RRDSET: 'host:%s/chart:%s' first time %llu is in the future (now is %llu)",
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db first time %llu is in the future (now is %llu)",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
(unsigned long long)first_entry_local, (unsigned long long)now);
first_entry_local = now;
@@ -208,14 +213,20 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
time_t last_entry_local = st->last_updated.tv_sec;
if(!last_entry_local) {
internal_error(true,
- "RRDSET: 'host:%s/chart:%s' db reports last updated time zero.",
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' RRDSET reports last updated time zero.",
rrdhost_hostname(st->rrdhost), rrdset_id(st));
last_entry_local = rrdset_last_entry_t(st);
+ if(!last_entry_local) {
+ internal_error(true,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db reports last time zero.",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+ last_entry_local = now;
+ }
}
if(last_entry_local > now + tolerance) {
internal_error(true,
- "RRDSET: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
(unsigned long long)last_entry_local, (unsigned long long)now);
last_entry_local = now;
@@ -240,28 +251,49 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
// and copying the result to the host's buffer in order to avoid
// holding the host's buffer lock for too long
BUFFER *wb = sender_start(host->sender);
- {
- // pass the original after/before so that the parent knows about
- // which time range we responded
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
-
- if(after != 0 && before != 0)
- before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming);
- else {
- after = 0;
- before = 0;
- enable_streaming = true;
- }
- if(enable_streaming)
- replicate_chart_collection_state(wb, st);
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
- // end with first/last entries we have, and the first start time and
- // last end time of the data we sent
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu\n",
- (int)st->update_every, (unsigned long long)first_entry_local, (unsigned long long)last_entry_local,
- enable_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before);
+ if(after != 0 && before != 0)
+ before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming, now);
+ else {
+ after = 0;
+ before = 0;
+ enable_streaming = true;
}
+
+ // get again the world clock time
+ time_t world_clock_time = now_realtime_sec();
+ if(enable_streaming) {
+ if(now < world_clock_time) {
+ // we needed time to execute this request
+ // so, the parent will need to replicate more data
+ enable_streaming = false;
+ }
+ else
+ replicate_chart_collection_state(wb, st);
+ }
+
+ // end with first/last entries we have, and the first start time and
+ // last end time of the data we sent
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n",
+
+ // current chart update every
+ (int)st->update_every
+
+ // child first db time, child end db time
+ , (unsigned long long)first_entry_local, (unsigned long long)last_entry_local
+
+ // start streaming boolean
+ , enable_streaming ? "true" : "false"
+
+ // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true)
+ , (unsigned long long)after, (unsigned long long)before
+
+ // child world clock time
+ , (unsigned long long)world_clock_time
+ );
+
sender_commit(host->sender, wb);
return enable_streaming;
@@ -282,12 +314,14 @@ struct replication_request_details {
struct {
time_t first_entry_t; // the first entry time the child has
time_t last_entry_t; // the last entry time the child has
+ time_t world_time_t; // the current time of the child
} child_db;
struct {
time_t first_entry_t; // the first entry time we have
time_t last_entry_t; // the last entry time we have
bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future and we fixed
+ time_t now; // the current local world clock time
} local_db;
struct {
@@ -305,8 +339,6 @@ struct replication_request_details {
time_t before; // the end time of this replication request
bool start_streaming; // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before'
} wanted;
-
- time_t now; // the current wall clock time
};
static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) {
@@ -316,6 +348,8 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
st->rrdhost->receiver->replication_first_time_t = r->wanted.after;
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ st->replay.log_next_data_collection = true;
+
char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = "";
if(r->wanted.after)
@@ -326,7 +360,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
internal_error(true,
"REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: "
- "last[%ld - %ld] child[%ld - %ld] local[%ld - %ld %s] gap[%ld - %ld %s] %s"
+ "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld %s, now %ld] gap[%ld - %ld %s] %s"
, rrdhost_hostname(r->host), rrdset_id(r->st)
, r->wanted.after, wanted_after_buf
, r->wanted.before, wanted_before_buf
@@ -334,7 +368,9 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
, msg
, r->last_request.after, r->last_request.before
, r->child_db.first_entry_t, r->child_db.last_entry_t
- , r->local_db.first_entry_t, r->local_db.last_entry_t, r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW"
+ , r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD"
+ , r->local_db.first_entry_t, r->local_db.last_entry_t
+ , r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW", r->local_db.now
, r->gap.from, r->gap.to
, (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
, (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
@@ -352,7 +388,8 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
int ret = r->caller.callback(buffer, r->caller.data);
if (ret < 0) {
- error("REPLICATION: failed to send replication request to child (error %d)", ret);
+ error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)",
+ rrdhost_hostname(r->host), rrdset_id(r->st), ret);
return false;
}
@@ -360,7 +397,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
}
bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
- time_t first_entry_child, time_t last_entry_child,
+ time_t first_entry_child, time_t last_entry_child, time_t child_world_time,
time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
{
struct replication_request_details r = {
@@ -375,6 +412,14 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
.child_db = {
.first_entry_t = first_entry_child,
.last_entry_t = last_entry_child,
+ .world_time_t = child_world_time,
+ },
+
+ .local_db = {
+ .first_entry_t = rrdset_first_entry_t(st),
+ .last_entry_t = rrdset_last_entry_t(st),
+ .last_entry_t_adjusted_to_now = false,
+ .now = now_realtime_sec(),
},
.last_request = {
@@ -387,15 +432,11 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
.before = 0,
.start_streaming = true,
},
-
- .now = now_realtime_sec(),
};
- // get our local database retention
- r.local_db.first_entry_t = rrdset_first_entry_t(st);
- r.local_db.last_entry_t = rrdset_last_entry_t(st);
- if(r.local_db.last_entry_t > r.now) {
- r.local_db.last_entry_t = r.now;
+ // check our local database retention
+ if(r.local_db.last_entry_t > r.local_db.now) {
+ r.local_db.last_entry_t = r.local_db.now;
r.local_db.last_entry_t_adjusted_to_now = true;
}
@@ -408,7 +449,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
r.gap.from = r.local_db.last_entry_t;
else
// we don't have any data, the gap is the max timeframe we are allowed to replicate
- r.gap.from = r.now - r.host->rrdpush_seconds_to_replicate;
+ r.gap.from = r.local_db.now - r.host->rrdpush_seconds_to_replicate;
}
else {
@@ -419,7 +460,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
}
// we want all the data up to now
- r.gap.to = r.now;
+ r.gap.to = r.local_db.now;
// The gap is now r.gap.from -> r.gap.to
@@ -461,8 +502,11 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
if(r.wanted.before > r.child_db.last_entry_t)
r.wanted.before = r.child_db.last_entry_t;
- // the child should start streaming immediately if the wanted duration is small
- r.wanted.start_streaming = (r.wanted.before == r.child_db.last_entry_t);
+ if(r.wanted.after > r.wanted.before)
+ r.wanted.after = r.wanted.before;
+
+ // the child should start streaming immediately if the wanted duration is small or we reached the last entry of the child
+ r.wanted.start_streaming = (r.local_db.now - r.wanted.after <= host->rrdpush_replication_step || r.wanted.before == r.child_db.last_entry_t);
// the wanted timeframe is now r.wanted.after -> r.wanted.before
// send it
@@ -499,11 +543,12 @@ struct replication_sort_entry {
static struct replication_thread {
netdata_mutex_t mutex;
+ size_t pending;
size_t added;
size_t executed;
size_t removed;
+ size_t last_executed;
time_t first_time_t;
- size_t requests_count;
Word_t next_unique_id;
struct replication_request *requests;
@@ -516,12 +561,13 @@ static struct replication_thread {
size_t waits;
Pvoid_t JudyL_array;
-} rep = {
+} replication_globals = {
.mutex = NETDATA_MUTEX_INITIALIZER,
+ .pending = 0,
.added = 0,
.executed = 0,
+ .last_executed = 0,
.first_time_t = 0,
- .requests_count = 0,
.next_unique_id = 1,
.skipped_no_room = 0,
.skipped_not_connected = 0,
@@ -535,7 +581,7 @@ static __thread int replication_recursive_mutex_recursions = 0;
static void replication_recursive_lock() {
if(++replication_recursive_mutex_recursions == 1)
- netdata_mutex_lock(&rep.mutex);
+ netdata_mutex_lock(&replication_globals.mutex);
#ifdef NETDATA_INTERNAL_CHECKS
if(replication_recursive_mutex_recursions < 0 || replication_recursive_mutex_recursions > 2)
@@ -545,7 +591,7 @@ static void replication_recursive_lock() {
static void replication_recursive_unlock() {
if(--replication_recursive_mutex_recursions == 0)
- netdata_mutex_unlock(&am