summaryrefslogtreecommitdiffstats
path: root/streaming/replication.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-26 00:55:38 +0200
committerGitHub <noreply@github.com>2023-01-26 00:55:38 +0200
commit7a21b966381022b9dbb15d4377fb09b82d1f6067 (patch)
tree0aabb02c74b2611a5872dd05dba089bb7dc19f06 /streaming/replication.c
parent3e3ff4bee83363dca7cfb838baf1cf316960ed1b (diff)
DBENGINE v2 - improvements part 9 (#14326)
* on shutdown stop data collection for all hosts instead of freeing their memory * print number of sql statements per metadata host scan * print timings with metadata checking * use dbengine API to figure out of a database is legacy * Recalculate retention after a datafile deletion * validate child timestamps during replication * main cache uses a lockless aral per partition, protected by the partition index lock * prevent ML crash * Revert "main cache uses a lockless aral per partition, protected by the partition index lock" This reverts commit 6afc01527dc5c66548b4bc8a1d63c026c3149358. * Log direct index and binary searches * distribute metrics more evenly across time * statistics about retention recalculation * fix crash * Reverse the binary search to calculate retention * more optimization on retention calculation * removed commented old code Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming/replication.c')
-rw-r--r--streaming/replication.c133
1 files changed, 95 insertions, 38 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index b765e3a850..4abd20cc3d 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -434,13 +434,31 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s
}
static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) {
+ time_t wall_clock_time = now_realtime_sec();
+
+ if(requested_after > requested_before) {
+ // flip them
+ time_t t = requested_before;
+ requested_before = requested_after;
+ requested_after = t;
+ }
+
+ if(requested_after > wall_clock_time) {
+ requested_after = 0;
+ requested_before = 0;
+ requested_enable_streaming = true;
+ }
+
+ if(requested_before > wall_clock_time) {
+ requested_before = wall_clock_time;
+ requested_enable_streaming = true;
+ }
+
time_t query_after = requested_after;
time_t query_before = requested_before;
bool query_enable_streaming = requested_enable_streaming;
- time_t wall_clock_time = now_realtime_sec();
-
- time_t db_first_entry, db_last_entry;
+ time_t db_first_entry = 0, db_last_entry = 0;
rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);
if(requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) {
@@ -580,14 +598,14 @@ struct replication_request_details {
struct {
time_t first_entry_t; // the first entry time the child has
time_t last_entry_t; // the last entry time the child has
- time_t world_time_t; // the current time of the child
+ time_t wall_clock_time; // the current time of the child
+ bool fixed_last_entry; // when set we set the last entry to wall clock time
} child_db;
struct {
time_t first_entry_t; // the first entry time we have
time_t last_entry_t; // the last entry time we have
- bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future, and we fixed
- time_t now; // the current local world clock time
+ time_t wall_clock_time; // the current local world clock time
} local_db;
struct {
@@ -607,9 +625,36 @@ struct replication_request_details {
} wanted;
};
-static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) {
+static void replicate_log_request(struct replication_request_details *r, const char *msg) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ internal_error(true,
+#else
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl,
+#endif
+ "REPLAY ERROR: 'host:%s/chart:%s' child sent: "
+ "db from %ld to %ld%s, wall clock time %ld, "
+ "last request from %ld to %ld, "
+ "issue: %s - "
+ "sending replication request from %ld to %ld, start streaming %s",
+ rrdhost_hostname(r->st->rrdhost), rrdset_id(r->st),
+ r->child_db.first_entry_t,
+ r->child_db.last_entry_t, r->child_db.fixed_last_entry ? " (fixed)" : "",
+ r->child_db.wall_clock_time,
+ r->last_request.after,
+ r->last_request.before,
+ msg,
+ r->wanted.after,
+ r->wanted.before,
+ r->wanted.start_streaming ? "true" : "false");
+}
+
+static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg, bool log) {
RRDSET *st = r->st;
+ if(log)
+ replicate_log_request(r, msg);
+
if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t))
st->rrdhost->receiver->replication_first_time_t = r->wanted.after;
@@ -626,7 +671,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
internal_error(true,
"REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: "
- "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld %s, now %ld] gap[%ld - %ld %s] %s"
+ "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld, now %ld] gap[%ld - %ld %s] %s"
, rrdhost_hostname(r->host), rrdset_id(r->st)
, r->wanted.after, wanted_after_buf
, r->wanted.before, wanted_before_buf
@@ -636,7 +681,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
, r->child_db.first_entry_t, r->child_db.last_entry_t
, r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD"
, r->local_db.first_entry_t, r->local_db.last_entry_t
- , r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW", r->local_db.now
+ , r->local_db.now
, r->gap.from, r->gap.to
, (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
, (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
@@ -663,7 +708,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
}
bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
- time_t first_entry_child, time_t last_entry_child, time_t child_world_time,
+ time_t child_first_entry, time_t child_last_entry, time_t child_wall_clock_time,
time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
{
struct replication_request_details r = {
@@ -676,16 +721,16 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
.st = st,
.child_db = {
- .first_entry_t = first_entry_child,
- .last_entry_t = last_entry_child,
- .world_time_t = child_world_time,
+ .first_entry_t = child_first_entry,
+ .last_entry_t = child_last_entry,
+ .wall_clock_time = child_wall_clock_time,
+ .fixed_last_entry = false,
},
.local_db = {
- .first_entry_t = rrdset_first_entry_s(st),
- .last_entry_t = rrdset_last_entry_s(st),
- .last_entry_t_adjusted_to_now = false,
- .now = now_realtime_sec(),
+ .first_entry_t = 0,
+ .last_entry_t = 0,
+ .wall_clock_time = now_realtime_sec(),
},
.last_request = {
@@ -700,12 +745,14 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
},
};
- // check our local database retention
- if(r.local_db.last_entry_t > r.local_db.now) {
- r.local_db.last_entry_t = r.local_db.now;
- r.local_db.last_entry_t_adjusted_to_now = true;
+ if(r.child_db.last_entry_t > r.child_db.wall_clock_time) {
+ replicate_log_request(&r, "child's db last entry > child's wall clock time");
+ r.child_db.last_entry_t = r.child_db.wall_clock_time;
+ r.child_db.fixed_last_entry = true;
}
+ rrdset_get_retention_of_tier_for_collected_chart(r.st, &r.local_db.first_entry_t, &r.local_db.last_entry_t, r.local_db.wall_clock_time, 0);
+
// let's find the GAP we have
if(!r.last_request.after || !r.last_request.before) {
// there is no previous request
@@ -715,7 +762,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
r.gap.from = r.local_db.last_entry_t;
else
// we don't have any data, the gap is the max timeframe we are allowed to replicate
- r.gap.from = r.local_db.now - r.host->rrdpush_seconds_to_replicate;
+ r.gap.from = r.local_db.wall_clock_time - r.host->rrdpush_seconds_to_replicate;
}
else {
@@ -726,27 +773,30 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
}
// we want all the data up to now
- r.gap.to = r.local_db.now;
+ r.gap.to = r.local_db.wall_clock_time;
// The gap is now r.gap.from -> r.gap.to
if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION)))
- return send_replay_chart_cmd(&r, "empty replication request, replication is disabled");
-
- if (unlikely(!r.child_db.last_entry_t))
- return send_replay_chart_cmd(&r, "empty replication request, child has no stored data");
+ return send_replay_chart_cmd(&r, "empty replication request, replication is disabled", false);
if (unlikely(!rrdset_number_of_dimensions(st)))
- return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions");
+ return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions", false);
+
+ if (unlikely(!r.child_db.first_entry_t || !r.child_db.last_entry_t))
+ return send_replay_chart_cmd(&r, "empty replication request, child has no stored data", false);
- if (r.child_db.first_entry_t <= 0)
- return send_replay_chart_cmd(&r, "empty replication request, first entry of the child db first entry is invalid");
+ if (unlikely(r.child_db.first_entry_t < 0 || r.child_db.last_entry_t < 0))
+ return send_replay_chart_cmd(&r, "empty replication request, child db timestamps are invalid", true);
- if (r.child_db.first_entry_t > r.child_db.last_entry_t)
- return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)");
+ if (unlikely(r.child_db.first_entry_t > r.child_db.wall_clock_time))
+ return send_replay_chart_cmd(&r, "empty replication request, child db first entry is after its wall clock time", true);
- if (r.local_db.last_entry_t > r.child_db.last_entry_t)
- return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one");
+ if (unlikely(r.child_db.first_entry_t > r.child_db.last_entry_t))
+ return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)", true);
+
+ if (unlikely(r.local_db.last_entry_t > r.child_db.last_entry_t))
+ return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one", false);
// let's find what the child can provide to fill that gap
@@ -768,15 +818,22 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
if(r.wanted.before > r.child_db.last_entry_t)
r.wanted.before = r.child_db.last_entry_t;
- if(r.wanted.after > r.wanted.before)
- r.wanted.after = r.wanted.before;
+ if(r.wanted.after > r.wanted.before) {
+ r.wanted.after = 0;
+ r.wanted.before = 0;
+ r.wanted.start_streaming = true;
+ return send_replay_chart_cmd(&r, "empty replication request, wanted after computed bigger than wanted before", true);
+ }
// the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child
- r.wanted.start_streaming = (r.local_db.now - r.wanted.after <= host->rrdpush_replication_step || r.wanted.before == r.child_db.last_entry_t);
+ r.wanted.start_streaming = (r.local_db.wall_clock_time - r.wanted.after <= host->rrdpush_replication_step ||
+ r.wanted.before >= r.child_db.last_entry_t ||
+ r.wanted.before >= r.child_db.wall_clock_time ||
+ r.wanted.before >= r.local_db.wall_clock_time);
// the wanted timeframe is now r.wanted.after -> r.wanted.before
// send it
- return send_replay_chart_cmd(&r, "OK");
+ return send_replay_chart_cmd(&r, "OK", false);
}
// ----------------------------------------------------------------------------