path: root/streaming/replication.c
author    Costa Tsaousis <costa@netdata.cloud>    2023-01-23 22:18:44 +0200
committer GitHub <noreply@github.com>             2023-01-23 22:18:44 +0200
commit    dd0f7ae992a8de282c77dc7745c5090e5d65cc28 (patch)
tree      fecf5514eda33c0a96f4d359f30fd07229d12cf7 /streaming/replication.c
parent    c2c3876c519fbc22a60a5d8b753dc6d8e81e0fed (diff)
DBENGINE v2 - improvements part 7 (#14307)
* run cleanup in workers
* when there is a discrepancy between update every, fix it
* fix the other occurrences of metric update every mismatch
* allow resetting the same timestamp
* validate flushed pages before committing them to disk
* initialize collection with the latest time in mrg
* these should be static functions
* acquire metrics for writing to detect multiple data collections of the same metric
* print the uuid of the metric that is collected twice
* log the discrepancies of completed pages
* 1 second tolerance
* unify validation of pages and related logging across dbengine
* make do_flush_pages() thread safe
* flush pages runs on libuv workers
* added uv events to tp workers
* don't cross datafile spinlock and rwlock
* should be unlock
* prevent the creation of multiple datafiles
* break an infinite replication loop
* do not log the expansion of the replication window due to start streaming
* log all invalid pages with internal checks
* do not shut down event loop threads
* add information about collected page events, to find the root cause of invalid collected pages
* rewrite of the gap filling to fix the invalid collected pages problem
* handle multiple collections of the same metric gracefully
* added log about main cache page conflicts; fix gap filling once again...
* keep track of the first metric writer
* it should be an internal fatal - it does not harm users
* do not check for future timestamps on collected pages, since we inherit the clock of the children; do not check collected pages validity without internal checks
* prevent negative replication completion percentage
* internal error for the discrepancy of mrg
* better logging of dbengine new metrics collection
* without internal checks it is unused
* prevent pluginsd crash on exit due to calling pthread_cancel() on an exited thread
* renames and atomics everywhere
* if a datafile cannot be acquired for deletion during shutdown, continue - this can happen when there are hot pages in open cache referencing it
* Debug for context load
* rrdcontext uuid debug
* rrddim uuid debug
* rrdeng uuid debug
* Revert "rrdeng uuid debug". This reverts commit 393da190826a582e7e6cc90771bf91b175826d8b.
* Revert "rrddim uuid debug". This reverts commit 72150b30408294f141b19afcfb35abd7c34777d8.
* Revert "rrdcontext uuid debug". This reverts commit 2c3b940dc23f460226e9b2a6861c214e840044d0.
* Revert "Debug for context load". This reverts commit 0d880fc1589f128524e0b47abd9ff0714283ce3b.
* do not use legacy uuids on multihost dbs
* thread safety for journalfile size
* handle other cases of inconsistent collected pages
* make health thread check if it should be running in key loops
* do not log uuids

Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
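One of the items above, "prevent negative replication completion percentage", is a simple clamping fix. The sketch below is not the actual netdata code; the function name and signature are hypothetical and only illustrate the idea of bounding the reported percentage to [0, 100]:

#include <time.h>

// Hypothetical sketch, not the netdata implementation: bound a replication
// completion percentage to [0, 100] so an inverted or skewed time window can
// never be reported as a negative percentage.
static inline double replication_completion_pct(time_t done, time_t total) {
    if (total <= 0)
        return 100.0;                          // nothing left to replicate
    double pct = 100.0 * (double)done / (double)total;
    if (pct < 0.0)   pct = 0.0;
    if (pct > 100.0) pct = 100.0;
    return pct;
}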
Diffstat (limited to 'streaming/replication.c')
-rw-r--r--   streaming/replication.c   64
1 file changed, 40 insertions(+), 24 deletions(-)
diff --git a/streaming/replication.c b/streaming/replication.c
index 8d136a04a4..9defd3f872 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -108,9 +108,10 @@ static struct replication_query *replication_query_prepare(
time_t db_last_entry,
time_t requested_after,
time_t requested_before,
+ bool requested_enable_streaming,
time_t query_after,
time_t query_before,
- bool enable_streaming,
+ bool query_enable_streaming,
time_t wall_clock_time
) {
size_t dimensions = rrdset_number_of_dimensions(st);
@@ -125,11 +126,11 @@ static struct replication_query *replication_query_prepare(
q->request.after = requested_after,
q->request.before = requested_before,
- q->request.enable_streaming = enable_streaming,
+ q->request.enable_streaming = requested_enable_streaming,
q->query.after = query_after;
q->query.before = query_before;
- q->query.enable_streaming = enable_streaming;
+ q->query.enable_streaming = query_enable_streaming;
q->wall_clock_time = wall_clock_time;
@@ -144,6 +145,7 @@ static struct replication_query *replication_query_prepare(
q->query.locked_data_collection = true;
if (st->last_updated.tv_sec > q->query.before) {
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
internal_error(true,
"STREAM_SENDER REPLAY: 'host:%s/chart:%s' "
"has start_streaming = true, "
@@ -152,6 +154,7 @@ static struct replication_query *replication_query_prepare(
(unsigned long long) q->query.before,
(unsigned long long) st->last_updated.tv_sec
);
+#endif
q->query.before = st->last_updated.tv_sec;
}
}
@@ -339,13 +342,15 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s
#endif
if(buffer_strlen(wb) > max_msg_size && last_end_time_in_buffer) {
- internal_error(true, "REPLICATION: buffer size %zu is more than the max message size %zu for chart '%s' of host '%s'."
- "Interrupting replication query at %ld, before the expected %ld.",
- buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost),
- last_end_time_in_buffer, q->query.before);
-
q->query.before = last_end_time_in_buffer;
q->query.enable_streaming = false;
+
+ internal_error(true, "REPLICATION: buffer size %zu is more than the max message size %zu for chart '%s' of host '%s'. "
+ "Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.",
+ buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost),
+ q->request.after, q->request.before, q->request.enable_streaming?"true":"false",
+ q->query.after, q->query.before, q->query.enable_streaming?"true":"false");
+
q->query.interrupted = true;
break;
@@ -424,37 +429,48 @@ static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) {
);
}
-static struct replication_query *replication_response_prepare(RRDSET *st, bool start_streaming, time_t requested_after, time_t requested_before) {
+static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) {
time_t query_after = requested_after;
time_t query_before = requested_before;
+ bool query_enable_streaming = requested_enable_streaming;
+
time_t wall_clock_time = now_realtime_sec();
time_t db_first_entry, db_last_entry;
rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);
- if (query_after < db_first_entry)
- query_after = db_first_entry;
+ if(requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) {
+ // no data requested - just enable streaming
+ ;
+ }
+ else {
+ if (query_after < db_first_entry)
+ query_after = db_first_entry;
+
+ if (query_before > db_last_entry)
+ query_before = db_last_entry;
- if (query_before > db_last_entry)
- query_before = db_last_entry;
+ // if the parent asked us to start streaming, then fill the rest with the data that we have
+ if (requested_enable_streaming)
+ query_before = db_last_entry;
- // if the parent asked us to start streaming, then fill the rest with the data that we have
- if (start_streaming)
- query_before = db_last_entry;
+ if (query_after > query_before) {
+ time_t tmp = query_before;
+ query_before = query_after;
+ query_after = tmp;
+ }
- if (query_after > query_before) {
- time_t tmp = query_before;
- query_before = query_after;
- query_after = tmp;
+ query_enable_streaming = (requested_enable_streaming ||
+ query_before == db_last_entry ||
+ !requested_after ||
+ !requested_before) ? true : false;
}
- bool enable_streaming = (start_streaming || query_before == db_last_entry || !requested_after || !requested_before) ? true : false;
-
return replication_query_prepare(
st,
db_first_entry, db_last_entry,
- requested_after, requested_before,
- query_after, query_before, enable_streaming,
+ requested_after, requested_before, requested_enable_streaming,
+ query_after, query_before, query_enable_streaming,
wall_clock_time);
}
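Taken together, the new replication_response_prepare() keeps the window the parent requested (requested_after, requested_before, requested_enable_streaming) separate from the window the child will actually query. The standalone sketch below restates the effective-window rules from the added lines of this hunk; compute_query_window() is a hypothetical helper name and the netdata-specific types are omitted:

#include <stdbool.h>
#include <time.h>

// Restatement of the window logic above, outside the netdata types.
// compute_query_window() is a hypothetical name used only for illustration.
static void compute_query_window(time_t requested_after, time_t requested_before,
                                 bool requested_enable_streaming,
                                 time_t db_first_entry, time_t db_last_entry,
                                 time_t *query_after, time_t *query_before,
                                 bool *query_enable_streaming) {
    *query_after = requested_after;
    *query_before = requested_before;
    *query_enable_streaming = requested_enable_streaming;

    // a request of (0, 0, streaming=true) asks for no data, just streaming
    if (requested_after == 0 && requested_before == 0 && requested_enable_streaming)
        return;

    // clamp the requested window to the retention the database actually has
    if (*query_after < db_first_entry)
        *query_after = db_first_entry;
    if (*query_before > db_last_entry)
        *query_before = db_last_entry;

    // if the parent asked us to start streaming, fill the rest with what we have
    if (requested_enable_streaming)
        *query_before = db_last_entry;

    // never return an inverted window
    if (*query_after > *query_before) {
        time_t tmp = *query_before;
        *query_before = *query_after;
        *query_after = tmp;
    }

    // streaming is enabled when it was requested, when the query reaches the
    // end of retention, or when the request carried no explicit window
    *query_enable_streaming = requested_enable_streaming ||
                              *query_before == db_last_entry ||
                              !requested_after ||
                              !requested_before;
}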