summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-02-14 13:10:57 +0200
committerGitHub <noreply@github.com>2023-02-14 13:10:57 +0200
commit8fdeec2e85cf5b1de742d672c57e2c0f276e8867 (patch)
treeb40df18a728b0cbea27a33025c90cb750b1e3d08
parent9986391e46260a43319d2539eb5ebd62abf75246 (diff)
replicating gaps (#14506)
* instead of sending RBEGIN/REND without points, jump to the end of the gap on every replication step * Remove check for memory mode NONE so that replication when INTERPOLATE is negotiated will work * Remove unused label --------- Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
-rwxr-xr-xdatabase/engine/rrdengineapi.c1
-rw-r--r--database/rrdset.c8
-rw-r--r--streaming/replication.c17
3 files changed, 13 insertions, 13 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 3d8b6a90cd..da2c7f5364 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -871,6 +871,7 @@ STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim
// We need to get a new page
if (!rrdeng_load_page_next(rrddim_handle, false)) {
+ handle->now_s = rrddim_handle->end_time_s;
storage_point_empty(sp, handle->now_s - handle->dt_s, handle->now_s);
goto prepare_for_next_iteration;
}
diff --git a/database/rrdset.c b/database/rrdset.c
index 19ee350f95..3c977c463b 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -1543,9 +1543,6 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next)
// calculate the proper last_collected_time, using usec_since_last_update
last_collect_ut = rrdset_update_last_collected_time(st);
}
- if (unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_NONE)) {
- goto after_first_database_work;
- }
// if this set has not been updated in the past
// we fake the last_update time to be = now - usec_since_last_update
@@ -1608,7 +1605,6 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next)
}
}
-after_first_database_work:
st->counter_done++;
if(stream_buffer.wb && !stream_buffer.v2)
@@ -1670,9 +1666,6 @@ after_first_database_work:
rrddim_foreach_done(rd);
rda_slots = dimensions;
- if (unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_NONE))
- goto after_second_database_work;
-
rrdset_debug(st, "last_collect_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (last collection time)", (NETDATA_DOUBLE)last_collect_ut/USEC_PER_SEC);
rrdset_debug(st, "now_collect_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (current collection time)", (NETDATA_DOUBLE)now_collect_ut/USEC_PER_SEC);
rrdset_debug(st, "last_stored_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (last updated time)", (NETDATA_DOUBLE)last_stored_ut/USEC_PER_SEC);
@@ -1886,7 +1879,6 @@ after_first_database_work:
, has_reset_value
);
-after_second_database_work:
for(dim_id = 0, rda = rda_base ; dim_id < rda_slots ; ++dim_id, ++rda) {
rd = rda->rd;
if(unlikely(!rd)) continue;
diff --git a/streaming/replication.c b/streaming/replication.c
index 2b96dc6cb0..a1767c234b 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -310,7 +310,8 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
struct storage_engine_query_ops *ops = q->ops;
time_t wall_clock_time = q->wall_clock_time;
- size_t points_read = q->points_read, points_generated = q->points_generated;
+ bool finished_with_gap = false;
+ size_t points_read = 0, points_generated = 0;
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
time_t actual_after = 0, actual_before = 0;
@@ -469,9 +470,16 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
else if(unlikely(min_end_time < now))
// the query does not progress
break;
- else
+ else {
// we have gap - all points are in the future
now = min_start_time;
+
+ if(min_start_time > before && !points_generated) {
+ before = q->query.before = min_start_time - 1;
+ finished_with_gap = true;
+ break;
+ }
+ }
}
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
@@ -492,10 +500,9 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
(unsigned long long)after, (unsigned long long)before);
#endif // NETDATA_LOG_REPLICATION_REQUESTS
- q->points_read = points_read;
- q->points_generated = points_generated;
+ q->points_read += points_read;
+ q->points_generated += points_generated;
- bool finished_with_gap = false;
if(last_end_time_in_buffer < before - q->st->update_every)
finished_with_gap = true;