diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-02-06 12:43:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-06 12:43:38 +0200 |
commit | d53ac630f8c6abfd7f9b7bf9fd4029a164ca775f (patch) | |
tree | 8faa922024073e41e99cecf7c1d791ef3ce72368 | |
parent | b634a0730fadeed61c71b2fd62269d5a46d84168 (diff) |
replication to streaming transition when there are gaps (#14434)
fix https://github.com/netdata/netdata/issues/14432
-rw-r--r-- | streaming/replication.c | 14 | ||||
-rw-r--r-- | streaming/rrdpush.c | 4 |
2 files changed, 14 insertions, 4 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index a585166e15..7c1f16b4c7 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -293,7 +293,7 @@ static void replication_query_align_to_optimal_before(struct replication_query * q->query.before = expanded_before; } -static void replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) { +static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) { replication_query_align_to_optimal_before(q); time_t after = q->query.after; @@ -475,6 +475,12 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s q->points_read = points_read; q->points_generated = points_generated; + + bool finished_with_gap = false; + if(last_end_time_in_buffer < before - q->st->update_every) + finished_with_gap = true; + + return finished_with_gap; } static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) { @@ -561,8 +567,9 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size bool locked_data_collection = q->query.locked_data_collection; q->query.locked_data_collection = false; + bool finished_with_gap = false; if(q->query.execute) - replication_query_execute(wb, q, max_msg_size); + finished_with_gap = replication_query_execute(wb, q, max_msg_size); time_t after = q->request.after; time_t before = q->query.before; @@ -610,6 +617,9 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); rrdhost_sender_replicating_charts_minus_one(st->rrdhost); + if(!finished_with_gap) + st->upstream_resync_time_s = 0; + #ifdef NETDATA_LOG_REPLICATION_REQUESTS internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts", rrdhost_hostname(st->rrdhost), rrdset_id(st)); diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index a7b2a80643..256fa82821 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -321,12 +321,12 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { } // sends the current chart dimensions -static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s, RRDSET_FLAGS flags) { +static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s __maybe_unused, RRDSET_FLAGS flags) { buffer_fast_strcat(wb, "BEGIN \"", 7); buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); buffer_fast_strcat(wb, "\" ", 2); - if(stream_has_capability(s, STREAM_CAP_REPLICATION) || st->last_collected_time.tv_sec > st->upstream_resync_time_s) + if(st->last_collected_time.tv_sec > st->upstream_resync_time_s) buffer_print_llu(wb, st->usec_since_last_update); else buffer_fast_strcat(wb, "0", 1); |