summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-14 01:22:03 +0200
committerGitHub <noreply@github.com>2022-11-14 01:22:03 +0200
commitcbebc18ca3e52ec5ab41272ef85a5fd45d8d3b10 (patch)
tree521cd5e364b28455015857245fd7a1e7ece8a476 /streaming
parent1dcb9d7ece382cf52a27e6ec9a4c795fd5902a1e (diff)
replication improvements (#13989)
contexts are not set to collected until replication finishes for them; sender thread disables cancelability while replication queries are executed;
Diffstat (limited to 'streaming')
-rw-r--r--streaming/replication.c7
-rw-r--r--streaming/sender.c4
2 files changed, 9 insertions, 2 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index b841d6a3d4..ef384f4e8f 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -153,10 +153,13 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
time_t query_after = after;
time_t query_before = before;
time_t now = now_realtime_sec();
+ time_t tolerance = 2; // sometimes from the time we get this value, to the time we check,
+ // a data collection has been made
+ // so, we give this tolerance to detect invalid timestamps
// find the first entry we have
time_t first_entry_local = rrdset_first_entry_t(st);
- if(first_entry_local > now) {
+ if(first_entry_local > now + tolerance) {
internal_error(true,
"RRDSET: '%s' first time %llu is in the future (now is %llu)",
rrdset_id(st), (unsigned long long)first_entry_local, (unsigned long long)now);
@@ -175,7 +178,7 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
last_entry_local = rrdset_last_entry_t(st);
}
- if(last_entry_local > now) {
+ if(last_entry_local > now + tolerance) {
internal_error(true,
"RRDSET: '%s' last updated time %llu is in the future (now is %llu)",
rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now);
diff --git a/streaming/sender.c b/streaming/sender.c
index 0682597286..e0964f9e3a 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -1132,10 +1132,14 @@ static void process_replication_requests(struct sender_state *s) {
continue;
}
+ netdata_thread_disable_cancelability();
+
// send the replication data
bool start_streaming = replicate_chart_response(st->rrdhost, st,
rr->start_streaming, rr->after, rr->before);
+ netdata_thread_enable_cancelability();
+
// enable normal streaming if we have to
if (start_streaming) {
debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s",