diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-25 20:37:15 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-25 20:37:15 +0200 |
commit | 2e874e79163771856e4e756b176b729f7d8b0f0f (patch) | |
tree | eeb1ea10af039001e3290090d5a2d365f99f63c7 | |
parent | 870acd61123ece7c074242e1b02d47cb7c667e38 (diff) |
replication fixes #6 (#14046)
use the faster monotonic clock in workers and replication; avoid unecessary statistics function on every request on replication - gather them all together once every second; check the chart flags on all mirrored hosts, not only the ones that have a sender; cleanup and unify replication logs; added child world time to REND; fix first BEGIN been transmitted when replication starts;
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 127 | ||||
-rw-r--r-- | database/rrd.h | 2 | ||||
-rw-r--r-- | database/sqlite/sqlite_metadata.c | 2 | ||||
-rw-r--r-- | libnetdata/worker_utilization/worker_utilization.c | 8 | ||||
-rw-r--r-- | streaming/replication.c | 460 | ||||
-rw-r--r-- | streaming/replication.h | 2 | ||||
-rw-r--r-- | streaming/rrdpush.c | 60 |
7 files changed, 401 insertions, 260 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 70ce02b420..5501c12fad 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -151,6 +151,20 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user) if (microseconds_txt && *microseconds_txt) microseconds = str2ull(microseconds_txt); +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + if(st->replay.log_next_data_collection) { + st->replay.log_next_data_collection = false; + + internal_error(true, + "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu", + rrdhost_hostname(host), rrdset_id(st), + st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec, + st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec, + microseconds + ); + } +#endif + if (likely(st->counter_done)) { if (likely(microseconds)) { if (((PARSER_USER_OBJECT *)user)->trust_durations) @@ -312,6 +326,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us { const char *first_entry_txt = get_word(words, num_words, 1); const char *last_entry_txt = get_word(words, num_words, 2); + const char *world_time_txt = get_word(words, num_words, 3); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END); if(!host) return PLUGINSD_DISABLE_PLUGIN(user); @@ -319,22 +334,14 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART); if(!st) return PLUGINSD_DISABLE_PLUGIN(user); - if(unlikely(!first_entry_txt || !last_entry_txt)) { - error("PLUGINSD: 'host:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " without first or last entry. Disabling it.", - rrdhost_hostname(host)); - return PLUGINSD_DISABLE_PLUGIN(user); - } + time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0; + time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0; + time_t child_world_time = (world_time_txt && *world_time_txt) ? (time_t)str2ul(world_time_txt) : now_realtime_sec(); - long first_entry_child = str2l(first_entry_txt); - long last_entry_child = str2l(last_entry_txt); - - internal_error( - (first_entry_child != 0 || last_entry_child != 0) - && (first_entry_child == 0 || last_entry_child == 0), - "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %llu, last time %llu).", - rrdhost_hostname(host), rrdset_id(st), - (unsigned long long)first_entry_child, (unsigned long long)last_entry_child - ); + if((first_entry_child != 0 || last_entry_child != 0) && (first_entry_child == 0 || last_entry_child == 0)) + error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %ld, last time %ld, world time %ld).", + rrdhost_hostname(host), rrdset_id(st), + first_entry_child, last_entry_child, child_world_time); bool ok = true; if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { @@ -350,8 +357,9 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us rrdhost_receiver_replicating_charts_plus_one(st->rrdhost); PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser; - ok = replicate_chart_request(send_to_plugin, parser, host, st, first_entry_child, - last_entry_child, 0, 0); + ok = replicate_chart_request(send_to_plugin, parser, host, st, + first_entry_child, last_entry_child, child_world_time, + 0, 0); } #ifdef NETDATA_LOG_REPLICATION_REQUESTS else { @@ -910,7 +918,7 @@ PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words _ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT); if(!host) return PLUGINSD_DISABLE_PLUGIN(user); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_REPLAY_BEGIN); + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN); if(!st) return PLUGINSD_DISABLE_PLUGIN(user); debug(D_PLUGINSD, "requested to commit chart labels"); @@ -950,28 +958,35 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use ((PARSER_USER_OBJECT *) user)->st = st; if(start_time_str && end_time_str) { - time_t start_time = strtol(start_time_str, NULL, 0); - time_t end_time = strtol(end_time_str, NULL, 0); + time_t start_time = (time_t)str2ul(start_time_str); + time_t end_time = (time_t)str2ul(end_time_str); time_t wall_clock_time = 0, tolerance; bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child; if(child_now_str) { - wall_clock_time = strtol(child_now_str, NULL, 0); + wall_clock_time = (time_t)str2ul(child_now_str); tolerance = st->update_every + 1; wall_clock_comes_from_child = true; } if(wall_clock_time <= 0) { wall_clock_time = now_realtime_sec(); - tolerance = st->update_every + 60; + tolerance = st->update_every + 5; wall_clock_comes_from_child = false; } #ifdef NETDATA_LOG_REPLICATION_REQUESTS internal_error( (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)), - "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).", + "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).", rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, st->replay.after, st->replay.before); + + internal_error( + true, + "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, child wall clock is %ld (%s), had requested %ld to %ld", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + start_time, end_time, wall_clock_time, wall_clock_comes_from_child ? "from child" : "parent time", + st->replay.after, st->replay.before); #endif if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) { @@ -1002,10 +1017,9 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use return PARSER_RC_OK; } - internal_error(true, - "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, but timestamps are invalid (now is %ld [%s], tolerance %ld).", - rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, - wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance); + error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, but timestamps are invalid (now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET, + rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, + wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance); } // the child sends an RBEGIN without any parameters initially @@ -1051,7 +1065,7 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) dimension, ((PARSER_USER_OBJECT *) user)->replay.start_time, ((PARSER_USER_OBJECT *) user)->replay.end_time); - return PARSER_RC_ERROR; + return PLUGINSD_DISABLE_PLUGIN(user); } if (unlikely(!value_str || !*value_str)) @@ -1065,15 +1079,6 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED); - if(unlikely(rd_flags & RRDDIM_FLAG_OBSOLETE)) { - error("PLUGINSD: 'host:%s/chart:%s/dim:%s' has the OBSOLETE flag set, but it is collected.", - rrdhost_hostname(st->rrdhost), - rrdset_id(st), - rrddim_id(rd) - ); - rrddim_isnot_obsolete(st, rd); - } - if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) { NETDATA_DOUBLE value = strtondd(value_str, NULL); SN_FLAGS flags = SN_FLAG_NONE; @@ -1106,9 +1111,11 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) rd->last_collected_time.tv_usec = 0; rd->collections_counter++; } - else - error("PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is collected. Ignoring data.", - rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd)); + else { + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. Ignoring data.", + rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd)); + } } rrddim_acquired_release(rda); @@ -1180,18 +1187,29 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) { - if (num_words < 7) { + if (num_words < 7) { // accepts 7, but the 7th is optional error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command"); return PARSER_RC_ERROR; } - time_t update_every_child = str2l(get_word(words, num_words, 1)); - time_t first_entry_child = (time_t)str2ull(get_word(words, num_words, 2)); - time_t last_entry_child = (time_t)str2ull(get_word(words, num_words, 3)); + const char *update_every_child_txt = get_word(words, num_words, 1); + const char *first_entry_child_txt = get_word(words, num_words, 2); + const char *last_entry_child_txt = get_word(words, num_words, 3); + const char *start_streaming_txt = get_word(words, num_words, 4); + const char *first_entry_requested_txt = get_word(words, num_words, 5); + const char *last_entry_requested_txt = get_word(words, num_words, 6); + const char *child_world_time_txt = get_word(words, num_words, 7); // optional + + time_t update_every_child = (time_t)str2ul(update_every_child_txt); + time_t first_entry_child = (time_t)str2ul(first_entry_child_txt); + time_t last_entry_child = (time_t)str2ul(last_entry_child_txt); - bool start_streaming = (strcmp(get_word(words, num_words, 4), "true") == 0); - time_t first_entry_requested = (time_t)str2ull(get_word(words, num_words, 5)); - time_t last_entry_requested = (time_t)str2ull(get_word(words, num_words, 6)); + bool start_streaming = (strcmp(start_streaming_txt, "true") == 0); + time_t first_entry_requested = (time_t)str2ul(first_entry_requested_txt); + time_t last_entry_requested = (time_t)str2ul(last_entry_requested_txt); + + // the optional child world time + time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t)str2ul(child_world_time_txt) : now_realtime_sec(); PARSER_USER_OBJECT *user_object = user; @@ -1201,13 +1219,15 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN); if(!st) return PLUGINSD_DISABLE_PLUGIN(user); -#ifdef NETDATATA_LOG_REPLICATION_REQUESTS +#ifdef NETDATA_LOG_REPLICATION_REQUESTS internal_error(true, - "PLUGINSD: 'host:%s/chart:%s': received " PLUGINSD_KEYWORD_REPLAY_END " child first_t = %llu, last_t = %llu, start_streaming = %s, requested first_t = %llu, last_t = %llu", + "PLUGINSD REPLAY: 'host:%s/chart:%s': got a " PLUGINSD_KEYWORD_REPLAY_END " child db from %llu to %llu, start_streaming %s, had requested from %llu to %llu, wall clock %llu", rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child, (unsigned long long)last_entry_child, start_streaming?"true":"false", - (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested); + (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested, + (unsigned long long)child_world_time + ); #endif ((PARSER_USER_OBJECT *) user)->st = NULL; @@ -1236,6 +1256,8 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) st->replay.start_streaming = false; st->replay.after = 0; st->replay.before = 0; + if(start_streaming) + st->replay.log_next_data_collection = true; #endif if (start_streaming) { @@ -1250,7 +1272,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) } #ifdef NETDATA_LOG_REPLICATION_REQUESTS else - internal_error(true, "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.", + internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.", rrdhost_hostname(host), rrdset_id(st)); #endif worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, 100.0); @@ -1260,7 +1282,8 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) rrdcontext_updated_retention_rrdset(st); - bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, + bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, + first_entry_child, last_entry_child, child_world_time, first_entry_requested, last_entry_requested); return ok ? PARSER_RC_OK : PARSER_RC_ERROR; } diff --git a/database/rrd.h b/database/rrd.h index 12bfb8660e..187c9a5fca 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -673,6 +673,7 @@ struct rrdset { #ifdef NETDATA_LOG_REPLICATION_REQUESTS struct { + bool log_next_data_collection; bool start_streaming; time_t after; time_t before; @@ -1078,6 +1079,7 @@ extern RRDHOST *localhost; #define rrdhost_sender_replicating_charts_minus_one(host) (__atomic_sub_fetch(&((host)->rrdpush_sender_replicating_charts), 1, __ATOMIC_RELAXED)) #define rrdhost_sender_replicating_charts_zero(host) (__atomic_store_n(&((host)->rrdpush_sender_replicating_charts), 0, __ATOMIC_RELAXED)) +extern DICTIONARY *rrdhost_root_index; long rrdhost_hosts_available(void); // ---------------------------------------------------------------------------- diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c index d85bc55a3e..4eb212152b 100644 --- a/database/sqlite/sqlite_metadata.c +++ b/database/sqlite/sqlite_metadata.c @@ -2,8 +2,6 @@ #include "sqlite_metadata.h" -extern DICTIONARY *rrdhost_root_index; - // SQL statements #define SQL_STORE_CLAIM_ID "insert into node_instance " \ diff --git a/libnetdata/worker_utilization/worker_utilization.c b/libnetdata/worker_utilization/worker_utilization.c index 700f88a736..14b8926e04 100644 --- a/libnetdata/worker_utilization/worker_utilization.c +++ b/libnetdata/worker_utilization/worker_utilization.c @@ -56,7 +56,7 @@ void worker_register(const char *workname) { worker->tag = strdupz(netdata_thread_tag()); worker->workname = strdupz(workname); - usec_t now = now_realtime_usec(); + usec_t now = now_monotonic_usec(); worker->statistics_last_checkpoint = now; worker->last_action_timestamp = now; worker->last_action = WORKER_IDLE; @@ -145,14 +145,14 @@ static inline void worker_is_idle_with_time(usec_t now) { void worker_is_idle(void) { if(unlikely(!worker || worker->last_action != WORKER_BUSY)) return; - worker_is_idle_with_time(now_realtime_usec()); + worker_is_idle_with_time(now_monotonic_usec()); } void worker_is_busy(size_t job_id) { if(unlikely(!worker || job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES)) return; - usec_t now = now_realtime_usec(); + usec_t now = now_monotonic_usec(); if(worker->last_action == WORKER_BUSY) worker_is_idle_with_time(now); @@ -215,7 +215,7 @@ void workers_foreach(const char *workname, void (*callback)( struct worker *p; DOUBLE_LINKED_LIST_FOREACH_FORWARD(base, p, prev, next) { - usec_t now = now_realtime_usec(); + usec_t now = now_monotonic_usec(); // find per job type statistics STRING *per_job_type_name[WORKER_UTILIZATION_MAX_JOB_TYPES]; diff --git a/streaming/replication.c b/streaming/replication.c index 6e3b145507..d88095761e 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -6,7 +6,10 @@ #define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20 #define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10 -static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) { +// ---------------------------------------------------------------------------- +// sending replication replies + +static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) { size_t dimensions = rrdset_number_of_dimensions(st); struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops; @@ -23,7 +26,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti memset(data, 0, sizeof(data)); if(enable_streaming && st->last_updated.tv_sec > before) { - internal_error(true, "REPLAY: 'host:%s/chart:%s' overwriting replication before from %llu to %llu", + internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' has start_streaming = true, adjusting replication before timestamp from %llu to %llu", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)before, (unsigned long long)st->last_updated.tv_sec @@ -35,8 +38,11 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti { RRDDIM *rd; rrddim_foreach_read(rd, st) { - if (rd_dfe.counter >= dimensions) + if (rd_dfe.counter >= dimensions) { + internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); break; + } if(rd->exposed) { data[rd_dfe.counter].dict = rd_dfe.dict; @@ -65,7 +71,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti data[i].sp = ops->next_metric(&data[i].handle); internal_error(max_skip <= 0, - "REPLAY: 'host:%s/chart:%s', dimension '%s': db does not advance the query beyond time %llu", + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long) now); if(data[i].sp.end_time < now) @@ -81,10 +87,9 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti } } - time_t wall_clock_time = now_realtime_sec(); - if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + 1) { + if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1) { internal_error(true, - "REPLAY: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)", + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)min_start_time, (unsigned long long)min_end_time, @@ -95,7 +100,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti if(min_end_time < now) { #ifdef NETDATA_LOG_REPLICATION_REQUESTS internal_error(true, - "REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu", + "STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now); #endif // NETDATA_LOG_REPLICATION_REQUESTS break; @@ -138,14 +143,14 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after); log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before); internal_error(true, - "REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])", + "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf, (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before)); } else internal_error(true, - "REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)", + "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)after, (unsigned long long)before); #endif // NETDATA_LOG_REPLICATION_REQUESTS @@ -195,7 +200,7 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t time_t first_entry_local = rrdset_first_entry_t(st); if(first_entry_local > now + tolerance) { internal_error(true, - "RRDSET: 'host:%s/chart:%s' first time %llu is in the future (now is %llu)", + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db first time %llu is in the future (now is %llu)", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)first_entry_local, (unsigned long long)now); first_entry_local = now; @@ -208,14 +213,20 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t time_t last_entry_local = st->last_updated.tv_sec; if(!last_entry_local) { internal_error(true, - "RRDSET: 'host:%s/chart:%s' db reports last updated time zero.", + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' RRDSET reports last updated time zero.", rrdhost_hostname(st->rrdhost), rrdset_id(st)); last_entry_local = rrdset_last_entry_t(st); + if(!last_entry_local) { + internal_error(true, + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db reports last time zero.", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); + last_entry_local = now; + } } if(last_entry_local > now + tolerance) { internal_error(true, - "RRDSET: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)", + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now); last_entry_local = now; @@ -240,28 +251,49 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t // and copying the result to the host's buffer in order to avoid // holding the host's buffer lock for too long BUFFER *wb = sender_start(host->sender); - { - // pass the original after/before so that the parent knows about - // which time range we responded - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st)); - - if(after != 0 && before != 0) - before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming); - else { - after = 0; - before = 0; - enable_streaming = true; - } - if(enable_streaming) - replicate_chart_collection_state(wb, st); + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st)); - // end with first/last entries we have, and the first start time and - // last end time of the data we sent - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu\n", - (int)st->update_every, (unsigned long long)first_entry_local, (unsigned long long)last_entry_local, - enable_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before); + if(after != 0 && before != 0) + before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming, now); + else { + after = 0; + before = 0; + enable_streaming = true; } + + // get again the world clock time + time_t world_clock_time = now_realtime_sec(); + if(enable_streaming) { + if(now < world_clock_time) { + // we needed time to execute this request + // so, the parent will need to replicate more data + enable_streaming = false; + } + else + replicate_chart_collection_state(wb, st); + } + + // end with first/last entries we have, and the first start time and + // last end time of the data we sent + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n", + + // current chart update every + (int)st->update_every + + // child first db time, child end db time + , (unsigned long long)first_entry_local, (unsigned long long)last_entry_local + + // start streaming boolean + , enable_streaming ? "true" : "false" + + // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true) + , (unsigned long long)after, (unsigned long long)before + + // child world clock time + , (unsigned long long)world_clock_time + ); + sender_commit(host->sender, wb); return enable_streaming; @@ -282,12 +314,14 @@ struct replication_request_details { struct { time_t first_entry_t; // the first entry time the child has time_t last_entry_t; // the last entry time the child has + time_t world_time_t; // the current time of the child } child_db; struct { time_t first_entry_t; // the first entry time we have time_t last_entry_t; // the last entry time we have bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future and we fixed + time_t now; // the current local world clock time } local_db; struct { @@ -305,8 +339,6 @@ struct replication_request_details { time_t before; // the end time of this replication request bool start_streaming; // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before' } wanted; - - time_t now; // the current wall clock time }; static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) { @@ -316,6 +348,8 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c st->rrdhost->receiver->replication_first_time_t = r->wanted.after; #ifdef NETDATA_LOG_REPLICATION_REQUESTS + st->replay.log_next_data_collection = true; + char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = ""; if(r->wanted.after) @@ -326,7 +360,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c internal_error(true, "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: " - "last[%ld - %ld] child[%ld - %ld] local[%ld - %ld %s] gap[%ld - %ld %s] %s" + "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld %s, now %ld] gap[%ld - %ld %s] %s" , rrdhost_hostname(r->host), rrdset_id(r->st) , r->wanted.after, wanted_after_buf , r->wanted.before, wanted_before_buf @@ -334,7 +368,9 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c , msg , r->last_request.after, r->last_request.before , r->child_db.first_entry_t, r->child_db.last_entry_t - , r->local_db.first_entry_t, r->local_db.last_entry_t, r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW" + , r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD" + , r->local_db.first_entry_t, r->local_db.last_entry_t + , r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW", r->local_db.now , r->gap.from, r->gap.to , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL" , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : "" @@ -352,7 +388,8 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c int ret = r->caller.callback(buffer, r->caller.data); if (ret < 0) { - error("REPLICATION: failed to send replication request to child (error %d)", ret); + error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)", + rrdhost_hostname(r->host), rrdset_id(r->st), ret); return false; } @@ -360,7 +397,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c } bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st, - time_t first_entry_child, time_t last_entry_child, + time_t first_entry_child, time_t last_entry_child, time_t child_world_time, time_t prev_first_entry_wanted, time_t prev_last_entry_wanted) { struct replication_request_details r = { @@ -375,6 +412,14 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST .child_db = { .first_entry_t = first_entry_child, .last_entry_t = last_entry_child, + .world_time_t = child_world_time, + }, + + .local_db = { + .first_entry_t = rrdset_first_entry_t(st), + .last_entry_t = rrdset_last_entry_t(st), + .last_entry_t_adjusted_to_now = false, + .now = now_realtime_sec(), }, .last_request = { @@ -387,15 +432,11 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST .before = 0, .start_streaming = true, }, - - .now = now_realtime_sec(), }; - // get our local database retention - r.local_db.first_entry_t = rrdset_first_entry_t(st); - r.local_db.last_entry_t = rrdset_last_entry_t(st); - if(r.local_db.last_entry_t > r.now) { - r.local_db.last_entry_t = r.now; + // check our local database retention + if(r.local_db.last_entry_t > r.local_db.now) { + r.local_db.last_entry_t = r.local_db.now; r.local_db.last_entry_t_adjusted_to_now = true; } @@ -408,7 +449,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST r.gap.from = r.local_db.last_entry_t; else // we don't have any data, the gap is the max timeframe we are allowed to replicate - r.gap.from = r.now - r.host->rrdpush_seconds_to_replicate; + r.gap.from = r.local_db.now - r.host->rrdpush_seconds_to_replicate; } else { @@ -419,7 +460,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST } // we want all the data up to now - r.gap.to = r.now; + r.gap.to = r.local_db.now; // The gap is now r.gap.from -> r.gap.to @@ -461,8 +502,11 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST if(r.wanted.before > r.child_db.last_entry_t) r.wanted.before = r.child_db.last_entry_t; - // the child should start streaming immediately if the wanted duration is small - r.wanted.start_streaming = (r.wanted.before == r.child_db.last_entry_t); + if(r.wanted.after > r.wanted.before) + r.wanted.after = r.wanted.before; + + // the child should start streaming immediately if the wanted duration is small or we reached the last entry of the child + r.wanted.start_streaming = (r.local_db.now - r.wanted.after <= host->rrdpush_replication_step || r.wanted.before == r.child_db.last_entry_t); // the wanted timeframe is now r.wanted.after -> r.wanted.before // send it @@ -499,11 +543,12 @@ struct replication_sort_entry { static struct replication_thread { netdata_mutex_t mutex; + size_t pending; size_t added; size_t executed; size_t removed; + size_t last_executed; time_t first_time_t; - size_t requests_count; Word_t next_unique_id; struct replication_request *requests; @@ -516,12 +561,13 @@ static struct replication_thread { size_t waits; Pvoid_t JudyL_array; -} rep = { +} replication_globals = { .mutex = NETDATA_MUTEX_INITIALIZER, + .pending = 0, .added = 0, .executed = 0, + .last_executed = 0, .first_time_t = 0, - .requests_count = 0, .next_unique_id = 1, .skipped_no_room = 0, .skipped_not_connected = 0, @@ -535,7 +581,7 @@ static __thread int replication_recursive_mutex_recursions = 0; static void replication_recursive_lock() { if(++replication_recursive_mutex_recursions == 1) - netdata_mutex_lock(&rep.mutex); + netdata_mutex_lock(&replication_globals.mutex); #ifdef NETDATA_INTERNAL_CHECKS if(replication_recursive_mutex_recursions < 0 || replication_recursive_mutex_recursions > 2) @@ -545,7 +591,7 @@ static void replication_recursive_lock() { static void replication_recursive_unlock() { if(--replication_recursive_mutex_recursions == 0) - netdata_mutex_unlock(&am |