author    Costa Tsaousis <costa@netdata.cloud>    2022-11-28 12:22:38 +0200
committer GitHub <noreply@github.com>             2022-11-28 12:22:38 +0200
commit    53a13ab8e110923d097968353a6bc1e22399480f (patch)
tree      a22fe110436844fcc0331073d37182adefbf0edc /streaming
parent    1e9f2c7a2a866be27203c528067adecd283e0ceb (diff)
replication fixes No 7 (#14053)

* move global statistics workers to a separate thread
* query statistics per query source
* query statistics for ML, exporters, backfilling
* reset replication point in time every 10 seconds, instead of every 1
* fix compilation warnings
* optimize the replication queries code
* prevent long tail of replication requests (big sleeps)
* provide query statistics about replication (see the sketch after this list)
* optimize replication sender when most senders are full
* optimize replication_request_get_first_available()
* reset replication completion calculation
* remove workers utilization from global statistics thread
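Most of these bullets meet in the statistics plumbing below: each replication query counts points in local variables and flushes the totals into a process-wide struct only when it finishes, so the hot loop touches no shared state. A minimal sketch of that pattern, with run_one_query() as a hypothetical stand-in for the real query loop in replication.c:

    #include <stddef.h>

    struct replication_query_statistics {
        size_t queries_started;
        size_t queries_finished;
        size_t points_read;
        size_t points_generated;
    };

    static struct replication_query_statistics replication_queries = { 0 };

    // hypothetical stand-in for one replication query: count locally in the
    // hot loop, flush into the shared struct once, when the query finishes
    static void run_one_query(size_t points) {
        size_t points_read = 0, points_generated = 0;

        replication_queries.queries_started++;

        for(size_t i = 0; i < points; i++) {
            points_read++;        // one ops->next_metric() call
            points_generated++;   // one REPLAY_SET line emitted
        }

        replication_queries.queries_finished++;
        replication_queries.points_read += points_read;
        replication_queries.points_generated += points_generated;
    }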
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/replication.c  265
-rw-r--r--  streaming/replication.h    9
2 files changed, 180 insertions(+), 94 deletions(-)
diff --git a/streaming/replication.c b/streaming/replication.c
index d88095761e..2dd662311c 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -6,23 +6,36 @@
#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20
#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
+static struct replication_query_statistics replication_queries = {
+ .queries_started = 0,
+ .queries_finished = 0,
+ .points_read = 0,
+ .points_generated = 0,
+};
+
+struct replication_query_statistics replication_get_query_statistics(void) {
+ return replication_queries;
+}
+
// ----------------------------------------------------------------------------
// sending replication replies
+struct replication_dimension {
+ STORAGE_POINT sp;
+ struct storage_engine_query_handle handle;
+ bool enabled;
+
+ DICTIONARY *dict;
+ const DICTIONARY_ITEM *rda;
+ RRDDIM *rd;
+};
+
static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) {
size_t dimensions = rrdset_number_of_dimensions(st);
+ size_t points_read = 0, points_generated = 0;
struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops;
-
- struct {
- DICTIONARY *dict;
- const DICTIONARY_ITEM *rda;
- RRDDIM *rd;
- struct storage_engine_query_handle handle;
- STORAGE_POINT sp;
- bool enabled;
- } data[dimensions];
-
+ struct replication_dimension data[dimensions];
memset(data, 0, sizeof(data));
if(enable_streaming && st->last_updated.tv_sec > before) {
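For orientation before the next hunks: struct replication_dimension above carries one storage-engine query per dimension. A hedged sketch of the lifecycle the function relies on — init, drain with next_metric() until is_finished(), then finalize — using the vtable and types from this diff (assumes the netdata headers):

    // a sketch, not the real code: drain one dimension's query
    static void drain_one_dimension(struct storage_engine_query_ops *ops,
                                    RRDDIM *rd, time_t after, time_t before) {
        struct replication_dimension d = { 0 };

        ops->init(rd->tiers[0]->db_metric_handle, &d.handle, after, before);
        d.enabled = true;

        while(!ops->is_finished(&d.handle)) {
            d.sp = ops->next_metric(&d.handle);   // one STORAGE_POINT per call
            // d.sp.start_time, d.sp.end_time and d.sp.sum are consumed here
        }

        ops->finalize(&d.handle);
    }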
@@ -38,23 +51,23 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
{
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if (rd_dfe.counter >= dimensions) {
+ if(unlikely(!rd || !rd_dfe.item || !rd->exposed))
+ continue;
+
+ if (unlikely(rd_dfe.counter >= dimensions)) {
internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
rrdhost_hostname(st->rrdhost), rrdset_id(st));
break;
}
- if(rd->exposed) {
- data[rd_dfe.counter].dict = rd_dfe.dict;
- data[rd_dfe.counter].rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
- data[rd_dfe.counter].rd = rd;
+ struct replication_dimension *d = &data[rd_dfe.counter];
- ops->init(rd->tiers[0]->db_metric_handle, &data[rd_dfe.counter].handle, after, before);
+ d->dict = rd_dfe.dict;
+ d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
+ d->rd = rd;
- data[rd_dfe.counter].enabled = true;
- }
- else
- data[rd_dfe.counter].enabled = false;
+ ops->init(rd->tiers[0]->db_metric_handle, &d->handle, after, before);
+ d->enabled = true;
}
rrddim_foreach_done(rd);
}
@@ -62,32 +75,35 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
time_t now = after + 1, actual_after = 0, actual_before = 0; (void)actual_before;
while(now <= before) {
time_t min_start_time = 0, min_end_time = 0;
- for (size_t i = 0; i < dimensions && data[i].rd; i++) {
- if(!data[i].enabled) continue;
+ for (size_t i = 0; i < dimensions ;i++) {
+ struct replication_dimension *d = &data[i];
+ if(unlikely(!d->enabled)) continue;
// fetch the first valid point for the dimension
int max_skip = 100;
- while(data[i].sp.end_time < now && !ops->is_finished(&data[i].handle) && max_skip-- > 0)
- data[i].sp = ops->next_metric(&data[i].handle);
+ while(d->sp.end_time < now && !ops->is_finished(&d->handle) && max_skip-- > 0) {
+ d->sp = ops->next_metric(&d->handle);
+ points_read++;
+ }
internal_error(max_skip <= 0,
"STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu",
- rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long) now);
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(d->rd), (unsigned long long) now);
- if(data[i].sp.end_time < now)
+ if(unlikely(d->sp.end_time < now || storage_point_is_unset(d->sp) || storage_point_is_empty(d->sp)))
continue;
- if(!min_start_time) {
- min_start_time = data[i].sp.start_time;
- min_end_time = data[i].sp.end_time;
+ if(unlikely(!min_start_time)) {
+ min_start_time = d->sp.start_time;
+ min_end_time = d->sp.end_time;
}
else {
- min_start_time = MIN(min_start_time, data[i].sp.start_time);
- min_end_time = MIN(min_end_time, data[i].sp.end_time);
+ min_start_time = MIN(min_start_time, d->sp.start_time);
+ min_end_time = MIN(min_end_time, d->sp.end_time);
}
}
- if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1) {
+ if(unlikely(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1)) {
internal_error(true,
"STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
@@ -97,7 +113,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
break;
}
- if(min_end_time < now) {
+ if(unlikely(min_end_time < now)) {
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
internal_error(true,
"STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
@@ -106,10 +122,10 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
break;
}
- if(min_end_time <= min_start_time)
+ if(unlikely(min_end_time <= min_start_time))
min_start_time = min_end_time - st->update_every;
- if(!actual_after) {
+ if(unlikely(!actual_after)) {
actual_after = min_end_time;
actual_before = min_end_time;
}
@@ -123,15 +139,19 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
);
// output the replay values for this time
- for (size_t i = 0; i < dimensions && data[i].rd; i++) {
- if(!data[i].enabled) continue;
+ for (size_t i = 0; i < dimensions ;i++) {
+ struct replication_dimension *d = &data[i];
+ if(unlikely(!d->enabled)) continue;
- if(data[i].sp.start_time <= min_end_time && data[i].sp.end_time >= min_end_time)
+ if(likely(d->sp.start_time <= min_end_time && d->sp.end_time >= min_end_time))
buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
- rrddim_id(data[i].rd), data[i].sp.sum, data[i].sp.flags & SN_FLAG_RESET ? "R" : "");
+ rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : "");
+
else
buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n",
- rrddim_id(data[i].rd));
+ rrddim_id(d->rd));
+
+ points_generated++;
}
now = min_end_time + 1;
@@ -157,13 +177,22 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
// release all the dictionary items acquired
// finalize the queries
- for(size_t i = 0; i < dimensions && data[i].rda ;i++) {
- if(!data[i].enabled) continue;
+ for(size_t i = 0; i < dimensions ;i++) {
+ struct replication_dimension *d = &data[i];
+ if(unlikely(!d->enabled)) continue;
+
+ ops->finalize(&d->handle);
+
+ dictionary_acquired_item_release(d->dict, d->rda);
- ops->finalize(&data[i].handle);
- dictionary_acquired_item_release(data[i].dict, data[i].rda);
+ // update global statistics
+ replication_queries.queries_started++;
+ replication_queries.queries_finished++;
}
+ replication_queries.points_read += points_read;
+ replication_queries.points_generated += points_generated;
+
return before;
}
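Taken together, the hunks above are a k-way alignment across dimensions: advance every dimension's cursor to now, take the minimum end time any dimension offers, emit one REPLAY_SET per dimension at that time, then jump now past it. A simplified sketch of just that control flow (advance_to() is a hypothetical stand-in for the per-dimension cursor logic):

    // simplified alignment sketch; advance_to(i, now) returns the end time of
    // dimension i's earliest point at or after 'now', or 0 if none remain
    time_t replay_window(time_t after, time_t before, size_t dimensions,
                         time_t (*advance_to)(size_t dim, time_t now)) {
        time_t now = after + 1;

        while(now <= before) {
            time_t min_end_time = 0;

            for(size_t i = 0; i < dimensions; i++) {
                time_t end = advance_to(i, now);
                if(end < now) continue;               // this dimension has none
                if(!min_end_time || end < min_end_time)
                    min_end_time = end;
            }

            if(min_end_time < now)                    // no dimension has data left
                break;

            // here the real code emits one REPLAY_SET (value or gap) per dimension
            now = min_end_time + 1;                   // jump past the emitted slot
        }

        return before;
    }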
@@ -560,6 +589,8 @@ static struct replication_thread {
size_t sender_resets;
size_t waits;
+ size_t skipped_no_room_last_run;
+
Pvoid_t JudyL_array;
} replication_globals = {
.mutex = NETDATA_MUTEX_INITIALIZER,
@@ -570,6 +601,7 @@ static struct replication_thread {
.first_time_t = 0,
.next_unique_id = 1,
.skipped_no_room = 0,
+ .skipped_no_room_last_run = 0,
.skipped_not_connected = 0,
.sender_resets = 0,
.waits = 0,
@@ -599,6 +631,13 @@ static void replication_recursive_unlock() {
#endif
}
+void replication_set_next_point_in_time(time_t after, size_t unique_id) {
+ replication_recursive_lock();
+ replication_globals.last_after = after;
+ replication_globals.last_unique_id = unique_id;
+ replication_recursive_unlock();
+}
+
// ----------------------------------------------------------------------------
// replication sort entry management
@@ -626,10 +665,9 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat
struct replication_sort_entry *rse = replication_sort_entry_create(rq);
- if(rq->after < (time_t)replication_globals.last_after) {
+ if(rq->after < (time_t)replication_globals.last_after && rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !replication_globals.skipped_no_room_last_run) {
// make it find this request first
- replication_globals.last_after = rq->after;
- replication_globals.last_unique_id = rq->unique_id;
+ replication_set_next_point_in_time(rq->after, rq->unique_id);
}
replication_globals.added++;
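The extra conditions on the rewind are the point of this hunk: previously any older request pulled the scan point backwards, so a full sender could make the dispatcher re-scan the same region forever. Restated as a hedged helper (illustrative name, same identifiers as above):

    // only pull the scan point back for an older request when the sender can
    // actually accept data and nothing was skipped for lack of room last pass
    static bool should_rewind_scan_point(struct replication_request *rq) {
        return rq->after < (time_t)replication_globals.last_after
            && rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED
            && !replication_globals.skipped_no_room_last_run;
    }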
@@ -716,8 +754,9 @@ static struct replication_request replication_request_get_first_available() {
Pvoid_t *inner_judy_pptr;
replication_recursive_lock();
+ replication_globals.skipped_no_room_last_run = 0;
- struct replication_request rq = (struct replication_request){ .found = false };
+ struct replication_request rq_to_return = (struct replication_request){ .found = false };
if(unlikely(!replication_globals.last_after || !replication_globals.last_unique_id)) {
@@ -726,41 +765,50 @@ static struct replication_request replication_request_get_first_available() {
}
bool find_same_after = true;
- while(!rq.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.JudyL_array, &replication_globals.last_after, find_same_after))) {
+ while(!rq_to_return.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.JudyL_array, &replication_globals.last_after, find_same_after))) {
Pvoid_t *our_item_pptr;
- while(!rq.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.last_unique_id, PJE0))) {
+ while(!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.last_unique_id, PJE0))) {
struct replication_sort_entry *rse = *our_item_pptr;
- struct sender_state *s = rse->rq->sender;
+ struct replication_request *rq = rse->rq;
+ struct sender_state *s = rq->sender;
- bool sender_is_connected =
- rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
+ if(likely(s->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)) {
+ // there is room for this request in the sender buffer
- bool sender_has_been_flushed_since_this_request =
- rse->rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s);
+ bool sender_is_connected =
+ rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
- bool sender_has_room_to_spare =
- s->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED;
+ bool sender_has_been_flushed_since_this_request =
+ rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s);
- if(unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
- replication_globals.skipped_not_connected++;
- if(replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
- break;
- }
+ if (unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
+ // skip this request, the sender is not connected or it has reconnected
+
+ replication_globals.skipped_not_connected++;
+ if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
+ // we removed the item from the outer JudyL
+ break;
+ }
+ else {
+ // this request is good to execute
- else if(sender_has_room_to_spare) {
- // copy the request to return it
- rq = *rse->rq;
- rq.chart_id = string_dup(rq.chart_id);
+ // copy the request to return it
+ rq_to_return = *rq;
+ rq_to_return.chart_id = string_dup(rq_to_return.chart_id);
- // set the return result to found
- rq.found = true;
+ // set the return result to found
+ rq_to_return.found = true;
- if(replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
- break;
+ if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
+ // we removed the item from the outer JudyL
+ break;
+ }
}
- else
+ else {
replication_globals.skipped_no_room++;
+ replication_globals.skipped_no_room_last_run++;
+ }
}
// call JudyLNext from now on
@@ -771,7 +819,7 @@ static struct replication_request replication_request_get_first_available() {
}
replication_recursive_unlock();
- return rq;
+ return rq_to_return;
}
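The reordering above tests buffer fullness before anything else, so when most senders are full each entry costs only a counter increment — the "optimize replication sender when most senders are full" bullet. The per-entry decision, restated as a hedged standalone sketch (the enum and function name are illustrative):

    enum rq_disposition { RQ_SKIP_NO_ROOM, RQ_SKIP_NOT_CONNECTED, RQ_EXECUTE };

    static enum rq_disposition classify_request(struct replication_request *rq) {
        struct sender_state *s = rq->sender;

        if(s->buffer_used_percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
            return RQ_SKIP_NO_ROOM;        // cheapest check first: buffer is full

        if(!rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED) ||
           rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s))
            return RQ_SKIP_NOT_CONNECTED;  // sender gone, or reconnected since

        return RQ_EXECUTE;
    }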
// ----------------------------------------------------------------------------
@@ -890,8 +938,7 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) {
s->replication_reached_max = false;
replication_recursive_lock();
- replication_globals.last_after = 0;
- replication_globals.last_unique_id = 0;
+ replication_set_next_point_in_time(0, 0);
replication_globals.sender_resets++;
replication_recursive_unlock();
}
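This hunk is one side of a hysteresis band between the two percentages defined at the top of the file: dispatching stops above MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED (20%) and resumes only once the buffer drains below MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED (10%), rescanning from the start. A sketch of the full band — the raise side is not shown in this diff and is an assumption:

    // sketch of the 20%/10% hysteresis; only the reset side appears above
    static void on_buffer_ratio_change(struct sender_state *s, size_t percentage) {
        if(!s->replication_reached_max &&
           percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
            s->replication_reached_max = true;   // assumed: stop dispatching to this sender

        else if(s->replication_reached_max &&
                percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) {
            s->replication_reached_max = false;  // resume, and rescan from the start
            replication_recursive_lock();
            replication_set_next_point_in_time(0, 0);
            replication_globals.sender_resets++;
            replication_recursive_unlock();
        }
    }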
@@ -929,17 +976,18 @@ static void replication_main_cleanup(void *ptr) {
#define WORKER_JOB_CHECK_CONSISTENCY 15
#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 10
+#define SECONDS_TO_RESET_POINT_IN_TIME 10
static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
- if(host->sender) {
- size_t pending_requests = host->sender->replication_pending_requests;
- size_t dict_entries = dictionary_entries(host->sender->replication_requests);
-
- internal_error(
- !pending_requests && dict_entries,
- "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
- rrdhost_hostname(host), pending_requests, dict_entries);
- }
+ internal_error(
+ host->sender &&
+ !host->sender->replication_pending_requests &&
+ dictionary_entries(host->sender->replication_requests) != 0,
+ "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
+ rrdhost_hostname(host),
+ host->sender->replication_pending_requests,
+ dictionary_entries(host->sender->replication_requests)
+ );
size_t ok = 0;
size_t errors = 0;
@@ -983,21 +1031,17 @@ static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
}
static void verify_all_hosts_charts_are_streaming_now(void) {
-#ifdef NETDATA_INTERNAL_CHECKS
worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY);
size_t errors = 0;
RRDHOST *host;
- dfe_start_reentrant(rrdhost_root_index, host)
+ dfe_start_read(rrdhost_root_index, host)
errors += verify_host_charts_are_streaming_now(host);
dfe_done(host);
size_t executed = replication_globals.executed;
- internal_error(true, "REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", executed - replication_globals.last_executed, errors);
+ info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", executed - replication_globals.last_executed, errors);
replication_globals.last_executed = executed;
-#else
- ;
-#endif
}
void *replication_thread_main(void *ptr __maybe_unused) {
@@ -1027,7 +1071,9 @@ void *replication_thread_main(void *ptr __maybe_unused) {
time_t latest_first_time_t = 0;
long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place
+ bool slow = true; // control the time we sleep - it has to start with true!
usec_t last_now_mono_ut = now_monotonic_usec();
+ time_t replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; // restart from the beginning every 10 seconds
while(!netdata_exit) {
@@ -1036,9 +1082,21 @@ void *replication_thread_main(void *ptr __maybe_unused) {
if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
last_now_mono_ut = now_mono_ut;
+ if(replication_reset_next_point_in_time_countdown-- == 0) {
+ // once every SECONDS_TO_RESET_POINT_IN_TIME seconds, make it scan all the pending requests next time
+ replication_set_next_point_in_time(0, 0);
+ replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
+ }
+
if(!replication_globals.pending && run_verification_countdown-- == 0) {
- replication_globals.first_time_t = 0; // reset the statistics about completion percentage
+ // reset the statistics about completion percentage
+ replication_globals.first_time_t = 0;
+ latest_first_time_t = 0;
+
verify_all_hosts_charts_are_streaming_now();
+
+ run_verification_countdown = LONG_MAX;
+ slow = true;
}
worker_is_busy(WORKER_JOB_STATISTICS);
@@ -1068,17 +1126,36 @@ void *replication_thread_main(void *ptr __maybe_unused) {
if(unlikely(!rq.found)) {
// make it scan all the pending requests next time
- replication_globals.last_after = 0;
- replication_globals.last_unique_id = 0;
+ replication_set_next_point_in_time(0, 0);
+ replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
+
+ // the timeout also defines how frequently we will traverse all the pending requests
+ // when the outbound buffers of all senders are full
+ usec_t timeout;
+ if(slow)
+ // no work to be done, wait for a request to come in
+ timeout = 1000 * USEC_PER_MS;
+
+ else if(replication_globals.pending > 0)
+ // there are pending requests waiting to be executed,
+ // but none could be executed at this time.
+ // try again after this time.
+ timeout = 100 * USEC_PER_MS;
- replication_globals.waits++;
+ else
+ // no pending requests, but there were requests recently (run_verification_countdown),
+ // so retry shortly.
+ // if this timeout is big, a chart replicating one request at a time (ping-pong) will be slow to finish
+ timeout = 10 * USEC_PER_MS;
+ replication_globals.waits++;
worker_is_idle();
- sleep_usec(((replication_globals.pending) ? 10 : 1000) * USEC_PER_MS);
+ sleep_usec(timeout);
continue;
}
run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
+ slow = false;
// delete the request from the dictionary
worker_is_busy(WORKER_JOB_DELETE_ENTRY);
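The timeout ladder above replaces a two-value sleep (10ms when pending, 1000ms otherwise) and is what "prevent long tail of replication requests (big sleeps)" refers to: a chart replicating one request at a time now waits 10ms between rounds instead of up to a second. Condensed:

    // condensed restatement of the sleep policy above
    static usec_t pick_timeout(bool slow, size_t pending) {
        if(slow)        return 1000 * USEC_PER_MS;  // long idle, wait for new requests
        if(pending > 0) return  100 * USEC_PER_MS;  // requests exist, buffers full: back off
        return 10 * USEC_PER_MS;                    // active ping-pong: poll quickly
    }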
diff --git a/streaming/replication.h b/streaming/replication.h
index 9bd2c87da1..e17073bba0 100644
--- a/streaming/replication.h
+++ b/streaming/replication.h
@@ -5,6 +5,15 @@
#include "daemon/common.h"
+struct replication_query_statistics {
+ size_t queries_started;
+ size_t queries_finished;
+ size_t points_read;
+ size_t points_generated;
+};
+
+struct replication_query_statistics replication_get_query_statistics(void);
+
bool replicate_chart_response(RRDHOST *rh, RRDSET *rs, bool start_streaming, time_t after, time_t before);
typedef int (*send_command)(const char *txt, void *data);
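Since replication_get_query_statistics() returns the struct by value, a caller can diff successive snapshots to derive per-interval rates — presumably what the global statistics thread mentioned in the commit message does. A hypothetical consumer:

    // hypothetical consumer: turn the monotonic counters into per-interval rates
    static void report_replication_rates(void) {
        static struct replication_query_statistics last = { 0 };
        struct replication_query_statistics now = replication_get_query_statistics();

        info("replication: +%zu queries finished, +%zu points read, +%zu points generated",
             now.queries_finished - last.queries_finished,
             now.points_read - last.points_read,
             now.points_generated - last.points_generated);

        last = now;
    }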