diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-01-23 22:18:44 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-23 22:18:44 +0200 |
commit | dd0f7ae992a8de282c77dc7745c5090e5d65cc28 (patch) | |
tree | fecf5514eda33c0a96f4d359f30fd07229d12cf7 /streaming/replication.c | |
parent | c2c3876c519fbc22a60a5d8b753dc6d8e81e0fed (diff) |
DBENGINE v2 - improvements part 7 (#14307)
* run cleanup in workers
* when there is a discrepancy between update every, fix it
* fix the other occurrences of metric update every mismatch
* allow resetting the same timestamp
* validate flushed pages before committing them to disk
* initialize collection with the latest time in mrg
* these should be static functions
* acquire metrics for writing to detect multiple data collections of the same metric
* print the uuid of the metric that is collected twice
* log the discrepancies of completed pages
* 1 second tolerance
* unify validation of pages and related logging across dbengine
* make do_flush_pages() thread safe
* flush pages runs on libuv workers
* added uv events to tp workers
* dont cross datafile spinlock and rwlock
* should be unlock
* prevent the creation of multiple datafiles
* break an infinite replication loop
* do not log the expansion of the replication window due to start streaming
* log all invalid pages with internal checks
* do not shutdown event loop threads
* add information about collected page events, to find the root cause of invalid collected pages
* rewrite of the gap filling to fix the invalid collected pages problem
* handle multiple collections of the same metric gracefully
* added log about main cache page conflicts; fix gap filling once again...
* keep track of the first metric writer
* it should be an internal fatal - it does not harm users
* do not check for future timestamps on collected pages, since we inherit the clock of the children; do not check collected pages validity without internal checks
* prevent negative replication completion percentage
* internal error for the discrepancy of mrg
* better logging of dbengine new metrics collection
* without internal checks it is unused
* prevent pluginsd crash on exit due to calling pthread_cancel() on an exited thread
* renames and atomics everywhere
* if a datafile cannot be acquired for deletion during shutdown, continue - this can happen when there are hot pages in open cache referencing it
* Debug for context load
* rrdcontext uuid debug
* rrddim uuid debug
* rrdeng uuid debug
* Revert "rrdeng uuid debug"
This reverts commit 393da190826a582e7e6cc90771bf91b175826d8b.
* Revert "rrddim uuid debug"
This reverts commit 72150b30408294f141b19afcfb35abd7c34777d8.
* Revert "rrdcontext uuid debug"
This reverts commit 2c3b940dc23f460226e9b2a6861c214e840044d0.
* Revert "Debug for context load"
This reverts commit 0d880fc1589f128524e0b47abd9ff0714283ce3b.
* do not use legacy uuids on multihost dbs
* thread safety for journalfile size
* handle other cases of inconsistent collected pages
* make health thread check if it should be running in key loops
* do not log uuids
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming/replication.c')
-rw-r--r-- | streaming/replication.c | 64 |
1 file changed, 40 insertions, 24 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index 8d136a04a4..9defd3f872 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -108,9 +108,10 @@ static struct replication_query *replication_query_prepare( time_t db_last_entry, time_t requested_after, time_t requested_before, + bool requested_enable_streaming, time_t query_after, time_t query_before, - bool enable_streaming, + bool query_enable_streaming, time_t wall_clock_time ) { size_t dimensions = rrdset_number_of_dimensions(st); @@ -125,11 +126,11 @@ static struct replication_query *replication_query_prepare( q->request.after = requested_after, q->request.before = requested_before, - q->request.enable_streaming = enable_streaming, + q->request.enable_streaming = requested_enable_streaming, q->query.after = query_after; q->query.before = query_before; - q->query.enable_streaming = enable_streaming; + q->query.enable_streaming = query_enable_streaming; q->wall_clock_time = wall_clock_time; @@ -144,6 +145,7 @@ static struct replication_query *replication_query_prepare( q->query.locked_data_collection = true; if (st->last_updated.tv_sec > q->query.before) { +#ifdef NETDATA_LOG_REPLICATION_REQUESTS internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' " "has start_streaming = true, " @@ -152,6 +154,7 @@ static struct replication_query *replication_query_prepare( (unsigned long long) q->query.before, (unsigned long long) st->last_updated.tv_sec ); +#endif q->query.before = st->last_updated.tv_sec; } } @@ -339,13 +342,15 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s #endif if(buffer_strlen(wb) > max_msg_size && last_end_time_in_buffer) { - internal_error(true, "REPLICATION: buffer size %zu is more than the max message size %zu for chart '%s' of host '%s'." 
- "Interrupting replication query at %ld, before the expected %ld.", - buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost), - last_end_time_in_buffer, q->query.before); - q->query.before = last_end_time_in_buffer; q->query.enable_streaming = false; + + internal_error(true, "REPLICATION: buffer size %zu is more than the max message size %zu for chart '%s' of host '%s'. " + "Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.", + buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost), + q->request.after, q->request.before, q->request.enable_streaming?"true":"false", + q->query.after, q->query.before, q->query.enable_streaming?"true":"false"); + q->query.interrupted = true; break; @@ -424,37 +429,48 @@ static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) { ); } -static struct replication_query *replication_response_prepare(RRDSET *st, bool start_streaming, time_t requested_after, time_t requested_before) { +static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) { time_t query_after = requested_after; time_t query_before = requested_before; + bool query_enable_streaming = requested_enable_streaming; + time_t wall_clock_time = now_realtime_sec(); time_t db_first_entry, db_last_entry; rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0); - if (query_after < db_first_entry) - query_after = db_first_entry; + if(requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) { + // no data requested - just enable streaming + ; + } + else { + if (query_after < db_first_entry) + query_after = db_first_entry; + + if (query_before > db_last_entry) + query_before = db_last_entry; - if (query_before > db_last_entry) - query_before = db_last_entry; + // if the parent asked us to start streaming, then fill the 
rest with the data that we have + if (requested_enable_streaming) + query_before = db_last_entry; - // if the parent asked us to start streaming, then fill the rest with the data that we have - if (start_streaming) - query_before = db_last_entry; + if (query_after > query_before) { + time_t tmp = query_before; + query_before = query_after; + query_after = tmp; + } - if (query_after > query_before) { - time_t tmp = query_before; - query_before = query_after; - query_after = tmp; + query_enable_streaming = (requested_enable_streaming || + query_before == db_last_entry || + !requested_after || + !requested_before) ? true : false; } - bool enable_streaming = (start_streaming || query_before == db_last_entry || !requested_after || !requested_before) ? true : false; - return replication_query_prepare( st, db_first_entry, db_last_entry, - requested_after, requested_before, - query_after, query_before, enable_streaming, + requested_after, requested_before, requested_enable_streaming, + query_after, query_before, query_enable_streaming, wall_clock_time); } |