diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-01-25 01:56:49 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-25 01:56:49 +0200 |
commit | 3a430c181e7655a8460b40e9864395694f223e46 (patch) | |
tree | 538c69896374d1a8587d6f6719033c160014e650 /streaming/replication.c | |
parent | 0c1fbbe591d5b99f747877feb02557354ff621b2 (diff) |
DBENGINE v2 - improvements part 8 (#14319)
* cache 100 pages for each size our tiers need
* smarter page caching
* account the caching structures
* dynamic max number of cached pages
* make variables const to ensure they are not changed
* make sure replication timestamps do not go to the future
* replication now sends chart and dimension states atomically; replication receivers ignore chart and dimension states when rbegin is also ignored
* make sure all pages are flushed on shutdown
* take into account empty points too
* when recalculating retention update first_time_s on metrics only when they are bigger
* Report the datafile number we use to recalculate retention
* Report the datafile number we use to recalculate retention
* rotate db at startup
* make query plans overlap
* Calculate first_time_s properly
* updated event labels
* negative page caching fix
* Attempt to create missing tables on query failure
* Attempt to create missing tables on query failure (part 2)
* negative page caching for all gaps, to eliminate jv2 scans
* Fix unittest
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming/replication.c')
-rw-r--r-- | streaming/replication.c | 82 |
1 files changed, 41 insertions, 41 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index 9defd3f872..b765e3a850 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -155,7 +155,7 @@ static struct replication_query *replication_query_prepare( (unsigned long long) st->last_updated.tv_sec ); #endif - q->query.before = st->last_updated.tv_sec; + q->query.before = MIN(st->last_updated.tv_sec, wall_clock_time); } } @@ -209,9 +209,38 @@ static struct replication_query *replication_query_prepare( return q; } -static void replication_query_finalize(struct replication_query *q, bool executed) { +static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) { + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + if(!rd->exposed) continue; + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n", + rrddim_id(rd), + (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec, + rd->last_collected_value, + rd->last_calculated_value, + rd->last_stored_value + ); + } + rrddim_foreach_done(rd); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n", + (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec, + (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec + ); +} + +static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) { size_t dimensions = q->dimensions; + if(wb && q->query.enable_streaming) + replication_send_chart_collection_state(wb, q->st); + + if(q->query.locked_data_collection) { + netdata_spinlock_unlock(&q->st->data_collection_lock); + q->query.locked_data_collection = false; + } + // release all the dictionary items acquired // finalize the queries size_t queries = 0; @@ -228,11 +257,6 @@ static void replication_query_finalize(struct replication_query *q, bool execute queries++; } - if(q->query.locked_data_collection) { - 
netdata_spinlock_unlock(&q->st->data_collection_lock); - q->query.locked_data_collection = false; - } - if(executed) { netdata_spinlock_lock(&replication_queries.spinlock); replication_queries.queries_started += queries; @@ -311,8 +335,8 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s continue; } - if(d->sp.end_time_s < now) - // this dimension does not have any more data + if(unlikely(d->sp.end_time_s < now || d->sp.end_time_s < d->sp.start_time_s)) + // this dimension does not provide any data continue; if(unlikely(!min_start_time)) @@ -325,13 +349,10 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s min_end_time = MIN(min_end_time, d->sp.end_time_s); } - if(unlikely(min_end_time < now)) - break; - - if(likely(min_start_time <= now)) { + if(likely(min_start_time <= now && min_end_time >= now)) { // we have a valid point - if (unlikely(min_end_time <= min_start_time)) + if (unlikely(min_end_time == min_start_time)) min_start_time = min_end_time - q->st->update_every; #ifdef NETDATA_LOG_REPLICATION_REQUESTS @@ -382,7 +403,11 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s now = min_end_time + 1; } + else if(unlikely(min_end_time < now)) + // the query does not progress + break; else + // we have gap - all points are in the future now = min_start_time; } @@ -408,27 +433,6 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s q->points_generated = points_generated; } -static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) { - RRDDIM *rd; - rrddim_foreach_read(rd, st) { - if(!rd->exposed) continue; - - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n", - rrddim_id(rd), - (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec, - rd->last_collected_value, - rd->last_calculated_value, - 
rd->last_stored_value - ); - } - rrddim_foreach_done(rd); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n", - (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec, - (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec - ); -} - static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) { time_t query_after = requested_after; time_t query_before = requested_before; @@ -475,7 +479,7 @@ static struct replication_query *replication_response_prepare(RRDSET *st, bool r } void replication_response_cancel_and_finalize(struct replication_query *q) { - replication_query_finalize(q, false); + replication_query_finalize(NULL, q, false); } static bool sender_is_still_connected_for_this_request(struct replication_request *rq); @@ -502,13 +506,9 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size time_t before = q->query.before; bool enable_streaming = q->query.enable_streaming; - replication_query_finalize(q, q->query.execute); + replication_query_finalize(wb, q, q->query.execute); q = NULL; // IMPORTANT: q is invalid now - // get again the world clock time - if(enable_streaming) - replication_send_chart_collection_state(wb, st); - // get a fresh retention to send to the parent time_t wall_clock_time = now_realtime_sec(); time_t db_first_entry, db_last_entry; |