diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-01-26 00:55:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-26 00:55:38 +0200 |
commit | 7a21b966381022b9dbb15d4377fb09b82d1f6067 (patch) | |
tree | 0aabb02c74b2611a5872dd05dba089bb7dc19f06 /streaming/replication.c | |
parent | 3e3ff4bee83363dca7cfb838baf1cf316960ed1b (diff) |
DBENGINE v2 - improvements part 9 (#14326)
* on shutdown stop data collection for all hosts instead of freeing their memory
* print number of sql statements per metadata host scan
* print timings with metadata checking
* use dbengine API to figure out if a database is legacy
* Recalculate retention after a datafile deletion
* validate child timestamps during replication
* main cache uses a lockless aral per partition, protected by the partition index lock
* prevent ML crash
* Revert "main cache uses a lockless aral per partition, protected by the partition index lock"
This reverts commit 6afc01527dc5c66548b4bc8a1d63c026c3149358.
* Log direct index and binary searches
* distribute metrics more evenly across time
* statistics about retention recalculation
* fix crash
* Reverse the binary search to calculate retention
* more optimization on retention calculation
* removed commented old code
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming/replication.c')
-rw-r--r-- | streaming/replication.c | 133 |
1 files changed, 95 insertions, 38 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index b765e3a850..4abd20cc3d 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -434,13 +434,31 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s } static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) { + time_t wall_clock_time = now_realtime_sec(); + + if(requested_after > requested_before) { + // flip them + time_t t = requested_before; + requested_before = requested_after; + requested_after = t; + } + + if(requested_after > wall_clock_time) { + requested_after = 0; + requested_before = 0; + requested_enable_streaming = true; + } + + if(requested_before > wall_clock_time) { + requested_before = wall_clock_time; + requested_enable_streaming = true; + } + time_t query_after = requested_after; time_t query_before = requested_before; bool query_enable_streaming = requested_enable_streaming; - time_t wall_clock_time = now_realtime_sec(); - - time_t db_first_entry, db_last_entry; + time_t db_first_entry = 0, db_last_entry = 0; rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0); if(requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) { @@ -580,14 +598,14 @@ struct replication_request_details { struct { time_t first_entry_t; // the first entry time the child has time_t last_entry_t; // the last entry time the child has - time_t world_time_t; // the current time of the child + time_t wall_clock_time; // the current time of the child + bool fixed_last_entry; // when set we set the last entry to wall clock time } child_db; struct { time_t first_entry_t; // the first entry time we have time_t last_entry_t; // the last entry time we have - bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future, and we fixed - time_t now; // the current local 
world clock time + time_t wall_clock_time; // the current local world clock time } local_db; struct { @@ -607,9 +625,36 @@ struct replication_request_details { } wanted; }; -static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) { +static void replicate_log_request(struct replication_request_details *r, const char *msg) { +#ifdef NETDATA_INTERNAL_CHECKS + internal_error(true, +#else + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, +#endif + "REPLAY ERROR: 'host:%s/chart:%s' child sent: " + "db from %ld to %ld%s, wall clock time %ld, " + "last request from %ld to %ld, " + "issue: %s - " + "sending replication request from %ld to %ld, start streaming %s", + rrdhost_hostname(r->st->rrdhost), rrdset_id(r->st), + r->child_db.first_entry_t, + r->child_db.last_entry_t, r->child_db.fixed_last_entry ? " (fixed)" : "", + r->child_db.wall_clock_time, + r->last_request.after, + r->last_request.before, + msg, + r->wanted.after, + r->wanted.before, + r->wanted.start_streaming ? 
"true" : "false"); +} + +static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg, bool log) { RRDSET *st = r->st; + if(log) + replicate_log_request(r, msg); + if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t)) st->rrdhost->receiver->replication_first_time_t = r->wanted.after; @@ -626,7 +671,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c internal_error(true, "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: " - "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld %s, now %ld] gap[%ld - %ld %s] %s" + "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld, now %ld] gap[%ld - %ld %s] %s" , rrdhost_hostname(r->host), rrdset_id(r->st) , r->wanted.after, wanted_after_buf , r->wanted.before, wanted_before_buf @@ -636,7 +681,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c , r->child_db.first_entry_t, r->child_db.last_entry_t , r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD" , r->local_db.first_entry_t, r->local_db.last_entry_t - , r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW", r->local_db.now + , r->local_db.now , r->gap.from, r->gap.to , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL" , (st->replay.after != 0 || st->replay.before != 0) ? 
"OVERLAPPING" : "" @@ -663,7 +708,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c } bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st, - time_t first_entry_child, time_t last_entry_child, time_t child_world_time, + time_t child_first_entry, time_t child_last_entry, time_t child_wall_clock_time, time_t prev_first_entry_wanted, time_t prev_last_entry_wanted) { struct replication_request_details r = { @@ -676,16 +721,16 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST .st = st, .child_db = { - .first_entry_t = first_entry_child, - .last_entry_t = last_entry_child, - .world_time_t = child_world_time, + .first_entry_t = child_first_entry, + .last_entry_t = child_last_entry, + .wall_clock_time = child_wall_clock_time, + .fixed_last_entry = false, }, .local_db = { - .first_entry_t = rrdset_first_entry_s(st), - .last_entry_t = rrdset_last_entry_s(st), - .last_entry_t_adjusted_to_now = false, - .now = now_realtime_sec(), + .first_entry_t = 0, + .last_entry_t = 0, + .wall_clock_time = now_realtime_sec(), }, .last_request = { @@ -700,12 +745,14 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST }, }; - // check our local database retention - if(r.local_db.last_entry_t > r.local_db.now) { - r.local_db.last_entry_t = r.local_db.now; - r.local_db.last_entry_t_adjusted_to_now = true; + if(r.child_db.last_entry_t > r.child_db.wall_clock_time) { + replicate_log_request(&r, "child's db last entry > child's wall clock time"); + r.child_db.last_entry_t = r.child_db.wall_clock_time; + r.child_db.fixed_last_entry = true; } + rrdset_get_retention_of_tier_for_collected_chart(r.st, &r.local_db.first_entry_t, &r.local_db.last_entry_t, r.local_db.wall_clock_time, 0); + // let's find the GAP we have if(!r.last_request.after || !r.last_request.before) { // there is no previous request @@ -715,7 +762,7 @@ bool 
replicate_chart_request(send_command callback, void *callback_data, RRDHOST r.gap.from = r.local_db.last_entry_t; else // we don't have any data, the gap is the max timeframe we are allowed to replicate - r.gap.from = r.local_db.now - r.host->rrdpush_seconds_to_replicate; + r.gap.from = r.local_db.wall_clock_time - r.host->rrdpush_seconds_to_replicate; } else { @@ -726,27 +773,30 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST } // we want all the data up to now - r.gap.to = r.local_db.now; + r.gap.to = r.local_db.wall_clock_time; // The gap is now r.gap.from -> r.gap.to if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION))) - return send_replay_chart_cmd(&r, "empty replication request, replication is disabled"); - - if (unlikely(!r.child_db.last_entry_t)) - return send_replay_chart_cmd(&r, "empty replication request, child has no stored data"); + return send_replay_chart_cmd(&r, "empty replication request, replication is disabled", false); if (unlikely(!rrdset_number_of_dimensions(st))) - return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions"); + return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions", false); + + if (unlikely(!r.child_db.first_entry_t || !r.child_db.last_entry_t)) + return send_replay_chart_cmd(&r, "empty replication request, child has no stored data", false); - if (r.child_db.first_entry_t <= 0) - return send_replay_chart_cmd(&r, "empty replication request, first entry of the child db first entry is invalid"); + if (unlikely(r.child_db.first_entry_t < 0 || r.child_db.last_entry_t < 0)) + return send_replay_chart_cmd(&r, "empty replication request, child db timestamps are invalid", true); - if (r.child_db.first_entry_t > r.child_db.last_entry_t) - return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)"); + if (unlikely(r.child_db.first_entry_t > r.child_db.wall_clock_time)) + 
return send_replay_chart_cmd(&r, "empty replication request, child db first entry is after its wall clock time", true); - if (r.local_db.last_entry_t > r.child_db.last_entry_t) - return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one"); + if (unlikely(r.child_db.first_entry_t > r.child_db.last_entry_t)) + return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)", true); + + if (unlikely(r.local_db.last_entry_t > r.child_db.last_entry_t)) + return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one", false); // let's find what the child can provide to fill that gap @@ -768,15 +818,22 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST if(r.wanted.before > r.child_db.last_entry_t) r.wanted.before = r.child_db.last_entry_t; - if(r.wanted.after > r.wanted.before) - r.wanted.after = r.wanted.before; + if(r.wanted.after > r.wanted.before) { + r.wanted.after = 0; + r.wanted.before = 0; + r.wanted.start_streaming = true; + return send_replay_chart_cmd(&r, "empty replication request, wanted after computed bigger than wanted before", true); + } // the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child - r.wanted.start_streaming = (r.local_db.now - r.wanted.after <= host->rrdpush_replication_step || r.wanted.before == r.child_db.last_entry_t); + r.wanted.start_streaming = (r.local_db.wall_clock_time - r.wanted.after <= host->rrdpush_replication_step || + r.wanted.before >= r.child_db.last_entry_t || + r.wanted.before >= r.child_db.wall_clock_time || + r.wanted.before >= r.local_db.wall_clock_time); // the wanted timeframe is now r.wanted.after -> r.wanted.before // send it - return send_replay_chart_cmd(&r, "OK"); + return send_replay_chart_cmd(&r, "OK", false); } // 
---------------------------------------------------------------------------- |