summaryrefslogtreecommitdiffstats
path: root/streaming/replication.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-25 01:56:49 +0200
committerGitHub <noreply@github.com>2023-01-25 01:56:49 +0200
commit3a430c181e7655a8460b40e9864395694f223e46 (patch)
tree538c69896374d1a8587d6f6719033c160014e650 /streaming/replication.c
parent0c1fbbe591d5b99f747877feb02557354ff621b2 (diff)
DBENGINE v2 - improvements part 8 (#14319)
* cache 100 pages for each size our tiers need * smarter page caching * account the caching structures * dynamic max number of cached pages * make variables const to ensure they are not changed * make sure replication timestamps do not go to the future * replication now sends chart and dimension states atomically; replication receivers ignores chart and dimension states when rbegin is also ignored * make sure all pages are flushed on shutdown * take into account empty points too * when recalculating retention update first_time_s on metrics only when they are bigger * Report the datafile number we use to recalculate retention * Report the datafile number we use to recalculate retention * rotate db at startup * make query plans overlap * Calculate properly first time s * updated event labels * negative page caching fix * Atempt to create missing tables on query failure * Atempt to create missing tables on query failure (part 2) * negative page caching for all gaps, to eliminate jv2 scans * Fix unittest Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming/replication.c')
-rw-r--r--streaming/replication.c82
1 files changed, 41 insertions, 41 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index 9defd3f872..b765e3a850 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -155,7 +155,7 @@ static struct replication_query *replication_query_prepare(
(unsigned long long) st->last_updated.tv_sec
);
#endif
- q->query.before = st->last_updated.tv_sec;
+ q->query.before = MIN(st->last_updated.tv_sec, wall_clock_time);
}
}
@@ -209,9 +209,38 @@ static struct replication_query *replication_query_prepare(
return q;
}
-static void replication_query_finalize(struct replication_query *q, bool executed) {
+static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) {
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ if(!rd->exposed) continue;
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n",
+ rrddim_id(rd),
+ (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec,
+ rd->last_collected_value,
+ rd->last_calculated_value,
+ rd->last_stored_value
+ );
+ }
+ rrddim_foreach_done(rd);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n",
+ (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec,
+ (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec
+ );
+}
+
+static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) {
size_t dimensions = q->dimensions;
+ if(wb && q->query.enable_streaming)
+ replication_send_chart_collection_state(wb, q->st);
+
+ if(q->query.locked_data_collection) {
+ netdata_spinlock_unlock(&q->st->data_collection_lock);
+ q->query.locked_data_collection = false;
+ }
+
// release all the dictionary items acquired
// finalize the queries
size_t queries = 0;
@@ -228,11 +257,6 @@ static void replication_query_finalize(struct replication_query *q, bool execute
queries++;
}
- if(q->query.locked_data_collection) {
- netdata_spinlock_unlock(&q->st->data_collection_lock);
- q->query.locked_data_collection = false;
- }
-
if(executed) {
netdata_spinlock_lock(&replication_queries.spinlock);
replication_queries.queries_started += queries;
@@ -311,8 +335,8 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s
continue;
}
- if(d->sp.end_time_s < now)
- // this dimension does not have any more data
+ if(unlikely(d->sp.end_time_s < now || d->sp.end_time_s < d->sp.start_time_s))
+ // this dimension does not provide any data
continue;
if(unlikely(!min_start_time))
@@ -325,13 +349,10 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s
min_end_time = MIN(min_end_time, d->sp.end_time_s);
}
- if(unlikely(min_end_time < now))
- break;
-
- if(likely(min_start_time <= now)) {
+ if(likely(min_start_time <= now && min_end_time >= now)) {
// we have a valid point
- if (unlikely(min_end_time <= min_start_time))
+ if (unlikely(min_end_time == min_start_time))
min_start_time = min_end_time - q->st->update_every;
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
@@ -382,7 +403,11 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s
now = min_end_time + 1;
}
+ else if(unlikely(min_end_time < now))
+ // the query does not progress
+ break;
else
+ // we have gap - all points are in the future
now = min_start_time;
}
@@ -408,27 +433,6 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s
q->points_generated = points_generated;
}
-static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) {
- RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- if(!rd->exposed) continue;
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n",
- rrddim_id(rd),
- (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec,
- rd->last_collected_value,
- rd->last_calculated_value,
- rd->last_stored_value
- );
- }
- rrddim_foreach_done(rd);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n",
- (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec,
- (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec
- );
-}
-
static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) {
time_t query_after = requested_after;
time_t query_before = requested_before;
@@ -475,7 +479,7 @@ static struct replication_query *replication_response_prepare(RRDSET *st, bool r
}
void replication_response_cancel_and_finalize(struct replication_query *q) {
- replication_query_finalize(q, false);
+ replication_query_finalize(NULL, q, false);
}
static bool sender_is_still_connected_for_this_request(struct replication_request *rq);
@@ -502,13 +506,9 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
time_t before = q->query.before;
bool enable_streaming = q->query.enable_streaming;
- replication_query_finalize(q, q->query.execute);
+ replication_query_finalize(wb, q, q->query.execute);
q = NULL; // IMPORTANT: q is invalid now
- // get again the world clock time
- if(enable_streaming)
- replication_send_chart_collection_state(wb, st);
-
// get a fresh retention to send to the parent
time_t wall_clock_time = now_realtime_sec();
time_t db_first_entry, db_last_entry;