diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-02-06 12:43:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-06 12:43:38 +0200 |
commit | d53ac630f8c6abfd7f9b7bf9fd4029a164ca775f (patch) | |
tree | 8faa922024073e41e99cecf7c1d791ef3ce72368 /streaming/replication.c | |
parent | b634a0730fadeed61c71b2fd62269d5a46d84168 (diff) |
replication to streaming transition when there are gaps (#14434)
fix https://github.com/netdata/netdata/issues/14432
Diffstat (limited to 'streaming/replication.c')
-rw-r--r-- | streaming/replication.c | 14 |
1 files changed, 12 insertions, 2 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index a585166e15..7c1f16b4c7 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -293,7 +293,7 @@ static void replication_query_align_to_optimal_before(struct replication_query * q->query.before = expanded_before; } -static void replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) { +static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) { replication_query_align_to_optimal_before(q); time_t after = q->query.after; @@ -475,6 +475,12 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s q->points_read = points_read; q->points_generated = points_generated; + + bool finished_with_gap = false; + if(last_end_time_in_buffer < before - q->st->update_every) + finished_with_gap = true; + + return finished_with_gap; } static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) { @@ -561,8 +567,9 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size bool locked_data_collection = q->query.locked_data_collection; q->query.locked_data_collection = false; + bool finished_with_gap = false; if(q->query.execute) - replication_query_execute(wb, q, max_msg_size); + finished_with_gap = replication_query_execute(wb, q, max_msg_size); time_t after = q->request.after; time_t before = q->query.before; @@ -610,6 +617,9 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); rrdhost_sender_replicating_charts_minus_one(st->rrdhost); + if(!finished_with_gap) + st->upstream_resync_time_s = 0; + #ifdef NETDATA_LOG_REPLICATION_REQUESTS internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts", rrdhost_hostname(st->rrdhost), rrdset_id(st)); |