author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-28 12:22:38 +0200
---|---|---
committer | GitHub <noreply@github.com> | 2022-11-28 12:22:38 +0200
commit | 53a13ab8e110923d097968353a6bc1e22399480f (patch) |
tree | a22fe110436844fcc0331073d37182adefbf0edc /streaming |
parent | 1e9f2c7a2a866be27203c528067adecd283e0ceb (diff) |
replication fixes No 7 (#14053)
* move global statistics workers to a separate thread
* query statistics per query source
* query statistics for ML, exporters, backfilling
* reset replication point in time every 10 seconds, instead of every 1
* fix compilation warnings
* optimize the replication queries code
* prevent long tail of replication requests (big sleeps)
* provide query statistics about replication
* optimize replication sender when most senders are full
* optimize replication_request_get_first_available()
* reset replication completion calculation
* remove workers utilization from global statistics thread
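The statistics half of this change is easy to see in isolation. The sketch below is a minimal standalone model of the pattern the patch adds to streaming/replication.c and streaming/replication.h: replication workers bump plain counters while they run queries, and the (now separate) global statistics thread snapshots them by value through an accessor. The struct, accessor, and counter names mirror the patch; the worker thread and main() are hypothetical scaffolding added so the example compiles and runs on its own.

```c
#include <stdio.h>
#include <stddef.h>
#include <unistd.h>
#include <pthread.h>

// mirrors struct replication_query_statistics from streaming/replication.h
struct replication_query_statistics {
    size_t queries_started;
    size_t queries_finished;
    size_t points_read;
    size_t points_generated;
};

static struct replication_query_statistics replication_queries = { 0 };

// mirrors replication_get_query_statistics(): callers receive a copy of the
// counters, never a pointer into the writer's memory
struct replication_query_statistics replication_get_query_statistics(void) {
    return replication_queries;
}

// hypothetical stand-in for replicate_chart_timeframe(): per-query work
// accumulates locally, the globals are bumped as each query completes
static void *replication_worker(void *arg) {
    (void)arg;
    for(int q = 0; q < 1000; q++) {
        size_t points_read = 100, points_generated = 95;
        replication_queries.queries_started++;
        replication_queries.points_read += points_read;
        replication_queries.points_generated += points_generated;
        replication_queries.queries_finished++;
    }
    return NULL;
}

int main(void) {
    pthread_t worker;
    pthread_create(&worker, NULL, replication_worker, NULL);

    // hypothetical stand-in for the global statistics thread
    for(int i = 0; i < 3; i++) {
        struct replication_query_statistics s = replication_get_query_statistics();
        printf("queries %zu/%zu, points read %zu, points generated %zu\n",
               s.queries_started, s.queries_finished, s.points_read, s.points_generated);
        usleep(10 * 1000); // 10ms between snapshots
    }

    pthread_join(worker, NULL);
    return 0;
}
```

As in the patch, the counters are plain size_t increments rather than atomics: they are monitoring-grade numbers where an occasionally stale or torn read is acceptable, and keeping them lock-free means the hot replication path pays nothing for the instrumentation.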
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/replication.c | 265
-rw-r--r-- | streaming/replication.h | 9
2 files changed, 180 insertions, 94 deletions
```diff
diff --git a/streaming/replication.c b/streaming/replication.c
index d88095761e..2dd662311c 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -6,23 +6,36 @@
 #define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20
 #define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
 
+static struct replication_query_statistics replication_queries = {
+    .queries_started = 0,
+    .queries_finished = 0,
+    .points_read = 0,
+    .points_generated = 0,
+};
+
+struct replication_query_statistics replication_get_query_statistics(void) {
+    return replication_queries;
+}
+
 // ----------------------------------------------------------------------------
 // sending replication replies
 
+struct replication_dimension {
+    STORAGE_POINT sp;
+    struct storage_engine_query_handle handle;
+    bool enabled;
+
+    DICTIONARY *dict;
+    const DICTIONARY_ITEM *rda;
+    RRDDIM *rd;
+};
+
 static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) {
     size_t dimensions = rrdset_number_of_dimensions(st);
+    size_t points_read = 0, points_generated = 0;
 
     struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops;
-
-    struct {
-        DICTIONARY *dict;
-        const DICTIONARY_ITEM *rda;
-        RRDDIM *rd;
-        struct storage_engine_query_handle handle;
-        STORAGE_POINT sp;
-        bool enabled;
-    } data[dimensions];
-
+    struct replication_dimension data[dimensions];
     memset(data, 0, sizeof(data));
 
     if(enable_streaming && st->last_updated.tv_sec > before) {
@@ -38,23 +51,23 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
     {
         RRDDIM *rd;
         rrddim_foreach_read(rd, st) {
-            if (rd_dfe.counter >= dimensions) {
+            if(unlikely(!rd || !rd_dfe.item || !rd->exposed))
+                continue;
+
+            if (unlikely(rd_dfe.counter >= dimensions)) {
                 internal_error(true,
                                "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
                                rrdhost_hostname(st->rrdhost), rrdset_id(st));
                 break;
             }
 
-            if(rd->exposed) {
-                data[rd_dfe.counter].dict = rd_dfe.dict;
-                data[rd_dfe.counter].rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
-                data[rd_dfe.counter].rd = rd;
+            struct replication_dimension *d = &data[rd_dfe.counter];
 
-                ops->init(rd->tiers[0]->db_metric_handle, &data[rd_dfe.counter].handle, after, before);
+            d->dict = rd_dfe.dict;
+            d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
+            d->rd = rd;
 
-                data[rd_dfe.counter].enabled = true;
-            }
-            else
-                data[rd_dfe.counter].enabled = false;
+            ops->init(rd->tiers[0]->db_metric_handle, &d->handle, after, before);
+            d->enabled = true;
         }
         rrddim_foreach_done(rd);
     }
@@ -62,32 +75,35 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
     time_t now = after + 1, actual_after = 0, actual_before = 0; (void)actual_before;
     while(now <= before) {
         time_t min_start_time = 0, min_end_time = 0;
-        for (size_t i = 0; i < dimensions && data[i].rd; i++) {
-            if(!data[i].enabled) continue;
+        for (size_t i = 0; i < dimensions ;i++) {
+            struct replication_dimension *d = &data[i];
+            if(unlikely(!d->enabled)) continue;
 
             // fetch the first valid point for the dimension
             int max_skip = 100;
-            while(data[i].sp.end_time < now && !ops->is_finished(&data[i].handle) && max_skip-- > 0)
-                data[i].sp = ops->next_metric(&data[i].handle);
+            while(d->sp.end_time < now && !ops->is_finished(&d->handle) && max_skip-- > 0) {
+                d->sp = ops->next_metric(&d->handle);
+                points_read++;
+            }
 
             internal_error(max_skip <= 0,
                            "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu",
-                           rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long) now);
+                           rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(d->rd), (unsigned long long) now);
 
-            if(data[i].sp.end_time < now)
+            if(unlikely(d->sp.end_time < now || storage_point_is_unset(d->sp) || storage_point_is_empty(d->sp)))
                 continue;
 
-            if(!min_start_time) {
-                min_start_time = data[i].sp.start_time;
-                min_end_time = data[i].sp.end_time;
+            if(unlikely(!min_start_time)) {
+                min_start_time = d->sp.start_time;
+                min_end_time = d->sp.end_time;
             }
             else {
-                min_start_time = MIN(min_start_time, data[i].sp.start_time);
-                min_end_time = MIN(min_end_time, data[i].sp.end_time);
+                min_start_time = MIN(min_start_time, d->sp.start_time);
+                min_end_time = MIN(min_end_time, d->sp.end_time);
             }
         }
 
-        if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1) {
+        if(unlikely(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1)) {
             internal_error(true,
                            "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
                            rrdhost_hostname(st->rrdhost), rrdset_id(st),
@@ -97,7 +113,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
             break;
         }
 
-        if(min_end_time < now) {
+        if(unlikely(min_end_time < now)) {
 #ifdef NETDATA_LOG_REPLICATION_REQUESTS
             internal_error(true,
                            "STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
@@ -106,10 +122,10 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
             break;
         }
 
-        if(min_end_time <= min_start_time)
+        if(unlikely(min_end_time <= min_start_time))
             min_start_time = min_end_time - st->update_every;
 
-        if(!actual_after) {
+        if(unlikely(!actual_after)) {
             actual_after = min_end_time;
             actual_before = min_end_time;
         }
@@ -123,15 +139,19 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
         );
 
         // output the replay values for this time
-        for (size_t i = 0; i < dimensions && data[i].rd; i++) {
-            if(!data[i].enabled) continue;
+        for (size_t i = 0; i < dimensions ;i++) {
+            struct replication_dimension *d = &data[i];
+            if(unlikely(!d->enabled)) continue;
 
-            if(data[i].sp.start_time <= min_end_time && data[i].sp.end_time >= min_end_time)
+            if(likely(d->sp.start_time <= min_end_time && d->sp.end_time >= min_end_time))
                 buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
-                               rrddim_id(data[i].rd), data[i].sp.sum, data[i].sp.flags & SN_FLAG_RESET ? "R" : "");
+                               rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : "");
+
             else
                 buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n",
-                               rrddim_id(data[i].rd));
+                               rrddim_id(d->rd));
+
+            points_generated++;
         }
 
         now = min_end_time + 1;
@@ -157,13 +177,22 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
     // release all the dictionary items acquired
     // finalize the queries
-    for(size_t i = 0; i < dimensions && data[i].rda ;i++) {
-        if(!data[i].enabled) continue;
+    for(size_t i = 0; i < dimensions ;i++) {
+        struct replication_dimension *d = &data[i];
+        if(unlikely(!d->enabled)) continue;
+
+        ops->finalize(&d->handle);
+
+        dictionary_acquired_item_release(d->dict, d->rda);
 
-        ops->finalize(&data[i].handle);
-        dictionary_acquired_item_release(data[i].dict, data[i].rda);
+        // update global statistics
+        replication_queries.queries_started++;
+        replication_queries.queries_finished++;
     }
+    replication_queries.points_read += points_read;
+    replication_queries.points_generated += points_generated;
+
     return before;
 }
@@ -560,6 +589,8 @@ static struct replication_thread {
     size_t sender_resets;
     size_t waits;
 
+    size_t skipped_no_room_last_run;
+
     Pvoid_t JudyL_array;
 } replication_globals = {
     .mutex = NETDATA_MUTEX_INITIALIZER,
@@ -570,6 +601,7 @@ static struct replication_thread {
     .first_time_t = 0,
    .next_unique_id = 1,
    .skipped_no_room = 0,
+    .skipped_no_room_last_run = 0,
    .skipped_not_connected = 0,
    .sender_resets = 0,
    .waits = 0,
@@ -599,6 +631,13 @@ static void replication_recursive_unlock() {
 #endif
 }
 
+void replication_set_next_point_in_time(time_t after, size_t unique_id) {
+    replication_recursive_lock();
+    replication_globals.last_after = after;
+    replication_globals.last_unique_id = unique_id;
+    replication_recursive_unlock();
+}
+
 // ----------------------------------------------------------------------------
 // replication sort entry management
 
@@ -626,10 +665,9 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat
 
     struct replication_sort_entry *rse = replication_sort_entry_create(rq);
 
-    if(rq->after < (time_t)replication_globals.last_after) {
+    if(rq->after < (time_t)replication_globals.last_after && rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !replication_globals.skipped_no_room_last_run) {
         // make it find this request first
-        replication_globals.last_after = rq->after;
-        replication_globals.last_unique_id = rq->unique_id;
+        replication_set_next_point_in_time(rq->after, rq->unique_id);
     }
 
     replication_globals.added++;
@@ -716,8 +754,9 @@ static struct replication_request replication_request_get_first_available() {
     Pvoid_t *inner_judy_pptr;
 
     replication_recursive_lock();
+    replication_globals.skipped_no_room_last_run = 0;
 
-    struct replication_request rq = (struct replication_request){ .found = false };
+    struct replication_request rq_to_return = (struct replication_request){ .found = false };
 
     if(unlikely(!replication_globals.last_after || !replication_globals.last_unique_id)) {
@@ -726,41 +765,50 @@ static struct replication_request replication_request_get_first_available() {
     }
 
     bool find_same_after = true;
-    while(!rq.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.JudyL_array, &replication_globals.last_after, find_same_after))) {
+    while(!rq_to_return.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.JudyL_array, &replication_globals.last_after, find_same_after))) {
         Pvoid_t *our_item_pptr;
 
-        while(!rq.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.last_unique_id, PJE0))) {
+        while(!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.last_unique_id, PJE0))) {
             struct replication_sort_entry *rse = *our_item_pptr;
-            struct sender_state *s = rse->rq->sender;
+            struct replication_request *rq = rse->rq;
+            struct sender_state *s = rq->sender;
 
-            bool sender_is_connected =
-                    rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
+            if(likely(s->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)) {
+                // there is room for this request in the sender buffer
 
-            bool sender_has_been_flushed_since_this_request =
-                    rse->rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s);
+                bool sender_is_connected =
+                        rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
 
-            bool sender_has_room_to_spare =
-                    s->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED;
+                bool sender_has_been_flushed_since_this_request =
+                        rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s);
 
-            if(unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
-                replication_globals.skipped_not_connected++;
-                if(replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
-                    break;
-            }
+                if (unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
+                    // skip this request, the sender is not connected or it has reconnected
+
+                    replication_globals.skipped_not_connected++;
+                    if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
+                        // we removed the item from the outer JudyL
+                        break;
+                }
+                else {
+                    // this request is good to execute
 
-            else if(sender_has_room_to_spare) {
-                // copy the request to return it
-                rq = *rse->rq;
-                rq.chart_id = string_dup(rq.chart_id);
+                    // copy the request to return it
+                    rq_to_return = *rq;
+                    rq_to_return.chart_id = string_dup(rq_to_return.chart_id);
 
-                // set the return result to found
-                rq.found = true;
+                    // set the return result to found
+                    rq_to_return.found = true;
 
-                if(replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
-                    break;
+                    if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
+                        // we removed the item from the outer JudyL
+                        break;
+                }
             }
-            else
+            else {
                 replication_globals.skipped_no_room++;
+                replication_globals.skipped_no_room_last_run++;
+            }
         }
 
         // call JudyLNext from now on
@@ -771,7 +819,7 @@ static struct replication_request replication_request_get_first_available() {
     }
 
     replication_recursive_unlock();
-    return rq;
+    return rq_to_return;
 }
 
 // ----------------------------------------------------------------------------
@@ -890,8 +938,7 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
        percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) {
         s->replication_reached_max = false;
         replication_recursive_lock();
-        replication_globals.last_after = 0;
-        replication_globals.last_unique_id = 0;
+        replication_set_next_point_in_time(0, 0);
         replication_globals.sender_resets++;
         replication_recursive_unlock();
     }
@@ -929,17 +976,18 @@ static void replication_main_cleanup(void *ptr) {
 #define WORKER_JOB_CHECK_CONSISTENCY 15
 
 #define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 10
+#define SECONDS_TO_RESET_POINT_IN_TIME 10
 
 static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
-    if(host->sender) {
-        size_t pending_requests = host->sender->replication_pending_requests;
-        size_t dict_entries = dictionary_entries(host->sender->replication_requests);
-
-        internal_error(
-                !pending_requests && dict_entries,
-                "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
-                rrdhost_hostname(host), pending_requests, dict_entries);
-    }
+    internal_error(
+            host->sender &&
+            !host->sender->replication_pending_requests &&
+            dictionary_entries(host->sender->replication_requests) != 0,
+            "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
+            rrdhost_hostname(host),
+            host->sender->replication_pending_requests,
+            dictionary_entries(host->sender->replication_requests)
+            );
 
     size_t ok = 0;
     size_t errors = 0;
@@ -983,21 +1031,17 @@ static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
 }
 
 static void verify_all_hosts_charts_are_streaming_now(void) {
-#ifdef NETDATA_INTERNAL_CHECKS
     worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY);
 
     size_t errors = 0;
     RRDHOST *host;
-    dfe_start_reentrant(rrdhost_root_index, host)
+    dfe_start_read(rrdhost_root_index, host)
         errors += verify_host_charts_are_streaming_now(host);
     dfe_done(host);
 
     size_t executed = replication_globals.executed;
-    internal_error(true, "REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", executed - replication_globals.last_executed, errors);
+    info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", executed - replication_globals.last_executed, errors);
     replication_globals.last_executed = executed;
-
-#else
-    ;
-#endif
 }
 
 void *replication_thread_main(void *ptr __maybe_unused) {
@@ -1027,7 +1071,9 @@ void *replication_thread_main(void *ptr __maybe_unused) {
     time_t latest_first_time_t = 0;
 
     long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place
+    bool slow = true; // control the time we sleep - it has to start with true!
     usec_t last_now_mono_ut = now_monotonic_usec();
+    time_t replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; // restart from the beginning every 10 seconds
 
     while(!netdata_exit) {
@@ -1036,9 +1082,21 @@ void *replication_thread_main(void *ptr __maybe_unused) {
         if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
             last_now_mono_ut = now_mono_ut;
 
+            if(replication_reset_next_point_in_time_countdown-- == 0) {
+                // every SECONDS_TO_RESET_POINT_IN_TIME seconds, make it scan all the pending requests next time
+                replication_set_next_point_in_time(0, 0);
+                replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
+            }
+
             if(!replication_globals.pending && run_verification_countdown-- == 0) {
-                replication_globals.first_time_t = 0; // reset the statistics about completion percentage
+                // reset the statistics about completion percentage
+                replication_globals.first_time_t = 0;
+                latest_first_time_t = 0;
+
                 verify_all_hosts_charts_are_streaming_now();
+
+                run_verification_countdown = LONG_MAX;
+                slow = true;
             }
 
             worker_is_busy(WORKER_JOB_STATISTICS);
@@ -1068,17 +1126,36 @@ void *replication_thread_main(void *ptr __maybe_unused) {
 
         if(unlikely(!rq.found)) {
             // make it scan all the pending requests next time
-            replication_globals.last_after = 0;
-            replication_globals.last_unique_id = 0;
+            replication_set_next_point_in_time(0, 0);
+            replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
+
+            // the timeout also defines how frequently we will traverse all the pending requests
+            // when the outbound buffers of all senders are full
+            usec_t timeout;
+            if(slow)
+                // no work to be done, wait for a request to come in
+                timeout = 1000 * USEC_PER_MS;
 
-            replication_globals.waits++;
+            else if(replication_globals.pending > 0)
+                // there are pending requests waiting to be executed,
+                // but none could be executed at this time.
+                // try again after this time.
+                timeout = 100 * USEC_PER_MS;
 
+            else
+                // no pending requests, but there were requests recently (run_verification_countdown)
+                // so, try in a short time.
+                // if this is big, one chart replicating will be slow to finish (ping - pong just one chart)
+                timeout = 10 * USEC_PER_MS;
+
+            replication_globals.waits++;
             worker_is_idle();
-            sleep_usec(((replication_globals.pending) ? 10 : 1000) * USEC_PER_MS);
+            sleep_usec(timeout);
            continue;
        }
 
        run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
+        slow = false;
 
        // delete the request from the dictionary
        worker_is_busy(WORKER_JOB_DELETE_ENTRY);
diff --git a/streaming/replication.h b/streaming/replication.h
index 9bd2c87da1..e17073bba0 100644
--- a/streaming/replication.h
+++ b/streaming/replication.h
@@ -5,6 +5,15 @@
 
 #include "daemon/common.h"
 
+struct replication_query_statistics {
+    size_t queries_started;
+    size_t queries_finished;
+    size_t points_read;
+    size_t points_generated;
+};
+
+struct replication_query_statistics replication_get_query_statistics(void);
+
 bool replicate_chart_response(RRDHOST *rh, RRDSET *rs, bool start_streaming, time_t after, time_t before);
 
 typedef int (*send_command)(const char *txt, void *data);
```
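The "prevent long tail of replication requests (big sleeps)" item corresponds to the timeout ladder added near the end of replication_thread_main(): the old sleep_usec(pending ? 10 : 1000 ms) gave only two speeds, so a run with pending requests but saturated sender buffers would spin at 10 ms. Below is a minimal standalone sketch of the new three-tier policy; the helper function and main() are hypothetical, while the tiers and their values (1000 ms idle, 100 ms buffers-full, 10 ms recently-active) come from the diff above.

```c
#include <stdio.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned long long usec_t;
#define USEC_PER_MS 1000ULL

// hypothetical helper modeling the timeout selection in replication_thread_main()
static usec_t replication_idle_timeout(bool slow, size_t pending) {
    if(slow)
        // nothing has happened for a while: sleep long, wait for a new request
        return 1000 * USEC_PER_MS;

    if(pending > 0)
        // requests are queued but none could run (sender buffers full):
        // back off, do not spin through the queue at full speed
        return 100 * USEC_PER_MS;

    // requests were executing moments ago: poll quickly, so a chart that
    // replicates in small ping-pong steps is not stalled behind long sleeps
    return 10 * USEC_PER_MS;
}

int main(void) {
    printf("idle:            %llu us\n", replication_idle_timeout(true, 0));
    printf("buffers full:    %llu us\n", replication_idle_timeout(false, 25));
    printf("recently active: %llu us\n", replication_idle_timeout(false, 0));
    return 0;
}
```

The middle tier is the actual fix: under the old two-speed logic, any pending request meant a 10 ms sleep, so full sender buffers caused busy polling, while the new code reserves the 10 ms fast path for the case where replication was recently making progress and nothing is queued.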