summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-02-06 12:43:38 +0200
committerGitHub <noreply@github.com>2023-02-06 12:43:38 +0200
commitd53ac630f8c6abfd7f9b7bf9fd4029a164ca775f (patch)
tree8faa922024073e41e99cecf7c1d791ef3ce72368 /streaming
parentb634a0730fadeed61c71b2fd62269d5a46d84168 (diff)
replication to streaming transition when there are gaps (#14434)
fix https://github.com/netdata/netdata/issues/14432
Diffstat (limited to 'streaming')
-rw-r--r--streaming/replication.c14
-rw-r--r--streaming/rrdpush.c4
2 files changed, 14 insertions, 4 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index a585166e15..7c1f16b4c7 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -293,7 +293,7 @@ static void replication_query_align_to_optimal_before(struct replication_query *
q->query.before = expanded_before;
}
-static void replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) {
+static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) {
replication_query_align_to_optimal_before(q);
time_t after = q->query.after;
@@ -475,6 +475,12 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s
q->points_read = points_read;
q->points_generated = points_generated;
+
+ bool finished_with_gap = false;
+ if(last_end_time_in_buffer < before - q->st->update_every)
+ finished_with_gap = true;
+
+ return finished_with_gap;
}
static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) {
@@ -561,8 +567,9 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
bool locked_data_collection = q->query.locked_data_collection;
q->query.locked_data_collection = false;
+ bool finished_with_gap = false;
if(q->query.execute)
- replication_query_execute(wb, q, max_msg_size);
+ finished_with_gap = replication_query_execute(wb, q, max_msg_size);
time_t after = q->request.after;
time_t before = q->query.before;
@@ -610,6 +617,9 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
rrdhost_sender_replicating_charts_minus_one(st->rrdhost);
+ if(!finished_with_gap)
+ st->upstream_resync_time_s = 0;
+
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
rrdhost_hostname(st->rrdhost), rrdset_id(st));
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index a7b2a80643..256fa82821 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -321,12 +321,12 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
}
// sends the current chart dimensions
-static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s, RRDSET_FLAGS flags) {
+static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s __maybe_unused, RRDSET_FLAGS flags) {
buffer_fast_strcat(wb, "BEGIN \"", 7);
buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
buffer_fast_strcat(wb, "\" ", 2);
- if(stream_has_capability(s, STREAM_CAP_REPLICATION) || st->last_collected_time.tv_sec > st->upstream_resync_time_s)
+ if(st->last_collected_time.tv_sec > st->upstream_resync_time_s)
buffer_print_llu(wb, st->usec_since_last_update);
else
buffer_fast_strcat(wb, "0", 1);